m-trieu commented on code in PR #31902:
URL: https://github.com/apache/beam/pull/31902#discussion_r1796129571


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java:
##########
@@ -256,58 +240,114 @@ private GetDataStream getGlobalDataStream(String 
globalDataKey) {
                     dispatcherClient.getWindmillServiceStub(), new 
ThrottleTimer()));
   }
 
-  @SuppressWarnings("FutureReturnValueIgnored")
-  private void startWorkerMetadataConsumer() {
-    newWorkerMetadataConsumer.submit(
-        () -> {
-          while (true) {
-            Optional.ofNullable(newWindmillEndpoints.poll())
-                .ifPresent(this::consumeWindmillWorkerEndpoints);
-          }
-        });
-  }
-
   @VisibleForTesting
   @Override
   public synchronized void shutdown() {
     Preconditions.checkState(started, "StreamingEngineClient never started.");
-    getWorkerMetadataStream.get().halfClose();
-    getWorkBudgetRefresher.stop();
-    newWorkerMetadataPublisher.shutdownNow();
-    newWorkerMetadataConsumer.shutdownNow();
+    Preconditions.checkNotNull(getWorkerMetadataStream).halfClose();
+    workerMetadataConsumer.shutdownNow();
     channelCachingStubFactory.shutdown();
   }
 
-  /**
-   * {@link java.util.function.Consumer<WindmillEndpoints>} used to update 
{@link #connections} on
-   * new backend worker metadata.
-   */
+  private void consumeWorkerMetadata(WindmillEndpoints windmillEndpoints) {
+    synchronized (metadataLock) {
+      // Only process versions greater than what we currently have to prevent 
double processing of
+      // metadata. workerMetadataConsumer is single-threaded so we maintain 
ordering.
+      if (windmillEndpoints.version() > pendingMetadataVersion) {
+        pendingMetadataVersion = windmillEndpoints.version();
+        workerMetadataConsumer.execute(() -> 
consumeWindmillWorkerEndpoints(windmillEndpoints));
+      }
+    }
+  }
+
   private synchronized void consumeWindmillWorkerEndpoints(WindmillEndpoints 
newWindmillEndpoints) {
-    isBudgetRefreshPaused.set(true);
-    LOG.info("Consuming new windmill endpoints: {}", newWindmillEndpoints);
-    ImmutableMap<Endpoint, WindmillConnection> newWindmillConnections =
-        createNewWindmillConnections(newWindmillEndpoints.windmillEndpoints());
+    // Since this is run on a single threaded executor, multiple versions of 
the metadata maybe
+    // queued up while a previous version of the windmillEndpoints were being 
consumed. Only consume
+    // the endpoints if they are the most current version.
+    synchronized (metadataLock) {
+      if (newWindmillEndpoints.version() < pendingMetadataVersion) {
+        return;
+      }
+    }
 
+    long previousMetadataVersion = activeMetadataVersion;
+    LOG.debug(
+        "Consuming new endpoints: {}. previous metadata version: {}, current 
metadata version: {}",
+        newWindmillEndpoints,
+        previousMetadataVersion,
+        activeMetadataVersion);
+    closeStaleStreams(
+        newWindmillEndpoints.windmillEndpoints(), 
connections.get().windmillStreams());
+    ImmutableMap<Endpoint, WindmillStreamSender> newStreams =
+        
createAndStartNewStreams(newWindmillEndpoints.windmillEndpoints()).join();

Review Comment:
   Well, not really created.
   We pass a factory to the StreamGetDataClient,
   
   and it will get created whenever the user code fetches side input.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to