m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1806013691


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##########
@@ -273,10 +288,24 @@ public void 
onHeartbeatResponse(List<Windmill.ComputationHeartbeatResponse> resp
   @Override
   public void sendHealthCheck() {
     if (hasPendingRequests()) {
-      send(StreamingGetDataRequest.newBuilder().build());
+      send(HEALTH_CHECK_REQUEST);
     }
   }
 
+  @Override
+  protected void shutdownInternal() {
+    // Stream has been explicitly closed. Drain pending input streams and 
request batches.
+    // Future calls to send RPCs will fail.
+    pending.values().forEach(AppendableInputStream::cancel);

Review Comment:
   we do a lot of synchronization on `this` and cannot have it block shutdown 
since this will cause stuckness.  The retry logic for sends on stream restarts, 
keeping sends serial, etc depends on this synchronization.
   
   I can add a `todo` to improve the locking mechanisms for 
AbstractWIndmillStream?
   
   the shutdown signal is only propagated, how about we add a finally check 
after the send to check for shutdown and clear pending.  This is guaranteed to 
only be called once during the lifetime of the stream



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to