m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1806013691
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##########
@@ -273,10 +288,24 @@ public void onHeartbeatResponse(List<Windmill.ComputationHeartbeatResponse> resp
@Override
public void sendHealthCheck() {
if (hasPendingRequests()) {
- send(StreamingGetDataRequest.newBuilder().build());
+ send(HEALTH_CHECK_REQUEST);
}
}
+ @Override
+ protected void shutdownInternal() {
+ // Stream has been explicitly closed. Drain pending input streams and request batches.
+ // Future calls to send RPCs will fail.
+ pending.values().forEach(AppendableInputStream::cancel);
Review Comment:
We synchronize on `this` in a lot of places, and shutdown cannot block on that
lock or the stream will get stuck. The retry logic for sends on stream
restarts, keeping sends serial, etc. all depend on this synchronization.
I can add a `TODO` to improve the locking mechanisms for
AbstractWindmillStream?
Since the shutdown signal is only propagated, how about we add a check in a
`finally` after the send that checks for shutdown and clears `pending`? This
is guaranteed to only be called once during the lifetime of the stream.
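
A minimal sketch of what that `finally` check could look like, to make the
proposal concrete. `SketchStream` and `PendingResponse` are hypothetical
stand-ins (for the stream and for `AppendableInputStream`), not the actual
classes in this PR, and the gRPC write is elided. It assumes `pending` is a
concurrent map, so shutdown can drain it without taking the lock on `this`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the stream; NOT the real GrpcGetDataStream.
class SketchStream {
  // Hypothetical stand-in for AppendableInputStream.
  static final class PendingResponse {
    private volatile boolean cancelled = false;

    void cancel() {
      cancelled = true;
    }

    boolean isCancelled() {
      return cancelled;
    }
  }

  private final Map<Long, PendingResponse> pending = new ConcurrentHashMap<>();
  private volatile boolean isShutdown = false;

  // Never takes the lock on `this`, so it cannot get stuck behind a send.
  void shutdown() {
    isShutdown = true;
    cancelAndClearPending();
  }

  PendingResponse send(long requestId) {
    PendingResponse response = new PendingResponse();
    try {
      synchronized (this) { // keeps sends serial, as today
        if (isShutdown) {
          throw new IllegalStateException("stream is shut down");
        }
        pending.put(requestId, response);
        // The real stream would write the request to the wire here.
      }
      return response;
    } finally {
      // Covers the race where shutdown() drained `pending` just before the put above.
      if (isShutdown) {
        cancelAndClearPending();
      }
    }
  }

  int pendingCount() {
    return pending.size();
  }

  // Removes each entry before cancelling it, so an entry is never dropped uncancelled.
  private void cancelAndClearPending() {
    for (Long id : pending.keySet()) {
      PendingResponse removed = pending.remove(id);
      if (removed != null) {
        removed.cancel();
      }
    }
  }
}
```

With this shape, a send that loses the race with shutdown still cancels its own
entry on the way out, and later sends fail fast instead of adding to `pending`.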