egalpin commented on a change in pull request #14347:
URL: https://github.com/apache/beam/pull/14347#discussion_r617002718



##########
File path: 
sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
@@ -1177,12 +1272,513 @@ public Write withTypeFn(FieldValueExtractFn typeFn) {
      * Elasticsearch.
      *
      * @param usePartialUpdate set to true to issue partial updates
-     * @return the {@link Write} with the partial update control set
+     * @return the {@link DocToBulk} with the partial update control set
      */
-    public Write withUsePartialUpdate(boolean usePartialUpdate) {
+    public DocToBulk withUsePartialUpdate(boolean usePartialUpdate) {
       return builder().setUsePartialUpdate(usePartialUpdate).build();
     }
 
+    /**
+     * Whether to use scripted updates and what script to use.
+     *
+     * @param source set to the value of the script source, painless lang
+     * @return the {@link DocToBulk} with the scripted updates set
+     */
+    public DocToBulk withUpsertScript(String source) {
+      return 
builder().setUsePartialUpdate(false).setUpsertScript(source).build();
+    }
+
+    /**
+     * Provide a function to extract the doc version from the document. This 
version number will be
+     * used as the document version in Elasticsearch. Should the function 
throw an Exception then
+     * the batch will fail and the exception propagated. Incompatible with 
update operations and
+     * should only be used with withUsePartialUpdate(false)
+     *
+     * @param docVersionFn to extract the document version
+     * @return the {@link DocToBulk} with the function set
+     */
+    public DocToBulk withDocVersionFn(Write.FieldValueExtractFn docVersionFn) {
+      checkArgument(docVersionFn != null, "docVersionFn must not be null");
+      return builder().setDocVersionFn(docVersionFn).build();
+    }
+
+    /**
+     * Provide a function to extract the target operation either upsert or 
delete from the document
+     * fields allowing dynamic bulk operation decision. While using 
withIsDeleteFn, it should be
+     * taken care that the document's id extraction is defined using the 
withIdFn function or else
+     * IllegalArgumentException is thrown. Should the function throw an 
Exception then the batch
+     * will fail and the exception propagated.
+     *
+     * @param isDeleteFn set to true for deleting the specific document
+     * @return the {@link Write} with the function set
+     */
+    public DocToBulk withIsDeleteFn(Write.BooleanFieldValueExtractFn 
isDeleteFn) {
+      checkArgument(isDeleteFn != null, "deleteFn is required");
+      return builder().setIsDeleteFn(isDeleteFn).build();
+    }
+
+    /**
+     * Provide a function to extract the doc version from the document. This 
version number will be
+     * used as the document version in Elasticsearch. Should the function 
throw an Exception then
+     * the batch will fail and the exception propagated. Incompatible with 
update operations and
+     * should only be used with withUsePartialUpdate(false)
+     *
+     * @param docVersionType the version type to use, one of {@value 
ElasticsearchIO#VERSION_TYPES}
+     * @return the {@link DocToBulk} with the doc version type set
+     */
+    public DocToBulk withDocVersionType(String docVersionType) {
+      checkArgument(
+          VERSION_TYPES.contains(docVersionType),
+          "docVersionType must be one of " + "%s",
+          String.join(", ", VERSION_TYPES));
+      return builder().setDocVersionType(docVersionType).build();
+    }
+
+    /**
+     * Use to set explicitly which version of Elasticsearch the destination 
cluster is running.
+     * Providing this hint means there is no need for setting {@link
+     * DocToBulk#withConnectionConfiguration}. This can also be very useful 
for testing purposes.
+     *
+     * @param backendVersion the major version number of the version of 
Elasticsearch being run in
+     *     the cluster where documents will be indexed.
+     * @return the {@link DocToBulk} with the Elasticsearch major version 
number set
+     */
+    public DocToBulk withBackendVersion(int backendVersion) {
+      checkArgument(
+          VALID_CLUSTER_VERSIONS.contains(backendVersion),
+          "Backend version may only be one of " + "%s",
+          String.join(", ", VERSION_TYPES));
+      return builder().setBackendVersion(backendVersion).build();
+    }
+
+    @Override
+    public PCollection<String> expand(PCollection<String> docs) {
+      ConnectionConfiguration connectionConfiguration = 
getConnectionConfiguration();
+      Integer backendVersion = getBackendVersion();
+      Write.FieldValueExtractFn idFn = getIdFn();
+      Write.BooleanFieldValueExtractFn isDeleteFn = getIsDeleteFn();
+      checkState(
+          (backendVersion != null || connectionConfiguration != null),
+          "withBackendVersion() or withConnectionConfiguration() is required");
+      checkArgument(
+          isDeleteFn == null || idFn != null,
+          "Id needs to be specified by withIdFn for delete operation");
+
+      return docs.apply(ParDo.of(new DocToBulkFn(this)));
+    }
+
+    // Encapsulates the elements which form the metadata for an Elasticsearch 
bulk operation
+    private static class DocumentMetadata implements Serializable {
+      final String index;
+      final String type;
+      final String id;
+      final Integer retryOnConflict;
+      final String routing;
+      final Integer backendVersion;
+      final String version;
+      final String versionType;
+
+      DocumentMetadata(
+          String index,
+          String type,
+          String id,
+          Integer retryOnConflict,
+          String routing,
+          Integer backendVersion,
+          String version,
+          String versionType) {
+        this.index = index;
+        this.id = id;
+        this.type = type;
+        this.retryOnConflict = retryOnConflict;
+        this.routing = routing;
+        this.backendVersion = backendVersion;
+        this.version = version;
+        this.versionType = versionType;
+      }
+    }
+
+    private static class DocumentMetadataSerializer extends 
StdSerializer<DocumentMetadata> {
+      private DocumentMetadataSerializer() {
+        super(DocumentMetadata.class);
+      }
+
+      @Override
+      public void serialize(DocumentMetadata value, JsonGenerator gen, 
SerializerProvider provider)
+          throws IOException {
+        gen.writeStartObject();
+        if (value.index != null) {
+          gen.writeStringField("_index", value.index);
+        }
+        if (value.type != null) {
+          gen.writeStringField("_type", value.type);
+        }
+        if (value.id != null) {
+          gen.writeStringField("_id", value.id);
+        }
+        if (value.routing != null) {
+          gen.writeStringField("routing", value.routing);
+        }
+        if (value.retryOnConflict != null && value.backendVersion <= 6) {
+          gen.writeNumberField("_retry_on_conflict", value.retryOnConflict);
+        }
+        if (value.retryOnConflict != null && value.backendVersion >= 7) {
+          gen.writeNumberField("retry_on_conflict", value.retryOnConflict);
+        }
+        if (value.version != null) {
+          gen.writeStringField("version", value.version);
+        }
+        if (value.versionType != null) {
+          gen.writeStringField("version_type", value.versionType);
+        }
+        gen.writeEndObject();
+      }
+    }
+
+    @VisibleForTesting
+    static String createBulkApiEntity(DocToBulk spec, String document, int 
backendVersion)
+        throws IOException {
+      String documentMetadata = "{}";
+      boolean isDelete = false;
+      if (spec.getIndexFn() != null || spec.getTypeFn() != null || 
spec.getIdFn() != null) {
+        // parse once and reused for efficiency
+        JsonNode parsedDocument = OBJECT_MAPPER.readTree(document);
+        documentMetadata = getDocumentMetadata(spec, parsedDocument, 
backendVersion);
+        if (spec.getIsDeleteFn() != null) {
+          isDelete = spec.getIsDeleteFn().apply(parsedDocument);
+        }
+      }
+
+      if (isDelete) {
+        // delete request used for deleting a document
+        return String.format("{ \"delete\" : %s }%n", documentMetadata);
+      } else {
+        // index is an insert/upsert and update is a partial update (or insert 
if not
+        // existing)
+        if (spec.getUsePartialUpdate()) {
+          return String.format(
+              "{ \"update\" : %s }%n{ \"doc\" : %s, " + "\"doc_as_upsert\" : 
true }%n",
+              documentMetadata, document);
+        } else if (spec.getUpsertScript() != null) {
+          return String.format(
+              "{ \"update\" : %s }%n{ \"script\" : {\"source\": \"%s\", "
+                  + "\"params\": %s}, \"upsert\" : %s }%n",
+              documentMetadata, spec.getUpsertScript(), document, document);
+        } else {
+          return String.format("{ \"index\" : %s }%n%s%n", documentMetadata, 
document);
+        }
+      }
+    }
+
+    private static String lowerCaseOrNull(String input) {
+      return input == null ? null : input.toLowerCase();
+    }
+
+    /**
+     * Extracts the components that comprise the document address from the 
document using the {@link
+     * Write.FieldValueExtractFn} configured. This allows any or all of the 
index, type and document
+     * id to be controlled on a per document basis. If none are provided then 
an empty default of
+     * {@code {}} is returned. Sanitization of the index is performed, 
automatically lower-casing
+     * the value as required by Elasticsearch.
+     *
+     * @param parsedDocument the json from which the index, type and id may be 
extracted
+     * @return the document address as JSON or the default
+     * @throws IOException if the document cannot be parsed as JSON
+     */
+    private static String getDocumentMetadata(
+        DocToBulk spec, JsonNode parsedDocument, int backendVersion) throws 
IOException {
+      DocumentMetadata metadata =
+          new DocumentMetadata(
+              spec.getIndexFn() != null
+                  ? lowerCaseOrNull(spec.getIndexFn().apply(parsedDocument))
+                  : null,
+              spec.getTypeFn() != null ? 
spec.getTypeFn().apply(parsedDocument) : null,
+              spec.getIdFn() != null ? spec.getIdFn().apply(parsedDocument) : 
null,
+              (spec.getUsePartialUpdate()
+                      || (spec.getUpsertScript() != null && 
!spec.getUpsertScript().isEmpty()))
+                  ? DEFAULT_RETRY_ON_CONFLICT
+                  : null,
+              spec.getRoutingFn() != null ? 
spec.getRoutingFn().apply(parsedDocument) : null,
+              backendVersion,
+              spec.getDocVersionFn() != null ? 
spec.getDocVersionFn().apply(parsedDocument) : null,
+              spec.getDocVersionType());
+      return OBJECT_MAPPER.writeValueAsString(metadata);
+    }
+
+    /** {@link DoFn} to for the {@link DocToBulk} transform. */
+    @VisibleForTesting
+    static class DocToBulkFn extends DoFn<String, String> {
+      private final DocToBulk spec;
+      private int backendVersion;
+
+      public DocToBulkFn(DocToBulk spec) {
+        this.spec = spec;
+      }
+
+      @Setup
+      public void setup() throws IOException {
+        ConnectionConfiguration connectionConfiguration = 
spec.getConnectionConfiguration();
+        if (spec.getBackendVersion() == null) {

Review comment:
       Hmm that's a good question, I hadn't quite considered that. What are 
your thoughts on adding a note in docstring for `withBackendVersion` stating 
that its value will take precedence and will preclude reading the version from 
the cluster? Then we would need to trust users to select what fits their need 
beyond that point.  Thoughts?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to