egalpin commented on a change in pull request #14347:
URL: https://github.com/apache/beam/pull/14347#discussion_r616962376
##########
File path:
sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
##########
@@ -1203,88 +1796,160 @@ public Write withUsePartialUpdate(boolean usePartialUpdate) {
* }</pre>
*
* @param retryConfiguration the rules which govern the retry behavior
- * @return the {@link Write} with retrying configured
+ * @return the {@link BulkIO} with retrying configured
*/
- public Write withRetryConfiguration(RetryConfiguration retryConfiguration) {
+ public BulkIO withRetryConfiguration(RetryConfiguration retryConfiguration) {
checkArgument(retryConfiguration != null, "retryConfiguration is required");
return builder().setRetryConfiguration(retryConfiguration).build();
}
/**
- * Provide a function to extract the target operation either upsert or delete from the document
- * fields allowing dynamic bulk operation decision. While using withIsDeleteFn, it should be
- * taken care that the document's id extraction is defined using the withIdFn function or else
- * IllegalArgumentException is thrown. Should the function throw an Exception then the batch
- * will fail and the exception propagated.
+ * Whether or not to suppress version conflict errors in a Bulk API response. This can be useful
+ * if your use case involves using external version types.
*
- * @param isDeleteFn set to true for deleting the specific document
- * @return the {@link Write} with the function set
+ * @param ignoreVersionConflicts true to suppress version conflicts, false to surface version
+ * conflict errors.
+ * @return the {@link BulkIO} with version conflict handling configured
+ * @return the {@link BulkIO} with version conflict handling configured
*/
- public Write withIsDeleteFn(BooleanFieldValueExtractFn isDeleteFn) {
- checkArgument(isDeleteFn != null, "deleteFn is required");
- return builder().setIsDeleteFn(isDeleteFn).build();
+ public BulkIO withIgnoreVersionConflicts(boolean ignoreVersionConflicts) {
+ Set<String> allowedResponseErrors = getAllowedResponseErrors();
+ if (allowedResponseErrors == null) {
+ allowedResponseErrors = new HashSet<>();
+ }
+ if (ignoreVersionConflicts) {
+ allowedResponseErrors.add(VERSION_CONFLICT_ERROR);
+ }
+
+ return builder().setAllowedResponseErrors(allowedResponseErrors).build();
+ }
+
+ /**
+ * Provide a set of textual error types which can be contained in Bulk API response
+ * items[].error.type field. Any element in {@code allowableResponseErrorTypes} will suppress
+ * errors of the same type in Bulk responses.
+ *
+ * <p>See also
+ * https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#bulk-failures-ex
+ *
+ * @param allowableResponseErrorTypes error types which should not fail the write when present
+ * in a Bulk API response
+ * @return the {@link BulkIO} with allowable response errors set
+ */
+ public BulkIO withAllowableResponseErrors(@Nullable Set<String> allowableResponseErrorTypes) {
+ if (allowableResponseErrorTypes == null) {
+ allowableResponseErrorTypes = new HashSet<>();
+ }
+
+ return builder().setAllowedResponseErrors(allowableResponseErrorTypes).build();
+ }
+
+ /**
+ * If using {@link BulkIO#withUseStatefulBatches}, this can be used to set a maximum elapsed
+ * time before buffered elements are emitted to Elasticsearch as a Bulk API request. If this
+ * config is not set, Bulk requests will not be issued until {@link BulkIO#getMaxBatchSize}
+ * number of documents have been buffered. This may result in higher latency, in particular if
+ * your max batch size is set to a large value and your pipeline input is low volume.
+ *
+ * @param maxBufferingDuration the maximum duration to wait before sending any buffered
+ * documents to Elasticsearch, regardless of maxBatchSize.
+ * @return the {@link BulkIO} with maximum buffering duration set
+ */
+ public BulkIO withMaxBufferingDuration(Duration maxBufferingDuration) {
+ LOG.warn(
+ "Use of withMaxBufferingDuration requires withUseStatefulBatches(true). "
+ + "Setting that automatically.");
+ return builder()
+ .setUseStatefulBatches(true)
+ .setMaxBufferingDuration(maxBufferingDuration)
+ .build();
+ }
+
+ /**
+ * Whether or not to use Stateful Processing to ensure bulk requests have the desired number of
+ * entities, i.e. as close to the maxBatchSize as possible. By default, without this feature
+ * enabled, Bulk requests will not contain more than maxBatchSize entities, but the lower bound
+ * of batch size is determined by Beam Runner bundle sizes, which may be as few as 1.
+ *
+ * @param useStatefulBatches true enables the use of Stateful Processing to ensure that batches
+ * are as close to the maxBatchSize as possible.
+ * @return the {@link BulkIO} with Stateful Processing enabled or disabled
+ */
+ public BulkIO withUseStatefulBatches(boolean useStatefulBatches) {
+ return builder().setUseStatefulBatches(useStatefulBatches).build();
+ }
+
+ /**
+ * When using {@link BulkIO#withUseStatefulBatches} Stateful Processing, states and therefore
+ * batches are maintained per-key-per-window. If data is globally windowed and this
+ * configuration is set to 1, there will only ever be 1 request in flight. Having only a single
+ * request in flight can be beneficial for ensuring an Elasticsearch cluster is not overwhelmed
+ * by parallel requests, but may not work for all use cases. If this number is less than the
+ * maximum number of workers in your pipeline, the IO work may not be distributed across all
+ * workers.
+ *
+ * @param maxParallelRequestsPerWindow the maximum number of parallel bulk requests for a window
+ * of data
+ * @return the {@link BulkIO} with maximum parallel bulk requests per window set
+ */
+ public BulkIO withMaxParallelRequestsPerWindow(int maxParallelRequestsPerWindow) {
+ checkArgument(
+ maxParallelRequestsPerWindow > 0, "parameter value must be a positive integer");
+ return builder().setMaxParallelRequestsPerWindow(maxParallelRequestsPerWindow).build();
}
@Override
public PDone expand(PCollection<String> input) {
ConnectionConfiguration connectionConfiguration = getConnectionConfiguration();
- FieldValueExtractFn idFn = getIdFn();
- BooleanFieldValueExtractFn isDeleteFn = getIsDeleteFn();
checkState(connectionConfiguration != null, "withConnectionConfiguration() is required");
- checkArgument(
- isDeleteFn == null || idFn != null,
- "Id needs to be specified by withIdFn for delete operation");
- input.apply(ParDo.of(new WriteFn(this)));
+
+ if (getUseStatefulBatches()) {
+ GroupIntoBatches<Integer, String> groupIntoBatches =
+ GroupIntoBatches.ofSize(getMaxBatchSize());
+
+ if (getMaxBufferingDuration() != null) {
+ groupIntoBatches = groupIntoBatches.withMaxBufferingDuration(getMaxBufferingDuration());
+ }
+ input
+ .apply(ParDo.of(new AssignShardFn<>(getMaxParallelRequestsPerWindow())))
+ .apply(groupIntoBatches)
+ .apply(
+ "Remove key no longer needed",
+ MapElements.into(TypeDescriptors.iterables(TypeDescriptors.strings()))
+ .via(KV::getValue))
+ .apply(ParDo.of(new BulkIOFn(this)));
+ } else {
+
+ input
+ .apply(
+ "Make elements iterable",
+ MapElements.into(TypeDescriptors.iterables(TypeDescriptors.strings()))
+ .via(Collections::singletonList))
+ .apply(ParDo.of(new BulkIOFn(this)));
+ }
+
return PDone.in(input.getPipeline());
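The withIgnoreVersionConflicts and withAllowableResponseErrors options in this hunk reduce to accumulating allowed error-type strings in a set. Below is a minimal standalone sketch of that accumulation logic, not the actual ElasticsearchIO code: the class and method here are hypothetical mirrors, and the VERSION_CONFLICT_ERROR value is assumed to correspond to Elasticsearch's documented `version_conflict_engine_exception` error type.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical mirror of the null-guard-then-add pattern used by
// withIgnoreVersionConflicts: start from the current allowed set (if any),
// then add the version-conflict error type when the caller opts in.
public class AllowedErrorsSketch {
  // Assumed to match Elasticsearch's documented error type for version conflicts.
  static final String VERSION_CONFLICT_ERROR = "version_conflict_engine_exception";

  static Set<String> withIgnoreVersionConflicts(Set<String> current, boolean ignore) {
    // Copy rather than mutate, matching the immutable-builder style of the IO.
    Set<String> allowed = (current == null) ? new HashSet<>() : new HashSet<>(current);
    if (ignore) {
      allowed.add(VERSION_CONFLICT_ERROR);
    }
    return allowed;
  }

  public static void main(String[] args) {
    Set<String> allowed = withIgnoreVersionConflicts(null, true);
    System.out.println(allowed.contains(VERSION_CONFLICT_ERROR)); // prints "true"
    System.out.println(withIgnoreVersionConflicts(null, false).isEmpty()); // prints "true"
  }
}
```

Note the copy-then-add shape: because the real method reads the current allowed set before adding to it, previously allowed error types are preserved when options are chained.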
Review comment:
In cases where a user were to use separate DocToBulk and BulkIO transforms, would we still need this PDone? In that case, maybe I should remove the PDone from `Write`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]