arunpandianp commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1814061667


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##########
@@ -342,64 +379,92 @@ private void queueRequestAndWait(QueuedRequest request) throws InterruptedExcept
       batch.addRequest(request);
     }
     if (responsibleForSend) {
-      if (waitForSendLatch == null) {
+      if (prevBatch == null) {
         // If there was not a previous batch wait a little while to improve
         // batching.
-        Thread.sleep(1);
+        sleeper.sleep(1);
       } else {
-        waitForSendLatch.await();
+        prevBatch.waitForSendOrFailNotification();
       }
       // Finalize the batch so that no additional requests will be added.  Leave the batch in the
       // queue so that a subsequent batch will wait for its completion.
       synchronized (batches) {
-        verify(batch == batches.peekFirst());
+        verify(batch == batches.peekFirst(), "GetDataStream request batch removed before send().");
         batch.markFinalized();
       }
-      sendBatch(batch.requests());
+      trySendBatch(batch);
+    } else {
+      // Wait for this batch to be sent before parsing the response.
+      batch.waitForSendOrFailNotification();
+    }
+  }
+
+  void trySendBatch(QueuedBatch batch) {
+    try {
+      sendBatch(batch.sortedRequestsReadOnly());
       synchronized (batches) {

Review Comment:
   Changing all `synchronized (batches)` blocks to look like the following should fix the races between mutations to `batches` and shutdown:
   
   ```
   synchronized (shutdownLock) {
     if (isShutdown()) {
       // Clean up anything that needs to be cleaned up.
       // Returning here should be safe, as shutdown cleared all existing `batches`.
       return;
     }
     synchronized (batches) {
       // ... existing logic that reads or mutates `batches` ...
     }
   }
   ```
   
   Maybe we could use only `shutdownLock` to guard access to `batches` and remove `synchronized (batches)`. Would there be any downsides to doing so?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStreamRequests.java:
##########
@@ -91,18 +110,26 @@ void addToStreamingGetDataRequest(Windmill.StreamingGetDataRequest.Builder build
     }
   }
 
+  /** Represents a batch of queued requests. Methods are thread-safe unless commented otherwise. */
   static class QueuedBatch {
     private final List<QueuedRequest> requests = new ArrayList<>();
     private final CountDownLatch sent = new CountDownLatch(1);
     private long byteSize = 0;
-    private boolean finalized = false;
+    private volatile boolean finalized = false;
+    private volatile boolean failed = false;
 
-    CountDownLatch getLatch() {
-      return sent;
+    /**
+     * Returns a read-only view of requests sorted with {@link QueuedRequest#globalRequestsFirst()}.
+     */
+    List<QueuedRequest> sortedRequestsReadOnly() {
+      // Put all global data requests first because there is only a single repeated field for
+      // request ids and the initial ids correspond to global data requests if they are present.
+      requests.sort(QueuedRequest.globalRequestsFirst());
+      return Collections.unmodifiableList(requests);

Review Comment:
   Why do we need this now and not before?
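
For context on the ordering itself, here is a sketch of a comparator in the spirit of `QueuedRequest.globalRequestsFirst()`, assuming each request exposes a boolean marking global data. `SketchRequest` and `isGlobalData` are hypothetical stand-ins. Because `List.sort` is stable, keyed requests keep their relative order behind the global ones.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

/** Hypothetical stand-in for QueuedRequest with only the fields needed for the ordering. */
final class SketchRequest {
  final long id;
  final boolean isGlobalData;

  SketchRequest(long id, boolean isGlobalData) {
    this.id = id;
    this.isGlobalData = isGlobalData;
  }

  /** Orders global-data requests before keyed requests. */
  static Comparator<SketchRequest> globalRequestsFirst() {
    // Boolean.compare orders false before true, so swap the arguments to put true first.
    return (a, b) -> Boolean.compare(b.isGlobalData, a.isGlobalData);
  }

  /** Sorts in place (stable) and returns a read-only view, like sortedRequestsReadOnly(). */
  static List<SketchRequest> sortedReadOnly(List<SketchRequest> requests) {
    requests.sort(globalRequestsFirst());
    return Collections.unmodifiableList(requests);
  }
}
```

In the `queueRequestAndWait` hunk above, `trySendBatch` is called only after `markFinalized()`, so no request is added to the list while it is being sorted.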



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStreamRequests.java:
##########
@@ -91,18 +110,26 @@ void addToStreamingGetDataRequest(Windmill.StreamingGetDataRequest.Builder build
     }
   }
 
+  /** Represents a batch of queued requests. Methods are thread-safe unless commented otherwise. */

Review Comment:
   Should this be "Methods are **not** thread-safe unless commented otherwise"?
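
To make the question concrete, here is a sketch of a batch that documents thread-safety per method, assuming list mutation happens under an external lock while send/failure notification goes through the `CountDownLatch` and `volatile` flags seen in the diff. `SketchBatch`, `notifySent`, and `notifyFailed` are hypothetical, and throwing `IllegalStateException` on failure is an assumption, not the PR's behavior.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/** Hypothetical sketch of a batch whose thread-safety is documented per method. */
final class SketchBatch {
  private final List<String> requests = new ArrayList<>();
  private final CountDownLatch sent = new CountDownLatch(1);
  private volatile boolean finalized = false;
  private volatile boolean failed = false;

  /** NOT thread-safe: callers must hold the lock that guards the batch queue. */
  void addRequest(String request) {
    if (finalized) {
      throw new IllegalStateException("batch already finalized");
    }
    requests.add(request);
  }

  /** NOT thread-safe with respect to addRequest: call under the same external lock. */
  void markFinalized() {
    finalized = true;
  }

  /** Thread-safe: releasing the latch wakes every waiting thread. */
  void notifySent() {
    sent.countDown();
  }

  /** Thread-safe: records the failure before releasing waiters so they observe it. */
  void notifyFailed() {
    failed = true;
    sent.countDown();
  }

  /** Thread-safe: blocks until the batch is sent or failed; throws if it failed (assumed). */
  void waitForSendOrFailNotification() throws InterruptedException {
    sent.await();
    if (failed) {
      throw new IllegalStateException("batch failed before it was sent");
    }
  }
}
```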



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -283,21 +312,105 @@ private void issueMultiChunkRequest(final long id, PendingRequest pendingRequest
     }
   }
 
-  private static class PendingRequest {
+  /**
+   * Returns true if the request should be failed due to stream shutdown, else tracks the request to
+   * be sent and returns false.
+   */
+  private boolean isPrepareForSendFailed(long id, PendingRequest request) {

Review Comment:
   nit:
   ```suggestion
     private boolean prepareForSend(long id, PendingRequest request) {
   ```
   
   and have it return true if preparation succeeded.
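
A minimal sketch of the suggested shape, assuming a shutdown flag and a pending-request map guarded by one lock. `SketchCommitStream`, `trySend`, and the `String` request type are hypothetical stand-ins for `GrpcCommitWorkStream` and `PendingRequest`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: prepareForSend returns true when the request is tracked for sending. */
final class SketchCommitStream {
  private final Object shutdownLock = new Object();
  // Guarded by shutdownLock.
  private final Map<Long, String> pending = new HashMap<>();
  // Guarded by shutdownLock.
  private boolean isShutdown = false;

  /** Returns true if the request is now tracked for sending, false if it must be failed. */
  boolean prepareForSend(long id, String request) {
    synchronized (shutdownLock) {
      if (isShutdown) {
        return false;
      }
      pending.put(id, request);
      return true;
    }
  }

  /** Call site reads positively: only send when preparation succeeded. */
  boolean trySend(long id, String request) {
    if (!prepareForSend(id, request)) {
      // Stream is shut down; the caller should fail the request instead of sending it.
      return false;
    }
    // A real stream would write the request to the wire here.
    return true;
  }

  void shutdown() {
    synchronized (shutdownLock) {
      isShutdown = true;
      pending.clear();
    }
  }

  int pendingCount() {
    synchronized (shutdownLock) {
      return pending.size();
    }
  }
}
```

Returning true on success lets the call site read as `if (!prepareForSend(...))` rather than testing a name that already describes the failure case.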


