egalpin commented on a change in pull request #14347:
URL: https://github.com/apache/beam/pull/14347#discussion_r629659689



##########
File path: 
sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
##########
@@ -568,11 +666,167 @@ void testWritePartialUpdate() throws Exception {
     assertEquals(numDocs, currentNumDocs);
     assertEquals(
         numDocs / NUM_SCIENTISTS,
-        countByScientistName(connectionConfiguration, restClient, "Einstein"));
+        countByScientistName(connectionConfiguration, restClient, "Einstein", 
null));
 
     // Partial update assertions
-    assertEquals(numDocs / 2, countByMatch(connectionConfiguration, 
restClient, "group", "0"));
-    assertEquals(numDocs / 2, countByMatch(connectionConfiguration, 
restClient, "group", "1"));
+    assertEquals(
+        numDocs / 2, countByMatch(connectionConfiguration, restClient, 
"group", "0", null, null));
+    assertEquals(
+        numDocs / 2, countByMatch(connectionConfiguration, restClient, 
"group", "1", null, null));
+  }
+
+  void testWriteWithDocVersion() throws Exception {
+    List<ObjectNode> jsonData =
+        ElasticsearchIOTestUtils.createJsonDocuments(
+            numDocs, 
ElasticsearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
+
+    List<String> data = new ArrayList<>();
+    for (ObjectNode doc : jsonData) {
+      doc.put("my_version", "1");
+      data.add(doc.toString());
+    }
+
+    insertTestDocuments(connectionConfiguration, data, restClient);
+    long currentNumDocs = 
refreshIndexAndGetCurrentNumDocs(connectionConfiguration, restClient);
+    assertEquals(numDocs, currentNumDocs);
+    // Check that all docs have the same "my_version"
+    assertEquals(
+        numDocs,
+        countByMatch(
+            connectionConfiguration, restClient, "my_version", "1", null, 
KV.of(1, numDocs)));
+
+    Write write =
+        ElasticsearchIO.write()
+            .withConnectionConfiguration(connectionConfiguration)
+            .withIdFn(new ExtractValueFn("id"))
+            .withDocVersionFn(new ExtractValueFn("my_version"))
+            .withDocVersionType("external");
+
+    data = new ArrayList<>();
+    for (ObjectNode doc : jsonData) {
+      // Set version to larger number than originally set, and larger than 
next logical version
+      // number set by default by ES.
+      doc.put("my_version", "3");
+      data.add(doc.toString());
+    }
+
+    // Test that documents with lower version are rejected, but rejections 
ignored when specified
+    pipeline.apply(Create.of(data)).apply(write);
+    pipeline.run();
+
+    currentNumDocs = refreshIndexAndGetCurrentNumDocs(connectionConfiguration, 
restClient);
+    assertEquals(numDocs, currentNumDocs);
+
+    // my_version and doc version should have changed
+    assertEquals(

Review comment:
       The version numbers are checked via `countByMatch`[1] 👍 
   
   [1] 
https://github.com/apache/beam/pull/14347/files#diff-0971ee817a63d0687f2f2977c2c247d5d56c654efa036718cb616f0d49a30f20R379-R388




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to