[
https://issues.apache.org/jira/browse/BEAM-12093?focusedWorklogId=591435&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-591435
]
ASF GitHub Bot logged work on BEAM-12093:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 30/Apr/21 10:27
Start Date: 30/Apr/21 10:27
Worklog Time Spent: 10m
Work Description: echauchot commented on a change in pull request #14347:
URL: https://github.com/apache/beam/pull/14347#discussion_r623774847
##########
File path: sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
@@ -1246,88 +1891,182 @@ public Write withUsePartialUpdate(boolean usePartialUpdate) {
* }</pre>
*
* @param retryConfiguration the rules which govern the retry behavior
- * @return the {@link Write} with retrying configured
+ * @return the {@link BulkIO} with retrying configured
*/
- public Write withRetryConfiguration(RetryConfiguration retryConfiguration) {
+ public BulkIO withRetryConfiguration(RetryConfiguration retryConfiguration) {
checkArgument(retryConfiguration != null, "retryConfiguration is required");
return builder().setRetryConfiguration(retryConfiguration).build();
}
/**
- * Provide a function to extract the target operation either upsert or delete from the document
- * fields allowing dynamic bulk operation decision. While using withIsDeleteFn, it should be
- * taken care that the document's id extraction is defined using the withIdFn function or else
- * IllegalArgumentException is thrown. Should the function throw an Exception then the batch
- * will fail and the exception propagated.
+ * Whether or not to suppress version conflict errors in a Bulk API response. This can be useful
+ * if your use case involves using external version types.
*
- * @param isDeleteFn set to true for deleting the specific document
- * @return the {@link Write} with the function set
+ * @param ignoreVersionConflicts true to suppress version conflicts, false to surface version
+ * conflict errors.
+ * @return the {@link BulkIO} with version conflict handling configured
*/
- public Write withIsDeleteFn(BooleanFieldValueExtractFn isDeleteFn) {
- checkArgument(isDeleteFn != null, "deleteFn is required");
- return builder().setIsDeleteFn(isDeleteFn).build();
+ public BulkIO withIgnoreVersionConflicts(boolean ignoreVersionConflicts) {
+ Set<String> allowedResponseErrors = getAllowedResponseErrors();
+ if (allowedResponseErrors == null) {
+ allowedResponseErrors = new HashSet<>();
+ }
+ if (ignoreVersionConflicts) {
+ allowedResponseErrors.add(VERSION_CONFLICT_ERROR);
+ }
+
+ return builder().setAllowedResponseErrors(allowedResponseErrors).build();
+ }
+
+ /**
+ * Provide a set of textual error types which can be contained in Bulk API response
+ * items[].error.type field. Any element in @param allowableResponseErrorTypes will suppress
+ * errors of the same type in Bulk responses.
+ *
+ * <p>See also
+ * https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#bulk-failures-ex
+ *
+ * @param allowableResponseErrorTypes
+ * @return the {@link BulkIO} with allowable response errors set
+ */
+ public BulkIO withAllowableResponseErrors(@Nullable Set<String> allowableResponseErrorTypes) {
+ if (allowableResponseErrorTypes == null) {
+ allowableResponseErrorTypes = new HashSet<>();
+ }
+
+ return builder().setAllowedResponseErrors(allowableResponseErrorTypes).build();
+ }
+
+ /**
+ * If using {@link BulkIO#withUseStatefulBatches}, this can be used to set a maximum elapsed
+ * time before buffered elements are emitted to Elasticsearch as a Bulk API request. If this
+ * config is not set, Bulk requests will not be issued until {@link BulkIO#getMaxBatchSize}
+ * number of documents have been buffered. This may result in higher latency in particular if
+ * your max batch size is set to a large value and your pipeline input is low volume.
+ *
+ * @param maxBufferingDuration the maximum duration to wait before sending any buffered
+ * documents to Elasticsearch, regardless of maxBatchSize.
+ * @return the {@link BulkIO} with maximum buffering duration set
+ */
+ public BulkIO withMaxBufferingDuration(Duration maxBufferingDuration) {
+ LOG.warn(
+ "Use of withMaxBufferingDuration requires withUseStatefulBatches(true). "
+ + "Setting that automatically.");
+ return builder()
+ .setUseStatefulBatches(true)
+ .setMaxBufferingDuration(maxBufferingDuration)
+ .build();
+ }
+
+ /**
+ * Whether or not to use Stateful Processing to ensure bulk requests have the desired number of
+ * entities i.e. as close to the maxBatchSize as possible. By default without this feature
+ * enabled, Bulk requests will not contain more than maxBatchSize entities, but the lower bound
+ * of batch size is determined by Beam Runner bundle sizes, which may be as few as 1.
+ *
+ * @param useStatefulBatches true enables the use of Stateful Processing to ensure that batches
+ * are as close to the maxBatchSize as possible.
+ * @return the {@link BulkIO} with Stateful Processing enabled or disabled
+ */
+ public BulkIO withUseStatefulBatches(boolean useStatefulBatches) {
+ return builder().setUseStatefulBatches(useStatefulBatches).build();
+ }
+
+ /**
+ * When using {@link BulkIO#withUseStatefulBatches} Stateful Processing, states and therefore
+ * batches are maintained per-key-per-window. BE AWARE that low values for @param
+ * maxParallelRequestsPerWindow, in particular if the input data has a finite number of windows,
+ * can reduce parallelism greatly. If data is globally windowed and @param
+ * maxParallelRequestsPerWindow is set to 1,there will only ever be 1 request in flight. Having
+ * only a single request in flight can be beneficial for ensuring an Elasticsearch cluster is
+ * not overwhelmed by parallel requests,but may not work for all use cases. If this number is
+ * less than the number of maximum workers in your pipeline, the IO work may not be distributed
Review comment:
I would say: it will result in a sub-optimal distribution of the last write
step with most runners.
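
To illustrate the point about distribution, here is a minimal sketch of why a
small maxParallelRequestsPerWindow caps parallelism. It assumes (this is not
shown in the diff above) that each element is assigned to one of N keys, so a
window can hold at most N independent batches. The class name, the method names,
and the hash-modulo assignment are hypothetical and chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ShardKeySketch {

  // Hypothetical key assignment: map a document id to one of N keys.
  // floorMod keeps the result in [0, N) even for negative hash codes.
  static int shardFor(String docId, int maxParallelRequestsPerWindow) {
    return Math.floorMod(docId.hashCode(), maxParallelRequestsPerWindow);
  }

  // Group ids by key. With per-key state, each group is batched independently,
  // so the number of groups bounds the number of concurrent batches in a window.
  static Map<Integer, List<String>> groupByShard(List<String> docIds, int n) {
    Map<Integer, List<String>> groups = new HashMap<>();
    for (String id : docIds) {
      groups.computeIfAbsent(shardFor(id, n), k -> new ArrayList<>()).add(id);
    }
    return groups;
  }
}
```

With N = 1 every element lands on the same key, so a globally windowed pipeline
has a single batch being filled at any time, no matter how many workers exist.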
##########
File path: sdks/java/io/elasticsearch/OWNERS
##########
@@ -4,3 +4,4 @@ reviewers:
- echauchot
- jbonofre
- timrobertson100
+ - egalpin
Review comment:
You should put your name first IMHO; otherwise there is very little chance
you will be pinged as a reviewer.
Issue Time Tracking
-------------------
Worklog Id: (was: 591435)
Time Spent: 8h 50m (was: 8h 40m)
> Overhaul ElasticsearchIO#Write
> ------------------------------
>
> Key: BEAM-12093
> URL: https://issues.apache.org/jira/browse/BEAM-12093
> Project: Beam
> Issue Type: Improvement
> Components: io-java-elasticsearch
> Reporter: Evan Galpin
> Assignee: Evan Galpin
> Priority: P2
> Labels: elasticsearch
> Time Spent: 8h 50m
> Remaining Estimate: 0h
>
> The current ElasticsearchIO#Write is great, but there are two related areas
> which could be improved:
> # Separation of concerns
> # Bulk API batch size optimization
>
> Presently, the Write transform has 2 responsibilities which are coupled and
> inseparable by users:
> # Convert input documents into Bulk API entities, serializing based on user
> settings (partial update, delete, upsert, etc)
> # Batch the converted Bulk API entities together and interface with the
> target ES cluster
>
> Having these 2 roles tightly coupled means testing requires an available
> Elasticsearch cluster, making unit testing almost impossible. Allowing access
> to the serialized documents would make unit testing much easier for pipeline
> developers, among numerous other benefits to having separation between
> serialization and IO.
> Relatedly, the batching of entities when creating Bulk API payloads is
> currently limited by the lesser of the Beam Runner's bundling semantics and the
> `ElasticsearchIO#Write#maxBatchSize` setting. This is understandable for
> portability between runners, but it also means most Bulk payloads only have a
> few (1-5) entities. By using Stateful Processing to better adhere to the
> `ElasticsearchIO#Write#maxBatchSize` setting, we have been able to drop the
> number of indexing requests in an Elasticsearch cluster by 50-100x.
> Separating the roles of document serialization and IO allows supporting
> multiple IO techniques with minimal and understandable code.
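
The batching argument above can be sketched as a small simulation. This is not
the Beam implementation and uses no Beam APIs. It only counts how many Bulk
requests would be issued when each bundle is flushed on its own versus when
documents are buffered across bundles until maxBatchSize is reached. The class
name, method names, and the bundle sizes used are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchingSketch {

  // Requests issued when every bundle is flushed separately (no shared state).
  static int requestsPerBundle(List<Integer> bundleSizes, int maxBatchSize) {
    int requests = 0;
    for (int size : bundleSizes) {
      // A bundle of `size` documents needs ceil(size / maxBatchSize) requests.
      requests += (size + maxBatchSize - 1) / maxBatchSize;
    }
    return requests;
  }

  // Requests issued when documents are buffered across bundles and a request
  // is sent only once maxBatchSize documents have accumulated.
  static int requestsWithBuffering(List<Integer> bundleSizes, int maxBatchSize) {
    int requests = 0;
    int buffered = 0;
    for (int size : bundleSizes) {
      buffered += size;
      while (buffered >= maxBatchSize) {
        buffered -= maxBatchSize;
        requests++;
      }
    }
    if (buffered > 0) {
      requests++; // final flush of whatever is left in the buffer
    }
    return requests;
  }

  public static void main(String[] args) {
    // 1000 bundles whose sizes cycle through 1..5, i.e. 3000 documents in total.
    List<Integer> bundleSizes = new ArrayList<>();
    for (int i = 0; i < 1000; i++) {
      bundleSizes.add(1 + (i % 5));
    }
    System.out.println("per bundle: " + requestsPerBundle(bundleSizes, 1000));
    System.out.println("buffered:   " + requestsWithBuffering(bundleSizes, 1000));
  }
}
```

The actual reduction in a real pipeline depends on the runner's bundle sizes,
the configured maxBatchSize, and any maximum buffering duration.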