m-trieu commented on code in PR #34367:
URL: https://github.com/apache/beam/pull/34367#discussion_r2073738102


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/ResettableThrowingStreamObserver.java:
##########
@@ -159,4 +180,76 @@ static final class StreamClosedException extends Exception 
{
       super(s);
     }
   }
+
+  static final class InternalStreamTimeout extends Throwable {
+    private static final InternalStreamTimeout INSTANCE = new 
InternalStreamTimeout();
+
+    private InternalStreamTimeout() {}
+
+    static boolean isInternalTimeout(Throwable t) {
+      while (t != null) {
+        if (t == INSTANCE) {
+          return true;
+        }
+        t = t.getCause();
+      }
+      return false;
+    }
+  }
+
+  private final class AsyncStreamCloser {
+    private final BlockingQueue<StreamObserver<T>> streamsToClose;
+    private final ExecutorService streamCloserExecutor;
+
+    @GuardedBy("this")
+    private boolean started;
+
+    private AsyncStreamCloser() {
+      streamsToClose = new LinkedBlockingQueue<>();
+      streamCloserExecutor =
+          Executors.newSingleThreadExecutor(
+              new 
ThreadFactoryBuilder().setNameFormat("StreamCloserThread-%d").build());
+    }
+
+    private synchronized void start() {
+      if (!started) {
+        streamCloserExecutor.execute(
+            () -> {
+              while (!isPoisoned()) {
+                try {
+                  timeoutStream(streamsToClose.take());
+                } catch (InterruptedException e) {
+                  // Drain streamsToClose to prevent any dangling 
StreamObservers.
+                  streamsToClose.forEach(this::timeoutStream);
+                  break;
+                }
+              }
+            });
+        started = true;
+      }
+    }
+
+    private void timeoutStream(StreamObserver<T> streamObserver) {
+      try {
+        streamObserver.onError(InternalStreamTimeout.INSTANCE);

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to