arunpandianp commented on code in PR #31902:
URL: https://github.com/apache/beam/pull/31902#discussion_r1794868915


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java:
##########
@@ -220,15 +203,16 @@ static FanOutStreamingEngineWorkerHarness forTesting(
   @Override
   public synchronized void start() {
     Preconditions.checkState(!started, "StreamingEngineClient cannot start twice.");
-    // Starts the stream, this value is memoized.
-    getWorkerMetadataStream.get();
-    startWorkerMetadataConsumer();
-    getWorkBudgetRefresher.start();
+    getWorkerMetadataStream =
+        streamFactory.createGetWorkerMetadataStream(
+            dispatcherClient.getWindmillMetadataServiceStubBlocking(),
+            getWorkerMetadataThrottleTimer,
+            this::consumeWorkerMetadata);
     started = true;
   }
 
   public ImmutableSet<HostAndPort> currentWindmillEndpoints() {
-    return connections.get().windmillConnections().keySet().stream()
+    return connections.get().windmillStreams().keySet().stream()
         .map(Endpoint::directEndpoint)
         .filter(Optional::isPresent)
         .map(Optional::get)

Review Comment:
   Will filtering out IPv6 below prevent direct path endpoints from showing up in channelz?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java:
##########
@@ -256,58 +240,114 @@ private GetDataStream getGlobalDataStream(String globalDataKey) {
                     dispatcherClient.getWindmillServiceStub(), new ThrottleTimer()));
   }
 
-  @SuppressWarnings("FutureReturnValueIgnored")
-  private void startWorkerMetadataConsumer() {
-    newWorkerMetadataConsumer.submit(
-        () -> {
-          while (true) {
-            Optional.ofNullable(newWindmillEndpoints.poll())
-                .ifPresent(this::consumeWindmillWorkerEndpoints);
-          }
-        });
-  }
-
   @VisibleForTesting
   @Override
   public synchronized void shutdown() {
     Preconditions.checkState(started, "StreamingEngineClient never started.");
-    getWorkerMetadataStream.get().halfClose();
-    getWorkBudgetRefresher.stop();
-    newWorkerMetadataPublisher.shutdownNow();
-    newWorkerMetadataConsumer.shutdownNow();
+    Preconditions.checkNotNull(getWorkerMetadataStream).halfClose();
+    workerMetadataConsumer.shutdownNow();
     channelCachingStubFactory.shutdown();
   }
 
-  /**
-   * {@link java.util.function.Consumer<WindmillEndpoints>} used to update {@link #connections} on
-   * new backend worker metadata.
-   */
+  private void consumeWorkerMetadata(WindmillEndpoints windmillEndpoints) {
+    synchronized (metadataLock) {
+      // Only process versions greater than what we currently have to prevent double processing of
+      // metadata. workerMetadataConsumer is single-threaded so we maintain ordering.
+      if (windmillEndpoints.version() > pendingMetadataVersion) {
+        pendingMetadataVersion = windmillEndpoints.version();
+        workerMetadataConsumer.execute(() -> consumeWindmillWorkerEndpoints(windmillEndpoints));
+      }
+    }
+  }
+
   private synchronized void consumeWindmillWorkerEndpoints(WindmillEndpoints newWindmillEndpoints) {
-    isBudgetRefreshPaused.set(true);
-    LOG.info("Consuming new windmill endpoints: {}", newWindmillEndpoints);
-    ImmutableMap<Endpoint, WindmillConnection> newWindmillConnections =
-        createNewWindmillConnections(newWindmillEndpoints.windmillEndpoints());
+    // Since this is run on a single-threaded executor, multiple versions of the metadata may be
+    // queued up while a previous version of the windmillEndpoints was being consumed. Only consume
+    // the endpoints if they are the most current version.
+    synchronized (metadataLock) {
+      if (newWindmillEndpoints.version() < pendingMetadataVersion) {
+        return;
+      }
+    }
 
+    long previousMetadataVersion = activeMetadataVersion;
+    LOG.debug(
+        "Consuming new endpoints: {}. previous metadata version: {}, current metadata version: {}",
+        newWindmillEndpoints,
+        previousMetadataVersion,
+        activeMetadataVersion);
+    closeStaleStreams(
+        newWindmillEndpoints.windmillEndpoints(), connections.get().windmillStreams());
+    ImmutableMap<Endpoint, WindmillStreamSender> newStreams =
+        createAndStartNewStreams(newWindmillEndpoints.windmillEndpoints()).join();
     StreamingEngineConnectionState newConnectionsState =
         StreamingEngineConnectionState.builder()
-            .setWindmillConnections(newWindmillConnections)
-            .setWindmillStreams(
-                closeStaleStreamsAndCreateNewStreams(newWindmillConnections.values()))
+            .setWindmillStreams(newStreams)
             .setGlobalDataStreams(
                 createNewGlobalDataStreams(newWindmillEndpoints.globalDataEndpoints()))
             .build();
-
-    LOG.info(
-        "Setting new connections: {}. Previous connections: {}.",
-        newConnectionsState,
-        connections.get());
     connections.set(newConnectionsState);
-    isBudgetRefreshPaused.set(false);
-    getWorkBudgetRefresher.requestBudgetRefresh();
+    getWorkBudgetDistributor.distributeBudget(newStreams.values(), totalGetWorkBudget);
+    activeMetadataVersion = newWindmillEndpoints.version();
+  }
+
+  /** Close the streams that are no longer valid asynchronously. */
+  @SuppressWarnings("FutureReturnValueIgnored")
+  private void closeStaleStreams(
+      Collection<Endpoint> newWindmillConnections,
+      ImmutableMap<Endpoint, WindmillStreamSender> currentStreams) {
+    currentStreams.entrySet().stream()
+        .filter(
+            connectionAndStream -> !newWindmillConnections.contains(connectionAndStream.getKey()))
+        .forEach(
+            entry ->
+                CompletableFuture.runAsync(
+                    () -> {
+                      LOG.debug("Closing streams to {}", entry);
+                      try {
+                        entry.getValue().closeAllStreams();
+                        entry
+                            .getKey()
+                            .directEndpoint()
+                            .ifPresent(channelCachingStubFactory::remove);
+                        LOG.debug("Successfully closed streams to {}", entry);
+                      } catch (Exception e) {
+                        LOG.error("Error closing streams to {}", entry);
+                      }
+                    },
+                    windmillStreamManager));
+  }
+
+  private synchronized CompletableFuture<ImmutableMap<Endpoint, WindmillStreamSender>>
+      createAndStartNewStreams(Collection<Endpoint> newWindmillConnections) {

Review Comment:
   ```suggestion
      createAndStartNewStreams(ImmutableSet<Endpoint> newWindmillConnections) {
   ```



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java:
##########
@@ -256,58 +240,114 @@ private GetDataStream getGlobalDataStream(String globalDataKey) {
                     dispatcherClient.getWindmillServiceStub(), new ThrottleTimer()));
   }
 
-  @SuppressWarnings("FutureReturnValueIgnored")
-  private void startWorkerMetadataConsumer() {
-    newWorkerMetadataConsumer.submit(
-        () -> {
-          while (true) {
-            Optional.ofNullable(newWindmillEndpoints.poll())
-                .ifPresent(this::consumeWindmillWorkerEndpoints);
-          }
-        });
-  }
-
   @VisibleForTesting
   @Override
   public synchronized void shutdown() {
     Preconditions.checkState(started, "StreamingEngineClient never started.");
-    getWorkerMetadataStream.get().halfClose();
-    getWorkBudgetRefresher.stop();
-    newWorkerMetadataPublisher.shutdownNow();
-    newWorkerMetadataConsumer.shutdownNow();
+    Preconditions.checkNotNull(getWorkerMetadataStream).halfClose();
+    workerMetadataConsumer.shutdownNow();
     channelCachingStubFactory.shutdown();
   }
 
-  /**
-   * {@link java.util.function.Consumer<WindmillEndpoints>} used to update {@link #connections} on
-   * new backend worker metadata.
-   */
+  private void consumeWorkerMetadata(WindmillEndpoints windmillEndpoints) {
+    synchronized (metadataLock) {
+      // Only process versions greater than what we currently have to prevent double processing of
+      // metadata. workerMetadataConsumer is single-threaded so we maintain ordering.
+      if (windmillEndpoints.version() > pendingMetadataVersion) {
+        pendingMetadataVersion = windmillEndpoints.version();
+        workerMetadataConsumer.execute(() -> consumeWindmillWorkerEndpoints(windmillEndpoints));
+      }
+    }
+  }
+
   private synchronized void consumeWindmillWorkerEndpoints(WindmillEndpoints newWindmillEndpoints) {
-    isBudgetRefreshPaused.set(true);
-    LOG.info("Consuming new windmill endpoints: {}", newWindmillEndpoints);
-    ImmutableMap<Endpoint, WindmillConnection> newWindmillConnections =
-        createNewWindmillConnections(newWindmillEndpoints.windmillEndpoints());
+    // Since this is run on a single-threaded executor, multiple versions of the metadata may be
+    // queued up while a previous version of the windmillEndpoints was being consumed. Only consume
+    // the endpoints if they are the most current version.
+    synchronized (metadataLock) {
+      if (newWindmillEndpoints.version() < pendingMetadataVersion) {
+        return;
+      }
+    }
 
+    long previousMetadataVersion = activeMetadataVersion;
+    LOG.debug(
+        "Consuming new endpoints: {}. previous metadata version: {}, current 
metadata version: {}",
+        newWindmillEndpoints,
+        previousMetadataVersion,
+        activeMetadataVersion);

Review Comment:
   previousMetadataVersion and activeMetadataVersion are the same here.
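
   For illustration, something like the following might be the intent, reusing identifiers from the hunk above (the fix itself is an assumption, not part of the PR):

   ```java
   // Sketch only: log the incoming version as "current" instead of reading
   // activeMetadataVersion twice.
   long previousMetadataVersion = activeMetadataVersion;
   LOG.debug(
       "Consuming new endpoints: {}. previous metadata version: {}, current metadata version: {}",
       newWindmillEndpoints,
       previousMetadataVersion,
       newWindmillEndpoints.version());
   ```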



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -156,29 +162,47 @@ public void sendHealthCheck() {
   protected void onResponse(StreamingCommitResponse response) {
     commitWorkThrottleTimer.stop();
 
-    RuntimeException finalException = null;
+    @Nullable RuntimeException failure = null;
     for (int i = 0; i < response.getRequestIdCount(); ++i) {
       long requestId = response.getRequestId(i);
       if (requestId == HEARTBEAT_REQUEST_ID) {
         continue;
       }
-      PendingRequest done = pending.remove(requestId);
-      if (done == null) {
-        LOG.error("Got unknown commit request ID: {}", requestId);
+      PendingRequest pendingRequest = pending.remove(requestId);
+      if (pendingRequest == null) {
+        if (!isShutdown()) {
+          // Skip responses when the stream is shut down since they are now invalid.
+          LOG.error("Got unknown commit request ID: {}", requestId);
+        }
       } else {
         try {
-          done.onDone.accept(
+          pendingRequest.completeWithStatus(
              (i < response.getStatusCount()) ? response.getStatus(i) : CommitStatus.OK);
         } catch (RuntimeException e) {
-          // Catch possible exceptions to ensure that an exception for one commit does not prevent
-          // other commits from being processed.
+          // other commits from being processed. Aggregate all the failures to throw after
+          // processing the response if they exist.
           LOG.warn("Exception while processing commit response.", e);
-          finalException = e;
+          if (failure == null) {
+            failure = e;
+          } else {
+            failure.addSuppressed(e);

Review Comment:
   There could be thousands of requests in the queue; do we want to add all of them here? Maybe only the finalException or a subset would be enough?
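
   If we keep the aggregation, a self-contained sketch of one way to cap the suppressed list (MAX_SUPPRESSED is hypothetical, not in the PR):

   ```java
   // Sketch only: cap how many per-request failures get attached as suppressed
   // exceptions so a large commit batch cannot bloat a single throwable.
   final class BoundedSuppression {
     // Hypothetical cap; not part of the PR.
     private static final int MAX_SUPPRESSED = 10;

     // failure may be null on the first call; returns the exception to keep.
     static RuntimeException accumulate(RuntimeException failure, RuntimeException e) {
       if (failure == null) {
         return e;
       }
       if (failure.getSuppressed().length < MAX_SUPPRESSED) {
         failure.addSuppressed(e);
       }
       return failure;
     }
   }
   ```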



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java:
##########
@@ -381,29 +380,20 @@ private ImmutableMap<String, Supplier<GetDataStream>> createNewGlobalDataStreams
   private Supplier<GetDataStream> existingOrNewGetDataStreamFor(
       Entry<String, Endpoint> keyedEndpoint,
       ImmutableMap<String, Supplier<GetDataStream>> currentGlobalDataStreams) {
-    return Preconditions.checkNotNull(
+    return checkNotNull(
         currentGlobalDataStreams.getOrDefault(
             keyedEndpoint.getKey(),
             () ->
                 streamFactory.createGetDataStream(
-                    newOrExistingStubFor(keyedEndpoint.getValue()), new ThrottleTimer())));
-  }
-
-  private CloudWindmillServiceV1Alpha1Stub newOrExistingStubFor(Endpoint endpoint) {
-    return Optional.ofNullable(connections.get().windmillConnections().get(endpoint))
-        .map(WindmillConnection::stub)
-        .orElseGet(() -> createWindmillStub(endpoint));
+                    createWindmillStub(keyedEndpoint.getValue()), new ThrottleTimer())));
   }
 
-  private WindmillStreamSender createAndStartWindmillStreamSenderFor(
-      WindmillConnection connection) {
-    // Initially create each stream with no budget. The budget will be eventually assigned by the
-    // GetWorkBudgetDistributor.
+  private WindmillStreamSender createAndStartWindmillStreamSender(Endpoint connection) {
     WindmillStreamSender windmillStreamSender =

Review Comment:
   Why do we need a dispatcher fallback in `createWindmillStub`?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java:
##########
@@ -256,58 +240,114 @@ private GetDataStream getGlobalDataStream(String globalDataKey) {
                     dispatcherClient.getWindmillServiceStub(), new ThrottleTimer()));
   }
 
-  @SuppressWarnings("FutureReturnValueIgnored")
-  private void startWorkerMetadataConsumer() {
-    newWorkerMetadataConsumer.submit(
-        () -> {
-          while (true) {
-            Optional.ofNullable(newWindmillEndpoints.poll())
-                .ifPresent(this::consumeWindmillWorkerEndpoints);
-          }
-        });
-  }
-
   @VisibleForTesting
   @Override
   public synchronized void shutdown() {
     Preconditions.checkState(started, "StreamingEngineClient never started.");
-    getWorkerMetadataStream.get().halfClose();
-    getWorkBudgetRefresher.stop();
-    newWorkerMetadataPublisher.shutdownNow();
-    newWorkerMetadataConsumer.shutdownNow();
+    Preconditions.checkNotNull(getWorkerMetadataStream).halfClose();
+    workerMetadataConsumer.shutdownNow();
     channelCachingStubFactory.shutdown();
   }
 
-  /**
-   * {@link java.util.function.Consumer<WindmillEndpoints>} used to update {@link #connections} on
-   * new backend worker metadata.
-   */
+  private void consumeWorkerMetadata(WindmillEndpoints windmillEndpoints) {
+    synchronized (metadataLock) {
+      // Only process versions greater than what we currently have to prevent double processing of
+      // metadata. workerMetadataConsumer is single-threaded so we maintain ordering.
+      if (windmillEndpoints.version() > pendingMetadataVersion) {
+        pendingMetadataVersion = windmillEndpoints.version();
+        workerMetadataConsumer.execute(() -> consumeWindmillWorkerEndpoints(windmillEndpoints));
+      }
+    }
+  }
+
   private synchronized void consumeWindmillWorkerEndpoints(WindmillEndpoints newWindmillEndpoints) {
-    isBudgetRefreshPaused.set(true);
-    LOG.info("Consuming new windmill endpoints: {}", newWindmillEndpoints);
-    ImmutableMap<Endpoint, WindmillConnection> newWindmillConnections =
-        createNewWindmillConnections(newWindmillEndpoints.windmillEndpoints());
+    // Since this is run on a single-threaded executor, multiple versions of the metadata may be
+    // queued up while a previous version of the windmillEndpoints was being consumed. Only consume
+    // the endpoints if they are the most current version.
+    synchronized (metadataLock) {
+      if (newWindmillEndpoints.version() < pendingMetadataVersion) {
+        return;
+      }
+    }
 
+    long previousMetadataVersion = activeMetadataVersion;
+    LOG.debug(
+        "Consuming new endpoints: {}. previous metadata version: {}, current metadata version: {}",
+        newWindmillEndpoints,
+        previousMetadataVersion,
+        activeMetadataVersion);
+    closeStaleStreams(
+        newWindmillEndpoints.windmillEndpoints(), connections.get().windmillStreams());
+    ImmutableMap<Endpoint, WindmillStreamSender> newStreams =
+        createAndStartNewStreams(newWindmillEndpoints.windmillEndpoints()).join();
     StreamingEngineConnectionState newConnectionsState =
         StreamingEngineConnectionState.builder()
-            .setWindmillConnections(newWindmillConnections)
-            .setWindmillStreams(
-                closeStaleStreamsAndCreateNewStreams(newWindmillConnections.values()))
+            .setWindmillStreams(newStreams)
             .setGlobalDataStreams(
                 createNewGlobalDataStreams(newWindmillEndpoints.globalDataEndpoints()))
             .build();
-
-    LOG.info(
-        "Setting new connections: {}. Previous connections: {}.",
-        newConnectionsState,
-        connections.get());
     connections.set(newConnectionsState);
-    isBudgetRefreshPaused.set(false);
-    getWorkBudgetRefresher.requestBudgetRefresh();
+    getWorkBudgetDistributor.distributeBudget(newStreams.values(), totalGetWorkBudget);
+    activeMetadataVersion = newWindmillEndpoints.version();
+  }
+
+  /** Close the streams that are no longer valid asynchronously. */
+  @SuppressWarnings("FutureReturnValueIgnored")
+  private void closeStaleStreams(
+      Collection<Endpoint> newWindmillConnections,
+      ImmutableMap<Endpoint, WindmillStreamSender> currentStreams) {
+    currentStreams.entrySet().stream()
+        .filter(
+            connectionAndStream -> !newWindmillConnections.contains(connectionAndStream.getKey()))
+        .forEach(
+            entry ->
+                CompletableFuture.runAsync(
+                    () -> {
+                      LOG.debug("Closing streams to {}", entry);
+                      try {
+                        entry.getValue().closeAllStreams();
+                        entry
+                            .getKey()
+                            .directEndpoint()
+                            .ifPresent(channelCachingStubFactory::remove);
+                        LOG.debug("Successfully closed streams to {}", entry);
+                      } catch (Exception e) {
+                        LOG.error("Error closing streams to {}", entry);
+                      }
+                    },
+                    windmillStreamManager));
+  }
+
+  private synchronized CompletableFuture<ImmutableMap<Endpoint, WindmillStreamSender>>
+      createAndStartNewStreams(Collection<Endpoint> newWindmillConnections) {
+    ImmutableMap<Endpoint, WindmillStreamSender> currentStreams =
+        connections.get().windmillStreams();
+    CompletionStage<List<Pair<Endpoint, WindmillStreamSender>>> connectionAndSenderFuture =
+        MoreFutures.allAsList(
+            newWindmillConnections.stream()
+                .map(
+                    connection ->
+                        MoreFutures.supplyAsync(
+                            () ->
+                                Pair.of(
+                                    connection,
+                                    Optional.ofNullable(currentStreams.get(connection))
+                                        .orElseGet(
+                                            () -> createAndStartWindmillStreamSender(connection))),

Review Comment:
   ```suggestion
           MoreFutures.allAsList(
               newWindmillEndpoints.stream()
                   .map(
                       endpoint ->
                           MoreFutures.supplyAsync(
                               () ->
                                   Pair.of(
                                        endpoint,
                                        Optional.ofNullable(currentStreams.get(endpoint))
                                            .orElseGet(
                                                () -> createAndStartWindmillStreamSender(endpoint))),
   ```



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java:
##########
@@ -381,29 +380,20 @@ private ImmutableMap<String, Supplier<GetDataStream>> createNewGlobalDataStreams
   private Supplier<GetDataStream> existingOrNewGetDataStreamFor(
       Entry<String, Endpoint> keyedEndpoint,
       ImmutableMap<String, Supplier<GetDataStream>> currentGlobalDataStreams) {
-    return Preconditions.checkNotNull(
+    return checkNotNull(
         currentGlobalDataStreams.getOrDefault(
             keyedEndpoint.getKey(),
             () ->
                 streamFactory.createGetDataStream(
-                    newOrExistingStubFor(keyedEndpoint.getValue()), new ThrottleTimer())));
-  }
-
-  private CloudWindmillServiceV1Alpha1Stub newOrExistingStubFor(Endpoint endpoint) {
-    return Optional.ofNullable(connections.get().windmillConnections().get(endpoint))
-        .map(WindmillConnection::stub)
-        .orElseGet(() -> createWindmillStub(endpoint));
+                    createWindmillStub(keyedEndpoint.getValue()), new ThrottleTimer())));
   }
 
-  private WindmillStreamSender createAndStartWindmillStreamSenderFor(
-      WindmillConnection connection) {
-    // Initially create each stream with no budget. The budget will be eventually assigned by the
-    // GetWorkBudgetDistributor.
+  private WindmillStreamSender createAndStartWindmillStreamSender(Endpoint connection) {

Review Comment:
   ```suggestion
    private WindmillStreamSender createAndStartWindmillStreamSender(Endpoint endpoint) {
   ```



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java:
##########
@@ -256,58 +242,132 @@ private GetDataStream getGlobalDataStream(String globalDataKey) {
                     dispatcherClient.getWindmillServiceStub(), new ThrottleTimer()));
   }
 
-  @SuppressWarnings("FutureReturnValueIgnored")
-  private void startWorkerMetadataConsumer() {
-    newWorkerMetadataConsumer.submit(
-        () -> {
-          while (true) {
-            Optional.ofNullable(newWindmillEndpoints.poll())
-                .ifPresent(this::consumeWindmillWorkerEndpoints);
-          }
-        });
-  }
-
   @VisibleForTesting
   @Override
   public synchronized void shutdown() {
     Preconditions.checkState(started, "StreamingEngineClient never started.");
     getWorkerMetadataStream.get().halfClose();
-    getWorkBudgetRefresher.stop();
-    newWorkerMetadataPublisher.shutdownNow();
-    newWorkerMetadataConsumer.shutdownNow();
+    workerMetadataConsumer.shutdownNow();
     channelCachingStubFactory.shutdown();
   }
 
-  /**
-   * {@link java.util.function.Consumer<WindmillEndpoints>} used to update {@link #connections} on
-   * new backend worker metadata.
-   */
+  @SuppressWarnings("methodref.receiver.bound")
+  private Supplier<GetWorkerMetadataStream> createGetWorkerMetadataStream(
+      @UnderInitialization FanOutStreamingEngineWorkerHarness this) {
+    // Checker Framework complains about reference to "this" in the constructor since the instance
+    // is "UnderInitialization" here, which we pass as a lambda to GetWorkerMetadataStream for
+    // processing new worker metadata. Supplier.get() is only called in start(), after we have
+    // constructed the FanOutStreamingEngineWorkerHarness.
+    return () ->
+        checkNotNull(streamFactory)
+            .createGetWorkerMetadataStream(
+                checkNotNull(dispatcherClient).getWindmillMetadataServiceStubBlocking(),
+                checkNotNull(getWorkerMetadataThrottleTimer),
+                this::consumeWorkerMetadata);
+  }
+
+  private void consumeWorkerMetadata(WindmillEndpoints windmillEndpoints) {
+    synchronized (metadataLock) {
+      // Only process versions greater than what we currently have to prevent double processing of
+      // metadata. workerMetadataConsumer is single-threaded so we maintain ordering.
+      if (windmillEndpoints.version() > pendingMetadataVersion) {
+        pendingMetadataVersion = windmillEndpoints.version();
+        workerMetadataConsumer.execute(() -> consumeWindmillWorkerEndpoints(windmillEndpoints));
+      }
+    }
+  }
+
   private synchronized void consumeWindmillWorkerEndpoints(WindmillEndpoints newWindmillEndpoints) {
-    isBudgetRefreshPaused.set(true);
+    // Since this is run on a single-threaded executor, multiple versions of the metadata may be
+    // queued up while a previous version of the windmillEndpoints was being consumed. Only consume
+    // the endpoints if they are the most current version.
+    synchronized (metadataLock) {
+      if (newWindmillEndpoints.version() < pendingMetadataVersion) {

Review Comment:
   Is the idea to track the latest received version? If so, the check inside `consumeWindmillWorkerEndpoints` can still be based on `activeMetadataVersion`; the check before queuing based on `pendingMetadataVersion` should be enough.
   
   Also, we need to log/record `pendingMetadataVersion` somewhere; currently it is not recorded.
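
   A minimal, self-contained sketch of that flow, with a long version standing in for WindmillEndpoints (all wiring here is illustrative, not the PR's code):

   ```java
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.function.LongConsumer;

   // Sketch only: dedupe at queue time on the latest *received* version
   // (pendingMetadataVersion, also available for logging/metrics), and skip at
   // consume time based on the latest *applied* version (activeMetadataVersion).
   final class MetadataVersionTracker {
     private final Object metadataLock = new Object();
     private final ExecutorService workerMetadataConsumer = Executors.newSingleThreadExecutor();
     private final LongConsumer applyEndpoints;
     private long pendingMetadataVersion = -1; // latest version queued for consumption
     private long activeMetadataVersion = -1; // latest version actually applied

     MetadataVersionTracker(LongConsumer applyEndpoints) {
       this.applyEndpoints = applyEndpoints;
     }

     void consumeWorkerMetadata(long version) {
       synchronized (metadataLock) {
         if (version > pendingMetadataVersion) {
           pendingMetadataVersion = version; // record it so it can be logged somewhere
           workerMetadataConsumer.execute(() -> consumeWindmillWorkerEndpoints(version));
         }
       }
     }

     private synchronized void consumeWindmillWorkerEndpoints(long version) {
       // Queue-time dedupe already guarantees queued versions are increasing, so
       // a check against the applied version is enough to skip duplicates.
       if (version <= activeMetadataVersion) {
         return;
       }
       applyEndpoints.accept(version);
       activeMetadataVersion = version;
     }
   }
   ```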



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java:
##########
@@ -256,58 +240,114 @@ private GetDataStream getGlobalDataStream(String globalDataKey) {
                     dispatcherClient.getWindmillServiceStub(), new ThrottleTimer()));
   }
 
-  @SuppressWarnings("FutureReturnValueIgnored")
-  private void startWorkerMetadataConsumer() {
-    newWorkerMetadataConsumer.submit(
-        () -> {
-          while (true) {
-            Optional.ofNullable(newWindmillEndpoints.poll())
-                .ifPresent(this::consumeWindmillWorkerEndpoints);
-          }
-        });
-  }
-
   @VisibleForTesting
   @Override
   public synchronized void shutdown() {
     Preconditions.checkState(started, "StreamingEngineClient never started.");
-    getWorkerMetadataStream.get().halfClose();
-    getWorkBudgetRefresher.stop();
-    newWorkerMetadataPublisher.shutdownNow();
-    newWorkerMetadataConsumer.shutdownNow();
+    Preconditions.checkNotNull(getWorkerMetadataStream).halfClose();
+    workerMetadataConsumer.shutdownNow();
     channelCachingStubFactory.shutdown();
   }
 
-  /**
-   * {@link java.util.function.Consumer<WindmillEndpoints>} used to update {@link #connections} on
-   * new backend worker metadata.
-   */
+  private void consumeWorkerMetadata(WindmillEndpoints windmillEndpoints) {
+    synchronized (metadataLock) {
+      // Only process versions greater than what we currently have to prevent double processing of
+      // metadata. workerMetadataConsumer is single-threaded so we maintain ordering.
+      if (windmillEndpoints.version() > pendingMetadataVersion) {
+        pendingMetadataVersion = windmillEndpoints.version();
+        workerMetadataConsumer.execute(() -> consumeWindmillWorkerEndpoints(windmillEndpoints));
+      }
+    }
+  }
+
   private synchronized void consumeWindmillWorkerEndpoints(WindmillEndpoints newWindmillEndpoints) {
-    isBudgetRefreshPaused.set(true);
-    LOG.info("Consuming new windmill endpoints: {}", newWindmillEndpoints);
-    ImmutableMap<Endpoint, WindmillConnection> newWindmillConnections =
-        createNewWindmillConnections(newWindmillEndpoints.windmillEndpoints());
+    // Since this is run on a single-threaded executor, multiple versions of the metadata may be
+    // queued up while a previous version of the windmillEndpoints was being consumed. Only consume
+    // the endpoints if they are the most current version.
+    synchronized (metadataLock) {
+      if (newWindmillEndpoints.version() < pendingMetadataVersion) {
+        return;
+      }
+    }
 
+    long previousMetadataVersion = activeMetadataVersion;
+    LOG.debug(
+        "Consuming new endpoints: {}. previous metadata version: {}, current metadata version: {}",
+        newWindmillEndpoints,
+        previousMetadataVersion,
+        activeMetadataVersion);
+    closeStaleStreams(
+        newWindmillEndpoints.windmillEndpoints(), connections.get().windmillStreams());
+    ImmutableMap<Endpoint, WindmillStreamSender> newStreams =
+        createAndStartNewStreams(newWindmillEndpoints.windmillEndpoints()).join();
     StreamingEngineConnectionState newConnectionsState =
         StreamingEngineConnectionState.builder()
-            .setWindmillConnections(newWindmillConnections)
-            .setWindmillStreams(
-                closeStaleStreamsAndCreateNewStreams(newWindmillConnections.values()))
+            .setWindmillStreams(newStreams)
             .setGlobalDataStreams(
                 createNewGlobalDataStreams(newWindmillEndpoints.globalDataEndpoints()))

Review Comment:
   Should we create globalDataStreams in parallel? Also, I'm not sure the code closes the stale globalDataStreams.
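
   On the first question, a self-contained sketch of fanning the creation out on an executor, in the spirit of createAndStartNewStreams above (the generic factory function is a stand-in, not the PR's API):

   ```java
   import java.util.AbstractMap.SimpleImmutableEntry;
   import java.util.List;
   import java.util.Map;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ExecutorService;
   import java.util.function.Function;
   import java.util.stream.Collectors;

   // Sketch only: build each global data stream on the stream executor in
   // parallel, then collect the results into a map.
   final class ParallelGlobalDataStreams {
     static <T> Map<String, T> createAll(
         List<String> globalDataKeys, Function<String, T> createStream, ExecutorService executor) {
       List<CompletableFuture<Map.Entry<String, T>>> futures =
           globalDataKeys.stream()
               .map(
                   key ->
                       CompletableFuture.<Map.Entry<String, T>>supplyAsync(
                           () -> new SimpleImmutableEntry<>(key, createStream.apply(key)),
                           executor))
               .collect(Collectors.toList());
       return futures.stream()
           .map(CompletableFuture::join)
           .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
     }
   }
   ```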



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java:
##########
@@ -256,58 +240,114 @@ private GetDataStream getGlobalDataStream(String globalDataKey) {
                     dispatcherClient.getWindmillServiceStub(), new ThrottleTimer()));
   }
 
-  @SuppressWarnings("FutureReturnValueIgnored")
-  private void startWorkerMetadataConsumer() {
-    newWorkerMetadataConsumer.submit(
-        () -> {
-          while (true) {
-            Optional.ofNullable(newWindmillEndpoints.poll())
-                .ifPresent(this::consumeWindmillWorkerEndpoints);
-          }
-        });
-  }
-
   @VisibleForTesting
   @Override
   public synchronized void shutdown() {
     Preconditions.checkState(started, "StreamingEngineClient never started.");
-    getWorkerMetadataStream.get().halfClose();
-    getWorkBudgetRefresher.stop();
-    newWorkerMetadataPublisher.shutdownNow();
-    newWorkerMetadataConsumer.shutdownNow();
+    Preconditions.checkNotNull(getWorkerMetadataStream).halfClose();
+    workerMetadataConsumer.shutdownNow();
     channelCachingStubFactory.shutdown();
   }
 
-  /**
-   * {@link java.util.function.Consumer<WindmillEndpoints>} used to update {@link #connections} on
-   * new backend worker metadata.
-   */
+  private void consumeWorkerMetadata(WindmillEndpoints windmillEndpoints) {
+    synchronized (metadataLock) {
+      // Only process versions greater than what we currently have to prevent double processing of
+      // metadata. workerMetadataConsumer is single-threaded so we maintain ordering.
+      if (windmillEndpoints.version() > pendingMetadataVersion) {
+        pendingMetadataVersion = windmillEndpoints.version();
+        workerMetadataConsumer.execute(() -> consumeWindmillWorkerEndpoints(windmillEndpoints));
+      }
+    }
+  }
+
   private synchronized void consumeWindmillWorkerEndpoints(WindmillEndpoints newWindmillEndpoints) {
-    isBudgetRefreshPaused.set(true);
-    LOG.info("Consuming new windmill endpoints: {}", newWindmillEndpoints);
-    ImmutableMap<Endpoint, WindmillConnection> newWindmillConnections =
-        createNewWindmillConnections(newWindmillEndpoints.windmillEndpoints());
+    // Since this is run on a single-threaded executor, multiple versions of the metadata may be
+    // queued up while a previous version of the windmillEndpoints was being consumed. Only consume
+    // the endpoints if they are the most current version.
+    synchronized (metadataLock) {
+      if (newWindmillEndpoints.version() < pendingMetadataVersion) {
+        return;
+      }
+    }
 
+    long previousMetadataVersion = activeMetadataVersion;
+    LOG.debug(
+        "Consuming new endpoints: {}. previous metadata version: {}, current metadata version: {}",
+        newWindmillEndpoints,
+        previousMetadataVersion,
+        activeMetadataVersion);
+    closeStaleStreams(
+        newWindmillEndpoints.windmillEndpoints(), connections.get().windmillStreams());
+    ImmutableMap<Endpoint, WindmillStreamSender> newStreams =
+        createAndStartNewStreams(newWindmillEndpoints.windmillEndpoints()).join();

Review Comment:
   We can defer the `join()` until after creating the globalDataStreams.
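
   A sketch of that reordering as a fragment, reusing names from the hunk above (illustrative, not the PR's code):

   ```java
   // Sketch only: start the stream creation, build the global data streams
   // while it runs, and only then join.
   CompletableFuture<ImmutableMap<Endpoint, WindmillStreamSender>> newStreamsFuture =
       createAndStartNewStreams(newWindmillEndpoints.windmillEndpoints());
   ImmutableMap<String, Supplier<GetDataStream>> newGlobalDataStreams =
       createNewGlobalDataStreams(newWindmillEndpoints.globalDataEndpoints());
   ImmutableMap<Endpoint, WindmillStreamSender> newStreams = newStreamsFuture.join();
   StreamingEngineConnectionState newConnectionsState =
       StreamingEngineConnectionState.builder()
           .setWindmillStreams(newStreams)
           .setGlobalDataStreams(newGlobalDataStreams)
           .build();
   ```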



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -156,29 +162,47 @@ public void sendHealthCheck() {
   protected void onResponse(StreamingCommitResponse response) {
     commitWorkThrottleTimer.stop();
 
-    RuntimeException finalException = null;
+    @Nullable RuntimeException failure = null;

Review Comment:
   Create a new Exception here and attach all failures as suppressed?
   
   Want to avoid marking the failure from one request as suppressed under another's.
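
   A self-contained sketch of that shape, with hypothetical names:

   ```java
   import java.util.List;

   // Sketch only: collect per-request failures under one fresh aggregate
   // exception, so no individual request's failure "owns" the others.
   final class AggregateCommitFailures {
     static void runAll(List<Runnable> completions) {
       RuntimeException aggregate = null;
       for (Runnable completion : completions) {
         try {
           completion.run();
         } catch (RuntimeException e) {
           if (aggregate == null) {
             aggregate = new RuntimeException("Exception(s) while processing commit response.");
           }
           aggregate.addSuppressed(e);
         }
       }
       if (aggregate != null) {
         throw aggregate;
       }
     }
   }
   ```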



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -187,13 +211,14 @@ protected void startThrottleTimer() {
     commitWorkThrottleTimer.start();
   }
 
-  private void flushInternal(Map<Long, PendingRequest> requests) {
+  private void flushInternal(Map<Long, PendingRequest> requests) throws InterruptedException {

Review Comment:
   Do we need `throws InterruptedException` here?
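
   If not, a self-contained sketch of the common alternative, catching it and restoring the interrupt flag (the blocking queue is a stand-in for whatever blocking call `flushInternal` makes):

   ```java
   import java.util.concurrent.BlockingQueue;

   // Sketch only: swallow InterruptedException locally but preserve the
   // thread's interrupt status so callers can still observe it.
   final class FlushSketch {
     private final BlockingQueue<Long> outbound;

     FlushSketch(BlockingQueue<Long> outbound) {
       this.outbound = outbound;
     }

     void flushInternal(long requestId) {
       try {
         outbound.put(requestId); // may block and throw InterruptedException
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt(); // restore interrupt status
       }
     }
   }
   ```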



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
