scwhittle commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1830671651


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -153,113 +147,150 @@ private static long debugDuration(long nowMs, long 
startMs) {
   protected abstract void startThrottleTimer();
 
   /** Reflects that {@link #shutdown()} was explicitly called. */
-  protected boolean isShutdown() {
-    return isShutdown.get();
+  protected synchronized boolean hasReceivedShutdownSignal() {
+    return isShutdown;
   }
 
-  private StreamObserver<RequestT> requestObserver() {
-    if (requestObserver == null) {
-      throw new NullPointerException(
-          "requestObserver cannot be null. Missing a call to startStream() to 
initialize.");
-    }
-
-    return requestObserver;
+  /** Send a request to the server. */
+  protected final synchronized void send(RequestT request)
+      throws StreamClosedException, WindmillStreamShutdownException {
+    debugMetrics.recordSend();
+    requestObserver.onNext(request);
   }
 
-  /** Send a request to the server. */
-  protected final void send(RequestT request) {
-    lastSendTimeMs.set(Instant.now().getMillis());
+  @Override
+  public final void start() {
+    boolean shouldStartStream = false;
     synchronized (this) {
-      if (streamClosed.get()) {
-        throw new IllegalStateException("Send called on a client closed 
stream.");
+      if (!isShutdown && !started) {
+        started = true;
+        shouldStartStream = true;
       }
+    }
 
-      requestObserver().onNext(request);
+    if (shouldStartStream) {
+      startStream();
     }
   }
 
   /** Starts the underlying stream. */
-  protected final void startStream() {
+  private void startStream() {
     // Add the stream to the registry after it has been fully constructed.
     streamRegistry.add(this);
     while (true) {
       try {
         synchronized (this) {
-          startTimeMs.set(Instant.now().getMillis());
-          lastResponseTimeMs.set(0);
-          streamClosed.set(false);
-          // lazily initialize the requestObserver. Gets reset whenever the 
stream is reopened.
-          requestObserver = requestObserverSupplier.get();
+          debugMetrics.recordStart();
+          requestObserver.reset();
           onNewStream();
-          if (clientClosed.get()) {
+          if (clientClosed) {
             halfClose();
           }
           return;
         }
+      } catch (WindmillStreamShutdownException e) {
+        logger.debug("Stream was shutdown while creating new stream.", e);

Review Comment:
   // shutdown() is responsible for cleaning up pending requests.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -49,46 +44,52 @@
  * stream if it is broken. Subclasses are responsible for retrying requests 
that have been lost on a
  * broken stream.
  *
- * <p>Subclasses should override onResponse to handle responses from the 
server, and onNewStream to
- * perform any work that must be done when a new stream is created, such as 
sending headers or
- * retrying requests.
+ * <p>Subclasses should override {@link #onResponse(ResponseT)} to handle 
responses from the server,
+ * and {@link #onNewStream()} to perform any work that must be done when a new 
stream is created,
+ * such as sending headers or retrying requests.
  *
- * <p>send and startStream should not be called from onResponse; use 
executor() instead.
+ * <p>{@link #send(RequestT)} and {@link #startStream()} should not be called 
from {@link
+ * #onResponse(ResponseT)}; use {@link #executeSafely(Runnable)} instead.
  *
  * <p>Synchronization on this is used to synchronize the gRpc stream state and 
internal data
  * structures. Since grpc channel operations may block, synchronization on 
this stream may also
  * block. This is generally not a problem since streams are used in a 
single-threaded manner.
  * However, some accessors used for status page and other debugging need to 
take care not to require
  * synchronizing on this.
+ *
+ * <p>{@link #start()} and {@link #shutdown()} are called once in the lifetime 
of the stream. Once
+ * {@link #shutdown()}, a stream in considered invalid and cannot be 
restarted/reused.
  */
 public abstract class AbstractWindmillStream<RequestT, ResponseT> implements 
WindmillStream {
 
-  public static final long DEFAULT_STREAM_RPC_DEADLINE_SECONDS = 300;
   // Default gRPC streams to 2MB chunks, which has shown to be a large enough 
chunk size to reduce
   // per-chunk overhead, and small enough that we can still perform granular 
flow-control.
   protected static final int RPC_STREAM_CHUNK_SIZE = 2 << 20;
-  private static final Logger LOG = 
LoggerFactory.getLogger(AbstractWindmillStream.class);
-  protected final AtomicBoolean clientClosed;
-  private final AtomicBoolean isShutdown;
-  private final AtomicLong lastSendTimeMs;
-  private final Executor executor;
+  // Indicates that the logical stream has been half-closed and is waiting for 
clean server
+  // shutdown.
+  private static final Status OK_STATUS = Status.fromCode(Status.Code.OK);
+  private static final String NEVER_RECEIVED_RESPONSE_LOG_STRING = "never 
received response";
+  protected final Sleeper sleeper;
+
+  private final Logger logger;
+  private final ExecutorService executor;
   private final BackOff backoff;
-  private final AtomicLong startTimeMs;
-  private final AtomicLong lastResponseTimeMs;
-  private final AtomicInteger errorCount;
-  private final AtomicReference<String> lastError;
-  private final AtomicReference<DateTime> lastErrorTime;
-  private final AtomicLong sleepUntil;
   private final CountDownLatch finishLatch;
   private final Set<AbstractWindmillStream<?, ?>> streamRegistry;
   private final int logEveryNStreamFailures;
-  private final Supplier<StreamObserver<RequestT>> requestObserverSupplier;
-  // Indicates if the current stream in requestObserver is closed by calling 
close() method
-  private final AtomicBoolean streamClosed;
   private final String backendWorkerToken;
-  private @Nullable StreamObserver<RequestT> requestObserver;
+  private final ResettableThrowingStreamObserver<RequestT> requestObserver;
+  private final StreamDebugMetrics debugMetrics;
+  protected volatile boolean clientClosed;

Review Comment:
   Can this be non-volatile and @GuardedBy now? I think it's simpler if we just
hold the synchronized block for sendHeartbeats etc. instead of dealing with
volatile and interleaving it with other state under synchronized blocks.
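   
   A rough sketch of that shape (simplified stand-in class, not the actual
AbstractWindmillStream; the GuardedBy annotation and names are illustrative):
   
    import javax.annotation.concurrent.GuardedBy;
    
    abstract class StreamSketch {
      // All reads and writes happen under the object monitor, so no volatile is needed.
      @GuardedBy("this")
      private boolean clientClosed = false;
    
      final synchronized void halfClose() {
        clientClosed = true;
      }
    
      final synchronized void maybeSendHealthCheck(long lastSendThresholdMs, long lastSendMs) {
        if (!clientClosed && lastSendMs < lastSendThresholdMs) {
          try {
            sendHealthCheck();
          } catch (Exception e) {
            // Health checks are best-effort; log and ignore.
          }
        }
      }
    
      abstract void sendHealthCheck() throws Exception;
    }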



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -153,113 +147,150 @@ private static long debugDuration(long nowMs, long 
startMs) {
   protected abstract void startThrottleTimer();
 
   /** Reflects that {@link #shutdown()} was explicitly called. */
-  protected boolean isShutdown() {
-    return isShutdown.get();
+  protected synchronized boolean hasReceivedShutdownSignal() {

Review Comment:
   Maybe remove this method now that it synchronizes on a single object? I'm
guessing most call sites will already have synchronization (or perhaps should be
changed to hold it longer than just the boolean check?)
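   
   For reference, a minimal sketch of the call-site shape this suggests
(hypothetical names, not the PR's code): hold the monitor across the check and
the dependent work instead of calling a getter for the flag.
   
    import javax.annotation.concurrent.GuardedBy;
    
    final class ShutdownAwareSketch {
      @GuardedBy("this")
      private boolean isShutdown = false;
    
      synchronized void shutdown() {
        isShutdown = true;
        // ... clean up pending requests ...
      }
    
      synchronized void doWork(Runnable work) {
        if (isShutdown) {
          return; // Drop the work instead of racing with shutdown().
        }
        // Runs under the same lock, so it cannot interleave with shutdown().
        work.run();
      }
    }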



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java:
##########
@@ -139,4 +166,29 @@ public void onCompleted() {
       outboundObserver.onCompleted();
     }
   }
+
+  @Override
+  public void terminate(Throwable terminationException) {
+    // Free the blocked threads in onNext().
+    isReadyNotifier.forceTermination();

Review Comment:
   I think we might as well do this cancellation in the case where onError is
called as well.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java:
##########
@@ -139,4 +166,29 @@ public void onCompleted() {
       outboundObserver.onCompleted();
     }
   }
+
+  @Override
+  public void terminate(Throwable terminationException) {
+    // Free the blocked threads in onNext().
+    isReadyNotifier.forceTermination();
+    try {
+      onError(terminationException);
+    } catch (RuntimeException e) {
+      // If onError or onComplete was previously called, this will throw.

Review Comment:
   Can we keep track of whether onError or onCompleted was called with a boolean
instead of relying on a triggered RuntimeException? We had issues before where
gRPC would get stuck instead of throwing an exception when we misuse the API.
   
   Then onError/onCompleted can verify the flag is not set and set it, and
terminate can call onError only if it isn't set, removing this catch.
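   
   A sketch of that shape (simplified; field names are made up, not the PR's):
   
    import io.grpc.stub.StreamObserver;
    import java.util.concurrent.Phaser;
    
    final class ClosedTrackingObserver<T> {
      private final Object lock = new Object();
      private final Phaser isReadyNotifier = new Phaser(1);
      private final StreamObserver<T> outboundObserver;
      // Guarded by lock; true once onError or onCompleted has been delivered downstream.
      private boolean outboundClosed = false;
    
      ClosedTrackingObserver(StreamObserver<T> outboundObserver) {
        this.outboundObserver = outboundObserver;
      }
    
      void onError(Throwable t) {
        // Free any sender blocked in onNext() waiting for isReady.
        isReadyNotifier.forceTermination();
        synchronized (lock) {
          if (!outboundClosed) {
            outboundClosed = true;
            outboundObserver.onError(t);
          }
        }
      }
    
      void onCompleted() {
        synchronized (lock) {
          if (!outboundClosed) {
            outboundClosed = true;
            outboundObserver.onCompleted();
          }
        }
      }
    
      void terminate(Throwable terminationException) {
        // No try/catch needed: the flag guarantees the outbound observer is closed at most once.
        onError(terminationException);
      }
    }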



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java:
##########
@@ -67,61 +69,86 @@ public DirectStreamObserver(
   }
 
   @Override
-  public void onNext(T value) {
+  public void onNext(T value) throws StreamObserverCancelledException {
     int awaitPhase = -1;
     long totalSecondsWaited = 0;
     long waitSeconds = 1;
     while (true) {
       try {
         synchronized (lock) {
+          int currentPhase = isReadyNotifier.getPhase();
+          // Phaser is terminated so don't use the outboundObserver. Since 
onError and onCompleted
+          // are synchronized after terminating the phaser if we observe that 
the phaser is not
+          // terminated the onNext calls below are guaranteed to not be called 
on a closed observer.
+          if (currentPhase < 0) return;
+
+          // If we awaited previously and timed out, wait for the same phase. 
Otherwise we're
+          // careful to observe the phase before observing isReady.
+          if (awaitPhase < 0) {
+            awaitPhase = isReadyNotifier.getPhase();
+            // If getPhase() returns a value less than 0, the phaser has been 
terminated.
+            if (awaitPhase < 0) {
+              return;
+            }
+          }
+
           // We only check isReady periodically to effectively allow for 
increasing the outbound
           // buffer periodically. This reduces the overhead of blocking while 
still restricting
           // memory because there is a limited # of streams, and we have a max 
messages size of 2MB.
           if (++messagesSinceReady <= messagesBetweenIsReadyChecks) {
             outboundObserver.onNext(value);
             return;
           }
-          // If we awaited previously and timed out, wait for the same phase. 
Otherwise we're
-          // careful to observe the phase before observing isReady.
-          if (awaitPhase < 0) {
-            awaitPhase = phaser.getPhase();
-          }
+
           if (outboundObserver.isReady()) {
             messagesSinceReady = 0;
             outboundObserver.onNext(value);
             return;
           }
         }
+
         // A callback has been registered to advance the phaser whenever the 
observer
         // transitions to  is ready. Since we are waiting for a phase observed 
before the
         // outboundObserver.isReady() returned false, we expect it to advance 
after the
         // channel has become ready.  This doesn't always seem to be the case 
(despite
         // documentation stating otherwise) so we poll periodically and 
enforce an overall
         // timeout related to the stream deadline.
-        phaser.awaitAdvanceInterruptibly(awaitPhase, waitSeconds, 
TimeUnit.SECONDS);
+        int nextPhase =
+            isReadyNotifier.awaitAdvanceInterruptibly(awaitPhase, waitSeconds, 
TimeUnit.SECONDS);
+        // If nextPhase is a value less than 0, the phaser has been terminated.
+        if (nextPhase < 0) {
+          return;

Review Comment:
   throw new StreamObserverCancelledException(e);
   
   Seems better to surface that we didn't send the message, to avoid the race
where we were blocked on send, terminate() cancels the phaser, but onCompleted
happens to run before terminate()'s onError. Though see below, we can improve
onCompleted to check for that race.
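   
   Roughly, i.e. replacing the early return in the wait loop (the exception's
constructor shape is assumed here):
   
    int nextPhase =
        isReadyNotifier.awaitAdvanceInterruptibly(awaitPhase, waitSeconds, TimeUnit.SECONDS);
    if (nextPhase < 0) {
      // The phaser was terminated while we were blocked; the value was never sent.
      throw new StreamObserverCancelledException(
          new IllegalStateException("Stream terminated while waiting for isReady."));
    }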



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##########
@@ -121,32 +130,49 @@ public static GrpcGetDataStream create(
       int streamingRpcBatchLimit,
       boolean sendKeyedGetDataRequests,
       Consumer<List<Windmill.ComputationHeartbeatResponse>> 
processHeartbeatResponses) {
-    GrpcGetDataStream getDataStream =
-        new GrpcGetDataStream(
-            backendWorkerToken,
-            startGetDataRpcFn,
-            backoff,
-            streamObserverFactory,
-            streamRegistry,
-            logEveryNStreamFailures,
-            getDataThrottleTimer,
-            jobHeader,
-            idGenerator,
-            streamingRpcBatchLimit,
-            sendKeyedGetDataRequests,
-            processHeartbeatResponses);
-    getDataStream.startStream();
-    return getDataStream;
+    return new GrpcGetDataStream(
+        backendWorkerToken,
+        startGetDataRpcFn,
+        backoff,
+        streamObserverFactory,
+        streamRegistry,
+        logEveryNStreamFailures,
+        getDataThrottleTimer,
+        jobHeader,
+        idGenerator,
+        streamingRpcBatchLimit,
+        sendKeyedGetDataRequests,
+        processHeartbeatResponses);
+  }
+
+  private static WindmillStreamShutdownException 
shutdownExceptionFor(QueuedBatch batch) {
+    return new WindmillStreamShutdownException(
+        "Stream was closed when attempting to send " + batch.requestsCount() + 
" requests.");
+  }
+
+  private static WindmillStreamShutdownException 
shutdownExceptionFor(QueuedRequest request) {
+    return new WindmillStreamShutdownException(
+        "Cannot send request=[" + request + "] on closed stream.");
+  }
+
+  private void sendIgnoringClosed(StreamingGetDataRequest getDataRequest)
+      throws WindmillStreamShutdownException {
+    try {
+      send(getDataRequest);
+    } catch (StreamClosedException e) {
+      // Stream was closed on send, will be retried on stream restart.
+    }
   }
 
   @Override
-  protected synchronized void onNewStream() {
+  protected synchronized void onNewStream()
+      throws StreamClosedException, WindmillStreamShutdownException {
     send(StreamingGetDataRequest.newBuilder().setHeader(jobHeader).build());
-    if (clientClosed.get()) {
+    if (clientClosed && !hasReceivedShutdownSignal()) {

Review Comment:
   Remove? The send above will fail if we're shut down.
   Though I think we can also guarantee at a higher level that onNewStream isn't
called after shutdown.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -314,24 +417,35 @@ private Batcher() {
     @Override
     public boolean commitWorkItem(
         String computation, WorkItemCommitRequest commitRequest, 
Consumer<CommitStatus> onDone) {
-      if (!canAccept(commitRequest.getSerializedSize() + 
computation.length())) {
+      if (!canAccept(commitRequest.getSerializedSize() + computation.length())
+          || hasReceivedShutdownSignal()) {

Review Comment:
   Can we remove the shutdown check here? It seems easier to just handle it
later. Otherwise, if we return false here, the caller thinks the batcher is full
and new batchers might be created, repeating the problem.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##########
@@ -301,39 +343,59 @@ public void appendSpecificHtml(PrintWriter writer) {
     writer.append("]");
   }
 
-  private <ResponseT> ResponseT issueRequest(QueuedRequest request, 
ParseFn<ResponseT> parseFn) {
-    while (true) {
+  private <ResponseT> ResponseT issueRequest(QueuedRequest request, 
ParseFn<ResponseT> parseFn)
+      throws WindmillStreamShutdownException {
+    while (!hasReceivedShutdownSignal()) {
       request.resetResponseStream();
       try {
         queueRequestAndWait(request);
         return parseFn.parse(request.getResponseStream());
-      } catch (CancellationException e) {
-        // Retry issuing the request since the response stream was cancelled.
-        continue;
+      } catch (AppendableInputStream.InvalidInputStreamStateException | 
CancellationException e) {
+        handleShutdown(request, e);
+        if (!(e instanceof CancellationException)) {
+          throw e;
+        }
       } catch (IOException e) {
         LOG.error("Parsing GetData response failed: ", e);
-        continue;
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
+        handleShutdown(request, e);
         throw new RuntimeException(e);
       } finally {
         pending.remove(request.id());
       }
     }
+
+    throw new WindmillStreamShutdownException(

Review Comment:
   Use shutdownExceptionFor(request)?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -153,113 +147,150 @@ private static long debugDuration(long nowMs, long 
startMs) {
   protected abstract void startThrottleTimer();
 
   /** Reflects that {@link #shutdown()} was explicitly called. */
-  protected boolean isShutdown() {
-    return isShutdown.get();
+  protected synchronized boolean hasReceivedShutdownSignal() {
+    return isShutdown;
   }
 
-  private StreamObserver<RequestT> requestObserver() {
-    if (requestObserver == null) {
-      throw new NullPointerException(
-          "requestObserver cannot be null. Missing a call to startStream() to 
initialize.");
-    }
-
-    return requestObserver;
+  /** Send a request to the server. */
+  protected final synchronized void send(RequestT request)
+      throws StreamClosedException, WindmillStreamShutdownException {
+    debugMetrics.recordSend();
+    requestObserver.onNext(request);
   }
 
-  /** Send a request to the server. */
-  protected final void send(RequestT request) {
-    lastSendTimeMs.set(Instant.now().getMillis());
+  @Override
+  public final void start() {
+    boolean shouldStartStream = false;
     synchronized (this) {
-      if (streamClosed.get()) {
-        throw new IllegalStateException("Send called on a client closed 
stream.");
+      if (!isShutdown && !started) {
+        started = true;
+        shouldStartStream = true;
       }
+    }
 
-      requestObserver().onNext(request);
+    if (shouldStartStream) {
+      startStream();
     }
   }
 
   /** Starts the underlying stream. */
-  protected final void startStream() {
+  private void startStream() {
     // Add the stream to the registry after it has been fully constructed.
     streamRegistry.add(this);
     while (true) {
       try {
         synchronized (this) {
-          startTimeMs.set(Instant.now().getMillis());
-          lastResponseTimeMs.set(0);
-          streamClosed.set(false);
-          // lazily initialize the requestObserver. Gets reset whenever the 
stream is reopened.
-          requestObserver = requestObserverSupplier.get();
+          debugMetrics.recordStart();
+          requestObserver.reset();
           onNewStream();
-          if (clientClosed.get()) {
+          if (clientClosed) {
             halfClose();
           }
           return;
         }
+      } catch (WindmillStreamShutdownException e) {
+        logger.debug("Stream was shutdown while creating new stream.", e);
       } catch (Exception e) {
-        LOG.error("Failed to create new stream, retrying: ", e);
+        logger.error("Failed to create new stream, retrying: ", e);
         try {
           long sleep = backoff.nextBackOffMillis();
-          sleepUntil.set(Instant.now().getMillis() + sleep);
-          Thread.sleep(sleep);
-        } catch (InterruptedException | IOException i) {
+          debugMetrics.recordSleep(sleep);
+          sleeper.sleep(sleep);
+        } catch (InterruptedException ie) {
+          Thread.currentThread().interrupt();
+          logger.info(
+              "Interrupted during {} creation backoff. The stream will not be 
created.",
+              getClass());
+          break;
+        } catch (IOException ioe) {
           // Keep trying to create the stream.
         }
       }
     }
+
+    // We were never able to start the stream, remove it from the stream 
registry. Otherwise, it is
+    // removed when closed.
+    streamRegistry.remove(this);
   }
 
-  protected final Executor executor() {
-    return executor;
+  /**
+   * Execute the runnable using the {@link #executor} handling the executor 
being in a shutdown
+   * state.
+   */
+  protected final void executeSafely(Runnable runnable) {
+    try {
+      executor.execute(runnable);
+    } catch (RejectedExecutionException e) {
+      logger.debug("{}-{} has been shutdown.", getClass(), backendWorkerToken);
+    }
   }
 
-  public final synchronized void maybeSendHealthCheck(Instant 
lastSendThreshold) {
-    if (lastSendTimeMs.get() < lastSendThreshold.getMillis() && 
!clientClosed.get()) {
+  public final void maybeSendHealthCheck(Instant lastSendThreshold) {
+    if (!clientClosed && debugMetrics.getLastSendTimeMs() < 
lastSendThreshold.getMillis()) {
       try {
         sendHealthCheck();
-      } catch (RuntimeException e) {
-        LOG.debug("Received exception sending health check.", e);
+      } catch (Exception e) {
+        logger.debug("Received exception sending health check.", e);
       }
     }
   }
 
-  protected abstract void sendHealthCheck();
+  protected abstract void sendHealthCheck()
+      throws WindmillStreamShutdownException, StreamClosedException;
 
-  // Care is taken that synchronization on this is unnecessary for all status 
page information.
-  // Blocking sends are made beneath this stream object's lock which could 
block status page
-  // rendering.
+  /**
+   * @implNote Care is taken that synchronization on this is unnecessary for 
all status page
+   *     information. Blocking sends are made beneath this stream object's 
lock which could block
+   *     status page rendering.
+   */
   public final void appendSummaryHtml(PrintWriter writer) {
     appendSpecificHtml(writer);
-    if (errorCount.get() > 0) {
-      writer.format(
-          ", %d errors, last error [ %s ] at [%s]",
-          errorCount.get(), lastError.get(), lastErrorTime.get());
-    }
-    if (clientClosed.get()) {
+    StreamDebugMetrics.Snapshot summaryMetrics = 
debugMetrics.getSummaryMetrics();
+    summaryMetrics
+        .restartMetrics()
+        .ifPresent(
+            metrics ->
+                writer.format(
+                    ", %d restarts, last restart reason [ %s ] at [%s], %d 
errors",
+                    metrics.restartCount(),
+                    metrics.lastRestartReason(),
+                    metrics.lastRestartTime(),
+                    metrics.errorCount()));
+
+    if (clientClosed) {

Review Comment:
   Will need a copy in the debug metrics if removing volatile.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -153,113 +147,150 @@ private static long debugDuration(long nowMs, long 
startMs) {
   protected abstract void startThrottleTimer();
 
   /** Reflects that {@link #shutdown()} was explicitly called. */
-  protected boolean isShutdown() {
-    return isShutdown.get();
+  protected synchronized boolean hasReceivedShutdownSignal() {
+    return isShutdown;
   }
 
-  private StreamObserver<RequestT> requestObserver() {
-    if (requestObserver == null) {
-      throw new NullPointerException(
-          "requestObserver cannot be null. Missing a call to startStream() to 
initialize.");
-    }
-
-    return requestObserver;
+  /** Send a request to the server. */
+  protected final synchronized void send(RequestT request)
+      throws StreamClosedException, WindmillStreamShutdownException {
+    debugMetrics.recordSend();
+    requestObserver.onNext(request);
   }
 
-  /** Send a request to the server. */
-  protected final void send(RequestT request) {
-    lastSendTimeMs.set(Instant.now().getMillis());
+  @Override
+  public final void start() {
+    boolean shouldStartStream = false;
     synchronized (this) {
-      if (streamClosed.get()) {
-        throw new IllegalStateException("Send called on a client closed 
stream.");
+      if (!isShutdown && !started) {
+        started = true;
+        shouldStartStream = true;
       }
+    }
 
-      requestObserver().onNext(request);
+    if (shouldStartStream) {
+      startStream();
     }
   }
 
   /** Starts the underlying stream. */
-  protected final void startStream() {
+  private void startStream() {
     // Add the stream to the registry after it has been fully constructed.
     streamRegistry.add(this);
     while (true) {
       try {
         synchronized (this) {
-          startTimeMs.set(Instant.now().getMillis());
-          lastResponseTimeMs.set(0);
-          streamClosed.set(false);
-          // lazily initialize the requestObserver. Gets reset whenever the 
stream is reopened.
-          requestObserver = requestObserverSupplier.get();
+          debugMetrics.recordStart();
+          requestObserver.reset();
           onNewStream();
-          if (clientClosed.get()) {
+          if (clientClosed) {
             halfClose();
           }
           return;
         }
+      } catch (WindmillStreamShutdownException e) {
+        logger.debug("Stream was shutdown while creating new stream.", e);
       } catch (Exception e) {
-        LOG.error("Failed to create new stream, retrying: ", e);
+        logger.error("Failed to create new stream, retrying: ", e);
         try {
           long sleep = backoff.nextBackOffMillis();
-          sleepUntil.set(Instant.now().getMillis() + sleep);
-          Thread.sleep(sleep);
-        } catch (InterruptedException | IOException i) {
+          debugMetrics.recordSleep(sleep);
+          sleeper.sleep(sleep);
+        } catch (InterruptedException ie) {
+          Thread.currentThread().interrupt();
+          logger.info(
+              "Interrupted during {} creation backoff. The stream will not be 
created.",

Review Comment:
   When does this interruption happen? Is it expected at all?
   
   I'm worried about not starting the stream, since any pending work is just
never going to finish. Should we at least shut down the stream so that senders
get errors? Or, if we don't think an interrupt should occur, removing this
special case could be simpler too.
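   
   One possible shape if we keep the special case (sketch of this catch block
only; assumes shutdown() is safe to call from here):
   
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
      logger.info("Interrupted during {} creation backoff; shutting down the stream.", getClass());
      // Fail pending senders instead of leaving them waiting on a stream that never starts.
      shutdown();
      break;
    }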



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -153,113 +147,150 @@ private static long debugDuration(long nowMs, long 
startMs) {
   protected abstract void startThrottleTimer();
 
   /** Reflects that {@link #shutdown()} was explicitly called. */
-  protected boolean isShutdown() {
-    return isShutdown.get();
+  protected synchronized boolean hasReceivedShutdownSignal() {
+    return isShutdown;
   }
 
-  private StreamObserver<RequestT> requestObserver() {
-    if (requestObserver == null) {
-      throw new NullPointerException(
-          "requestObserver cannot be null. Missing a call to startStream() to 
initialize.");
-    }
-
-    return requestObserver;
+  /** Send a request to the server. */
+  protected final synchronized void send(RequestT request)
+      throws StreamClosedException, WindmillStreamShutdownException {
+    debugMetrics.recordSend();
+    requestObserver.onNext(request);
   }
 
-  /** Send a request to the server. */
-  protected final void send(RequestT request) {
-    lastSendTimeMs.set(Instant.now().getMillis());
+  @Override
+  public final void start() {
+    boolean shouldStartStream = false;
     synchronized (this) {
-      if (streamClosed.get()) {
-        throw new IllegalStateException("Send called on a client closed 
stream.");
+      if (!isShutdown && !started) {
+        started = true;
+        shouldStartStream = true;
       }
+    }
 
-      requestObserver().onNext(request);
+    if (shouldStartStream) {
+      startStream();
     }
   }
 
   /** Starts the underlying stream. */
-  protected final void startStream() {
+  private void startStream() {
     // Add the stream to the registry after it has been fully constructed.
     streamRegistry.add(this);
     while (true) {
       try {
         synchronized (this) {
-          startTimeMs.set(Instant.now().getMillis());
-          lastResponseTimeMs.set(0);
-          streamClosed.set(false);
-          // lazily initialize the requestObserver. Gets reset whenever the 
stream is reopened.
-          requestObserver = requestObserverSupplier.get();
+          debugMetrics.recordStart();
+          requestObserver.reset();
           onNewStream();
-          if (clientClosed.get()) {
+          if (clientClosed) {
             halfClose();
           }
           return;
         }
+      } catch (WindmillStreamShutdownException e) {
+        logger.debug("Stream was shutdown while creating new stream.", e);
       } catch (Exception e) {
-        LOG.error("Failed to create new stream, retrying: ", e);
+        logger.error("Failed to create new stream, retrying: ", e);
         try {
           long sleep = backoff.nextBackOffMillis();
-          sleepUntil.set(Instant.now().getMillis() + sleep);
-          Thread.sleep(sleep);
-        } catch (InterruptedException | IOException i) {
+          debugMetrics.recordSleep(sleep);
+          sleeper.sleep(sleep);
+        } catch (InterruptedException ie) {
+          Thread.currentThread().interrupt();
+          logger.info(
+              "Interrupted during {} creation backoff. The stream will not be 
created.",
+              getClass());
+          break;
+        } catch (IOException ioe) {
           // Keep trying to create the stream.
         }
       }
     }
+
+    // We were never able to start the stream, remove it from the stream 
registry. Otherwise, it is
+    // removed when closed.
+    streamRegistry.remove(this);
   }
 
-  protected final Executor executor() {
-    return executor;
+  /**
+   * Execute the runnable using the {@link #executor} handling the executor 
being in a shutdown
+   * state.
+   */
+  protected final void executeSafely(Runnable runnable) {
+    try {
+      executor.execute(runnable);
+    } catch (RejectedExecutionException e) {
+      logger.debug("{}-{} has been shutdown.", getClass(), backendWorkerToken);
+    }
   }
 
-  public final synchronized void maybeSendHealthCheck(Instant 
lastSendThreshold) {
-    if (lastSendTimeMs.get() < lastSendThreshold.getMillis() && 
!clientClosed.get()) {
+  public final void maybeSendHealthCheck(Instant lastSendThreshold) {

Review Comment:
   Leave this synchronized? (assuming we change clientClosed above)



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStreamShutdownException.java:
##########
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client;
+
+/** Thrown when operations are requested on a {@link WindmillStream} has been 
shutdown/closed. */
+public final class WindmillStreamShutdownException extends Exception {

Review Comment:
   It's not particularly clear from the comments what the difference between
StreamClosedException and WindmillStreamShutdownException is.
   
   I was also wondering whether we need different exception types to distinguish
shutdown from the stream being closed when handling them. If not, we could just
have one class with different messages.
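   
   If a single type is enough, something like this could cover both cases
(sketch only; the factory method names are illustrative, not the PR's API):
   
    public final class WindmillStreamShutdownException extends Exception {
      public WindmillStreamShutdownException(String message) {
        super(message);
      }
    
      // The physical stream was half-closed/closed by the client side.
      public static WindmillStreamShutdownException streamClosed(String detail) {
        return new WindmillStreamShutdownException("Stream closed: " + detail);
      }
    
      // shutdown() was explicitly called on the stream.
      public static WindmillStreamShutdownException explicitShutdown(String detail) {
        return new WindmillStreamShutdownException("Stream shut down: " + detail);
      }
    }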



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -156,29 +163,44 @@ public void sendHealthCheck() {
   protected void onResponse(StreamingCommitResponse response) {
     commitWorkThrottleTimer.stop();
 
-    RuntimeException finalException = null;
+    CommitCompletionException failures = new CommitCompletionException();
     for (int i = 0; i < response.getRequestIdCount(); ++i) {
       long requestId = response.getRequestId(i);
       if (requestId == HEARTBEAT_REQUEST_ID) {
         continue;
       }
-      PendingRequest done = pending.remove(requestId);
-      if (done == null) {
-        LOG.error("Got unknown commit request ID: {}", requestId);
+      PendingRequest pendingRequest = pending.remove(requestId);
+      CommitStatus commitStatus =
+          i < response.getStatusCount() ? response.getStatus(i) : 
CommitStatus.OK;
+      if (pendingRequest == null) {
+        if (!hasReceivedShutdownSignal()) {
+          // Skip responses when the stream is shutdown since they are now 
invalid.

Review Comment:
   nit: "skip responses" is unclear; maybe something like
    // Missing responses are expected after shutdown because it removes them.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java:
##########
@@ -139,4 +166,29 @@ public void onCompleted() {
       outboundObserver.onCompleted();

Review Comment:
   Should we check beneath the lock here whether the phaser has been terminated,
and if so call onError? That seems like it would enforce that if send didn't
send something due to phaser cancellation, we wouldn't complete successfully.
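   
   Sketch of that check (method body only, reusing the lock/notifier/observer
fields from the diff above):
   
    @Override
    public void onCompleted() {
      synchronized (lock) {
        if (isReadyNotifier.getPhase() < 0) {
          // The phaser was terminated; a blocked onNext() may have dropped a value,
          // so don't report a clean completion.
          outboundObserver.onError(
              new IllegalStateException("Stream terminated before onCompleted."));
          return;
        }
        outboundObserver.onCompleted();
      }
    }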



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -156,29 +163,44 @@ public void sendHealthCheck() {
   protected void onResponse(StreamingCommitResponse response) {
     commitWorkThrottleTimer.stop();
 
-    RuntimeException finalException = null;
+    CommitCompletionException failures = new CommitCompletionException();

Review Comment:
   nit: how about a builder for this, since we won't want to add exceptions in
other places?
   
   Then the final line of the method can be
   
   builder.throwIfNonEmpty();
   
   and internally it builds the exception and throws if needed.
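   
   Rough sketch of the builder idea (all names here are illustrative, not the
PR's API):
   
    import java.util.ArrayList;
    import java.util.List;
    
    final class CommitCompletionFailureBuilder {
      private final List<Throwable> failures = new ArrayList<>();
    
      void addFailure(Throwable failure) {
        failures.add(failure);
      }
    
      // Builds and throws a single exception only if at least one failure was recorded.
      void throwIfNonEmpty() {
        if (failures.isEmpty()) {
          return;
        }
        RuntimeException e =
            new RuntimeException(
                "Commit completion failed for " + failures.size() + " request(s).");
        failures.forEach(e::addSuppressed);
        throw e;
      }
    }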



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -314,24 +417,35 @@ private Batcher() {
     @Override
     public boolean commitWorkItem(
         String computation, WorkItemCommitRequest commitRequest, 
Consumer<CommitStatus> onDone) {
-      if (!canAccept(commitRequest.getSerializedSize() + 
computation.length())) {
+      if (!canAccept(commitRequest.getSerializedSize() + computation.length())
+          || hasReceivedShutdownSignal()) {
         return false;
       }
-      PendingRequest request = new PendingRequest(computation, commitRequest, 
onDone);
+
+      PendingRequest request = PendingRequest.create(computation, 
commitRequest, onDone);
       add(idGenerator.incrementAndGet(), request);
       return true;
     }
 
     /** Flushes any pending work items to the wire. */
     @Override
     public void flush() {
-      flushInternal(queue);
-      queuedBytes = 0;
-      queue.clear();
+      try {
+        if (!hasReceivedShutdownSignal()) {

Review Comment:
   Can we remove this? It is handled in the stream methods already, and doing it
here leaves a gap between the shutdown check and the flush anyway.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

