scwhittle commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1822676899


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/ResettableStreamObserver.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client;
+
+import java.util.function.Supplier;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+
+/**
+ * Request observer that allows resetting its internal delegate using the 
given {@link
+ * #streamObserverFactory}.
+ *
+ * @implNote {@link StreamObserver}s generated by {@link 
#streamObserverFactory} are expected to be
+ *     {@link ThreadSafe}.
+ */
+@ThreadSafe
+@Internal
+final class ResettableStreamObserver<T> implements StreamObserver<T> {
+  private final Supplier<StreamObserver<T>> streamObserverFactory;
+
+  @GuardedBy("this")
+  private @Nullable StreamObserver<T> delegateStreamObserver;
+
+  /**
+   * Indicates that the request observer should no longer be used. Attempts to 
perform operations on

Review Comment:
   move comment next to poison method?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/ResettableStreamObserver.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client;
+
+import java.util.function.Supplier;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+
+/**
+ * Request observer that allows resetting its internal delegate using the 
given {@link
+ * #streamObserverFactory}.
+ *
+ * @implNote {@link StreamObserver}s generated by {@link 
#streamObserverFactory} are expected to be
+ *     {@link ThreadSafe}.
+ */
+@ThreadSafe
+@Internal
+final class ResettableStreamObserver<T> implements StreamObserver<T> {
+  private final Supplier<StreamObserver<T>> streamObserverFactory;
+
+  @GuardedBy("this")
+  private @Nullable StreamObserver<T> delegateStreamObserver;
+
+  /**
+   * Indicates that the request observer should no longer be used. Attempts to 
perform operations on
+   * the request observer will throw an {@link 
WindmillStreamShutdownException}.
+   */
+  @GuardedBy("this")
+  private boolean isPoisoned;
+
+  ResettableStreamObserver(Supplier<StreamObserver<T>> streamObserverFactory) {
+    this.streamObserverFactory = streamObserverFactory;
+    this.delegateStreamObserver = null;
+    this.isPoisoned = false;
+  }
+
+  private synchronized StreamObserver<T> delegate() {
+    if (isPoisoned) {
+      throw new WindmillStreamShutdownException("Explicit call to shutdown 
stream.");
+    }
+
+    return Preconditions.checkNotNull(
+        delegateStreamObserver,
+        "requestObserver cannot be null. Missing a call to startStream() to 
initialize.");
+  }
+
+  synchronized void reset() {

Review Comment:
   add comment
   "Creates a new delegate to use for future StreamObserver methods."



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -204,56 +224,64 @@ private void flushInternal(Map<Long, PendingRequest> 
requests) {
     }
   }
 
-  private void issueSingleRequest(final long id, PendingRequest 
pendingRequest) {
+  private void issueSingleRequest(long id, PendingRequest pendingRequest) {
+    if (!prepareForSend(id, pendingRequest)) {
+      pendingRequest.abort();
+      return;
+    }
+
     StreamingCommitWorkRequest.Builder requestBuilder = 
StreamingCommitWorkRequest.newBuilder();
     requestBuilder
         .addCommitChunkBuilder()
-        .setComputationId(pendingRequest.computation)
+        .setComputationId(pendingRequest.computationId())
         .setRequestId(id)
-        .setShardingKey(pendingRequest.request.getShardingKey())
-        .setSerializedWorkItemCommit(pendingRequest.request.toByteString());
+        .setShardingKey(pendingRequest.shardingKey())
+        .setSerializedWorkItemCommit(pendingRequest.serializedCommit());
     StreamingCommitWorkRequest chunk = requestBuilder.build();
-    synchronized (this) {
-      pending.put(id, pendingRequest);
-      try {
-        send(chunk);
-      } catch (IllegalStateException e) {
-        // Stream was broken, request will be retried when stream is reopened.
-      }
+    try {
+      send(chunk);
+    } catch (IllegalStateException e) {
+      // Stream was broken, request will be retried when stream is reopened.
     }
   }
 
   private void issueBatchedRequest(Map<Long, PendingRequest> requests) {
+    if (!prepareForSend(requests)) {
+      requests.forEach((ignored, pendingRequest) -> pendingRequest.abort());
+      return;
+    }
+
     StreamingCommitWorkRequest.Builder requestBuilder = 
StreamingCommitWorkRequest.newBuilder();
     String lastComputation = null;
     for (Map.Entry<Long, PendingRequest> entry : requests.entrySet()) {
       PendingRequest request = entry.getValue();
       StreamingCommitRequestChunk.Builder chunkBuilder = 
requestBuilder.addCommitChunkBuilder();
-      if (lastComputation == null || 
!lastComputation.equals(request.computation)) {
-        chunkBuilder.setComputationId(request.computation);
-        lastComputation = request.computation;
+      if (lastComputation == null || 
!lastComputation.equals(request.computationId())) {
+        chunkBuilder.setComputationId(request.computationId());
+        lastComputation = request.computationId();
       }
-      chunkBuilder.setRequestId(entry.getKey());
-      chunkBuilder.setShardingKey(request.request.getShardingKey());
-      chunkBuilder.setSerializedWorkItemCommit(request.request.toByteString());
+      chunkBuilder
+          .setRequestId(entry.getKey())
+          .setShardingKey(request.shardingKey())
+          .setSerializedWorkItemCommit(request.serializedCommit());
     }
     StreamingCommitWorkRequest request = requestBuilder.build();
-    synchronized (this) {
-      pending.putAll(requests);
-      try {
-        send(request);
-      } catch (IllegalStateException e) {
-        // Stream was broken, request will be retried when stream is reopened.
-      }
+    try {
+      send(request);
+    } catch (IllegalStateException e) {
+      // Stream was broken, request will be retried when stream is reopened.
     }
   }
 
-  private void issueMultiChunkRequest(final long id, PendingRequest 
pendingRequest) {
-    checkNotNull(pendingRequest.computation);
-    final ByteString serializedCommit = pendingRequest.request.toByteString();
+  private void issueMultiChunkRequest(long id, PendingRequest pendingRequest) {
+    if (!prepareForSend(id, pendingRequest)) {
+      pendingRequest.abort();
+      return;
+    }
 
+    checkNotNull(pendingRequest.computationId(), "Cannot commit WorkItem w/o a 
computationId.");
+    ByteString serializedCommit = pendingRequest.serializedCommit();
     synchronized (this) {
-      pending.put(id, pendingRequest);
       for (int i = 0;

Review Comment:
   Moving the addition to the pending requests out of this synchronized block 
means that it is possible for the stream to be recreated and the items already 
removed from pendingRequests.  Then we will end up sending the requests 
multiple times on the new stream, and we may get multiple responses 
unexpectedly.
   
   Can prepareForSend be called beneath the synchronized block to minimize 
locking changes?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -154,112 +159,169 @@ private static long debugDuration(long nowMs, long 
startMs) {
 
   /** Reflects that {@link #shutdown()} was explicitly called. */
   protected boolean isShutdown() {

Review Comment:
   name the method and variable differently.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -314,26 +418,43 @@ private Batcher() {
     @Override
     public boolean commitWorkItem(
         String computation, WorkItemCommitRequest commitRequest, 
Consumer<CommitStatus> onDone) {
+      if (isShutdown()) {
+        onDone.accept(CommitStatus.ABORTED);

Review Comment:
   if we return false I don't think the caller expects onDone to be called.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/StreamDebugMetrics.java:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client;
+
+import com.google.auto.value.AutoValue;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+import org.joda.time.DateTime;
+import org.joda.time.Instant;
+
+/** Records stream metrics for debugging. */
+@ThreadSafe
+final class StreamDebugMetrics {
+  private final AtomicInteger restartCount = new AtomicInteger();

Review Comment:
   Can these just be synchronized ints as well instead of atomic?  We're not 
doing anything expensive under the lock, so it doesn't seem like it needs 
separate consideration.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/StreamDebugMetrics.java:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client;
+
+import com.google.auto.value.AutoValue;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+import org.joda.time.DateTime;
+import org.joda.time.Instant;
+
+/** Records stream metrics for debugging. */
+@ThreadSafe
+final class StreamDebugMetrics {
+  private final AtomicInteger restartCount = new AtomicInteger();
+  private final AtomicInteger errorCount = new AtomicInteger();
+
+  @GuardedBy("this")
+  private long sleepUntil = 0;
+
+  @GuardedBy("this")
+  private String lastRestartReason = "";
+
+  @GuardedBy("this")
+  private DateTime lastRestartTime = null;
+
+  @GuardedBy("this")
+  private long lastResponseTimeMs = 0;
+
+  @GuardedBy("this")
+  private long lastSendTimeMs = 0;
+
+  @GuardedBy("this")
+  private long startTimeMs = 0;
+
+  @GuardedBy("this")
+  private DateTime shutdownTime = null;
+
+  private static long debugDuration(long nowMs, long startMs) {
+    return startMs <= 0 ? -1 : Math.max(0, nowMs - startMs);
+  }
+
+  private static long nowMs() {
+    return Instant.now().getMillis();
+  }
+
+  synchronized void recordSend() {
+    lastSendTimeMs = nowMs();
+  }
+
+  synchronized void recordStart() {
+    startTimeMs = nowMs();
+    lastResponseTimeMs = 0;
+  }
+
+  synchronized void recordResponse() {
+    lastResponseTimeMs = nowMs();
+  }
+
+  synchronized void recordRestartReason(String error) {
+    lastRestartReason = error;
+    lastRestartTime = DateTime.now();
+  }
+
+  synchronized long startTimeMs() {

Review Comment:
   getStartTimeMs?
   
   ditto for lastSendTimeMs



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -314,26 +418,43 @@ private Batcher() {
     @Override
     public boolean commitWorkItem(
         String computation, WorkItemCommitRequest commitRequest, 
Consumer<CommitStatus> onDone) {
+      if (isShutdown()) {
+        onDone.accept(CommitStatus.ABORTED);
+        return false;
+      }
+
       if (!canAccept(commitRequest.getSerializedSize() + 
computation.length())) {
         return false;
       }
-      PendingRequest request = new PendingRequest(computation, commitRequest, 
onDone);
+
+      PendingRequest request = PendingRequest.create(computation, 
commitRequest, onDone);
       add(idGenerator.incrementAndGet(), request);
       return true;
     }
 
     /** Flushes any pending work items to the wire. */
     @Override
     public void flush() {
-      flushInternal(queue);
-      queuedBytes = 0;
-      queue.clear();
+      try {
+        if (!isShutdown()) {
+          flushInternal(queue);
+        } else {
+          queue.forEach((ignored, request) -> request.abort());
+        }
+      } finally {
+        queuedBytes = 0;
+        queue.clear();
+      }
     }
 
     void add(long id, PendingRequest request) {
-      assert (canAccept(request.getBytes()));
-      queuedBytes += request.getBytes();
-      queue.put(id, request);
+      if (isShutdown()) {

Review Comment:
   seems like this should just be in commitWorkItem or here not both



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -49,46 +44,64 @@
  * stream if it is broken. Subclasses are responsible for retrying requests 
that have been lost on a
  * broken stream.
  *
- * <p>Subclasses should override onResponse to handle responses from the 
server, and onNewStream to
- * perform any work that must be done when a new stream is created, such as 
sending headers or
- * retrying requests.
+ * <p>Subclasses should override {@link #onResponse(ResponseT)} to handle 
responses from the server,
+ * and {@link #onNewStream()} to perform any work that must be done when a new 
stream is created,
+ * such as sending headers or retrying requests.
  *
- * <p>send and startStream should not be called from onResponse; use 
executor() instead.
+ * <p>{@link #send(RequestT)} and {@link #startStream()} should not be called 
from {@link
+ * #onResponse(ResponseT)}; use {@link #executeSafely(Runnable)} instead.
  *
  * <p>Synchronization on this is used to synchronize the gRpc stream state and 
internal data
  * structures. Since grpc channel operations may block, synchronization on 
this stream may also
  * block. This is generally not a problem since streams are used in a 
single-threaded manner.
  * However, some accessors used for status page and other debugging need to 
take care not to require
  * synchronizing on this.
+ *
+ * <p>{@link #start()} and {@link #shutdown()} are called once in the lifetime 
of the stream. Once
+ * {@link #shutdown()}, a stream in considered invalid and cannot be 
restarted/reused.
  */
 public abstract class AbstractWindmillStream<RequestT, ResponseT> implements 
WindmillStream {
 
-  public static final long DEFAULT_STREAM_RPC_DEADLINE_SECONDS = 300;
   // Default gRPC streams to 2MB chunks, which has shown to be a large enough 
chunk size to reduce
   // per-chunk overhead, and small enough that we can still perform granular 
flow-control.
   protected static final int RPC_STREAM_CHUNK_SIZE = 2 << 20;
-  private static final Logger LOG = 
LoggerFactory.getLogger(AbstractWindmillStream.class);
-  protected final AtomicBoolean clientClosed;
-  private final AtomicBoolean isShutdown;
-  private final AtomicLong lastSendTimeMs;
-  private final Executor executor;
+  // Indicates that the logical stream has been half-closed and is waiting for 
clean server
+  // shutdown.
+  private static final Status OK_STATUS = Status.fromCode(Status.Code.OK);
+  protected final Sleeper sleeper;
+
+  /**
+   * Used to guard {@link #start()} and {@link #shutdown()} behavior.
+   *
+   * @implNote Do not hold when performing IO. If also locking on {@code this} 
in the same context,
+   *     should acquire shutdownLock first to prevent deadlocks.
+   */
+  protected final Object shutdownLock = new Object();
+
+  private final Logger logger;
+  private final ExecutorService executor;
   private final BackOff backoff;
-  private final AtomicLong startTimeMs;
-  private final AtomicLong lastResponseTimeMs;
-  private final AtomicInteger errorCount;
-  private final AtomicReference<String> lastError;
-  private final AtomicReference<DateTime> lastErrorTime;
-  private final AtomicLong sleepUntil;
   private final CountDownLatch finishLatch;
   private final Set<AbstractWindmillStream<?, ?>> streamRegistry;
   private final int logEveryNStreamFailures;
-  private final Supplier<StreamObserver<RequestT>> requestObserverSupplier;
-  // Indicates if the current stream in requestObserver is closed by calling 
close() method
-  private final AtomicBoolean streamClosed;
   private final String backendWorkerToken;
-  private @Nullable StreamObserver<RequestT> requestObserver;
+  private final ResettableStreamObserver<RequestT> requestObserver;
+  private final StreamDebugMetrics debugMetrics;
+  protected volatile boolean clientClosed;
+
+  /**
+   * Indicates if the current {@link ResettableStreamObserver} was closed by 
calling {@link
+   * #halfClose()}. Separate from {@link #clientClosed} as this is specific to 
the requestObserver
+   * and is initially false on retry.
+   */
+  @GuardedBy("this")
+  private boolean streamClosed;
+
+  private volatile boolean isShutdown;
+  private volatile boolean started;

Review Comment:
   can this be guarded by one of the locks instead of volatile?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -49,46 +44,64 @@
  * stream if it is broken. Subclasses are responsible for retrying requests 
that have been lost on a
  * broken stream.
  *
- * <p>Subclasses should override onResponse to handle responses from the 
server, and onNewStream to
- * perform any work that must be done when a new stream is created, such as 
sending headers or
- * retrying requests.
+ * <p>Subclasses should override {@link #onResponse(ResponseT)} to handle 
responses from the server,
+ * and {@link #onNewStream()} to perform any work that must be done when a new 
stream is created,
+ * such as sending headers or retrying requests.
  *
- * <p>send and startStream should not be called from onResponse; use 
executor() instead.
+ * <p>{@link #send(RequestT)} and {@link #startStream()} should not be called 
from {@link
+ * #onResponse(ResponseT)}; use {@link #executeSafely(Runnable)} instead.
  *
  * <p>Synchronization on this is used to synchronize the gRpc stream state and 
internal data
  * structures. Since grpc channel operations may block, synchronization on 
this stream may also
  * block. This is generally not a problem since streams are used in a 
single-threaded manner.
  * However, some accessors used for status page and other debugging need to 
take care not to require
  * synchronizing on this.
+ *
+ * <p>{@link #start()} and {@link #shutdown()} are called once in the lifetime 
of the stream. Once
+ * {@link #shutdown()}, a stream in considered invalid and cannot be 
restarted/reused.
  */
 public abstract class AbstractWindmillStream<RequestT, ResponseT> implements 
WindmillStream {
 
-  public static final long DEFAULT_STREAM_RPC_DEADLINE_SECONDS = 300;
   // Default gRPC streams to 2MB chunks, which has shown to be a large enough 
chunk size to reduce
   // per-chunk overhead, and small enough that we can still perform granular 
flow-control.
   protected static final int RPC_STREAM_CHUNK_SIZE = 2 << 20;
-  private static final Logger LOG = 
LoggerFactory.getLogger(AbstractWindmillStream.class);
-  protected final AtomicBoolean clientClosed;
-  private final AtomicBoolean isShutdown;
-  private final AtomicLong lastSendTimeMs;
-  private final Executor executor;
+  // Indicates that the logical stream has been half-closed and is waiting for 
clean server
+  // shutdown.
+  private static final Status OK_STATUS = Status.fromCode(Status.Code.OK);
+  protected final Sleeper sleeper;
+
+  /**
+   * Used to guard {@link #start()} and {@link #shutdown()} behavior.
+   *
+   * @implNote Do not hold when performing IO. If also locking on {@code this} 
in the same context,
+   *     should acquire shutdownLock first to prevent deadlocks.
+   */
+  protected final Object shutdownLock = new Object();
+
+  private final Logger logger;
+  private final ExecutorService executor;
   private final BackOff backoff;
-  private final AtomicLong startTimeMs;
-  private final AtomicLong lastResponseTimeMs;
-  private final AtomicInteger errorCount;
-  private final AtomicReference<String> lastError;
-  private final AtomicReference<DateTime> lastErrorTime;
-  private final AtomicLong sleepUntil;
   private final CountDownLatch finishLatch;
   private final Set<AbstractWindmillStream<?, ?>> streamRegistry;
   private final int logEveryNStreamFailures;
-  private final Supplier<StreamObserver<RequestT>> requestObserverSupplier;
-  // Indicates if the current stream in requestObserver is closed by calling 
close() method
-  private final AtomicBoolean streamClosed;
   private final String backendWorkerToken;
-  private @Nullable StreamObserver<RequestT> requestObserver;
+  private final ResettableStreamObserver<RequestT> requestObserver;
+  private final StreamDebugMetrics debugMetrics;
+  protected volatile boolean clientClosed;
+
+  /**
+   * Indicates if the current {@link ResettableStreamObserver} was closed by 
calling {@link
+   * #halfClose()}. Separate from {@link #clientClosed} as this is specific to 
the requestObserver
+   * and is initially false on retry.
+   */
+  @GuardedBy("this")
+  private boolean streamClosed;
+
+  private volatile boolean isShutdown;

Review Comment:
   can this be guarded by shutdown lock? the accessor method can synchronize



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -49,46 +44,64 @@
  * stream if it is broken. Subclasses are responsible for retrying requests 
that have been lost on a
  * broken stream.
  *
- * <p>Subclasses should override onResponse to handle responses from the 
server, and onNewStream to
- * perform any work that must be done when a new stream is created, such as 
sending headers or
- * retrying requests.
+ * <p>Subclasses should override {@link #onResponse(ResponseT)} to handle 
responses from the server,
+ * and {@link #onNewStream()} to perform any work that must be done when a new 
stream is created,
+ * such as sending headers or retrying requests.
  *
- * <p>send and startStream should not be called from onResponse; use 
executor() instead.
+ * <p>{@link #send(RequestT)} and {@link #startStream()} should not be called 
from {@link
+ * #onResponse(ResponseT)}; use {@link #executeSafely(Runnable)} instead.
  *
  * <p>Synchronization on this is used to synchronize the gRpc stream state and 
internal data
  * structures. Since grpc channel operations may block, synchronization on 
this stream may also
  * block. This is generally not a problem since streams are used in a 
single-threaded manner.
  * However, some accessors used for status page and other debugging need to 
take care not to require
  * synchronizing on this.
+ *
+ * <p>{@link #start()} and {@link #shutdown()} are called once in the lifetime 
of the stream. Once
+ * {@link #shutdown()}, a stream in considered invalid and cannot be 
restarted/reused.
  */
 public abstract class AbstractWindmillStream<RequestT, ResponseT> implements 
WindmillStream {
 
-  public static final long DEFAULT_STREAM_RPC_DEADLINE_SECONDS = 300;
   // Default gRPC streams to 2MB chunks, which has shown to be a large enough 
chunk size to reduce
   // per-chunk overhead, and small enough that we can still perform granular 
flow-control.
   protected static final int RPC_STREAM_CHUNK_SIZE = 2 << 20;
-  private static final Logger LOG = 
LoggerFactory.getLogger(AbstractWindmillStream.class);
-  protected final AtomicBoolean clientClosed;
-  private final AtomicBoolean isShutdown;
-  private final AtomicLong lastSendTimeMs;
-  private final Executor executor;
+  // Indicates that the logical stream has been half-closed and is waiting for 
clean server
+  // shutdown.
+  private static final Status OK_STATUS = Status.fromCode(Status.Code.OK);
+  protected final Sleeper sleeper;
+
+  /**
+   * Used to guard {@link #start()} and {@link #shutdown()} behavior.
+   *
+   * @implNote Do not hold when performing IO. If also locking on {@code this} 
in the same context,

Review Comment:
   If you acquire shutdownLock before the (this) lock, then shutdownLock could 
be held while blocked on I/O, because we perform I/O beneath the (this) lock.
   
   If shutdownLock is supposed to be lightweight, it seems like it should be 
acquired after the (this) lock to avoid that.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -154,112 +159,169 @@ private static long debugDuration(long nowMs, long 
startMs) {
 
   /** Reflects that {@link #shutdown()} was explicitly called. */
   protected boolean isShutdown() {
-    return isShutdown.get();
-  }
-
-  private StreamObserver<RequestT> requestObserver() {
-    if (requestObserver == null) {
-      throw new NullPointerException(
-          "requestObserver cannot be null. Missing a call to startStream() to 
initialize.");
-    }
-
-    return requestObserver;
+    return isShutdown;
   }
 
   /** Send a request to the server. */
   protected final void send(RequestT request) {
-    lastSendTimeMs.set(Instant.now().getMillis());
     synchronized (this) {
-      if (streamClosed.get()) {
+      if (isShutdown) {
+        return;
+      }
+
+      if (streamClosed) {
+        // TODO(m-trieu): throw a more specific exception here (i.e 
StreamClosedException)
         throw new IllegalStateException("Send called on a client closed 
stream.");
       }
 
-      requestObserver().onNext(request);
+      try {
+        debugMetrics.recordSend();
+        requestObserver.onNext(request);
+      } catch (StreamObserverCancelledException e) {
+        if (isShutdown) {
+          logger.debug("Stream was shutdown during send.", e);
+          return;
+        }
+
+        requestObserver.onError(e);
+      }
+    }
+  }
+
+  @Override
+  public final void start() {
+    boolean shouldStartStream = false;
+    synchronized (shutdownLock) {
+      if (!isShutdown && !started) {
+        started = true;
+        shouldStartStream = true;
+      }
+    }
+
+    if (shouldStartStream) {
+      startStream();
     }
   }
 
   /** Starts the underlying stream. */
-  protected final void startStream() {
+  private void startStream() {
     // Add the stream to the registry after it has been fully constructed.
     streamRegistry.add(this);
     while (true) {
       try {
         synchronized (this) {
-          startTimeMs.set(Instant.now().getMillis());
-          lastResponseTimeMs.set(0);
-          streamClosed.set(false);
-          // lazily initialize the requestObserver. Gets reset whenever the 
stream is reopened.
-          requestObserver = requestObserverSupplier.get();
+          if (isShutdown) {
+            break;
+          }
+          debugMetrics.recordStart();
+          streamClosed = false;
+          requestObserver.reset();
           onNewStream();
-          if (clientClosed.get()) {
+          if (clientClosed) {
             halfClose();
           }
           return;
         }
+      } catch (WindmillStreamShutdownException e) {
+        logger.debug("Stream was shutdown waiting to start.", e);
       } catch (Exception e) {
-        LOG.error("Failed to create new stream, retrying: ", e);
+        logger.error("Failed to create new stream, retrying: ", e);
         try {
           long sleep = backoff.nextBackOffMillis();
-          sleepUntil.set(Instant.now().getMillis() + sleep);
-          Thread.sleep(sleep);
-        } catch (InterruptedException | IOException i) {
+          debugMetrics.recordSleep(sleep);
+          sleeper.sleep(sleep);
+        } catch (InterruptedException ie) {
+          Thread.currentThread().interrupt();
+          logger.info(
+              "Interrupted during {} creation backoff. The stream will not be 
created.",
+              getClass());
+          break;
+        } catch (IOException ioe) {
           // Keep trying to create the stream.
         }
       }
     }
+
+    // We were never able to start the stream, remove it from the stream 
registry. Otherwise, it is
+    // removed when closed.
+    streamRegistry.remove(this);
   }
 
-  protected final Executor executor() {
-    return executor;
+  /**
+   * Execute the runnable using the {@link #executor} handling the executor 
being in a shutdown
+   * state.
+   */
+  protected final void executeSafely(Runnable runnable) {
+    try {
+      executor.execute(runnable);
+    } catch (RejectedExecutionException e) {
+      logger.debug("{}-{} has been shutdown.", getClass(), backendWorkerToken);
+    }
   }
 
-  public final synchronized void maybeSendHealthCheck(Instant 
lastSendThreshold) {
-    if (lastSendTimeMs.get() < lastSendThreshold.getMillis() && 
!clientClosed.get()) {
+  public final void maybeSendHealthCheck(Instant lastSendThreshold) {
+    if (!clientClosed && debugMetrics.lastSendTimeMs() < 
lastSendThreshold.getMillis()) {
       try {
         sendHealthCheck();
       } catch (RuntimeException e) {
-        LOG.debug("Received exception sending health check.", e);
+        logger.debug("Received exception sending health check.", e);
       }
     }
   }
 
   protected abstract void sendHealthCheck();
 
-  // Care is taken that synchronization on this is unnecessary for all status 
page information.
-  // Blocking sends are made beneath this stream object's lock which could 
block status page
-  // rendering.
+  /**
+   * @implNote Care is taken that synchronization on this is unnecessary for 
all status page
+   *     information. Blocking sends are made beneath this stream object's 
lock which could block
+   *     status page rendering.
+   */
+  @SuppressWarnings("GuardedBy")

Review Comment:
   why does this need to be suppressed? seems better to fix it
   
   it can cause TSAN failures even if it is benign from VM memory model.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to