lukecwik commented on code in PR #17112:
URL: https://github.com/apache/beam/pull/17112#discussion_r864108077


##########
sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java:
##########
@@ -1997,8 +2012,12 @@ abstract static class Builder {
 
       abstract Builder setUseStatefulBatches(boolean useStatefulBatches);
 
+      /** @deprecated Use {@link this#setMaxParallelRequests} instead. */

Review Comment:
   ```suggestion
         /** @deprecated Use {@link #setMaxParallelRequests} instead. */
   ```



##########
sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java:
##########
@@ -2152,22 +2171,45 @@ public BulkIO withUseStatefulBatches(boolean 
useStatefulBatches) {
     /**
      * When using {@link BulkIO#withUseStatefulBatches} Stateful Processing, 
states and therefore
      * batches are maintained per-key-per-window. BE AWARE that low values for 
@param
-     * maxParallelRequestsPerWindow, in particular if the input data has a 
finite number of windows,
-     * can reduce parallelism greatly. If data is globally windowed and @param
-     * maxParallelRequestsPerWindow is set to 1,there will only ever be 1 
request in flight. Having
-     * only a single request in flight can be beneficial for ensuring an 
Elasticsearch cluster is
-     * not overwhelmed by parallel requests,but may not work for all use 
cases. If this number is
-     * less than the number of maximum workers in your pipeline, the IO work 
will result in a
-     * sub-distribution of the last write step with most of the runners.
+     * maxParallelRequests, in particular if the input data has a finite 
number of windows, can
+     * reduce parallelism greatly. Because data will be temporarily globally 
windowed as part of
+     * writing data to Elasticsearch, if @param maxParallelRequests is set to 
1, there will only
+     * ever be 1 request in flight. Having only a single request in flight can 
be beneficial for
+     * ensuring an Elasticsearch cluster is not overwhelmed by parallel 
requests,but may not work
+     * for all use cases. If this number is less than the number of maximum 
workers in your
+     * pipeline, the IO work will result in a sub-optimal distribution of the 
write step with most
+     * of the runners.
      *
-     * @param maxParallelRequestsPerWindow the maximum number of parallel bulk 
requests for a window
-     *     of data
+     * @param maxParallelRequests the maximum number of parallel bulk requests 
for a window of data
      * @return the {@link BulkIO} with maximum parallel bulk requests per 
window set
+     * @deprecated use {@link BulkIO#withMaxParallelRequests} instead.
      */
-    public BulkIO withMaxParallelRequestsPerWindow(int 
maxParallelRequestsPerWindow) {
+    @Deprecated
+    public BulkIO withMaxParallelRequestsPerWindow(int maxParallelRequests) {
       checkArgument(
-          maxParallelRequestsPerWindow > 0, "parameter value must be positive 
" + "a integer");
-      return 
builder().setMaxParallelRequestsPerWindow(maxParallelRequestsPerWindow).build();
+          maxParallelRequests > 0, "maxParallelRequestsPerWindow value must be 
a positive integer");
+      return builder().setMaxParallelRequests(maxParallelRequests).build();
+    }
+
+    /**
+     * When using {@link BulkIO#withUseStatefulBatches} Stateful Processing, 
states and therefore
+     * batches are maintained per-key-per-window. BE AWARE that low values for 
@param
+     * maxParallelRequests, in particular if the input data has a finite 
number of windows, can
+     * reduce parallelism greatly. Because data will be temporarily globally 
windowed as part of
+     * writing data to Elasticsearch, if @param maxParallelRequests is set to 
1, there will only
+     * ever be 1 request in flight. Having only a single request in flight can 
be beneficial for
+     * ensuring an Elasticsearch cluster is not overwhelmed by parallel 
requests,but may not work

Review Comment:
   ```suggestion
        * ensuring an Elasticsearch cluster is not overwhelmed by parallel 
requests, but may not work
   ```



##########
sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java:
##########
@@ -2152,22 +2171,45 @@ public BulkIO withUseStatefulBatches(boolean 
useStatefulBatches) {
     /**
      * When using {@link BulkIO#withUseStatefulBatches} Stateful Processing, 
states and therefore
      * batches are maintained per-key-per-window. BE AWARE that low values for 
@param
-     * maxParallelRequestsPerWindow, in particular if the input data has a 
finite number of windows,
-     * can reduce parallelism greatly. If data is globally windowed and @param
-     * maxParallelRequestsPerWindow is set to 1,there will only ever be 1 
request in flight. Having
-     * only a single request in flight can be beneficial for ensuring an 
Elasticsearch cluster is
-     * not overwhelmed by parallel requests,but may not work for all use 
cases. If this number is
-     * less than the number of maximum workers in your pipeline, the IO work 
will result in a
-     * sub-distribution of the last write step with most of the runners.
+     * maxParallelRequests, in particular if the input data has a finite 
number of windows, can
+     * reduce parallelism greatly. Because data will be temporarily globally 
windowed as part of
+     * writing data to Elasticsearch, if @param maxParallelRequests is set to 
1, there will only
+     * ever be 1 request in flight. Having only a single request in flight can 
be beneficial for
+     * ensuring an Elasticsearch cluster is not overwhelmed by parallel 
requests,but may not work
+     * for all use cases. If this number is less than the number of maximum 
workers in your
+     * pipeline, the IO work will result in a sub-optimal distribution of the 
write step with most
+     * of the runners.

Review Comment:
   ```suggestion
        * runners.
   ```



##########
sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java:
##########
@@ -2152,22 +2171,45 @@ public BulkIO withUseStatefulBatches(boolean 
useStatefulBatches) {
     /**
      * When using {@link BulkIO#withUseStatefulBatches} Stateful Processing, 
states and therefore
      * batches are maintained per-key-per-window. BE AWARE that low values for 
@param
-     * maxParallelRequestsPerWindow, in particular if the input data has a 
finite number of windows,
-     * can reduce parallelism greatly. If data is globally windowed and @param
-     * maxParallelRequestsPerWindow is set to 1,there will only ever be 1 
request in flight. Having
-     * only a single request in flight can be beneficial for ensuring an 
Elasticsearch cluster is
-     * not overwhelmed by parallel requests,but may not work for all use 
cases. If this number is
-     * less than the number of maximum workers in your pipeline, the IO work 
will result in a
-     * sub-distribution of the last write step with most of the runners.
+     * maxParallelRequests, in particular if the input data has a finite 
number of windows, can
+     * reduce parallelism greatly. Because data will be temporarily globally 
windowed as part of
+     * writing data to Elasticsearch, if @param maxParallelRequests is set to 
1, there will only
+     * ever be 1 request in flight. Having only a single request in flight can 
be beneficial for
+     * ensuring an Elasticsearch cluster is not overwhelmed by parallel 
requests,but may not work
+     * for all use cases. If this number is less than the number of maximum 
workers in your
+     * pipeline, the IO work will result in a sub-optimal distribution of the 
write step with most
+     * of the runners.
      *
-     * @param maxParallelRequestsPerWindow the maximum number of parallel bulk 
requests for a window
-     *     of data
+     * @param maxParallelRequests the maximum number of parallel bulk requests 
for a window of data
      * @return the {@link BulkIO} with maximum parallel bulk requests per 
window set
+     * @deprecated use {@link BulkIO#withMaxParallelRequests} instead.
      */
-    public BulkIO withMaxParallelRequestsPerWindow(int 
maxParallelRequestsPerWindow) {
+    @Deprecated
+    public BulkIO withMaxParallelRequestsPerWindow(int maxParallelRequests) {
       checkArgument(
-          maxParallelRequestsPerWindow > 0, "parameter value must be positive 
" + "a integer");
-      return 
builder().setMaxParallelRequestsPerWindow(maxParallelRequestsPerWindow).build();
+          maxParallelRequests > 0, "maxParallelRequestsPerWindow value must be 
a positive integer");
+      return builder().setMaxParallelRequests(maxParallelRequests).build();
+    }
+
+    /**
+     * When using {@link BulkIO#withUseStatefulBatches} Stateful Processing, 
states and therefore
+     * batches are maintained per-key-per-window. BE AWARE that low values for 
@param
+     * maxParallelRequests, in particular if the input data has a finite 
number of windows, can
+     * reduce parallelism greatly. Because data will be temporarily globally 
windowed as part of
+     * writing data to Elasticsearch, if @param maxParallelRequests is set to 
1, there will only
+     * ever be 1 request in flight. Having only a single request in flight can 
be beneficial for
+     * ensuring an Elasticsearch cluster is not overwhelmed by parallel 
requests,but may not work
+     * for all use cases. If this number is less than the number of maximum 
workers in your
+     * pipeline, the IO work will result in a sub-optimal distribution of the 
write step with most
+     * of the runners.

Review Comment:
   ```suggestion
        * runners.
   ```



##########
sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java:
##########
@@ -2152,22 +2171,45 @@ public BulkIO withUseStatefulBatches(boolean 
useStatefulBatches) {
     /**
      * When using {@link BulkIO#withUseStatefulBatches} Stateful Processing, 
states and therefore
      * batches are maintained per-key-per-window. BE AWARE that low values for 
@param
-     * maxParallelRequestsPerWindow, in particular if the input data has a 
finite number of windows,
-     * can reduce parallelism greatly. If data is globally windowed and @param
-     * maxParallelRequestsPerWindow is set to 1,there will only ever be 1 
request in flight. Having
-     * only a single request in flight can be beneficial for ensuring an 
Elasticsearch cluster is
-     * not overwhelmed by parallel requests,but may not work for all use 
cases. If this number is
-     * less than the number of maximum workers in your pipeline, the IO work 
will result in a
-     * sub-distribution of the last write step with most of the runners.
+     * maxParallelRequests, in particular if the input data has a finite 
number of windows, can
+     * reduce parallelism greatly. Because data will be temporarily globally 
windowed as part of
+     * writing data to Elasticsearch, if @param maxParallelRequests is set to 
1, there will only
+     * ever be 1 request in flight. Having only a single request in flight can 
be beneficial for
+     * ensuring an Elasticsearch cluster is not overwhelmed by parallel 
requests,but may not work

Review Comment:
   ```suggestion
        * ensuring an Elasticsearch cluster is not overwhelmed by parallel 
requests, but may not work
   ```



##########
sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java:
##########
@@ -2467,37 +2509,14 @@ private Multimap<BoundedWindow, Document> flushBatch()
             createWriteReport(
                 responseEntity, spec.getAllowedResponseErrors(), 
spec.getThrowWriteErrors());
 
-        return mergeInputsAndResponses(inputEntries, responses);
-      }
-
-      private static Multimap<BoundedWindow, Document> mergeInputsAndResponses(
-          List<Entry<BoundedWindow, Document>> inputs, List<Document> 
responses) {
-
-        checkArgument(
-            inputs.size() == responses.size(), "inputs and responses must be 
of same size");
-
-        Multimap<BoundedWindow, Document> results = ArrayListMultimap.create();
-
-        // N.B. the order of responses must always match the order of inputs
-        for (int i = 0; i < inputs.size(); i++) {
-          BoundedWindow outputWindow = inputs.get(i).getKey();
-
-          // Contains raw input document and Bulk directive counterpart only
-          Document inputDoc = inputs.get(i).getValue();
-
-          // Contains stringified JSON response from Elasticsearch and error 
status only
-          Document outputDoc = responses.get(i);
-
-          // Create a new Document object with all the input fields from 
inputDoc (i.e. the raw
-          // input JSON string) and all the response fields from ES bulk API 
for that input document
-          Document merged =
-              inputDoc
-                  .withHasError(outputDoc.getHasError())
-                  .withResponseItemJson(outputDoc.getResponseItemJson());
-          results.put(outputWindow, merged);
-        }
-
-        return results;
+        return Streams.zip(

Review Comment:
   nit: it is rare that Java streams perform better and they typically hurt 
readability 



##########
sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java:
##########
@@ -2152,22 +2171,45 @@ public BulkIO withUseStatefulBatches(boolean 
useStatefulBatches) {
     /**
      * When using {@link BulkIO#withUseStatefulBatches} Stateful Processing, 
states and therefore
      * batches are maintained per-key-per-window. BE AWARE that low values for 
@param
-     * maxParallelRequestsPerWindow, in particular if the input data has a 
finite number of windows,
-     * can reduce parallelism greatly. If data is globally windowed and @param
-     * maxParallelRequestsPerWindow is set to 1,there will only ever be 1 
request in flight. Having
-     * only a single request in flight can be beneficial for ensuring an 
Elasticsearch cluster is
-     * not overwhelmed by parallel requests,but may not work for all use 
cases. If this number is
-     * less than the number of maximum workers in your pipeline, the IO work 
will result in a
-     * sub-distribution of the last write step with most of the runners.
+     * maxParallelRequests, in particular if the input data has a finite 
number of windows, can
+     * reduce parallelism greatly. Because data will be temporarily globally 
windowed as part of
+     * writing data to Elasticsearch, if @param maxParallelRequests is set to 
1, there will only
+     * ever be 1 request in flight. Having only a single request in flight can 
be beneficial for
+     * ensuring an Elasticsearch cluster is not overwhelmed by parallel 
requests,but may not work
+     * for all use cases. If this number is less than the number of maximum 
workers in your
+     * pipeline, the IO work will result in a sub-optimal distribution of the 
write step with most
+     * of the runners.
      *
-     * @param maxParallelRequestsPerWindow the maximum number of parallel bulk 
requests for a window
-     *     of data
+     * @param maxParallelRequests the maximum number of parallel bulk requests 
for a window of data
      * @return the {@link BulkIO} with maximum parallel bulk requests per 
window set
+     * @deprecated use {@link BulkIO#withMaxParallelRequests} instead.
      */
-    public BulkIO withMaxParallelRequestsPerWindow(int 
maxParallelRequestsPerWindow) {
+    @Deprecated
+    public BulkIO withMaxParallelRequestsPerWindow(int maxParallelRequests) {
       checkArgument(
-          maxParallelRequestsPerWindow > 0, "parameter value must be positive 
" + "a integer");
-      return 
builder().setMaxParallelRequestsPerWindow(maxParallelRequestsPerWindow).build();
+          maxParallelRequests > 0, "maxParallelRequestsPerWindow value must be 
a positive integer");
+      return builder().setMaxParallelRequests(maxParallelRequests).build();
+    }
+
+    /**
+     * When using {@link BulkIO#withUseStatefulBatches} Stateful Processing, 
states and therefore
+     * batches are maintained per-key-per-window. BE AWARE that low values for 
@param
+     * maxParallelRequests, in particular if the input data has a finite 
number of windows, can
+     * reduce parallelism greatly. Because data will be temporarily globally 
windowed as part of
+     * writing data to Elasticsearch, if @param maxParallelRequests is set to 
1, there will only
+     * ever be 1 request in flight. Having only a single request in flight can 
be beneficial for
+     * ensuring an Elasticsearch cluster is not overwhelmed by parallel 
requests,but may not work
+     * for all use cases. If this number is less than the number of maximum 
workers in your
+     * pipeline, the IO work will result in a sub-optimal distribution of the 
write step with most
+     * of the runners.
+     *
+     * @param maxParallelRequests the maximum number of parallel bulk requests
+     * @return the {@link BulkIO} with maximum parallel bulk requests per 
window set

Review Comment:
   ```suggestion
        * @return the {@link BulkIO} with maximum parallel bulk requests
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to