scwhittle commented on code in PR #34367:
URL: https://github.com/apache/beam/pull/34367#discussion_r2035672102
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -161,14 +190,15 @@ protected final synchronized boolean trySend(RequestT request)
try {
requestObserver.onNext(request);
return true;
- } catch (ResettableThrowingStreamObserver.StreamClosedException e) {
+ } catch (StreamClosedException e) {
// Stream was broken, requests may be retried when stream is reopened.
}
return false;
}
@Override
+ @SuppressWarnings("FutureReturnValueIgnored")
Review Comment:
Remove this suppression (it may be obsolete now anyway); ignoring these futures has caused issues before.
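As a sketch of the reviewer's point (the class and method names here are invented for illustration, not from the PR): rather than suppressing `FutureReturnValueIgnored`, the `Future` returned by `submit()` can be captured so a failing task surfaces instead of silently disappearing.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;

// Hypothetical sketch, not the PR's code: capture the Future returned by
// submit() so a failing task is reported instead of silently dropped.
class FutureHandlingSketch {
  // Returns true if the task ran to completion, false if it threw.
  static boolean runAndReport(ExecutorService executor, Runnable task) {
    try {
      executor.submit(task).get(); // propagates the task's exception
      return true;
    } catch (ExecutionException e) {
      System.err.println("stream task failed: " + e.getCause());
      return false;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```

Blocking on `get()` is just the simplest way to make the failure visible; a non-blocking alternative would be to hand the task to a completion callback.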
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -333,6 +372,49 @@ public final synchronized void halfClose() {
}
}
+ /**
+ * Internally restart the stream to avoid {@link Status#DEADLINE_EXCEEDED} errors.
+ *
+ * @implNote Similar behavior to {@link #halfClose()}, except we allow callers to interact with
+ *     the stream after restarts.
+ */
+ private synchronized void restart() throws WindmillStreamShutdownException {
+ debugMetrics.recordRestartReason("Internal Timeout");
+ try {
+ requestObserver.release();
Review Comment:
I think it might be cleaner if we kept track of the released observer in
AbstractWindmillStream, because then we could treat its onCompleted
differently than the active stream's.
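A minimal sketch of this suggestion (names invented, and plain objects stand in for the stream observers): remember the observer being released at restart time, so a later completion callback can be classified as expected (from the old stream) or unexpected (from the active one).

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch, not the PR's code: the stream keeps a reference to
// the observer it released, so a completion arriving from the old physical
// stream can be told apart from one on the active stream.
class ReleasedObserverTracker<T> {
  private final AtomicReference<T> activeObserver = new AtomicReference<>();
  private final AtomicReference<T> releasedObserver = new AtomicReference<>();

  // Swap in a new observer, remembering the one being released.
  void restart(T newObserver) {
    releasedObserver.set(activeObserver.getAndSet(newObserver));
  }

  // True when the completion came from the deliberately released stream and
  // should not trigger the normal error/retry handling.
  boolean isReleasedObserver(T completed) {
    return completed != null && completed == releasedObserver.get();
  }
}
```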
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -333,6 +372,49 @@ public final synchronized void halfClose() {
}
}
+ /**
+ * Internally restart the stream to avoid {@link Status#DEADLINE_EXCEEDED} errors.
+ *
+ * @implNote Similar behavior to {@link #halfClose()}, except we allow callers to interact with
+ *     the stream after restarts.
+ */
+ private synchronized void restart() throws WindmillStreamShutdownException {
+ debugMetrics.recordRestartReason("Internal Timeout");
+ try {
+ requestObserver.release();
+ // Create a new stream to flush any pending requests and restart the deadline. If the stream
+ // is closed or shutdown, don't restart the stream here since a restart has already been
+ // scheduled.
+ startStream();
Review Comment:
Ideally we wouldn't resend the pending requests that belonged to the stream we
are shutting down (unless that stream had errors shutting down and they need
retrying).
We could keep the pending requests per physical stream instead of in a single
set in the sub-implementations. Then when a stream finishes, we could move
whatever needs retrying back into a pending set for the next new stream to
consume.
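One way to picture the per-physical-stream bookkeeping (a sketch with invented names, simplified to long stream ids and generic requests): each physical stream owns its in-flight requests, and only the requests of a stream that finished with an error are handed to the next stream for resending.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical sketch of per-physical-stream pending tracking.
class PerStreamPendingTracker<R> {
  private final Map<Long, Queue<R>> inFlightByStream = new HashMap<>();
  private final Queue<R> pendingForNextStream = new ArrayDeque<>();

  synchronized void recordSent(long streamId, R request) {
    inFlightByStream.computeIfAbsent(streamId, id -> new ArrayDeque<>()).add(request);
  }

  // Clean shutdown: the old stream's requests completed, so drop them.
  // Errored shutdown: move them back so the next stream retries them.
  synchronized void streamFinished(long streamId, boolean hadError) {
    Queue<R> inFlight = inFlightByStream.remove(streamId);
    if (inFlight != null && hadError) {
      pendingForNextStream.addAll(inFlight);
    }
  }

  // The newly started stream consumes whatever was carried over.
  synchronized Queue<R> drainPendingForNewStream() {
    Queue<R> drained = new ArrayDeque<>(pendingForNextStream);
    pendingForNextStream.clear();
    return drained;
  }
}
```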
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -103,13 +113,30 @@ protected AbstractWindmillStream(
StreamObserverFactory streamObserverFactory,
Set<AbstractWindmillStream<?, ?>> streamRegistry,
int logEveryNStreamFailures,
- String backendWorkerToken) {
+ String backendWorkerToken,
+ WindmillStreamTTL streamTTL) {
this.backendWorkerToken = backendWorkerToken;
this.executor =
Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder()
.setDaemon(true)
- .setNameFormat(createThreadName(debugStreamType, backendWorkerToken))
+ .setNameFormat(
+ createThreadName(debugStreamType, backendWorkerToken, "WindmillStream"))
+ .build());
+ this.restartExecutor =
Review Comment:
We should avoid starting the executor etc. if we're not restarting the streams
(i.e. if the TTL is never). That reduces the scope of this change to just the
direct-path streams that set this.
Then separately, after this has baked for a while, we can remove the stream
cache used in the other cases to do the timeout and use this instead.
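A sketch of the conditional setup being suggested (`TTL_NEVER_SECONDS` and the class name are stand-ins, not the PR's `WindmillStreamTTL` API): allocate the restart executor only when the TTL is finite, so streams that never restart pay no cost.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

// Hypothetical sketch: only allocate restart machinery for a finite TTL.
class ConditionalRestartSketch {
  static final long TTL_NEVER_SECONDS = -1;

  private final ScheduledExecutorService restartExecutor;

  ConditionalRestartSketch(long ttlSeconds) {
    // Null when restarts are disabled; no executor or thread is created.
    this.restartExecutor =
        ttlSeconds == TTL_NEVER_SECONDS ? null : Executors.newSingleThreadScheduledExecutor();
  }

  boolean restartsEnabled() {
    return restartExecutor != null;
  }
}
```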
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/ResettableThrowingStreamObserver.java:
##########
@@ -159,4 +180,76 @@ static final class StreamClosedException extends Exception {
super(s);
}
}
+
+ static final class InternalStreamTimeout extends Throwable {
+ private static final InternalStreamTimeout INSTANCE = new InternalStreamTimeout();
+
+ private InternalStreamTimeout() {}
+
+ static boolean isInternalTimeout(Throwable t) {
+ while (t != null) {
+ if (t == INSTANCE) {
+ return true;
+ }
+ t = t.getCause();
+ }
+ return false;
+ }
+ }
+
+ private final class AsyncStreamCloser {
+ private final BlockingQueue<StreamObserver<T>> streamsToClose;
+ private final ExecutorService streamCloserExecutor;
+
+ @GuardedBy("this")
+ private boolean started;
+
+ private AsyncStreamCloser() {
+ streamsToClose = new LinkedBlockingQueue<>();
+ streamCloserExecutor =
+ Executors.newSingleThreadExecutor(
+ new ThreadFactoryBuilder().setNameFormat("StreamCloserThread-%d").build());
+ }
+
+ private synchronized void start() {
+ if (!started) {
+ streamCloserExecutor.execute(
+ () -> {
+ while (!isPoisoned()) {
+ try {
+ timeoutStream(streamsToClose.take());
+ } catch (InterruptedException e) {
+ // Drain streamsToClose to prevent any dangling StreamObservers.
+ streamsToClose.forEach(this::timeoutStream);
+ break;
+ }
+ }
+ });
+ started = true;
+ }
+ }
+
+ private void timeoutStream(StreamObserver<T> streamObserver) {
+ try {
+ streamObserver.onError(InternalStreamTimeout.INSTANCE);
Review Comment:
On timeout we should try to close cleanly instead of with an error. That is
better for monitoring if we can avoid errors in the common case, and it also
avoids wasting work (like reads that windmill is performing, whose responses
we could consume instead of retrying the read).
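The suggested change could look roughly like this (the nested interface is a simplified stand-in for gRPC's `StreamObserver`, and the class name is invented): half-close the timed-out stream with `onCompleted()` so in-flight responses can still drain, falling back to `onError` only if the clean close itself fails.

```java
// Hypothetical sketch of a clean timeout close, not the PR's code.
class CleanTimeoutCloser {
  interface StreamObserver<T> {
    void onCompleted();

    void onError(Throwable t);
  }

  static <T> void timeoutStream(StreamObserver<T> observer) {
    try {
      observer.onCompleted(); // clean close: no error for monitoring to count
    } catch (RuntimeException e) {
      observer.onError(e); // last resort if the half-close itself throws
    }
  }
}
```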
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]