m-trieu commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1805817011


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java:
##########
@@ -209,201 +198,201 @@ static FanOutStreamingEngineWorkerHarness forTesting(
             stubFactory,
             getWorkBudgetDistributor,
             dispatcherClient,
-            clientId,
             workCommitterFactory,
             getDataMetricTracker);
     fanOutStreamingEngineWorkProvider.start();
     return fanOutStreamingEngineWorkProvider;
   }
 
-  @SuppressWarnings("ReturnValueIgnored")
   @Override
   public synchronized void start() {
-    Preconditions.checkState(!started, "StreamingEngineClient cannot start 
twice.");
-    // Starts the stream, this value is memoized.
-    getWorkerMetadataStream.get();
-    startWorkerMetadataConsumer();
-    getWorkBudgetRefresher.start();
+    Preconditions.checkState(!started, "FanOutStreamingEngineWorkerHarness 
cannot start twice.");
+    getWorkerMetadataStream.start();
     started = true;
   }
 
   public ImmutableSet<HostAndPort> currentWindmillEndpoints() {
-    return connections.get().windmillConnections().keySet().stream()
+    return backends.get().windmillStreams().keySet().stream()
         .map(Endpoint::directEndpoint)
         .filter(Optional::isPresent)
         .map(Optional::get)
-        .filter(
-            windmillServiceAddress ->
-                windmillServiceAddress.getKind() != 
WindmillServiceAddress.Kind.IPV6)
-        .map(
-            windmillServiceAddress ->
-                windmillServiceAddress.getKind() == 
WindmillServiceAddress.Kind.GCP_SERVICE_ADDRESS
-                    ? windmillServiceAddress.gcpServiceAddress()
-                    : 
windmillServiceAddress.authenticatedGcpServiceAddress().gcpServiceAddress())
+        .map(WindmillServiceAddress::getServiceAddress)
         .collect(toImmutableSet());
   }
 
   /**
-   * Fetches {@link GetDataStream} mapped to globalDataKey if one exists, or 
defaults to {@link
-   * GetDataStream} pointing to dispatcher.
+   * Fetches {@link GetDataStream} mapped to globalDataKey if or throws {@link
+   * NoSuchElementException} if one is not found.
    */
   private GetDataStream getGlobalDataStream(String globalDataKey) {
-    return 
Optional.ofNullable(connections.get().globalDataStreams().get(globalDataKey))
-        .map(Supplier::get)
-        .orElseGet(
-            () ->
-                streamFactory.createGetDataStream(
-                    dispatcherClient.getWindmillServiceStub(), new 
ThrottleTimer()));
-  }
-
-  @SuppressWarnings("FutureReturnValueIgnored")
-  private void startWorkerMetadataConsumer() {
-    newWorkerMetadataConsumer.submit(
-        () -> {
-          while (true) {
-            Optional.ofNullable(newWindmillEndpoints.poll())
-                .ifPresent(this::consumeWindmillWorkerEndpoints);
-          }
-        });
+    return 
Optional.ofNullable(backends.get().globalDataStreams().get(globalDataKey))
+        .map(GlobalDataStreamSender::get)
+        .orElseThrow(
+            () -> new NoSuchElementException("No endpoint for global data tag: 
" + globalDataKey));
   }
 
   @VisibleForTesting
   @Override
   public synchronized void shutdown() {
-    Preconditions.checkState(started, "StreamingEngineClient never started.");
-    getWorkerMetadataStream.get().halfClose();
-    getWorkBudgetRefresher.stop();
-    newWorkerMetadataPublisher.shutdownNow();
-    newWorkerMetadataConsumer.shutdownNow();
+    Preconditions.checkState(started, "FanOutStreamingEngineWorkerHarness 
never started.");
+    Preconditions.checkNotNull(getWorkerMetadataStream).shutdown();
+    workerMetadataConsumer.shutdownNow();
     channelCachingStubFactory.shutdown();
   }
 
-  /**
-   * {@link java.util.function.Consumer<WindmillEndpoints>} used to update 
{@link #connections} on
-   * new backend worker metadata.
-   */
+  private void consumeWorkerMetadata(WindmillEndpoints windmillEndpoints) {
+    synchronized (metadataLock) {
+      // Only process versions greater than what we currently have to prevent 
double processing of
+      // metadata. workerMetadataConsumer is single-threaded so we maintain 
ordering.
+      if (windmillEndpoints.version() > pendingMetadataVersion) {
+        pendingMetadataVersion = windmillEndpoints.version();
+        workerMetadataConsumer.execute(() -> 
consumeWindmillWorkerEndpoints(windmillEndpoints));
+      }
+    }
+  }
+
   private synchronized void consumeWindmillWorkerEndpoints(WindmillEndpoints 
newWindmillEndpoints) {
-    isBudgetRefreshPaused.set(true);
-    LOG.info("Consuming new windmill endpoints: {}", newWindmillEndpoints);
-    ImmutableMap<Endpoint, WindmillConnection> newWindmillConnections =
-        createNewWindmillConnections(newWindmillEndpoints.windmillEndpoints());
+    // Since this is run on a single threaded executor, multiple versions of 
the metadata maybe
+    // queued up while a previous version of the windmillEndpoints were being 
consumed. Only consume
+    // the endpoints if they are the most current version.
+    synchronized (metadataLock) {
+      if (newWindmillEndpoints.version() < pendingMetadataVersion) {
+        return;
+      }
+    }
 
-    StreamingEngineConnectionState newConnectionsState =
-        StreamingEngineConnectionState.builder()
-            .setWindmillConnections(newWindmillConnections)
-            .setWindmillStreams(
-                
closeStaleStreamsAndCreateNewStreams(newWindmillConnections.values()))
+    LOG.debug(
+        "Consuming new endpoints: {}. previous metadata version: {}, current 
metadata version: {}",
+        newWindmillEndpoints,
+        activeMetadataVersion,
+        newWindmillEndpoints.version());
+    closeStaleStreams(newWindmillEndpoints);
+    ImmutableMap<Endpoint, WindmillStreamSender> newStreams =
+        
createAndStartNewStreams(newWindmillEndpoints.windmillEndpoints()).join();
+    StreamingEngineBackends newBackends =
+        StreamingEngineBackends.builder()
+            .setWindmillStreams(newStreams)
             .setGlobalDataStreams(
                 
createNewGlobalDataStreams(newWindmillEndpoints.globalDataEndpoints()))
             .build();
+    backends.set(newBackends);
+    getWorkBudgetDistributor.distributeBudget(newStreams.values(), 
totalGetWorkBudget);
+    activeMetadataVersion = newWindmillEndpoints.version();
+  }
 
-    LOG.info(
-        "Setting new connections: {}. Previous connections: {}.",
-        newConnectionsState,
-        connections.get());
-    connections.set(newConnectionsState);
-    isBudgetRefreshPaused.set(false);
-    getWorkBudgetRefresher.requestBudgetRefresh();
+  /** Close the streams that are no longer valid asynchronously. */
+  private void closeStaleStreams(WindmillEndpoints newWindmillEndpoints) {
+    StreamingEngineBackends currentBackends = backends.get();
+    ImmutableMap<Endpoint, WindmillStreamSender> currentWindmillStreams =
+        currentBackends.windmillStreams();
+    currentWindmillStreams.entrySet().stream()
+        .filter(
+            connectionAndStream ->
+                
!newWindmillEndpoints.windmillEndpoints().contains(connectionAndStream.getKey()))
+        .forEach(
+            entry -> {
+              CompletableFuture<Void> unused =
+                  CompletableFuture.runAsync(
+                      () -> closeStreamSender(entry.getKey(), 
entry.getValue()),
+                      windmillStreamManager);
+            });
+
+    Set<Endpoint> newGlobalDataEndpoints =
+        new HashSet<>(newWindmillEndpoints.globalDataEndpoints().values());
+    currentBackends.globalDataStreams().values().stream()
+        .filter(sender -> !newGlobalDataEndpoints.contains(sender.endpoint()))
+        .forEach(
+            sender -> {
+              CompletableFuture<Void> unused =
+                  CompletableFuture.runAsync(
+                      () -> closeStreamSender(sender.endpoint(), sender), 
windmillStreamManager);
+            });
+  }
+
+  private void closeStreamSender(Endpoint endpoint, Closeable sender) {
+    LOG.debug("Closing streams to endpoint={}, sender={}", endpoint, sender);
+    try {
+      sender.close();
+      endpoint.directEndpoint().ifPresent(channelCachingStubFactory::remove);
+      LOG.debug("Successfully closed streams to {}", endpoint);
+    } catch (Exception e) {
+      LOG.error("Error closing streams to endpoint={}, sender={}", endpoint, 
sender);
+    }
+  }
+
+  private synchronized CompletableFuture<ImmutableMap<Endpoint, 
WindmillStreamSender>>
+      createAndStartNewStreams(ImmutableSet<Endpoint> newWindmillEndpoints) {
+    ImmutableMap<Endpoint, WindmillStreamSender> currentStreams = 
backends.get().windmillStreams();
+    return MoreFutures.allAsList(
+            newWindmillEndpoints.stream()
+                .map(endpoint -> 
getOrCreateWindmillStreamSenderFuture(endpoint, currentStreams))
+                .collect(Collectors.toList()))
+        .thenApply(
+            backends -> 
backends.stream().collect(toImmutableMap(Pair::getLeft, Pair::getRight)))
+        .toCompletableFuture();
+  }
+
+  private CompletionStage<Pair<Endpoint, WindmillStreamSender>>
+      getOrCreateWindmillStreamSenderFuture(
+          Endpoint endpoint, ImmutableMap<Endpoint, WindmillStreamSender> 
currentStreams) {
+    return MoreFutures.supplyAsync(

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to