echauchot commented on a change in pull request #14347:
URL: https://github.com/apache/beam/pull/14347#discussion_r629433650



##########
File path: sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
##########
@@ -257,6 +259,14 @@ void testWrite() throws Exception {
     executeWriteTest(write);
   }
 
+  void testWriteStateful() throws Exception {

Review comment:
       modulo tiny fixes in code comments:
   upsert test: LGTM
   routing test: LGTM
   doc version test: LGTM
   

##########
File path: sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
##########
@@ -532,6 +602,34 @@ void testWriteWithFullAddressing() throws Exception {
     }
   }
 
+  /**
+   * Tests that documents are correctly routed when index, type and document ID functions are
+   * provided to overwrite the defaults of using the configuration and auto-generation of the
+   * document IDs by Elasticsearch. The scientist name is used for the index, type and document ID.
+   * As a result there should be only a single document in each index/type.
+   */

Review comment:
       There should be numDocs / NUM_SCIENTISTS documents in each (per-scientist) index, no?
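
       A minimal sketch of the arithmetic behind that expectation. It assumes the test documents are assigned scientist names round-robin (document i gets names[i % names.length]); the class and method names below are hypothetical and not part of the Beam test utilities.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, for illustration only.
final class PerScientistCountSketch {

  // Assumption: document i is tagged with names[i % names.length] (round-robin).
  static Map<String, Integer> countByScientist(int numDocs, String[] names) {
    Map<String, Integer> counts = new LinkedHashMap<>();
    for (int i = 0; i < numDocs; i++) {
      // Increment the counter for the scientist this document is tagged with.
      counts.merge(names[i % names.length], 1, Integer::sum);
    }
    return counts;
  }

  public static void main(String[] args) {
    String[] names = {"Einstein", "Darwin", "Copernicus", "Pasteur"};
    int numDocs = 400;
    // Each name ends up with numDocs / names.length documents.
    System.out.println(countByScientist(numDocs, names));
  }
}
```

       When numDocs is a multiple of the number of names, every per-scientist bucket holds exactly numDocs / names.length documents, which is the count the javadoc should state whenever the document ID is not also derived from the scientist name.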

##########
File path: sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
##########
@@ -532,6 +602,34 @@ void testWriteWithFullAddressing() throws Exception {
     }
   }
 
+  /**
+   * Tests that documents are correctly routed when index, type and document ID functions are
+   * provided to overwrite the defaults of using the configuration and auto-generation of the
+   * document IDs by Elasticsearch. The scientist name is used for the index, type and document ID.

Review comment:
       I see no Fn other than routingFn specified. Is this a copy-paste leftover?
    Please correct the javadoc.

##########
File path: sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
##########
@@ -568,11 +666,167 @@ void testWritePartialUpdate() throws Exception {
     assertEquals(numDocs, currentNumDocs);
     assertEquals(
         numDocs / NUM_SCIENTISTS,
-        countByScientistName(connectionConfiguration, restClient, "Einstein"));
+        countByScientistName(connectionConfiguration, restClient, "Einstein", null));
 
     // Partial update assertions
-    assertEquals(numDocs / 2, countByMatch(connectionConfiguration, restClient, "group", "0"));
-    assertEquals(numDocs / 2, countByMatch(connectionConfiguration, restClient, "group", "1"));
+    assertEquals(
+        numDocs / 2, countByMatch(connectionConfiguration, restClient, "group", "0", null, null));
+    assertEquals(
+        numDocs / 2, countByMatch(connectionConfiguration, restClient, "group", "1", null, null));
+  }
+
+  void testWriteWithDocVersion() throws Exception {
+    List<ObjectNode> jsonData =
+        ElasticsearchIOTestUtils.createJsonDocuments(
+            numDocs, ElasticsearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
+
+    List<String> data = new ArrayList<>();
+    for (ObjectNode doc : jsonData) {
+      doc.put("my_version", "1");
+      data.add(doc.toString());
+    }
+
+    insertTestDocuments(connectionConfiguration, data, restClient);
+    long currentNumDocs = refreshIndexAndGetCurrentNumDocs(connectionConfiguration, restClient);
+    assertEquals(numDocs, currentNumDocs);
+    // Check that all docs have the same "my_version"
+    assertEquals(
+        numDocs,
+        countByMatch(
+            connectionConfiguration, restClient, "my_version", "1", null, KV.of(1, numDocs)));
+
+    Write write =
+        ElasticsearchIO.write()
+            .withConnectionConfiguration(connectionConfiguration)
+            .withIdFn(new ExtractValueFn("id"))
+            .withDocVersionFn(new ExtractValueFn("my_version"))
+            .withDocVersionType("external");
+
+    data = new ArrayList<>();
+    for (ObjectNode doc : jsonData) {
+      // Set version to larger number than originally set, and larger than next logical version
+      // number set by default by ES.
+      doc.put("my_version", "3");
+      data.add(doc.toString());
+    }
+
+    // Test that documents with lower version are rejected, but rejections ignored when specified
+    pipeline.apply(Create.of(data)).apply(write);
+    pipeline.run();
+
+    currentNumDocs = refreshIndexAndGetCurrentNumDocs(connectionConfiguration, restClient);
+    assertEquals(numDocs, currentNumDocs);
+
+    // my_version and doc version should have changed
+    assertEquals(

Review comment:
       Please also assert on the doc version.
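
       To make explicit what such an assertion would check, here is a minimal in-memory sketch of Elasticsearch's "external" version type: a write is accepted only when the supplied version is strictly greater than the stored one, and the stored document version then becomes the supplied value. The class below is a hypothetical stand-in, not the Beam or Elasticsearch API; the real test would read the `_version` of the indexed documents through the REST client.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of version_type=external, for illustration only.
final class ExternalVersionSketch {

  // Document ID -> stored document version.
  private final Map<String, Long> versions = new HashMap<>();

  // Accepts the write only if no version is stored yet or the supplied
  // version is strictly greater than the stored one.
  boolean index(String id, long externalVersion) {
    Long stored = versions.get(id);
    if (stored != null && externalVersion <= stored) {
      return false; // version conflict: the write is rejected
    }
    versions.put(id, externalVersion);
    return true;
  }

  // Returns the stored version; the ID must have been indexed before.
  long versionOf(String id) {
    return versions.get(id);
  }

  public static void main(String[] args) {
    ExternalVersionSketch index = new ExternalVersionSketch();
    index.index("0", 1L); // initial insert
    index.index("0", 3L); // rewrite with a larger external version
    System.out.println(index.versionOf("0"));
  }
}
```

       Under this model, the test would assert both that my_version is "3" and that the document version of every rewritten document equals 3.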




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]
