egalpin commented on a change in pull request #14347:
URL: https://github.com/apache/beam/pull/14347#discussion_r638772885



##########
File path: 
sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
##########
@@ -568,11 +670,202 @@ void testWritePartialUpdate() throws Exception {
     assertEquals(numDocs, currentNumDocs);
     assertEquals(
         numDocs / NUM_SCIENTISTS,
-        countByScientistName(connectionConfiguration, restClient, "Einstein"));
+        countByScientistName(connectionConfiguration, restClient, "Einstein", 
null));
 
     // Partial update assertions
-    assertEquals(numDocs / 2, countByMatch(connectionConfiguration, 
restClient, "group", "0"));
-    assertEquals(numDocs / 2, countByMatch(connectionConfiguration, 
restClient, "group", "1"));
+    assertEquals(
+        numDocs / 2, countByMatch(connectionConfiguration, restClient, 
"group", "0", null, null));
+    assertEquals(
+        numDocs / 2, countByMatch(connectionConfiguration, restClient, 
"group", "1", null, null));
+  }
+
+  void testWriteWithDocVersion() throws Exception {
+    List<ObjectNode> jsonData =
+        ElasticsearchIOTestUtils.createJsonDocuments(
+            numDocs, 
ElasticsearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
+
+    List<String> data = new ArrayList<>();
+    for (ObjectNode doc : jsonData) {
+      doc.put("my_version", "1");
+      data.add(doc.toString());
+    }
+
+    insertTestDocuments(connectionConfiguration, data, restClient);
+    long currentNumDocs = 
refreshIndexAndGetCurrentNumDocs(connectionConfiguration, restClient);
+    assertEquals(numDocs, currentNumDocs);
+    // Check that all docs have the same "my_version"
+    assertEquals(
+        numDocs,
+        countByMatch(
+            connectionConfiguration, restClient, "my_version", "1", null, 
KV.of(1, numDocs)));
+
+    Write write =
+        ElasticsearchIO.write()
+            .withConnectionConfiguration(connectionConfiguration)
+            .withIdFn(new ExtractValueFn("id"))
+            .withDocVersionFn(new ExtractValueFn("my_version"))
+            .withDocVersionType("external");
+
+    data = new ArrayList<>();
+    for (ObjectNode doc : jsonData) {
+      // Set version to larger number than originally set, and larger than 
next logical version
+      // number set by default by ES.
+      doc.put("my_version", "3");
+      data.add(doc.toString());
+    }
+
+    // Test that documents with lower version are rejected, but rejections 
ignored when specified
+    pipeline.apply(Create.of(data)).apply(write);
+    pipeline.run();
+
+    currentNumDocs = refreshIndexAndGetCurrentNumDocs(connectionConfiguration, 
restClient);
+    assertEquals(numDocs, currentNumDocs);
+
+    // my_version and doc version should have changed
+    assertEquals(
+        0,
+        countByMatch(
+            connectionConfiguration, restClient, "my_version", "1", null, 
KV.of(1, numDocs)));
+    assertEquals(
+        numDocs,
+        countByMatch(
+            connectionConfiguration, restClient, "my_version", "3", null, 
KV.of(3, numDocs)));
+  }
+
+  /**
+   * Tests upsert script by adding a group field to each document in the 
standard test set. The
+   * group field is populated as the modulo 2 of the document id allowing for 
a test to ensure the
+   * documents are split into 2 groups.
+   */
+  void testWriteScriptedUpsert() throws Exception {
+    List<String> data =
+        ElasticsearchIOTestUtils.createDocuments(
+            numDocs, 
ElasticsearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
+
+    Write write =
+        ElasticsearchIO.write()
+            .withConnectionConfiguration(connectionConfiguration)
+            .withIdFn(new ExtractValueFn("id"))
+            .withUpsertScript(SCRIPT_SOURCE);
+
+    // Test that documents can be inserted/created by using withUpsertScript
+    pipeline.apply(Create.of(data)).apply(write);
+    pipeline.run();
+
+    // defensive coding to ensure our initial state is as expected
+    long currentNumDocs = 
refreshIndexAndGetCurrentNumDocs(connectionConfiguration, restClient);
+    // check we have not unwittingly modified existing behaviour
+    assertEquals(numDocs, currentNumDocs);
+    assertEquals(
+        numDocs / NUM_SCIENTISTS,
+        countByScientistName(connectionConfiguration, restClient, "Einstein", 
null));
+
+    // All docs should have group = 0 added by the script upon creation
+    assertEquals(
+        numDocs, countByMatch(connectionConfiguration, restClient, "group", 
"0", null, null));
+
+    // Run the same data again. This time, because all docs exist in the index 
already, scripted
+    // updates should happen rather than scripted inserts.
+    pipeline.apply(Create.of(data)).apply(write);
+    pipeline.run();
+
+    currentNumDocs = refreshIndexAndGetCurrentNumDocs(connectionConfiguration, 
restClient);
+
+    // check we have not unwittingly modified existing behaviour
+    assertEquals(numDocs, currentNumDocs);
+    assertEquals(
+        numDocs / NUM_SCIENTISTS,
+        countByScientistName(connectionConfiguration, restClient, "Einstein", 
null));
+
+    // The script will set either 0 or 1 for the group value on update 
operations
+    assertEquals(
+        numDocs / 2, countByMatch(connectionConfiguration, restClient, 
"group", "0", null, null));
+    assertEquals(
+        numDocs / 2, countByMatch(connectionConfiguration, restClient, 
"group", "1", null, null));
+  }
+
+  void testMaxParallelRequestsPerWindow() throws Exception {
+    List<String> data =
+        ElasticsearchIOTestUtils.createDocuments(
+            numDocs, 
ElasticsearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
+
+    Write write =
+        ElasticsearchIO.write()
+            .withConnectionConfiguration(connectionConfiguration)
+            .withMaxParallelRequestsPerWindow(1);
+
+    PCollection<KV<Integer, Iterable<String>>> batches =
+        
pipeline.apply(Create.of(data)).apply(StatefulBatching.fromSpec(write.getBulkIO()));
+
+    PCollection<Integer> keyValues =
+        batches
+            .apply(GroupByKey.create())

Review comment:
       GBK removed! :-) 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to