m-trieu commented on code in PR #32774: URL: https://github.com/apache/beam/pull/32774#discussion_r1801536245
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java: ########## @@ -278,24 +347,73 @@ public String backendWorkerToken() { } @Override - public void shutdown() { + public final void shutdown() { + // Don't lock here as isShutdown checks are used in the stream to free blocked + // threads or as exit conditions to loops. if (isShutdown.compareAndSet(false, true)) { requestObserver() .onError(new WindmillStreamShutdownException("Explicit call to shutdown stream.")); + shutdownInternal(); + shutdownTime.set(DateTime.now()); } } - private void setLastError(String error) { - lastError.set(error); - lastErrorTime.set(DateTime.now()); + private void recordRestartReason(String error) { + lastRestartReason.set(error); + lastRestartTime.set(DateTime.now()); } + protected abstract void shutdownInternal(); + public static class WindmillStreamShutdownException extends RuntimeException { public WindmillStreamShutdownException(String message) { super(message); } } + /** + * Request observer that allows resetting its internal delegate using the given {@link + * #requestObserverSupplier}. + */ + @ThreadSafe + private static class ResettableRequestObserver<RequestT> implements StreamObserver<RequestT> { + + private final Supplier<StreamObserver<RequestT>> requestObserverSupplier; + + @GuardedBy("this") + private @Nullable StreamObserver<RequestT> delegateRequestObserver; + + private ResettableRequestObserver(Supplier<StreamObserver<RequestT>> requestObserverSupplier) { + this.requestObserverSupplier = requestObserverSupplier; + this.delegateRequestObserver = null; Review Comment: We need some initial state that disallows sends() or other stream operations until startStream/start has been called. Could we use `null`, or a dummy observer that throws if any of its methods are called before startStream/start is called? -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org