m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1801536245


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -278,24 +347,73 @@ public String backendWorkerToken() {
   }
 
   @Override
-  public void shutdown() {
+  public final void shutdown() {
+    // Don't lock here as isShutdown checks are used in the stream to free 
blocked
+    // threads or as exit conditions to loops.
     if (isShutdown.compareAndSet(false, true)) {
       requestObserver()
           .onError(new WindmillStreamShutdownException("Explicit call to 
shutdown stream."));
+      shutdownInternal();
+      shutdownTime.set(DateTime.now());
     }
   }
 
-  private void setLastError(String error) {
-    lastError.set(error);
-    lastErrorTime.set(DateTime.now());
+  private void recordRestartReason(String error) {
+    lastRestartReason.set(error);
+    lastRestartTime.set(DateTime.now());
   }
 
+  protected abstract void shutdownInternal();
+
   public static class WindmillStreamShutdownException extends RuntimeException 
{
     public WindmillStreamShutdownException(String message) {
       super(message);
     }
   }
 
+  /**
+   * Request observer that allows resetting its internal delegate using the 
given {@link
+   * #requestObserverSupplier}.
+   */
+  @ThreadSafe
+  private static class ResettableRequestObserver<RequestT> implements 
StreamObserver<RequestT> {
+
+    private final Supplier<StreamObserver<RequestT>> requestObserverSupplier;
+
+    @GuardedBy("this")
+    private @Nullable StreamObserver<RequestT> delegateRequestObserver;
+
+    private ResettableRequestObserver(Supplier<StreamObserver<RequestT>> 
requestObserverSupplier) {
+      this.requestObserverSupplier = requestObserverSupplier;
+      this.delegateRequestObserver = null;

Review Comment:
   we need some initial state to not allow sends() or other stream operations 
w/o a call to startStream/start
   
   we can use `null` or a dummy observer that will throw if any of the methods 
are called before startStream/start is called?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to