arunpandianp commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1808292032
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##########
@@ -273,10 +288,24 @@ public void onHeartbeatResponse(List<Windmill.ComputationHeartbeatResponse> resp
   @Override
   public void sendHealthCheck() {
     if (hasPendingRequests()) {
-      send(StreamingGetDataRequest.newBuilder().build());
+      send(HEALTH_CHECK_REQUEST);
     }
   }

+  @Override
+  protected void shutdownInternal() {
+    // Stream has been explicitly closed. Drain pending input streams and request batches.
+    // Future calls to send RPCs will fail.
+    pending.values().forEach(AppendableInputStream::cancel);

Review Comment:
   > I can add a todo to improve the locking mechanisms for AbstractWindmillStream?

   Since this problem is introduced by the new code, I think we should fix it before merging. Bugs here will also affect the cloud path, and I want to make sure we don't break anything there.

   Can we guard all logic that depends on the shutdown flag with the shutdown mutex? (IIUC, there shouldn't be any blocking IO in it.) If we guard both `pending.put` and the `isShutdown` check with the shutdown mutex, we can avoid this race.
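   To make the suggestion concrete, here is a minimal sketch of the pattern, not the actual `GrpcGetDataStream` code. The class `GuardedPendingRequests`, the `Cancellable` interface (standing in for `AppendableInputStream`), `shutdownLock`, and `tryRegister` are all hypothetical names chosen for illustration. It assumes `cancel()` does no blocking IO, so it is safe to call while holding the lock.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the stream's pending-request bookkeeping; not Beam's class. */
class GuardedPendingRequests {
  /** Hypothetical stand-in for AppendableInputStream: only cancel() matters here. */
  interface Cancellable {
    void cancel();
  }

  // Single mutex guarding both the shutdown flag and the pending map.
  private final Object shutdownLock = new Object();
  private final Map<Long, Cancellable> pending = new HashMap<>();
  private boolean isShutdown = false;

  /** Adds a pending request unless shutdown has started. Returns false if rejected. */
  boolean tryRegister(long requestId, Cancellable stream) {
    synchronized (shutdownLock) {
      // The check and the put happen atomically with respect to shutdown(),
      // so an entry can never be added after shutdown() has drained the map.
      if (isShutdown) {
        return false;
      }
      pending.put(requestId, stream);
      return true;
    }
  }

  /** Marks the stream as shut down and cancels everything registered so far. */
  void shutdown() {
    synchronized (shutdownLock) {
      if (isShutdown) {
        return;
      }
      isShutdown = true;
      // Assumed non-blocking, so holding the lock during cancel() is acceptable.
      pending.values().forEach(Cancellable::cancel);
      pending.clear();
    }
  }

  /** Number of requests currently tracked. */
  int pendingCount() {
    synchronized (shutdownLock) {
      return pending.size();
    }
  }
}
```

   Without the shared lock, a sender could pass the `isShutdown` check, lose the CPU while shutdown cancels and drains `pending`, and then insert an entry that nothing will ever cancel. With the lock, a registration either completes before shutdown (and is cancelled by it) or is rejected.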