[
https://issues.apache.org/jira/browse/BEAM-4389?focusedWorklogId=107051&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-107051
]
ASF GitHub Bot logged work on BEAM-4389:
----------------------------------------
Author: ASF GitHub Bot
Created on: 30/May/18 07:56
Start Date: 30/May/18 07:56
Worklog Time Spent: 10m
Work Description: echauchot closed pull request #5463: [BEAM-4389] Enable
partial updates in ElasticsearchIO
URL: https://github.com/apache/beam/pull/5463
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
index bd766c3fc04..d3ddf913441 100644
---
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
+++
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
@@ -45,7 +45,8 @@
* correct server IP:
*
* <pre>
- * ./gradlew integrationTest -p sdks/java/io/elasticsearch
-DintegrationTestPipelineOptions='[
+ * ./gradlew integrationTest -p
sdks/java/io/elasticsearch-tests/elasticsearch-tests-2
+ * -DintegrationTestPipelineOptions='[
* "--elasticsearchServer=1.2.3.4",
* "--elasticsearchHttpPort=9200"]'
* --tests org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOIT
@@ -60,6 +61,7 @@
private static IOTestPipelineOptions options;
private static ConnectionConfiguration readConnectionConfiguration;
private static ConnectionConfiguration writeConnectionConfiguration;
+ private static ConnectionConfiguration updateConnectionConfiguration;
private static ElasticsearchIOTestCommon elasticsearchIOTestCommon;
@Rule
@@ -70,9 +72,11 @@ public static void beforeClass() throws Exception {
PipelineOptionsFactory.register(IOTestPipelineOptions.class);
options =
TestPipeline.testingPipelineOptions().as(IOTestPipelineOptions.class);
readConnectionConfiguration = ElasticsearchIOITCommon
- .getConnectionConfiguration(options,
ElasticsearchIOITCommon.ReadOrWrite.READ);
+ .getConnectionConfiguration(options,
ElasticsearchIOITCommon.IndexMode.READ);
writeConnectionConfiguration = ElasticsearchIOITCommon
- .getConnectionConfiguration(options,
ElasticsearchIOITCommon.ReadOrWrite.WRITE);
+ .getConnectionConfiguration(options,
ElasticsearchIOITCommon.IndexMode.WRITE);
+ updateConnectionConfiguration = ElasticsearchIOITCommon
+ .getConnectionConfiguration(options,
ElasticsearchIOITCommon.IndexMode.WRITE_PARTIAL);
restClient = readConnectionConfiguration.createClient();
elasticsearchIOTestCommon = new
ElasticsearchIOTestCommon(readConnectionConfiguration,
restClient, true);
@@ -81,6 +85,7 @@ public static void beforeClass() throws Exception {
@AfterClass
public static void afterClass() throws Exception {
ElasticSearchIOTestUtils.deleteIndex(writeConnectionConfiguration,
restClient);
+ ElasticSearchIOTestUtils.deleteIndex(updateConnectionConfiguration,
restClient);
restClient.close();
}
@@ -89,13 +94,13 @@ public void testSplitsVolume() throws Exception {
Read read =
ElasticsearchIO.read().withConnectionConfiguration(readConnectionConfiguration);
BoundedElasticsearchSource initialSource = new
BoundedElasticsearchSource(read, null, null,
null);
- //desiredBundleSize is ignored because in ES 2.x there is no way to split
shards. So we get
+ // desiredBundleSize is ignored because in ES 2.x there is no way to split
shards. So we get
// as many bundles as ES shards and bundle size is shard size
long desiredBundleSizeBytes = 0;
List<? extends BoundedSource<String>> splits = initialSource
.split(desiredBundleSizeBytes, options);
SourceTestUtils.assertSourcesEqualReferenceSource(initialSource, splits,
options);
- //this is the number of ES shards
+ // this is the number of ES shards
// (By default, each index in Elasticsearch is allocated 5 primary shards)
long expectedNumSplits = 5;
assertEquals(expectedNumSplits, splits.size());
@@ -135,10 +140,28 @@ public void testSizesVolume() throws Exception {
*/
@Test
public void testWriteVolumeWithFullAddressing() throws Exception {
- //cannot share elasticsearchIOTestCommon because tests run in parallel.
+ // cannot share elasticsearchIOTestCommon because tests run in parallel.
ElasticsearchIOTestCommon elasticsearchIOTestCommonWrite = new
ElasticsearchIOTestCommon(
writeConnectionConfiguration, restClient, true);
elasticsearchIOTestCommonWrite.setPipeline(pipeline);
elasticsearchIOTestCommonWrite.testWriteWithFullAddressing();
}
+
+ /**
+ * This test verifies volume partial updates of Elasticsearch. The test
dataset index is cloned
+ * and then a new field is added to each document using a partial update.
The test then asserts
+ * the updates were applied.
+ */
+ @Test
+ public void testWritePartialUpdate() throws Exception {
+ ElasticSearchIOTestUtils.copyIndex(
+ restClient,
+ readConnectionConfiguration.getIndex(),
+ updateConnectionConfiguration.getIndex());
+ // cannot share elasticsearchIOTestCommon because tests run in parallel.
+ ElasticsearchIOTestCommon elasticsearchIOTestCommonUpdate = new
ElasticsearchIOTestCommon(
+ updateConnectionConfiguration, restClient, true);
+ elasticsearchIOTestCommonUpdate.setPipeline(pipeline);
+ elasticsearchIOTestCommonUpdate.testWritePartialUpdate();
+ }
}
diff --git
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
index 1b3819829b9..476fa7f1679 100644
---
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
+++
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
@@ -212,4 +212,10 @@ public void testWriteFullAddressing() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testWriteWithFullAddressing();
}
+
+ @Test
+ public void testWritePartialUpdate() throws Exception {
+ elasticsearchIOTestCommon.setPipeline(pipeline);
+ elasticsearchIOTestCommon.testWritePartialUpdate();
+ }
}
diff --git
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
index 5c187e3ba37..2659c0dfe1f 100644
---
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
+++
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java
@@ -45,7 +45,8 @@
* correct server IP:
*
* <pre>
- * ./gradlew integrationTest -p sdks/java/io/elasticsearch
-DintegrationTestPipelineOptions='[
+ * ./gradlew integrationTest -p
sdks/java/io/elasticsearch-tests/elasticsearch-tests-5
+ * -DintegrationTestPipelineOptions='[
* "--elasticsearchServer=1.2.3.4",
* "--elasticsearchHttpPort=9200"]'
* --tests org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOIT
@@ -60,6 +61,7 @@
private static IOTestPipelineOptions options;
private static ConnectionConfiguration readConnectionConfiguration;
private static ConnectionConfiguration writeConnectionConfiguration;
+ private static ConnectionConfiguration updateConnectionConfiguration;
private static ElasticsearchIOTestCommon elasticsearchIOTestCommon;
@Rule
@@ -70,9 +72,11 @@ public static void beforeClass() throws Exception {
PipelineOptionsFactory.register(IOTestPipelineOptions.class);
options =
TestPipeline.testingPipelineOptions().as(IOTestPipelineOptions.class);
readConnectionConfiguration = ElasticsearchIOITCommon
- .getConnectionConfiguration(options,
ElasticsearchIOITCommon.ReadOrWrite.READ);
+ .getConnectionConfiguration(options,
ElasticsearchIOITCommon.IndexMode.READ);
writeConnectionConfiguration = ElasticsearchIOITCommon
- .getConnectionConfiguration(options,
ElasticsearchIOITCommon.ReadOrWrite.WRITE);
+ .getConnectionConfiguration(options,
ElasticsearchIOITCommon.IndexMode.WRITE);
+ updateConnectionConfiguration = ElasticsearchIOITCommon
+ .getConnectionConfiguration(options,
ElasticsearchIOITCommon.IndexMode.WRITE_PARTIAL);
restClient = readConnectionConfiguration.createClient();
elasticsearchIOTestCommon = new
ElasticsearchIOTestCommon(readConnectionConfiguration,
restClient, true);
@@ -81,6 +85,7 @@ public static void beforeClass() throws Exception {
@AfterClass
public static void afterClass() throws Exception {
ElasticSearchIOTestUtils.deleteIndex(writeConnectionConfiguration,
restClient);
+ ElasticSearchIOTestUtils.deleteIndex(updateConnectionConfiguration,
restClient);
restClient.close();
}
@@ -114,7 +119,7 @@ public void testReadVolume() throws Exception {
@Test
public void testWriteVolume() throws Exception {
- //cannot share elasticsearchIOTestCommon because tests run in parallel.
+ // cannot share elasticsearchIOTestCommon because tests run in parallel.
ElasticsearchIOTestCommon elasticsearchIOTestCommonWrite = new
ElasticsearchIOTestCommon(
writeConnectionConfiguration, restClient, true);
elasticsearchIOTestCommonWrite.setPipeline(pipeline);
@@ -134,10 +139,28 @@ public void testSizesVolume() throws Exception {
*/
@Test
public void testWriteWithFullAddressingVolume() throws Exception {
- //cannot share elasticsearchIOTestCommon because tests run in parallel.
+ // cannot share elasticsearchIOTestCommon because tests run in parallel.
ElasticsearchIOTestCommon elasticsearchIOTestCommonWrite = new
ElasticsearchIOTestCommon(
writeConnectionConfiguration, restClient, true);
elasticsearchIOTestCommonWrite.setPipeline(pipeline);
elasticsearchIOTestCommonWrite.testWriteWithFullAddressing();
}
+
+ /**
+ * This test verifies volume partial updates of Elasticsearch. The test
dataset index is cloned
+ * and then a new field is added to each document using a partial update.
The test then asserts
+ * the updates where appied.
+ */
+ @Test
+ public void testWritePartialUpdate() throws Exception {
+ ElasticSearchIOTestUtils.copyIndex(
+ restClient,
+ readConnectionConfiguration.getIndex(),
+ updateConnectionConfiguration.getIndex());
+ // cannot share elasticsearchIOTestCommon because tests run in parallel.
+ ElasticsearchIOTestCommon elasticsearchIOTestCommonUpdate = new
ElasticsearchIOTestCommon(
+ updateConnectionConfiguration, restClient, true);
+ elasticsearchIOTestCommonUpdate.setPipeline(pipeline);
+ elasticsearchIOTestCommonUpdate.testWritePartialUpdate();
+ }
}
diff --git
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
index 45c80071158..e7a4e132bf5 100644
---
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
+++
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java
@@ -211,4 +211,10 @@ public void testWriteFullAddressing() throws Exception {
elasticsearchIOTestCommon.setPipeline(pipeline);
elasticsearchIOTestCommon.testWriteWithFullAddressing();
}
+
+ @Test
+ public void testWritePartialUpdate() throws Exception {
+ elasticsearchIOTestCommon.setPipeline(pipeline);
+ elasticsearchIOTestCommon.testWritePartialUpdate();
+ }
}
diff --git
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java
index 581db5af410..4fe0339aaec 100644
---
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java
+++
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticSearchIOTestUtils.java
@@ -56,10 +56,14 @@
}
/** Deletes the given index synchronously. */
- static void deleteIndex(ConnectionConfiguration connectionConfiguration,
- RestClient restClient) throws IOException {
+ static void deleteIndex(ConnectionConfiguration connectionConfiguration,
RestClient restClient)
+ throws IOException {
+ deleteIndex(restClient, connectionConfiguration.getIndex());
+ }
+
+ private static void deleteIndex(RestClient restClient, String index) throws
IOException {
try {
- restClient.performRequest("DELETE", String.format("/%s",
connectionConfiguration.getIndex()));
+ restClient.performRequest("DELETE", String.format("/%s", index));
} catch (IOException e) {
// it is fine to ignore this expression as deleteIndex occurs in @before,
// so when the first tests is run, the index does not exist yet
@@ -69,6 +73,21 @@ static void deleteIndex(ConnectionConfiguration
connectionConfiguration,
}
}
+ /**
+ * Synchronously deletes the target if it exists and then (re)creates it as
a copy of source
+ * synchronously.
+ */
+ static void copyIndex(RestClient restClient, String source, String target)
throws IOException {
+ deleteIndex(restClient, target);
+ HttpEntity entity =
+ new NStringEntity(
+ String.format(
+ "{\"source\" : { \"index\" : \"%s\" }, \"dest\" : { \"index\"
: \"%s\" } }",
+ source, target),
+ ContentType.APPLICATION_JSON);
+ restClient.performRequest("POST", "/_reindex", Collections.EMPTY_MAP,
entity);
+ }
+
/** Inserts the given number of test documents into Elasticsearch. */
static void insertTestDocuments(ConnectionConfiguration
connectionConfiguration,
long numDocs, RestClient restClient) throws IOException {
@@ -173,10 +192,29 @@ static long refreshIndexAndGetCurrentNumDocs(RestClient
restClient, String index
static int countByScientistName(
ConnectionConfiguration connectionConfiguration, RestClient restClient,
String scientistName)
throws IOException {
+ return countByMatch(connectionConfiguration, restClient, "scientist",
scientistName);
+ }
+
+ /**
+ * Executes a match query for given field/value and returns the count of
results.
+ *
+ * @param connectionConfiguration Specifies the index and type
+ * @param restClient To use to execute the call
+ * @param field The field to query
+ * @param value The value to match
+ * @return The count of documents in the search result
+ * @throws IOException On error communicating with Elasticsearch
+ */
+ static int countByMatch(
+ ConnectionConfiguration connectionConfiguration,
+ RestClient restClient,
+ String field,
+ String value)
+ throws IOException {
String requestBody =
- "{\n"
+ "{\n"
+ " \"query\" : {\"match\": {\n"
- + " \"scientist\": \"" + scientistName + "\"\n"
+ + " \"" + field + "\": \"" + value + "\"\n"
+ " }}\n"
+ "}\n";
String endPoint =
diff --git
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOITCommon.java
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOITCommon.java
index 129d414c45f..ef7722d5f5e 100644
---
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOITCommon.java
+++
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOITCommon.java
@@ -35,7 +35,21 @@
*/
public class ElasticsearchIOITCommon {
- private static final String writeIndex = ES_INDEX +
System.currentTimeMillis();
+ /** Enum encapsulating the mode of operation and the index. */
+ enum IndexMode {
+ READ(ES_INDEX),
+ WRITE(ES_INDEX + System.currentTimeMillis()),
+ WRITE_PARTIAL(ES_INDEX + "_partial_" + System.currentTimeMillis());
+
+ private final String index;
+ IndexMode(String index) {
+ this.index = index;
+ }
+
+ public String getIndex() {
+ return index;
+ }
+ }
/**
* Use this to create the index for reading before IT read tests.
@@ -64,15 +78,15 @@ public static void main(String[] args) throws Exception {
private static void createAndPopulateReadIndex(IOTestPipelineOptions
options) throws Exception {
// automatically creates the index and insert docs
ConnectionConfiguration connectionConfiguration =
- getConnectionConfiguration(options, ReadOrWrite.READ);
+ getConnectionConfiguration(options, IndexMode.READ);
try (RestClient restClient = connectionConfiguration.createClient()) {
ElasticSearchIOTestUtils
.insertTestDocuments(connectionConfiguration, NUM_DOCS_ITESTS,
restClient);
}
}
- static ConnectionConfiguration
getConnectionConfiguration(IOTestPipelineOptions options,
- ReadOrWrite rOw) {
+ static ConnectionConfiguration getConnectionConfiguration(
+ IOTestPipelineOptions options, IndexMode mode) {
return ConnectionConfiguration.create(
new String[] {
"http://"
@@ -80,13 +94,9 @@ static ConnectionConfiguration
getConnectionConfiguration(IOTestPipelineOptions
+ ":"
+ options.getElasticsearchHttpPort()
},
- (rOw == ReadOrWrite.READ) ? ES_INDEX : writeIndex,
+ mode.getIndex(),
ES_TYPE);
}
- /** Enum that tells whether we use the index for reading or for writing. */
- enum ReadOrWrite {
- READ,
- WRITE
- }
+
}
diff --git
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
index 3da47dfe456..8c55f867540 100644
---
a/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
+++
b/sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
@@ -19,6 +19,7 @@
import static
org.apache.beam.sdk.io.elasticsearch.ElasticSearchIOTestUtils.FAMOUS_SCIENTISTS;
import static
org.apache.beam.sdk.io.elasticsearch.ElasticSearchIOTestUtils.NUM_SCIENTISTS;
+import static
org.apache.beam.sdk.io.elasticsearch.ElasticSearchIOTestUtils.countByMatch;
import static
org.apache.beam.sdk.io.elasticsearch.ElasticSearchIOTestUtils.countByScientistName;
import static
org.apache.beam.sdk.io.elasticsearch.ElasticSearchIOTestUtils.refreshIndexAndGetCurrentNumDocs;
import static
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.BoundedElasticsearchSource;
@@ -34,6 +35,7 @@
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -344,11 +346,11 @@ void testWriteWithIndexFn() throws Exception {
ElasticSearchIOTestUtils.createDocuments(
adjustedNumDocs,
ElasticSearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
pipeline
- .apply(Create.of(data))
- .apply(
- ElasticsearchIO.write()
-
.withConnectionConfiguration(connectionConfiguration)
- .withIndexFn(new ExtractValueFn("scientist")));
+ .apply(Create.of(data))
+ .apply(
+ ElasticsearchIO.write()
+ .withConnectionConfiguration(connectionConfiguration)
+ .withIndexFn(new ExtractValueFn("scientist")));
pipeline.run();
// verify counts on each index
@@ -434,4 +436,51 @@ void testWriteWithFullAddressing() throws Exception {
}
}
+
+ /**
+ * Tests partial updates by adding a group field to each document in the
standard test set. The
+ * group field is populated as the modulo 2 of the document id allowing for
a test to ensure the
+ * documents are split into 2 groups.
+ */
+ void testWritePartialUpdate() throws Exception {
+ if (!useAsITests) {
+ ElasticSearchIOTestUtils.insertTestDocuments(connectionConfiguration,
numDocs, restClient);
+ }
+
+ // defensive coding to ensure our initial state is as expected
+
+ long currentNumDocs =
+ refreshIndexAndGetCurrentNumDocs(
+ connectionConfiguration, restClient);
+ assertEquals(numDocs, currentNumDocs);
+
+ // partial documents containing the ID and group only
+ List<String> data = new ArrayList<>();
+ for (int i = 0; i < numDocs; i++) {
+ data.add(String.format("{\"id\" : %s, \"group\" : %s}", i, i % 2));
+ }
+
+ pipeline
+ .apply(Create.of(data))
+ .apply(
+ ElasticsearchIO.write()
+ .withConnectionConfiguration(connectionConfiguration)
+ .withIdFn(new ExtractValueFn("id"))
+ .withUsePartialUpdate(true));
+ pipeline.run();
+
+ currentNumDocs =
+ refreshIndexAndGetCurrentNumDocs(
+ connectionConfiguration, restClient);
+
+ // check we have not unwittingly modified existing behaviour
+ assertEquals(numDocs, currentNumDocs);
+ assertEquals(
+ numDocs / NUM_SCIENTISTS,
+ countByScientistName(connectionConfiguration, restClient, "Einstein"));
+
+ // Partial update assertions
+ assertEquals(numDocs / 2, countByMatch(connectionConfiguration,
restClient, "group", "0"));
+ assertEquals(numDocs / 2, countByMatch(connectionConfiguration,
restClient, "group", "1"));
+ }
}
diff --git
a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
index 2cca9f61341..ab1ea366e39 100644
---
a/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
+++
b/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
@@ -124,8 +124,11 @@
* using the document id auto-generated by Elasticsearch.
*
* <p>Optionally, you can provide {@link
ElasticsearchIO.Write.FieldValueExtractFn} using {@code
- * withIndexFn()} or {@code withTypeFn()} to enable per-document routing to
the target
- * Elasticsearch index and type.
+ * withIndexFn()} or {@code withTypeFn()} to enable per-document routing to
the target Elasticsearch
+ * index and type.
+ *
+ * <p>When {withUsePartialUpdate()} is enabled, the input document must
contain an id field and
+ * {@code withIdFn()} must be used to allow its extraction by the
ElasticsearchIO.
*/
@Experimental(Experimental.Kind.SOURCE_SINK)
public class ElasticsearchIO {
@@ -146,6 +149,7 @@ public static Write write() {
.setMaxBatchSize(1000L)
// advised default starting batch size in ES docs
.setMaxBatchSizeBytes(5L * 1024L * 1024L)
+ .setUsePartialUpdate(false) // default is document upsert
.build();
}
@@ -745,6 +749,8 @@ public void close() throws IOException {
@Nullable
abstract FieldValueExtractFn getTypeFn();
+ abstract boolean getUsePartialUpdate();
+
abstract Builder builder();
@AutoValue.Builder
@@ -761,6 +767,8 @@ public void close() throws IOException {
abstract Builder setTypeFn(FieldValueExtractFn typeFn);
+ abstract Builder setUsePartialUpdate(boolean usePartialUpdate);
+
abstract Write build();
}
@@ -849,6 +857,17 @@ public Write withTypeFn(FieldValueExtractFn typeFn) {
return builder().setTypeFn(typeFn).build();
}
+ /**
+ * Provide an instruction to control whether partial updates or inserts
(default) are issued to
+ * Elasticsearch.
+ *
+ * @param usePartialUpdate set to true to issue partial updates
+ * @return the {@link Write} with the partial update control set
+ */
+ public Write withUsePartialUpdate(boolean usePartialUpdate) {
+ return builder().setUsePartialUpdate(usePartialUpdate).build();
+ }
+
@Override
public PDone expand(PCollection<String> input) {
ConnectionConfiguration connectionConfiguration =
getConnectionConfiguration();
@@ -863,6 +882,7 @@ public PDone expand(PCollection<String> input) {
@VisibleForTesting
static class WriteFn extends DoFn<String, Void> {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private static final int DEFAULT_RETRY_ON_CONFLICT = 5; // race
conditions on updates
private int backendVersion;
private final Write spec;
@@ -870,10 +890,10 @@ public PDone expand(PCollection<String> input) {
private ArrayList<String> batch;
private long currentBatchSizeBytes;
- // Encapsulates the elements which form a complete Elasticsearch
document address
+ // Encapsulates the elements which form the metadata for an
Elasticsearch bulk operation
@JsonPropertyOrder({ "_index", "_type", "_id" })
@JsonInclude(JsonInclude.Include.NON_NULL)
- private static class DocumentAddress implements Serializable {
+ private static class DocumentMetadata implements Serializable {
@JsonProperty("_index")
final String index;
@@ -883,10 +903,14 @@ public PDone expand(PCollection<String> input) {
@JsonProperty("_id")
final String id;
- DocumentAddress(String index, String type, String id) {
+ @JsonProperty("_retry_on_conflict")
+ final Integer retryOnConflict;
+
+ DocumentMetadata(String index, String type, String id, Integer
retryOnConflict) {
this.index = index;
this.type = type;
this.id = id;
+ this.retryOnConflict = retryOnConflict;
}
}
@@ -911,7 +935,7 @@ public void startBundle(StartBundleContext context) {
/**
* Extracts the components that comprise the document address from the
document using the
* {@link FieldValueExtractFn} configured. This allows any or all of the
index, type and
- * document id to be controlled on a per document basis. If none are
provided the an empty
+ * document id to be controlled on a per document basis. If none are
provided then an empty
* default of {@code {}} is returned. Sanitization of the index is
performed, automatically
* lower-casing the value as required by Elasticsearch.
*
@@ -919,19 +943,20 @@ public void startBundle(StartBundleContext context) {
* @return the document address as JSON or the default
* @throws IOException if the document cannot be parsed as JSON
*/
- private String getDocumentAddress(String document) throws IOException {
+ private String getDocumentMetadata(String document) throws IOException {
if (spec.getIndexFn() != null || spec.getTypeFn() != null ||
spec.getIdFn() != null) {
// parse once and reused for efficiency
JsonNode parsedDocument = OBJECT_MAPPER.readTree(document);
- DocumentAddress address =
- new DocumentAddress(
+ DocumentMetadata metadata =
+ new DocumentMetadata(
spec.getIndexFn() != null
?
lowerCaseOrNull(spec.getIndexFn().apply(parsedDocument))
: null,
spec.getTypeFn() != null ?
spec.getTypeFn().apply(parsedDocument) : null,
- spec.getIdFn() != null ?
spec.getIdFn().apply(parsedDocument) : null);
- return OBJECT_MAPPER.writeValueAsString(address);
+ spec.getIdFn() != null ?
spec.getIdFn().apply(parsedDocument) : null,
+ spec.getUsePartialUpdate() ? DEFAULT_RETRY_ON_CONFLICT :
null);
+ return OBJECT_MAPPER.writeValueAsString(metadata);
} else {
return "{}"; // use configuration and auto-generated document IDs
@@ -945,9 +970,18 @@ private static String lowerCaseOrNull(String input) {
@ProcessElement
public void processElement(ProcessContext context) throws Exception {
String document = context.element();
- String documentAddress = getDocumentAddress(document);
+ String documentMetadata = getDocumentMetadata(document);
+
+ // index is an insert/upsert and update is a partial update (or insert
if not existing)
+ if (spec.getUsePartialUpdate()) {
+ batch.add(
+ String.format(
+ "{ \"update\" : %s }%n{ \"doc\" : %s, \"doc_as_upsert\" :
true }%n",
+ documentMetadata, document));
+ } else {
+ batch.add(String.format("{ \"index\" : %s }%n%s%n",
documentMetadata, document));
+ }
- batch.add(String.format("{ \"index\" : %s }%n%s%n", documentAddress,
document));
currentBatchSizeBytes +=
document.getBytes(StandardCharsets.UTF_8).length;
if (batch.size() >= spec.getMaxBatchSize()
|| currentBatchSizeBytes >= spec.getMaxBatchSizeBytes()) {
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 107051)
Time Spent: 1h 40m (was: 1.5h)
> Enable partial updates Elasticsearch
> ------------------------------------
>
> Key: BEAM-4389
> URL: https://issues.apache.org/jira/browse/BEAM-4389
> Project: Beam
> Issue Type: New Feature
> Components: io-java-elasticsearch
> Affects Versions: 2.4.0
> Reporter: Tim Robertson
> Assignee: Tim Robertson
> Priority: Major
> Time Spent: 1h 40m
> Remaining Estimate: 0h
>
> Expose a configuration option on the {{ElasticsearchIO}} to enable partial
> updates rather than full document inserts.
> Rationale: We have the case where different pipelines process different
> categories of information of the target entity (e.g. one for taxonomic
> processing, another for geospatial processing). A read and merge is not
> possible inside the batch call, meaning the only way to do it is through a
> join. The join approach is slow, and also stops the ability to run a single
> process in isolation (e.g. reprocess the geospatial component of all docs).
> Use of this configuration parameter has to be used in conjunction with
> controlling the document ID (possible since BEAM-3201) to make sense.
> The client API would include a {{withUseUpdate(...)}} such as:
> {code}
> source.apply(
> ElasticsearchIO.write()
> .withConnectionConfiguration(connectionConfiguration)
> .withIdFn(new ExtractValueFn("id"))
> .withUseUpdate(true)
> {code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)