arunpandianp commented on code in PR #35327:
URL: https://github.com/apache/beam/pull/35327#discussion_r2176736555


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -230,10 +271,13 @@ private void issueSingleRequest(long id, PendingRequest 
pendingRequest)
         .setSerializedWorkItemCommit(pendingRequest.serializedCommit());
     StreamingCommitWorkRequest chunk = requestBuilder.build();
     synchronized (this) {
-      if (!prepareForSend(id, pendingRequest)) {
+      if (isShutdown) {
         pendingRequest.abort();

Review Comment:
   should we return here?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -139,62 +159,83 @@ public CommitWorkStream.RequestBatcher batcher() {
   }
 
   @Override
-  protected boolean hasPendingRequests() {
-    return !pending.isEmpty();
-  }
-
-  @Override
-  protected void sendHealthCheck() throws WindmillStreamShutdownException {
-    if (hasPendingRequests()) {
+  protected synchronized void sendHealthCheck() throws 
WindmillStreamShutdownException {
+    if (currentPhysicalStream != null && 
currentPhysicalStream.hasPendingRequests()) {
       StreamingCommitWorkRequest.Builder builder = 
StreamingCommitWorkRequest.newBuilder();
       builder.addCommitChunkBuilder().setRequestId(HEARTBEAT_REQUEST_ID);
       trySend(builder.build());
     }
   }
 
-  @Override
-  protected void onResponse(StreamingCommitResponse response) {
-    CommitCompletionFailureHandler failureHandler = new 
CommitCompletionFailureHandler();
-    for (int i = 0; i < response.getRequestIdCount(); ++i) {
-      long requestId = response.getRequestId(i);
-      if (requestId == HEARTBEAT_REQUEST_ID) {
-        continue;
-      }
+  private class CommitWorkPhysicalStreamHandler extends PhysicalStreamHandler {
+    @Override
+    public void onResponse(StreamingCommitResponse response) {
+      CommitCompletionFailureHandler failureHandler = new 
CommitCompletionFailureHandler();
+      for (int i = 0; i < response.getRequestIdCount(); ++i) {
+        long requestId = response.getRequestId(i);
+        if (requestId == HEARTBEAT_REQUEST_ID) {
+          continue;
+        }
+
+        // From windmill.proto: Indices must line up with the request_id 
field, but trailing OKs
+        // may be omitted.
+        CommitStatus commitStatus =
+            i < response.getStatusCount() ? response.getStatus(i) : 
CommitStatus.OK;
 
-      // From windmill.proto: Indices must line up with the request_id field, 
but trailing OKs may
-      // be omitted.
-      CommitStatus commitStatus =
-          i < response.getStatusCount() ? response.getStatus(i) : 
CommitStatus.OK;
-
-      @Nullable PendingRequest pendingRequest = pending.remove(requestId);
-      if (pendingRequest == null) {
-        synchronized (this) {
-          if (!isShutdown) {
-            // Missing responses are expected after shutdown() because it 
removes them.
-            LOG.error("Got unknown commit request ID: {}", requestId);
-          }
+        @Nullable StreamAndRequest entry = pending.remove(requestId);
+        if (entry == null) {
+          LOG.error("Got unknown commit request ID: {}", requestId);
+          continue;
         }
-      } else {
+        if (entry.handler != this) {
+          LOG.error("Got commit request id {} on unexpected stream", 
requestId);
+        }
+        PendingRequest pendingRequest = entry.request;
         try {
           pendingRequest.completeWithStatus(commitStatus);
         } catch (RuntimeException e) {
-          // Catch possible exceptions to ensure that an exception for one 
commit does not prevent
+          // Catch possible exceptions to ensure that an exception for one 
commit does not
+          // prevent

Review Comment:
   need to fix format



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##########
@@ -284,8 +374,8 @@ public void 
onHeartbeatResponse(List<Windmill.ComputationHeartbeatResponse> resp
   }
 
   @Override
-  protected void sendHealthCheck() throws WindmillStreamShutdownException {
-    if (hasPendingRequests()) {
+  protected synchronized void sendHealthCheck() throws 
WindmillStreamShutdownException {
+    if (currentPhysicalStream != null && 
currentPhysicalStream.hasPendingRequests()) {
       trySend(HEALTH_CHECK_REQUEST);

Review Comment:
   not related to new code: will it be better to send health checks even when 
there are no pending requests? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to