echauchot commented on a change in pull request #14347:
URL: https://github.com/apache/beam/pull/14347#discussion_r615664611



##########
File path: 
sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
@@ -158,10 +167,16 @@
 })
 public class ElasticsearchIO {

Review comment:
       This change is backward compatible so the public API stays the same, but 
I think you could write a summing up javadoc paragraph about the separation of 
serialization/write transforms and the use cases that this open.

##########
File path: 
sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
@@ -190,44 +219,53 @@ static JsonNode parseResponse(HttpEntity responseEntity) 
throws IOException {
     return mapper.readValue(responseEntity.getContent(), JsonNode.class);
   }
 
-  static void checkForErrors(HttpEntity responseEntity, int backendVersion, 
boolean partialUpdate)
+  static void checkForErrors(HttpEntity responseEntity, Set<String> 
allowedErrorTypes)
       throws IOException {
+
     JsonNode searchResult = parseResponse(responseEntity);
     boolean errors = searchResult.path("errors").asBoolean();
     if (errors) {
+      int numErrors = 0;
+
       StringBuilder errorMessages =
           new StringBuilder("Error writing to Elasticsearch, some elements 
could not be inserted:");
       JsonNode items = searchResult.path("items");
+      if (items.isMissingNode() || items.size() == 0) {
+        errorMessages.append(searchResult.toString());
+      }
       // some items present in bulk might have errors, concatenate error 
messages
       for (JsonNode item : items) {
+        JsonNode error = item.findValue("error");

Review comment:
       more robust than the previous errorRoot code, thanks !

##########
File path: 
sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
@@ -171,13 +186,27 @@ public static Read read() {
         .build();
   }
 
-  public static Write write() {
-    return new AutoValue_ElasticsearchIO_Write.Builder()
+  public static DocToBulk docToBulk() {
+    return new AutoValue_ElasticsearchIO_DocToBulk.Builder()
+        .setUsePartialUpdate(false) // default is document upsert
+        .build();
+  }
+
+  public static BulkIO bulkIO() {
+    return new AutoValue_ElasticsearchIO_BulkIO.Builder()
         // advised default starting batch size in ES docs
         .setMaxBatchSize(1000L)
         // advised default starting batch size in ES docs
         .setMaxBatchSizeBytes(5L * 1024L * 1024L)
-        .setUsePartialUpdate(false) // default is document upsert
+        .setUseStatefulBatches(false)
+        .setMaxParallelRequestsPerWindow(1)
+        .build();
+  }
+
+  public static Write write() {

Review comment:
       backward compatible indeed. Need to check the tests to be sure, but it 
should

##########
File path: 
sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
@@ -190,44 +219,53 @@ static JsonNode parseResponse(HttpEntity responseEntity) 
throws IOException {
     return mapper.readValue(responseEntity.getContent(), JsonNode.class);
   }
 
-  static void checkForErrors(HttpEntity responseEntity, int backendVersion, 
boolean partialUpdate)
+  static void checkForErrors(HttpEntity responseEntity, Set<String> 
allowedErrorTypes)
       throws IOException {
+
     JsonNode searchResult = parseResponse(responseEntity);
     boolean errors = searchResult.path("errors").asBoolean();
     if (errors) {
+      int numErrors = 0;
+
       StringBuilder errorMessages =
           new StringBuilder("Error writing to Elasticsearch, some elements 
could not be inserted:");
       JsonNode items = searchResult.path("items");
+      if (items.isMissingNode() || items.size() == 0) {
+        errorMessages.append(searchResult.toString());
+      }
       // some items present in bulk might have errors, concatenate error 
messages
       for (JsonNode item : items) {
+        JsonNode error = item.findValue("error");
+        if (error == null) {
+          continue;

Review comment:
       coding style: I would prefer having everything under `if (error != 
null)` rather than `continue`

##########
File path: 
sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
@@ -190,44 +219,53 @@ static JsonNode parseResponse(HttpEntity responseEntity) 
throws IOException {
     return mapper.readValue(responseEntity.getContent(), JsonNode.class);
   }
 
-  static void checkForErrors(HttpEntity responseEntity, int backendVersion, 
boolean partialUpdate)
+  static void checkForErrors(HttpEntity responseEntity, Set<String> 
allowedErrorTypes)
       throws IOException {
+
     JsonNode searchResult = parseResponse(responseEntity);
     boolean errors = searchResult.path("errors").asBoolean();
     if (errors) {
+      int numErrors = 0;
+
       StringBuilder errorMessages =
           new StringBuilder("Error writing to Elasticsearch, some elements 
could not be inserted:");
       JsonNode items = searchResult.path("items");
+      if (items.isMissingNode() || items.size() == 0) {
+        errorMessages.append(searchResult.toString());
+      }
       // some items present in bulk might have errors, concatenate error 
messages
       for (JsonNode item : items) {
+        JsonNode error = item.findValue("error");
+        if (error == null) {
+          continue;
+        }
 
-        String errorRootName = "";
-        // when use partial update, the response items includes all the update.
-        if (partialUpdate) {
-          errorRootName = "update";
-        } else {
-          if (backendVersion == 2) {
-            errorRootName = "create";
-          } else if (backendVersion >= 5) {
-            errorRootName = "index";
-          }
+        // N.B. An empty-string within the allowedErrorTypes Set implies all 
errors are allowed.
+        String type = error.path("type").asText();
+        String reason = error.path("reason").asText();
+        String docId = item.findValue("_id").asText();
+        JsonNode causedBy = error.path("caused_by"); // May not be present

Review comment:
       better than get + if null, thanks

##########
File path: 
sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
@@ -303,6 +341,73 @@ public static ConnectionConfiguration create(String[] 
addresses, String index, S
           .build();
     }
 
+    /**
+     * Creates a new Elasticsearch connection configuration with no default 
type.
+     *
+     * @param addresses list of addresses of Elasticsearch nodes
+     * @param index the index toward which the requests will be issued
+     * @return the connection configuration object
+     */
+    public static ConnectionConfiguration create(String[] addresses, String 
index) {
+      checkArgument(addresses != null, "addresses can not be null");
+      checkArgument(addresses.length > 0, "addresses can not be empty");
+      checkArgument(index != null, "index can not be null");
+      return new AutoValue_ElasticsearchIO_ConnectionConfiguration.Builder()
+          .setAddresses(Arrays.asList(addresses))
+          .setIndex(index)
+          .setType("")
+          .setTrustSelfSignedCerts(false)
+          .build();
+    }
+
+    /**
+     * Creates a new Elasticsearch connection configuration with no default 
index nor type.
+     *
+     * @param addresses list of addresses of Elasticsearch nodes
+     * @return the connection configuration object
+     */
+    public static ConnectionConfiguration create(String[] addresses) {
+      checkArgument(addresses != null, "addresses can not be null");
+      checkArgument(addresses.length > 0, "addresses can not be empty");
+      return new AutoValue_ElasticsearchIO_ConnectionConfiguration.Builder()
+          .setAddresses(Arrays.asList(addresses))
+          .setIndex("")
+          .setType("")
+          .setTrustSelfSignedCerts(false)
+          .build();
+    }
+
+    /**
+     * Generates the bulk API endpoint based on the set values.
+     *
+     * <p>Based on ConnectionConfiguration constructors, we know that one of 
the following is true:
+     *
+     * <ul>
+     *   <li>index and type are non-empty strings
+     *   <li>index is non-empty string, type is empty string
+     *   <li>index and type are empty string
+     * </ul>
+     *
+     * <p>Valid endpoints therefore include:
+     *
+     * <ul>
+     *   <li>/_bulk
+     *   <li>/index_name/_bulk
+     *   <li>/index_name/type_name/_bulk
+     * </ul>
+     */
+    public String getBulkEndPoint() {

Review comment:
       I did not know it was allowed to specify no index: something new ?

##########
File path: 
sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
@@ -190,44 +219,53 @@ static JsonNode parseResponse(HttpEntity responseEntity) 
throws IOException {
     return mapper.readValue(responseEntity.getContent(), JsonNode.class);
   }
 
-  static void checkForErrors(HttpEntity responseEntity, int backendVersion, 
boolean partialUpdate)
+  static void checkForErrors(HttpEntity responseEntity, Set<String> 
allowedErrorTypes)
       throws IOException {
+
     JsonNode searchResult = parseResponse(responseEntity);
     boolean errors = searchResult.path("errors").asBoolean();
     if (errors) {
+      int numErrors = 0;
+
       StringBuilder errorMessages =
           new StringBuilder("Error writing to Elasticsearch, some elements 
could not be inserted:");
       JsonNode items = searchResult.path("items");
+      if (items.isMissingNode() || items.size() == 0) {
+        errorMessages.append(searchResult.toString());
+      }
       // some items present in bulk might have errors, concatenate error 
messages
       for (JsonNode item : items) {
+        JsonNode error = item.findValue("error");
+        if (error == null) {
+          continue;
+        }
 
-        String errorRootName = "";
-        // when use partial update, the response items includes all the update.
-        if (partialUpdate) {
-          errorRootName = "update";
-        } else {
-          if (backendVersion == 2) {
-            errorRootName = "create";
-          } else if (backendVersion >= 5) {
-            errorRootName = "index";
-          }
+        // N.B. An empty-string within the allowedErrorTypes Set implies all 
errors are allowed.
+        String type = error.path("type").asText();
+        String reason = error.path("reason").asText();
+        String docId = item.findValue("_id").asText();
+        JsonNode causedBy = error.path("caused_by"); // May not be present
+        String cbReason = causedBy.path("reason").asText();
+        String cbType = causedBy.path("type").asText();
+
+        if (allowedErrorTypes != null
+            && (allowedErrorTypes.contains(type) || 
allowedErrorTypes.contains(cbType))) {
+          continue;
         }
-        JsonNode errorRoot = item.path(errorRootName);
-        JsonNode error = errorRoot.get("error");
-        if (error != null) {
-          String type = error.path("type").asText();
-          String reason = error.path("reason").asText();
-          String docId = errorRoot.path("_id").asText();
-          errorMessages.append(String.format("%nDocument id %s: %s (%s)", 
docId, reason, type));
-          JsonNode causedBy = error.get("caused_by");
-          if (causedBy != null) {
-            String cbReason = causedBy.path("reason").asText();
-            String cbType = causedBy.path("type").asText();
-            errorMessages.append(String.format("%nCaused by: %s (%s)", 
cbReason, cbType));
-          }
+
+        // 'error' field is not null, and the error is not being ignored.
+        numErrors++;
+
+        errorMessages.append(String.format("%nDocument id %s: %s (%s)", docId, 
reason, type));
+
+        if (causedBy.isMissingNode()) {

Review comment:
       ditto

##########
File path: 
sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
@@ -1177,12 +1272,513 @@ public Write withTypeFn(FieldValueExtractFn typeFn) {
      * Elasticsearch.
      *
      * @param usePartialUpdate set to true to issue partial updates
-     * @return the {@link Write} with the partial update control set
+     * @return the {@link DocToBulk} with the partial update control set
      */
-    public Write withUsePartialUpdate(boolean usePartialUpdate) {
+    public DocToBulk withUsePartialUpdate(boolean usePartialUpdate) {
       return builder().setUsePartialUpdate(usePartialUpdate).build();
     }
 
+    /**
+     * Whether to use scripted updates and what script to use.
+     *
+     * @param source set to the value of the script source, painless lang
+     * @return the {@link DocToBulk} with the scripted updates set
+     */
+    public DocToBulk withUpsertScript(String source) {
+      return 
builder().setUsePartialUpdate(false).setUpsertScript(source).build();
+    }
+
+    /**
+     * Provide a function to extract the doc version from the document. This 
version number will be
+     * used as the document version in Elasticsearch. Should the function 
throw an Exception then
+     * the batch will fail and the exception propagated. Incompatible with 
update operations and
+     * should only be used with withUsePartialUpdate(false)
+     *
+     * @param docVersionFn to extract the document version
+     * @return the {@link DocToBulk} with the function set
+     */
+    public DocToBulk withDocVersionFn(Write.FieldValueExtractFn docVersionFn) {
+      checkArgument(docVersionFn != null, "docVersionFn must not be null");
+      return builder().setDocVersionFn(docVersionFn).build();
+    }
+
+    /**
+     * Provide a function to extract the target operation either upsert or 
delete from the document
+     * fields allowing dynamic bulk operation decision. While using 
withIsDeleteFn, it should be
+     * taken care that the document's id extraction is defined using the 
withIdFn function or else
+     * IllegalArgumentException is thrown. Should the function throw an 
Exception then the batch
+     * will fail and the exception propagated.
+     *
+     * @param isDeleteFn set to true for deleting the specific document
+     * @return the {@link Write} with the function set
+     */
+    public DocToBulk withIsDeleteFn(Write.BooleanFieldValueExtractFn 
isDeleteFn) {
+      checkArgument(isDeleteFn != null, "deleteFn is required");
+      return builder().setIsDeleteFn(isDeleteFn).build();
+    }
+
+    /**
+     * Provide a function to extract the doc version from the document. This 
version number will be
+     * used as the document version in Elasticsearch. Should the function 
throw an Exception then
+     * the batch will fail and the exception propagated. Incompatible with 
update operations and
+     * should only be used with withUsePartialUpdate(false)
+     *
+     * @param docVersionType the version type to use, one of {@value 
ElasticsearchIO#VERSION_TYPES}
+     * @return the {@link DocToBulk} with the doc version type set
+     */
+    public DocToBulk withDocVersionType(String docVersionType) {
+      checkArgument(
+          VERSION_TYPES.contains(docVersionType),
+          "docVersionType must be one of " + "%s",
+          String.join(", ", VERSION_TYPES));
+      return builder().setDocVersionType(docVersionType).build();
+    }
+
+    /**
+     * Use to set explicitly which version of Elasticsearch the destination 
cluster is running.
+     * Providing this hint means there is no need for setting {@link
+     * DocToBulk#withConnectionConfiguration}. This can also be very useful 
for testing purposes.
+     *
+     * @param backendVersion the major version number of the version of 
Elasticsearch being run in
+     *     the cluster where documents will be indexed.
+     * @return the {@link DocToBulk} with the Elasticsearch major version 
number set
+     */
+    public DocToBulk withBackendVersion(int backendVersion) {
+      checkArgument(
+          VALID_CLUSTER_VERSIONS.contains(backendVersion),
+          "Backend version may only be one of " + "%s",
+          String.join(", ", VERSION_TYPES));
+      return builder().setBackendVersion(backendVersion).build();
+    }
+
+    @Override
+    public PCollection<String> expand(PCollection<String> docs) {
+      ConnectionConfiguration connectionConfiguration = 
getConnectionConfiguration();
+      Integer backendVersion = getBackendVersion();
+      Write.FieldValueExtractFn idFn = getIdFn();
+      Write.BooleanFieldValueExtractFn isDeleteFn = getIsDeleteFn();
+      checkState(
+          (backendVersion != null || connectionConfiguration != null),
+          "withBackendVersion() or withConnectionConfiguration() is required");
+      checkArgument(
+          isDeleteFn == null || idFn != null,
+          "Id needs to be specified by withIdFn for delete operation");
+
+      return docs.apply(ParDo.of(new DocToBulkFn(this)));
+    }
+
+    // Encapsulates the elements which form the metadata for an Elasticsearch 
bulk operation
+    private static class DocumentMetadata implements Serializable {
+      final String index;
+      final String type;
+      final String id;
+      final Integer retryOnConflict;
+      final String routing;
+      final Integer backendVersion;
+      final String version;
+      final String versionType;
+
+      DocumentMetadata(
+          String index,
+          String type,
+          String id,
+          Integer retryOnConflict,
+          String routing,
+          Integer backendVersion,
+          String version,
+          String versionType) {
+        this.index = index;
+        this.id = id;
+        this.type = type;
+        this.retryOnConflict = retryOnConflict;
+        this.routing = routing;
+        this.backendVersion = backendVersion;
+        this.version = version;
+        this.versionType = versionType;
+      }
+    }
+
+    private static class DocumentMetadataSerializer extends 
StdSerializer<DocumentMetadata> {
+      private DocumentMetadataSerializer() {
+        super(DocumentMetadata.class);
+      }
+
+      @Override
+      public void serialize(DocumentMetadata value, JsonGenerator gen, 
SerializerProvider provider)
+          throws IOException {
+        gen.writeStartObject();
+        if (value.index != null) {
+          gen.writeStringField("_index", value.index);
+        }
+        if (value.type != null) {
+          gen.writeStringField("_type", value.type);
+        }
+        if (value.id != null) {
+          gen.writeStringField("_id", value.id);
+        }
+        if (value.routing != null) {
+          gen.writeStringField("routing", value.routing);
+        }
+        if (value.retryOnConflict != null && value.backendVersion <= 6) {
+          gen.writeNumberField("_retry_on_conflict", value.retryOnConflict);
+        }
+        if (value.retryOnConflict != null && value.backendVersion >= 7) {
+          gen.writeNumberField("retry_on_conflict", value.retryOnConflict);
+        }
+        if (value.version != null) {
+          gen.writeStringField("version", value.version);
+        }
+        if (value.versionType != null) {
+          gen.writeStringField("version_type", value.versionType);
+        }
+        gen.writeEndObject();
+      }
+    }
+
+    @VisibleForTesting
+    static String createBulkApiEntity(DocToBulk spec, String document, int 
backendVersion)
+        throws IOException {
+      String documentMetadata = "{}";
+      boolean isDelete = false;
+      if (spec.getIndexFn() != null || spec.getTypeFn() != null || 
spec.getIdFn() != null) {
+        // parse once and reused for efficiency
+        JsonNode parsedDocument = OBJECT_MAPPER.readTree(document);
+        documentMetadata = getDocumentMetadata(spec, parsedDocument, 
backendVersion);
+        if (spec.getIsDeleteFn() != null) {
+          isDelete = spec.getIsDeleteFn().apply(parsedDocument);
+        }
+      }
+
+      if (isDelete) {
+        // delete request used for deleting a document
+        return String.format("{ \"delete\" : %s }%n", documentMetadata);
+      } else {
+        // index is an insert/upsert and update is a partial update (or insert 
if not
+        // existing)
+        if (spec.getUsePartialUpdate()) {
+          return String.format(
+              "{ \"update\" : %s }%n{ \"doc\" : %s, " + "\"doc_as_upsert\" : 
true }%n",
+              documentMetadata, document);
+        } else if (spec.getUpsertScript() != null) {
+          return String.format(
+              "{ \"update\" : %s }%n{ \"script\" : {\"source\": \"%s\", "
+                  + "\"params\": %s}, \"upsert\" : %s }%n",
+              documentMetadata, spec.getUpsertScript(), document, document);
+        } else {
+          return String.format("{ \"index\" : %s }%n%s%n", documentMetadata, 
document);
+        }
+      }
+    }
+
+    private static String lowerCaseOrNull(String input) {
+      return input == null ? null : input.toLowerCase();
+    }
+
+    /**
+     * Extracts the components that comprise the document address from the 
document using the {@link
+     * Write.FieldValueExtractFn} configured. This allows any or all of the 
index, type and document
+     * id to be controlled on a per document basis. If none are provided then 
an empty default of
+     * {@code {}} is returned. Sanitization of the index is performed, 
automatically lower-casing
+     * the value as required by Elasticsearch.
+     *
+     * @param parsedDocument the json from which the index, type and id may be 
extracted
+     * @return the document address as JSON or the default
+     * @throws IOException if the document cannot be parsed as JSON
+     */
+    private static String getDocumentMetadata(
+        DocToBulk spec, JsonNode parsedDocument, int backendVersion) throws 
IOException {
+      DocumentMetadata metadata =
+          new DocumentMetadata(
+              spec.getIndexFn() != null
+                  ? lowerCaseOrNull(spec.getIndexFn().apply(parsedDocument))
+                  : null,
+              spec.getTypeFn() != null ? 
spec.getTypeFn().apply(parsedDocument) : null,
+              spec.getIdFn() != null ? spec.getIdFn().apply(parsedDocument) : 
null,
+              (spec.getUsePartialUpdate()
+                      || (spec.getUpsertScript() != null && 
!spec.getUpsertScript().isEmpty()))
+                  ? DEFAULT_RETRY_ON_CONFLICT
+                  : null,
+              spec.getRoutingFn() != null ? 
spec.getRoutingFn().apply(parsedDocument) : null,
+              backendVersion,
+              spec.getDocVersionFn() != null ? 
spec.getDocVersionFn().apply(parsedDocument) : null,
+              spec.getDocVersionType());
+      return OBJECT_MAPPER.writeValueAsString(metadata);
+    }
+
+    /** {@link DoFn} to for the {@link DocToBulk} transform. */
+    @VisibleForTesting
+    static class DocToBulkFn extends DoFn<String, String> {
+      private final DocToBulk spec;
+      private int backendVersion;
+
+      public DocToBulkFn(DocToBulk spec) {
+        this.spec = spec;
+      }
+
+      @Setup
+      public void setup() throws IOException {
+        ConnectionConfiguration connectionConfiguration = 
spec.getConnectionConfiguration();
+        if (spec.getBackendVersion() == null) {
+          backendVersion = 
ElasticsearchIO.getBackendVersion(connectionConfiguration);
+        } else {
+          backendVersion = spec.getBackendVersion();
+        }
+      }
+
+      @ProcessElement
+      public void processElement(ProcessContext c) throws IOException {
+        c.output(createBulkApiEntity(spec, c.element(), backendVersion));
+      }
+    }
+  }
+
+  /**
+   * A {@link PTransform} convenience wrapper for doing both document to bulk 
API serialization as
+   * well as batching those Bulk API entities and writing them to an 
Elasticsearch cluster. This
+   * class is effectively a thin proxy for DocToBulk->BulkIO all-in-one for 
convenience and backward
+   * compatibility.
+   */
+  @AutoValue
+  public abstract static class Write extends PTransform<PCollection<String>, 
PDone> {
+    public interface FieldValueExtractFn extends 
SerializableFunction<JsonNode, String> {}
+
+    public interface BooleanFieldValueExtractFn extends 
SerializableFunction<JsonNode, Boolean> {}
+
+    public abstract DocToBulk getDocToBulk();
+
+    public abstract BulkIO getBulkIO();
+
+    abstract Builder writeBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setDocToBulk(DocToBulk docToBulk);
+
+      abstract Builder setBulkIO(BulkIO bulkIO);
+
+      abstract Write build();
+    }
+
+    // For building Doc2Bulk
+    /** Refer to {@link DocToBulk#withIdFn}. */
+    public Write withIdFn(FieldValueExtractFn idFn) {
+      return 
writeBuilder().setDocToBulk(getDocToBulk().withIdFn(idFn)).build();
+    }
+
+    /** Refer to {@link DocToBulk#withIndexFn}. */
+    public Write withIndexFn(FieldValueExtractFn indexFn) {
+      return 
writeBuilder().setDocToBulk(getDocToBulk().withIndexFn(indexFn)).build();
+    }
+
+    /** Refer to {@link DocToBulk#withRoutingFn}. */
+    public Write withRoutingFn(FieldValueExtractFn routingFn) {
+      return 
writeBuilder().setDocToBulk(getDocToBulk().withRoutingFn(routingFn)).build();
+    }
+
+    /** Refer to {@link DocToBulk#withTypeFn}. */
+    public Write withTypeFn(FieldValueExtractFn typeFn) {
+      return 
writeBuilder().setDocToBulk(getDocToBulk().withTypeFn(typeFn)).build();
+    }
+
+    /** Refer to {@link DocToBulk#withDocVersionFn}. */
+    public Write withDocVersionFn(FieldValueExtractFn docVersionFn) {
+      return 
writeBuilder().setDocToBulk(getDocToBulk().withDocVersionFn(docVersionFn)).build();
+    }
+
+    /** Refer to {@link DocToBulk#withDocVersionType}. */
+    public Write withDocVersionType(String docVersionType) {
+      return 
writeBuilder().setDocToBulk(getDocToBulk().withDocVersionType(docVersionType)).build();
+    }
+
+    /** Refer to {@link DocToBulk#withUsePartialUpdate}. */
+    public Write withUsePartialUpdate(boolean usePartialUpdate) {
+      return writeBuilder()
+          .setDocToBulk(getDocToBulk().withUsePartialUpdate(usePartialUpdate))
+          .build();
+    }
+
+    /** Refer to {@link DocToBulk#withUpsertScript}. */
+    public Write withUpsertScript(String source) {
+      return 
writeBuilder().setDocToBulk(getDocToBulk().withUpsertScript(source)).build();
+    }
+
+    /** Refer to {@link DocToBulk#withBackendVersion}. */
+    public Write withBackendVersion(int backendVersion) {
+      return 
writeBuilder().setDocToBulk(getDocToBulk().withBackendVersion(backendVersion)).build();
+    }
+
+    /** Refer to {@link DocToBulk#withIsDeleteFn}. */
+    public Write withIsDeleteFn(Write.BooleanFieldValueExtractFn isDeleteFn) {
+      return 
writeBuilder().setDocToBulk(getDocToBulk().withIsDeleteFn(isDeleteFn)).build();
+    }
+    // End building Doc2Bulk
+
+    /** Refer to {@link BulkIO#withConnectionConfiguration}. */
+    public Write withConnectionConfiguration(ConnectionConfiguration 
connectionConfiguration) {
+      checkArgument(connectionConfiguration != null, "connectionConfiguration 
can not be null");
+
+      return writeBuilder()
+          
.setDocToBulk(getDocToBulk().withConnectionConfiguration(connectionConfiguration))
+          
.setBulkIO(getBulkIO().withConnectionConfiguration(connectionConfiguration))
+          .build();
+    }
+
+    /** Refer to {@link BulkIO#withMaxBatchSize}. */
+    public Write withMaxBatchSize(long batchSize) {
+      return 
writeBuilder().setBulkIO(getBulkIO().withMaxBatchSize(batchSize)).build();
+    }
+
+    /** Refer to {@link BulkIO#withMaxBatchSizeBytes}. */
+    public Write withMaxBatchSizeBytes(long batchSizeBytes) {
+      return 
writeBuilder().setBulkIO(getBulkIO().withMaxBatchSizeBytes(batchSizeBytes)).build();
+    }
+
+    /** Refer to {@link BulkIO#withRetryConfiguration}. */
+    public Write withRetryConfiguration(RetryConfiguration retryConfiguration) 
{
+      return writeBuilder()
+          .setBulkIO(getBulkIO().withRetryConfiguration(retryConfiguration))
+          .build();
+    }
+
+    /** Refer to {@link BulkIO#withIgnoreVersionConflicts}. */
+    public Write withIgnoreVersionConflicts(boolean ignoreVersionConflicts) {
+      return writeBuilder()
+          
.setBulkIO(getBulkIO().withIgnoreVersionConflicts(ignoreVersionConflicts))
+          .build();
+    }
+
+    /** Refer to {@link BulkIO#withUseStatefulBatches}. */
+    public Write withUseStatefulBatches(boolean useStatefulBatches) {
+      return writeBuilder()
+          .setBulkIO(getBulkIO().withUseStatefulBatches(useStatefulBatches))
+          .build();
+    }
+
+    /** Refer to {@link BulkIO#withMaxBufferingDuration}. */
+    public Write withMaxBufferingDuration(Duration maxBufferingDuration) {
+      return writeBuilder()
+          
.setBulkIO(getBulkIO().withMaxBufferingDuration(maxBufferingDuration))
+          .build();
+    }
+
+    /** Refer to {@link BulkIO#withMaxParallelRequestsPerWindow}. */
+    public Write withMaxParallelRquestsPerWindow(int 
maxParallelRquestsPerWindow) {
+      return writeBuilder()
+          
.setBulkIO(getBulkIO().withMaxParallelRequestsPerWindow(maxParallelRquestsPerWindow))
+          .build();
+    }
+
+    /** Refer to {@link BulkIO#withAllowableResponseErrors}. */
+    public Write withAllowableResponseErrors(@Nullable Set<String> 
allowableResponseErrors) {

Review comment:
       to avoid stacktrace messages flooding ?

##########
File path: 
sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
@@ -1030,115 +1135,92 @@ public boolean test(HttpEntity responseEntity) {
     }
   }
 
-  /** A {@link PTransform} writing data to Elasticsearch. */
+  /** A {@link PTransform} converting docs to their Bulk API counterparts. */
   @AutoValue
-  public abstract static class Write extends PTransform<PCollection<String>, 
PDone> {
+  public abstract static class DocToBulk
+      extends PTransform<PCollection<String>, PCollection<String>> {
 
-    /**
-     * Interface allowing a specific field value to be returned from a parsed 
JSON document. This is
-     * used for using explicit document ids, and for dynamic routing 
(index/Type) on a document
-     * basis. A null response will result in default behaviour and an 
exception will be propagated
-     * as a failure.
-     */
-    public interface FieldValueExtractFn extends 
SerializableFunction<JsonNode, String> {}
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private static final int DEFAULT_RETRY_ON_CONFLICT = 5; // race conditions 
on updates
 
-    public interface BooleanFieldValueExtractFn extends 
SerializableFunction<JsonNode, Boolean> {}
+    static {
+      SimpleModule module = new SimpleModule();
+      module.addSerializer(DocumentMetadata.class, new 
DocumentMetadataSerializer());
+      OBJECT_MAPPER.registerModule(module);
+    }
 
     abstract @Nullable ConnectionConfiguration getConnectionConfiguration();
 
-    abstract long getMaxBatchSize();
+    abstract Write.@Nullable FieldValueExtractFn getIdFn();
 
-    abstract long getMaxBatchSizeBytes();
+    abstract Write.@Nullable FieldValueExtractFn getIndexFn();
 
-    abstract @Nullable FieldValueExtractFn getIdFn();
+    abstract Write.@Nullable FieldValueExtractFn getRoutingFn();

Review comment:
       good addition !

##########
File path: 
sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
@@ -303,6 +341,73 @@ public static ConnectionConfiguration create(String[] 
addresses, String index, S
           .build();
     }
 
+    /**
+     * Creates a new Elasticsearch connection configuration with no default 
type.
+     *
+     * @param addresses list of addresses of Elasticsearch nodes
+     * @param index the index toward which the requests will be issued
+     * @return the connection configuration object
+     */
+    public static ConnectionConfiguration create(String[] addresses, String 
index) {
+      checkArgument(addresses != null, "addresses can not be null");
+      checkArgument(addresses.length > 0, "addresses can not be empty");
+      checkArgument(index != null, "index can not be null");
+      return new AutoValue_ElasticsearchIO_ConnectionConfiguration.Builder()
+          .setAddresses(Arrays.asList(addresses))
+          .setIndex(index)
+          .setType("")
+          .setTrustSelfSignedCerts(false)
+          .build();
+    }
+
+    /**
+     * Creates a new Elasticsearch connection configuration with no default 
index nor type.
+     *
+     * @param addresses list of addresses of Elasticsearch nodes
+     * @return the connection configuration object
+     */
+    public static ConnectionConfiguration create(String[] addresses) {
+      checkArgument(addresses != null, "addresses can not be null");
+      checkArgument(addresses.length > 0, "addresses can not be empty");
+      return new AutoValue_ElasticsearchIO_ConnectionConfiguration.Builder()
+          .setAddresses(Arrays.asList(addresses))
+          .setIndex("")
+          .setType("")
+          .setTrustSelfSignedCerts(false)
+          .build();
+    }
+
+    /**
+     * Generates the bulk API endpoint based on the set values.
+     *
+     * <p>Based on ConnectionConfiguration constructors, we know that one of 
the following is true:
+     *
+     * <ul>
+     *   <li>index and type are non-empty strings
+     *   <li>index is non-empty string, type is empty string
+     *   <li>index and type are empty string
+     * </ul>
+     *
+     * <p>Valid endpoints therefore include:
+     *
+     * <ul>
+     *   <li>/_bulk
+     *   <li>/index_name/_bulk
+     *   <li>/index_name/type_name/_bulk
+     * </ul>
+     */
+    public String getBulkEndPoint() {
+      List<String> endPointComponents = Arrays.asList(getIndex(), getType(), 
"_bulk");

Review comment:
       This piece of code looks strange to me. I would prefer a more readable 
impl like this:
   ```suggestion
         StringBuilder sb = new StringBuilder();
         if (!Strings.isNullOrEmpty(getIndex())){
           sb.append("/").append(getIndex());
         }
         if (!Strings.isNullOrEmpty(getType())){
           sb.append("/").append(getType());
         }
         sb.append("/").append("_bulk");
         return sb.toString();
   
   ```

##########
File path: 
sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
@@ -1030,115 +1135,92 @@ public boolean test(HttpEntity responseEntity) {
     }
   }
 
-  /** A {@link PTransform} writing data to Elasticsearch. */
+  /** A {@link PTransform} converting docs to their Bulk API counterparts. */
   @AutoValue
-  public abstract static class Write extends PTransform<PCollection<String>, 
PDone> {
+  public abstract static class DocToBulk
+      extends PTransform<PCollection<String>, PCollection<String>> {
 
-    /**
-     * Interface allowing a specific field value to be returned from a parsed 
JSON document. This is
-     * used for using explicit document ids, and for dynamic routing 
(index/Type) on a document
-     * basis. A null response will result in default behaviour and an 
exception will be propagated
-     * as a failure.
-     */
-    public interface FieldValueExtractFn extends 
SerializableFunction<JsonNode, String> {}
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private static final int DEFAULT_RETRY_ON_CONFLICT = 5; // race conditions 
on updates
 
-    public interface BooleanFieldValueExtractFn extends 
SerializableFunction<JsonNode, Boolean> {}
+    static {
+      SimpleModule module = new SimpleModule();
+      module.addSerializer(DocumentMetadata.class, new 
DocumentMetadataSerializer());
+      OBJECT_MAPPER.registerModule(module);
+    }
 
     abstract @Nullable ConnectionConfiguration getConnectionConfiguration();
 
-    abstract long getMaxBatchSize();
+    abstract Write.@Nullable FieldValueExtractFn getIdFn();
 
-    abstract long getMaxBatchSizeBytes();
+    abstract Write.@Nullable FieldValueExtractFn getIndexFn();
 
-    abstract @Nullable FieldValueExtractFn getIdFn();
+    abstract Write.@Nullable FieldValueExtractFn getRoutingFn();
 
-    abstract @Nullable FieldValueExtractFn getIndexFn();
+    abstract Write.@Nullable FieldValueExtractFn getTypeFn();
 
-    abstract @Nullable FieldValueExtractFn getTypeFn();
+    abstract Write.@Nullable FieldValueExtractFn getDocVersionFn();
 
-    abstract @Nullable RetryConfiguration getRetryConfiguration();
+    abstract @Nullable String getDocVersionType();
 
-    abstract boolean getUsePartialUpdate();
+    abstract @Nullable String getUpsertScript();

Review comment:
       good addition !

##########
File path: 
sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
@@ -1177,12 +1272,513 @@ public Write withTypeFn(FieldValueExtractFn typeFn) {
      * Elasticsearch.
      *
      * @param usePartialUpdate set to true to issue partial updates
-     * @return the {@link Write} with the partial update control set
+     * @return the {@link DocToBulk} with the partial update control set
      */
-    public Write withUsePartialUpdate(boolean usePartialUpdate) {
+    public DocToBulk withUsePartialUpdate(boolean usePartialUpdate) {
       return builder().setUsePartialUpdate(usePartialUpdate).build();
     }
 
+    /**
+     * Whether to use scripted updates and what script to use.
+     *
+     * @param source set to the value of the script source, painless lang
+     * @return the {@link DocToBulk} with the scripted updates set
+     */
+    public DocToBulk withUpsertScript(String source) {
+      return 
builder().setUsePartialUpdate(false).setUpsertScript(source).build();
+    }
+
+    /**
+     * Provide a function to extract the doc version from the document. This 
version number will be
+     * used as the document version in Elasticsearch. Should the function 
throw an Exception then
+     * the batch will fail and the exception propagated. Incompatible with 
update operations and
+     * should only be used with withUsePartialUpdate(false)
+     *
+     * @param docVersionFn to extract the document version
+     * @return the {@link DocToBulk} with the function set
+     */
+    public DocToBulk withDocVersionFn(Write.FieldValueExtractFn docVersionFn) {
+      checkArgument(docVersionFn != null, "docVersionFn must not be null");
+      return builder().setDocVersionFn(docVersionFn).build();
+    }
+
+    /**
+     * Provide a function to extract the target operation either upsert or 
delete from the document
+     * fields allowing dynamic bulk operation decision. While using 
withIsDeleteFn, it should be
+     * taken care that the document's id extraction is defined using the 
withIdFn function or else
+     * IllegalArgumentException is thrown. Should the function throw an 
Exception then the batch
+     * will fail and the exception propagated.
+     *
+     * @param isDeleteFn set to true for deleting the specific document
+     * @return the {@link Write} with the function set
+     */
+    public DocToBulk withIsDeleteFn(Write.BooleanFieldValueExtractFn 
isDeleteFn) {
+      checkArgument(isDeleteFn != null, "deleteFn is required");
+      return builder().setIsDeleteFn(isDeleteFn).build();
+    }
+
+    /**
+     * Provide a function to extract the doc version from the document. This 
version number will be
+     * used as the document version in Elasticsearch. Should the function 
throw an Exception then
+     * the batch will fail and the exception propagated. Incompatible with 
update operations and
+     * should only be used with withUsePartialUpdate(false)
+     *
+     * @param docVersionType the version type to use, one of {@value 
ElasticsearchIO#VERSION_TYPES}
+     * @return the {@link DocToBulk} with the doc version type set
+     */
+    public DocToBulk withDocVersionType(String docVersionType) {
+      checkArgument(
+          VERSION_TYPES.contains(docVersionType),
+          "docVersionType must be one of " + "%s",
+          String.join(", ", VERSION_TYPES));
+      return builder().setDocVersionType(docVersionType).build();
+    }
+
+    /**
+     * Use to set explicitly which version of Elasticsearch the destination 
cluster is running.
+     * Providing this hint means there is no need for setting {@link
+     * DocToBulk#withConnectionConfiguration}. This can also be very useful 
for testing purposes.
+     *
+     * @param backendVersion the major version number of the version of 
Elasticsearch being run in
+     *     the cluster where documents will be indexed.
+     * @return the {@link DocToBulk} with the Elasticsearch major version 
number set
+     */
+    public DocToBulk withBackendVersion(int backendVersion) {
+      checkArgument(
+          VALID_CLUSTER_VERSIONS.contains(backendVersion),
+          "Backend version may only be one of " + "%s",
+          String.join(", ", VERSION_TYPES));
+      return builder().setBackendVersion(backendVersion).build();
+    }
+
+    @Override
+    public PCollection<String> expand(PCollection<String> docs) {
+      ConnectionConfiguration connectionConfiguration = 
getConnectionConfiguration();
+      Integer backendVersion = getBackendVersion();
+      Write.FieldValueExtractFn idFn = getIdFn();
+      Write.BooleanFieldValueExtractFn isDeleteFn = getIsDeleteFn();
+      checkState(
+          (backendVersion != null || connectionConfiguration != null),
+          "withBackendVersion() or withConnectionConfiguration() is required");
+      checkArgument(
+          isDeleteFn == null || idFn != null,
+          "Id needs to be specified by withIdFn for delete operation");
+
+      return docs.apply(ParDo.of(new DocToBulkFn(this)));
+    }
+
+    // Encapsulates the elements which form the metadata for an Elasticsearch 
bulk operation
+    private static class DocumentMetadata implements Serializable {
+      final String index;
+      final String type;
+      final String id;
+      final Integer retryOnConflict;
+      final String routing;
+      final Integer backendVersion;
+      final String version;
+      final String versionType;
+
+      DocumentMetadata(
+          String index,
+          String type,
+          String id,
+          Integer retryOnConflict,
+          String routing,
+          Integer backendVersion,
+          String version,
+          String versionType) {
+        this.index = index;
+        this.id = id;
+        this.type = type;
+        this.retryOnConflict = retryOnConflict;
+        this.routing = routing;
+        this.backendVersion = backendVersion;
+        this.version = version;
+        this.versionType = versionType;
+      }
+    }
+
+    private static class DocumentMetadataSerializer extends 
StdSerializer<DocumentMetadata> {
+      private DocumentMetadataSerializer() {
+        super(DocumentMetadata.class);
+      }
+
+      @Override
+      public void serialize(DocumentMetadata value, JsonGenerator gen, 
SerializerProvider provider)
+          throws IOException {
+        gen.writeStartObject();
+        if (value.index != null) {
+          gen.writeStringField("_index", value.index);
+        }
+        if (value.type != null) {
+          gen.writeStringField("_type", value.type);
+        }
+        if (value.id != null) {
+          gen.writeStringField("_id", value.id);
+        }
+        if (value.routing != null) {
+          gen.writeStringField("routing", value.routing);
+        }
+        if (value.retryOnConflict != null && value.backendVersion <= 6) {
+          gen.writeNumberField("_retry_on_conflict", value.retryOnConflict);
+        }
+        if (value.retryOnConflict != null && value.backendVersion >= 7) {
+          gen.writeNumberField("retry_on_conflict", value.retryOnConflict);
+        }
+        if (value.version != null) {
+          gen.writeStringField("version", value.version);
+        }
+        if (value.versionType != null) {
+          gen.writeStringField("version_type", value.versionType);
+        }
+        gen.writeEndObject();
+      }
+    }
+
+    @VisibleForTesting
+    static String createBulkApiEntity(DocToBulk spec, String document, int 
backendVersion)
+        throws IOException {
+      String documentMetadata = "{}";
+      boolean isDelete = false;
+      if (spec.getIndexFn() != null || spec.getTypeFn() != null || 
spec.getIdFn() != null) {
+        // parse once and reused for efficiency
+        JsonNode parsedDocument = OBJECT_MAPPER.readTree(document);
+        documentMetadata = getDocumentMetadata(spec, parsedDocument, 
backendVersion);
+        if (spec.getIsDeleteFn() != null) {
+          isDelete = spec.getIsDeleteFn().apply(parsedDocument);
+        }
+      }
+
+      if (isDelete) {
+        // delete request used for deleting a document
+        return String.format("{ \"delete\" : %s }%n", documentMetadata);
+      } else {
+        // index is an insert/upsert and update is a partial update (or insert 
if not
+        // existing)
+        if (spec.getUsePartialUpdate()) {
+          return String.format(
+              "{ \"update\" : %s }%n{ \"doc\" : %s, " + "\"doc_as_upsert\" : 
true }%n",
+              documentMetadata, document);
+        } else if (spec.getUpsertScript() != null) {
+          return String.format(
+              "{ \"update\" : %s }%n{ \"script\" : {\"source\": \"%s\", "
+                  + "\"params\": %s}, \"upsert\" : %s }%n",
+              documentMetadata, spec.getUpsertScript(), document, document);
+        } else {
+          return String.format("{ \"index\" : %s }%n%s%n", documentMetadata, 
document);
+        }
+      }
+    }
+
+    private static String lowerCaseOrNull(String input) {
+      return input == null ? null : input.toLowerCase();
+    }
+
+    /**
+     * Extracts the components that comprise the document address from the 
document using the {@link
+     * Write.FieldValueExtractFn} configured. This allows any or all of the 
index, type and document
+     * id to be controlled on a per document basis. If none are provided then 
an empty default of
+     * {@code {}} is returned. Sanitization of the index is performed, 
automatically lower-casing
+     * the value as required by Elasticsearch.
+     *
+     * @param parsedDocument the json from which the index, type and id may be 
extracted
+     * @return the document address as JSON or the default
+     * @throws IOException if the document cannot be parsed as JSON
+     */
+    private static String getDocumentMetadata(
+        DocToBulk spec, JsonNode parsedDocument, int backendVersion) throws 
IOException {
+      DocumentMetadata metadata =
+          new DocumentMetadata(
+              spec.getIndexFn() != null
+                  ? lowerCaseOrNull(spec.getIndexFn().apply(parsedDocument))
+                  : null,
+              spec.getTypeFn() != null ? 
spec.getTypeFn().apply(parsedDocument) : null,
+              spec.getIdFn() != null ? spec.getIdFn().apply(parsedDocument) : 
null,
+              (spec.getUsePartialUpdate()
+                      || (spec.getUpsertScript() != null && 
!spec.getUpsertScript().isEmpty()))
+                  ? DEFAULT_RETRY_ON_CONFLICT
+                  : null,
+              spec.getRoutingFn() != null ? 
spec.getRoutingFn().apply(parsedDocument) : null,
+              backendVersion,
+              spec.getDocVersionFn() != null ? 
spec.getDocVersionFn().apply(parsedDocument) : null,
+              spec.getDocVersionType());
+      return OBJECT_MAPPER.writeValueAsString(metadata);
+    }
+
+    /** {@link DoFn} to for the {@link DocToBulk} transform. */
+    @VisibleForTesting
+    static class DocToBulkFn extends DoFn<String, String> {
+      private final DocToBulk spec;
+      private int backendVersion;
+
+      public DocToBulkFn(DocToBulk spec) {
+        this.spec = spec;
+      }
+
+      @Setup
+      public void setup() throws IOException {
+        ConnectionConfiguration connectionConfiguration = 
spec.getConnectionConfiguration();
+        if (spec.getBackendVersion() == null) {

Review comment:
       as _backendVersion_ is optional it looks good to me but what if set 
version and actual version mismatch ?

##########
File path: 
sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
@@ -1177,12 +1269,513 @@ public Write withTypeFn(FieldValueExtractFn typeFn) {
      * Elasticsearch.
      *
      * @param usePartialUpdate set to true to issue partial updates
-     * @return the {@link Write} with the partial update control set
+     * @return the {@link DocToBulk} with the partial update control set
      */
-    public Write withUsePartialUpdate(boolean usePartialUpdate) {
+    public DocToBulk withUsePartialUpdate(boolean usePartialUpdate) {
       return builder().setUsePartialUpdate(usePartialUpdate).build();
     }
 
+    /**
+     * Whether to use scripted updates and what script to use.
+     *
+     * @param source set to the value of the script source, painless lang
+     * @return the {@link DocToBulk} with the scripted updates set
+     */
+    public DocToBulk withUpsertScript(String source) {
+      return 
builder().setUsePartialUpdate(false).setUpsertScript(source).build();
+    }
+
+    /**
+     * Provide a function to extract the doc version from the document. This 
version number will be
+     * used as the document version in Elasticsearch. Should the function 
throw an Exception then
+     * the batch will fail and the exception propagated. Incompatible with 
update operations and
+     * should only be used with withUsePartialUpdate(false)
+     *
+     * @param docVersionFn to extract the document version
+     * @return the {@link DocToBulk} with the function set
+     */
+    public DocToBulk withDocVersionFn(Write.FieldValueExtractFn docVersionFn) {
+      checkArgument(docVersionFn != null, "docVersionFn must not be null");
+      return builder().setDocVersionFn(docVersionFn).build();
+    }
+
+    /**
+     * Provide a function to extract the target operation either upsert or 
delete from the document
+     * fields allowing dynamic bulk operation decision. While using 
withIsDeleteFn, it should be
+     * taken care that the document's id extraction is defined using the 
withIdFn function or else
+     * IllegalArgumentException is thrown. Should the function throw an 
Exception then the batch
+     * will fail and the exception propagated.
+     *
+     * @param isDeleteFn set to true for deleting the specific document
+     * @return the {@link Write} with the function set
+     */
+    public DocToBulk withIsDeleteFn(Write.BooleanFieldValueExtractFn 
isDeleteFn) {
+      checkArgument(isDeleteFn != null, "deleteFn is required");
+      return builder().setIsDeleteFn(isDeleteFn).build();
+    }
+
+    /**
+     * Provide a function to extract the doc version from the document. This 
version number will be
+     * used as the document version in Elasticsearch. Should the function 
throw an Exception then
+     * the batch will fail and the exception propagated. Incompatible with 
update operations and
+     * should only be used with withUsePartialUpdate(false)
+     *
+     * @param docVersionType the version type to use, one of {@value 
ElasticsearchIO#VERSION_TYPES}
+     * @return the {@link DocToBulk} with the doc version type set
+     */
+    public DocToBulk withDocVersionType(String docVersionType) {
+      checkArgument(
+          VERSION_TYPES.contains(docVersionType),
+          "docVersionType must be one of " + "%s",
+          String.join(", ", VERSION_TYPES));
+      return builder().setDocVersionType(docVersionType).build();
+    }
+
+    /**
+     * Use to set explicitly which version of Elasticsearch the destination 
cluster is running.
+     * Providing this hint means there is no need for setting {@link
+     * DocToBulk#withConnectionConfiguration}. This can also be very useful 
for testing purposes.
+     *
+     * @param backendVersion the major version number of the version of 
Elasticsearch being run in
+     *     the cluster where documents will be indexed.
+     * @return the {@link DocToBulk} with the Elasticsearch major version 
number set
+     */
+    public DocToBulk withBackendVersion(int backendVersion) {
+      checkArgument(
+          VALID_CLUSTER_VERSIONS.contains(backendVersion),
+          "Backend version may only be one of " + "%s",
+          String.join(", ", VERSION_TYPES));
+      return builder().setBackendVersion(backendVersion).build();
+    }
+
+    @Override
+    public PCollection<String> expand(PCollection<String> docs) {
+      ConnectionConfiguration connectionConfiguration = 
getConnectionConfiguration();
+      Integer backendVersion = getBackendVersion();
+      Write.FieldValueExtractFn idFn = getIdFn();
+      Write.BooleanFieldValueExtractFn isDeleteFn = getIsDeleteFn();
+      checkState(
+          (backendVersion != null || connectionConfiguration != null),
+          "withBackendVersion() or withConnectionConfiguration() is required");
+      checkArgument(
+          isDeleteFn == null || idFn != null,
+          "Id needs to be specified by withIdFn for delete operation");
+
+      return docs.apply(ParDo.of(new DocToBulkFn(this)));
+    }
+
+    // Encapsulates the elements which form the metadata for an Elasticsearch 
bulk operation
+    private static class DocumentMetadata implements Serializable {
+      final String index;
+      final String type;
+      final String id;
+      final Integer retryOnConflict;
+      final String routing;
+      final Integer backendVersion;
+      final String version;
+      final String versionType;
+
+      DocumentMetadata(
+          String index,
+          String type,
+          String id,
+          Integer retryOnConflict,
+          String routing,
+          Integer backendVersion,
+          String version,
+          String versionType) {
+        this.index = index;
+        this.id = id;
+        this.type = type;
+        this.retryOnConflict = retryOnConflict;
+        this.routing = routing;
+        this.backendVersion = backendVersion;
+        this.version = version;
+        this.versionType = versionType;
+      }
+    }
+
+    private static class DocumentMetadataSerializer extends 
StdSerializer<DocumentMetadata> {
+      private DocumentMetadataSerializer() {
+        super(DocumentMetadata.class);
+      }
+
+      @Override
+      public void serialize(DocumentMetadata value, JsonGenerator gen, 
SerializerProvider provider)
+          throws IOException {
+        gen.writeStartObject();
+        if (value.index != null) {
+          gen.writeStringField("_index", value.index);
+        }
+        if (value.type != null) {
+          gen.writeStringField("_type", value.type);
+        }
+        if (value.id != null) {
+          gen.writeStringField("_id", value.id);
+        }
+        if (value.routing != null) {
+          gen.writeStringField("routing", value.routing);
+        }
+        if (value.retryOnConflict != null && value.backendVersion <= 6) {
+          gen.writeNumberField("_retry_on_conflict", value.retryOnConflict);
+        }
+        if (value.retryOnConflict != null && value.backendVersion >= 7) {
+          gen.writeNumberField("retry_on_conflict", value.retryOnConflict);
+        }
+        if (value.version != null) {
+          gen.writeStringField("version", value.version);
+        }
+        if (value.versionType != null) {
+          gen.writeStringField("version_type", value.versionType);
+        }
+        gen.writeEndObject();
+      }
+    }
+
+    @VisibleForTesting
+    static String createBulkApiEntity(DocToBulk spec, String document, int 
backendVersion)
+        throws IOException {
+      String documentMetadata = "{}";
+      boolean isDelete = false;
+      if (spec.getIndexFn() != null || spec.getTypeFn() != null || 
spec.getIdFn() != null) {
+        // parse once and reused for efficiency
+        JsonNode parsedDocument = OBJECT_MAPPER.readTree(document);
+        documentMetadata = getDocumentMetadata(spec, parsedDocument, 
backendVersion);
+        if (spec.getIsDeleteFn() != null) {
+          isDelete = spec.getIsDeleteFn().apply(parsedDocument);
+        }
+      }
+
+      if (isDelete) {
+        // delete request used for deleting a document
+        return String.format("{ \"delete\" : %s }%n", documentMetadata);
+      } else {
+        // index is an insert/upsert and update is a partial update (or insert 
if not
+        // existing)
+        if (spec.getUsePartialUpdate()) {
+          return String.format(
+              "{ \"update\" : %s }%n{ \"doc\" : %s, " + "\"doc_as_upsert\" : 
true }%n",
+              documentMetadata, document);
+        } else if (spec.getUpsertScript() != null) {
+          return String.format(
+              "{ \"update\" : %s }%n{ \"script\" : {\"source\": \"%s\", "
+                  + "\"params\": %s}, \"upsert\" : %s }%n",
+              documentMetadata, spec.getUpsertScript(), document, document);
+        } else {
+          return String.format("{ \"index\" : %s }%n%s%n", documentMetadata, 
document);
+        }
+      }
+    }
+
+    private static String lowerCaseOrNull(String input) {
+      return input == null ? null : input.toLowerCase();
+    }
+
+    /**
+     * Extracts the components that comprise the document address from the 
document using the {@link
+     * Write.FieldValueExtractFn} configured. This allows any or all of the 
index, type and document
+     * id to be controlled on a per document basis. If none are provided then 
an empty default of
+     * {@code {}} is returned. Sanitization of the index is performed, 
automatically lower-casing
+     * the value as required by Elasticsearch.
+     *
+     * @param parsedDocument the json from which the index, type and id may be 
extracted
+     * @return the document address as JSON or the default
+     * @throws IOException if the document cannot be parsed as JSON
+     */
+    private static String getDocumentMetadata(
+        DocToBulk spec, JsonNode parsedDocument, int backendVersion) throws 
IOException {
+      DocumentMetadata metadata =
+          new DocumentMetadata(
+              spec.getIndexFn() != null
+                  ? lowerCaseOrNull(spec.getIndexFn().apply(parsedDocument))
+                  : null,
+              spec.getTypeFn() != null ? 
spec.getTypeFn().apply(parsedDocument) : null,
+              spec.getIdFn() != null ? spec.getIdFn().apply(parsedDocument) : 
null,
+              (spec.getUsePartialUpdate()
+                      || (spec.getUpsertScript() != null && 
!spec.getUpsertScript().isEmpty()))
+                  ? DEFAULT_RETRY_ON_CONFLICT
+                  : null,
+              spec.getRoutingFn() != null ? 
spec.getRoutingFn().apply(parsedDocument) : null,
+              backendVersion,
+              spec.getDocVersionFn() != null ? 
spec.getDocVersionFn().apply(parsedDocument) : null,
+              spec.getDocVersionType());
+      return OBJECT_MAPPER.writeValueAsString(metadata);
+    }
+
+    /** {@link DoFn} to for the {@link DocToBulk} transform. */
+    @VisibleForTesting
+    static class DocToBulkFn extends DoFn<String, String> {
+      private final DocToBulk spec;
+      private int backendVersion;
+
+      public DocToBulkFn(DocToBulk spec) {
+        this.spec = spec;
+      }
+
+      @Setup
+      public void setup() throws IOException {
+        ConnectionConfiguration connectionConfiguration = 
spec.getConnectionConfiguration();
+        if (spec.getBackendVersion() == null) {
+          backendVersion = 
ElasticsearchIO.getBackendVersion(connectionConfiguration);
+        } else {
+          backendVersion = spec.getBackendVersion();
+        }
+      }
+
+      @ProcessElement
+      public void processElement(ProcessContext c) throws IOException {
+        c.output(createBulkApiEntity(spec, c.element(), backendVersion));
+      }
+    }
+  }
+
+  /**
+   * A {@link PTransform} convenience wrapper for doing both document to bulk 
API serialization as

Review comment:
       With the separation, this transform will still be the entry point for 
almost all the users. So I would copy the javadoc of original Write transform 
there. And add this javadoc as details with a sentence such as "in fact, it is 
a convinience ...."

##########
File path: 
sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
@@ -1203,88 +1796,160 @@ public Write withUsePartialUpdate(boolean 
usePartialUpdate) {
      * }</pre>
      *
      * @param retryConfiguration the rules which govern the retry behavior
-     * @return the {@link Write} with retrying configured
+     * @return the {@link BulkIO} with retrying configured
      */
-    public Write withRetryConfiguration(RetryConfiguration retryConfiguration) 
{
+    public BulkIO withRetryConfiguration(RetryConfiguration 
retryConfiguration) {
       checkArgument(retryConfiguration != null, "retryConfiguration is 
required");
       return builder().setRetryConfiguration(retryConfiguration).build();
     }
 
     /**
-     * Provide a function to extract the target operation either upsert or 
delete from the document
-     * fields allowing dynamic bulk operation decision. While using 
withIsDeleteFn, it should be
-     * taken care that the document's id extraction is defined using the 
withIdFn function or else
-     * IllegalArgumentException is thrown. Should the function throw an 
Exception then the batch
-     * will fail and the exception propagated.
+     * Whether or not to suppress version conflict errors in a Bulk API 
response. This can be useful
+     * if your use case involves using external version types.
      *
-     * @param isDeleteFn set to true for deleting the specific document
-     * @return the {@link Write} with the function set
+     * @param ignoreVersionConflicts true to suppress version conflicts, false 
to surface version
+     *     conflict errors.
+     * @return the {@link BulkIO} with version conflict handling configured
      */
-    public Write withIsDeleteFn(BooleanFieldValueExtractFn isDeleteFn) {
-      checkArgument(isDeleteFn != null, "deleteFn is required");
-      return builder().setIsDeleteFn(isDeleteFn).build();
+    public BulkIO withIgnoreVersionConflicts(boolean ignoreVersionConflicts) {
+      Set<String> allowedResponseErrors = getAllowedResponseErrors();
+      if (allowedResponseErrors == null) {
+        allowedResponseErrors = new HashSet<>();
+      }
+      if (ignoreVersionConflicts) {
+        allowedResponseErrors.add(VERSION_CONFLICT_ERROR);
+      }
+
+      return builder().setAllowedResponseErrors(allowedResponseErrors).build();
+    }
+
+    /**
+     * Provide a set of textual error types which can be contained in Bulk API 
response
+     * items[].error.type field. Any element in @param 
allowableResponseErrorTypes will suppress
+     * errors of the same type in Bulk responses.
+     *
+     * <p>See also
+     * 
https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#bulk-failures-ex
+     *
+     * @param allowableResponseErrorTypes
+     * @return the {@link BulkIO} with allowable response errors set
+     */
+    public BulkIO withAllowableResponseErrors(@Nullable Set<String> 
allowableResponseErrorTypes) {
+      if (allowableResponseErrorTypes == null) {
+        allowableResponseErrorTypes = new HashSet<>();
+      }
+
+      return 
builder().setAllowedResponseErrors(allowableResponseErrorTypes).build();
+    }
+
+    /**
+     * If using {@link BulkIO#withUseStatefulBatches}, this can be used to set 
a maximum elapsed
+     * time before buffered elements are emitted to Elasticsearch as a Bulk 
API request. If this
+     * config is not set, Bulk requests will not be issued until {@link 
BulkIO#getMaxBatchSize}
+     * number of documents have been buffered. This may result in higher 
latency in particular if
+     * your max batch size is set to a large value and your pipeline input is 
low volume.
+     *
+     * @param maxBufferingDuration the maximum duration to wait before sending 
any buffered
+     *     documents to Elasticsearch, regardless of maxBatchSize.
+     * @return the {@link BulkIO} with maximum buffering duration set
+     */
+    public BulkIO withMaxBufferingDuration(Duration maxBufferingDuration) {
+      LOG.warn(
+          "Use of withMaxBufferingDuration requires 
withUseStatefulBatches(true). "
+              + "Setting that automatically.");
+      return builder()
+          .setUseStatefulBatches(true)
+          .setMaxBufferingDuration(maxBufferingDuration)
+          .build();
+    }
+
+    /**
+     * Whether or not to use Stateful Processing to ensure bulk requests have 
the desired number of
+     * entities i.e. as close to the maxBatchSize as possible. By default 
without this feature
+     * enabled, Bulk requests will not contain more than maxBatchSize 
entities, but the lower bound
+     * of batch size is determined by Beam Runner bundle sizes, which may be 
as few as 1.
+     *
+     * @param useStatefulBatches true enables the use of Stateful Processing 
to ensure that batches
+     *     are as close to the maxBatchSize as possible.
+     * @return the {@link BulkIO} with Stateful Processing enabled or disabled
+     */
+    public BulkIO withUseStatefulBatches(boolean useStatefulBatches) {
+      return builder().setUseStatefulBatches(useStatefulBatches).build();
+    }
+
+    /**
+     * When using {@link BulkIO#withUseStatefulBatches} Stateful Processing, 
states and therefore
+     * batches are maintained per-key-per-window. If data is globally windowed 
and this
+     * configuration is set to 1, there will only ever be 1 request in flight. 
Having only a single
+     * request in flight can be beneficial for ensuring an Elasticsearch 
cluster is not overwhelmed
+     * by parallel requests, but may not work for all use cases. If this 
number is less than the
+     * number of maximum workers in your pipeline, the IO work may not be 
distributed across all
+     * workers.
+     *
+     * @param maxParallelRequestsPerWindow the maximum number of parallel bulk 
requests for a window
+     *     of data
+     * @return the {@link BulkIO} with maximum parallel bulk requests per 
window set
+     */
+    public BulkIO withMaxParallelRequestsPerWindow(int 
maxParallelRequestsPerWindow) {
+      checkArgument(
+          maxParallelRequestsPerWindow > 0, "parameter value must be positive 
" + "a integer");
+      return 
builder().setMaxParallelRequestsPerWindow(maxParallelRequestsPerWindow).build();
     }
 
     @Override
     public PDone expand(PCollection<String> input) {
       ConnectionConfiguration connectionConfiguration = 
getConnectionConfiguration();
-      FieldValueExtractFn idFn = getIdFn();
-      BooleanFieldValueExtractFn isDeleteFn = getIsDeleteFn();
       checkState(connectionConfiguration != null, 
"withConnectionConfiguration() is required");
-      checkArgument(
-          isDeleteFn == null || idFn != null,
-          "Id needs to be specified by withIdFn for delete operation");
-      input.apply(ParDo.of(new WriteFn(this)));
+
+      if (getUseStatefulBatches()) {
+        GroupIntoBatches<Integer, String> groupIntoBatches =
+            GroupIntoBatches.ofSize(getMaxBatchSize());
+
+        if (getMaxBufferingDuration() != null) {
+          groupIntoBatches = 
groupIntoBatches.withMaxBufferingDuration(getMaxBufferingDuration());
+        }
+        input
+            .apply(ParDo.of(new 
AssignShardFn<>(getMaxParallelRequestsPerWindow())))

Review comment:
       It is true that you need to provide a ` PCollection<KV>` to use 
`GroupIntoBatches` (because inner state in GIB is per key). Also true that you 
cannot use runner.parallelism to know the number of workers because it would be 
a leak from the runner to the SDK which is forbidden. But using 
`AssignShardFn<>(getMaxParallelRequestsPerWindow())` seems risky for 
parallelism because the key will be an int modulo 
`MaxParallelRequestsPerWindow`. If `MaxParallelRequestsPerWindow` set by the 
user is very low for example 1, then the key will be 0 or 1. And runners such 
as spark will do object.hascode() to determin to which spark partition the KV 
should go. I fear that it reduces the parallelism to 2 in that case. But I 
think you mentioned something similar in the javadoc. Nevertheless, I think 
forcing the backend engine to reduce concurrency that way is the only way you 
have to control ES requests concurrency (because no central point in parallel 
systems) so I think you need to put a big war
 ning for users so that they are aware that using that will reduce concurrency 
on the last write step of their pipeline: runners will maintain engine 
configured concurrency starting at the source until the end (if no dynamic 
repartitioning) but there is very good chance that some partitions will be 
empty in final write step after setting MaxParallelRequestsPerWindow = very 
low. 

##########
File path: 
sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
@@ -1203,88 +1796,160 @@ public Write withUsePartialUpdate(boolean 
usePartialUpdate) {
      * }</pre>
      *
      * @param retryConfiguration the rules which govern the retry behavior
-     * @return the {@link Write} with retrying configured
+     * @return the {@link BulkIO} with retrying configured
      */
-    public Write withRetryConfiguration(RetryConfiguration retryConfiguration) 
{
+    public BulkIO withRetryConfiguration(RetryConfiguration 
retryConfiguration) {
       checkArgument(retryConfiguration != null, "retryConfiguration is 
required");
       return builder().setRetryConfiguration(retryConfiguration).build();
     }
 
     /**
-     * Provide a function to extract the target operation either upsert or 
delete from the document
-     * fields allowing dynamic bulk operation decision. While using 
withIsDeleteFn, it should be
-     * taken care that the document's id extraction is defined using the 
withIdFn function or else
-     * IllegalArgumentException is thrown. Should the function throw an 
Exception then the batch
-     * will fail and the exception propagated.
+     * Whether or not to suppress version conflict errors in a Bulk API 
response. This can be useful
+     * if your use case involves using external version types.
      *
-     * @param isDeleteFn set to true for deleting the specific document
-     * @return the {@link Write} with the function set
+     * @param ignoreVersionConflicts true to suppress version conflicts, false 
to surface version
+     *     conflict errors.
+     * @return the {@link BulkIO} with version conflict handling configured
      */
-    public Write withIsDeleteFn(BooleanFieldValueExtractFn isDeleteFn) {
-      checkArgument(isDeleteFn != null, "deleteFn is required");
-      return builder().setIsDeleteFn(isDeleteFn).build();
+    public BulkIO withIgnoreVersionConflicts(boolean ignoreVersionConflicts) {
+      Set<String> allowedResponseErrors = getAllowedResponseErrors();
+      if (allowedResponseErrors == null) {
+        allowedResponseErrors = new HashSet<>();
+      }
+      if (ignoreVersionConflicts) {
+        allowedResponseErrors.add(VERSION_CONFLICT_ERROR);
+      }
+
+      return builder().setAllowedResponseErrors(allowedResponseErrors).build();
+    }
+
+    /**
+     * Provide a set of textual error types which can be contained in Bulk API 
response
+     * items[].error.type field. Any element in @param 
allowableResponseErrorTypes will suppress
+     * errors of the same type in Bulk responses.
+     *
+     * <p>See also
+     * 
https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#bulk-failures-ex
+     *
+     * @param allowableResponseErrorTypes
+     * @return the {@link BulkIO} with allowable response errors set
+     */
+    public BulkIO withAllowableResponseErrors(@Nullable Set<String> 
allowableResponseErrorTypes) {
+      if (allowableResponseErrorTypes == null) {
+        allowableResponseErrorTypes = new HashSet<>();
+      }
+
+      return 
builder().setAllowedResponseErrors(allowableResponseErrorTypes).build();
+    }
+
+    /**
+     * If using {@link BulkIO#withUseStatefulBatches}, this can be used to set 
a maximum elapsed
+     * time before buffered elements are emitted to Elasticsearch as a Bulk 
API request. If this
+     * config is not set, Bulk requests will not be issued until {@link 
BulkIO#getMaxBatchSize}
+     * number of documents have been buffered. This may result in higher 
latency in particular if
+     * your max batch size is set to a large value and your pipeline input is 
low volume.
+     *
+     * @param maxBufferingDuration the maximum duration to wait before sending 
any buffered
+     *     documents to Elasticsearch, regardless of maxBatchSize.
+     * @return the {@link BulkIO} with maximum buffering duration set
+     */
+    public BulkIO withMaxBufferingDuration(Duration maxBufferingDuration) {
+      LOG.warn(
+          "Use of withMaxBufferingDuration requires 
withUseStatefulBatches(true). "
+              + "Setting that automatically.");
+      return builder()
+          .setUseStatefulBatches(true)
+          .setMaxBufferingDuration(maxBufferingDuration)
+          .build();
+    }
+
+    /**
+     * Whether or not to use Stateful Processing to ensure bulk requests have 
the desired number of
+     * entities i.e. as close to the maxBatchSize as possible. By default 
without this feature
+     * enabled, Bulk requests will not contain more than maxBatchSize 
entities, but the lower bound
+     * of batch size is determined by Beam Runner bundle sizes, which may be 
as few as 1.
+     *
+     * @param useStatefulBatches true enables the use of Stateful Processing 
to ensure that batches
+     *     are as close to the maxBatchSize as possible.
+     * @return the {@link BulkIO} with Stateful Processing enabled or disabled
+     */
+    public BulkIO withUseStatefulBatches(boolean useStatefulBatches) {
+      return builder().setUseStatefulBatches(useStatefulBatches).build();
+    }
+
+    /**
+     * When using {@link BulkIO#withUseStatefulBatches} Stateful Processing, 
states and therefore
+     * batches are maintained per-key-per-window. If data is globally windowed 
and this
+     * configuration is set to 1, there will only ever be 1 request in flight. 
Having only a single
+     * request in flight can be beneficial for ensuring an Elasticsearch 
cluster is not overwhelmed
+     * by parallel requests, but may not work for all use cases. If this 
number is less than the
+     * number of maximum workers in your pipeline, the IO work may not be 
distributed across all
+     * workers.
+     *
+     * @param maxParallelRequestsPerWindow the maximum number of parallel bulk 
requests for a window
+     *     of data
+     * @return the {@link BulkIO} with maximum parallel bulk requests per 
window set
+     */
+    public BulkIO withMaxParallelRequestsPerWindow(int 
maxParallelRequestsPerWindow) {
+      checkArgument(
+          maxParallelRequestsPerWindow > 0, "parameter value must be positive 
" + "a integer");
+      return 
builder().setMaxParallelRequestsPerWindow(maxParallelRequestsPerWindow).build();
     }
 
     @Override
     public PDone expand(PCollection<String> input) {
       ConnectionConfiguration connectionConfiguration = 
getConnectionConfiguration();
-      FieldValueExtractFn idFn = getIdFn();
-      BooleanFieldValueExtractFn isDeleteFn = getIsDeleteFn();
       checkState(connectionConfiguration != null, 
"withConnectionConfiguration() is required");
-      checkArgument(
-          isDeleteFn == null || idFn != null,
-          "Id needs to be specified by withIdFn for delete operation");
-      input.apply(ParDo.of(new WriteFn(this)));
+
+      if (getUseStatefulBatches()) {
+        GroupIntoBatches<Integer, String> groupIntoBatches =

Review comment:
       once again, nice feature to avoid tiny bulks !

##########
File path: 
sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
@@ -1203,88 +1796,160 @@ public Write withUsePartialUpdate(boolean 
usePartialUpdate) {
      * }</pre>
      *
      * @param retryConfiguration the rules which govern the retry behavior
-     * @return the {@link Write} with retrying configured
+     * @return the {@link BulkIO} with retrying configured
      */
-    public Write withRetryConfiguration(RetryConfiguration retryConfiguration) 
{
+    public BulkIO withRetryConfiguration(RetryConfiguration 
retryConfiguration) {
       checkArgument(retryConfiguration != null, "retryConfiguration is 
required");
       return builder().setRetryConfiguration(retryConfiguration).build();
     }
 
     /**
-     * Provide a function to extract the target operation either upsert or 
delete from the document
-     * fields allowing dynamic bulk operation decision. While using 
withIsDeleteFn, it should be
-     * taken care that the document's id extraction is defined using the 
withIdFn function or else
-     * IllegalArgumentException is thrown. Should the function throw an 
Exception then the batch
-     * will fail and the exception propagated.
+     * Whether or not to suppress version conflict errors in a Bulk API 
response. This can be useful
+     * if your use case involves using external version types.
      *
-     * @param isDeleteFn set to true for deleting the specific document
-     * @return the {@link Write} with the function set
+     * @param ignoreVersionConflicts true to suppress version conflicts, false 
to surface version
+     *     conflict errors.
+     * @return the {@link BulkIO} with version conflict handling configured
      */
-    public Write withIsDeleteFn(BooleanFieldValueExtractFn isDeleteFn) {
-      checkArgument(isDeleteFn != null, "deleteFn is required");
-      return builder().setIsDeleteFn(isDeleteFn).build();
+    public BulkIO withIgnoreVersionConflicts(boolean ignoreVersionConflicts) {
+      Set<String> allowedResponseErrors = getAllowedResponseErrors();
+      if (allowedResponseErrors == null) {
+        allowedResponseErrors = new HashSet<>();
+      }
+      if (ignoreVersionConflicts) {
+        allowedResponseErrors.add(VERSION_CONFLICT_ERROR);
+      }
+
+      return builder().setAllowedResponseErrors(allowedResponseErrors).build();
+    }
+
+    /**
+     * Provide a set of textual error types which can be contained in Bulk API 
response
+     * items[].error.type field. Any element in @param 
allowableResponseErrorTypes will suppress
+     * errors of the same type in Bulk responses.
+     *
+     * <p>See also
+     * 
https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#bulk-failures-ex
+     *
+     * @param allowableResponseErrorTypes
+     * @return the {@link BulkIO} with allowable response errors set
+     */
+    public BulkIO withAllowableResponseErrors(@Nullable Set<String> 
allowableResponseErrorTypes) {
+      if (allowableResponseErrorTypes == null) {
+        allowableResponseErrorTypes = new HashSet<>();
+      }
+
+      return 
builder().setAllowedResponseErrors(allowableResponseErrorTypes).build();
+    }
+
+    /**
+     * If using {@link BulkIO#withUseStatefulBatches}, this can be used to set 
a maximum elapsed
+     * time before buffered elements are emitted to Elasticsearch as a Bulk 
API request. If this
+     * config is not set, Bulk requests will not be issued until {@link 
BulkIO#getMaxBatchSize}
+     * number of documents have been buffered. This may result in higher 
latency in particular if
+     * your max batch size is set to a large value and your pipeline input is 
low volume.
+     *
+     * @param maxBufferingDuration the maximum duration to wait before sending 
any buffered
+     *     documents to Elasticsearch, regardless of maxBatchSize.
+     * @return the {@link BulkIO} with maximum buffering duration set
+     */
+    public BulkIO withMaxBufferingDuration(Duration maxBufferingDuration) {
+      LOG.warn(
+          "Use of withMaxBufferingDuration requires 
withUseStatefulBatches(true). "
+              + "Setting that automatically.");
+      return builder()
+          .setUseStatefulBatches(true)
+          .setMaxBufferingDuration(maxBufferingDuration)
+          .build();
+    }
+
+    /**
+     * Whether or not to use Stateful Processing to ensure bulk requests have 
the desired number of
+     * entities i.e. as close to the maxBatchSize as possible. By default 
without this feature
+     * enabled, Bulk requests will not contain more than maxBatchSize 
entities, but the lower bound
+     * of batch size is determined by Beam Runner bundle sizes, which may be 
as few as 1.
+     *
+     * @param useStatefulBatches true enables the use of Stateful Processing 
to ensure that batches
+     *     are as close to the maxBatchSize as possible.
+     * @return the {@link BulkIO} with Stateful Processing enabled or disabled
+     */
+    public BulkIO withUseStatefulBatches(boolean useStatefulBatches) {
+      return builder().setUseStatefulBatches(useStatefulBatches).build();
+    }
+
+    /**
+     * When using {@link BulkIO#withUseStatefulBatches} Stateful Processing, 
states and therefore
+     * batches are maintained per-key-per-window. If data is globally windowed 
and this
+     * configuration is set to 1, there will only ever be 1 request in flight. 
Having only a single
+     * request in flight can be beneficial for ensuring an Elasticsearch 
cluster is not overwhelmed
+     * by parallel requests, but may not work for all use cases. If this 
number is less than the
+     * number of maximum workers in your pipeline, the IO work may not be 
distributed across all
+     * workers.
+     *
+     * @param maxParallelRequestsPerWindow the maximum number of parallel bulk 
requests for a window
+     *     of data
+     * @return the {@link BulkIO} with maximum parallel bulk requests per 
window set
+     */
+    public BulkIO withMaxParallelRequestsPerWindow(int 
maxParallelRequestsPerWindow) {
+      checkArgument(
+          maxParallelRequestsPerWindow > 0, "parameter value must be positive 
" + "a integer");
+      return 
builder().setMaxParallelRequestsPerWindow(maxParallelRequestsPerWindow).build();
     }
 
     @Override
     public PDone expand(PCollection<String> input) {
       ConnectionConfiguration connectionConfiguration = 
getConnectionConfiguration();
-      FieldValueExtractFn idFn = getIdFn();
-      BooleanFieldValueExtractFn isDeleteFn = getIsDeleteFn();
       checkState(connectionConfiguration != null, 
"withConnectionConfiguration() is required");
-      checkArgument(
-          isDeleteFn == null || idFn != null,
-          "Id needs to be specified by withIdFn for delete operation");
-      input.apply(ParDo.of(new WriteFn(this)));
+
+      if (getUseStatefulBatches()) {
+        GroupIntoBatches<Integer, String> groupIntoBatches =
+            GroupIntoBatches.ofSize(getMaxBatchSize());
+
+        if (getMaxBufferingDuration() != null) {
+          groupIntoBatches = 
groupIntoBatches.withMaxBufferingDuration(getMaxBufferingDuration());
+        }
+        input
+            .apply(ParDo.of(new 
AssignShardFn<>(getMaxParallelRequestsPerWindow())))
+            .apply(groupIntoBatches)
+            .apply(
+                "Remove key no longer needed",
+                
MapElements.into(TypeDescriptors.iterables(TypeDescriptors.strings()))
+                    .via(KV::getValue))
+            .apply(ParDo.of(new BulkIOFn(this)));
+      } else {
+
+        input
+            .apply(
+                "Make elements iterable",
+                
MapElements.into(TypeDescriptors.iterables(TypeDescriptors.strings()))

Review comment:
       I know why you do this: this is because you want BulkIOFn to have the 
same signature when used with GroupIntoBatches and without so you need BulkIOFn 
to take an Iterable as input because GIB produces an Iterable as output. I'm 
not a big fan of extra steps just to be able to reuse code. Maybe it would be 
better to have a BulkIOFn impl that takes a single String and another impl that 
takes Iterable<String> (for outputs of GIB case) that relies on the first one.

##########
File path: 
sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
##########
@@ -265,6 +275,13 @@ void testWriteWithErrors() throws Exception {
     List<String> input =
         ElasticsearchIOTestUtils.createDocuments(
             numDocs, 
ElasticsearchIOTestUtils.InjectionMode.INJECT_SOME_INVALID_DOCS);
+

Review comment:
       duplicated

##########
File path: 
sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
##########
@@ -156,6 +156,16 @@ public void testWriteWithErrors() throws Exception {
     elasticsearchIOTestCommon.testWriteWithErrors();
   }
 
+  @Test
+  public void testWriteWithAllowableErrors() throws Exception {

Review comment:
       it is a left over I think, it is the same as below

##########
File path: 
sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
@@ -1177,12 +1269,513 @@ public Write withTypeFn(FieldValueExtractFn typeFn) {
      * Elasticsearch.
      *
      * @param usePartialUpdate set to true to issue partial updates
-     * @return the {@link Write} with the partial update control set
+     * @return the {@link DocToBulk} with the partial update control set
      */
-    public Write withUsePartialUpdate(boolean usePartialUpdate) {
+    public DocToBulk withUsePartialUpdate(boolean usePartialUpdate) {
       return builder().setUsePartialUpdate(usePartialUpdate).build();
     }
 
+    /**
+     * Whether to use scripted updates and what script to use.
+     *
+     * @param source set to the value of the script source, painless lang
+     * @return the {@link DocToBulk} with the scripted updates set
+     */
+    public DocToBulk withUpsertScript(String source) {
+      return 
builder().setUsePartialUpdate(false).setUpsertScript(source).build();
+    }
+
+    /**
+     * Provide a function to extract the doc version from the document. This 
version number will be
+     * used as the document version in Elasticsearch. Should the function 
throw an Exception then
+     * the batch will fail and the exception propagated. Incompatible with 
update operations and
+     * should only be used with withUsePartialUpdate(false)
+     *
+     * @param docVersionFn to extract the document version
+     * @return the {@link DocToBulk} with the function set
+     */
+    public DocToBulk withDocVersionFn(Write.FieldValueExtractFn docVersionFn) {
+      checkArgument(docVersionFn != null, "docVersionFn must not be null");
+      return builder().setDocVersionFn(docVersionFn).build();
+    }
+
+    /**
+     * Provide a function to extract the target operation either upsert or 
delete from the document
+     * fields allowing dynamic bulk operation decision. While using 
withIsDeleteFn, it should be
+     * taken care that the document's id extraction is defined using the 
withIdFn function or else
+     * IllegalArgumentException is thrown. Should the function throw an 
Exception then the batch
+     * will fail and the exception propagated.
+     *
+     * @param isDeleteFn set to true for deleting the specific document
+     * @return the {@link Write} with the function set
+     */
+    public DocToBulk withIsDeleteFn(Write.BooleanFieldValueExtractFn 
isDeleteFn) {
+      checkArgument(isDeleteFn != null, "deleteFn is required");
+      return builder().setIsDeleteFn(isDeleteFn).build();
+    }
+
+    /**
+     * Provide a function to extract the doc version from the document. This 
version number will be
+     * used as the document version in Elasticsearch. Should the function 
throw an Exception then
+     * the batch will fail and the exception propagated. Incompatible with 
update operations and
+     * should only be used with withUsePartialUpdate(false)
+     *
+     * @param docVersionType the version type to use, one of {@value 
ElasticsearchIO#VERSION_TYPES}
+     * @return the {@link DocToBulk} with the doc version type set
+     */
+    public DocToBulk withDocVersionType(String docVersionType) {
+      checkArgument(
+          VERSION_TYPES.contains(docVersionType),
+          "docVersionType must be one of " + "%s",
+          String.join(", ", VERSION_TYPES));
+      return builder().setDocVersionType(docVersionType).build();
+    }
+
+    /**
+     * Use to set explicitly which version of Elasticsearch the destination 
cluster is running.
+     * Providing this hint means there is no need for setting {@link
+     * DocToBulk#withConnectionConfiguration}. This can also be very useful 
for testing purposes.
+     *
+     * @param backendVersion the major version number of the version of 
Elasticsearch being run in
+     *     the cluster where documents will be indexed.
+     * @return the {@link DocToBulk} with the Elasticsearch major version 
number set
+     */
+    public DocToBulk withBackendVersion(int backendVersion) {
+      checkArgument(
+          VALID_CLUSTER_VERSIONS.contains(backendVersion),
+          "Backend version may only be one of " + "%s",
+          String.join(", ", VERSION_TYPES));
+      return builder().setBackendVersion(backendVersion).build();
+    }
+
+    @Override
+    public PCollection<String> expand(PCollection<String> docs) {
+      ConnectionConfiguration connectionConfiguration = 
getConnectionConfiguration();
+      Integer backendVersion = getBackendVersion();
+      Write.FieldValueExtractFn idFn = getIdFn();
+      Write.BooleanFieldValueExtractFn isDeleteFn = getIsDeleteFn();
+      checkState(
+          (backendVersion != null || connectionConfiguration != null),
+          "withBackendVersion() or withConnectionConfiguration() is required");
+      checkArgument(
+          isDeleteFn == null || idFn != null,
+          "Id needs to be specified by withIdFn for delete operation");
+
+      return docs.apply(ParDo.of(new DocToBulkFn(this)));
+    }
+
+    // Encapsulates the elements which form the metadata for an Elasticsearch 
bulk operation
+    private static class DocumentMetadata implements Serializable {
+      final String index;
+      final String type;
+      final String id;
+      final Integer retryOnConflict;
+      final String routing;
+      final Integer backendVersion;
+      final String version;
+      final String versionType;
+
+      DocumentMetadata(
+          String index,
+          String type,
+          String id,
+          Integer retryOnConflict,
+          String routing,
+          Integer backendVersion,
+          String version,
+          String versionType) {
+        this.index = index;
+        this.id = id;
+        this.type = type;
+        this.retryOnConflict = retryOnConflict;
+        this.routing = routing;
+        this.backendVersion = backendVersion;
+        this.version = version;
+        this.versionType = versionType;
+      }
+    }
+
+    private static class DocumentMetadataSerializer extends 
StdSerializer<DocumentMetadata> {
+      private DocumentMetadataSerializer() {
+        super(DocumentMetadata.class);
+      }
+
+      @Override
+      public void serialize(DocumentMetadata value, JsonGenerator gen, 
SerializerProvider provider)
+          throws IOException {
+        gen.writeStartObject();
+        if (value.index != null) {
+          gen.writeStringField("_index", value.index);
+        }
+        if (value.type != null) {
+          gen.writeStringField("_type", value.type);
+        }
+        if (value.id != null) {
+          gen.writeStringField("_id", value.id);
+        }
+        if (value.routing != null) {
+          gen.writeStringField("routing", value.routing);
+        }
+        if (value.retryOnConflict != null && value.backendVersion <= 6) {
+          gen.writeNumberField("_retry_on_conflict", value.retryOnConflict);
+        }
+        if (value.retryOnConflict != null && value.backendVersion >= 7) {
+          gen.writeNumberField("retry_on_conflict", value.retryOnConflict);
+        }
+        if (value.version != null) {
+          gen.writeStringField("version", value.version);
+        }
+        if (value.versionType != null) {
+          gen.writeStringField("version_type", value.versionType);
+        }
+        gen.writeEndObject();
+      }
+    }
+
+    @VisibleForTesting
+    static String createBulkApiEntity(DocToBulk spec, String document, int 
backendVersion)
+        throws IOException {
+      String documentMetadata = "{}";
+      boolean isDelete = false;
+      if (spec.getIndexFn() != null || spec.getTypeFn() != null || 
spec.getIdFn() != null) {
+        // parse once and reused for efficiency
+        JsonNode parsedDocument = OBJECT_MAPPER.readTree(document);
+        documentMetadata = getDocumentMetadata(spec, parsedDocument, 
backendVersion);
+        if (spec.getIsDeleteFn() != null) {
+          isDelete = spec.getIsDeleteFn().apply(parsedDocument);
+        }
+      }
+
+      if (isDelete) {
+        // delete request used for deleting a document
+        return String.format("{ \"delete\" : %s }%n", documentMetadata);
+      } else {
+        // index is an insert/upsert and update is a partial update (or insert 
if not
+        // existing)
+        if (spec.getUsePartialUpdate()) {
+          return String.format(
+              "{ \"update\" : %s }%n{ \"doc\" : %s, " + "\"doc_as_upsert\" : 
true }%n",
+              documentMetadata, document);
+        } else if (spec.getUpsertScript() != null) {
+          return String.format(
+              "{ \"update\" : %s }%n{ \"script\" : {\"source\": \"%s\", "
+                  + "\"params\": %s}, \"upsert\" : %s }%n",
+              documentMetadata, spec.getUpsertScript(), document, document);
+        } else {
+          return String.format("{ \"index\" : %s }%n%s%n", documentMetadata, 
document);
+        }
+      }
+    }
+
+    private static String lowerCaseOrNull(String input) {
+      return input == null ? null : input.toLowerCase();
+    }
+
+    /**
+     * Extracts the components that comprise the document address from the 
document using the {@link
+     * Write.FieldValueExtractFn} configured. This allows any or all of the 
index, type and document
+     * id to be controlled on a per document basis. If none are provided then 
an empty default of
+     * {@code {}} is returned. Sanitization of the index is performed, 
automatically lower-casing
+     * the value as required by Elasticsearch.
+     *
+     * @param parsedDocument the json from which the index, type and id may be 
extracted
+     * @return the document address as JSON or the default
+     * @throws IOException if the document cannot be parsed as JSON
+     */
+    private static String getDocumentMetadata(
+        DocToBulk spec, JsonNode parsedDocument, int backendVersion) throws 
IOException {
+      DocumentMetadata metadata =
+          new DocumentMetadata(
+              spec.getIndexFn() != null
+                  ? lowerCaseOrNull(spec.getIndexFn().apply(parsedDocument))
+                  : null,
+              spec.getTypeFn() != null ? 
spec.getTypeFn().apply(parsedDocument) : null,
+              spec.getIdFn() != null ? spec.getIdFn().apply(parsedDocument) : 
null,
+              (spec.getUsePartialUpdate()
+                      || (spec.getUpsertScript() != null && 
!spec.getUpsertScript().isEmpty()))
+                  ? DEFAULT_RETRY_ON_CONFLICT
+                  : null,
+              spec.getRoutingFn() != null ? 
spec.getRoutingFn().apply(parsedDocument) : null,
+              backendVersion,
+              spec.getDocVersionFn() != null ? 
spec.getDocVersionFn().apply(parsedDocument) : null,
+              spec.getDocVersionType());
+      return OBJECT_MAPPER.writeValueAsString(metadata);
+    }
+
+    /** {@link DoFn} to for the {@link DocToBulk} transform. */
+    @VisibleForTesting
+    static class DocToBulkFn extends DoFn<String, String> {
+      private final DocToBulk spec;
+      private int backendVersion;
+
+      public DocToBulkFn(DocToBulk spec) {
+        this.spec = spec;
+      }
+
+      @Setup
+      public void setup() throws IOException {
+        ConnectionConfiguration connectionConfiguration = 
spec.getConnectionConfiguration();
+        if (spec.getBackendVersion() == null) {
+          backendVersion = 
ElasticsearchIO.getBackendVersion(connectionConfiguration);
+        } else {
+          backendVersion = spec.getBackendVersion();
+        }
+      }
+
+      @ProcessElement
+      public void processElement(ProcessContext c) throws IOException {
+        c.output(createBulkApiEntity(spec, c.element(), backendVersion));
+      }
+    }
+  }
+
+  /**
+   * A {@link PTransform} convenience wrapper for doing both document to bulk 
API serialization as
+   * well as batching those Bulk API entities and writing them to an 
Elasticsearch cluster. This
+   * class is effectively a thin proxy for DocToBulk->BulkIO all-in-one for 
convenience and backward
+   * compatibility.
+   */
+  @AutoValue
+  public abstract static class Write extends PTransform<PCollection<String>, 
PDone> {
+    public interface FieldValueExtractFn extends 
SerializableFunction<JsonNode, String> {}
+
+    public interface BooleanFieldValueExtractFn extends 
SerializableFunction<JsonNode, Boolean> {}
+
+    public abstract DocToBulk getDocToBulk();
+
+    public abstract BulkIO getBulkIO();
+
+    abstract Builder writeBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {

Review comment:
       I don't think a builder pattern is needed here as it is just to store 
BulkIO and DocToBulk fields and the user will never have to set these fields. I 
would just store the fields and leave the builder proxy to DocToBulk and 
BulkIO. In addition, it would avoid having to call the getters each time.

##########
File path: 
sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
@@ -1453,14 +2062,23 @@ private HttpEntity handleRetry(
         // while retry policy exists
         while (BackOffUtils.next(sleeper, backoff)) {
           LOG.warn(String.format(RETRY_ATTEMPT_LOG, ++attempt));
-          Request request = new Request(method, endpoint);
-          request.addParameters(params);
-          request.setEntity(requestBody);
-          response = restClient.performRequest(request);
-          responseEntity = new BufferedHttpEntity(response.getEntity());
+          try {
+            Request request = new Request(method, endpoint);
+            request.addParameters(params);
+            request.setEntity(requestBody);
+            response = restClient.performRequest(request);
+            responseEntity = new BufferedHttpEntity(response.getEntity());
+          } catch (java.io.IOException ex) {

Review comment:
       same here

##########
File path: 
sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
@@ -1310,135 +1970,84 @@ public void startBundle(StartBundleContext context) {
         currentBatchSizeBytes = 0;
       }
 
-      private class DocumentMetadataSerializer extends 
StdSerializer<DocumentMetadata> {
-
-        private DocumentMetadataSerializer() {
-          super(DocumentMetadata.class);
-        }
-
-        @Override
-        public void serialize(
-            DocumentMetadata value, JsonGenerator gen, SerializerProvider 
provider)
-            throws IOException {
-          gen.writeStartObject();
-          if (value.index != null) {
-            gen.writeStringField("_index", value.index);
-          }
-          if (value.type != null) {
-            gen.writeStringField("_type", value.type);
-          }
-          if (value.id != null) {
-            gen.writeStringField("_id", value.id);
-          }
-          if (value.retryOnConflict != null && (backendVersion <= 6)) {
-            gen.writeNumberField("_retry_on_conflict", value.retryOnConflict);
-          }
-          if (value.retryOnConflict != null && backendVersion >= 7) {
-            gen.writeNumberField("retry_on_conflict", value.retryOnConflict);
-          }
-          gen.writeEndObject();
-        }
-      }
-      /**
-       * Extracts the components that comprise the document address from the 
document using the
-       * {@link FieldValueExtractFn} configured. This allows any or all of the 
index, type and
-       * document id to be controlled on a per document basis. Sanitization of 
the index is
-       * performed, automatically lower-casing the value as required by 
Elasticsearch.
-       *
-       * @param parsedDocument the json from which the index, type and id may 
be extracted
-       * @return the document address as JSON or the default
-       * @throws IOException if the document cannot be parsed as JSON
-       */
-      private String getDocumentMetadata(JsonNode parsedDocument) throws 
IOException {
-        DocumentMetadata metadata =
-            new DocumentMetadata(
-                spec.getIndexFn() != null
-                    ? lowerCaseOrNull(spec.getIndexFn().apply(parsedDocument))
-                    : null,
-                spec.getTypeFn() != null ? 
spec.getTypeFn().apply(parsedDocument) : null,
-                spec.getIdFn() != null ? spec.getIdFn().apply(parsedDocument) 
: null,
-                spec.getUsePartialUpdate() ? DEFAULT_RETRY_ON_CONFLICT : null);
-        return OBJECT_MAPPER.writeValueAsString(metadata);
-      }
-
-      private static String lowerCaseOrNull(String input) {
-        return input == null ? null : input.toLowerCase();
+      @FinishBundle
+      public void finishBundle(FinishBundleContext context)
+          throws IOException, InterruptedException {
+        flushBatch();
       }
 
       @ProcessElement
-      public void processElement(ProcessContext context) throws Exception {
-        String document = context.element(); // use configuration and 
auto-generated document IDs
-        String documentMetadata = "{}";
-        boolean isDelete = false;
-        if (spec.getIndexFn() != null || spec.getTypeFn() != null || 
spec.getIdFn() != null) {
-          // parse once and reused for efficiency
-          JsonNode parsedDocument = OBJECT_MAPPER.readTree(document);
-          documentMetadata = getDocumentMetadata(parsedDocument);
-          if (spec.getIsDeleteFn() != null) {
-            isDelete = spec.getIsDeleteFn().apply(parsedDocument);
-          }
+      public void processElement(@Element @NonNull Iterable<String> 
bulkApiEntities)
+          throws Exception {
+        for (String bulkApiEntity : bulkApiEntities) {
+          addAndMaybeFlush(bulkApiEntity);
         }
+      }
 
-        if (isDelete) {
-          // delete request used for deleting a document.
-          batch.add(String.format("{ \"delete\" : %s }%n", documentMetadata));
-        } else {
-          // index is an insert/upsert and update is a partial update (or 
insert if not existing)
-          if (spec.getUsePartialUpdate()) {
-            batch.add(
-                String.format(
-                    "{ \"update\" : %s }%n{ \"doc\" : %s, \"doc_as_upsert\" : 
true }%n",
-                    documentMetadata, document));
-          } else {
-            batch.add(String.format("{ \"index\" : %s }%n%s%n", 
documentMetadata, document));
-          }
-        }
+      protected void addAndMaybeFlush(String bulkApiEntity)
+          throws IOException, InterruptedException {
+        batch.add(bulkApiEntity);
+        currentBatchSizeBytes += 
bulkApiEntity.getBytes(StandardCharsets.UTF_8).length;
 
-        currentBatchSizeBytes += 
document.getBytes(StandardCharsets.UTF_8).length;
         if (batch.size() >= spec.getMaxBatchSize()
             || currentBatchSizeBytes >= spec.getMaxBatchSizeBytes()) {
           flushBatch();
         }
       }
 
-      @FinishBundle
-      public void finishBundle(FinishBundleContext context)
-          throws IOException, InterruptedException {
-        flushBatch();
-      }
-
       private void flushBatch() throws IOException, InterruptedException {
         if (batch.isEmpty()) {
           return;
         }
+
+        LOG.info(
+            "ElasticsearchIO batch size: {}, batch size bytes: {}",
+            batch.size(),
+            currentBatchSizeBytes);
+
         StringBuilder bulkRequest = new StringBuilder();
         for (String json : batch) {
           bulkRequest.append(json);
         }
+
         batch.clear();
-        currentBatchSizeBytes = 0;
-        Response response;
-        HttpEntity responseEntity;
-        // Elasticsearch will default to the index/type provided here if none 
are set in the
-        // document meta (i.e. using ElasticsearchIO$Write#withIndexFn and
-        // ElasticsearchIO$Write#withTypeFn options)
-        String endPoint =
-            String.format(
-                "/%s/%s/_bulk",
-                spec.getConnectionConfiguration().getIndex(),
-                spec.getConnectionConfiguration().getType());
+        currentBatchSizeBytes = 0L;
+
+        Response response = null;
+        HttpEntity responseEntity = null;
+
+        // Elasticsearch will default to the index/type provided the {@link
+        // ConnectionConfiguration} if none are set in the document meta (i.e.
+        // using ElasticsearchIO$DocToBulk#withIndexFn and
+        // ElasticsearchIO$DocToBulk#withTypeFn options)
+        String endPoint = spec.getConnectionConfiguration().getBulkEndPoint();
+
         HttpEntity requestBody =
             new NStringEntity(bulkRequest.toString(), 
ContentType.APPLICATION_JSON);
-        Request request = new Request("POST", endPoint);
-        request.addParameters(Collections.emptyMap());
-        request.setEntity(requestBody);
-        response = restClient.performRequest(request);
-        responseEntity = new BufferedHttpEntity(response.getEntity());
+        try {
+          Request request = new Request("POST", endPoint);
+          request.addParameters(Collections.emptyMap());
+          request.setEntity(requestBody);
+          response = restClient.performRequest(request);
+          responseEntity = new BufferedHttpEntity(response.getEntity());
+        } catch (java.io.IOException ex) {

Review comment:
       Before we just threw the exception and there would be retrials only on 
http 429 (predicate). Now retrials are also done when receiving IOException. 
Are you sure all IOException cases can be retried ? I'm not sure they are all 
timeouts: a misconfigured IO will throw IOException and will be retried. It is 
good to retry on Timeouts IMHO but please filter on only timeouts.

##########
File path: 
sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
@@ -1177,12 +1269,513 @@ public Write withTypeFn(FieldValueExtractFn typeFn) {
      * Elasticsearch.
      *
      * @param usePartialUpdate set to true to issue partial updates
-     * @return the {@link Write} with the partial update control set
+     * @return the {@link DocToBulk} with the partial update control set
      */
-    public Write withUsePartialUpdate(boolean usePartialUpdate) {
+    public DocToBulk withUsePartialUpdate(boolean usePartialUpdate) {
       return builder().setUsePartialUpdate(usePartialUpdate).build();
     }
 
+    /**
+     * Whether to use scripted updates and what script to use.
+     *
+     * @param source set to the value of the script source, painless lang
+     * @return the {@link DocToBulk} with the scripted updates set
+     */
+    public DocToBulk withUpsertScript(String source) {
+      return 
builder().setUsePartialUpdate(false).setUpsertScript(source).build();
+    }
+
+    /**
+     * Provide a function to extract the doc version from the document. This 
version number will be
+     * used as the document version in Elasticsearch. Should the function 
throw an Exception then
+     * the batch will fail and the exception propagated. Incompatible with 
update operations and
+     * should only be used with withUsePartialUpdate(false)
+     *
+     * @param docVersionFn to extract the document version
+     * @return the {@link DocToBulk} with the function set
+     */
+    public DocToBulk withDocVersionFn(Write.FieldValueExtractFn docVersionFn) {
+      checkArgument(docVersionFn != null, "docVersionFn must not be null");
+      return builder().setDocVersionFn(docVersionFn).build();
+    }
+
+    /**
+     * Provide a function to extract the target operation either upsert or 
delete from the document
+     * fields allowing dynamic bulk operation decision. While using 
withIsDeleteFn, it should be
+     * taken care that the document's id extraction is defined using the 
withIdFn function or else
+     * IllegalArgumentException is thrown. Should the function throw an 
Exception then the batch
+     * will fail and the exception propagated.
+     *
+     * @param isDeleteFn set to true for deleting the specific document
+     * @return the {@link Write} with the function set
+     */
+    public DocToBulk withIsDeleteFn(Write.BooleanFieldValueExtractFn 
isDeleteFn) {
+      checkArgument(isDeleteFn != null, "deleteFn is required");
+      return builder().setIsDeleteFn(isDeleteFn).build();
+    }
+
+    /**
+     * Provide a function to extract the doc version from the document. This 
version number will be
+     * used as the document version in Elasticsearch. Should the function 
throw an Exception then
+     * the batch will fail and the exception propagated. Incompatible with 
update operations and
+     * should only be used with withUsePartialUpdate(false)
+     *
+     * @param docVersionType the version type to use, one of {@value 
ElasticsearchIO#VERSION_TYPES}
+     * @return the {@link DocToBulk} with the doc version type set
+     */
+    public DocToBulk withDocVersionType(String docVersionType) {
+      checkArgument(
+          VERSION_TYPES.contains(docVersionType),
+          "docVersionType must be one of " + "%s",
+          String.join(", ", VERSION_TYPES));
+      return builder().setDocVersionType(docVersionType).build();
+    }
+
+    /**
+     * Use to set explicitly which version of Elasticsearch the destination 
cluster is running.
+     * Providing this hint means there is no need for setting {@link
+     * DocToBulk#withConnectionConfiguration}. This can also be very useful 
for testing purposes.
+     *
+     * @param backendVersion the major version number of the version of 
Elasticsearch being run in
+     *     the cluster where documents will be indexed.
+     * @return the {@link DocToBulk} with the Elasticsearch major version 
number set
+     */
+    public DocToBulk withBackendVersion(int backendVersion) {
+      checkArgument(
+          VALID_CLUSTER_VERSIONS.contains(backendVersion),
+          "Backend version may only be one of " + "%s",
+          String.join(", ", VERSION_TYPES));
+      return builder().setBackendVersion(backendVersion).build();
+    }
+
+    @Override
+    public PCollection<String> expand(PCollection<String> docs) {
+      ConnectionConfiguration connectionConfiguration = 
getConnectionConfiguration();
+      Integer backendVersion = getBackendVersion();
+      Write.FieldValueExtractFn idFn = getIdFn();
+      Write.BooleanFieldValueExtractFn isDeleteFn = getIsDeleteFn();
+      checkState(
+          (backendVersion != null || connectionConfiguration != null),
+          "withBackendVersion() or withConnectionConfiguration() is required");
+      checkArgument(
+          isDeleteFn == null || idFn != null,
+          "Id needs to be specified by withIdFn for delete operation");
+
+      return docs.apply(ParDo.of(new DocToBulkFn(this)));
+    }
+
+    // Encapsulates the elements which form the metadata for an Elasticsearch 
bulk operation
+    private static class DocumentMetadata implements Serializable {
+      final String index;
+      final String type;
+      final String id;
+      final Integer retryOnConflict;
+      final String routing;
+      final Integer backendVersion;
+      final String version;
+      final String versionType;
+
+      DocumentMetadata(
+          String index,
+          String type,
+          String id,
+          Integer retryOnConflict,
+          String routing,
+          Integer backendVersion,
+          String version,
+          String versionType) {
+        this.index = index;
+        this.id = id;
+        this.type = type;
+        this.retryOnConflict = retryOnConflict;
+        this.routing = routing;
+        this.backendVersion = backendVersion;
+        this.version = version;
+        this.versionType = versionType;
+      }
+    }
+
+    private static class DocumentMetadataSerializer extends 
StdSerializer<DocumentMetadata> {
+      private DocumentMetadataSerializer() {
+        super(DocumentMetadata.class);
+      }
+
+      @Override
+      public void serialize(DocumentMetadata value, JsonGenerator gen, 
SerializerProvider provider)
+          throws IOException {
+        gen.writeStartObject();
+        if (value.index != null) {
+          gen.writeStringField("_index", value.index);
+        }
+        if (value.type != null) {
+          gen.writeStringField("_type", value.type);
+        }
+        if (value.id != null) {
+          gen.writeStringField("_id", value.id);
+        }
+        if (value.routing != null) {
+          gen.writeStringField("routing", value.routing);
+        }
+        if (value.retryOnConflict != null && value.backendVersion <= 6) {
+          gen.writeNumberField("_retry_on_conflict", value.retryOnConflict);
+        }
+        if (value.retryOnConflict != null && value.backendVersion >= 7) {
+          gen.writeNumberField("retry_on_conflict", value.retryOnConflict);
+        }
+        if (value.version != null) {
+          gen.writeStringField("version", value.version);
+        }
+        if (value.versionType != null) {
+          gen.writeStringField("version_type", value.versionType);
+        }
+        gen.writeEndObject();
+      }
+    }
+
+    @VisibleForTesting
+    static String createBulkApiEntity(DocToBulk spec, String document, int 
backendVersion)
+        throws IOException {
+      String documentMetadata = "{}";
+      boolean isDelete = false;
+      if (spec.getIndexFn() != null || spec.getTypeFn() != null || 
spec.getIdFn() != null) {
+        // parse once and reused for efficiency
+        JsonNode parsedDocument = OBJECT_MAPPER.readTree(document);
+        documentMetadata = getDocumentMetadata(spec, parsedDocument, 
backendVersion);
+        if (spec.getIsDeleteFn() != null) {
+          isDelete = spec.getIsDeleteFn().apply(parsedDocument);
+        }
+      }
+
+      if (isDelete) {
+        // delete request used for deleting a document
+        return String.format("{ \"delete\" : %s }%n", documentMetadata);
+      } else {
+        // index is an insert/upsert and update is a partial update (or insert 
if not
+        // existing)
+        if (spec.getUsePartialUpdate()) {
+          return String.format(
+              "{ \"update\" : %s }%n{ \"doc\" : %s, " + "\"doc_as_upsert\" : 
true }%n",
+              documentMetadata, document);
+        } else if (spec.getUpsertScript() != null) {
+          return String.format(
+              "{ \"update\" : %s }%n{ \"script\" : {\"source\": \"%s\", "
+                  + "\"params\": %s}, \"upsert\" : %s }%n",
+              documentMetadata, spec.getUpsertScript(), document, document);
+        } else {
+          return String.format("{ \"index\" : %s }%n%s%n", documentMetadata, 
document);
+        }
+      }
+    }
+
+    private static String lowerCaseOrNull(String input) {
+      return input == null ? null : input.toLowerCase();
+    }
+
+    /**
+     * Extracts the components that comprise the document address from the 
document using the {@link
+     * Write.FieldValueExtractFn} configured. This allows any or all of the 
index, type and document
+     * id to be controlled on a per document basis. If none are provided then 
an empty default of
+     * {@code {}} is returned. Sanitization of the index is performed, 
automatically lower-casing
+     * the value as required by Elasticsearch.
+     *
+     * @param parsedDocument the json from which the index, type and id may be 
extracted
+     * @return the document address as JSON or the default
+     * @throws IOException if the document cannot be parsed as JSON
+     */
+    private static String getDocumentMetadata(
+        DocToBulk spec, JsonNode parsedDocument, int backendVersion) throws 
IOException {
+      DocumentMetadata metadata =
+          new DocumentMetadata(
+              spec.getIndexFn() != null
+                  ? lowerCaseOrNull(spec.getIndexFn().apply(parsedDocument))
+                  : null,
+              spec.getTypeFn() != null ? 
spec.getTypeFn().apply(parsedDocument) : null,
+              spec.getIdFn() != null ? spec.getIdFn().apply(parsedDocument) : 
null,
+              (spec.getUsePartialUpdate()
+                      || (spec.getUpsertScript() != null && 
!spec.getUpsertScript().isEmpty()))
+                  ? DEFAULT_RETRY_ON_CONFLICT
+                  : null,
+              spec.getRoutingFn() != null ? 
spec.getRoutingFn().apply(parsedDocument) : null,
+              backendVersion,
+              spec.getDocVersionFn() != null ? 
spec.getDocVersionFn().apply(parsedDocument) : null,
+              spec.getDocVersionType());
+      return OBJECT_MAPPER.writeValueAsString(metadata);
+    }
+
+    /** {@link DoFn} to for the {@link DocToBulk} transform. */
+    @VisibleForTesting
+    static class DocToBulkFn extends DoFn<String, String> {
+      private final DocToBulk spec;
+      private int backendVersion;
+
+      public DocToBulkFn(DocToBulk spec) {
+        this.spec = spec;
+      }
+
+      @Setup
+      public void setup() throws IOException {
+        ConnectionConfiguration connectionConfiguration = 
spec.getConnectionConfiguration();
+        if (spec.getBackendVersion() == null) {
+          backendVersion = 
ElasticsearchIO.getBackendVersion(connectionConfiguration);
+        } else {
+          backendVersion = spec.getBackendVersion();
+        }
+      }
+
+      @ProcessElement
+      public void processElement(ProcessContext c) throws IOException {
+        c.output(createBulkApiEntity(spec, c.element(), backendVersion));
+      }
+    }
+  }
+
+  /**
+   * A {@link PTransform} convenience wrapper for doing both document to bulk 
API serialization as
+   * well as batching those Bulk API entities and writing them to an 
Elasticsearch cluster. This
+   * class is effectively a thin proxy for DocToBulk->BulkIO all-in-one for 
convenience and backward
+   * compatibility.
+   */
+  @AutoValue
+  public abstract static class Write extends PTransform<PCollection<String>, 
PDone> {
+    public interface FieldValueExtractFn extends 
SerializableFunction<JsonNode, String> {}
+
+    public interface BooleanFieldValueExtractFn extends 
SerializableFunction<JsonNode, Boolean> {}
+
+    public abstract DocToBulk getDocToBulk();
+
+    public abstract BulkIO getBulkIO();
+
+    abstract Builder writeBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setDocToBulk(DocToBulk docToBulk);
+
+      abstract Builder setBulkIO(BulkIO bulkIO);
+
+      abstract Write build();
+    }
+
+    // For building Doc2Bulk
+    /** Refer to {@link DocToBulk#withIdFn}. */
+    public Write withIdFn(FieldValueExtractFn idFn) {
+      return 
writeBuilder().setDocToBulk(getDocToBulk().withIdFn(idFn)).build();
+    }
+
+    /** Refer to {@link DocToBulk#withIndexFn}. */
+    public Write withIndexFn(FieldValueExtractFn indexFn) {
+      return 
writeBuilder().setDocToBulk(getDocToBulk().withIndexFn(indexFn)).build();
+    }
+
+    /** Refer to {@link DocToBulk#withRoutingFn}. */
+    public Write withRoutingFn(FieldValueExtractFn routingFn) {
+      return 
writeBuilder().setDocToBulk(getDocToBulk().withRoutingFn(routingFn)).build();
+    }
+
+    /** Refer to {@link DocToBulk#withTypeFn}. */
+    public Write withTypeFn(FieldValueExtractFn typeFn) {
+      return 
writeBuilder().setDocToBulk(getDocToBulk().withTypeFn(typeFn)).build();
+    }
+
+    /** Refer to {@link DocToBulk#withDocVersionFn}. */
+    public Write withDocVersionFn(FieldValueExtractFn docVersionFn) {
+      return 
writeBuilder().setDocToBulk(getDocToBulk().withDocVersionFn(docVersionFn)).build();
+    }
+
+    /** Refer to {@link DocToBulk#withDocVersionType}. */
+    public Write withDocVersionType(String docVersionType) {
+      return 
writeBuilder().setDocToBulk(getDocToBulk().withDocVersionType(docVersionType)).build();
+    }
+
+    /** Refer to {@link DocToBulk#withUsePartialUpdate}. */
+    public Write withUsePartialUpdate(boolean usePartialUpdate) {
+      return writeBuilder()
+          .setDocToBulk(getDocToBulk().withUsePartialUpdate(usePartialUpdate))
+          .build();
+    }
+
+    /** Refer to {@link DocToBulk#withUpsertScript}. */
+    public Write withUpsertScript(String source) {
+      return 
writeBuilder().setDocToBulk(getDocToBulk().withUpsertScript(source)).build();
+    }
+
+    /** Refer to {@link DocToBulk#withBackendVersion}. */
+    public Write withBackendVersion(int backendVersion) {
+      return 
writeBuilder().setDocToBulk(getDocToBulk().withBackendVersion(backendVersion)).build();
+    }
+
+    /** Refer to {@link DocToBulk#withIsDeleteFn}. */
+    public Write withIsDeleteFn(Write.BooleanFieldValueExtractFn isDeleteFn) {
+      return 
writeBuilder().setDocToBulk(getDocToBulk().withIsDeleteFn(isDeleteFn)).build();
+    }
+    // End building Doc2Bulk
+
+    /** Refer to {@link BulkIO#withConnectionConfiguration}. */
+    public Write withConnectionConfiguration(ConnectionConfiguration 
connectionConfiguration) {
+      checkArgument(connectionConfiguration != null, "connectionConfiguration 
can not be null");
+
+      return writeBuilder()
+          
.setDocToBulk(getDocToBulk().withConnectionConfiguration(connectionConfiguration))
+          
.setBulkIO(getBulkIO().withConnectionConfiguration(connectionConfiguration))
+          .build();
+    }
+
+    /** Refer to {@link BulkIO#withMaxBatchSize}. */
+    public Write withMaxBatchSize(long batchSize) {
+      return 
writeBuilder().setBulkIO(getBulkIO().withMaxBatchSize(batchSize)).build();
+    }
+
+    /** Refer to {@link BulkIO#withMaxBatchSizeBytes}. */
+    public Write withMaxBatchSizeBytes(long batchSizeBytes) {
+      return 
writeBuilder().setBulkIO(getBulkIO().withMaxBatchSizeBytes(batchSizeBytes)).build();
+    }
+
+    /** Refer to {@link BulkIO#withRetryConfiguration}. */
+    public Write withRetryConfiguration(RetryConfiguration retryConfiguration) 
{
+      return writeBuilder()
+          .setBulkIO(getBulkIO().withRetryConfiguration(retryConfiguration))
+          .build();
+    }
+
+    /** Refer to {@link BulkIO#withIgnoreVersionConflicts}. */
+    public Write withIgnoreVersionConflicts(boolean ignoreVersionConflicts) {
+      return writeBuilder()
+          
.setBulkIO(getBulkIO().withIgnoreVersionConflicts(ignoreVersionConflicts))
+          .build();
+    }
+
+    /** Refer to {@link BulkIO#withUseStatefulBatches}. */
+    public Write withUseStatefulBatches(boolean useStatefulBatches) {
+      return writeBuilder()
+          .setBulkIO(getBulkIO().withUseStatefulBatches(useStatefulBatches))
+          .build();
+    }
+
+    /** Refer to {@link BulkIO#withMaxBufferingDuration}. */
+    public Write withMaxBufferingDuration(Duration maxBufferingDuration) {
+      return writeBuilder()
+          
.setBulkIO(getBulkIO().withMaxBufferingDuration(maxBufferingDuration))
+          .build();
+    }
+
+    /** Refer to {@link BulkIO#withMaxParallelRequestsPerWindow}. */
+    public Write withMaxParallelRquestsPerWindow(int 
maxParallelRquestsPerWindow) {
+      return writeBuilder()
+          
.setBulkIO(getBulkIO().withMaxParallelRequestsPerWindow(maxParallelRquestsPerWindow))
+          .build();
+    }
+
+    /** Refer to {@link BulkIO#withAllowableResponseErrors}. */
+    public Write withAllowableResponseErrors(@Nullable Set<String> 
allowableResponseErrors) {
+      if (allowableResponseErrors == null) {
+        allowableResponseErrors = new HashSet<>();
+      }
+
+      return writeBuilder()
+          
.setBulkIO(getBulkIO().withAllowableResponseErrors(allowableResponseErrors))
+          .build();
+    }
+
+    @Override
+    public PDone expand(PCollection<String> input) {
+      input.apply(getDocToBulk()).apply(getBulkIO());
+      return PDone.in(input.getPipeline());
+    }
+  }
+
+  /** A {@link PTransform} writing data to Elasticsearch. */

Review comment:
       I think we should make it clear for the user here that it is an internal 
transformation (Write is still the main transform entry point). We should also 
describe that it takes results of bulk serialization entities (bulkApiEntity) 
as input otherwise the user will be either confused or mislead to use this 
transform to write his simple json documents.

##########
File path: 
sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
##########
@@ -297,16 +342,27 @@ void testWriteWithMaxBatchSize() throws Exception {
         ElasticsearchIO.write()
             .withConnectionConfiguration(connectionConfiguration)
             .withMaxBatchSize(BATCH_SIZE);
+
     // write bundles size is the runner decision, we cannot force a bundle 
size,
     // so we test the Writer as a DoFn outside of a runner.
-    try (DoFnTester<String, Void> fnTester = DoFnTester.of(new 
Write.WriteFn(write))) {
+    try (DoFnTester<Iterable<String>, Void> fnTester =
+        DoFnTester.of(new BulkIO.BulkIOFn(write.getBulkIO()))) {
       List<String> input =
           ElasticsearchIOTestUtils.createDocuments(
               numDocs, 
ElasticsearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
+
+      List<String> serializedInput = new ArrayList<>();
+      for (String doc : input) {
+        serializedInput.add(
+            DocToBulk.createBulkApiEntity(
+                write.getDocToBulk(), doc, 
getBackendVersion(connectionConfiguration)));
+      }
       long numDocsProcessed = 0;
       long numDocsInserted = 0;
-      for (String document : input) {
-        fnTester.processElement(document);
+      for (String document : serializedInput) {
+        // It's a tad strange to iterate over a list and then make a list of 
each element, but

Review comment:
       cf my comment in production code: please provide a BulkIOFn that uses 
`String` and not `Iterable<String>`

##########
File path: 
sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
@@ -1203,88 +1796,160 @@ public Write withUsePartialUpdate(boolean 
usePartialUpdate) {
      * }</pre>
      *
      * @param retryConfiguration the rules which govern the retry behavior
-     * @return the {@link Write} with retrying configured
+     * @return the {@link BulkIO} with retrying configured
      */
-    public Write withRetryConfiguration(RetryConfiguration retryConfiguration) 
{
+    public BulkIO withRetryConfiguration(RetryConfiguration 
retryConfiguration) {
       checkArgument(retryConfiguration != null, "retryConfiguration is 
required");
       return builder().setRetryConfiguration(retryConfiguration).build();
     }
 
     /**
-     * Provide a function to extract the target operation either upsert or 
delete from the document
-     * fields allowing dynamic bulk operation decision. While using 
withIsDeleteFn, it should be
-     * taken care that the document's id extraction is defined using the 
withIdFn function or else
-     * IllegalArgumentException is thrown. Should the function throw an 
Exception then the batch
-     * will fail and the exception propagated.
+     * Whether or not to suppress version conflict errors in a Bulk API 
response. This can be useful
+     * if your use case involves using external version types.
      *
-     * @param isDeleteFn set to true for deleting the specific document
-     * @return the {@link Write} with the function set
+     * @param ignoreVersionConflicts true to suppress version conflicts, false 
to surface version
+     *     conflict errors.
+     * @return the {@link BulkIO} with version conflict handling configured
      */
-    public Write withIsDeleteFn(BooleanFieldValueExtractFn isDeleteFn) {
-      checkArgument(isDeleteFn != null, "deleteFn is required");
-      return builder().setIsDeleteFn(isDeleteFn).build();
+    public BulkIO withIgnoreVersionConflicts(boolean ignoreVersionConflicts) {
+      Set<String> allowedResponseErrors = getAllowedResponseErrors();
+      if (allowedResponseErrors == null) {
+        allowedResponseErrors = new HashSet<>();
+      }
+      if (ignoreVersionConflicts) {
+        allowedResponseErrors.add(VERSION_CONFLICT_ERROR);
+      }
+
+      return builder().setAllowedResponseErrors(allowedResponseErrors).build();
+    }
+
+    /**
+     * Provide a set of textual error types which can be contained in Bulk API 
response
+     * items[].error.type field. Any element in @param 
allowableResponseErrorTypes will suppress
+     * errors of the same type in Bulk responses.
+     *
+     * <p>See also
+     * 
https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#bulk-failures-ex
+     *
+     * @param allowableResponseErrorTypes
+     * @return the {@link BulkIO} with allowable response errors set
+     */
+    public BulkIO withAllowableResponseErrors(@Nullable Set<String> 
allowableResponseErrorTypes) {
+      if (allowableResponseErrorTypes == null) {
+        allowableResponseErrorTypes = new HashSet<>();
+      }
+
+      return 
builder().setAllowedResponseErrors(allowableResponseErrorTypes).build();
+    }
+
+    /**
+     * If using {@link BulkIO#withUseStatefulBatches}, this can be used to set 
a maximum elapsed
+     * time before buffered elements are emitted to Elasticsearch as a Bulk 
API request. If this
+     * config is not set, Bulk requests will not be issued until {@link 
BulkIO#getMaxBatchSize}
+     * number of documents have been buffered. This may result in higher 
latency in particular if
+     * your max batch size is set to a large value and your pipeline input is 
low volume.
+     *
+     * @param maxBufferingDuration the maximum duration to wait before sending 
any buffered
+     *     documents to Elasticsearch, regardless of maxBatchSize.
+     * @return the {@link BulkIO} with maximum buffering duration set
+     */
+    public BulkIO withMaxBufferingDuration(Duration maxBufferingDuration) {
+      LOG.warn(
+          "Use of withMaxBufferingDuration requires 
withUseStatefulBatches(true). "
+              + "Setting that automatically.");
+      return builder()
+          .setUseStatefulBatches(true)
+          .setMaxBufferingDuration(maxBufferingDuration)
+          .build();
+    }
+
+    /**
+     * Whether or not to use Stateful Processing to ensure bulk requests have 
the desired number of
+     * entities i.e. as close to the maxBatchSize as possible. By default 
without this feature
+     * enabled, Bulk requests will not contain more than maxBatchSize 
entities, but the lower bound
+     * of batch size is determined by Beam Runner bundle sizes, which may be 
as few as 1.
+     *
+     * @param useStatefulBatches true enables the use of Stateful Processing 
to ensure that batches
+     *     are as close to the maxBatchSize as possible.
+     * @return the {@link BulkIO} with Stateful Processing enabled or disabled
+     */
+    public BulkIO withUseStatefulBatches(boolean useStatefulBatches) {
+      return builder().setUseStatefulBatches(useStatefulBatches).build();
+    }
+
+    /**
+     * When using {@link BulkIO#withUseStatefulBatches} Stateful Processing, 
states and therefore
+     * batches are maintained per-key-per-window. If data is globally windowed 
and this
+     * configuration is set to 1, there will only ever be 1 request in flight. 
Having only a single
+     * request in flight can be beneficial for ensuring an Elasticsearch 
cluster is not overwhelmed
+     * by parallel requests, but may not work for all use cases. If this 
number is less than the
+     * number of maximum workers in your pipeline, the IO work may not be 
distributed across all
+     * workers.
+     *
+     * @param maxParallelRequestsPerWindow the maximum number of parallel bulk 
requests for a window
+     *     of data
+     * @return the {@link BulkIO} with maximum parallel bulk requests per 
window set
+     */
+    public BulkIO withMaxParallelRequestsPerWindow(int 
maxParallelRequestsPerWindow) {
+      checkArgument(
+          maxParallelRequestsPerWindow > 0, "parameter value must be positive 
" + "a integer");
+      return 
builder().setMaxParallelRequestsPerWindow(maxParallelRequestsPerWindow).build();
     }
 
     @Override
     public PDone expand(PCollection<String> input) {
       ConnectionConfiguration connectionConfiguration = 
getConnectionConfiguration();
-      FieldValueExtractFn idFn = getIdFn();
-      BooleanFieldValueExtractFn isDeleteFn = getIsDeleteFn();
       checkState(connectionConfiguration != null, 
"withConnectionConfiguration() is required");
-      checkArgument(
-          isDeleteFn == null || idFn != null,
-          "Id needs to be specified by withIdFn for delete operation");
-      input.apply(ParDo.of(new WriteFn(this)));
+
+      if (getUseStatefulBatches()) {
+        GroupIntoBatches<Integer, String> groupIntoBatches =
+            GroupIntoBatches.ofSize(getMaxBatchSize());
+
+        if (getMaxBufferingDuration() != null) {
+          groupIntoBatches = 
groupIntoBatches.withMaxBufferingDuration(getMaxBufferingDuration());
+        }
+        input
+            .apply(ParDo.of(new 
AssignShardFn<>(getMaxParallelRequestsPerWindow())))
+            .apply(groupIntoBatches)
+            .apply(
+                "Remove key no longer needed",
+                
MapElements.into(TypeDescriptors.iterables(TypeDescriptors.strings()))
+                    .via(KV::getValue))
+            .apply(ParDo.of(new BulkIOFn(this)));
+      } else {
+
+        input
+            .apply(
+                "Make elements iterable",
+                
MapElements.into(TypeDescriptors.iterables(TypeDescriptors.strings()))
+                    .via(Collections::singletonList))
+            .apply(ParDo.of(new BulkIOFn(this)));
+      }
+
       return PDone.in(input.getPipeline());

Review comment:
       it will be used only with Write which already does the PDone.in

##########
File path: 
sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
##########
@@ -257,6 +259,14 @@ void testWrite() throws Exception {
     executeWriteTest(write);
   }
 
+  void testWriteStateful() throws Exception {

Review comment:
       you need to add this test in all ElasticSearchTests.java otherwise they 
it is not tested.

##########
File path: 
sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
##########
@@ -257,6 +259,14 @@ void testWrite() throws Exception {
     executeWriteTest(write);
   }
 
+  void testWriteStateful() throws Exception {

Review comment:
       please also add test coverage for new features that are not already 
covered: test upsert, test routing, test doc version, test 
withMaxParallelRquestsPerWindow.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to