scwhittle commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1829093210


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -153,113 +163,172 @@ private static long debugDuration(long nowMs, long 
startMs) {
   protected abstract void startThrottleTimer();
 
   /** Reflects that {@link #shutdown()} was explicitly called. */
-  protected boolean isShutdown() {
-    return isShutdown.get();
-  }
-
-  private StreamObserver<RequestT> requestObserver() {
-    if (requestObserver == null) {
-      throw new NullPointerException(
-          "requestObserver cannot be null. Missing a call to startStream() to 
initialize.");
+  protected boolean hasReceivedShutdownSignal() {
+    synchronized (shutdownLock) {
+      return isShutdown;
     }
-
-    return requestObserver;
   }
 
   /** Send a request to the server. */
   protected final void send(RequestT request) {
-    lastSendTimeMs.set(Instant.now().getMillis());
     synchronized (this) {
-      if (streamClosed.get()) {
+      if (hasReceivedShutdownSignal()) {
+        return;
+      }
+
+      if (streamClosed) {
+        // TODO(m-trieu): throw a more specific exception here (i.e 
StreamClosedException)
         throw new IllegalStateException("Send called on a client closed 
stream.");
       }
 
-      requestObserver().onNext(request);
+      try {
+        verify(!Thread.holdsLock(shutdownLock), "shutdownLock should not be 
held during send.");

Review Comment:
   I would remove this verify() check, since running it on every send seems likely to be expensive.
   Instead, you could verify the behavior with a test:
   - setup requestObserver that blocks until notified
   - one thread calls send and starts blocking
   - main test thread calls shutdown() and verifies the method returns
   - main test thread unblocks the requestObserver
   



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -153,113 +163,172 @@ private static long debugDuration(long nowMs, long 
startMs) {
   protected abstract void startThrottleTimer();
 
   /** Reflects that {@link #shutdown()} was explicitly called. */
-  protected boolean isShutdown() {
-    return isShutdown.get();
-  }
-
-  private StreamObserver<RequestT> requestObserver() {
-    if (requestObserver == null) {
-      throw new NullPointerException(
-          "requestObserver cannot be null. Missing a call to startStream() to 
initialize.");
+  protected boolean hasReceivedShutdownSignal() {
+    synchronized (shutdownLock) {
+      return isShutdown;
     }
-
-    return requestObserver;
   }
 
   /** Send a request to the server. */
   protected final void send(RequestT request) {
-    lastSendTimeMs.set(Instant.now().getMillis());
     synchronized (this) {
-      if (streamClosed.get()) {
+      if (hasReceivedShutdownSignal()) {
+        return;
+      }
+
+      if (streamClosed) {
+        // TODO(m-trieu): throw a more specific exception here (i.e 
StreamClosedException)
         throw new IllegalStateException("Send called on a client closed 
stream.");
       }
 
-      requestObserver().onNext(request);
+      try {
+        verify(!Thread.holdsLock(shutdownLock), "shutdownLock should not be 
held during send.");
+        debugMetrics.recordSend();
+        requestObserver.onNext(request);
+      } catch (StreamObserverCancelledException e) {
+        if (hasReceivedShutdownSignal()) {
+          logger.debug("Stream was shutdown during send.", e);
+          return;
+        }
+
+        requestObserver.onError(e);
+      }
+    }
+  }
+
+  @Override
+  public final void start() {
+    boolean shouldStartStream = false;
+    synchronized (shutdownLock) {
+      if (!isShutdown && !started) {
+        started = true;
+        shouldStartStream = true;
+      }
+    }
+
+    if (shouldStartStream) {
+      startStream();
     }
   }
 
   /** Starts the underlying stream. */
-  protected final void startStream() {
+  private void startStream() {
     // Add the stream to the registry after it has been fully constructed.
     streamRegistry.add(this);
     while (true) {
       try {
         synchronized (this) {
-          startTimeMs.set(Instant.now().getMillis());
-          lastResponseTimeMs.set(0);
-          streamClosed.set(false);
-          // lazily initialize the requestObserver. Gets reset whenever the 
stream is reopened.
-          requestObserver = requestObserverSupplier.get();
+          if (hasReceivedShutdownSignal()) {

Review Comment:
   This seems racy, since shutdown could happen right after this check. The requestObserver
   poisoning below already handles this case.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -153,113 +163,172 @@ private static long debugDuration(long nowMs, long 
startMs) {
   protected abstract void startThrottleTimer();
 
   /** Reflects that {@link #shutdown()} was explicitly called. */
-  protected boolean isShutdown() {
-    return isShutdown.get();
-  }
-
-  private StreamObserver<RequestT> requestObserver() {
-    if (requestObserver == null) {
-      throw new NullPointerException(
-          "requestObserver cannot be null. Missing a call to startStream() to 
initialize.");
+  protected boolean hasReceivedShutdownSignal() {
+    synchronized (shutdownLock) {
+      return isShutdown;
     }
-
-    return requestObserver;
   }
 
   /** Send a request to the server. */
   protected final void send(RequestT request) {
-    lastSendTimeMs.set(Instant.now().getMillis());
     synchronized (this) {
-      if (streamClosed.get()) {
+      if (hasReceivedShutdownSignal()) {
+        return;
+      }
+
+      if (streamClosed) {

Review Comment:
   Can streamClosed be moved into the ResettableStreamObserver? Then you wouldn't have to keep it
   in sync here whenever we call reset(), and it could be guarded by that class's synchronization.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -153,113 +163,172 @@ private static long debugDuration(long nowMs, long 
startMs) {
   protected abstract void startThrottleTimer();
 
   /** Reflects that {@link #shutdown()} was explicitly called. */
-  protected boolean isShutdown() {
-    return isShutdown.get();
-  }
-
-  private StreamObserver<RequestT> requestObserver() {
-    if (requestObserver == null) {
-      throw new NullPointerException(
-          "requestObserver cannot be null. Missing a call to startStream() to 
initialize.");
+  protected boolean hasReceivedShutdownSignal() {
+    synchronized (shutdownLock) {
+      return isShutdown;
     }
-
-    return requestObserver;
   }
 
   /** Send a request to the server. */
   protected final void send(RequestT request) {
-    lastSendTimeMs.set(Instant.now().getMillis());
     synchronized (this) {
-      if (streamClosed.get()) {
+      if (hasReceivedShutdownSignal()) {
+        return;
+      }
+
+      if (streamClosed) {
+        // TODO(m-trieu): throw a more specific exception here (i.e 
StreamClosedException)
         throw new IllegalStateException("Send called on a client closed 
stream.");
       }
 
-      requestObserver().onNext(request);
+      try {
+        verify(!Thread.holdsLock(shutdownLock), "shutdownLock should not be 
held during send.");
+        debugMetrics.recordSend();
+        requestObserver.onNext(request);
+      } catch (StreamObserverCancelledException e) {
+        if (hasReceivedShutdownSignal()) {
+          logger.debug("Stream was shutdown during send.", e);
+          return;
+        }
+
+        requestObserver.onError(e);
+      }
+    }
+  }
+
+  @Override
+  public final void start() {
+    boolean shouldStartStream = false;
+    synchronized (shutdownLock) {
+      if (!isShutdown && !started) {
+        started = true;
+        shouldStartStream = true;
+      }
+    }
+
+    if (shouldStartStream) {
+      startStream();
     }
   }
 
   /** Starts the underlying stream. */
-  protected final void startStream() {
+  private void startStream() {
     // Add the stream to the registry after it has been fully constructed.
     streamRegistry.add(this);
     while (true) {
       try {
         synchronized (this) {
-          startTimeMs.set(Instant.now().getMillis());
-          lastResponseTimeMs.set(0);
-          streamClosed.set(false);
-          // lazily initialize the requestObserver. Gets reset whenever the 
stream is reopened.
-          requestObserver = requestObserverSupplier.get();
+          if (hasReceivedShutdownSignal()) {
+            break;
+          }
+          debugMetrics.recordStart();
+          streamClosed = false;
+          requestObserver.reset();
           onNewStream();
-          if (clientClosed.get()) {
+          if (clientClosed) {
             halfClose();
           }
           return;
         }
+      } catch (WindmillStreamShutdownException e) {
+        logger.debug("Stream was shutdown waiting to start.", e);
       } catch (Exception e) {
-        LOG.error("Failed to create new stream, retrying: ", e);
+        logger.error("Failed to create new stream, retrying: ", e);
         try {
           long sleep = backoff.nextBackOffMillis();
-          sleepUntil.set(Instant.now().getMillis() + sleep);
-          Thread.sleep(sleep);
-        } catch (InterruptedException | IOException i) {
+          debugMetrics.recordSleep(sleep);
+          sleeper.sleep(sleep);
+        } catch (InterruptedException ie) {
+          Thread.currentThread().interrupt();
+          logger.info(
+              "Interrupted during {} creation backoff. The stream will not be 
created.",
+              getClass());
+          break;
+        } catch (IOException ioe) {
           // Keep trying to create the stream.
         }
       }
     }
+
+    // We were never able to start the stream, remove it from the stream 
registry. Otherwise, it is
+    // removed when closed.
+    streamRegistry.remove(this);
   }
 
-  protected final Executor executor() {
-    return executor;
+  /**
+   * Execute the runnable using the {@link #executor} handling the executor 
being in a shutdown
+   * state.
+   */
+  protected final void executeSafely(Runnable runnable) {
+    try {
+      executor.execute(runnable);
+    } catch (RejectedExecutionException e) {
+      logger.debug("{}-{} has been shutdown.", getClass(), backendWorkerToken);
+    }
   }
 
-  public final synchronized void maybeSendHealthCheck(Instant 
lastSendThreshold) {
-    if (lastSendTimeMs.get() < lastSendThreshold.getMillis() && 
!clientClosed.get()) {
+  public final void maybeSendHealthCheck(Instant lastSendThreshold) {
+    if (!clientClosed && debugMetrics.getLastSendTimeMs() < 
lastSendThreshold.getMillis()) {
       try {
         sendHealthCheck();
       } catch (RuntimeException e) {
-        LOG.debug("Received exception sending health check.", e);
+        logger.debug("Received exception sending health check.", e);
       }
     }
   }
 
   protected abstract void sendHealthCheck();
 
-  // Care is taken that synchronization on this is unnecessary for all status 
page information.
-  // Blocking sends are made beneath this stream object's lock which could 
block status page
-  // rendering.
+  /**
+   * @implNote Care is taken that synchronization on this is unnecessary for 
all status page
+   *     information. Blocking sends are made beneath this stream object's 
lock which could block
+   *     status page rendering.
+   */
   public final void appendSummaryHtml(PrintWriter writer) {
     appendSpecificHtml(writer);
-    if (errorCount.get() > 0) {
-      writer.format(
-          ", %d errors, last error [ %s ] at [%s]",
-          errorCount.get(), lastError.get(), lastErrorTime.get());
-    }
-    if (clientClosed.get()) {
+    StreamDebugMetrics.Snapshot summaryMetrics = 
debugMetrics.getSummaryMetrics();
+    summaryMetrics
+        .restartMetrics()
+        .ifPresent(
+            metrics ->
+                writer.format(
+                    ", %d restarts, last restart reason [ %s ] at [%s], %d 
errors",
+                    metrics.restartCount(),
+                    metrics.lastRestartReason(),
+                    metrics.lastRestartTime(),
+                    metrics.errorCount()));
+
+    if (clientClosed) {
       writer.write(", client closed");
     }
-    long nowMs = Instant.now().getMillis();
-    long sleepLeft = sleepUntil.get() - nowMs;
-    if (sleepLeft > 0) {
-      writer.format(", %dms backoff remaining", sleepLeft);
+
+    if (summaryMetrics.sleepLeft() > 0) {
+      writer.format(", %dms backoff remaining", summaryMetrics.sleepLeft());
     }
+
     writer.format(
-        ", current stream is %dms old, last send %dms, last response %dms, 
closed: %s",
-        debugDuration(nowMs, startTimeMs.get()),
-        debugDuration(nowMs, lastSendTimeMs.get()),
-        debugDuration(nowMs, lastResponseTimeMs.get()),
-        streamClosed.get());
+        ", current stream is %dms old, last send %dms, last response %dms, 
closed: %s, "
+            + "isShutdown: %s, shutdown time: %s",
+        summaryMetrics.streamAge(),
+        summaryMetrics.timeSinceLastSend(),
+        summaryMetrics.timeSinceLastResponse(),
+        streamClosed,
+        hasReceivedShutdownSignal(),
+        summaryMetrics.shutdownTime().orElse(null));
   }
 
-  // Don't require synchronization on stream, see the appendSummaryHtml 
comment.
+  /**
+   * @implNote Don't require synchronization on stream, see the {@link
+   *     #appendSummaryHtml(PrintWriter)} comment.
+   */
   protected abstract void appendSpecificHtml(PrintWriter writer);
 
   @Override
   public final synchronized void halfClose() {
     // Synchronization of close and onCompleted necessary for correct retry 
logic in onNewStream.
-    clientClosed.set(true);
-    requestObserver().onCompleted();
-    streamClosed.set(true);
+    clientClosed = true;
+    requestObserver.onCompleted();

Review Comment:
   Should this catch WindmillStreamShutdownException?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##########
@@ -121,32 +129,43 @@ public static GrpcGetDataStream create(
       int streamingRpcBatchLimit,
       boolean sendKeyedGetDataRequests,
       Consumer<List<Windmill.ComputationHeartbeatResponse>> 
processHeartbeatResponses) {
-    GrpcGetDataStream getDataStream =
-        new GrpcGetDataStream(
-            backendWorkerToken,
-            startGetDataRpcFn,
-            backoff,
-            streamObserverFactory,
-            streamRegistry,
-            logEveryNStreamFailures,
-            getDataThrottleTimer,
-            jobHeader,
-            idGenerator,
-            streamingRpcBatchLimit,
-            sendKeyedGetDataRequests,
-            processHeartbeatResponses);
-    getDataStream.startStream();
-    return getDataStream;
+    return new GrpcGetDataStream(
+        backendWorkerToken,
+        startGetDataRpcFn,
+        backoff,
+        streamObserverFactory,
+        streamRegistry,
+        logEveryNStreamFailures,
+        getDataThrottleTimer,
+        jobHeader,
+        idGenerator,
+        streamingRpcBatchLimit,
+        sendKeyedGetDataRequests,
+        processHeartbeatResponses);
+  }
+
+  private static WindmillStreamShutdownException shutdownException(QueuedBatch 
batch) {
+    return new WindmillStreamShutdownException(
+        "Stream was closed when attempting to send " + batch.requestsCount() + 
" requests.");
+  }
+
+  private static WindmillStreamShutdownException 
shutdownException(QueuedRequest request) {
+    return new WindmillStreamShutdownException(
+        "Cannot send request=[" + request + "] on closed stream.");
   }
 
   @Override
   protected synchronized void onNewStream() {
+    if (hasReceivedShutdownSignal()) {

Review Comment:
   Similar comment: this shutdown check seems racy here. It seems better to just handle the
   exception caused by poisoning, since that covers all cases.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##########
@@ -342,62 +391,88 @@ private void queueRequestAndWait(QueuedRequest request) 
throws InterruptedExcept
       batch.addRequest(request);
     }
     if (responsibleForSend) {
-      if (waitForSendLatch == null) {
+      if (prevBatch == null) {
         // If there was not a previous batch wait a little while to improve
         // batching.
-        Thread.sleep(1);
+        sleeper.sleep(1);
       } else {
-        waitForSendLatch.await();
+        prevBatch.waitForSendOrFailNotification();
       }
       // Finalize the batch so that no additional requests will be added.  
Leave the batch in the
       // queue so that a subsequent batch will wait for its completion.
-      synchronized (batches) {
-        verify(batch == batches.peekFirst());
+      synchronized (shutdownLock) {
+        if (hasReceivedShutdownSignal()) {
+          throw shutdownException(batch);
+        }
+
+        verify(batch == batches.peekFirst(), "GetDataStream request batch 
removed before send().");
         batch.markFinalized();
       }
-      sendBatch(batch.requests());
-      synchronized (batches) {
-        verify(batch == batches.pollFirst());
+      trySendBatch(batch);
+    } else {
+      // Wait for this batch to be sent before parsing the response.
+      batch.waitForSendOrFailNotification();
+    }
+  }
+
+  void trySendBatch(QueuedBatch batch) {
+    try {
+      sendBatch(batch);
+      synchronized (shutdownLock) {
+        if (hasReceivedShutdownSignal()) {
+          throw shutdownException(batch);
+        }
+
+        verify(
+            batch == batches.pollFirst(),
+            "Sent GetDataStream request batch removed before send() was 
complete.");
       }
       // Notify all waiters with requests in this batch as well as the sender
       // of the next batch (if one exists).
-      batch.countDown();
-    } else {
-      // Wait for this batch to be sent before parsing the response.
-      batch.await();
+      batch.notifySent();
+    } catch (Exception e) {
+      // Free waiters if the send() failed.
+      batch.notifyFailed();
+      // Propagate the exception to the calling thread.
+      throw e;
     }
   }
 
-  @SuppressWarnings("NullableProblems")
-  private void sendBatch(List<QueuedRequest> requests) {
-    StreamingGetDataRequest batchedRequest = flushToBatch(requests);
+  private void sendBatch(QueuedBatch batch) {
+    if (batch.isEmpty()) {
+      return;
+    }
+
+    // Synchronization of pending inserts is necessary with send to ensure 
duplicates are not
+    // sent on stream reconnect.
     synchronized (this) {
-      // Synchronization of pending inserts is necessary with send to ensure 
duplicates are not
-      // sent on stream reconnect.
-      for (QueuedRequest request : requests) {
+      synchronized (shutdownLock) {
+        // shutdown() clears pending, once the stream is shutdown, prevent 
values from being added

Review Comment:
   Since you drop the lock here before inserting into pending, shutdown could run right after you
   drop it and clear pending, and then you would add entries after the clear.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/ResettableStreamObserver.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client;
+
+import java.util.function.Supplier;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+
+/**
+ * Request observer that allows resetting its internal delegate using the 
given {@link
+ * #streamObserverFactory}.
+ *
+ * @implNote {@link StreamObserver}s generated by {@link 
#streamObserverFactory} are expected to be
+ *     {@link ThreadSafe}.
+ */
+@ThreadSafe
+@Internal
+final class ResettableStreamObserver<T> implements StreamObserver<T> {
+  private final Supplier<StreamObserver<T>> streamObserverFactory;
+
+  @GuardedBy("this")
+  private @Nullable StreamObserver<T> delegateStreamObserver;
+
+  @GuardedBy("this")
+  private boolean isPoisoned;
+
+  ResettableStreamObserver(Supplier<StreamObserver<T>> streamObserverFactory) {
+    this.streamObserverFactory = streamObserverFactory;
+    this.delegateStreamObserver = null;
+    this.isPoisoned = false;
+  }
+
+  private synchronized StreamObserver<T> delegate() {
+    if (isPoisoned) {
+      throw new WindmillStreamShutdownException("Explicit call to shutdown 
stream.");
+    }
+
+    return Preconditions.checkNotNull(
+        delegateStreamObserver,
+        "requestObserver cannot be null. Missing a call to startStream() to 
initialize.");
+  }
+
+  /** Creates a new delegate to use for future {@link StreamObserver} methods. 
*/
+  synchronized void reset() {
+    if (isPoisoned) {
+      throw new WindmillStreamShutdownException("Explicit call to shutdown 
stream.");
+    }
+
+    delegateStreamObserver = streamObserverFactory.get();
+  }
+
+  /**
+   * Indicates that the request observer should no longer be used. Attempts to 
perform operations on
+   * the request observer will throw an {@link 
WindmillStreamShutdownException}.
+   */
+  synchronized void poison() {
+    if (!isPoisoned) {
+      isPoisoned = true;
+      if (delegateStreamObserver != null) {
+        delegateStreamObserver.onError(
+            new WindmillStreamShutdownException("Explicit call to shutdown 
stream."));
+      }

Review Comment:
   Set delegateStreamObserver to null? Might as well let it be GC'd.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -153,113 +163,172 @@ private static long debugDuration(long nowMs, long 
startMs) {
   protected abstract void startThrottleTimer();
 
   /** Reflects that {@link #shutdown()} was explicitly called. */
-  protected boolean isShutdown() {
-    return isShutdown.get();
-  }
-
-  private StreamObserver<RequestT> requestObserver() {
-    if (requestObserver == null) {
-      throw new NullPointerException(
-          "requestObserver cannot be null. Missing a call to startStream() to 
initialize.");
+  protected boolean hasReceivedShutdownSignal() {
+    synchronized (shutdownLock) {
+      return isShutdown;
     }
-
-    return requestObserver;
   }
 
   /** Send a request to the server. */
   protected final void send(RequestT request) {
-    lastSendTimeMs.set(Instant.now().getMillis());
     synchronized (this) {
-      if (streamClosed.get()) {
+      if (hasReceivedShutdownSignal()) {
+        return;
+      }
+
+      if (streamClosed) {
+        // TODO(m-trieu): throw a more specific exception here (i.e 
StreamClosedException)
         throw new IllegalStateException("Send called on a client closed 
stream.");
       }
 
-      requestObserver().onNext(request);
+      try {
+        verify(!Thread.holdsLock(shutdownLock), "shutdownLock should not be 
held during send.");
+        debugMetrics.recordSend();
+        requestObserver.onNext(request);
+      } catch (StreamObserverCancelledException e) {
+        if (hasReceivedShutdownSignal()) {
+          logger.debug("Stream was shutdown during send.", e);
+          return;
+        }
+
+        requestObserver.onError(e);
+      }
+    }
+  }
+
+  @Override
+  public final void start() {
+    boolean shouldStartStream = false;
+    synchronized (shutdownLock) {
+      if (!isShutdown && !started) {
+        started = true;
+        shouldStartStream = true;
+      }
+    }
+
+    if (shouldStartStream) {
+      startStream();
     }
   }
 
   /** Starts the underlying stream. */
-  protected final void startStream() {
+  private void startStream() {
     // Add the stream to the registry after it has been fully constructed.
     streamRegistry.add(this);
     while (true) {
       try {
         synchronized (this) {
-          startTimeMs.set(Instant.now().getMillis());
-          lastResponseTimeMs.set(0);
-          streamClosed.set(false);
-          // lazily initialize the requestObserver. Gets reset whenever the 
stream is reopened.
-          requestObserver = requestObserverSupplier.get();
+          if (hasReceivedShutdownSignal()) {
+            break;
+          }
+          debugMetrics.recordStart();
+          streamClosed = false;
+          requestObserver.reset();
           onNewStream();
-          if (clientClosed.get()) {
+          if (clientClosed) {
             halfClose();
           }
           return;
         }
+      } catch (WindmillStreamShutdownException e) {
+        logger.debug("Stream was shutdown waiting to start.", e);

Review Comment:
   "Waiting to start" is a little confusing; maybe just "while creating new stream"?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java:
##########
@@ -201,7 +198,7 @@ private void maybeSendRequestExtension(GetWorkBudget 
extension) {
   @Override
   protected synchronized void onNewStream() {
     workItemAssemblers.clear();
-    if (!isShutdown()) {
+    if (!hasReceivedShutdownSignal()) {

Review Comment:
   This is racy, since shutdown could occur right after this check. Can we instead ensure that we
   handle the stream-closed exception on the send below, or handle it at the call site of
   onNewStream?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##########
@@ -121,32 +129,43 @@ public static GrpcGetDataStream create(
       int streamingRpcBatchLimit,
       boolean sendKeyedGetDataRequests,
       Consumer<List<Windmill.ComputationHeartbeatResponse>> 
processHeartbeatResponses) {
-    GrpcGetDataStream getDataStream =
-        new GrpcGetDataStream(
-            backendWorkerToken,
-            startGetDataRpcFn,
-            backoff,
-            streamObserverFactory,
-            streamRegistry,
-            logEveryNStreamFailures,
-            getDataThrottleTimer,
-            jobHeader,
-            idGenerator,
-            streamingRpcBatchLimit,
-            sendKeyedGetDataRequests,
-            processHeartbeatResponses);
-    getDataStream.startStream();
-    return getDataStream;
+    return new GrpcGetDataStream(
+        backendWorkerToken,
+        startGetDataRpcFn,
+        backoff,
+        streamObserverFactory,
+        streamRegistry,
+        logEveryNStreamFailures,
+        getDataThrottleTimer,
+        jobHeader,
+        idGenerator,
+        streamingRpcBatchLimit,
+        sendKeyedGetDataRequests,
+        processHeartbeatResponses);
+  }
+
+  private static WindmillStreamShutdownException shutdownException(QueuedBatch 
batch) {

Review Comment:
   Nit: how about shutdownExceptionForBatch and shutdownExceptionForRequest, or overloading a
   single shutdownExceptionFor method?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -153,113 +163,172 @@ private static long debugDuration(long nowMs, long 
startMs) {
   protected abstract void startThrottleTimer();
 
   /** Reflects that {@link #shutdown()} was explicitly called. */
-  protected boolean isShutdown() {
-    return isShutdown.get();
-  }
-
-  private StreamObserver<RequestT> requestObserver() {
-    if (requestObserver == null) {
-      throw new NullPointerException(
-          "requestObserver cannot be null. Missing a call to startStream() to 
initialize.");
+  protected boolean hasReceivedShutdownSignal() {
+    synchronized (shutdownLock) {
+      return isShutdown;
     }
-
-    return requestObserver;
   }
 
   /** Send a request to the server. */
   protected final void send(RequestT request) {
-    lastSendTimeMs.set(Instant.now().getMillis());
     synchronized (this) {
-      if (streamClosed.get()) {
+      if (hasReceivedShutdownSignal()) {

Review Comment:
   This seems racy. Since you drop the shutdown mutex after this check, shutdown could happen
   right afterwards. What about removing the check and just relying on poisoning to cause an
   exception in onNext?
   
   It doesn't appear that the shutdown cancellation is caught or handled yet. Perhaps
   ResettableStreamObserver methods should throw it as a checked (non-RuntimeException)
   exception, so that we are forced to handle it here?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##########
@@ -49,22 +49,29 @@
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.StreamingGetDataResponse;
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.AbstractWindmillStream;
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamShutdownException;
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcGetDataStreamRequests.QueuedBatch;
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcGetDataStreamRequests.QueuedRequest;
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.StreamObserverFactory;
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.throttling.ThrottleTimer;
 import org.apache.beam.sdk.util.BackOff;
 import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Verify;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+@ThreadSafe
 final class GrpcGetDataStream
     extends AbstractWindmillStream<StreamingGetDataRequest, 
StreamingGetDataResponse>
     implements GetDataStream {
   private static final Logger LOG = 
LoggerFactory.getLogger(GrpcGetDataStream.class);
+  private static final StreamingGetDataRequest HEALTH_CHECK_REQUEST =
+      StreamingGetDataRequest.newBuilder().build();
 
+  /** @implNote {@link QueuedBatch} objects in the queue are is guarded by 
{@link #shutdownLock} */
   private final Deque<QueuedBatch> batches;

Review Comment:
   Can you use @GuardedBy("AbstractWindmillStream.this.shutdownLock")? See the
   ClassName.this.fieldName form described at https://errorprone.info/bugpattern/GuardedBy



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to