m-trieu commented on code in PR #34539:
URL: https://github.com/apache/beam/pull/34539#discussion_r2033806861


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -621,18 +622,40 @@ private static void 
validateWorkerOptions(DataflowWorkerHarnessOptions options)
   }
 
   private static ChannelCachingStubFactory createFanOutStubFactory(
-      DataflowWorkerHarnessOptions workerOptions) {
-    return ChannelCachingRemoteStubFactory.create(
-        workerOptions.getGcpCredential(),
+      DataflowWorkerHarnessOptions workerOptions, ComputationConfig.Fetcher 
configFetcher) {
+    ChannelCache channelCache =
         ChannelCache.create(
-            serviceAddress ->
-                // IsolationChannel will create and manage separate RPC 
channels to the same
-                // serviceAddress.
-                IsolationChannel.create(
-                    () ->
-                        remoteChannel(
-                            serviceAddress,
-                            
workerOptions.getWindmillServiceRpcChannelAliveTimeoutSec()))));
+            serviceAddress -> {
+              // Always fetch the current flow control settings when we go to 
create the channel.
+              UserWorkerGrpcFlowControlSettings currentFlowControlSettings =
+                  configFetcher
+                      .getGlobalConfigHandle()
+                      .getConfig()
+                      .userWorkerJobSettings()
+                      .getFlowControlSettings();
+              // IsolationChannel will create and manage separate RPC channels 
to the same
+              // serviceAddress.
+              return IsolationChannel.create(
+                  () ->
+                      remoteChannel(
+                          serviceAddress,
+                          
workerOptions.getWindmillServiceRpcChannelAliveTimeoutSec(),
+                          currentFlowControlSettings),
+                  currentFlowControlSettings.getOnReadyThresholdBytes());
+            },
+            configFetcher

Review Comment:
   done, also set some defaults



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to