[
https://issues.apache.org/jira/browse/BEAM-12093?focusedWorklogId=601064&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-601064
]
ASF GitHub Bot logged work on BEAM-12093:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 24/May/21 08:37
Start Date: 24/May/21 08:37
Worklog Time Spent: 10m
Work Description: echauchot commented on a change in pull request #14347:
URL: https://github.com/apache/beam/pull/14347#discussion_r637774056
##########
File path:
sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
##########
@@ -568,11 +670,202 @@ void testWritePartialUpdate() throws Exception {
assertEquals(numDocs, currentNumDocs);
assertEquals(
numDocs / NUM_SCIENTISTS,
- countByScientistName(connectionConfiguration, restClient, "Einstein"));
+ countByScientistName(connectionConfiguration, restClient, "Einstein",
null));
// Partial update assertions
- assertEquals(numDocs / 2, countByMatch(connectionConfiguration,
restClient, "group", "0"));
- assertEquals(numDocs / 2, countByMatch(connectionConfiguration,
restClient, "group", "1"));
+ assertEquals(
+ numDocs / 2, countByMatch(connectionConfiguration, restClient,
"group", "0", null, null));
+ assertEquals(
+ numDocs / 2, countByMatch(connectionConfiguration, restClient,
"group", "1", null, null));
+ }
+
+ void testWriteWithDocVersion() throws Exception {
+ List<ObjectNode> jsonData =
+ ElasticsearchIOTestUtils.createJsonDocuments(
+ numDocs,
ElasticsearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
+
+ List<String> data = new ArrayList<>();
+ for (ObjectNode doc : jsonData) {
+ doc.put("my_version", "1");
+ data.add(doc.toString());
+ }
+
+ insertTestDocuments(connectionConfiguration, data, restClient);
+ long currentNumDocs =
refreshIndexAndGetCurrentNumDocs(connectionConfiguration, restClient);
+ assertEquals(numDocs, currentNumDocs);
+ // Check that all docs have the same "my_version"
+ assertEquals(
+ numDocs,
+ countByMatch(
+ connectionConfiguration, restClient, "my_version", "1", null,
KV.of(1, numDocs)));
+
+ Write write =
+ ElasticsearchIO.write()
+ .withConnectionConfiguration(connectionConfiguration)
+ .withIdFn(new ExtractValueFn("id"))
+ .withDocVersionFn(new ExtractValueFn("my_version"))
+ .withDocVersionType("external");
+
+ data = new ArrayList<>();
+ for (ObjectNode doc : jsonData) {
+ // Set version to larger number than originally set, and larger than
next logical version
+ // number set by default by ES.
+ doc.put("my_version", "3");
+ data.add(doc.toString());
+ }
+
+    // Test that documents with a lower version are rejected, but rejections
are ignored when specified
+ pipeline.apply(Create.of(data)).apply(write);
+ pipeline.run();
+
+ currentNumDocs = refreshIndexAndGetCurrentNumDocs(connectionConfiguration,
restClient);
+ assertEquals(numDocs, currentNumDocs);
+
+ // my_version and doc version should have changed
+ assertEquals(
+ 0,
+ countByMatch(
+ connectionConfiguration, restClient, "my_version", "1", null,
KV.of(1, numDocs)));
+ assertEquals(
+ numDocs,
+ countByMatch(
+ connectionConfiguration, restClient, "my_version", "3", null,
KV.of(3, numDocs)));
+ }
+
+ /**
+ * Tests upsert script by adding a group field to each document in the
standard test set. The
+ * group field is populated as the modulo 2 of the document id allowing for
a test to ensure the
+ * documents are split into 2 groups.
+ */
+ void testWriteScriptedUpsert() throws Exception {
+ List<String> data =
+ ElasticsearchIOTestUtils.createDocuments(
+ numDocs,
ElasticsearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
+
+ Write write =
+ ElasticsearchIO.write()
+ .withConnectionConfiguration(connectionConfiguration)
+ .withIdFn(new ExtractValueFn("id"))
+ .withUpsertScript(SCRIPT_SOURCE);
+
+ // Test that documents can be inserted/created by using withUpsertScript
+ pipeline.apply(Create.of(data)).apply(write);
+ pipeline.run();
+
+ // defensive coding to ensure our initial state is as expected
+ long currentNumDocs =
refreshIndexAndGetCurrentNumDocs(connectionConfiguration, restClient);
+ // check we have not unwittingly modified existing behaviour
+ assertEquals(numDocs, currentNumDocs);
+ assertEquals(
+ numDocs / NUM_SCIENTISTS,
+ countByScientistName(connectionConfiguration, restClient, "Einstein",
null));
+
+    // All docs should have group = 0 added by the script upon creation
+ assertEquals(
+ numDocs, countByMatch(connectionConfiguration, restClient, "group",
"0", null, null));
+
+ // Run the same data again. This time, because all docs exist in the index
already, scripted
+ // updates should happen rather than scripted inserts.
+ pipeline.apply(Create.of(data)).apply(write);
+ pipeline.run();
+
+ currentNumDocs = refreshIndexAndGetCurrentNumDocs(connectionConfiguration,
restClient);
+
+ // check we have not unwittingly modified existing behaviour
+ assertEquals(numDocs, currentNumDocs);
+ assertEquals(
+ numDocs / NUM_SCIENTISTS,
+ countByScientistName(connectionConfiguration, restClient, "Einstein",
null));
+
+ // The script will set either 0 or 1 for the group value on update
operations
+ assertEquals(
+ numDocs / 2, countByMatch(connectionConfiguration, restClient,
"group", "0", null, null));
+ assertEquals(
+ numDocs / 2, countByMatch(connectionConfiguration, restClient,
"group", "1", null, null));
+ }
+
+ void testMaxParallelRequestsPerWindow() throws Exception {
+ List<String> data =
+ ElasticsearchIOTestUtils.createDocuments(
+ numDocs,
ElasticsearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
+
+ Write write =
+ ElasticsearchIO.write()
+ .withConnectionConfiguration(connectionConfiguration)
+ .withMaxParallelRequestsPerWindow(1);
+
+ PCollection<KV<Integer, Iterable<String>>> batches =
+
pipeline.apply(Create.of(data)).apply(StatefulBatching.fromSpec(write.getBulkIO()));
+
+ PCollection<Integer> keyValues =
+ batches
+ .apply(GroupByKey.create())
Review comment:
I think it is not needed, as _GroupIntoBatches_ already outputs an
Iterable<String> per key, since Beam state is per key. This will simplify
the _assertThatHasExpectedContents_ code
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 601064)
Time Spent: 14.5h (was: 14h 20m)
> Overhaul ElasticsearchIO#Write
> ------------------------------
>
> Key: BEAM-12093
> URL: https://issues.apache.org/jira/browse/BEAM-12093
> Project: Beam
> Issue Type: Improvement
> Components: io-java-elasticsearch
> Reporter: Evan Galpin
> Assignee: Evan Galpin
> Priority: P2
> Labels: elasticsearch
> Time Spent: 14.5h
> Remaining Estimate: 0h
>
> The current ElasticsearchIO#Write is great, but there are two related areas
> which could be improved:
> # Separation of concern
> # Bulk API batch size optimization
>
> Presently, the Write transform has 2 responsibilities which are coupled and
> inseparable by users:
> # Convert input documents into Bulk API entities, serializing based on user
> settings (partial update, delete, upsert, etc)
> # Batch the converted Bulk API entities together and interface with the
> target ES cluster
>
> Having these 2 roles tightly coupled means testing requires an available
> Elasticsearch cluster, making unit testing almost impossible. Allowing access
> to the serialized documents would make unit testing much easier for pipeline
> developers, among numerous other benefits to having separation between
> serialization and IO.
> Relatedly, the batching of entities when creating Bulk API payloads is
> currently limited by the lesser of Beam Runner bundling semantics, and the
> `ElasticsearchIO#Write#maxBatchSize` setting. This is understandable for
> portability between runners, but it also means most Bulk payloads only have a
> few (1-5) entities. By using Stateful Processing to better adhere to the
> `ElasticsearchIO#Write#maxBatchSize` setting, we have been able to drop the
> number of indexing requests in an Elasticsearch cluster by 50-100x.
> Separating the role of document serialization and IO allows supporting
> multiple IO techniques with minimal and understandable code.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)