m-trieu commented on code in PR #34367: URL: https://github.com/apache/beam/pull/34367#discussion_r2073738102
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/ResettableThrowingStreamObserver.java:
##########
@@ -159,4 +180,76 @@ static final class StreamClosedException extends Exception {
       super(s);
     }
   }
+
+  static final class InternalStreamTimeout extends Throwable {
+    private static final InternalStreamTimeout INSTANCE = new InternalStreamTimeout();
+
+    private InternalStreamTimeout() {}
+
+    static boolean isInternalTimeout(Throwable t) {
+      while (t != null) {
+        if (t == INSTANCE) {
+          return true;
+        }
+        t = t.getCause();
+      }
+      return false;
+    }
+  }
+
+  private final class AsyncStreamCloser {
+    private final BlockingQueue<StreamObserver<T>> streamsToClose;
+    private final ExecutorService streamCloserExecutor;
+
+    @GuardedBy("this")
+    private boolean started;
+
+    private AsyncStreamCloser() {
+      streamsToClose = new LinkedBlockingQueue<>();
+      streamCloserExecutor =
+          Executors.newSingleThreadExecutor(
+              new ThreadFactoryBuilder().setNameFormat("StreamCloserThread-%d").build());
+    }
+
+    private synchronized void start() {
+      if (!started) {
+        streamCloserExecutor.execute(
+            () -> {
+              while (!isPoisoned()) {
+                try {
+                  timeoutStream(streamsToClose.take());
+                } catch (InterruptedException e) {
+                  // Drain streamsToClose to prevent any dangling StreamObservers.
+                  streamsToClose.forEach(this::timeoutStream);
+                  break;
+                }
+              }
+            });
+        started = true;
+      }
+    }
+
+    private void timeoutStream(StreamObserver<T> streamObserver) {
+      try {
+        streamObserver.onError(InternalStreamTimeout.INSTANCE);

Review Comment:
   done

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org