arunpandianp commented on code in PR #36170:
URL: https://github.com/apache/beam/pull/36170#discussion_r2353338287
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/ResettableThrowingStreamObserver.java:
##########
@@ -115,33 +115,48 @@ public void onNext(T t) throws StreamClosedException, WindmillStreamShutdownExce
logger.debug("Stream was shutdown during send.",
cancellationException);
return;
}
- }
+ if (delegateStreamObserver == delegate) {
+ if (isCurrentStreamClosed) {
+ logger.debug("Stream is already closed when encountering error with send.");
+ return;
+ }
+ isCurrentStreamClosed = true;
+ }
- try {
- delegate.onError(cancellationException);
- } catch (IllegalStateException onErrorException) {
- // The delegate above was already terminated via onError or onComplete.
- // Fallthrough since this is possibly due to queued onNext() calls that are being made from
- // previously blocked threads.
- } catch (RuntimeException onErrorException) {
- logger.warn(
- "Encountered unexpected error {} when cancelling due to error.",
- onErrorException,
- cancellationException);
+ // Either the no longer active observer which we attempt a reset and handle errors
+ // or the current observer that still requires closing.
+ try {
+ delegate.onError(cancellationException);
Review Comment:
Thanks, sounds good.