echauchot commented on a change in pull request #14347:
URL: https://github.com/apache/beam/pull/14347#discussion_r629978211
##########
File path:
sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
##########
@@ -568,11 +664,167 @@ void testWritePartialUpdate() throws Exception {
assertEquals(numDocs, currentNumDocs);
assertEquals(
numDocs / NUM_SCIENTISTS,
- countByScientistName(connectionConfiguration, restClient, "Einstein"));
+ countByScientistName(connectionConfiguration, restClient, "Einstein",
null));
// Partial update assertions
- assertEquals(numDocs / 2, countByMatch(connectionConfiguration,
restClient, "group", "0"));
- assertEquals(numDocs / 2, countByMatch(connectionConfiguration,
restClient, "group", "1"));
+ assertEquals(
+ numDocs / 2, countByMatch(connectionConfiguration, restClient,
"group", "0", null, null));
+ assertEquals(
+ numDocs / 2, countByMatch(connectionConfiguration, restClient,
"group", "1", null, null));
+ }
+
+ void testWriteWithDocVersion() throws Exception {
+ List<ObjectNode> jsonData =
+ ElasticsearchIOTestUtils.createJsonDocuments(
+ numDocs,
ElasticsearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
+
+ List<String> data = new ArrayList<>();
+ for (ObjectNode doc : jsonData) {
+ doc.put("my_version", "1");
+ data.add(doc.toString());
+ }
+
+ insertTestDocuments(connectionConfiguration, data, restClient);
+ long currentNumDocs =
refreshIndexAndGetCurrentNumDocs(connectionConfiguration, restClient);
+ assertEquals(numDocs, currentNumDocs);
+ // Check that all docs have the same "my_version"
+ assertEquals(
+ numDocs,
+ countByMatch(
+ connectionConfiguration, restClient, "my_version", "1", null,
KV.of(1, numDocs)));
+
+ Write write =
+ ElasticsearchIO.write()
+ .withConnectionConfiguration(connectionConfiguration)
+ .withIdFn(new ExtractValueFn("id"))
+ .withDocVersionFn(new ExtractValueFn("my_version"))
+ .withDocVersionType("external");
+
+ data = new ArrayList<>();
+ for (ObjectNode doc : jsonData) {
+ // Set version to larger number than originally set, and larger than
next logical version
+ // number set by default by ES.
+ doc.put("my_version", "3");
+ data.add(doc.toString());
+ }
+
+ // Test that documents with lower version are rejected, but rejections
ignored when specified
+ pipeline.apply(Create.of(data)).apply(write);
+ pipeline.run();
+
+ currentNumDocs = refreshIndexAndGetCurrentNumDocs(connectionConfiguration,
restClient);
+ assertEquals(numDocs, currentNumDocs);
+
+ // my_version and doc version should have changed
+ assertEquals(
+ 0,
+ countByMatch(
+ connectionConfiguration, restClient, "my_version", "1", null,
KV.of(1, numDocs)));
+ assertEquals(
+ numDocs,
+ countByMatch(
+ connectionConfiguration, restClient, "my_version", "3", null,
KV.of(3, numDocs)));
+ }
+
+ /**
+ * Tests upsert script by adding a group field to each document in the
standard test set. The
+ * group field is populated as the modulo 2 of the document id allowing for
a test to ensure the
+ * documents are split into 2 groups.
+ */
+ void testWriteScriptedUpsert() throws Exception {
+ List<String> data =
+ ElasticsearchIOTestUtils.createDocuments(
+ numDocs,
ElasticsearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
+
+ Write write =
+ ElasticsearchIO.write()
+ .withConnectionConfiguration(connectionConfiguration)
+ .withIdFn(new ExtractValueFn("id"))
+ .withUpsertScript(SCRIPT_SOURCE);
+
+ // Test that documents can be inserted/created by using withUpsertScript
+ pipeline.apply(Create.of(data)).apply(write);
+ pipeline.run();
+
+ // defensive coding to ensure our initial state is as expected
+ long currentNumDocs =
refreshIndexAndGetCurrentNumDocs(connectionConfiguration, restClient);
+ // check we have not unwittingly modified existing behaviour
+ assertEquals(numDocs, currentNumDocs);
+ assertEquals(
+ numDocs / NUM_SCIENTISTS,
+ countByScientistName(connectionConfiguration, restClient, "Einstein",
null));
+
+ // All docs should have have group = 0 added by the script upon creation
+ assertEquals(
+ numDocs, countByMatch(connectionConfiguration, restClient, "group",
"0", null, null));
+
+ // Run the same data again. This time, because all docs exist in the index
already, scripted
+ // updates should happen rather than scripted inserts.
+ pipeline.apply(Create.of(data)).apply(write);
+ pipeline.run();
+
+ currentNumDocs = refreshIndexAndGetCurrentNumDocs(connectionConfiguration,
restClient);
+
+ // check we have not unwittingly modified existing behaviour
+ assertEquals(numDocs, currentNumDocs);
+ assertEquals(
+ numDocs / NUM_SCIENTISTS,
+ countByScientistName(connectionConfiguration, restClient, "Einstein",
null));
+
+ // The script will set either 0 or 1 for the group value on update
operations
+ assertEquals(
+ numDocs / 2, countByMatch(connectionConfiguration, restClient,
"group", "0", null, null));
+ assertEquals(
+ numDocs / 2, countByMatch(connectionConfiguration, restClient,
"group", "1", null, null));
+ }
+
+ void testMaxParallelRequestsPerWindow() throws Exception {
+ List<String> data =
+ ElasticsearchIOTestUtils.createDocuments(
+ numDocs,
ElasticsearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
+
+ Write write =
+ ElasticsearchIO.write()
+ .withConnectionConfiguration(connectionConfiguration)
+ .withMaxParallelRequestsPerWindow(1);
+
+ PCollection<KV<Integer, Long>> batches =
+ pipeline
+ .apply(Create.of(data))
+ .apply(Window.into(new GlobalWindows()))
Review comment:
should not be needed: it is the default window when none is specified.
##########
File path:
sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
##########
@@ -568,11 +664,167 @@ void testWritePartialUpdate() throws Exception {
assertEquals(numDocs, currentNumDocs);
assertEquals(
numDocs / NUM_SCIENTISTS,
- countByScientistName(connectionConfiguration, restClient, "Einstein"));
+ countByScientistName(connectionConfiguration, restClient, "Einstein",
null));
// Partial update assertions
- assertEquals(numDocs / 2, countByMatch(connectionConfiguration,
restClient, "group", "0"));
- assertEquals(numDocs / 2, countByMatch(connectionConfiguration,
restClient, "group", "1"));
+ assertEquals(
+ numDocs / 2, countByMatch(connectionConfiguration, restClient,
"group", "0", null, null));
+ assertEquals(
+ numDocs / 2, countByMatch(connectionConfiguration, restClient,
"group", "1", null, null));
+ }
+
+ void testWriteWithDocVersion() throws Exception {
+ List<ObjectNode> jsonData =
+ ElasticsearchIOTestUtils.createJsonDocuments(
+ numDocs,
ElasticsearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
+
+ List<String> data = new ArrayList<>();
+ for (ObjectNode doc : jsonData) {
+ doc.put("my_version", "1");
+ data.add(doc.toString());
+ }
+
+ insertTestDocuments(connectionConfiguration, data, restClient);
+ long currentNumDocs =
refreshIndexAndGetCurrentNumDocs(connectionConfiguration, restClient);
+ assertEquals(numDocs, currentNumDocs);
+ // Check that all docs have the same "my_version"
+ assertEquals(
+ numDocs,
+ countByMatch(
+ connectionConfiguration, restClient, "my_version", "1", null,
KV.of(1, numDocs)));
+
+ Write write =
+ ElasticsearchIO.write()
+ .withConnectionConfiguration(connectionConfiguration)
+ .withIdFn(new ExtractValueFn("id"))
+ .withDocVersionFn(new ExtractValueFn("my_version"))
+ .withDocVersionType("external");
+
+ data = new ArrayList<>();
+ for (ObjectNode doc : jsonData) {
+ // Set version to larger number than originally set, and larger than
next logical version
+ // number set by default by ES.
+ doc.put("my_version", "3");
+ data.add(doc.toString());
+ }
+
+ // Test that documents with lower version are rejected, but rejections
ignored when specified
+ pipeline.apply(Create.of(data)).apply(write);
+ pipeline.run();
+
+ currentNumDocs = refreshIndexAndGetCurrentNumDocs(connectionConfiguration,
restClient);
+ assertEquals(numDocs, currentNumDocs);
+
+ // my_version and doc version should have changed
+ assertEquals(
+ 0,
+ countByMatch(
+ connectionConfiguration, restClient, "my_version", "1", null,
KV.of(1, numDocs)));
+ assertEquals(
+ numDocs,
+ countByMatch(
+ connectionConfiguration, restClient, "my_version", "3", null,
KV.of(3, numDocs)));
+ }
+
+ /**
+ * Tests upsert script by adding a group field to each document in the
standard test set. The
+ * group field is populated as the modulo 2 of the document id allowing for
a test to ensure the
+ * documents are split into 2 groups.
+ */
+ void testWriteScriptedUpsert() throws Exception {
+ List<String> data =
+ ElasticsearchIOTestUtils.createDocuments(
+ numDocs,
ElasticsearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
+
+ Write write =
+ ElasticsearchIO.write()
+ .withConnectionConfiguration(connectionConfiguration)
+ .withIdFn(new ExtractValueFn("id"))
+ .withUpsertScript(SCRIPT_SOURCE);
+
+ // Test that documents can be inserted/created by using withUpsertScript
+ pipeline.apply(Create.of(data)).apply(write);
+ pipeline.run();
+
+ // defensive coding to ensure our initial state is as expected
+ long currentNumDocs =
refreshIndexAndGetCurrentNumDocs(connectionConfiguration, restClient);
+ // check we have not unwittingly modified existing behaviour
+ assertEquals(numDocs, currentNumDocs);
+ assertEquals(
+ numDocs / NUM_SCIENTISTS,
+ countByScientistName(connectionConfiguration, restClient, "Einstein",
null));
+
+ // All docs should have have group = 0 added by the script upon creation
+ assertEquals(
+ numDocs, countByMatch(connectionConfiguration, restClient, "group",
"0", null, null));
+
+ // Run the same data again. This time, because all docs exist in the index
already, scripted
+ // updates should happen rather than scripted inserts.
+ pipeline.apply(Create.of(data)).apply(write);
+ pipeline.run();
+
+ currentNumDocs = refreshIndexAndGetCurrentNumDocs(connectionConfiguration,
restClient);
+
+ // check we have not unwittingly modified existing behaviour
+ assertEquals(numDocs, currentNumDocs);
+ assertEquals(
+ numDocs / NUM_SCIENTISTS,
+ countByScientistName(connectionConfiguration, restClient, "Einstein",
null));
+
+ // The script will set either 0 or 1 for the group value on update
operations
+ assertEquals(
+ numDocs / 2, countByMatch(connectionConfiguration, restClient,
"group", "0", null, null));
+ assertEquals(
+ numDocs / 2, countByMatch(connectionConfiguration, restClient,
"group", "1", null, null));
+ }
+
+ void testMaxParallelRequestsPerWindow() throws Exception {
+ List<String> data =
+ ElasticsearchIOTestUtils.createDocuments(
+ numDocs,
ElasticsearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
+
+ Write write =
+ ElasticsearchIO.write()
+ .withConnectionConfiguration(connectionConfiguration)
+ .withMaxParallelRequestsPerWindow(1);
+
+ PCollection<KV<Integer, Long>> batches =
+ pipeline
+ .apply(Create.of(data))
+ .apply(Window.into(new GlobalWindows()))
+ .apply(StatefulBatching.fromSpec(write.getBulkIO()))
+ .apply(Count.perKey());
+
+ // Number of unique keys produced should be number of
maxParallelRequestsPerWindow * numWindows
+ // There is only 1 request (key) per window, and 1 (global) window ie. one
key total
+
PAssert.that(batches).containsInAnyOrder(Collections.singletonList(KV.of(0,
1L)));
+
+ pipeline.run();
+ }
+
+ void testMaxBufferingDurationAndMaxParallelRequestsPerWindow() throws
Exception {
+ List<String> data =
+ ElasticsearchIOTestUtils.createDocuments(
+ numDocs,
ElasticsearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
+
+ Write write =
+ ElasticsearchIO.write()
+ .withConnectionConfiguration(connectionConfiguration)
+ .withMaxBufferingDuration(Duration.standardSeconds(1))
+ .withMaxParallelRequestsPerWindow(1);
+
+ PCollection<KV<Integer, Long>> batches =
+ pipeline
+ .apply(Create.of(data))
+ .apply(Window.into(new GlobalWindows()))
Review comment:
ditto
##########
File path:
sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
@@ -1246,88 +1877,208 @@ public Write withUsePartialUpdate(boolean
usePartialUpdate) {
* }</pre>
*
* @param retryConfiguration the rules which govern the retry behavior
- * @return the {@link Write} with retrying configured
+ * @return the {@link BulkIO} with retrying configured
*/
- public Write withRetryConfiguration(RetryConfiguration retryConfiguration)
{
+ public BulkIO withRetryConfiguration(RetryConfiguration
retryConfiguration) {
checkArgument(retryConfiguration != null, "retryConfiguration is
required");
return builder().setRetryConfiguration(retryConfiguration).build();
}
/**
- * Provide a function to extract the target operation either upsert or
delete from the document
- * fields allowing dynamic bulk operation decision. While using
withIsDeleteFn, it should be
- * taken care that the document's id extraction is defined using the
withIdFn function or else
- * IllegalArgumentException is thrown. Should the function throw an
Exception then the batch
- * will fail and the exception propagated.
+ * Whether or not to suppress version conflict errors in a Bulk API
response. This can be useful
+ * if your use case involves using external version types.
*
- * @param isDeleteFn set to true for deleting the specific document
- * @return the {@link Write} with the function set
+ * @param ignoreVersionConflicts true to suppress version conflicts, false
to surface version
+ * conflict errors.
+ * @return the {@link BulkIO} with version conflict handling configured
*/
- public Write withIsDeleteFn(BooleanFieldValueExtractFn isDeleteFn) {
- checkArgument(isDeleteFn != null, "deleteFn is required");
- return builder().setIsDeleteFn(isDeleteFn).build();
+ public BulkIO withIgnoreVersionConflicts(boolean ignoreVersionConflicts) {
+ Set<String> allowedResponseErrors = getAllowedResponseErrors();
+ if (allowedResponseErrors == null) {
+ allowedResponseErrors = new HashSet<>();
+ }
+ if (ignoreVersionConflicts) {
+ allowedResponseErrors.add(VERSION_CONFLICT_ERROR);
+ }
+
+ return builder().setAllowedResponseErrors(allowedResponseErrors).build();
+ }
+
+ /**
+ * Provide a set of textual error types which can be contained in Bulk API
response
+ * items[].error.type field. Any element in @param
allowableResponseErrorTypes will suppress
+ * errors of the same type in Bulk responses.
+ *
+ * <p>See also
+ *
https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#bulk-failures-ex
+ *
+ * @param allowableResponseErrorTypes
+ * @return the {@link BulkIO} with allowable response errors set
+ */
+ public BulkIO withAllowableResponseErrors(@Nullable Set<String>
allowableResponseErrorTypes) {
+ if (allowableResponseErrorTypes == null) {
+ allowableResponseErrorTypes = new HashSet<>();
+ }
+
+ return
builder().setAllowedResponseErrors(allowableResponseErrorTypes).build();
+ }
+
+ /**
+ * If using {@link BulkIO#withUseStatefulBatches}, this can be used to set
a maximum elapsed
+ * time before buffered elements are emitted to Elasticsearch as a Bulk
API request. If this
+ * config is not set, Bulk requests will not be issued until {@link
BulkIO#getMaxBatchSize}
+ * number of documents have been buffered. This may result in higher
latency in particular if
+ * your max batch size is set to a large value and your pipeline input is
low volume.
+ *
+ * @param maxBufferingDuration the maximum duration to wait before sending
any buffered
+ * documents to Elasticsearch, regardless of maxBatchSize.
+ * @return the {@link BulkIO} with maximum buffering duration set
+ */
+ public BulkIO withMaxBufferingDuration(Duration maxBufferingDuration) {
+ LOG.warn(
+ "Use of withMaxBufferingDuration requires
withUseStatefulBatches(true). "
+ + "Setting that automatically.");
+ return builder()
+ .setUseStatefulBatches(true)
+ .setMaxBufferingDuration(maxBufferingDuration)
+ .build();
+ }
+
+ /**
+ * Whether or not to use Stateful Processing to ensure bulk requests have
the desired number of
+ * entities i.e. as close to the maxBatchSize as possible. By default
without this feature
+ * enabled, Bulk requests will not contain more than maxBatchSize
entities, but the lower bound
+ * of batch size is determined by Beam Runner bundle sizes, which may be
as few as 1.
+ *
+ * @param useStatefulBatches true enables the use of Stateful Processing
to ensure that batches
+ * are as close to the maxBatchSize as possible.
+ * @return the {@link BulkIO} with Stateful Processing enabled or disabled
+ */
+ public BulkIO withUseStatefulBatches(boolean useStatefulBatches) {
+ return builder().setUseStatefulBatches(useStatefulBatches).build();
+ }
+
+ /**
+ * When using {@link BulkIO#withUseStatefulBatches} Stateful Processing,
states and therefore
+ * batches are maintained per-key-per-window. BE AWARE that low values for
@param
+ * maxParallelRequestsPerWindow, in particular if the input data has a
finite number of windows,
+ * can reduce parallelism greatly. If data is globally windowed and @param
+ * maxParallelRequestsPerWindow is set to 1,there will only ever be 1
request in flight. Having
+ * only a single request in flight can be beneficial for ensuring an
Elasticsearch cluster is
+ * not overwhelmed by parallel requests,but may not work for all use
cases. If this number is
+ * less than the number of maximum workers in your pipeline, the IO work
will result in a
+ * sub-distribution of the last write step with most of the runners.
+ *
+ * @param maxParallelRequestsPerWindow the maximum number of parallel bulk
requests for a window
+ * of data
+ * @return the {@link BulkIO} with maximum parallel bulk requests per
window set
+ */
+ public BulkIO withMaxParallelRequestsPerWindow(int
maxParallelRequestsPerWindow) {
+ checkArgument(
+ maxParallelRequestsPerWindow > 0, "parameter value must be positive
" + "a integer");
+ return
builder().setMaxParallelRequestsPerWindow(maxParallelRequestsPerWindow).build();
+ }
+
+ /**
+ * Creates batches of documents using Stateful Processing based on user
configurable settings of
+ * withMaxBufferingDuration and withMaxParallelRequestsPerWindow.
+ *
+ * <p>Mostly exists for testability of withMaxParallelRequestsPerWindow.
+ */
+ @VisibleForTesting
+ static class StatefulBatching
+ extends PTransform<PCollection<String>, PCollection<KV<Integer,
Iterable<String>>>> {
+ final BulkIO spec;
+
+ private StatefulBatching(BulkIO bulkSpec) {
+ spec = bulkSpec;
+ }
+
+ public static StatefulBatching fromSpec(BulkIO spec) {
+ return new StatefulBatching(spec);
+ }
+
+ @Override
+ public PCollection<KV<Integer, Iterable<String>>>
expand(PCollection<String> input) {
+ GroupIntoBatches<Integer, String> groupIntoBatches =
+ GroupIntoBatches.ofSize(spec.getMaxBatchSize());
+
+ if (spec.getMaxBufferingDuration() != null) {
+ groupIntoBatches =
+
groupIntoBatches.withMaxBufferingDuration(spec.getMaxBufferingDuration());
+ }
+
+ return input
+ .apply(ParDo.of(new
Reshuffle.AssignShardFn<>(spec.getMaxParallelRequestsPerWindow())))
+ .apply(groupIntoBatches);
+ }
}
@Override
public PDone expand(PCollection<String> input) {
ConnectionConfiguration connectionConfiguration =
getConnectionConfiguration();
- FieldValueExtractFn idFn = getIdFn();
- BooleanFieldValueExtractFn isDeleteFn = getIsDeleteFn();
+
checkState(connectionConfiguration != null,
"withConnectionConfiguration() is required");
- checkArgument(
- isDeleteFn == null || idFn != null,
- "Id needs to be specified by withIdFn for delete operation");
- input.apply(ParDo.of(new WriteFn(this)));
+
+ if (getUseStatefulBatches()) {
+ input.apply(StatefulBatching.fromSpec(this)).apply(ParDo.of(new
BulkIOStatefulFn(this)));
+ } else {
+ input.apply(ParDo.of(new BulkIOBundleFn(this)));
+ }
return PDone.in(input.getPipeline());
}
- /** {@link DoFn} to for the {@link Write} transform. */
- @VisibleForTesting
- static class WriteFn extends DoFn<String, Void> {
- private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
- private static final int DEFAULT_RETRY_ON_CONFLICT = 5; // race
conditions on updates
+ static class BulkIOBundleFn extends BulkIOBaseFn<String> {
+ @VisibleForTesting
+ BulkIOBundleFn(BulkIO bulkSpec) {
+ super(bulkSpec);
+ }
- private static final Duration RETRY_INITIAL_BACKOFF =
Duration.standardSeconds(5);
+ @ProcessElement
+ public void processElement(ProcessContext context) throws Exception {
+ String bulkApiEntity = context.element();
+ addAndMaybeFlush(bulkApiEntity);
+ }
+ }
+ /*
+ Intended for use in conjunction with {@link GroupIntoBatches}
+ */
+ static class BulkIOStatefulFn extends BulkIOBaseFn<KV<Integer,
Iterable<String>>> {
@VisibleForTesting
- static final String RETRY_ATTEMPT_LOG = "Error writing to Elasticsearch.
Retry attempt[%d]";
+ BulkIOStatefulFn(BulkIO bulkSpec) {
+ super(bulkSpec);
+ }
- @VisibleForTesting
- static final String RETRY_FAILED_LOG =
- "Error writing to ES after %d attempt(s). No more attempts allowed";
+ @ProcessElement
+ public void processElement(ProcessContext context) throws Exception {
+ Iterable<String> bulkApiEntities = context.element().getValue();
+ for (String bulkApiEntity : bulkApiEntities) {
+ addAndMaybeFlush(bulkApiEntity);
+ }
+ }
+ }
+
+ /** {@link DoFn} to for the {@link BulkIO} transform. */
+ @VisibleForTesting
+ private abstract static class BulkIOBaseFn<T> extends DoFn<T, Void> {
+ private static final Duration RETRY_INITIAL_BACKOFF =
Duration.standardSeconds(5);
private transient FluentBackoff retryBackoff;
- private int backendVersion;
- private final Write spec;
+ private BulkIO spec;
private transient RestClient restClient;
- private ArrayList<String> batch;
- private long currentBatchSizeBytes;
-
- // Encapsulates the elements which form the metadata for an
Elasticsearch bulk operation
- private static class DocumentMetadata implements Serializable {
- final String index;
- final String type;
- final String id;
- final Integer retryOnConflict;
-
- DocumentMetadata(String index, String type, String id, Integer
retryOnConflict) {
- this.index = index;
- this.type = type;
- this.id = id;
- this.retryOnConflict = retryOnConflict;
- }
- }
+ protected ArrayList<String> batch;
Review comment:
can be private
##########
File path:
sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
##########
@@ -568,11 +664,167 @@ void testWritePartialUpdate() throws Exception {
assertEquals(numDocs, currentNumDocs);
assertEquals(
numDocs / NUM_SCIENTISTS,
- countByScientistName(connectionConfiguration, restClient, "Einstein"));
+ countByScientistName(connectionConfiguration, restClient, "Einstein",
null));
// Partial update assertions
- assertEquals(numDocs / 2, countByMatch(connectionConfiguration,
restClient, "group", "0"));
- assertEquals(numDocs / 2, countByMatch(connectionConfiguration,
restClient, "group", "1"));
+ assertEquals(
+ numDocs / 2, countByMatch(connectionConfiguration, restClient,
"group", "0", null, null));
+ assertEquals(
+ numDocs / 2, countByMatch(connectionConfiguration, restClient,
"group", "1", null, null));
+ }
+
+ void testWriteWithDocVersion() throws Exception {
+ List<ObjectNode> jsonData =
+ ElasticsearchIOTestUtils.createJsonDocuments(
+ numDocs,
ElasticsearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
+
+ List<String> data = new ArrayList<>();
+ for (ObjectNode doc : jsonData) {
+ doc.put("my_version", "1");
+ data.add(doc.toString());
+ }
+
+ insertTestDocuments(connectionConfiguration, data, restClient);
+ long currentNumDocs =
refreshIndexAndGetCurrentNumDocs(connectionConfiguration, restClient);
+ assertEquals(numDocs, currentNumDocs);
+ // Check that all docs have the same "my_version"
+ assertEquals(
+ numDocs,
+ countByMatch(
+ connectionConfiguration, restClient, "my_version", "1", null,
KV.of(1, numDocs)));
+
+ Write write =
+ ElasticsearchIO.write()
+ .withConnectionConfiguration(connectionConfiguration)
+ .withIdFn(new ExtractValueFn("id"))
+ .withDocVersionFn(new ExtractValueFn("my_version"))
+ .withDocVersionType("external");
+
+ data = new ArrayList<>();
+ for (ObjectNode doc : jsonData) {
+ // Set version to larger number than originally set, and larger than
next logical version
+ // number set by default by ES.
+ doc.put("my_version", "3");
+ data.add(doc.toString());
+ }
+
+ // Test that documents with lower version are rejected, but rejections
ignored when specified
+ pipeline.apply(Create.of(data)).apply(write);
+ pipeline.run();
+
+ currentNumDocs = refreshIndexAndGetCurrentNumDocs(connectionConfiguration,
restClient);
+ assertEquals(numDocs, currentNumDocs);
+
+ // my_version and doc version should have changed
+ assertEquals(
+ 0,
+ countByMatch(
+ connectionConfiguration, restClient, "my_version", "1", null,
KV.of(1, numDocs)));
+ assertEquals(
+ numDocs,
+ countByMatch(
+ connectionConfiguration, restClient, "my_version", "3", null,
KV.of(3, numDocs)));
+ }
+
+ /**
+ * Tests upsert script by adding a group field to each document in the
standard test set. The
+ * group field is populated as the modulo 2 of the document id allowing for
a test to ensure the
+ * documents are split into 2 groups.
+ */
+ void testWriteScriptedUpsert() throws Exception {
+ List<String> data =
+ ElasticsearchIOTestUtils.createDocuments(
+ numDocs,
ElasticsearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
+
+ Write write =
+ ElasticsearchIO.write()
+ .withConnectionConfiguration(connectionConfiguration)
+ .withIdFn(new ExtractValueFn("id"))
+ .withUpsertScript(SCRIPT_SOURCE);
+
+ // Test that documents can be inserted/created by using withUpsertScript
+ pipeline.apply(Create.of(data)).apply(write);
+ pipeline.run();
+
+ // defensive coding to ensure our initial state is as expected
+ long currentNumDocs =
refreshIndexAndGetCurrentNumDocs(connectionConfiguration, restClient);
+ // check we have not unwittingly modified existing behaviour
+ assertEquals(numDocs, currentNumDocs);
+ assertEquals(
+ numDocs / NUM_SCIENTISTS,
+ countByScientistName(connectionConfiguration, restClient, "Einstein",
null));
+
+ // All docs should have have group = 0 added by the script upon creation
+ assertEquals(
+ numDocs, countByMatch(connectionConfiguration, restClient, "group",
"0", null, null));
+
+ // Run the same data again. This time, because all docs exist in the index
already, scripted
+ // updates should happen rather than scripted inserts.
+ pipeline.apply(Create.of(data)).apply(write);
+ pipeline.run();
+
+ currentNumDocs = refreshIndexAndGetCurrentNumDocs(connectionConfiguration,
restClient);
+
+ // check we have not unwittingly modified existing behaviour
+ assertEquals(numDocs, currentNumDocs);
+ assertEquals(
+ numDocs / NUM_SCIENTISTS,
+ countByScientistName(connectionConfiguration, restClient, "Einstein",
null));
+
+ // The script will set either 0 or 1 for the group value on update
operations
+ assertEquals(
+ numDocs / 2, countByMatch(connectionConfiguration, restClient,
"group", "0", null, null));
+ assertEquals(
+ numDocs / 2, countByMatch(connectionConfiguration, restClient,
"group", "1", null, null));
+ }
+
+ void testMaxParallelRequestsPerWindow() throws Exception {
+ List<String> data =
+ ElasticsearchIOTestUtils.createDocuments(
+ numDocs,
ElasticsearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
+
+ Write write =
+ ElasticsearchIO.write()
+ .withConnectionConfiguration(connectionConfiguration)
+ .withMaxParallelRequestsPerWindow(1);
+
+ PCollection<KV<Integer, Long>> batches =
+ pipeline
+ .apply(Create.of(data))
+ .apply(Window.into(new GlobalWindows()))
+ .apply(StatefulBatching.fromSpec(write.getBulkIO()))
+ .apply(Count.perKey());
+
+ // Number of unique keys produced should be number of
maxParallelRequestsPerWindow * numWindows
+ // There is only 1 request (key) per window, and 1 (global) window ie. one
key total
+
PAssert.that(batches).containsInAnyOrder(Collections.singletonList(KV.of(0,
1L)));
Review comment:
It is not testing what you think it tests:
`.apply(StatefulBatching.fromSpec(write.getBulkIO()))` outputs a
`PColection<KV<K, Iterable<V>>>` with a single key (as the sharded key was
specified to 1 key). So the collection contains only one key (0) with an
iterable associated to it. So count says that there is only a single element
(the iterable) for key 0. You are not testing the content of the iterable but
the number of iterables associated to the 0 key. So please add also a test of
the content of the iterable. For that you need to do: `PAssert.that("error
message", collection)
.satisfies(function)` and implement the function with asserts.
##########
File path:
sdks/java/io/elasticsearch-tests/elasticsearch-tests-common/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTestCommon.java
##########
@@ -568,11 +664,167 @@ void testWritePartialUpdate() throws Exception {
assertEquals(numDocs, currentNumDocs);
assertEquals(
numDocs / NUM_SCIENTISTS,
- countByScientistName(connectionConfiguration, restClient, "Einstein"));
+ countByScientistName(connectionConfiguration, restClient, "Einstein",
null));
// Partial update assertions
- assertEquals(numDocs / 2, countByMatch(connectionConfiguration,
restClient, "group", "0"));
- assertEquals(numDocs / 2, countByMatch(connectionConfiguration,
restClient, "group", "1"));
+ assertEquals(
+ numDocs / 2, countByMatch(connectionConfiguration, restClient,
"group", "0", null, null));
+ assertEquals(
+ numDocs / 2, countByMatch(connectionConfiguration, restClient,
"group", "1", null, null));
+ }
+
+ void testWriteWithDocVersion() throws Exception {
+ List<ObjectNode> jsonData =
+ ElasticsearchIOTestUtils.createJsonDocuments(
+ numDocs,
ElasticsearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
+
+ List<String> data = new ArrayList<>();
+ for (ObjectNode doc : jsonData) {
+ doc.put("my_version", "1");
+ data.add(doc.toString());
+ }
+
+ insertTestDocuments(connectionConfiguration, data, restClient);
+ long currentNumDocs =
refreshIndexAndGetCurrentNumDocs(connectionConfiguration, restClient);
+ assertEquals(numDocs, currentNumDocs);
+ // Check that all docs have the same "my_version"
+ assertEquals(
+ numDocs,
+ countByMatch(
+ connectionConfiguration, restClient, "my_version", "1", null,
KV.of(1, numDocs)));
+
+ Write write =
+ ElasticsearchIO.write()
+ .withConnectionConfiguration(connectionConfiguration)
+ .withIdFn(new ExtractValueFn("id"))
+ .withDocVersionFn(new ExtractValueFn("my_version"))
+ .withDocVersionType("external");
+
+ data = new ArrayList<>();
+ for (ObjectNode doc : jsonData) {
+ // Set version to larger number than originally set, and larger than
next logical version
+ // number set by default by ES.
+ doc.put("my_version", "3");
+ data.add(doc.toString());
+ }
+
+ // Test that documents with lower version are rejected, but rejections
ignored when specified
+ pipeline.apply(Create.of(data)).apply(write);
+ pipeline.run();
+
+ currentNumDocs = refreshIndexAndGetCurrentNumDocs(connectionConfiguration,
restClient);
+ assertEquals(numDocs, currentNumDocs);
+
+ // my_version and doc version should have changed
+ assertEquals(
+ 0,
+ countByMatch(
+ connectionConfiguration, restClient, "my_version", "1", null,
KV.of(1, numDocs)));
+ assertEquals(
+ numDocs,
+ countByMatch(
+ connectionConfiguration, restClient, "my_version", "3", null,
KV.of(3, numDocs)));
+ }
+
+ /**
+ * Tests upsert script by adding a group field to each document in the
standard test set. The
+ * group field is populated as the modulo 2 of the document id allowing for
a test to ensure the
+ * documents are split into 2 groups.
+ */
+ void testWriteScriptedUpsert() throws Exception {
+ List<String> data =
+ ElasticsearchIOTestUtils.createDocuments(
+ numDocs,
ElasticsearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
+
+ Write write =
+ ElasticsearchIO.write()
+ .withConnectionConfiguration(connectionConfiguration)
+ .withIdFn(new ExtractValueFn("id"))
+ .withUpsertScript(SCRIPT_SOURCE);
+
+ // Test that documents can be inserted/created by using withUpsertScript
+ pipeline.apply(Create.of(data)).apply(write);
+ pipeline.run();
+
+ // defensive coding to ensure our initial state is as expected
+ long currentNumDocs =
refreshIndexAndGetCurrentNumDocs(connectionConfiguration, restClient);
+ // check we have not unwittingly modified existing behaviour
+ assertEquals(numDocs, currentNumDocs);
+ assertEquals(
+ numDocs / NUM_SCIENTISTS,
+ countByScientistName(connectionConfiguration, restClient, "Einstein",
null));
+
+ // All docs should have have group = 0 added by the script upon creation
+ assertEquals(
+ numDocs, countByMatch(connectionConfiguration, restClient, "group",
"0", null, null));
+
+ // Run the same data again. This time, because all docs exist in the index
already, scripted
+ // updates should happen rather than scripted inserts.
+ pipeline.apply(Create.of(data)).apply(write);
+ pipeline.run();
+
+ currentNumDocs = refreshIndexAndGetCurrentNumDocs(connectionConfiguration,
restClient);
+
+ // check we have not unwittingly modified existing behaviour
+ assertEquals(numDocs, currentNumDocs);
+ assertEquals(
+ numDocs / NUM_SCIENTISTS,
+ countByScientistName(connectionConfiguration, restClient, "Einstein",
null));
+
+ // The script will set either 0 or 1 for the group value on update
operations
+ assertEquals(
+ numDocs / 2, countByMatch(connectionConfiguration, restClient,
"group", "0", null, null));
+ assertEquals(
+ numDocs / 2, countByMatch(connectionConfiguration, restClient,
"group", "1", null, null));
+ }
+
+ void testMaxParallelRequestsPerWindow() throws Exception {
+ List<String> data =
+ ElasticsearchIOTestUtils.createDocuments(
+ numDocs,
ElasticsearchIOTestUtils.InjectionMode.DO_NOT_INJECT_INVALID_DOCS);
+
+ Write write =
+ ElasticsearchIO.write()
+ .withConnectionConfiguration(connectionConfiguration)
+ .withMaxParallelRequestsPerWindow(1);
+
+ PCollection<KV<Integer, Long>> batches =
+ pipeline
+ .apply(Create.of(data))
+ .apply(Window.into(new GlobalWindows()))
+ .apply(StatefulBatching.fromSpec(write.getBulkIO()))
+ .apply(Count.perKey());
+
+ // Number of unique keys produced should be number of
maxParallelRequestsPerWindow * numWindows
+ // There is only 1 request (key) per window, and 1 (global) window ie. one
key total
+
PAssert.that(batches).containsInAnyOrder(Collections.singletonList(KV.of(0,
1L)));
+
+ pipeline.run();
+ }
+
+ void testMaxBufferingDurationAndMaxParallelRequestsPerWindow() throws
Exception {
Review comment:
unless I missed something, I think it is not needed as comparing to the
other parallel test, all that it tests is the fact that maxDuration works
(which was already tested when the GroupIntoBatches PR was merged)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]