scwhittle commented on code in PR #35327:
URL: https://github.com/apache/beam/pull/35327#discussion_r2178171036


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -139,62 +159,83 @@ public CommitWorkStream.RequestBatcher batcher() {
   }
 
   @Override
-  protected boolean hasPendingRequests() {
-    return !pending.isEmpty();
-  }
-
-  @Override
-  protected void sendHealthCheck() throws WindmillStreamShutdownException {
-    if (hasPendingRequests()) {
+  protected synchronized void sendHealthCheck() throws WindmillStreamShutdownException {
+    if (currentPhysicalStream != null && currentPhysicalStream.hasPendingRequests()) {
       StreamingCommitWorkRequest.Builder builder = StreamingCommitWorkRequest.newBuilder();
       builder.addCommitChunkBuilder().setRequestId(HEARTBEAT_REQUEST_ID);
       trySend(builder.build());
     }
   }
 
-  @Override
-  protected void onResponse(StreamingCommitResponse response) {
-    CommitCompletionFailureHandler failureHandler = new CommitCompletionFailureHandler();
-    for (int i = 0; i < response.getRequestIdCount(); ++i) {
-      long requestId = response.getRequestId(i);
-      if (requestId == HEARTBEAT_REQUEST_ID) {
-        continue;
-      }
+  private class CommitWorkPhysicalStreamHandler extends PhysicalStreamHandler {
+    @Override
+    public void onResponse(StreamingCommitResponse response) {
+      CommitCompletionFailureHandler failureHandler = new CommitCompletionFailureHandler();
+      for (int i = 0; i < response.getRequestIdCount(); ++i) {
+        long requestId = response.getRequestId(i);
+        if (requestId == HEARTBEAT_REQUEST_ID) {
+          continue;
+        }
+
+        // From windmill.proto: Indices must line up with the request_id field, but trailing OKs
+        // may be omitted.
+        CommitStatus commitStatus =
+            i < response.getStatusCount() ? response.getStatus(i) : CommitStatus.OK;
 
-      // From windmill.proto: Indices must line up with the request_id field, but trailing OKs may
-      // be omitted.
-      CommitStatus commitStatus =
-          i < response.getStatusCount() ? response.getStatus(i) : CommitStatus.OK;
-
-      @Nullable PendingRequest pendingRequest = pending.remove(requestId);
-      if (pendingRequest == null) {
-        synchronized (this) {
-          if (!isShutdown) {
-            // Missing responses are expected after shutdown() because it removes them.
-            LOG.error("Got unknown commit request ID: {}", requestId);
-          }
+        @Nullable StreamAndRequest entry = pending.remove(requestId);
+        if (entry == null) {
+          LOG.error("Got unknown commit request ID: {}", requestId);
+          continue;
         }
-      } else {
+        if (entry.handler != this) {
+          LOG.error("Got commit request id {} on unexpected stream", requestId);
+        }
+        PendingRequest pendingRequest = entry.request;
         try {
           pendingRequest.completeWithStatus(commitStatus);
         } catch (RuntimeException e) {
-          // Catch possible exceptions to ensure that an exception for one commit does not prevent
+          // Catch possible exceptions to ensure that an exception for one commit does not
+          // prevent

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to