scwhittle commented on code in PR #36170:
URL: https://github.com/apache/beam/pull/36170#discussion_r2353275023


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/ResettableThrowingStreamObserver.java:
##########
@@ -115,33 +115,48 @@ public void onNext(T t) throws StreamClosedException, 
WindmillStreamShutdownExce
           logger.debug("Stream was shutdown during send.", 
cancellationException);
           return;
         }
-      }
+        if (delegateStreamObserver == delegate) {
+          if (isCurrentStreamClosed) {
+            logger.debug("Stream is already closed when encountering error 
with send.");
+            return;
+          }
+          isCurrentStreamClosed = true;
+        }
 
-      try {
-        delegate.onError(cancellationException);
-      } catch (IllegalStateException onErrorException) {
-        // The delegate above was already terminated via onError or onComplete.
-        // Fallthrough since this is possibly due to queued onNext() calls 
that are being made from
-        // previously blocked threads.
-      } catch (RuntimeException onErrorException) {
-        logger.warn(
-            "Encountered unexpected error {} when cancelling due to error.",
-            onErrorException,
-            cancellationException);
+        // Either the no longer active observer which we attempt a reset and 
handle errors
+        // or the current observer that still requires closing.
+        try {
+          delegate.onError(cancellationException);

Review Comment:
   I was doing it to be consistent with onError synchronization but I will 
change back since if we want to CP then it seems a riskier change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to