arunpandianp commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1805562867


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/GlobalDataStreamSender.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming.harness;
+
+import java.io.Closeable;
+import java.util.function.Supplier;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillEndpoints.Endpoint;
+import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream;
+import org.apache.beam.sdk.annotations.Internal;
+
+@Internal
+@ThreadSafe
+final class GlobalDataStreamSender implements Closeable, Supplier<GetDataStream> {
+  private final Endpoint endpoint;
+  private final GetDataStream delegate;
+  private volatile boolean started;
+
+  GlobalDataStreamSender(GetDataStream delegate, Endpoint endpoint) {
+    this.delegate = delegate;
+    this.started = false;
+    this.endpoint = endpoint;
+  }
+
+  @Override
+  public GetDataStream get() {
+    if (!started) {
+      startStream();

Review Comment:
   Can you add a comment on why we are lazily starting GlobalDataStreams?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java:
##########
@@ -114,50 +120,35 @@ private FanOutStreamingEngineWorkerHarness(
       ChannelCachingStubFactory channelCachingStubFactory,
       GetWorkBudgetDistributor getWorkBudgetDistributor,
       GrpcDispatcherClient dispatcherClient,
-      long clientId,
      Function<WindmillStream.CommitWorkStream, WorkCommitter> workCommitterFactory,
       ThrottlingGetDataMetricTracker getDataMetricTracker) {
     this.jobHeader = jobHeader;
     this.getDataMetricTracker = getDataMetricTracker;
     this.started = false;
     this.streamFactory = streamFactory;
     this.workItemScheduler = workItemScheduler;
-    this.connections = new AtomicReference<>(StreamingEngineConnectionState.EMPTY);
+    this.backends = new AtomicReference<>(StreamingEngineBackends.EMPTY);
     this.channelCachingStubFactory = channelCachingStubFactory;
     this.dispatcherClient = dispatcherClient;
-    this.isBudgetRefreshPaused = new AtomicBoolean(false);
     this.getWorkerMetadataThrottleTimer = new ThrottleTimer();
-    this.newWorkerMetadataPublisher =
-        singleThreadedExecutorServiceOf(PUBLISH_NEW_WORKER_METADATA_THREAD);
-    this.newWorkerMetadataConsumer =
-        singleThreadedExecutorServiceOf(CONSUME_NEW_WORKER_METADATA_THREAD);
-    this.clientId = clientId;
-    this.lastBudgetRefresh = new AtomicReference<>(Instant.EPOCH);
-    this.newWindmillEndpoints = Queues.synchronizedQueue(EvictingQueue.create(1));
-    this.getWorkBudgetRefresher =
-        new GetWorkBudgetRefresher(
-            isBudgetRefreshPaused::get,
-            () -> {
-              getWorkBudgetDistributor.distributeBudget(
-                  connections.get().windmillStreams().values(), totalGetWorkBudget);
-              lastBudgetRefresh.set(Instant.now());
-            });
-    this.getWorkerMetadataStream =
-        Suppliers.memoize(
-            () ->
-                streamFactory.createGetWorkerMetadataStream(
-                    dispatcherClient.getWindmillMetadataServiceStubBlocking(),
-                    getWorkerMetadataThrottleTimer,
-                    endpoints ->
-                        // Run this on a separate thread than the grpc stream thread.
-                        newWorkerMetadataPublisher.submit(
-                            () -> newWindmillEndpoints.add(endpoints))));
+    this.windmillStreamManager =
+        Executors.newCachedThreadPool(
+            new ThreadFactoryBuilder().setNameFormat(STREAM_MANAGER_THREAD_NAME).build());
+    this.workerMetadataConsumer =
+        Executors.newSingleThreadScheduledExecutor(
+            new ThreadFactoryBuilder().setNameFormat(WORKER_METADATA_CONSUMER_THREAD_NAME).build());
+    this.getWorkBudgetDistributor = getWorkBudgetDistributor;
+    this.totalGetWorkBudget = totalGetWorkBudget;
+    this.activeMetadataVersion = Long.MIN_VALUE;
     this.workCommitterFactory = workCommitterFactory;
-  }
-
-  private static ExecutorService singleThreadedExecutorServiceOf(String threadName) {
-    return Executors.newSingleThreadScheduledExecutor(
-        new ThreadFactoryBuilder().setNameFormat(threadName).build());
+    // To satisfy CheckerFramework complaining about reference to "this" in constructor.
+    @SuppressWarnings("methodref.receiver.bound")
+    Consumer<WindmillEndpoints> newEndpointsConsumer = this::consumeWorkerMetadata;
+    this.getWorkerMetadataStream =
+        streamFactory.createGetWorkerMetadataStream(
+            dispatcherClient::getWindmillMetadataServiceStubBlocking,
+            getWorkerMetadataThrottleTimer,
+            newEndpointsConsumer);

Review Comment:
   Move the construction to start() and remove the @SuppressWarnings?
   One reason not to let `this` escape the constructor is that final fields are only guaranteed to be fully initialized and visible to other threads after construction completes. If `createGetWorkerMetadataStream` here calls `newEndpointsConsumer` inline, there are no guarantees on what it'll get.
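   
   For illustration, a rough sketch of that shape, assuming `getWorkerMetadataStream` becomes a non-final field assignable in `start()`:
   ```java
   @Override
   public synchronized void start() {
     Preconditions.checkState(!started, "FanOutStreamingEngineWorkerHarness cannot start twice.");
     // `this` is fully constructed by the time start() runs, so the method
     // reference below is safe even if invoked inline, and the
     // @SuppressWarnings("methodref.receiver.bound") is no longer needed.
     getWorkerMetadataStream =
         streamFactory.createGetWorkerMetadataStream(
             dispatcherClient::getWindmillMetadataServiceStubBlocking,
             getWorkerMetadataThrottleTimer,
             this::consumeWorkerMetadata);
     getWorkerMetadataStream.start();
     started = true;
   }
   ```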



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -317,29 +350,71 @@ public boolean commitWorkItem(
      if (!canAccept(commitRequest.getSerializedSize() + computation.length())) {
         return false;
       }
-      PendingRequest request = new PendingRequest(computation, commitRequest, onDone);
+
+      PendingRequest request = PendingRequest.create(computation, commitRequest, onDone);
       add(idGenerator.incrementAndGet(), request);
       return true;
     }
 
     /** Flushes any pending work items to the wire. */
     @Override
     public void flush() {
-      flushInternal(queue);
-      queuedBytes = 0;
-      queue.clear();
+      try {
+        if (!isShutdown()) {
+          flushInternal(queue);
+        }
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      } finally {
+        queuedBytes = 0;
+        queue.clear();
+      }
     }
 
     void add(long id, PendingRequest request) {
-      assert (canAccept(request.getBytes()));
+      Preconditions.checkState(canAccept(request.getBytes()));

Review Comment:
   Ideally checkStates should not throw when things are working as expected; IllegalStateException means something unexpected happened. Can we call `request.onDone` with an error code when the stream is shut down? If the stream is shut down, it means the work is no longer valid, right?
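   
   For example, a sketch (the `abort` helper is hypothetical, standing in for invoking `onDone` with an error code; the enqueue step is elided):
   ```java
   void add(long id, PendingRequest request) {
     if (isShutdown()) {
       // The stream is shut down, so this work is no longer valid; report
       // the failure to the caller instead of throwing IllegalStateException.
       request.abort(); // hypothetical: completes onDone with an error code
       return;
     }
     Preconditions.checkState(canAccept(request.getBytes()));
     // ... enqueue as before ...
   }
   ```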



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -63,32 +68,48 @@
  */
 public abstract class AbstractWindmillStream<RequestT, ResponseT> implements WindmillStream {
 
-  public static final long DEFAULT_STREAM_RPC_DEADLINE_SECONDS = 300;
  // Default gRPC streams to 2MB chunks, which has shown to be a large enough chunk size to reduce
  // per-chunk overhead, and small enough that we can still perform granular flow-control.
   protected static final int RPC_STREAM_CHUNK_SIZE = 2 << 20;
-  private static final Logger LOG = LoggerFactory.getLogger(AbstractWindmillStream.class);
+  private static final Status OK_STATUS = Status.fromCode(Status.Code.OK);
+
   protected final AtomicBoolean clientClosed;
-  private final AtomicBoolean isShutdown;
+  protected final Sleeper sleeper;
   private final AtomicLong lastSendTimeMs;
-  private final Executor executor;
+  private final ExecutorService executor;
   private final BackOff backoff;
   private final AtomicLong startTimeMs;
   private final AtomicLong lastResponseTimeMs;
+  private final AtomicInteger restartCount;
   private final AtomicInteger errorCount;
-  private final AtomicReference<String> lastError;
-  private final AtomicReference<DateTime> lastErrorTime;
+  private final AtomicReference<String> lastRestartReason;
+  private final AtomicReference<DateTime> lastRestartTime;
   private final AtomicLong sleepUntil;
   private final CountDownLatch finishLatch;
   private final Set<AbstractWindmillStream<?, ?>> streamRegistry;
   private final int logEveryNStreamFailures;
-  private final Supplier<StreamObserver<RequestT>> requestObserverSupplier;
-  // Indicates if the current stream in requestObserver is closed by calling close() method
-  private final AtomicBoolean streamClosed;
   private final String backendWorkerToken;
-  private @Nullable StreamObserver<RequestT> requestObserver;
+  private final ResettableRequestObserver<RequestT> requestObserver;
+
+  /** Guards {@link #start()} and {@link #shutdown()} methods. */
+  private final Object shutdownLock = new Object();
+
+  /** Reads are lock free, writes are guarded by shutdownLock. */
+  private final AtomicBoolean isShutdown;
+
+  private final AtomicBoolean started;

Review Comment:
   Now that we have `shutdownLock`, these can be volatile booleans.
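   
   A minimal sketch of the pattern (reads stay lock-free, all writes happen under `shutdownLock`):
   ```java
   /** Reads are lock-free; writes only happen while holding shutdownLock. */
   private volatile boolean isShutdown;
   
   private volatile boolean started;
   
   @Override
   public final void start() {
     synchronized (shutdownLock) {
       if (!isShutdown && !started) {
         started = true;
         startStream();
       }
     }
   }
   ```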



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -317,29 +350,71 @@ public boolean commitWorkItem(
      if (!canAccept(commitRequest.getSerializedSize() + computation.length())) {
         return false;
       }
-      PendingRequest request = new PendingRequest(computation, commitRequest, onDone);
+
+      PendingRequest request = PendingRequest.create(computation, commitRequest, onDone);
       add(idGenerator.incrementAndGet(), request);
       return true;
     }
 
     /** Flushes any pending work items to the wire. */
     @Override
     public void flush() {
-      flushInternal(queue);
-      queuedBytes = 0;
-      queue.clear();
+      try {
+        if (!isShutdown()) {
+          flushInternal(queue);
+        }
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      } finally {
+        queuedBytes = 0;
+        queue.clear();
+      }
     }
 
     void add(long id, PendingRequest request) {
-      assert (canAccept(request.getBytes()));
+      Preconditions.checkState(canAccept(request.getBytes()));

Review Comment:
   Or we could remove the shutdown checks in `add` and call the callback with an error status in `flush`.
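   
   Roughly (sketch only; `abort` is the same hypothetical helper as above, completing `onDone` with an error status):
   ```java
   @Override
   public void flush() {
     try {
       if (!isShutdown()) {
         flushInternal(queue);
       } else {
         // Stream is shut down: fail every queued request instead of
         // silently dropping it.
         queue.values().forEach(PendingRequest::abort);
       }
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
     } finally {
       queuedBytes = 0;
       queue.clear();
     }
   }
   ```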



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##########
@@ -302,38 +331,57 @@ public void appendSpecificHtml(PrintWriter writer) {
   }
 
  private <ResponseT> ResponseT issueRequest(QueuedRequest request, ParseFn<ResponseT> parseFn) {
-    while (true) {
+    while (!isShutdown()) {
       request.resetResponseStream();
       try {
         queueRequestAndWait(request);
         return parseFn.parse(request.getResponseStream());
-      } catch (CancellationException e) {
-        // Retry issuing the request since the response stream was cancelled.
-        continue;
+      } catch (AppendableInputStream.InvalidInputStreamStateException
+          | VerifyException
+          | CancellationException e) {
+        handleShutdown(request, e);
+        if (!(e instanceof CancellationException)) {
+          throw e;
+        }
       } catch (IOException e) {
         LOG.error("Parsing GetData response failed: ", e);
-        continue;
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
+        handleShutdown(request, e);
         throw new RuntimeException(e);
       } finally {
         pending.remove(request.id());
       }
     }
+
+    // If we have exited the loop here, the stream has been shutdown. Cancel the response stream.
+    request.getResponseStream().cancel();

Review Comment:
   If it is not needed, let's remove it. To catch anything dangling, we can add a check and throw.
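   
   e.g. something along these lines (sketch only):
   ```java
   // Instead of cancelling here, assert that shutdownInternal() left nothing
   // dangling, and surface the shutdown to the caller.
   verify(!pending.containsKey(request.id()), "Dangling request %s after shutdown.", request.id());
   throw new WindmillStreamShutdownException("Stream was shut down while request was in flight.");
   ```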



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSender.java:
##########
@@ -131,46 +124,43 @@ private static GetWorkRequest withRequestBudget(GetWorkRequest request, GetWorkB
     return request.toBuilder().setMaxItems(budget.items()).setMaxBytes(budget.bytes()).build();
   }
 
-  @SuppressWarnings("ReturnValueIgnored")
-  void startStreams() {
-    getWorkStream.get();
-    getDataStream.get();
-    commitWorkStream.get();
-    workCommitter.get().start();
-    // *stream.get() is all memoized in a threadsafe manner.
-    started.set(true);
-  }
-
-  void closeAllStreams() {
-    // Supplier<Stream>.get() starts the stream which is an expensive operation as it initiates the
-    // streaming RPCs by possibly making calls over the network. Do not close the streams unless
-    // they have already been started.
-    if (started.get()) {
-      getWorkStream.get().shutdown();
-      getDataStream.get().shutdown();
-      workCommitter.get().stop();
-      commitWorkStream.get().shutdown();
+  synchronized void start() {
+    if (!started.get()) {
+      // Start these 3 streams in parallel since they each may perform blocking IO.
+      CompletableFuture.allOf(
+              CompletableFuture.runAsync(getWorkStream::start, streamStarter),
+              CompletableFuture.runAsync(getDataStream::start, streamStarter),
+              CompletableFuture.runAsync(commitWorkStream::start, streamStarter))
+          .join();
+      workCommitter.start();
+      started.set(true);
     }
   }
 
   @Override
-  public void adjustBudget(long itemsDelta, long bytesDelta) {
-    getWorkBudget.set(getWorkBudget.get().apply(itemsDelta, bytesDelta));
+  public synchronized void close() {
     if (started.get()) {
-      getWorkStream.get().adjustBudget(itemsDelta, bytesDelta);
+      getWorkStream.shutdown();
+      getDataStream.shutdown();
+      workCommitter.stop();
+      commitWorkStream.shutdown();
     }
   }
 
   @Override
-  public GetWorkBudget remainingBudget() {
-    return started.get() ? getWorkStream.get().remainingBudget() : getWorkBudget.get();
+  public void setBudget(long items, long bytes) {
+    GetWorkBudget adjustment = GetWorkBudget.builder().setItems(items).setBytes(bytes).build();

Review Comment:
   ```suggestion
       GetWorkBudget budget = GetWorkBudget.builder().setItems(items).setBytes(bytes).build();
   ```



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/GlobalDataStreamSender.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming.harness;
+
+import java.io.Closeable;
+import java.util.function.Supplier;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillEndpoints.Endpoint;
+import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream;
+import org.apache.beam.sdk.annotations.Internal;
+
+@Internal
+@ThreadSafe
+final class GlobalDataStreamSender implements Closeable, Supplier<GetDataStream> {

Review Comment:
   nit: I would put the close method on a separate `StreamSender` interface; that makes it easy to find references compared to the generic `Closeable` interface.
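   
   Something like (sketch):
   ```java
   /** Closes the underlying stream(s); narrower than java.io.Closeable for findability. */
   interface StreamSender {
     void close();
   }
   ```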



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -204,14 +224,14 @@ private void flushInternal(Map<Long, PendingRequest> requests) {
     }
   }
 
-  private void issueSingleRequest(final long id, PendingRequest pendingRequest) {
+  private void issueSingleRequest(long id, PendingRequest pendingRequest) {
     StreamingCommitWorkRequest.Builder requestBuilder = StreamingCommitWorkRequest.newBuilder();
     requestBuilder
         .addCommitChunkBuilder()
-        .setComputationId(pendingRequest.computation)
+        .setComputationId(pendingRequest.computationId())
         .setRequestId(id)
-        .setShardingKey(pendingRequest.request.getShardingKey())
-        .setSerializedWorkItemCommit(pendingRequest.request.toByteString());
+        .setShardingKey(pendingRequest.shardingKey())
+        .setSerializedWorkItemCommit(pendingRequest.serializedCommit());
     StreamingCommitWorkRequest chunk = requestBuilder.build();
     synchronized (this) {
       pending.put(id, pendingRequest);

Review Comment:
   Can this `pending.put` execute after a call to shutdown()? What happens to the pending request if it does?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##########
@@ -273,10 +288,24 @@ public void onHeartbeatResponse(List<Windmill.ComputationHeartbeatResponse> resp
   @Override
   public void sendHealthCheck() {
     if (hasPendingRequests()) {
-      send(StreamingGetDataRequest.newBuilder().build());
+      send(HEALTH_CHECK_REQUEST);
     }
   }
 
+  @Override
+  protected void shutdownInternal() {
+    // Stream has been explicitly closed. Drain pending input streams and request batches.
+    // Future calls to send RPCs will fail.
+    pending.values().forEach(AppendableInputStream::cancel);

Review Comment:
   ```
   Preconditions.checkState(!isShutdown(), "Cannot send on shutdown stream.");
   verify(pending.put(request.id(), request.getResponseStream()) == null);
   ```
   
   Thread1 is past the isShutdown() check and waiting before `pending.put`.
   
   Thread2 completes shutdownInternal().
   
   Thread1 will now do `pending.put()`.
   
   One way to avoid this race is to put both paths under the same mutex. We'll have to reevaluate why shutdown cannot be guarded by `this` or a different common mutex. We probably should not block on IO while holding the `this` mutex.
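   
   For illustration, the single-mutex shape could look like this (`sendLock` and `trackRequest` are placeholder names; the real guard would need to avoid blocking on IO while held):
   ```java
   private final Object sendLock = new Object();
   
   private void trackRequest(QueuedRequest request) {
     synchronized (sendLock) {
       // The shutdown check and the map mutation are now atomic with respect
       // to shutdownInternal(), closing the Thread1/Thread2 window above.
       Preconditions.checkState(!isShutdown(), "Cannot send on shutdown stream.");
       verify(pending.put(request.id(), request.getResponseStream()) == null);
     }
   }
   
   @Override
   protected void shutdownInternal() {
     synchronized (sendLock) {
       pending.values().forEach(AppendableInputStream::cancel);
     }
   }
   ```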



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/GlobalDataStreamSender.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming.harness;
+
+import java.io.Closeable;
+import java.util.function.Supplier;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillEndpoints.Endpoint;
+import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream;
+import org.apache.beam.sdk.annotations.Internal;
+
+@Internal
+@ThreadSafe
+final class GlobalDataStreamSender implements Closeable, Supplier<GetDataStream> {

Review Comment:
   remove `Supplier<GetDataStream>`? do we need it?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##########
@@ -342,29 +391,38 @@ private void queueRequestAndWait(QueuedRequest request) throws InterruptedExcept
       batch.addRequest(request);
     }
     if (responsibleForSend) {
-      if (waitForSendLatch == null) {
+      if (prevBatch == null) {
         // If there was not a previous batch wait a little while to improve
         // batching.
-        Thread.sleep(1);
+        sleeper.sleep(1);
       } else {
-        waitForSendLatch.await();
+        prevBatch.waitForSendOrFailNotification();
       }
      // Finalize the batch so that no additional requests will be added.  Leave the batch in the
       // queue so that a subsequent batch will wait for its completion.
       synchronized (batches) {
         verify(batch == batches.peekFirst());
         batch.markFinalized();
       }
+      trySendBatch(batch);
+    } else {
+      // Wait for this batch to be sent before parsing the response.
+      batch.waitForSendOrFailNotification();
+    }
+  }
+
+  void trySendBatch(QueuedBatch batch) {
+    try {
       sendBatch(batch.requests());
       synchronized (batches) {
         verify(batch == batches.pollFirst());
       }
       // Notify all waiters with requests in this batch as well as the sender
       // of the next batch (if one exists).
-      batch.countDown();
-    } else {
-      // Wait for this batch to be sent before parsing the response.
-      batch.await();
+      batch.notifySent();
+    } catch (Exception e) {
+      LOG.error("Error occurred sending batch.", e);
+      batch.notifyFailed();

Review Comment:
   We have to throw after `notifyFailed` for the sending thread to know about the failure?
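   
   e.g. (sketch; wrapped because `trySendBatch` does not declare checked exceptions):
   ```java
   } catch (Exception e) {
     LOG.error("Error occurred sending batch.", e);
     batch.notifyFailed();
     // Rethrow so the thread driving the send observes the failure too,
     // not only the waiters released by notifyFailed().
     throw e instanceof RuntimeException ? (RuntimeException) e : new RuntimeException(e);
   }
   ```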



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -160,80 +179,125 @@ protected boolean isShutdown() {
   private StreamObserver<RequestT> requestObserver() {
     if (requestObserver == null) {
       throw new NullPointerException(
-          "requestObserver cannot be null. Missing a call to startStream() to initialize.");
+          "requestObserver cannot be null. Missing a call to start() to initialize stream.");
     }
 
     return requestObserver;
   }
 
   /** Send a request to the server. */
   protected final void send(RequestT request) {
-    lastSendTimeMs.set(Instant.now().getMillis());
     synchronized (this) {
+      if (isShutdown()) {
+        return;
+      }
+
       if (streamClosed.get()) {
        throw new IllegalStateException("Send called on a client closed stream.");
       }
 
-      requestObserver().onNext(request);
+      try {
+        lastSendTimeMs.set(Instant.now().getMillis());
+        requestObserver.onNext(request);
+      } catch (StreamObserverCancelledException e) {
+        if (isShutdown()) {
+          logger.debug("Stream was closed or shutdown during send.", e);
+          return;
+        }
+
+        requestObserver.onError(e);
+      }
+    }
+  }
+
+  @Override
+  public final void start() {
+    if (!isShutdown.get() && started.compareAndSet(false, true)) {
+      // start() should only be executed once during the lifetime of the stream for idempotency and
+      // when shutdown() has not been called.
+      startStream();
     }
   }
 
   /** Starts the underlying stream. */
-  protected final void startStream() {
+  private void startStream() {
     // Add the stream to the registry after it has been fully constructed.
     streamRegistry.add(this);
     while (true) {
       try {
         synchronized (this) {
+          if (isShutdown.get()) {
+            break;
+          }
           startTimeMs.set(Instant.now().getMillis());
           lastResponseTimeMs.set(0);
           streamClosed.set(false);
-          // lazily initialize the requestObserver. Gets reset whenever the stream is reopened.
-          requestObserver = requestObserverSupplier.get();
+          requestObserver.reset();
           onNewStream();
           if (clientClosed.get()) {
             halfClose();
           }
           return;

Review Comment:
   Since we return here on the happy path, adding the stream here would add it only once.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java:
##########
@@ -74,69 +76,104 @@ public void onNext(T value) {
     while (true) {
       try {
         synchronized (lock) {
+          // If we awaited previously and timed out, wait for the same phase. Otherwise we're
+          // careful to observe the phase before observing isReady.
+          if (awaitPhase < 0) {
+            awaitPhase = isReadyNotifier.getPhase();
+            // If getPhase() returns a value less than 0, the phaser has been terminated.
+            if (awaitPhase < 0) {
+              return;
+            }
+          }
+
          // We only check isReady periodically to effectively allow for increasing the outbound
          // buffer periodically. This reduces the overhead of blocking while still restricting
          // memory because there is a limited # of streams, and we have a max messages size of 2MB.
           if (++messagesSinceReady <= messagesBetweenIsReadyChecks) {
             outboundObserver.onNext(value);
             return;
           }
-          // If we awaited previously and timed out, wait for the same phase. Otherwise we're
-          // careful to observe the phase before observing isReady.
-          if (awaitPhase < 0) {
-            awaitPhase = phaser.getPhase();
-          }
+
           if (outboundObserver.isReady()) {
             messagesSinceReady = 0;
             outboundObserver.onNext(value);
             return;
           }
         }
+
        // A callback has been registered to advance the phaser whenever the observer
        // transitions to  is ready. Since we are waiting for a phase observed before the
        // outboundObserver.isReady() returned false, we expect it to advance after the
        // channel has become ready.  This doesn't always seem to be the case (despite
        // documentation stating otherwise) so we poll periodically and enforce an overall
        // timeout related to the stream deadline.
-        phaser.awaitAdvanceInterruptibly(awaitPhase, waitSeconds, TimeUnit.SECONDS);
+        int nextPhase =
+            isReadyNotifier.awaitAdvanceInterruptibly(awaitPhase, waitSeconds, TimeUnit.SECONDS);
+        // If nextPhase is a value less than 0, the phaser has been terminated.
+        if (nextPhase < 0) {
+          return;

Review Comment:
   > the stream has been shutdown and any requests should be dropped/ignored.
   
   Could you point me to how dropped commits get removed from the commit queue?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/GlobalDataStreamSender.java:
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming.harness;
+
+import java.io.Closeable;
+import java.util.function.Supplier;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillEndpoints.Endpoint;
+import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream;
+import org.apache.beam.sdk.annotations.Internal;
+
+@Internal
+@ThreadSafe
+final class GlobalDataStreamSender implements Closeable, Supplier<GetDataStream> {
+  private final Endpoint endpoint;
+  private final GetDataStream delegate;
+  private volatile boolean started;
+
+  GlobalDataStreamSender(GetDataStream delegate, Endpoint endpoint) {
+    this.delegate = delegate;
+    this.started = false;
+    this.endpoint = endpoint;
+  }
+
+  @Override
+  public GetDataStream get() {
+    if (!started) {
+      startStream();
+    }
+
+    return delegate;
+  }
+
+  private synchronized void startStream() {
+    // Check started again after we acquire the lock.
+    if (!started) {
+      started = true;
+      delegate.start();
+    }
+  }
+
+  @Override
+  public void close() {
+    delegate.shutdown();

Review Comment:
   Do we need to check `started` here?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java:
##########
@@ -209,201 +198,201 @@ static FanOutStreamingEngineWorkerHarness forTesting(
             stubFactory,
             getWorkBudgetDistributor,
             dispatcherClient,
-            clientId,
             workCommitterFactory,
             getDataMetricTracker);
     fanOutStreamingEngineWorkProvider.start();
     return fanOutStreamingEngineWorkProvider;
   }
 
-  @SuppressWarnings("ReturnValueIgnored")
   @Override
   public synchronized void start() {
-    Preconditions.checkState(!started, "StreamingEngineClient cannot start twice.");
-    // Starts the stream, this value is memoized.
-    getWorkerMetadataStream.get();
-    startWorkerMetadataConsumer();
-    getWorkBudgetRefresher.start();
+    Preconditions.checkState(!started, "FanOutStreamingEngineWorkerHarness cannot start twice.");
+    getWorkerMetadataStream.start();
     started = true;
   }
 
   public ImmutableSet<HostAndPort> currentWindmillEndpoints() {
-    return connections.get().windmillConnections().keySet().stream()
+    return backends.get().windmillStreams().keySet().stream()
         .map(Endpoint::directEndpoint)
         .filter(Optional::isPresent)
         .map(Optional::get)
-        .filter(
-            windmillServiceAddress ->
-                windmillServiceAddress.getKind() != WindmillServiceAddress.Kind.IPV6)
-        .map(
-            windmillServiceAddress ->
-                windmillServiceAddress.getKind() == WindmillServiceAddress.Kind.GCP_SERVICE_ADDRESS
-                    ? windmillServiceAddress.gcpServiceAddress()
-                    : windmillServiceAddress.authenticatedGcpServiceAddress().gcpServiceAddress())
+        .map(WindmillServiceAddress::getServiceAddress)
         .collect(toImmutableSet());
   }
 
   /**
-   * Fetches {@link GetDataStream} mapped to globalDataKey if one exists, or defaults to {@link
-   * GetDataStream} pointing to dispatcher.
+   * Fetches {@link GetDataStream} mapped to globalDataKey if or throws {@link
+   * NoSuchElementException} if one is not found.
    */
   private GetDataStream getGlobalDataStream(String globalDataKey) {
-    return Optional.ofNullable(connections.get().globalDataStreams().get(globalDataKey))
-        .map(Supplier::get)
-        .orElseGet(
-            () ->
-                streamFactory.createGetDataStream(
-                    dispatcherClient.getWindmillServiceStub(), new ThrottleTimer()));
-  }
-
-  @SuppressWarnings("FutureReturnValueIgnored")
-  private void startWorkerMetadataConsumer() {
-    newWorkerMetadataConsumer.submit(
-        () -> {
-          while (true) {
-            Optional.ofNullable(newWindmillEndpoints.poll())
-                .ifPresent(this::consumeWindmillWorkerEndpoints);
-          }
-        });
+    return Optional.ofNullable(backends.get().globalDataStreams().get(globalDataKey))
+        .map(GlobalDataStreamSender::get)
+        .orElseThrow(
+            () -> new NoSuchElementException("No endpoint for global data tag: " + globalDataKey));
   }
 
   @VisibleForTesting
   @Override
   public synchronized void shutdown() {
-    Preconditions.checkState(started, "StreamingEngineClient never started.");
-    getWorkerMetadataStream.get().halfClose();
-    getWorkBudgetRefresher.stop();
-    newWorkerMetadataPublisher.shutdownNow();
-    newWorkerMetadataConsumer.shutdownNow();
+    Preconditions.checkState(started, "FanOutStreamingEngineWorkerHarness never started.");
+    Preconditions.checkNotNull(getWorkerMetadataStream).shutdown();
+    workerMetadataConsumer.shutdownNow();
     channelCachingStubFactory.shutdown();
   }
 
-  /**
-   * {@link java.util.function.Consumer<WindmillEndpoints>} used to update {@link #connections} on
-   * new backend worker metadata.
-   */
+  private void consumeWorkerMetadata(WindmillEndpoints windmillEndpoints) {
+    synchronized (metadataLock) {
+      // Only process versions greater than what we currently have to prevent double processing of
+      // metadata. workerMetadataConsumer is single-threaded so we maintain ordering.
+      if (windmillEndpoints.version() > pendingMetadataVersion) {
+        pendingMetadataVersion = windmillEndpoints.version();
+        workerMetadataConsumer.execute(() -> consumeWindmillWorkerEndpoints(windmillEndpoints));
+      }
+    }
+  }
+
   private synchronized void consumeWindmillWorkerEndpoints(WindmillEndpoints newWindmillEndpoints) {
-    isBudgetRefreshPaused.set(true);
-    LOG.info("Consuming new windmill endpoints: {}", newWindmillEndpoints);
-    ImmutableMap<Endpoint, WindmillConnection> newWindmillConnections =
-        createNewWindmillConnections(newWindmillEndpoints.windmillEndpoints());
+    // Since this is run on a single threaded executor, multiple versions of the metadata maybe
+    // queued up while a previous version of the windmillEndpoints were being consumed. Only consume
+    // the endpoints if they are the most current version.
+    synchronized (metadataLock) {
+      if (newWindmillEndpoints.version() < pendingMetadataVersion) {
+        return;
+      }
+    }
 
-    StreamingEngineConnectionState newConnectionsState =
-        StreamingEngineConnectionState.builder()
-            .setWindmillConnections(newWindmillConnections)
-            .setWindmillStreams(
-                
closeStaleStreamsAndCreateNewStreams(newWindmillConnections.values()))
+    LOG.debug(
+        "Consuming new endpoints: {}. previous metadata version: {}, current metadata version: {}",
+        newWindmillEndpoints,
+        activeMetadataVersion,
+        newWindmillEndpoints.version());
+    closeStaleStreams(newWindmillEndpoints);
+    ImmutableMap<Endpoint, WindmillStreamSender> newStreams =
+        createAndStartNewStreams(newWindmillEndpoints.windmillEndpoints()).join();
+    StreamingEngineBackends newBackends =
+        StreamingEngineBackends.builder()
+            .setWindmillStreams(newStreams)
             .setGlobalDataStreams(
                 createNewGlobalDataStreams(newWindmillEndpoints.globalDataEndpoints()))
             .build();
+    backends.set(newBackends);
+    getWorkBudgetDistributor.distributeBudget(newStreams.values(), totalGetWorkBudget);
+    activeMetadataVersion = newWindmillEndpoints.version();
+  }
 
-    LOG.info(
-        "Setting new connections: {}. Previous connections: {}.",
-        newConnectionsState,
-        connections.get());
-    connections.set(newConnectionsState);
-    isBudgetRefreshPaused.set(false);
-    getWorkBudgetRefresher.requestBudgetRefresh();
+  /** Close the streams that are no longer valid asynchronously. */
+  private void closeStaleStreams(WindmillEndpoints newWindmillEndpoints) {
+    StreamingEngineBackends currentBackends = backends.get();
+    ImmutableMap<Endpoint, WindmillStreamSender> currentWindmillStreams =
+        currentBackends.windmillStreams();
+    currentWindmillStreams.entrySet().stream()
+        .filter(
+            connectionAndStream ->
+                !newWindmillEndpoints.windmillEndpoints().contains(connectionAndStream.getKey()))
+        .forEach(
+            entry -> {
+              CompletableFuture<Void> unused =
+                  CompletableFuture.runAsync(
+                      () -> closeStreamSender(entry.getKey(), entry.getValue()),
+                      windmillStreamManager);
+            });
+
+    Set<Endpoint> newGlobalDataEndpoints =
+        new HashSet<>(newWindmillEndpoints.globalDataEndpoints().values());
+    currentBackends.globalDataStreams().values().stream()
+        .filter(sender -> !newGlobalDataEndpoints.contains(sender.endpoint()))
+        .forEach(
+            sender -> {
+              CompletableFuture<Void> unused =
+                  CompletableFuture.runAsync(
+                      () -> closeStreamSender(sender.endpoint(), sender), windmillStreamManager);
+            });
+  }
+
+  private void closeStreamSender(Endpoint endpoint, Closeable sender) {
+    LOG.debug("Closing streams to endpoint={}, sender={}", endpoint, sender);
+    try {
+      sender.close();
+      endpoint.directEndpoint().ifPresent(channelCachingStubFactory::remove);
+      LOG.debug("Successfully closed streams to {}", endpoint);
+    } catch (Exception e) {
+      LOG.error("Error closing streams to endpoint={}, sender={}", endpoint, sender);
+    }
+  }
+
+  private synchronized CompletableFuture<ImmutableMap<Endpoint, WindmillStreamSender>>
+      createAndStartNewStreams(ImmutableSet<Endpoint> newWindmillEndpoints) {
+    ImmutableMap<Endpoint, WindmillStreamSender> currentStreams = backends.get().windmillStreams();
+    return MoreFutures.allAsList(
+            newWindmillEndpoints.stream()
+                .map(endpoint -> getOrCreateWindmillStreamSenderFuture(endpoint, currentStreams))
+                .collect(Collectors.toList()))
+        .thenApply(
+            backends -> backends.stream().collect(toImmutableMap(Pair::getLeft, Pair::getRight)))
+        .toCompletableFuture();
+  }
+
+  private CompletionStage<Pair<Endpoint, WindmillStreamSender>>
+      getOrCreateWindmillStreamSenderFuture(
+          Endpoint endpoint, ImmutableMap<Endpoint, WindmillStreamSender> currentStreams) {
+    return MoreFutures.supplyAsync(

Review Comment:
   Can we return `CompletableFuture.completedFuture(currentStreams.get(endpoint))` when `currentStreams.get(endpoint)` is not null, without submitting it to the executor?
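   
   For example (sketch; `createAndStartWindmillStreamSender` is a stand-in for whatever the async branch does today):
   ```java
   private CompletionStage<Pair<Endpoint, WindmillStreamSender>>
       getOrCreateWindmillStreamSenderFuture(
           Endpoint endpoint, ImmutableMap<Endpoint, WindmillStreamSender> currentStreams) {
     WindmillStreamSender existing = currentStreams.get(endpoint);
     if (existing != null) {
       // Already created and started; no need to hop to the executor.
       return CompletableFuture.completedFuture(Pair.of(endpoint, existing));
     }
     return MoreFutures.supplyAsync(
         () -> Pair.of(endpoint, createAndStartWindmillStreamSender(endpoint)),
         windmillStreamManager);
   }
   ```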



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##########
@@ -302,38 +331,57 @@ public void appendSpecificHtml(PrintWriter writer) {
   }
 
  private <ResponseT> ResponseT issueRequest(QueuedRequest request, ParseFn<ResponseT> parseFn) {
-    while (true) {
+    while (!isShutdown()) {
       request.resetResponseStream();
       try {
         queueRequestAndWait(request);
         return parseFn.parse(request.getResponseStream());
-      } catch (CancellationException e) {
-        // Retry issuing the request since the response stream was cancelled.
-        continue;
+      } catch (AppendableInputStream.InvalidInputStreamStateException
+          | VerifyException

Review Comment:
   Since VerifyException is not expected during normal operation, why not throw it upstream as-is? Converting it to WindmillStreamShutdownException will mask the real failure, and the worker will be operating in an unknown state.
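   
   i.e. roughly (sketch):
   ```java
   } catch (AppendableInputStream.InvalidInputStreamStateException | VerifyException e) {
     // Unexpected during normal operation: propagate the original exception
     // rather than converting it, so the root cause is not masked.
     throw e;
   } catch (CancellationException e) {
     // The response stream was cancelled; handle shutdown and retry.
     handleShutdown(request, e);
   }
   ```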



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

