scwhittle commented on code in PR #34367: URL: https://github.com/apache/beam/pull/34367#discussion_r2035672102
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -161,14 +190,15 @@ protected final synchronized boolean trySend(RequestT request)
     try {
       requestObserver.onNext(request);
       return true;
-    } catch (ResettableThrowingStreamObserver.StreamClosedException e) {
+    } catch (StreamClosedException e) {
       // Stream was broken, requests may be retried when stream is reopened.
     }
     return false;
   }
 
   @Override
+  @SuppressWarnings("FutureReturnValueIgnored")

Review Comment:
   Remove this (it looks like it may be obsolete now anyway). Ignoring these has caused issues before.

##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -333,6 +372,49 @@ public final synchronized void halfClose() {
     }
   }
 
+  /**
+   * Internally restart the stream to avoid {@link Status#DEADLINE_EXCEEDED} errors.
+   *
+   * @implNote Similar behavior to {@link #halfClose()}, except we allow callers to interact with
+   *     the stream after restarts.
+   */
+  private synchronized void restart() throws WindmillStreamShutdownException {
+    debugMetrics.recordRestartReason("Internal Timeout");
+    try {
+      requestObserver.release();

Review Comment:
   I think it might be cleaner if we kept track of the released observer in AbstractWindmillStream, because then we could treat its completion differently from that of the active stream.

##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -333,6 +372,49 @@ public final synchronized void halfClose() {
     }
   }
 
+  /**
+   * Internally restart the stream to avoid {@link Status#DEADLINE_EXCEEDED} errors.
+   *
+   * @implNote Similar behavior to {@link #halfClose()}, except we allow callers to interact with
+   *     the stream after restarts.
+   */
+  private synchronized void restart() throws WindmillStreamShutdownException {
+    debugMetrics.recordRestartReason("Internal Timeout");
+    try {
+      requestObserver.release();
+      // Create a new stream to flush any pending requests and restart the deadline. If the stream
+      // is closed or shutdown, don't restart the stream here since a restart has already been
+      // scheduled.
+      startStream();

Review Comment:
   Ideally we wouldn't resend the pending items that were for the stream we are shutting down (unless that stream hits errors while shutting down and they need retrying). We could change the pending state to be kept per physical stream instead of as a single set in the sub-implementations. Then, when the stream finishes, we could move whatever needs retrying back to a pending set for the next new stream to consume.

##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -103,13 +113,30 @@ protected AbstractWindmillStream(
       StreamObserverFactory streamObserverFactory,
       Set<AbstractWindmillStream<?, ?>> streamRegistry,
       int logEveryNStreamFailures,
-      String backendWorkerToken) {
+      String backendWorkerToken,
+      WindmillStreamTTL streamTTL) {
     this.backendWorkerToken = backendWorkerToken;
     this.executor =
         Executors.newSingleThreadExecutor(
             new ThreadFactoryBuilder()
                 .setDaemon(true)
-                .setNameFormat(createThreadName(debugStreamType, backendWorkerToken))
+                .setNameFormat(
+                    createThreadName(debugStreamType, backendWorkerToken, "WindmillStream"))
+                .build());
+    this.restartExecutor =

Review Comment:
   We should avoid starting the executor etc. if we're not restarting the streams (i.e. if the TTL is never). That will reduce the scope of this change to just the direct path streams that set this. Then, separately, after this has baked for a while, we can remove the stream cache used in other cases to do the timeout and use this instead.
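
To make the executor suggestion above concrete, here is a minimal sketch of creating the restart machinery only when a finite TTL is configured. It assumes the TTL type can report whether it is unbounded; `StreamTtl`, `RestartScheduling`, and all of their methods are hypothetical names for illustration and are not the PR's `WindmillStreamTTL` API.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the PR's WindmillStreamTTL; the real API may differ.
final class StreamTtl {
  private final Optional<Duration> ttl;

  private StreamTtl(Optional<Duration> ttl) {
    this.ttl = ttl;
  }

  static StreamTtl never() {
    return new StreamTtl(Optional.empty());
  }

  static StreamTtl of(Duration ttl) {
    return new StreamTtl(Optional.of(ttl));
  }

  Optional<Duration> duration() {
    return ttl;
  }
}

// Hypothetical holder for the restart machinery of a single stream.
final class RestartScheduling {
  private final StreamTtl streamTtl;
  // Empty when the TTL is "never", so no executor is created for those streams.
  private final Optional<ScheduledExecutorService> restartExecutor;

  RestartScheduling(StreamTtl streamTtl) {
    this.streamTtl = streamTtl;
    this.restartExecutor =
        streamTtl
            .duration()
            .map(
                unused ->
                    Executors.newSingleThreadScheduledExecutor(
                        runnable -> {
                          Thread thread = new Thread(runnable, "WindmillStreamRestart");
                          thread.setDaemon(true);
                          return thread;
                        }));
  }

  boolean restartsEnabled() {
    return restartExecutor.isPresent();
  }

  // Schedules the restart after the TTL elapses; returns empty if restarts are disabled.
  // The future is handed back to the caller rather than dropped.
  Optional<ScheduledFuture<?>> scheduleRestart(Runnable restart) {
    if (!restartExecutor.isPresent()) {
      return Optional.empty();
    }
    long delayMillis = streamTtl.duration().get().toMillis();
    ScheduledFuture<?> future =
        restartExecutor.get().schedule(restart, delayMillis, TimeUnit.MILLISECONDS);
    return Optional.of(future);
  }

  void shutdown() {
    restartExecutor.ifPresent(ScheduledExecutorService::shutdownNow);
  }
}
```

With this shape, streams constructed with `StreamTtl.never()` take no new code path at all, which is what keeps the change scoped to the direct path streams.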
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/ResettableThrowingStreamObserver.java:
##########
@@ -159,4 +180,76 @@ static final class StreamClosedException extends Exception {
       super(s);
     }
   }
+
+  static final class InternalStreamTimeout extends Throwable {
+    private static final InternalStreamTimeout INSTANCE = new InternalStreamTimeout();
+
+    private InternalStreamTimeout() {}
+
+    static boolean isInternalTimeout(Throwable t) {
+      while (t != null) {
+        if (t == INSTANCE) {
+          return true;
+        }
+        t = t.getCause();
+      }
+      return false;
+    }
+  }
+
+  private final class AsyncStreamCloser {
+    private final BlockingQueue<StreamObserver<T>> streamsToClose;
+    private final ExecutorService streamCloserExecutor;
+
+    @GuardedBy("this")
+    private boolean started;
+
+    private AsyncStreamCloser() {
+      streamsToClose = new LinkedBlockingQueue<>();
+      streamCloserExecutor =
+          Executors.newSingleThreadExecutor(
+              new ThreadFactoryBuilder().setNameFormat("StreamCloserThread-%d").build());
+    }
+
+    private synchronized void start() {
+      if (!started) {
+        streamCloserExecutor.execute(
+            () -> {
+              while (!isPoisoned()) {
+                try {
+                  timeoutStream(streamsToClose.take());
+                } catch (InterruptedException e) {
+                  // Drain streamsToClose to prevent any dangling StreamObservers.
+                  streamsToClose.forEach(this::timeoutStream);
+                  break;
+                }
+              }
+            });
+        started = true;
+      }
+    }
+
+    private void timeoutStream(StreamObserver<T> streamObserver) {
+      try {
+        streamObserver.onError(InternalStreamTimeout.INSTANCE);

Review Comment:
   On timeout we should try to close cleanly instead of with an error. Avoiding errors in the common case is better for monitoring, and it also avoids wasting work (for example, reads that Windmill is already performing, whose responses we could consume instead of retrying the read).
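
For the timeout comment on `ResettableThrowingStreamObserver`, a rough sketch of the clean-close idea: half-close the timed-out stream with `onCompleted()` and fall back to `onError` only if the clean close itself fails. `RequestObserver` is a hypothetical local mirror of the three callbacks on gRPC's `StreamObserver`, and `StreamTimeouts` and `RecordingObserver` are illustrative names that do not exist in the PR.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical mirror of the callbacks on io.grpc.stub.StreamObserver,
// declared locally so the sketch compiles without gRPC on the classpath.
interface RequestObserver<T> {
  void onNext(T value);

  void onError(Throwable t);

  void onCompleted();
}

final class StreamTimeouts {
  private StreamTimeouts() {}

  // Returns true if the stream was closed cleanly, false if it had to be errored.
  static <T> boolean timeoutStream(RequestObserver<T> observer) {
    try {
      // Clean half-close: no error is reported for the common timeout case.
      observer.onCompleted();
      return true;
    } catch (RuntimeException closeFailure) {
      try {
        // Fall back to an error only when the clean close did not succeed.
        observer.onError(closeFailure);
      } catch (RuntimeException ignored) {
        // The observer was already terminated; nothing more to do.
      }
      return false;
    }
  }
}

// Test double that records which terminal callback was invoked.
final class RecordingObserver implements RequestObserver<String> {
  final List<String> events = new ArrayList<>();
  private final boolean failOnCompleted;

  RecordingObserver(boolean failOnCompleted) {
    this.failOnCompleted = failOnCompleted;
  }

  @Override
  public void onNext(String value) {
    events.add("onNext:" + value);
  }

  @Override
  public void onError(Throwable t) {
    events.add("onError");
  }

  @Override
  public void onCompleted() {
    if (failOnCompleted) {
      throw new IllegalStateException("stream already broken");
    }
    events.add("onCompleted");
  }
}
```

Returning a boolean lets the caller record clean closes and error closes as separate metrics, so the common timeout case does not show up as an error in monitoring.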