scwhittle commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1819065286


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -278,21 +357,74 @@ public String backendWorkerToken() {
   }
 
   @Override
-  public void shutdown() {
-    if (isShutdown.compareAndSet(false, true)) {
-      requestObserver()
-          .onError(new WindmillStreamShutdownException("Explicit call to 
shutdown stream."));
+  public final void shutdown() {
+    // Don't lock on "this" as isShutdown checks are used in the stream to 
free blocked
+    // threads or as exit conditions to loops.
+    synchronized (shutdownLock) {
+      if (!isShutdown) {
+        isShutdown = true;
+        shutdownTime.set(DateTime.now());
+        if (started) {
+          // requestObserver is not set until the first startStream() is 
called. If the stream was
+          // never started there is nothing to clean up internally.
+          requestObserver.onError(

Review Comment:
   this seems racy since start sets started to true before startStream calls 
requestObserver.reset() initially, or it could happen that startStream resets 
the observer after we call onError here and the new one doesn't have an error 
called.
   
   how about adding a RequestObserver.poison which permanently puts the request 
observer in a poisoned state, calling onError on the current one or any future 
made ones? you can remove the started check and the requestobserver handles the 
synchronization within itself.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -49,46 +53,69 @@
  * stream if it is broken. Subclasses are responsible for retrying requests 
that have been lost on a
  * broken stream.
  *
- * <p>Subclasses should override onResponse to handle responses from the 
server, and onNewStream to
- * perform any work that must be done when a new stream is created, such as 
sending headers or
- * retrying requests.
+ * <p>Subclasses should override {@link #onResponse(ResponseT)} to handle 
responses from the server,
+ * and {@link #onNewStream()} to perform any work that must be done when a new 
stream is created,
+ * such as sending headers or retrying requests.
  *
- * <p>send and startStream should not be called from onResponse; use 
executor() instead.
+ * <p>{@link #send(RequestT)} and {@link #startStream()} should not be called 
from {@link
+ * #onResponse(ResponseT)}; use {@link #executeSafely(Runnable)} instead.
  *
  * <p>Synchronization on this is used to synchronize the gRpc stream state and 
internal data
  * structures. Since grpc channel operations may block, synchronization on 
this stream may also
  * block. This is generally not a problem since streams are used in a 
single-threaded manner.
  * However, some accessors used for status page and other debugging need to 
take care not to require
  * synchronizing on this.
+ *
+ * <p>{@link #start()} and {@link #shutdown()} are called once in the lifetime 
of the stream. Once
+ * {@link #shutdown()}, a stream in considered invalid and cannot be 
restarted/reused.
  */
 public abstract class AbstractWindmillStream<RequestT, ResponseT> implements 
WindmillStream {
 
-  public static final long DEFAULT_STREAM_RPC_DEADLINE_SECONDS = 300;
   // Default gRPC streams to 2MB chunks, which has shown to be a large enough 
chunk size to reduce
   // per-chunk overhead, and small enough that we can still perform granular 
flow-control.
   protected static final int RPC_STREAM_CHUNK_SIZE = 2 << 20;
-  private static final Logger LOG = 
LoggerFactory.getLogger(AbstractWindmillStream.class);
+  private static final Status OK_STATUS = Status.fromCode(Status.Code.OK);
+
   protected final AtomicBoolean clientClosed;
-  private final AtomicBoolean isShutdown;
+  protected final Sleeper sleeper;
+
+  /**
+   * Used to guard {@link #start()} and {@link #shutdown()} behavior.
+   *
+   * @implNote Do not hold when performing IO. If also locking on {@code this} 
in the same context,
+   *     should acquire shutdownLock first to prevent deadlocks.
+   */
+  protected final Object shutdownLock = new Object();
+
   private final AtomicLong lastSendTimeMs;
-  private final Executor executor;
+  private final ExecutorService executor;
   private final BackOff backoff;
   private final AtomicLong startTimeMs;
   private final AtomicLong lastResponseTimeMs;
+  private final AtomicInteger restartCount;
   private final AtomicInteger errorCount;
-  private final AtomicReference<String> lastError;
-  private final AtomicReference<DateTime> lastErrorTime;
+  private final AtomicReference<String> lastRestartReason;
+  private final AtomicReference<DateTime> lastRestartTime;
   private final AtomicLong sleepUntil;
   private final CountDownLatch finishLatch;
   private final Set<AbstractWindmillStream<?, ?>> streamRegistry;
   private final int logEveryNStreamFailures;
-  private final Supplier<StreamObserver<RequestT>> requestObserverSupplier;
-  // Indicates if the current stream in requestObserver is closed by calling 
close() method
-  private final AtomicBoolean streamClosed;
   private final String backendWorkerToken;
-  private @Nullable StreamObserver<RequestT> requestObserver;
+  private final ResettableRequestObserver<RequestT> requestObserver;
+  private final AtomicReference<DateTime> shutdownTime;
+
+  /**
+   * Indicates if the current {@link ResettableRequestObserver} was closed by 
calling {@link
+   * #halfClose()}.

Review Comment:
   Would be good to compare to clarify versus clientClosed.
   This is more about the specific physical stream
   
   Maybe add "Separate from clientClosed as this is specific to the 
requestObserver and is initially false on retry."



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -49,46 +53,69 @@
  * stream if it is broken. Subclasses are responsible for retrying requests 
that have been lost on a
  * broken stream.
  *
- * <p>Subclasses should override onResponse to handle responses from the 
server, and onNewStream to
- * perform any work that must be done when a new stream is created, such as 
sending headers or
- * retrying requests.
+ * <p>Subclasses should override {@link #onResponse(ResponseT)} to handle 
responses from the server,
+ * and {@link #onNewStream()} to perform any work that must be done when a new 
stream is created,
+ * such as sending headers or retrying requests.
  *
- * <p>send and startStream should not be called from onResponse; use 
executor() instead.
+ * <p>{@link #send(RequestT)} and {@link #startStream()} should not be called 
from {@link
+ * #onResponse(ResponseT)}; use {@link #executeSafely(Runnable)} instead.
  *
  * <p>Synchronization on this is used to synchronize the gRpc stream state and 
internal data
  * structures. Since grpc channel operations may block, synchronization on 
this stream may also
  * block. This is generally not a problem since streams are used in a 
single-threaded manner.
  * However, some accessors used for status page and other debugging need to 
take care not to require
  * synchronizing on this.
+ *
+ * <p>{@link #start()} and {@link #shutdown()} are called once in the lifetime 
of the stream. Once
+ * {@link #shutdown()}, a stream in considered invalid and cannot be 
restarted/reused.
  */
 public abstract class AbstractWindmillStream<RequestT, ResponseT> implements 
WindmillStream {
 
-  public static final long DEFAULT_STREAM_RPC_DEADLINE_SECONDS = 300;
   // Default gRPC streams to 2MB chunks, which has shown to be a large enough 
chunk size to reduce
   // per-chunk overhead, and small enough that we can still perform granular 
flow-control.
   protected static final int RPC_STREAM_CHUNK_SIZE = 2 << 20;
-  private static final Logger LOG = 
LoggerFactory.getLogger(AbstractWindmillStream.class);
+  private static final Status OK_STATUS = Status.fromCode(Status.Code.OK);
+
   protected final AtomicBoolean clientClosed;
-  private final AtomicBoolean isShutdown;
+  protected final Sleeper sleeper;
+
+  /**
+   * Used to guard {@link #start()} and {@link #shutdown()} behavior.
+   *
+   * @implNote Do not hold when performing IO. If also locking on {@code this} 
in the same context,
+   *     should acquire shutdownLock first to prevent deadlocks.
+   */
+  protected final Object shutdownLock = new Object();
+
   private final AtomicLong lastSendTimeMs;
-  private final Executor executor;
+  private final ExecutorService executor;
   private final BackOff backoff;
   private final AtomicLong startTimeMs;
   private final AtomicLong lastResponseTimeMs;
+  private final AtomicInteger restartCount;
   private final AtomicInteger errorCount;
-  private final AtomicReference<String> lastError;
-  private final AtomicReference<DateTime> lastErrorTime;
+  private final AtomicReference<String> lastRestartReason;
+  private final AtomicReference<DateTime> lastRestartTime;

Review Comment:
   this is getting to be a lot of atomics. Most are just atomic for status page.
   
   I think it could be less confusing to have some MonitoringInfo object which 
has synchronized methods like noteRestart(...), sleepUntil(sleeper, time), 
noteSend, noteResponse to make it clearer what is just monitoring stuff and 
what is part of the state machine.
   
   For state machine things like isShutdown, clientClosed it woudl be nice if 
we could move away from atomics to variables marked with GuardedBy. In some 
cases that might lead to duplication with the monitoring state but I think it 
could still help with analyzing the locking correctness or races within this 
class. For example for shutdown, I think we could have boolean guardedby 
shutdown mutex, and a method on MonitoringState like noteShutdown() which would 
internally have it's own boolean and time the shutdown occurred.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to