m-trieu commented on code in PR #31902:
URL: https://github.com/apache/beam/pull/31902#discussion_r1796054945


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java:
##########
@@ -256,58 +242,132 @@ private GetDataStream getGlobalDataStream(String globalDataKey) {
                     dispatcherClient.getWindmillServiceStub(), new ThrottleTimer()));
   }
 
-  @SuppressWarnings("FutureReturnValueIgnored")
-  private void startWorkerMetadataConsumer() {
-    newWorkerMetadataConsumer.submit(
-        () -> {
-          while (true) {
-            Optional.ofNullable(newWindmillEndpoints.poll())
-                .ifPresent(this::consumeWindmillWorkerEndpoints);
-          }
-        });
-  }
-
   @VisibleForTesting
   @Override
   public synchronized void shutdown() {
     Preconditions.checkState(started, "StreamingEngineClient never started.");
     getWorkerMetadataStream.get().halfClose();
-    getWorkBudgetRefresher.stop();
-    newWorkerMetadataPublisher.shutdownNow();
-    newWorkerMetadataConsumer.shutdownNow();
+    workerMetadataConsumer.shutdownNow();
     channelCachingStubFactory.shutdown();
   }
 
-  /**
-   * {@link java.util.function.Consumer<WindmillEndpoints>} used to update {@link #connections} on
-   * new backend worker metadata.
-   */
+  @SuppressWarnings("methodref.receiver.bound")
+  private Supplier<GetWorkerMetadataStream> createGetWorkerMetadataStream(
+      @UnderInitialization FanOutStreamingEngineWorkerHarness this) {
+    // Checker Framework complains about reference to "this" in the constructor since the instance
+    // is "UnderInitialization" here, which we pass as a lambda to GetWorkerMetadataStream for
+    // processing new worker metadata. Supplier.get() is only called in start(), after we have
+    // constructed the FanOutStreamingEngineWorkerHarness.
+    return () ->
+        checkNotNull(streamFactory)
+            .createGetWorkerMetadataStream(
+                checkNotNull(dispatcherClient).getWindmillMetadataServiceStubBlocking(),
+                checkNotNull(getWorkerMetadataThrottleTimer),
+                this::consumeWorkerMetadata);
+  }
+
+  private void consumeWorkerMetadata(WindmillEndpoints windmillEndpoints) {
+    synchronized (metadataLock) {
+      // Only process versions greater than what we currently have to prevent double processing of
+      // metadata. workerMetadataConsumer is single-threaded so we maintain ordering.
+      if (windmillEndpoints.version() > pendingMetadataVersion) {
+        pendingMetadataVersion = windmillEndpoints.version();
+        workerMetadataConsumer.execute(() -> consumeWindmillWorkerEndpoints(windmillEndpoints));
+      }
+    }
+  }
+
   private synchronized void consumeWindmillWorkerEndpoints(WindmillEndpoints newWindmillEndpoints) {
-    isBudgetRefreshPaused.set(true);
+    // Since this is run on a single threaded executor, multiple versions of the metadata maybe
+    // queued up while a previous version of the windmillEndpoints were being consumed. Only consume
+    // the endpoints if they are the most current version.
+    synchronized (metadataLock) {
+      if (newWindmillEndpoints.version() < pendingMetadataVersion) {

Review Comment:
   I am going to have a separate PR where I improve the debug capture pages; planning to add that there, but will add a TODO for now.
   
   So we track it because:
   
   It's possible that while we are consuming a certain metadataVersion, multiple new versions can come in and get queued up.
   We use a single-threaded executor + synchronization so only 1 task doing `consumeWindmillWorkerEndpoints` runs at a time.
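   
   To make the pattern concrete, here is a rough standalone sketch (not the actual harness code): `MetadataTracker`, `onNewWorkerMetadata`, and `consumeEndpoints` are made-up names, while `pendingMetadataVersion`, `activeMetadataVersion`, `metadataLock`, and the single-threaded `workerMetadataConsumer` mirror the fields in the diff above.
   
   ```java
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   
   final class MetadataTracker {
     private final Object metadataLock = new Object();
     private final ExecutorService workerMetadataConsumer = Executors.newSingleThreadExecutor();
   
     // Highest version ever seen; guarded by metadataLock.
     private long pendingMetadataVersion = 0;
     // Version most recently applied; only touched on the consumer thread.
     private long activeMetadataVersion = 0;
   
     // Called from the GetWorkerMetadataStream callback whenever Windmill pushes new endpoints.
     void onNewWorkerMetadata(long version) {
       synchronized (metadataLock) {
         // Only queue versions newer than anything already seen or queued.
         if (version > pendingMetadataVersion) {
           pendingMetadataVersion = version;
           workerMetadataConsumer.execute(() -> consumeEndpoints(version));
         }
       }
     }
   
     // Runs serially on the single-threaded executor.
     private void consumeEndpoints(long version) {
       synchronized (metadataLock) {
         // A newer version was queued while this task waited; skip the stale one.
         if (version < pendingMetadataVersion) {
           return;
         }
       }
       // ... rebuild connections/stubs for the new endpoints here ...
       activeMetadataVersion = version;
     }
   }
   ```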
   
   So the flow is the following:
   
   1. we get metadataVersion 1 from Windmill
   2. current `pendingMetadataVersion` is 0, so we update `pendingMetadataVersion` to 1 and submit a task to the executor (the executor is single-threaded and executes tasks serially)
   3. (in the executor) now in `consumeWindmillWorkerEndpoints`, we see that the endpoints.version passed in is not less than `pendingMetadataVersion`, so we proceed with processing the endpoints
   4. at the same time we get metadataVersion 2 from Windmill
   5. current `pendingMetadataVersion` is 1, so we update `pendingMetadataVersion` to 2 and submit a task to the executor (the executor is single-threaded and executes tasks serially)
   6. now we get metadataVersion 3 from Windmill
   7. current `pendingMetadataVersion` is 2, so we update `pendingMetadataVersion` to 3 and submit a task to the executor (the executor is single-threaded and executes tasks serially)
   
   The executor queue now looks like:
   
   ------------------------------------------------------------------------------------------------------------
   || EXECUTING consuming endpoints version 1 || QUEUED endpoints version 2 || QUEUED endpoints version 3 ||
   ------------------------------------------------------------------------------------------------------------
    
   8. endpoints version 1 is now consumed, `activeMetadataVersion` is 1, we release the lock and now proceed to consume endpoints version 2
   9. in `consumeWindmillWorkerEndpoints` we see that endpoints.version = 2, but `pendingMetadataVersion` is actually 3, so we skip endpoints version 2; `activeMetadataVersion` is still 1
   10. now it's time to consume endpoints version 3; endpoints.version = 3 is not less than `pendingMetadataVersion`, so we consume endpoints version 3, and `activeMetadataVersion` is now 3
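   
   A tiny (hypothetical) driver against the sketch above, matching this trace: if versions 2 and 3 are both queued before the executor reaches version 2, version 2 is skipped and `activeMetadataVersion` goes straight from 1 to 3.
   
   ```java
   final class MetadataTrackerDemo {
     public static void main(String[] args) {
       MetadataTracker tracker = new MetadataTracker();
       tracker.onNewWorkerMetadata(1); // pendingMetadataVersion 0 -> 1, task for version 1 queued
       tracker.onNewWorkerMetadata(2); // pendingMetadataVersion 1 -> 2, task for version 2 queued
       tracker.onNewWorkerMetadata(3); // pendingMetadataVersion 2 -> 3, task for version 3 queued
       // If both calls above land while version 1 is still being consumed (as in the trace
       // above), the version-2 task sees pendingMetadataVersion == 3 and returns without doing
       // any work; the version-3 task then runs and applies version 3.
     }
   }
   ```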
   


