scwhittle commented on code in PR #34653:
URL: https://github.com/apache/beam/pull/34653#discussion_r2077442103
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -522,11 +517,46 @@ public static StreamingDataflowWorker
fromOptions(DataflowWorkerHarnessOptions o
workFailureProcessor,
streamingCounters,
memoryMonitor,
-
configFetcherComputationStateCacheAndWindmillClient.windmillStreamFactory(),
+ dependencies.windmillStreamFactory(),
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("RefreshWork").build()),
stageInfo,
-
configFetcherComputationStateCacheAndWindmillClient.windmillDispatcherClient());
+ dependencies.windmillDispatcherClient(),
+ dependencies.channelCache(),
+ dependencies.stubFactory());
+ }
+
+ private static Dependencies.Builder initialDependencies(
+ DataflowWorkerHarnessOptions options, ComputationConfig.Fetcher
configFetcher) {
+ if (options.getUseWindmillIsolatedChannels() == null
+ || options.getIsWindmillServiceDirectPathEnabled()) {
+ ConfigAwareChannelFactory channelFactory =
+ new
ConfigAwareChannelFactory(options.getWindmillServiceRpcChannelAliveTimeoutSec());
+ ChannelCache channelCache = ChannelCache.create(channelFactory);
+ ChannelCachingRemoteStubFactory stubFactory =
+ ChannelCachingRemoteStubFactory.create(options.getGcpCredential(),
channelCache);
+ GrpcDispatcherClient dispatcherClient =
GrpcDispatcherClient.create(stubFactory);
+ configFetcher
+ .getGlobalConfigHandle()
+ .registerConfigObserver(
+ config -> {
+ if (channelFactory.tryConsumeJobConfig(config)) {
Review Comment:
but in this case we're not creating the configawarechannelcache so it seems
we are ignoring the flow control for direct path now.
If we can have all of the watching and channel updating in the factory it
seems like it would be simpler.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]