arunpandianp commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1808292032


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##########
@@ -273,10 +288,24 @@ public void 
onHeartbeatResponse(List<Windmill.ComputationHeartbeatResponse> resp
   @Override
   public void sendHealthCheck() {
     if (hasPendingRequests()) {
-      send(StreamingGetDataRequest.newBuilder().build());
+      send(HEALTH_CHECK_REQUEST);
     }
   }
 
+  @Override
+  protected void shutdownInternal() {
+    // Stream has been explicitly closed. Drain pending input streams and 
request batches.
+    // Future calls to send RPCs will fail.
+    pending.values().forEach(AppendableInputStream::cancel);

Review Comment:
   > I can add a todo to improve the locking mechanisms for 
AbstractWIndmillStream?
   
   Since it is a problem introduced by the new code, I think we should fix it 
before merging. Bugs here will also affect cloud path, want to make sure we 
don't break anything on cloud path.
   
   Can we guard all logic (IIUC, there shouldn't be any blocking IO) that 
depends on shutdown flag with the shutdown mutex? If we guard `pending.put` and 
the isShutdown check with the shutdown mutex, we can avoid this race.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to