scwhittle commented on code in PR #34367:
URL: https://github.com/apache/beam/pull/34367#discussion_r2035672102


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -161,14 +190,15 @@ protected final synchronized boolean trySend(RequestT request)
     try {
       requestObserver.onNext(request);
       return true;
-    } catch (ResettableThrowingStreamObserver.StreamClosedException e) {
+    } catch (StreamClosedException e) {
       // Stream was broken, requests may be retried when stream is reopened.
     }
 
     return false;
   }
 
   @Override
+  @SuppressWarnings("FutureReturnValueIgnored")

Review Comment:
   Remove this (it may be obsolete now anyway, by the looks of it). Ignoring these futures has caused issues before.
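A minimal sketch of handling the returned future instead of suppressing the warning, which is what the comment asks for. All names here (`FutureHandling`, `lastFailure`) are illustrative, not from the Beam codebase:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Sketch: capture the future and surface its failure rather than ignoring it.
final class FutureHandling {
  static volatile Throwable lastFailure; // stand-in for real logging/error handling

  static CompletableFuture<Void> submitAndLogFailures(Executor executor, Runnable task) {
    return CompletableFuture.runAsync(task, executor)
        .whenComplete(
            (unused, t) -> {
              if (t != null) {
                lastFailure = t; // the real code would log and/or propagate this
              }
            });
  }
}
```

With failures observed this way, `@SuppressWarnings("FutureReturnValueIgnored")` is no longer needed.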



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -333,6 +372,49 @@ public final synchronized void halfClose() {
     }
   }
 
+  /**
+   * Internally restart the stream to avoid {@link Status#DEADLINE_EXCEEDED} errors.
+   *
+   * @implNote Similar behavior to {@link #halfClose()}, except we allow callers to interact with
+   *     the stream after restarts.
+   */
+  private synchronized void restart() throws WindmillStreamShutdownException {
+    debugMetrics.recordRestartReason("Internal Timeout");
+    try {
+      requestObserver.release();

Review Comment:
   I think it might be cleaner if we kept track of the released observer in AbstractWindmillStream, because then we could treat its completion differently than the active stream's.
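A rough sketch of that suggestion: hold a reference to the observer that was released during `restart()`, so a terminal callback from the stale stream can be told apart from one on the active stream. All names here are hypothetical, not the actual `ResettableThrowingStreamObserver` API:

```java
// Sketch: distinguish completions of the released (stale) observer from the active one.
final class ObserverTracker {
  private Object activeObserver = new Object(); // stands in for a real StreamObserver
  private Object releasedObserver;

  synchronized Object active() {
    return activeObserver;
  }

  /** Called on internal restart: retire the current observer and start a fresh one. */
  synchronized void release() {
    releasedObserver = activeObserver;
    activeObserver = new Object(); // stands in for creating a new stream observer
  }

  /** True if the completing observer is the stale one from before the restart. */
  synchronized boolean isFromReleasedStream(Object completingObserver) {
    return completingObserver == releasedObserver;
  }
}
```

The completion handler could then, for example, skip error accounting for the released stream while still treating active-stream termination as a real failure.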



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -333,6 +372,49 @@ public final synchronized void halfClose() {
     }
   }
 
+  /**
+   * Internally restart the stream to avoid {@link Status#DEADLINE_EXCEEDED} errors.
+   *
+   * @implNote Similar behavior to {@link #halfClose()}, except we allow callers to interact with
+   *     the stream after restarts.
+   */
+  private synchronized void restart() throws WindmillStreamShutdownException {
+    debugMetrics.recordRestartReason("Internal Timeout");
+    try {
+      requestObserver.release();
+      // Create a new stream to flush any pending requests and restart the deadline. If the stream
+      // is closed or shutdown, don't restart the stream here since a restart has already been
+      // scheduled.
+      startStream();

Review Comment:
   Ideally we wouldn't resend the pending things that were for the stream we are shutting down (unless that stream has errors shutting down and they need retrying).
   
   We could change the pending stuff to be kept per-physical-stream instead of just a single set in the sub-implementations. Then when the stream finishes we could move back whatever needs retrying to a pending set for the next new stream to consume.
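A rough sketch of per-physical-stream pending tracking as described above. The names and structure are assumptions for illustration, not the PR's actual sub-implementations:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Sketch: pending requests kept per physical stream rather than in one shared set.
final class PerStreamPending<R> {
  private final Map<Integer, Set<R>> pendingByStream = new HashMap<>();
  private int currentStreamId = 0;

  void addPending(R request) {
    pendingByStream.computeIfAbsent(currentStreamId, k -> new LinkedHashSet<>()).add(request);
  }

  void markCompleted(R request) {
    Set<R> pending = pendingByStream.get(currentStreamId);
    if (pending != null) {
      pending.remove(request);
    }
  }

  /** Called when the current physical stream terminates; carries over only unfinished work. */
  void rollOverToNewStream() {
    Set<R> leftover = pendingByStream.remove(currentStreamId);
    currentStreamId++;
    if (leftover != null && !leftover.isEmpty()) {
      pendingByStream.computeIfAbsent(currentStreamId, k -> new LinkedHashSet<>()).addAll(leftover);
    }
  }

  Set<R> currentPending() {
    return pendingByStream.getOrDefault(currentStreamId, Collections.emptySet());
  }
}
```

On a clean internal restart, `rollOverToNewStream()` would carry forward only requests that never completed, instead of blindly resending everything that was in flight on the old stream.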



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -103,13 +113,30 @@ protected AbstractWindmillStream(
       StreamObserverFactory streamObserverFactory,
       Set<AbstractWindmillStream<?, ?>> streamRegistry,
       int logEveryNStreamFailures,
-      String backendWorkerToken) {
+      String backendWorkerToken,
+      WindmillStreamTTL streamTTL) {
     this.backendWorkerToken = backendWorkerToken;
     this.executor =
         Executors.newSingleThreadExecutor(
             new ThreadFactoryBuilder()
                 .setDaemon(true)
-                .setNameFormat(createThreadName(debugStreamType, backendWorkerToken))
+                .setNameFormat(
+                    createThreadName(debugStreamType, backendWorkerToken, "WindmillStream"))
+                .build());
+    this.restartExecutor =

Review Comment:
   We should avoid starting the executor etc. if we're not restarting the streams (i.e. if the TTL is never). That reduces the scope of this change to just the direct-path streams that set this.
   
   Then separately, after some baking, we can remove the stream cache used in the other cases to do the timeout and use this instead.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/ResettableThrowingStreamObserver.java:
##########
@@ -159,4 +180,76 @@ static final class StreamClosedException extends Exception {
       super(s);
     }
   }
+
+  static final class InternalStreamTimeout extends Throwable {
+    private static final InternalStreamTimeout INSTANCE = new InternalStreamTimeout();
+
+    private InternalStreamTimeout() {}
+
+    static boolean isInternalTimeout(Throwable t) {
+      while (t != null) {
+        if (t == INSTANCE) {
+          return true;
+        }
+        t = t.getCause();
+      }
+      return false;
+    }
+  }
+
+  private final class AsyncStreamCloser {
+    private final BlockingQueue<StreamObserver<T>> streamsToClose;
+    private final ExecutorService streamCloserExecutor;
+
+    @GuardedBy("this")
+    private boolean started;
+
+    private AsyncStreamCloser() {
+      streamsToClose = new LinkedBlockingQueue<>();
+      streamCloserExecutor =
+          Executors.newSingleThreadExecutor(
+              new ThreadFactoryBuilder().setNameFormat("StreamCloserThread-%d").build());
+    }
+
+    private synchronized void start() {
+      if (!started) {
+        streamCloserExecutor.execute(
+            () -> {
+              while (!isPoisoned()) {
+                try {
+                  timeoutStream(streamsToClose.take());
+                } catch (InterruptedException e) {
+                  // Drain streamsToClose to prevent any dangling StreamObservers.
+                  streamsToClose.forEach(this::timeoutStream);
+                  break;
+                }
+              }
+            });
+        started = true;
+      }
+    }
+
+    private void timeoutStream(StreamObserver<T> streamObserver) {
+      try {
+        streamObserver.onError(InternalStreamTimeout.INSTANCE);

Review Comment:
   On timeout we should try to close cleanly instead of with an error. That is better for monitoring, since we avoid errors in the common case, and it also avoids wasting work (like reads Windmill is performing whose responses we could consume instead of retrying the read).
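A sketch of the clean-close suggestion. `SimpleObserver` is a local stand-in for gRPC's `StreamObserver`, and the names are illustrative:

```java
// Sketch: on an internal TTL timeout, half-close with onCompleted() rather than
// onError(), so the server can finish in-flight work and the close is not
// counted as an error.
interface SimpleObserver {
  void onCompleted();

  void onError(Throwable t);
}

final class CleanTimeout {
  static String close(SimpleObserver observer, boolean internalTimeout) {
    if (internalTimeout) {
      observer.onCompleted(); // clean half-close; pending responses can still be consumed
      return "completed";
    }
    observer.onError(new RuntimeException("stream failure")); // real errors stay errors
    return "errored";
  }
}
```

Under this shape, `InternalStreamTimeout` would no longer need to flow through `onError` at all for the common timeout path.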



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
