m-trieu commented on code in PR #34539:
URL: https://github.com/apache/beam/pull/34539#discussion_r2033814698
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -621,18 +622,40 @@ private static void
validateWorkerOptions(DataflowWorkerHarnessOptions options)
}
private static ChannelCachingStubFactory createFanOutStubFactory(
- DataflowWorkerHarnessOptions workerOptions) {
- return ChannelCachingRemoteStubFactory.create(
- workerOptions.getGcpCredential(),
+ DataflowWorkerHarnessOptions workerOptions, ComputationConfig.Fetcher
configFetcher) {
+ ChannelCache channelCache =
ChannelCache.create(
- serviceAddress ->
- // IsolationChannel will create and manage separate RPC
channels to the same
- // serviceAddress.
- IsolationChannel.create(
- () ->
- remoteChannel(
- serviceAddress,
-
workerOptions.getWindmillServiceRpcChannelAliveTimeoutSec()))));
+ serviceAddress -> {
+ // Always fetch the current flow control settings when we go to
create the channel.
+ UserWorkerGrpcFlowControlSettings currentFlowControlSettings =
+ configFetcher
+ .getGlobalConfigHandle()
+ .getConfig()
+ .userWorkerJobSettings()
+ .getFlowControlSettings();
+ // IsolationChannel will create and manage separate RPC channels
to the same
+ // serviceAddress.
+ return IsolationChannel.create(
+ () ->
+ remoteChannel(
+ serviceAddress,
+
workerOptions.getWindmillServiceRpcChannelAliveTimeoutSec(),
+ currentFlowControlSettings),
+ currentFlowControlSettings.getOnReadyThresholdBytes());
+ },
+ configFetcher
Review Comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]