arunpandianp commented on code in PR #36170:
URL: https://github.com/apache/beam/pull/36170#discussion_r2353338287


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/ResettableThrowingStreamObserver.java:
##########
@@ -115,33 +115,48 @@ public void onNext(T t) throws StreamClosedException, 
WindmillStreamShutdownExce
           logger.debug("Stream was shutdown during send.", 
cancellationException);
           return;
         }
-      }
+        if (delegateStreamObserver == delegate) {
+          if (isCurrentStreamClosed) {
+            logger.debug("Stream is already closed when encountering error 
with send.");
+            return;
+          }
+          isCurrentStreamClosed = true;
+        }
 
-      try {
-        delegate.onError(cancellationException);
-      } catch (IllegalStateException onErrorException) {
-        // The delegate above was already terminated via onError or onComplete.
-        // Fallthrough since this is possibly due to queued onNext() calls 
that are being made from
-        // previously blocked threads.
-      } catch (RuntimeException onErrorException) {
-        logger.warn(
-            "Encountered unexpected error {} when cancelling due to error.",
-            onErrorException,
-            cancellationException);
+        // Either the no longer active observer which we attempt a reset and 
handle errors
+        // or the current observer that still requires closing.
+        try {
+          delegate.onError(cancellationException);

Review Comment:
   Thansk, sounds good.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to