[ 
https://issues.apache.org/jira/browse/BEAM-12093?focusedWorklogId=594032&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-594032
 ]

ASF GitHub Bot logged work on BEAM-12093:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 10/May/21 14:58
            Start Date: 10/May/21 14:58
    Worklog Time Spent: 10m 
      Work Description: echauchot commented on a change in pull request #14347:
URL: https://github.com/apache/beam/pull/14347#discussion_r629433650



##########
File path: 
sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
##########
@@ -257,6 +259,14 @@ void testWrite() throws Exception {
     executeWriteTest(write);
   }
 
+  void testWriteStateful() throws Exception {

Review comment:
       modulo tiny fixes in code comments:
   upsert test: LGTM
   routing test: LGTM
   doc version test: LGTM
   

##########
File path: 
sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
##########
@@ -532,6 +602,34 @@ void testWriteWithFullAddressing() throws Exception {
     }
   }
 
+  /**
+   * Tests that documents are correctly routed when index, type and document ID functions are
+   * provided to overwrite the defaults of using the configuration and auto-generation of the
+   * document IDs by Elasticsearch. The scientist name is used for the index, type and document ID.
+   * As a result there should be only a single document in each index/type.
+   */

Review comment:
       There should be numDocs / NUM_SCIENTISTS documents in each (per-scientist) index, no?

##########
File path: 
sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
##########
@@ -532,6 +602,34 @@ void testWriteWithFullAddressing() throws Exception {
     }
   }
 
+  /**
+   * Tests that documents are correctly routed when index, type and document ID functions are
+   * provided to overwrite the defaults of using the configuration and auto-generation of the
+   * document IDs by Elasticsearch. The scientist name is used for the index, type and document ID.

Review comment:
       I see no Fn other than routingFn specified. Copy-paste leftover?
       Please correct the Javadoc.

##########
File path: 
sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
##########
@@ -568,11 +666,167 @@ void testWritePartialUpdate() throws Exception {
     assertEquals(numDocs, currentNumDocs);
     assertEquals(
         numDocs / NUM_SCIENTISTS,
-        countByScientistName(connectionConfiguration, restClient, "Einstein"));
+        countByScientistName(connectionConfiguration, restClient, "Einstein", null));
 
     // Partial update assertions
-    assertEquals(numDocs / 2, countByMatch(connectionConfiguration, restClient, "group", "0"));
-    assertEquals(numDocs / 2, countByMatch(connectionConfiguration, restClient, "group", "1"));
+    assertEquals(
+        numDocs / 2, countByMatch(connectionConfiguration, restClient, "group", "0", null, null));
+    assertEquals(
+        numDocs / 2, countByMatch(connectionConfiguration, restClient, "group", "1", null, null));
+  }
+
+  void testWriteWithDocVersion() throws Exception {
+    List<ObjectNode> jsonData =
+        ElasticsearchIOTestUtils.createJsonDocuments(
+            numDocs, ElasticsearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
+
+    List<String> data = new ArrayList<>();
+    for (ObjectNode doc : jsonData) {
+      doc.put("my_version", "1");
+      data.add(doc.toString());
+    }
+
+    insertTestDocuments(connectionConfiguration, data, restClient);
+    long currentNumDocs = refreshIndexAndGetCurrentNumDocs(connectionConfiguration, restClient);
+    assertEquals(numDocs, currentNumDocs);
+    // Check that all docs have the same "my_version"
+    assertEquals(
+        numDocs,
+        countByMatch(
+            connectionConfiguration, restClient, "my_version", "1", null, KV.of(1, numDocs)));
+
+    Write write =
+        ElasticsearchIO.write()
+            .withConnectionConfiguration(connectionConfiguration)
+            .withIdFn(new ExtractValueFn("id"))
+            .withDocVersionFn(new ExtractValueFn("my_version"))
+            .withDocVersionType("external");
+
+    data = new ArrayList<>();
+    for (ObjectNode doc : jsonData) {
+      // Set version to larger number than originally set, and larger than next logical version
+      // number set by default by ES.
+      doc.put("my_version", "3");
+      data.add(doc.toString());
+    }
+
+    // Test that documents with lower version are rejected, but rejections ignored when specified
+    pipeline.apply(Create.of(data)).apply(write);
+    pipeline.run();
+
+    currentNumDocs = refreshIndexAndGetCurrentNumDocs(connectionConfiguration, restClient);
+    assertEquals(numDocs, currentNumDocs);
+
+    // my_version and doc version should have changed
+    assertEquals(

Review comment:
       Please also assert on the doc version.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 594032)
    Time Spent: 11.5h  (was: 11h 20m)

> Overhaul ElasticsearchIO#Write
> ------------------------------
>
>                 Key: BEAM-12093
>                 URL: https://issues.apache.org/jira/browse/BEAM-12093
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-elasticsearch
>            Reporter: Evan Galpin
>            Assignee: Evan Galpin
>            Priority: P2
>              Labels: elasticsearch
>          Time Spent: 11.5h
>  Remaining Estimate: 0h
>
> The current ElasticsearchIO#Write is great, but there are two related areas 
> which could be improved:
>  # Separation of concerns
>  # Bulk API batch size optimization
>  
> Presently, the Write transform has 2 responsibilities which are coupled and 
> inseparable by users:
>  # Convert input documents into Bulk API entities, serializing based on user 
> settings (partial update, delete, upsert, etc)
>  # Batch the converted Bulk API entities together and interface with the 
> target ES cluster
>  
> Having these 2 roles tightly coupled means testing requires an available 
> Elasticsearch cluster, making unit testing almost impossible. Allowing access 
> to the serialized documents would make unit testing much easier for pipeline 
> developers, among numerous other benefits to having separation between 
> serialization and IO.
> Relatedly, the batching of entities when creating Bulk API payloads is 
> currently limited by the lesser of Beam Runner bundling semantics, and the 
> `ElasticsearchIO#Write#maxBatchSize` setting. This is understandable for 
> portability between runners, but it also means most Bulk payloads only have a 
> few (1-5) entities. By using Stateful Processing to better adhere to the 
> `ElasticsearchIO#Write#maxBatchSize` setting, we have been able to drop the 
> number of indexing requests in an Elasticsearch cluster by 50-100x. 
> Separating the role of document serialization and IO allows supporting 
> multiple IO techniques with minimal and understandable code.
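
The two responsibilities described above can be illustrated with a minimal, plain-Java sketch. It is not the code from the pull request and does not use Beam: the class name `BulkBatchingSketch` and both method names are hypothetical, and the document IDs are assumed to need no JSON escaping. Step 1 only serializes a document into a Bulk API entity (an action line followed by the source line), so it can be unit tested without a cluster. Step 2 only groups entities into payloads of at most `maxBatchSize`, which shows why larger batches mean fewer indexing requests.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration only; not part of ElasticsearchIO. */
public class BulkBatchingSketch {

  /**
   * Step 1: serialization only. Wraps one JSON document in a Bulk API "index"
   * action: an action line, then the document source, each newline-terminated.
   * Assumes the id contains no characters that need JSON escaping.
   */
  public static String toBulkIndexEntity(String id, String jsonDoc) {
    return "{\"index\":{\"_id\":\"" + id + "\"}}\n" + jsonDoc + "\n";
  }

  /**
   * Step 2: batching only. Concatenates entities into payloads holding at most
   * maxBatchSize entities each; one payload corresponds to one Bulk request.
   */
  public static List<String> toBulkPayloads(List<String> entities, int maxBatchSize) {
    if (maxBatchSize <= 0) {
      throw new IllegalArgumentException("maxBatchSize must be positive");
    }
    List<String> payloads = new ArrayList<>();
    StringBuilder current = new StringBuilder();
    int count = 0;
    for (String entity : entities) {
      current.append(entity);
      count++;
      if (count == maxBatchSize) {
        payloads.add(current.toString());
        current.setLength(0);
        count = 0;
      }
    }
    if (count > 0) {
      // Flush the final, partially filled batch.
      payloads.add(current.toString());
    }
    return payloads;
  }

  public static void main(String[] args) {
    List<String> entities = new ArrayList<>();
    for (int i = 0; i < 1000; i++) {
      entities.add(toBulkIndexEntity(String.valueOf(i), "{\"id\":" + i + "}"));
    }
    // Made-up numbers: 1000 documents in batches of 5 versus batches of 500.
    System.out.println(toBulkPayloads(entities, 5).size());   // prints 200
    System.out.println(toBulkPayloads(entities, 500).size()); // prints 2
  }
}
```

The document counts and batch sizes in `main` are invented for the example; the 50-100x reduction quoted in the description comes from the reporter's own measurements, not from this sketch.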



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
