echauchot commented on a change in pull request #14347:
URL: https://github.com/apache/beam/pull/14347#discussion_r629337655
##########
File path:
sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
@@ -1177,12 +1272,513 @@ public Write withTypeFn(FieldValueExtractFn typeFn) {
* Elasticsearch.
*
* @param usePartialUpdate set to true to issue partial updates
- * @return the {@link Write} with the partial update control set
+ * @return the {@link DocToBulk} with the partial update control set
*/
- public Write withUsePartialUpdate(boolean usePartialUpdate) {
+ public DocToBulk withUsePartialUpdate(boolean usePartialUpdate) {
return builder().setUsePartialUpdate(usePartialUpdate).build();
}
+ /**
+ * Whether to use scripted updates and what script to use.
+ *
+ * @param source set to the value of the script source, painless lang
+ * @return the {@link DocToBulk} with the scripted updates set
+ */
+ public DocToBulk withUpsertScript(String source) {
+ return
builder().setUsePartialUpdate(false).setUpsertScript(source).build();
+ }
+
+ /**
+ * Provide a function to extract the doc version from the document. This
version number will be
+ * used as the document version in Elasticsearch. Should the function
throw an Exception then
+ * the batch will fail and the exception propagated. Incompatible with
update operations and
+ * should only be used with withUsePartialUpdate(false)
+ *
+ * @param docVersionFn to extract the document version
+ * @return the {@link DocToBulk} with the function set
+ */
+ public DocToBulk withDocVersionFn(Write.FieldValueExtractFn docVersionFn) {
+ checkArgument(docVersionFn != null, "docVersionFn must not be null");
+ return builder().setDocVersionFn(docVersionFn).build();
+ }
+
+ /**
+ * Provide a function to extract the target operation either upsert or
delete from the document
+ * fields allowing dynamic bulk operation decision. While using
withIsDeleteFn, it should be
+ * taken care that the document's id extraction is defined using the
withIdFn function or else
+ * IllegalArgumentException is thrown. Should the function throw an
Exception then the batch
+ * will fail and the exception propagated.
+ *
+ * @param isDeleteFn set to true for deleting the specific document
+ * @return the {@link DocToBulk} with the function set
+ */
+ public DocToBulk withIsDeleteFn(Write.BooleanFieldValueExtractFn
isDeleteFn) {
+ checkArgument(isDeleteFn != null, "deleteFn is required");
+ return builder().setIsDeleteFn(isDeleteFn).build();
+ }
+
+ /**
+ * Set the version type to use for the document version; it is written as
+ * {@code version_type} in the bulk operation metadata. Incompatible with
update operations and
+ * should only be used with withUsePartialUpdate(false)
+ *
+ * @param docVersionType the version type to use, one of {@value
ElasticsearchIO#VERSION_TYPES}
+ * @return the {@link DocToBulk} with the doc version type set
+ */
+ public DocToBulk withDocVersionType(String docVersionType) {
+ checkArgument(
+ VERSION_TYPES.contains(docVersionType),
+ "docVersionType must be one of " + "%s",
+ String.join(", ", VERSION_TYPES));
+ return builder().setDocVersionType(docVersionType).build();
+ }
+
+ /**
+ * Use to set explicitly which version of Elasticsearch the destination
cluster is running.
+ * Providing this hint means there is no need for setting {@link
+ * DocToBulk#withConnectionConfiguration}. This can also be very useful
for testing purposes.
+ *
+ * @param backendVersion the major version number of the version of
Elasticsearch being run in
+ * the cluster where documents will be indexed.
+ * @return the {@link DocToBulk} with the Elasticsearch major version
number set
+ */
+ public DocToBulk withBackendVersion(int backendVersion) {
+ checkArgument(
+ VALID_CLUSTER_VERSIONS.contains(backendVersion),
+ "Backend version may only be one of " + "%s",
+ String.join(", ", VERSION_TYPES));
+ return builder().setBackendVersion(backendVersion).build();
+ }
+
+ @Override
+ public PCollection<String> expand(PCollection<String> docs) {
+ ConnectionConfiguration connectionConfiguration =
getConnectionConfiguration();
+ Integer backendVersion = getBackendVersion();
+ Write.FieldValueExtractFn idFn = getIdFn();
+ Write.BooleanFieldValueExtractFn isDeleteFn = getIsDeleteFn();
+ checkState(
+ (backendVersion != null || connectionConfiguration != null),
+ "withBackendVersion() or withConnectionConfiguration() is required");
+ checkArgument(
+ isDeleteFn == null || idFn != null,
+ "Id needs to be specified by withIdFn for delete operation");
+
+ return docs.apply(ParDo.of(new DocToBulkFn(this)));
+ }
+
+ // Encapsulates the elements which form the metadata for an Elasticsearch
bulk operation
+ private static class DocumentMetadata implements Serializable {
+ final String index;
+ final String type;
+ final String id;
+ final Integer retryOnConflict;
+ final String routing;
+ final Integer backendVersion;
+ final String version;
+ final String versionType;
+
+ DocumentMetadata(
+ String index,
+ String type,
+ String id,
+ Integer retryOnConflict,
+ String routing,
+ Integer backendVersion,
+ String version,
+ String versionType) {
+ this.index = index;
+ this.id = id;
+ this.type = type;
+ this.retryOnConflict = retryOnConflict;
+ this.routing = routing;
+ this.backendVersion = backendVersion;
+ this.version = version;
+ this.versionType = versionType;
+ }
+ }
+
+ private static class DocumentMetadataSerializer extends
StdSerializer<DocumentMetadata> {
+ private DocumentMetadataSerializer() {
+ super(DocumentMetadata.class);
+ }
+
+ @Override
+ public void serialize(DocumentMetadata value, JsonGenerator gen,
SerializerProvider provider)
+ throws IOException {
+ gen.writeStartObject();
+ if (value.index != null) {
+ gen.writeStringField("_index", value.index);
+ }
+ if (value.type != null) {
+ gen.writeStringField("_type", value.type);
+ }
+ if (value.id != null) {
+ gen.writeStringField("_id", value.id);
+ }
+ if (value.routing != null) {
+ gen.writeStringField("routing", value.routing);
+ }
+ if (value.retryOnConflict != null && value.backendVersion <= 6) {
+ gen.writeNumberField("_retry_on_conflict", value.retryOnConflict);
+ }
+ if (value.retryOnConflict != null && value.backendVersion >= 7) {
+ gen.writeNumberField("retry_on_conflict", value.retryOnConflict);
+ }
+ if (value.version != null) {
+ gen.writeStringField("version", value.version);
+ }
+ if (value.versionType != null) {
+ gen.writeStringField("version_type", value.versionType);
+ }
+ gen.writeEndObject();
+ }
+ }
+
+ @VisibleForTesting
+ static String createBulkApiEntity(DocToBulk spec, String document, int
backendVersion)
+ throws IOException {
+ String documentMetadata = "{}";
+ boolean isDelete = false;
+ if (spec.getIndexFn() != null || spec.getTypeFn() != null ||
spec.getIdFn() != null) {
+ // parse once and reused for efficiency
+ JsonNode parsedDocument = OBJECT_MAPPER.readTree(document);
+ documentMetadata = getDocumentMetadata(spec, parsedDocument,
backendVersion);
+ if (spec.getIsDeleteFn() != null) {
+ isDelete = spec.getIsDeleteFn().apply(parsedDocument);
+ }
+ }
+
+ if (isDelete) {
+ // delete request used for deleting a document
+ return String.format("{ \"delete\" : %s }%n", documentMetadata);
+ } else {
+ // index is an insert/upsert and update is a partial update (or insert
if not
+ // existing)
+ if (spec.getUsePartialUpdate()) {
+ return String.format(
+ "{ \"update\" : %s }%n{ \"doc\" : %s, " + "\"doc_as_upsert\" :
true }%n",
+ documentMetadata, document);
+ } else if (spec.getUpsertScript() != null) {
+ return String.format(
+ "{ \"update\" : %s }%n{ \"script\" : {\"source\": \"%s\", "
+ + "\"params\": %s}, \"upsert\" : %s }%n",
+ documentMetadata, spec.getUpsertScript(), document, document);
+ } else {
+ return String.format("{ \"index\" : %s }%n%s%n", documentMetadata,
document);
+ }
+ }
+ }
+
+ private static String lowerCaseOrNull(String input) {
+ return input == null ? null : input.toLowerCase();
+ }
+
+ /**
+ * Extracts the components that comprise the document address from the
document using the {@link
+ * Write.FieldValueExtractFn} configured. This allows any or all of the
index, type and document
+ * id to be controlled on a per document basis. If none are provided then
an empty default of
+ * {@code {}} is returned. Sanitization of the index is performed,
automatically lower-casing
+ * the value as required by Elasticsearch.
+ *
+ * @param parsedDocument the json from which the index, type and id may be
extracted
+ * @return the document address as JSON or the default
+ * @throws IOException if the document cannot be parsed as JSON
+ */
+ private static String getDocumentMetadata(
+ DocToBulk spec, JsonNode parsedDocument, int backendVersion) throws
IOException {
+ DocumentMetadata metadata =
+ new DocumentMetadata(
+ spec.getIndexFn() != null
+ ? lowerCaseOrNull(spec.getIndexFn().apply(parsedDocument))
+ : null,
+ spec.getTypeFn() != null ?
spec.getTypeFn().apply(parsedDocument) : null,
+ spec.getIdFn() != null ? spec.getIdFn().apply(parsedDocument) :
null,
+ (spec.getUsePartialUpdate()
+ || (spec.getUpsertScript() != null &&
!spec.getUpsertScript().isEmpty()))
+ ? DEFAULT_RETRY_ON_CONFLICT
+ : null,
+ spec.getRoutingFn() != null ?
spec.getRoutingFn().apply(parsedDocument) : null,
+ backendVersion,
+ spec.getDocVersionFn() != null ?
spec.getDocVersionFn().apply(parsedDocument) : null,
+ spec.getDocVersionType());
+ return OBJECT_MAPPER.writeValueAsString(metadata);
+ }
+
+ /** {@link DoFn} for the {@link DocToBulk} transform. */
+ @VisibleForTesting
+ static class DocToBulkFn extends DoFn<String, String> {
+ private final DocToBulk spec;
+ private int backendVersion;
+
+ public DocToBulkFn(DocToBulk spec) {
+ this.spec = spec;
+ }
+
+ @Setup
+ public void setup() throws IOException {
+ ConnectionConfiguration connectionConfiguration =
spec.getConnectionConfiguration();
+ if (spec.getBackendVersion() == null) {
Review comment:
Also, if we offer _withBackendVersion()_ on the write side, the API needs
to be uniform: we should offer it on the read side as well.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]