egalpin commented on a change in pull request #14347:
URL: https://github.com/apache/beam/pull/14347#discussion_r617016322



##########
File path: 
sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
@@ -1177,12 +1272,513 @@ public Write withTypeFn(FieldValueExtractFn typeFn) {
      * Elasticsearch.
      *
      * @param usePartialUpdate set to true to issue partial updates
-     * @return the {@link Write} with the partial update control set
+     * @return the {@link DocToBulk} with the partial update control set
      */
-    public Write withUsePartialUpdate(boolean usePartialUpdate) {
+    public DocToBulk withUsePartialUpdate(boolean usePartialUpdate) {
       return builder().setUsePartialUpdate(usePartialUpdate).build();
     }
 
+    /**
+     * Whether to use scripted updates and what script to use.
+     *
+     * @param source set to the value of the script source, painless lang
+     * @return the {@link DocToBulk} with the scripted updates set
+     */
+    public DocToBulk withUpsertScript(String source) {
+      return 
builder().setUsePartialUpdate(false).setUpsertScript(source).build();
+    }
+
+    /**
+     * Provide a function to extract the doc version from the document. This 
version number will be
+     * used as the document version in Elasticsearch. Should the function 
throw an Exception then
+     * the batch will fail and the exception propagated. Incompatible with 
update operations and
+     * should only be used with withUsePartialUpdate(false)
+     *
+     * @param docVersionFn to extract the document version
+     * @return the {@link DocToBulk} with the function set
+     */
+    public DocToBulk withDocVersionFn(Write.FieldValueExtractFn docVersionFn) {
+      checkArgument(docVersionFn != null, "docVersionFn must not be null");
+      return builder().setDocVersionFn(docVersionFn).build();
+    }
+
+    /**
+     * Provide a function to extract the target operation either upsert or 
delete from the document
+     * fields allowing dynamic bulk operation decision. While using 
withIsDeleteFn, it should be
+     * taken care that the document's id extraction is defined using the 
withIdFn function or else
+     * IllegalArgumentException is thrown. Should the function throw an 
Exception then the batch
+     * will fail and the exception propagated.
+     *
+     * @param isDeleteFn set to true for deleting the specific document
+     * @return the {@link Write} with the function set
+     */
+    public DocToBulk withIsDeleteFn(Write.BooleanFieldValueExtractFn 
isDeleteFn) {
+      checkArgument(isDeleteFn != null, "deleteFn is required");
+      return builder().setIsDeleteFn(isDeleteFn).build();
+    }
+
+    /**
+     * Provide a function to extract the doc version from the document. This 
version number will be
+     * used as the document version in Elasticsearch. Should the function 
throw an Exception then
+     * the batch will fail and the exception propagated. Incompatible with 
update operations and
+     * should only be used with withUsePartialUpdate(false)
+     *
+     * @param docVersionType the version type to use, one of {@value 
ElasticsearchIO#VERSION_TYPES}
+     * @return the {@link DocToBulk} with the doc version type set
+     */
+    public DocToBulk withDocVersionType(String docVersionType) {
+      checkArgument(
+          VERSION_TYPES.contains(docVersionType),
+          "docVersionType must be one of " + "%s",
+          String.join(", ", VERSION_TYPES));
+      return builder().setDocVersionType(docVersionType).build();
+    }
+
+    /**
+     * Use to set explicitly which version of Elasticsearch the destination 
cluster is running.
+     * Providing this hint means there is no need for setting {@link
+     * DocToBulk#withConnectionConfiguration}. This can also be very useful 
for testing purposes.
+     *
+     * @param backendVersion the major version number of the version of 
Elasticsearch being run in
+     *     the cluster where documents will be indexed.
+     * @return the {@link DocToBulk} with the Elasticsearch major version 
number set
+     */
+    public DocToBulk withBackendVersion(int backendVersion) {
+      checkArgument(
+          VALID_CLUSTER_VERSIONS.contains(backendVersion),
+          "Backend version may only be one of " + "%s",
+          String.join(", ", VERSION_TYPES));
+      return builder().setBackendVersion(backendVersion).build();
+    }
+
+    @Override
+    public PCollection<String> expand(PCollection<String> docs) {
+      ConnectionConfiguration connectionConfiguration = 
getConnectionConfiguration();
+      Integer backendVersion = getBackendVersion();
+      Write.FieldValueExtractFn idFn = getIdFn();
+      Write.BooleanFieldValueExtractFn isDeleteFn = getIsDeleteFn();
+      checkState(
+          (backendVersion != null || connectionConfiguration != null),
+          "withBackendVersion() or withConnectionConfiguration() is required");
+      checkArgument(
+          isDeleteFn == null || idFn != null,
+          "Id needs to be specified by withIdFn for delete operation");
+
+      return docs.apply(ParDo.of(new DocToBulkFn(this)));
+    }
+
+    // Encapsulates the elements which form the metadata for an Elasticsearch 
bulk operation
+    private static class DocumentMetadata implements Serializable {
+      final String index;
+      final String type;
+      final String id;
+      final Integer retryOnConflict;
+      final String routing;
+      final Integer backendVersion;
+      final String version;
+      final String versionType;
+
+      DocumentMetadata(
+          String index,
+          String type,
+          String id,
+          Integer retryOnConflict,
+          String routing,
+          Integer backendVersion,
+          String version,
+          String versionType) {
+        this.index = index;
+        this.id = id;
+        this.type = type;
+        this.retryOnConflict = retryOnConflict;
+        this.routing = routing;
+        this.backendVersion = backendVersion;
+        this.version = version;
+        this.versionType = versionType;
+      }
+    }
+
+    private static class DocumentMetadataSerializer extends 
StdSerializer<DocumentMetadata> {
+      private DocumentMetadataSerializer() {
+        super(DocumentMetadata.class);
+      }
+
+      @Override
+      public void serialize(DocumentMetadata value, JsonGenerator gen, 
SerializerProvider provider)
+          throws IOException {
+        gen.writeStartObject();
+        if (value.index != null) {
+          gen.writeStringField("_index", value.index);
+        }
+        if (value.type != null) {
+          gen.writeStringField("_type", value.type);
+        }
+        if (value.id != null) {
+          gen.writeStringField("_id", value.id);
+        }
+        if (value.routing != null) {
+          gen.writeStringField("routing", value.routing);
+        }
+        if (value.retryOnConflict != null && value.backendVersion <= 6) {
+          gen.writeNumberField("_retry_on_conflict", value.retryOnConflict);
+        }
+        if (value.retryOnConflict != null && value.backendVersion >= 7) {
+          gen.writeNumberField("retry_on_conflict", value.retryOnConflict);
+        }
+        if (value.version != null) {
+          gen.writeStringField("version", value.version);
+        }
+        if (value.versionType != null) {
+          gen.writeStringField("version_type", value.versionType);
+        }
+        gen.writeEndObject();
+      }
+    }
+
+    @VisibleForTesting
+    static String createBulkApiEntity(DocToBulk spec, String document, int 
backendVersion)
+        throws IOException {
+      String documentMetadata = "{}";
+      boolean isDelete = false;
+      if (spec.getIndexFn() != null || spec.getTypeFn() != null || 
spec.getIdFn() != null) {
+        // parse once and reused for efficiency
+        JsonNode parsedDocument = OBJECT_MAPPER.readTree(document);
+        documentMetadata = getDocumentMetadata(spec, parsedDocument, 
backendVersion);
+        if (spec.getIsDeleteFn() != null) {
+          isDelete = spec.getIsDeleteFn().apply(parsedDocument);
+        }
+      }
+
+      if (isDelete) {
+        // delete request used for deleting a document
+        return String.format("{ \"delete\" : %s }%n", documentMetadata);
+      } else {
+        // index is an insert/upsert and update is a partial update (or insert 
if not
+        // existing)
+        if (spec.getUsePartialUpdate()) {
+          return String.format(
+              "{ \"update\" : %s }%n{ \"doc\" : %s, " + "\"doc_as_upsert\" : 
true }%n",
+              documentMetadata, document);
+        } else if (spec.getUpsertScript() != null) {
+          return String.format(
+              "{ \"update\" : %s }%n{ \"script\" : {\"source\": \"%s\", "
+                  + "\"params\": %s}, \"upsert\" : %s }%n",
+              documentMetadata, spec.getUpsertScript(), document, document);
+        } else {
+          return String.format("{ \"index\" : %s }%n%s%n", documentMetadata, 
document);
+        }
+      }
+    }
+
+    private static String lowerCaseOrNull(String input) {
+      return input == null ? null : input.toLowerCase();
+    }
+
+    /**
+     * Extracts the components that comprise the document address from the 
document using the {@link
+     * Write.FieldValueExtractFn} configured. This allows any or all of the 
index, type and document
+     * id to be controlled on a per document basis. If none are provided then 
an empty default of
+     * {@code {}} is returned. Sanitization of the index is performed, 
automatically lower-casing
+     * the value as required by Elasticsearch.
+     *
+     * @param parsedDocument the json from which the index, type and id may be 
extracted
+     * @return the document address as JSON or the default
+     * @throws IOException if the document cannot be parsed as JSON
+     */
+    private static String getDocumentMetadata(
+        DocToBulk spec, JsonNode parsedDocument, int backendVersion) throws 
IOException {
+      DocumentMetadata metadata =
+          new DocumentMetadata(
+              spec.getIndexFn() != null
+                  ? lowerCaseOrNull(spec.getIndexFn().apply(parsedDocument))
+                  : null,
+              spec.getTypeFn() != null ? 
spec.getTypeFn().apply(parsedDocument) : null,
+              spec.getIdFn() != null ? spec.getIdFn().apply(parsedDocument) : 
null,
+              (spec.getUsePartialUpdate()
+                      || (spec.getUpsertScript() != null && 
!spec.getUpsertScript().isEmpty()))
+                  ? DEFAULT_RETRY_ON_CONFLICT
+                  : null,
+              spec.getRoutingFn() != null ? 
spec.getRoutingFn().apply(parsedDocument) : null,
+              backendVersion,
+              spec.getDocVersionFn() != null ? 
spec.getDocVersionFn().apply(parsedDocument) : null,
+              spec.getDocVersionType());
+      return OBJECT_MAPPER.writeValueAsString(metadata);
+    }
+
+    /** {@link DoFn} to for the {@link DocToBulk} transform. */
+    @VisibleForTesting
+    static class DocToBulkFn extends DoFn<String, String> {
+      private final DocToBulk spec;
+      private int backendVersion;
+
+      public DocToBulkFn(DocToBulk spec) {
+        this.spec = spec;
+      }
+
+      @Setup
+      public void setup() throws IOException {
+        ConnectionConfiguration connectionConfiguration = 
spec.getConnectionConfiguration();
+        if (spec.getBackendVersion() == null) {
+          backendVersion = 
ElasticsearchIO.getBackendVersion(connectionConfiguration);
+        } else {
+          backendVersion = spec.getBackendVersion();
+        }
+      }
+
+      @ProcessElement
+      public void processElement(ProcessContext c) throws IOException {
+        c.output(createBulkApiEntity(spec, c.element(), backendVersion));
+      }
+    }
+  }
+
+  /**
+   * A {@link PTransform} convenience wrapper for doing both document to bulk 
API serialization as
+   * well as batching those Bulk API entities and writing them to an 
Elasticsearch cluster. This
+   * class is effectively a thin proxy for DocToBulk->BulkIO all-in-one for 
convenience and backward
+   * compatibility.
+   */
+  @AutoValue
+  public abstract static class Write extends PTransform<PCollection<String>, 
PDone> {
+    public interface FieldValueExtractFn extends 
SerializableFunction<JsonNode, String> {}
+
+    public interface BooleanFieldValueExtractFn extends 
SerializableFunction<JsonNode, Boolean> {}
+
+    public abstract DocToBulk getDocToBulk();
+
+    public abstract BulkIO getBulkIO();
+
+    abstract Builder writeBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setDocToBulk(DocToBulk docToBulk);
+
+      abstract Builder setBulkIO(BulkIO bulkIO);
+
+      abstract Write build();
+    }
+
+    // For building Doc2Bulk
+    /** Refer to {@link DocToBulk#withIdFn}. */
+    public Write withIdFn(FieldValueExtractFn idFn) {
+      return 
writeBuilder().setDocToBulk(getDocToBulk().withIdFn(idFn)).build();
+    }
+
+    /** Refer to {@link DocToBulk#withIndexFn}. */
+    public Write withIndexFn(FieldValueExtractFn indexFn) {
+      return 
writeBuilder().setDocToBulk(getDocToBulk().withIndexFn(indexFn)).build();
+    }
+
+    /** Refer to {@link DocToBulk#withRoutingFn}. */
+    public Write withRoutingFn(FieldValueExtractFn routingFn) {
+      return 
writeBuilder().setDocToBulk(getDocToBulk().withRoutingFn(routingFn)).build();
+    }
+
+    /** Refer to {@link DocToBulk#withTypeFn}. */
+    public Write withTypeFn(FieldValueExtractFn typeFn) {
+      return 
writeBuilder().setDocToBulk(getDocToBulk().withTypeFn(typeFn)).build();
+    }
+
+    /** Refer to {@link DocToBulk#withDocVersionFn}. */
+    public Write withDocVersionFn(FieldValueExtractFn docVersionFn) {
+      return 
writeBuilder().setDocToBulk(getDocToBulk().withDocVersionFn(docVersionFn)).build();
+    }
+
+    /** Refer to {@link DocToBulk#withDocVersionType}. */
+    public Write withDocVersionType(String docVersionType) {
+      return 
writeBuilder().setDocToBulk(getDocToBulk().withDocVersionType(docVersionType)).build();
+    }
+
+    /** Refer to {@link DocToBulk#withUsePartialUpdate}. */
+    public Write withUsePartialUpdate(boolean usePartialUpdate) {
+      return writeBuilder()
+          .setDocToBulk(getDocToBulk().withUsePartialUpdate(usePartialUpdate))
+          .build();
+    }
+
+    /** Refer to {@link DocToBulk#withUpsertScript}. */
+    public Write withUpsertScript(String source) {
+      return 
writeBuilder().setDocToBulk(getDocToBulk().withUpsertScript(source)).build();
+    }
+
+    /** Refer to {@link DocToBulk#withBackendVersion}. */
+    public Write withBackendVersion(int backendVersion) {
+      return 
writeBuilder().setDocToBulk(getDocToBulk().withBackendVersion(backendVersion)).build();
+    }
+
+    /** Refer to {@link DocToBulk#withIsDeleteFn}. */
+    public Write withIsDeleteFn(Write.BooleanFieldValueExtractFn isDeleteFn) {
+      return 
writeBuilder().setDocToBulk(getDocToBulk().withIsDeleteFn(isDeleteFn)).build();
+    }
+    // End building Doc2Bulk
+
+    /** Refer to {@link BulkIO#withConnectionConfiguration}. */
+    public Write withConnectionConfiguration(ConnectionConfiguration 
connectionConfiguration) {
+      checkArgument(connectionConfiguration != null, "connectionConfiguration 
can not be null");
+
+      return writeBuilder()
+          
.setDocToBulk(getDocToBulk().withConnectionConfiguration(connectionConfiguration))
+          
.setBulkIO(getBulkIO().withConnectionConfiguration(connectionConfiguration))
+          .build();
+    }
+
+    /** Refer to {@link BulkIO#withMaxBatchSize}. */
+    public Write withMaxBatchSize(long batchSize) {
+      return 
writeBuilder().setBulkIO(getBulkIO().withMaxBatchSize(batchSize)).build();
+    }
+
+    /** Refer to {@link BulkIO#withMaxBatchSizeBytes}. */
+    public Write withMaxBatchSizeBytes(long batchSizeBytes) {
+      return 
writeBuilder().setBulkIO(getBulkIO().withMaxBatchSizeBytes(batchSizeBytes)).build();
+    }
+
+    /** Refer to {@link BulkIO#withRetryConfiguration}. */
+    public Write withRetryConfiguration(RetryConfiguration retryConfiguration) 
{
+      return writeBuilder()
+          .setBulkIO(getBulkIO().withRetryConfiguration(retryConfiguration))
+          .build();
+    }
+
+    /** Refer to {@link BulkIO#withIgnoreVersionConflicts}. */
+    public Write withIgnoreVersionConflicts(boolean ignoreVersionConflicts) {
+      return writeBuilder()
+          
.setBulkIO(getBulkIO().withIgnoreVersionConflicts(ignoreVersionConflicts))
+          .build();
+    }
+
+    /** Refer to {@link BulkIO#withUseStatefulBatches}. */
+    public Write withUseStatefulBatches(boolean useStatefulBatches) {
+      return writeBuilder()
+          .setBulkIO(getBulkIO().withUseStatefulBatches(useStatefulBatches))
+          .build();
+    }
+
+    /** Refer to {@link BulkIO#withMaxBufferingDuration}. */
+    public Write withMaxBufferingDuration(Duration maxBufferingDuration) {
+      return writeBuilder()
+          
.setBulkIO(getBulkIO().withMaxBufferingDuration(maxBufferingDuration))
+          .build();
+    }
+
+    /** Refer to {@link BulkIO#withMaxParallelRequestsPerWindow}. */
+    public Write withMaxParallelRquestsPerWindow(int 
maxParallelRquestsPerWindow) {
+      return writeBuilder()
+          
.setBulkIO(getBulkIO().withMaxParallelRequestsPerWindow(maxParallelRquestsPerWindow))
+          .build();
+    }
+
+    /** Refer to {@link BulkIO#withAllowableResponseErrors}. */
+    public Write withAllowableResponseErrors(@Nullable Set<String> 
allowableResponseErrors) {

Review comment:
       This was added for a specific use case with respect to document 
versioning in particular where the ID is monotonically increasing for new 
versions of a given document i.e. higher ID always means newer doc.
   
   In that case, it doesn't matter if an older document arrives at an ES 
cluster after a document with a larger version ID has already been indexed for 
the same doc ID. So ignoring version conflicts can allow for batches containing 
an out-of-date documents to complete without raising an error and getting stuck 
in an infinite retry loop 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to