echauchot commented on a change in pull request #14347:
URL: https://github.com/apache/beam/pull/14347#discussion_r629978211



##########
File path: 
sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
##########
@@ -568,11 +664,167 @@ void testWritePartialUpdate() throws Exception {
     assertEquals(numDocs, currentNumDocs);
     assertEquals(
         numDocs / NUM_SCIENTISTS,
-        countByScientistName(connectionConfiguration, restClient, "Einstein"));
+        countByScientistName(connectionConfiguration, restClient, "Einstein", 
null));
 
     // Partial update assertions
-    assertEquals(numDocs / 2, countByMatch(connectionConfiguration, 
restClient, "group", "0"));
-    assertEquals(numDocs / 2, countByMatch(connectionConfiguration, 
restClient, "group", "1"));
+    assertEquals(
+        numDocs / 2, countByMatch(connectionConfiguration, restClient, 
"group", "0", null, null));
+    assertEquals(
+        numDocs / 2, countByMatch(connectionConfiguration, restClient, 
"group", "1", null, null));
+  }
+
+  void testWriteWithDocVersion() throws Exception {
+    List<ObjectNode> jsonData =
+        ElasticsearchIOTestUtils.createJsonDocuments(
+            numDocs, 
ElasticsearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
+
+    List<String> data = new ArrayList<>();
+    for (ObjectNode doc : jsonData) {
+      doc.put("my_version", "1");
+      data.add(doc.toString());
+    }
+
+    insertTestDocuments(connectionConfiguration, data, restClient);
+    long currentNumDocs = 
refreshIndexAndGetCurrentNumDocs(connectionConfiguration, restClient);
+    assertEquals(numDocs, currentNumDocs);
+    // Check that all docs have the same "my_version"
+    assertEquals(
+        numDocs,
+        countByMatch(
+            connectionConfiguration, restClient, "my_version", "1", null, 
KV.of(1, numDocs)));
+
+    Write write =
+        ElasticsearchIO.write()
+            .withConnectionConfiguration(connectionConfiguration)
+            .withIdFn(new ExtractValueFn("id"))
+            .withDocVersionFn(new ExtractValueFn("my_version"))
+            .withDocVersionType("external");
+
+    data = new ArrayList<>();
+    for (ObjectNode doc : jsonData) {
+      // Set version to larger number than originally set, and larger than 
next logical version
+      // number set by default by ES.
+      doc.put("my_version", "3");
+      data.add(doc.toString());
+    }
+
+    // Test that documents with lower version are rejected, but rejections 
ignored when specified
+    pipeline.apply(Create.of(data)).apply(write);
+    pipeline.run();
+
+    currentNumDocs = refreshIndexAndGetCurrentNumDocs(connectionConfiguration, 
restClient);
+    assertEquals(numDocs, currentNumDocs);
+
+    // my_version and doc version should have changed
+    assertEquals(
+        0,
+        countByMatch(
+            connectionConfiguration, restClient, "my_version", "1", null, 
KV.of(1, numDocs)));
+    assertEquals(
+        numDocs,
+        countByMatch(
+            connectionConfiguration, restClient, "my_version", "3", null, 
KV.of(3, numDocs)));
+  }
+
+  /**
+   * Tests upsert script by adding a group field to each document in the 
standard test set. The
+   * group field is populated as the modulo 2 of the document id allowing for 
a test to ensure the
+   * documents are split into 2 groups.
+   */
+  void testWriteScriptedUpsert() throws Exception {
+    List<String> data =
+        ElasticsearchIOTestUtils.createDocuments(
+            numDocs, 
ElasticsearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
+
+    Write write =
+        ElasticsearchIO.write()
+            .withConnectionConfiguration(connectionConfiguration)
+            .withIdFn(new ExtractValueFn("id"))
+            .withUpsertScript(SCRIPT_SOURCE);
+
+    // Test that documents can be inserted/created by using withUpsertScript
+    pipeline.apply(Create.of(data)).apply(write);
+    pipeline.run();
+
+    // defensive coding to ensure our initial state is as expected
+    long currentNumDocs = 
refreshIndexAndGetCurrentNumDocs(connectionConfiguration, restClient);
+    // check we have not unwittingly modified existing behaviour
+    assertEquals(numDocs, currentNumDocs);
+    assertEquals(
+        numDocs / NUM_SCIENTISTS,
+        countByScientistName(connectionConfiguration, restClient, "Einstein", 
null));
+
+    // All docs should have have group = 0 added by the script upon creation
+    assertEquals(
+        numDocs, countByMatch(connectionConfiguration, restClient, "group", 
"0", null, null));
+
+    // Run the same data again. This time, because all docs exist in the index 
already, scripted
+    // updates should happen rather than scripted inserts.
+    pipeline.apply(Create.of(data)).apply(write);
+    pipeline.run();
+
+    currentNumDocs = refreshIndexAndGetCurrentNumDocs(connectionConfiguration, 
restClient);
+
+    // check we have not unwittingly modified existing behaviour
+    assertEquals(numDocs, currentNumDocs);
+    assertEquals(
+        numDocs / NUM_SCIENTISTS,
+        countByScientistName(connectionConfiguration, restClient, "Einstein", 
null));
+
+    // The script will set either 0 or 1 for the group value on update 
operations
+    assertEquals(
+        numDocs / 2, countByMatch(connectionConfiguration, restClient, 
"group", "0", null, null));
+    assertEquals(
+        numDocs / 2, countByMatch(connectionConfiguration, restClient, 
"group", "1", null, null));
+  }
+
+  void testMaxParallelRequestsPerWindow() throws Exception {
+    List<String> data =
+        ElasticsearchIOTestUtils.createDocuments(
+            numDocs, 
ElasticsearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
+
+    Write write =
+        ElasticsearchIO.write()
+            .withConnectionConfiguration(connectionConfiguration)
+            .withMaxParallelRequestsPerWindow(1);
+
+    PCollection<KV<Integer, Long>> batches =
+        pipeline
+            .apply(Create.of(data))
+            .apply(Window.into(new GlobalWindows()))

Review comment:
       `Window.into(new GlobalWindows())` should not be needed: the global window is the default when no windowing is specified.

##########
File path: 
sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
##########
@@ -568,11 +664,167 @@ void testWritePartialUpdate() throws Exception {
     assertEquals(numDocs, currentNumDocs);
     assertEquals(
         numDocs / NUM_SCIENTISTS,
-        countByScientistName(connectionConfiguration, restClient, "Einstein"));
+        countByScientistName(connectionConfiguration, restClient, "Einstein", 
null));
 
     // Partial update assertions
-    assertEquals(numDocs / 2, countByMatch(connectionConfiguration, 
restClient, "group", "0"));
-    assertEquals(numDocs / 2, countByMatch(connectionConfiguration, 
restClient, "group", "1"));
+    assertEquals(
+        numDocs / 2, countByMatch(connectionConfiguration, restClient, 
"group", "0", null, null));
+    assertEquals(
+        numDocs / 2, countByMatch(connectionConfiguration, restClient, 
"group", "1", null, null));
+  }
+
+  void testWriteWithDocVersion() throws Exception {
+    List<ObjectNode> jsonData =
+        ElasticsearchIOTestUtils.createJsonDocuments(
+            numDocs, 
ElasticsearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
+
+    List<String> data = new ArrayList<>();
+    for (ObjectNode doc : jsonData) {
+      doc.put("my_version", "1");
+      data.add(doc.toString());
+    }
+
+    insertTestDocuments(connectionConfiguration, data, restClient);
+    long currentNumDocs = 
refreshIndexAndGetCurrentNumDocs(connectionConfiguration, restClient);
+    assertEquals(numDocs, currentNumDocs);
+    // Check that all docs have the same "my_version"
+    assertEquals(
+        numDocs,
+        countByMatch(
+            connectionConfiguration, restClient, "my_version", "1", null, 
KV.of(1, numDocs)));
+
+    Write write =
+        ElasticsearchIO.write()
+            .withConnectionConfiguration(connectionConfiguration)
+            .withIdFn(new ExtractValueFn("id"))
+            .withDocVersionFn(new ExtractValueFn("my_version"))
+            .withDocVersionType("external");
+
+    data = new ArrayList<>();
+    for (ObjectNode doc : jsonData) {
+      // Set version to larger number than originally set, and larger than 
next logical version
+      // number set by default by ES.
+      doc.put("my_version", "3");
+      data.add(doc.toString());
+    }
+
+    // Test that documents with lower version are rejected, but rejections 
ignored when specified
+    pipeline.apply(Create.of(data)).apply(write);
+    pipeline.run();
+
+    currentNumDocs = refreshIndexAndGetCurrentNumDocs(connectionConfiguration, 
restClient);
+    assertEquals(numDocs, currentNumDocs);
+
+    // my_version and doc version should have changed
+    assertEquals(
+        0,
+        countByMatch(
+            connectionConfiguration, restClient, "my_version", "1", null, 
KV.of(1, numDocs)));
+    assertEquals(
+        numDocs,
+        countByMatch(
+            connectionConfiguration, restClient, "my_version", "3", null, 
KV.of(3, numDocs)));
+  }
+
+  /**
+   * Tests upsert script by adding a group field to each document in the 
standard test set. The
+   * group field is populated as the modulo 2 of the document id allowing for 
a test to ensure the
+   * documents are split into 2 groups.
+   */
+  void testWriteScriptedUpsert() throws Exception {
+    List<String> data =
+        ElasticsearchIOTestUtils.createDocuments(
+            numDocs, 
ElasticsearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
+
+    Write write =
+        ElasticsearchIO.write()
+            .withConnectionConfiguration(connectionConfiguration)
+            .withIdFn(new ExtractValueFn("id"))
+            .withUpsertScript(SCRIPT_SOURCE);
+
+    // Test that documents can be inserted/created by using withUpsertScript
+    pipeline.apply(Create.of(data)).apply(write);
+    pipeline.run();
+
+    // defensive coding to ensure our initial state is as expected
+    long currentNumDocs = 
refreshIndexAndGetCurrentNumDocs(connectionConfiguration, restClient);
+    // check we have not unwittingly modified existing behaviour
+    assertEquals(numDocs, currentNumDocs);
+    assertEquals(
+        numDocs / NUM_SCIENTISTS,
+        countByScientistName(connectionConfiguration, restClient, "Einstein", 
null));
+
+    // All docs should have have group = 0 added by the script upon creation
+    assertEquals(
+        numDocs, countByMatch(connectionConfiguration, restClient, "group", 
"0", null, null));
+
+    // Run the same data again. This time, because all docs exist in the index 
already, scripted
+    // updates should happen rather than scripted inserts.
+    pipeline.apply(Create.of(data)).apply(write);
+    pipeline.run();
+
+    currentNumDocs = refreshIndexAndGetCurrentNumDocs(connectionConfiguration, 
restClient);
+
+    // check we have not unwittingly modified existing behaviour
+    assertEquals(numDocs, currentNumDocs);
+    assertEquals(
+        numDocs / NUM_SCIENTISTS,
+        countByScientistName(connectionConfiguration, restClient, "Einstein", 
null));
+
+    // The script will set either 0 or 1 for the group value on update 
operations
+    assertEquals(
+        numDocs / 2, countByMatch(connectionConfiguration, restClient, 
"group", "0", null, null));
+    assertEquals(
+        numDocs / 2, countByMatch(connectionConfiguration, restClient, 
"group", "1", null, null));
+  }
+
+  void testMaxParallelRequestsPerWindow() throws Exception {
+    List<String> data =
+        ElasticsearchIOTestUtils.createDocuments(
+            numDocs, 
ElasticsearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
+
+    Write write =
+        ElasticsearchIO.write()
+            .withConnectionConfiguration(connectionConfiguration)
+            .withMaxParallelRequestsPerWindow(1);
+
+    PCollection<KV<Integer, Long>> batches =
+        pipeline
+            .apply(Create.of(data))
+            .apply(Window.into(new GlobalWindows()))
+            .apply(StatefulBatching.fromSpec(write.getBulkIO()))
+            .apply(Count.perKey());
+
+    // Number of unique keys produced should be number of 
maxParallelRequestsPerWindow * numWindows
+    // There is only 1 request (key) per window, and 1 (global) window ie. one 
key total
+    
PAssert.that(batches).containsInAnyOrder(Collections.singletonList(KV.of(0, 
1L)));
+
+    pipeline.run();
+  }
+
+  void testMaxBufferingDurationAndMaxParallelRequestsPerWindow() throws 
Exception {
+    List<String> data =
+        ElasticsearchIOTestUtils.createDocuments(
+            numDocs, 
ElasticsearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
+
+    Write write =
+        ElasticsearchIO.write()
+            .withConnectionConfiguration(connectionConfiguration)
+            .withMaxBufferingDuration(Duration.standardSeconds(1))
+            .withMaxParallelRequestsPerWindow(1);
+
+    PCollection<KV<Integer, Long>> batches =
+        pipeline
+            .apply(Create.of(data))
+            .apply(Window.into(new GlobalWindows()))

Review comment:
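       Ditto: `Window.into(new GlobalWindows())` is not needed here either, since the global window is the default.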

##########
File path: 
sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
@@ -1246,88 +1877,208 @@ public Write withUsePartialUpdate(boolean 
usePartialUpdate) {
      * }</pre>
      *
      * @param retryConfiguration the rules which govern the retry behavior
-     * @return the {@link Write} with retrying configured
+     * @return the {@link BulkIO} with retrying configured
      */
-    public Write withRetryConfiguration(RetryConfiguration retryConfiguration) 
{
+    public BulkIO withRetryConfiguration(RetryConfiguration 
retryConfiguration) {
       checkArgument(retryConfiguration != null, "retryConfiguration is 
required");
       return builder().setRetryConfiguration(retryConfiguration).build();
     }
 
     /**
-     * Provide a function to extract the target operation either upsert or 
delete from the document
-     * fields allowing dynamic bulk operation decision. While using 
withIsDeleteFn, it should be
-     * taken care that the document's id extraction is defined using the 
withIdFn function or else
-     * IllegalArgumentException is thrown. Should the function throw an 
Exception then the batch
-     * will fail and the exception propagated.
+     * Whether or not to suppress version conflict errors in a Bulk API 
response. This can be useful
+     * if your use case involves using external version types.
      *
-     * @param isDeleteFn set to true for deleting the specific document
-     * @return the {@link Write} with the function set
+     * @param ignoreVersionConflicts true to suppress version conflicts, false 
to surface version
+     *     conflict errors.
+     * @return the {@link BulkIO} with version conflict handling configured
      */
-    public Write withIsDeleteFn(BooleanFieldValueExtractFn isDeleteFn) {
-      checkArgument(isDeleteFn != null, "deleteFn is required");
-      return builder().setIsDeleteFn(isDeleteFn).build();
+    public BulkIO withIgnoreVersionConflicts(boolean ignoreVersionConflicts) {
+      Set<String> allowedResponseErrors = getAllowedResponseErrors();
+      if (allowedResponseErrors == null) {
+        allowedResponseErrors = new HashSet<>();
+      }
+      if (ignoreVersionConflicts) {
+        allowedResponseErrors.add(VERSION_CONFLICT_ERROR);
+      }
+
+      return builder().setAllowedResponseErrors(allowedResponseErrors).build();
+    }
+
+    /**
+     * Provide a set of textual error types which can be contained in Bulk API 
response
+     * items[].error.type field. Any element in @param 
allowableResponseErrorTypes will suppress
+     * errors of the same type in Bulk responses.
+     *
+     * <p>See also
+     * 
https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#bulk-failures-ex
+     *
+     * @param allowableResponseErrorTypes
+     * @return the {@link BulkIO} with allowable response errors set
+     */
+    public BulkIO withAllowableResponseErrors(@Nullable Set<String> 
allowableResponseErrorTypes) {
+      if (allowableResponseErrorTypes == null) {
+        allowableResponseErrorTypes = new HashSet<>();
+      }
+
+      return 
builder().setAllowedResponseErrors(allowableResponseErrorTypes).build();
+    }
+
+    /**
+     * If using {@link BulkIO#withUseStatefulBatches}, this can be used to set 
a maximum elapsed
+     * time before buffered elements are emitted to Elasticsearch as a Bulk 
API request. If this
+     * config is not set, Bulk requests will not be issued until {@link 
BulkIO#getMaxBatchSize}
+     * number of documents have been buffered. This may result in higher 
latency in particular if
+     * your max batch size is set to a large value and your pipeline input is 
low volume.
+     *
+     * @param maxBufferingDuration the maximum duration to wait before sending 
any buffered
+     *     documents to Elasticsearch, regardless of maxBatchSize.
+     * @return the {@link BulkIO} with maximum buffering duration set
+     */
+    public BulkIO withMaxBufferingDuration(Duration maxBufferingDuration) {
+      LOG.warn(
+          "Use of withMaxBufferingDuration requires 
withUseStatefulBatches(true). "
+              + "Setting that automatically.");
+      return builder()
+          .setUseStatefulBatches(true)
+          .setMaxBufferingDuration(maxBufferingDuration)
+          .build();
+    }
+
+    /**
+     * Whether or not to use Stateful Processing to ensure bulk requests have 
the desired number of
+     * entities i.e. as close to the maxBatchSize as possible. By default 
without this feature
+     * enabled, Bulk requests will not contain more than maxBatchSize 
entities, but the lower bound
+     * of batch size is determined by Beam Runner bundle sizes, which may be 
as few as 1.
+     *
+     * @param useStatefulBatches true enables the use of Stateful Processing 
to ensure that batches
+     *     are as close to the maxBatchSize as possible.
+     * @return the {@link BulkIO} with Stateful Processing enabled or disabled
+     */
+    public BulkIO withUseStatefulBatches(boolean useStatefulBatches) {
+      return builder().setUseStatefulBatches(useStatefulBatches).build();
+    }
+
+    /**
+     * When using {@link BulkIO#withUseStatefulBatches} Stateful Processing, 
states and therefore
+     * batches are maintained per-key-per-window. BE AWARE that low values for 
@param
+     * maxParallelRequestsPerWindow, in particular if the input data has a 
finite number of windows,
+     * can reduce parallelism greatly. If data is globally windowed and @param
+     * maxParallelRequestsPerWindow is set to 1,there will only ever be 1 
request in flight. Having
+     * only a single request in flight can be beneficial for ensuring an 
Elasticsearch cluster is
+     * not overwhelmed by parallel requests,but may not work for all use 
cases. If this number is
+     * less than the number of maximum workers in your pipeline, the IO work 
will result in a
+     * sub-distribution of the last write step with most of the runners.
+     *
+     * @param maxParallelRequestsPerWindow the maximum number of parallel bulk 
requests for a window
+     *     of data
+     * @return the {@link BulkIO} with maximum parallel bulk requests per 
window set
+     */
+    public BulkIO withMaxParallelRequestsPerWindow(int 
maxParallelRequestsPerWindow) {
+      checkArgument(
+          maxParallelRequestsPerWindow > 0, "parameter value must be positive 
" + "a integer");
+      return 
builder().setMaxParallelRequestsPerWindow(maxParallelRequestsPerWindow).build();
+    }
+
+    /**
+     * Creates batches of documents using Stateful Processing based on user 
configurable settings of
+     * withMaxBufferingDuration and withMaxParallelRequestsPerWindow.
+     *
+     * <p>Mostly exists for testability of withMaxParallelRequestsPerWindow.
+     */
+    @VisibleForTesting
+    static class StatefulBatching
+        extends PTransform<PCollection<String>, PCollection<KV<Integer, 
Iterable<String>>>> {
+      final BulkIO spec;
+
+      private StatefulBatching(BulkIO bulkSpec) {
+        spec = bulkSpec;
+      }
+
+      public static StatefulBatching fromSpec(BulkIO spec) {
+        return new StatefulBatching(spec);
+      }
+
+      @Override
+      public PCollection<KV<Integer, Iterable<String>>> 
expand(PCollection<String> input) {
+        GroupIntoBatches<Integer, String> groupIntoBatches =
+            GroupIntoBatches.ofSize(spec.getMaxBatchSize());
+
+        if (spec.getMaxBufferingDuration() != null) {
+          groupIntoBatches =
+              
groupIntoBatches.withMaxBufferingDuration(spec.getMaxBufferingDuration());
+        }
+
+        return input
+            .apply(ParDo.of(new 
Reshuffle.AssignShardFn<>(spec.getMaxParallelRequestsPerWindow())))
+            .apply(groupIntoBatches);
+      }
     }
 
     @Override
     public PDone expand(PCollection<String> input) {
       ConnectionConfiguration connectionConfiguration = 
getConnectionConfiguration();
-      FieldValueExtractFn idFn = getIdFn();
-      BooleanFieldValueExtractFn isDeleteFn = getIsDeleteFn();
+
       checkState(connectionConfiguration != null, 
"withConnectionConfiguration() is required");
-      checkArgument(
-          isDeleteFn == null || idFn != null,
-          "Id needs to be specified by withIdFn for delete operation");
-      input.apply(ParDo.of(new WriteFn(this)));
+
+      if (getUseStatefulBatches()) {
+        input.apply(StatefulBatching.fromSpec(this)).apply(ParDo.of(new 
BulkIOStatefulFn(this)));
+      } else {
+        input.apply(ParDo.of(new BulkIOBundleFn(this)));
+      }
       return PDone.in(input.getPipeline());
     }
 
-    /** {@link DoFn} to for the {@link Write} transform. */
-    @VisibleForTesting
-    static class WriteFn extends DoFn<String, Void> {
-      private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-      private static final int DEFAULT_RETRY_ON_CONFLICT = 5; // race 
conditions on updates
+    static class BulkIOBundleFn extends BulkIOBaseFn<String> {
+      @VisibleForTesting
+      BulkIOBundleFn(BulkIO bulkSpec) {
+        super(bulkSpec);
+      }
 
-      private static final Duration RETRY_INITIAL_BACKOFF = 
Duration.standardSeconds(5);
+      @ProcessElement
+      public void processElement(ProcessContext context) throws Exception {
+        String bulkApiEntity = context.element();
+        addAndMaybeFlush(bulkApiEntity);
+      }
+    }
 
+    /*
+    Intended for use in conjunction with {@link GroupIntoBatches}
+     */
+    static class BulkIOStatefulFn extends BulkIOBaseFn<KV<Integer, 
Iterable<String>>> {
       @VisibleForTesting
-      static final String RETRY_ATTEMPT_LOG = "Error writing to Elasticsearch. 
Retry attempt[%d]";
+      BulkIOStatefulFn(BulkIO bulkSpec) {
+        super(bulkSpec);
+      }
 
-      @VisibleForTesting
-      static final String RETRY_FAILED_LOG =
-          "Error writing to ES after %d attempt(s). No more attempts allowed";
+      @ProcessElement
+      public void processElement(ProcessContext context) throws Exception {
+        Iterable<String> bulkApiEntities = context.element().getValue();
+        for (String bulkApiEntity : bulkApiEntities) {
+          addAndMaybeFlush(bulkApiEntity);
+        }
+      }
+    }
+
+    /** {@link DoFn} to for the {@link BulkIO} transform. */
+    @VisibleForTesting
+    private abstract static class BulkIOBaseFn<T> extends DoFn<T, Void> {
+      private static final Duration RETRY_INITIAL_BACKOFF = 
Duration.standardSeconds(5);
 
       private transient FluentBackoff retryBackoff;
 
-      private int backendVersion;
-      private final Write spec;
+      private BulkIO spec;
       private transient RestClient restClient;
-      private ArrayList<String> batch;
-      private long currentBatchSizeBytes;
-
-      // Encapsulates the elements which form the metadata for an 
Elasticsearch bulk operation
-      private static class DocumentMetadata implements Serializable {
-        final String index;
-        final String type;
-        final String id;
-        final Integer retryOnConflict;
-
-        DocumentMetadata(String index, String type, String id, Integer 
retryOnConflict) {
-          this.index = index;
-          this.type = type;
-          this.id = id;
-          this.retryOnConflict = retryOnConflict;
-        }
-      }
+      protected ArrayList<String> batch;

Review comment:
       The `batch` field can be private.

##########
File path: 
sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
##########
@@ -568,11 +664,167 @@ void testWritePartialUpdate() throws Exception {
     assertEquals(numDocs, currentNumDocs);
     assertEquals(
         numDocs / NUM_SCIENTISTS,
-        countByScientistName(connectionConfiguration, restClient, "Einstein"));
+        countByScientistName(connectionConfiguration, restClient, "Einstein", 
null));
 
     // Partial update assertions
-    assertEquals(numDocs / 2, countByMatch(connectionConfiguration, 
restClient, "group", "0"));
-    assertEquals(numDocs / 2, countByMatch(connectionConfiguration, 
restClient, "group", "1"));
+    assertEquals(
+        numDocs / 2, countByMatch(connectionConfiguration, restClient, 
"group", "0", null, null));
+    assertEquals(
+        numDocs / 2, countByMatch(connectionConfiguration, restClient, 
"group", "1", null, null));
+  }
+
+  void testWriteWithDocVersion() throws Exception {
+    List<ObjectNode> jsonData =
+        ElasticsearchIOTestUtils.createJsonDocuments(
+            numDocs, 
ElasticsearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
+
+    List<String> data = new ArrayList<>();
+    for (ObjectNode doc : jsonData) {
+      doc.put("my_version", "1");
+      data.add(doc.toString());
+    }
+
+    insertTestDocuments(connectionConfiguration, data, restClient);
+    long currentNumDocs = 
refreshIndexAndGetCurrentNumDocs(connectionConfiguration, restClient);
+    assertEquals(numDocs, currentNumDocs);
+    // Check that all docs have the same "my_version"
+    assertEquals(
+        numDocs,
+        countByMatch(
+            connectionConfiguration, restClient, "my_version", "1", null, 
KV.of(1, numDocs)));
+
+    Write write =
+        ElasticsearchIO.write()
+            .withConnectionConfiguration(connectionConfiguration)
+            .withIdFn(new ExtractValueFn("id"))
+            .withDocVersionFn(new ExtractValueFn("my_version"))
+            .withDocVersionType("external");
+
+    data = new ArrayList<>();
+    for (ObjectNode doc : jsonData) {
+      // Set version to larger number than originally set, and larger than 
next logical version
+      // number set by default by ES.
+      doc.put("my_version", "3");
+      data.add(doc.toString());
+    }
+
+    // Test that documents with lower version are rejected, but rejections 
ignored when specified
+    pipeline.apply(Create.of(data)).apply(write);
+    pipeline.run();
+
+    currentNumDocs = refreshIndexAndGetCurrentNumDocs(connectionConfiguration, 
restClient);
+    assertEquals(numDocs, currentNumDocs);
+
+    // my_version and doc version should have changed
+    assertEquals(
+        0,
+        countByMatch(
+            connectionConfiguration, restClient, "my_version", "1", null, 
KV.of(1, numDocs)));
+    assertEquals(
+        numDocs,
+        countByMatch(
+            connectionConfiguration, restClient, "my_version", "3", null, 
KV.of(3, numDocs)));
+  }
+
+  /**
+   * Tests upsert script by adding a group field to each document in the 
standard test set. The
+   * group field is populated as the modulo 2 of the document id allowing for 
a test to ensure the
+   * documents are split into 2 groups.
+   */
+  void testWriteScriptedUpsert() throws Exception {
+    List<String> data =
+        ElasticsearchIOTestUtils.createDocuments(
+            numDocs, 
ElasticsearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
+
+    Write write =
+        ElasticsearchIO.write()
+            .withConnectionConfiguration(connectionConfiguration)
+            .withIdFn(new ExtractValueFn("id"))
+            .withUpsertScript(SCRIPT_SOURCE);
+
+    // Test that documents can be inserted/created by using withUpsertScript
+    pipeline.apply(Create.of(data)).apply(write);
+    pipeline.run();
+
+    // defensive coding to ensure our initial state is as expected
+    long currentNumDocs = 
refreshIndexAndGetCurrentNumDocs(connectionConfiguration, restClient);
+    // check we have not unwittingly modified existing behaviour
+    assertEquals(numDocs, currentNumDocs);
+    assertEquals(
+        numDocs / NUM_SCIENTISTS,
+        countByScientistName(connectionConfiguration, restClient, "Einstein", 
null));
+
+    // All docs should have have group = 0 added by the script upon creation
+    assertEquals(
+        numDocs, countByMatch(connectionConfiguration, restClient, "group", 
"0", null, null));
+
+    // Run the same data again. This time, because all docs exist in the index 
already, scripted
+    // updates should happen rather than scripted inserts.
+    pipeline.apply(Create.of(data)).apply(write);
+    pipeline.run();
+
+    currentNumDocs = refreshIndexAndGetCurrentNumDocs(connectionConfiguration, 
restClient);
+
+    // check we have not unwittingly modified existing behaviour
+    assertEquals(numDocs, currentNumDocs);
+    assertEquals(
+        numDocs / NUM_SCIENTISTS,
+        countByScientistName(connectionConfiguration, restClient, "Einstein", 
null));
+
+    // The script will set either 0 or 1 for the group value on update 
operations
+    assertEquals(
+        numDocs / 2, countByMatch(connectionConfiguration, restClient, 
"group", "0", null, null));
+    assertEquals(
+        numDocs / 2, countByMatch(connectionConfiguration, restClient, 
"group", "1", null, null));
+  }
+
+  void testMaxParallelRequestsPerWindow() throws Exception {
+    List<String> data =
+        ElasticsearchIOTestUtils.createDocuments(
+            numDocs, 
ElasticsearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
+
+    Write write =
+        ElasticsearchIO.write()
+            .withConnectionConfiguration(connectionConfiguration)
+            .withMaxParallelRequestsPerWindow(1);
+
+    PCollection<KV<Integer, Long>> batches =
+        pipeline
+            .apply(Create.of(data))
+            .apply(Window.into(new GlobalWindows()))
+            .apply(StatefulBatching.fromSpec(write.getBulkIO()))
+            .apply(Count.perKey());
+
+    // Number of unique keys produced should be number of 
maxParallelRequestsPerWindow * numWindows
+    // There is only 1 request (key) per window, and 1 (global) window ie. one 
key total
+    
PAssert.that(batches).containsInAnyOrder(Collections.singletonList(KV.of(0, 
1L)));

Review comment:
       This does not test what you think it tests: 
`.apply(StatefulBatching.fromSpec(write.getBulkIO()))` outputs a 
`PCollection<KV<K, Iterable<V>>>` with a single key (since the number of shard 
keys was set to 1). So the collection contains only one key (0) with an 
iterable associated with it, and the count says that there is only a single 
element (the iterable) for key 0. You are not testing the content of the 
iterable, only the number of iterables associated with key 0. So please also 
add a test of the content of the iterable. For that you need to use 
`PAssert.that("error message", collection).satisfies(function)` and implement 
the function with asserts.

##########
File path: 
sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
##########
@@ -568,11 +664,167 @@ void testWritePartialUpdate() throws Exception {
     assertEquals(numDocs, currentNumDocs);
     assertEquals(
         numDocs / NUM_SCIENTISTS,
-        countByScientistName(connectionConfiguration, restClient, "Einstein"));
+        countByScientistName(connectionConfiguration, restClient, "Einstein", 
null));
 
     // Partial update assertions
-    assertEquals(numDocs / 2, countByMatch(connectionConfiguration, 
restClient, "group", "0"));
-    assertEquals(numDocs / 2, countByMatch(connectionConfiguration, 
restClient, "group", "1"));
+    assertEquals(
+        numDocs / 2, countByMatch(connectionConfiguration, restClient, 
"group", "0", null, null));
+    assertEquals(
+        numDocs / 2, countByMatch(connectionConfiguration, restClient, 
"group", "1", null, null));
+  }
+
+  void testWriteWithDocVersion() throws Exception {
+    List<ObjectNode> jsonData =
+        ElasticsearchIOTestUtils.createJsonDocuments(
+            numDocs, 
ElasticsearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
+
+    List<String> data = new ArrayList<>();
+    for (ObjectNode doc : jsonData) {
+      doc.put("my_version", "1");
+      data.add(doc.toString());
+    }
+
+    insertTestDocuments(connectionConfiguration, data, restClient);
+    long currentNumDocs = 
refreshIndexAndGetCurrentNumDocs(connectionConfiguration, restClient);
+    assertEquals(numDocs, currentNumDocs);
+    // Check that all docs have the same "my_version"
+    assertEquals(
+        numDocs,
+        countByMatch(
+            connectionConfiguration, restClient, "my_version", "1", null, 
KV.of(1, numDocs)));
+
+    Write write =
+        ElasticsearchIO.write()
+            .withConnectionConfiguration(connectionConfiguration)
+            .withIdFn(new ExtractValueFn("id"))
+            .withDocVersionFn(new ExtractValueFn("my_version"))
+            .withDocVersionType("external");
+
+    data = new ArrayList<>();
+    for (ObjectNode doc : jsonData) {
+      // Set version to larger number than originally set, and larger than 
next logical version
+      // number set by default by ES.
+      doc.put("my_version", "3");
+      data.add(doc.toString());
+    }
+
+    // Test that documents with lower version are rejected, but rejections 
ignored when specified
+    pipeline.apply(Create.of(data)).apply(write);
+    pipeline.run();
+
+    currentNumDocs = refreshIndexAndGetCurrentNumDocs(connectionConfiguration, 
restClient);
+    assertEquals(numDocs, currentNumDocs);
+
+    // my_version and doc version should have changed
+    assertEquals(
+        0,
+        countByMatch(
+            connectionConfiguration, restClient, "my_version", "1", null, 
KV.of(1, numDocs)));
+    assertEquals(
+        numDocs,
+        countByMatch(
+            connectionConfiguration, restClient, "my_version", "3", null, 
KV.of(3, numDocs)));
+  }
+
+  /**
+   * Tests upsert script by adding a group field to each document in the 
standard test set. The
+   * group field is populated as the modulo 2 of the document id allowing for 
a test to ensure the
+   * documents are split into 2 groups.
+   */
+  void testWriteScriptedUpsert() throws Exception {
+    List<String> data =
+        ElasticsearchIOTestUtils.createDocuments(
+            numDocs, 
ElasticsearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
+
+    Write write =
+        ElasticsearchIO.write()
+            .withConnectionConfiguration(connectionConfiguration)
+            .withIdFn(new ExtractValueFn("id"))
+            .withUpsertScript(SCRIPT_SOURCE);
+
+    // Test that documents can be inserted/created by using withUpsertScript
+    pipeline.apply(Create.of(data)).apply(write);
+    pipeline.run();
+
+    // defensive coding to ensure our initial state is as expected
+    long currentNumDocs = 
refreshIndexAndGetCurrentNumDocs(connectionConfiguration, restClient);
+    // check we have not unwittingly modified existing behaviour
+    assertEquals(numDocs, currentNumDocs);
+    assertEquals(
+        numDocs / NUM_SCIENTISTS,
+        countByScientistName(connectionConfiguration, restClient, "Einstein", 
null));
+
+    // All docs should have have group = 0 added by the script upon creation
+    assertEquals(
+        numDocs, countByMatch(connectionConfiguration, restClient, "group", 
"0", null, null));
+
+    // Run the same data again. This time, because all docs exist in the index 
already, scripted
+    // updates should happen rather than scripted inserts.
+    pipeline.apply(Create.of(data)).apply(write);
+    pipeline.run();
+
+    currentNumDocs = refreshIndexAndGetCurrentNumDocs(connectionConfiguration, 
restClient);
+
+    // check we have not unwittingly modified existing behaviour
+    assertEquals(numDocs, currentNumDocs);
+    assertEquals(
+        numDocs / NUM_SCIENTISTS,
+        countByScientistName(connectionConfiguration, restClient, "Einstein", 
null));
+
+    // The script will set either 0 or 1 for the group value on update 
operations
+    assertEquals(
+        numDocs / 2, countByMatch(connectionConfiguration, restClient, 
"group", "0", null, null));
+    assertEquals(
+        numDocs / 2, countByMatch(connectionConfiguration, restClient, 
"group", "1", null, null));
+  }
+
+  void testMaxParallelRequestsPerWindow() throws Exception {
+    List<String> data =
+        ElasticsearchIOTestUtils.createDocuments(
+            numDocs, 
ElasticsearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
+
+    Write write =
+        ElasticsearchIO.write()
+            .withConnectionConfiguration(connectionConfiguration)
+            .withMaxParallelRequestsPerWindow(1);
+
+    PCollection<KV<Integer, Long>> batches =
+        pipeline
+            .apply(Create.of(data))
+            .apply(Window.into(new GlobalWindows()))
+            .apply(StatefulBatching.fromSpec(write.getBulkIO()))
+            .apply(Count.perKey());
+
+    // Number of unique keys produced should be number of 
maxParallelRequestsPerWindow * numWindows
+    // There is only 1 request (key) per window, and 1 (global) window ie. one 
key total
+    
PAssert.that(batches).containsInAnyOrder(Collections.singletonList(KV.of(0, 
1L)));
+
+    pipeline.run();
+  }
+
+  void testMaxBufferingDurationAndMaxParallelRequestsPerWindow() throws 
Exception {

Review comment:
       Unless I missed something, I think this test is not needed: compared to 
the other parallel test, all that it tests is the fact that maxDuration works 
(which was already tested when the GroupIntoBatches PR was merged).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to