m-trieu commented on code in PR #34653:
URL: https://github.com/apache/beam/pull/34653#discussion_r2076046164


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -522,11 +517,46 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o
         workFailureProcessor,
         streamingCounters,
         memoryMonitor,
-        configFetcherComputationStateCacheAndWindmillClient.windmillStreamFactory(),
+        dependencies.windmillStreamFactory(),
         Executors.newSingleThreadScheduledExecutor(
             new ThreadFactoryBuilder().setNameFormat("RefreshWork").build()),
         stageInfo,
-        configFetcherComputationStateCacheAndWindmillClient.windmillDispatcherClient());
+        dependencies.windmillDispatcherClient(),
+        dependencies.channelCache(),
+        dependencies.stubFactory());
+  }
+
+  private static Dependencies.Builder initialDependencies(
+      DataflowWorkerHarnessOptions options, ComputationConfig.Fetcher configFetcher) {
+    if (options.getUseWindmillIsolatedChannels() == null
+        || options.getIsWindmillServiceDirectPathEnabled()) {
+      ConfigAwareChannelFactory channelFactory =
+          new ConfigAwareChannelFactory(options.getWindmillServiceRpcChannelAliveTimeoutSec());
+      ChannelCache channelCache = ChannelCache.create(channelFactory);
+      ChannelCachingRemoteStubFactory stubFactory =
+          ChannelCachingRemoteStubFactory.create(options.getGcpCredential(), channelCache);
+      GrpcDispatcherClient dispatcherClient = GrpcDispatcherClient.create(stubFactory);
+      configFetcher
+          .getGlobalConfigHandle()
+          .registerConfigObserver(
+              config -> {
+                if (channelFactory.tryConsumeJobConfig(config)) {

Review Comment:
   Currently, the ConfigAwareChannelFactory only looks at the IsolatedChannels
   job setting. Did we want to change this to compare all of the JobSettings?

   In the ChannelCache we only care about the flow control settings.
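
   One way to frame the question: each consumer could compare only the slice
   of the settings it depends on, rather than the whole JobSettings. The
   sketch below is a rough illustration of that idea, not the PR's code. The
   `JobSettings` class here and its fields (`useIsolatedChannels`,
   `flowControlWindowBytes`, `unrelatedSetting`) are hypothetical stand-ins,
   as is `ProjectedConfigConsumer`; none of them are Beam APIs.

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

/** Hypothetical stand-in for the job settings pushed to config observers. */
final class JobSettings {
  final boolean useIsolatedChannels;
  final int flowControlWindowBytes;
  final String unrelatedSetting;

  JobSettings(boolean useIsolatedChannels, int flowControlWindowBytes, String unrelatedSetting) {
    this.useIsolatedChannels = useIsolatedChannels;
    this.flowControlWindowBytes = flowControlWindowBytes;
    this.unrelatedSetting = unrelatedSetting;
  }
}

/**
 * Hypothetical helper: remembers the last projected value and reports whether
 * a newly observed config changed the part this consumer cares about.
 */
final class ProjectedConfigConsumer<T> {
  private final Function<JobSettings, T> projection;
  private final AtomicReference<T> last = new AtomicReference<>();

  ProjectedConfigConsumer(Function<JobSettings, T> projection) {
    this.projection = projection;
  }

  /** Returns true only when the projected value differs from the previous one. */
  boolean tryConsume(JobSettings settings) {
    T next = projection.apply(settings);
    // Atomically store the new value and fetch the old one for comparison.
    T previous = last.getAndSet(next);
    return !Objects.equals(previous, next);
  }
}
```

   With this shape, a channel factory would project on `useIsolatedChannels`
   and a channel cache on the flow control fields, so a change to an
   unrelated setting would not invalidate either. Whether that is preferable
   to comparing the full JobSettings depends on how often unrelated settings
   change in practice.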



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
