scwhittle commented on code in PR #36170: URL: https://github.com/apache/beam/pull/36170#discussion_r2353275023
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/ResettableThrowingStreamObserver.java: ########## @@ -115,33 +115,48 @@ public void onNext(T t) throws StreamClosedException, WindmillStreamShutdownExce logger.debug("Stream was shutdown during send.", cancellationException); return; } - } + if (delegateStreamObserver == delegate) { + if (isCurrentStreamClosed) { + logger.debug("Stream is already closed when encountering error with send."); + return; + } + isCurrentStreamClosed = true; + } - try { - delegate.onError(cancellationException); - } catch (IllegalStateException onErrorException) { - // The delegate above was already terminated via onError or onComplete. - // Fallthrough since this is possibly due to queued onNext() calls that are being made from - // previously blocked threads. - } catch (RuntimeException onErrorException) { - logger.warn( - "Encountered unexpected error {} when cancelling due to error.", - onErrorException, - cancellationException); + // Either the no longer active observer which we attempt a reset and handle errors + // or the current observer that still requires closing. + try { + delegate.onError(cancellationException); Review Comment: I was doing it to be consistent with onError synchronization but I will change back since if we want to CP then it seems a riskier change. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org