lukecwik commented on code in PR #17112:
URL: https://github.com/apache/beam/pull/17112#discussion_r864108077
##########
sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java:
##########
@@ -1997,8 +2012,12 @@ abstract static class Builder {
abstract Builder setUseStatefulBatches(boolean useStatefulBatches);
+ /** @deprecated Use {@link this#setMaxParallelRequests} instead. */
Review Comment:
```suggestion
/** @deprecated Use {@link #setMaxParallelRequests} instead. */
```
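Background on the suggestion above: in Javadoc, `{@link #member}` already refers to a member of the enclosing class. In `this#setMaxParallelRequests`, `this` is not a type name, so javadoc cannot resolve the reference. The sketch below shows the deprecate-and-delegate pattern the diff appears to use. It assumes a hand-written builder. The diff's `abstract static class Builder` suggests the real one is generated (likely AutoValue). `BuilderDeprecationSketch`, `Config`, and the default of 1 are hypothetical:

```java
/** Hypothetical, simplified stand-in for the ElasticsearchIO Builder; not Beam code. */
public class BuilderDeprecationSketch {

  /** Immutable result of the builder (hypothetical). */
  static final class Config {
    final int maxParallelRequests;

    Config(int maxParallelRequests) {
      this.maxParallelRequests = maxParallelRequests;
    }
  }

  static final class Builder {
    // Hypothetical default; the real default in ElasticsearchIO may differ.
    private int maxParallelRequests = 1;

    Builder setMaxParallelRequests(int maxParallelRequests) {
      this.maxParallelRequests = maxParallelRequests;
      return this;
    }

    /** @deprecated Use {@link #setMaxParallelRequests} instead. */
    @Deprecated
    Builder setMaxParallelRequestsPerWindow(int maxParallelRequests) {
      // Delegate so the old and new setters can never drift apart.
      return setMaxParallelRequests(maxParallelRequests);
    }

    Config build() {
      return new Config(maxParallelRequests);
    }
  }

  public static void main(String[] args) {
    Config viaOld = new Builder().setMaxParallelRequestsPerWindow(4).build();
    Config viaNew = new Builder().setMaxParallelRequests(4).build();
    System.out.println(viaOld.maxParallelRequests == viaNew.maxParallelRequests); // true
  }
}
```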
##########
sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java:
##########
@@ -2152,22 +2171,45 @@ public BulkIO withUseStatefulBatches(boolean useStatefulBatches) {
/**
* When using {@link BulkIO#withUseStatefulBatches} Stateful Processing, states and therefore
* batches are maintained per-key-per-window. BE AWARE that low values for @param
- * maxParallelRequestsPerWindow, in particular if the input data has a finite number of windows,
- * can reduce parallelism greatly. If data is globally windowed and @param
- * maxParallelRequestsPerWindow is set to 1,there will only ever be 1 request in flight. Having
- * only a single request in flight can be beneficial for ensuring an Elasticsearch cluster is
- * not overwhelmed by parallel requests,but may not work for all use cases. If this number is
- * less than the number of maximum workers in your pipeline, the IO work will result in a
- * sub-distribution of the last write step with most of the runners.
+ * maxParallelRequests, in particular if the input data has a finite number of windows, can
+ * reduce parallelism greatly. Because data will be temporarily globally windowed as part of
+ * writing data to Elasticsearch, if @param maxParallelRequests is set to 1, there will only
+ * ever be 1 request in flight. Having only a single request in flight can be beneficial for
+ * ensuring an Elasticsearch cluster is not overwhelmed by parallel requests,but may not work
+ * for all use cases. If this number is less than the number of maximum workers in your
+ * pipeline, the IO work will result in a sub-optimal distribution of the write step with most
+ * of the runners.
*
- * @param maxParallelRequestsPerWindow the maximum number of parallel bulk requests for a window
- * of data
+ * @param maxParallelRequests the maximum number of parallel bulk requests for a window of data
* @return the {@link BulkIO} with maximum parallel bulk requests per window set
+ * @deprecated use {@link BulkIO#withMaxParallelRequests} instead.
*/
- public BulkIO withMaxParallelRequestsPerWindow(int maxParallelRequestsPerWindow) {
+ @Deprecated
+ public BulkIO withMaxParallelRequestsPerWindow(int maxParallelRequests) {
checkArgument(
- maxParallelRequestsPerWindow > 0, "parameter value must be positive " + "a integer");
- return builder().setMaxParallelRequestsPerWindow(maxParallelRequestsPerWindow).build();
+ maxParallelRequests > 0, "maxParallelRequestsPerWindow value must be a positive integer");
+ return builder().setMaxParallelRequests(maxParallelRequests).build();
+ }
+
+ /**
+ * When using {@link BulkIO#withUseStatefulBatches} Stateful Processing, states and therefore
+ * batches are maintained per-key-per-window. BE AWARE that low values for @param
+ * maxParallelRequests, in particular if the input data has a finite number of windows, can
+ * reduce parallelism greatly. Because data will be temporarily globally windowed as part of
+ * writing data to Elasticsearch, if @param maxParallelRequests is set to 1, there will only
+ * ever be 1 request in flight. Having only a single request in flight can be beneficial for
+ * ensuring an Elasticsearch cluster is not overwhelmed by parallel requests,but may not work
Review Comment:
```suggestion
* ensuring an Elasticsearch cluster is not overwhelmed by parallel requests, but may not work
```
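The hunk above keeps `withMaxParallelRequestsPerWindow` as a deprecated alias that writes to the renamed `setMaxParallelRequests` property. Below is a minimal sketch of an immutable "wither" pair in which the deprecated method delegates to its replacement. `BulkConfigSketch` is hypothetical and stands in for `BulkIO`. It throws a plain `IllegalArgumentException` instead of calling Guava's `checkArgument`, so it has no dependencies:

```java
/** Hypothetical model of the BulkIO "with" methods in the diff; not Beam's actual BulkIO. */
public final class BulkConfigSketch {
  private final int maxParallelRequests;

  private BulkConfigSketch(int maxParallelRequests) {
    this.maxParallelRequests = maxParallelRequests;
  }

  /** Hypothetical factory with a default of one request in flight. */
  public static BulkConfigSketch create() {
    return new BulkConfigSketch(1);
  }

  public int getMaxParallelRequests() {
    return maxParallelRequests;
  }

  /** Returns a modified copy; the receiver is left unchanged. Rejects non-positive values. */
  public BulkConfigSketch withMaxParallelRequests(int maxParallelRequests) {
    if (maxParallelRequests <= 0) {
      throw new IllegalArgumentException("maxParallelRequests value must be a positive integer");
    }
    return new BulkConfigSketch(maxParallelRequests);
  }

  /** @deprecated use {@link #withMaxParallelRequests} instead. */
  @Deprecated
  public BulkConfigSketch withMaxParallelRequestsPerWindow(int maxParallelRequests) {
    // Delegating keeps a single validation path for old and new callers.
    return withMaxParallelRequests(maxParallelRequests);
  }

  public static void main(String[] args) {
    BulkConfigSketch base = create();
    BulkConfigSketch four = base.withMaxParallelRequests(4);
    System.out.println(base.getMaxParallelRequests() + " " + four.getMaxParallelRequests()); // 1 4
  }
}
```

Delegating removes the duplicated check, but the error message then names the new parameter. The diff keeps a separate `checkArgument` so that callers still on the deprecated method see `maxParallelRequestsPerWindow` in the message. Either choice seems defensible.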
##########
sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java:
##########
@@ -2152,22 +2171,45 @@ public BulkIO withUseStatefulBatches(boolean useStatefulBatches) {
/**
* When using {@link BulkIO#withUseStatefulBatches} Stateful Processing, states and therefore
* batches are maintained per-key-per-window. BE AWARE that low values for @param
- * maxParallelRequestsPerWindow, in particular if the input data has a finite number of windows,
- * can reduce parallelism greatly. If data is globally windowed and @param
- * maxParallelRequestsPerWindow is set to 1,there will only ever be 1 request in flight. Having
- * only a single request in flight can be beneficial for ensuring an Elasticsearch cluster is
- * not overwhelmed by parallel requests,but may not work for all use cases. If this number is
- * less than the number of maximum workers in your pipeline, the IO work will result in a
- * sub-distribution of the last write step with most of the runners.
+ * maxParallelRequests, in particular if the input data has a finite number of windows, can
+ * reduce parallelism greatly. Because data will be temporarily globally windowed as part of
+ * writing data to Elasticsearch, if @param maxParallelRequests is set to 1, there will only
+ * ever be 1 request in flight. Having only a single request in flight can be beneficial for
+ * ensuring an Elasticsearch cluster is not overwhelmed by parallel requests,but may not work
+ * for all use cases. If this number is less than the number of maximum workers in your
+ * pipeline, the IO work will result in a sub-optimal distribution of the write step with most
+ * of the runners.
Review Comment:
```suggestion
* runners.
```
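The javadoc in this hunk claims that with data temporarily globally windowed, `maxParallelRequests` bounds the number of bulk requests in flight. Below is a plain-Java model of that claim, not Beam code. It assumes that the IO tags each element with one of `maxParallelRequests` shard keys before stateful batching, and that batches for any one key are flushed one at a time. Under those assumptions, the number of distinct keys is the ceiling on concurrent requests. `ShardBoundSketch` and `assignShards` are hypothetical names:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical model of shard-key assignment before stateful batching; not Beam code. */
public final class ShardBoundSketch {

  /** Groups documents by a shard key drawn uniformly from [0, maxParallelRequests). */
  static Map<Integer, List<String>> assignShards(List<String> docs, int maxParallelRequests) {
    if (maxParallelRequests <= 0) {
      throw new IllegalArgumentException("maxParallelRequests must be a positive integer");
    }
    Map<Integer, List<String>> byShard = new HashMap<>();
    for (String doc : docs) {
      // Any assignment into the same key range yields the same upper bound on keys.
      int shard = ThreadLocalRandom.current().nextInt(maxParallelRequests);
      byShard.computeIfAbsent(shard, k -> new ArrayList<>()).add(doc);
    }
    return byShard;
  }

  public static void main(String[] args) {
    List<String> docs = new ArrayList<>();
    for (int i = 0; i < 1000; i++) {
      docs.add("{\"id\":" + i + "}");
    }
    // In one global window, each key is an independent unit of stateful processing,
    // so the number of distinct keys caps the number of concurrent bulk requests.
    System.out.println(assignShards(docs, 1).size()); // 1
    System.out.println(assignShards(docs, 8).size() <= 8); // true
  }
}
```

With `maxParallelRequests` set to 1, every element lands on key 0. This is the "only ever 1 request in flight" case the javadoc describes. It also shows why a value below the worker count leaves some workers idle during the write step.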
##########
sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java:
##########
@@ -2152,22 +2171,45 @@ public BulkIO withUseStatefulBatches(boolean useStatefulBatches) {
/**
* When using {@link BulkIO#withUseStatefulBatches} Stateful Processing, states and therefore
* batches are maintained per-key-per-window. BE AWARE that low values for @param
- * maxParallelRequestsPerWindow, in particular if the input data has a finite number of windows,
- * can reduce parallelism greatly. If data is globally windowed and @param
- * maxParallelRequestsPerWindow is set to 1,there will only ever be 1 request in flight. Having
- * only a single request in flight can be beneficial for ensuring an Elasticsearch cluster is
- * not overwhelmed by parallel requests,but may not work for all use cases. If this number is
- * less than the number of maximum workers in your pipeline, the IO work will result in a
- * sub-distribution of the last write step with most of the runners.
+ * maxParallelRequests, in particular if the input data has a finite number of windows, can
+ * reduce parallelism greatly. Because data will be temporarily globally windowed as part of
+ * writing data to Elasticsearch, if @param maxParallelRequests is set to 1, there will only
+ * ever be 1 request in flight. Having only a single request in flight can be beneficial for
+ * ensuring an Elasticsearch cluster is not overwhelmed by parallel requests,but may not work
+ * for all use cases. If this number is less than the number of maximum workers in your
+ * pipeline, the IO work will result in a sub-optimal distribution of the write step with most
+ * of the runners.
*
- * @param maxParallelRequestsPerWindow the maximum number of parallel bulk requests for a window
- * of data
+ * @param maxParallelRequests the maximum number of parallel bulk requests for a window of data
* @return the {@link BulkIO} with maximum parallel bulk requests per window set
+ * @deprecated use {@link BulkIO#withMaxParallelRequests} instead.
*/
- public BulkIO withMaxParallelRequestsPerWindow(int maxParallelRequestsPerWindow) {
+ @Deprecated
+ public BulkIO withMaxParallelRequestsPerWindow(int maxParallelRequests) {
checkArgument(
- maxParallelRequestsPerWindow > 0, "parameter value must be positive " + "a integer");
- return builder().setMaxParallelRequestsPerWindow(maxParallelRequestsPerWindow).build();
+ maxParallelRequests > 0, "maxParallelRequestsPerWindow value must be a positive integer");
+ return builder().setMaxParallelRequests(maxParallelRequests).build();
+ }
+
+ /**
+ * When using {@link BulkIO#withUseStatefulBatches} Stateful Processing, states and therefore
+ * batches are maintained per-key-per-window. BE AWARE that low values for @param
+ * maxParallelRequests, in particular if the input data has a finite number of windows, can
+ * reduce parallelism greatly. Because data will be temporarily globally windowed as part of
+ * writing data to Elasticsearch, if @param maxParallelRequests is set to 1, there will only
+ * ever be 1 request in flight. Having only a single request in flight can be beneficial for
+ * ensuring an Elasticsearch cluster is not overwhelmed by parallel requests,but may not work
+ * for all use cases. If this number is less than the number of maximum workers in your
+ * pipeline, the IO work will result in a sub-optimal distribution of the write step with most
+ * of the runners.
Review Comment:
```suggestion
* runners.
```
##########
sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java:
##########
@@ -2152,22 +2171,45 @@ public BulkIO withUseStatefulBatches(boolean useStatefulBatches) {
/**
* When using {@link BulkIO#withUseStatefulBatches} Stateful Processing, states and therefore
* batches are maintained per-key-per-window. BE AWARE that low values for @param
- * maxParallelRequestsPerWindow, in particular if the input data has a finite number of windows,
- * can reduce parallelism greatly. If data is globally windowed and @param
- * maxParallelRequestsPerWindow is set to 1,there will only ever be 1 request in flight. Having
- * only a single request in flight can be beneficial for ensuring an Elasticsearch cluster is
- * not overwhelmed by parallel requests,but may not work for all use cases. If this number is
- * less than the number of maximum workers in your pipeline, the IO work will result in a
- * sub-distribution of the last write step with most of the runners.
+ * maxParallelRequests, in particular if the input data has a finite number of windows, can
+ * reduce parallelism greatly. Because data will be temporarily globally windowed as part of
+ * writing data to Elasticsearch, if @param maxParallelRequests is set to 1, there will only
+ * ever be 1 request in flight. Having only a single request in flight can be beneficial for
+ * ensuring an Elasticsearch cluster is not overwhelmed by parallel requests,but may not work
Review Comment:
```suggestion
* ensuring an Elasticsearch cluster is not overwhelmed by parallel requests, but may not work
```
##########
sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java:
##########
@@ -2467,37 +2509,14 @@ private Multimap<BoundedWindow, Document> flushBatch()
createWriteReport(
responseEntity, spec.getAllowedResponseErrors(), spec.getThrowWriteErrors());
- return mergeInputsAndResponses(inputEntries, responses);
- }
-
- private static Multimap<BoundedWindow, Document> mergeInputsAndResponses(
- List<Entry<BoundedWindow, Document>> inputs, List<Document> responses) {
-
- checkArgument(
- inputs.size() == responses.size(), "inputs and responses must be of same size");
-
- Multimap<BoundedWindow, Document> results = ArrayListMultimap.create();
-
- // N.B. the order of responses must always match the order of inputs
- for (int i = 0; i < inputs.size(); i++) {
- BoundedWindow outputWindow = inputs.get(i).getKey();
-
- // Contains raw input document and Bulk directive counterpart only
- Document inputDoc = inputs.get(i).getValue();
-
- // Contains stringified JSON response from Elasticsearch and error status only
- Document outputDoc = responses.get(i);
-
- // Create a new Document object with all the input fields from inputDoc (i.e. the raw
- // input JSON string) and all the response fields from ES bulk API for that input document
- Document merged =
- inputDoc
- .withHasError(outputDoc.getHasError())
- .withResponseItemJson(outputDoc.getResponseItemJson());
- results.put(outputWindow, merged);
- }
-
- return results;
+ return Streams.zip(
Review Comment:
nit: it is rare that Java streams perform better and they typically hurt readability
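For comparison with `Streams.zip`, below is a self-contained version of the index-based loop that the hunk removes (`mergeInputsAndResponses`). Like the original, it relies on the bulk response items arriving in the same order as the inputs. To stay dependency-free, it makes three substitutions, all hypothetical: a `Doc` class in place of `Document`, string window labels in place of `BoundedWindow`, and a `Map<String, List<Doc>>` in place of Guava's `Multimap`:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, dependency-free rendering of the removed merge loop; not Beam code. */
public final class MergeLoopSketch {

  /** Hypothetical stand-in for ElasticsearchIO's Document. */
  static final class Doc {
    final String inputJson;
    final boolean hasError;
    final String responseItemJson;

    Doc(String inputJson, boolean hasError, String responseItemJson) {
      this.inputJson = inputJson;
      this.hasError = hasError;
      this.responseItemJson = responseItemJson;
    }
  }

  /** Pairs the i-th input with the i-th bulk response item, grouped by window. */
  static Map<String, List<Doc>> merge(List<Map.Entry<String, Doc>> inputs, List<Doc> responses) {
    if (inputs.size() != responses.size()) {
      throw new IllegalArgumentException("inputs and responses must be of same size");
    }
    Map<String, List<Doc>> results = new LinkedHashMap<>();
    // N.B. assumes the order of responses matches the order of inputs.
    for (int i = 0; i < inputs.size(); i++) {
      String window = inputs.get(i).getKey();
      Doc input = inputs.get(i).getValue();
      Doc response = responses.get(i);
      // Keep the raw input JSON; take error status and response item from the bulk response.
      Doc merged = new Doc(input.inputJson, response.hasError, response.responseItemJson);
      results.computeIfAbsent(window, w -> new ArrayList<>()).add(merged);
    }
    return results;
  }

  public static void main(String[] args) {
    List<Map.Entry<String, Doc>> inputs =
        List.of(
            Map.entry("w1", new Doc("{\"a\":1}", false, null)),
            Map.entry("w2", new Doc("{\"a\":2}", false, null)));
    List<Doc> responses =
        List.of(
            new Doc(null, false, "{\"index\":{\"status\":201}}"),
            new Doc(null, true, "{\"index\":{\"status\":400}}"));
    System.out.println(merge(inputs, responses).get("w2").get(0).hasError); // true
  }
}
```

The explicit index makes the ordering assumption visible at the point where it matters, which is arguably the readability benefit the nit is pointing at.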
##########
sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java:
##########
@@ -2152,22 +2171,45 @@ public BulkIO withUseStatefulBatches(boolean useStatefulBatches) {
/**
* When using {@link BulkIO#withUseStatefulBatches} Stateful Processing, states and therefore
* batches are maintained per-key-per-window. BE AWARE that low values for @param
- * maxParallelRequestsPerWindow, in particular if the input data has a finite number of windows,
- * can reduce parallelism greatly. If data is globally windowed and @param
- * maxParallelRequestsPerWindow is set to 1,there will only ever be 1 request in flight. Having
- * only a single request in flight can be beneficial for ensuring an Elasticsearch cluster is
- * not overwhelmed by parallel requests,but may not work for all use cases. If this number is
- * less than the number of maximum workers in your pipeline, the IO work will result in a
- * sub-distribution of the last write step with most of the runners.
+ * maxParallelRequests, in particular if the input data has a finite number of windows, can
+ * reduce parallelism greatly. Because data will be temporarily globally windowed as part of
+ * writing data to Elasticsearch, if @param maxParallelRequests is set to 1, there will only
+ * ever be 1 request in flight. Having only a single request in flight can be beneficial for
+ * ensuring an Elasticsearch cluster is not overwhelmed by parallel requests,but may not work
+ * for all use cases. If this number is less than the number of maximum workers in your
+ * pipeline, the IO work will result in a sub-optimal distribution of the write step with most
+ * of the runners.
*
- * @param maxParallelRequestsPerWindow the maximum number of parallel bulk requests for a window
- * of data
+ * @param maxParallelRequests the maximum number of parallel bulk requests for a window of data
* @return the {@link BulkIO} with maximum parallel bulk requests per window set
+ * @deprecated use {@link BulkIO#withMaxParallelRequests} instead.
*/
- public BulkIO withMaxParallelRequestsPerWindow(int maxParallelRequestsPerWindow) {
+ @Deprecated
+ public BulkIO withMaxParallelRequestsPerWindow(int maxParallelRequests) {
checkArgument(
- maxParallelRequestsPerWindow > 0, "parameter value must be positive " + "a integer");
- return builder().setMaxParallelRequestsPerWindow(maxParallelRequestsPerWindow).build();
+ maxParallelRequests > 0, "maxParallelRequestsPerWindow value must be a positive integer");
+ return builder().setMaxParallelRequests(maxParallelRequests).build();
+ }
+
+ /**
+ * When using {@link BulkIO#withUseStatefulBatches} Stateful Processing, states and therefore
+ * batches are maintained per-key-per-window. BE AWARE that low values for @param
+ * maxParallelRequests, in particular if the input data has a finite number of windows, can
+ * reduce parallelism greatly. Because data will be temporarily globally windowed as part of
+ * writing data to Elasticsearch, if @param maxParallelRequests is set to 1, there will only
+ * ever be 1 request in flight. Having only a single request in flight can be beneficial for
+ * ensuring an Elasticsearch cluster is not overwhelmed by parallel requests,but may not work
+ * for all use cases. If this number is less than the number of maximum workers in your
+ * pipeline, the IO work will result in a sub-optimal distribution of the write step with most
+ * of the runners.
+ *
+ * @param maxParallelRequests the maximum number of parallel bulk requests
+ * @return the {@link BulkIO} with maximum parallel bulk requests per window set
Review Comment:
```suggestion
* @return the {@link BulkIO} with maximum parallel bulk requests
```