m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1820173305
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -278,21 +357,74 @@ public String backendWorkerToken() {
}
@Override
- public void shutdown() {
- if (isShutdown.compareAndSet(false, true)) {
- requestObserver()
- .onError(new WindmillStreamShutdownException("Explicit call to
shutdown stream."));
+ public final void shutdown() {
+ // Don't lock on "this" as isShutdown checks are used in the stream to
free blocked
+ // threads or as exit conditions to loops.
+ synchronized (shutdownLock) {
+ if (!isShutdown) {
+ isShutdown = true;
+ shutdownTime.set(DateTime.now());
+ if (started) {
+ // requestObserver is not set until the first startStream() is
called. If the stream was
+ // never started there is nothing to clean up internally.
+ requestObserver.onError(
Review Comment:
done
also moved ResettableRequestObserver to its own file, and added tests
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]