arunpandianp commented on code in PR #35901:
URL: https://github.com/apache/beam/pull/35901#discussion_r2323432538
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -394,6 +327,228 @@ private StreamingDataflowWorker(
LOG.debug("LocalWindmillHostport: {}", options.getLocalWindmillHostport());
}
+ private StreamingWorkerHarnessFactoryOutput createApplianceWorkerHarness(
+ long clientId,
+ DataflowWorkerHarnessOptions options,
+ WindmillServerStub windmillServer,
+ StreamingWorkScheduler streamingWorkScheduler,
+ ThrottlingGetDataMetricTracker getDataMetricTracker,
+ MemoryMonitor memoryMonitor) {
+ Windmill.GetWorkRequest request =
+ Windmill.GetWorkRequest.newBuilder()
+ .setClientId(clientId)
+ .setMaxItems(chooseMaxBundlesOutstanding(options))
+ .setMaxBytes(MAX_GET_WORK_FETCH_BYTES)
+ .build();
+
+ GetDataClient getDataClient = new ApplianceGetDataClient(windmillServer,
getDataMetricTracker);
+ HeartbeatSender heartbeatSender = new
ApplianceHeartbeatSender(windmillServer::getData);
+ WorkCommitter workCommitter =
+ StreamingApplianceWorkCommitter.create(windmillServer::commitWork,
this::onCompleteCommit);
+ GetWorkSender getWorkSender = GetWorkSender.forAppliance(() ->
windmillServer.getWork(request));
+
+ return StreamingWorkerHarnessFactoryOutput.builder()
+ .setStreamingWorkerHarness(
+ SingleSourceWorkerHarness.builder()
+ .setStreamingWorkScheduler(streamingWorkScheduler)
+ .setWorkCommitter(workCommitter)
+ .setGetDataClient(getDataClient)
+ .setComputationStateFetcher(this.computationStateCache::get)
+ .setWaitForResources(() ->
memoryMonitor.waitForResources("GetWork"))
+ .setHeartbeatSender(heartbeatSender)
+ .setGetWorkSender(getWorkSender)
+ .build())
+ .setGetDataStatusProvider(getDataClient::printHtml)
+
.setCurrentActiveCommitBytesProvider(workCommitter::currentActiveCommitBytes)
+ .setChannelzServlet(null) // Appliance doesn't use ChannelzServlet
+ .setChannelCache(null) // Appliance doesn't use ChannelCache
+ .build();
+ }
+
+ private StreamingWorkerHarnessFactoryOutput
createFanOutStreamingEngineWorkerHarness(
+ long clientId,
+ DataflowWorkerHarnessOptions options,
+ GrpcWindmillStreamFactory windmillStreamFactory,
+ StreamingWorkScheduler streamingWorkScheduler,
+ ThrottlingGetDataMetricTracker getDataMetricTracker,
+ MemoryMonitor memoryMonitor,
+ GrpcDispatcherClient dispatcherClient) {
+ WeightedSemaphore<Commit> maxCommitByteSemaphore =
Commits.maxCommitByteSemaphore();
+ ChannelCache channelCache = createChannelCache(options, configFetcher);
+ FanOutStreamingEngineWorkerHarness fanOutStreamingEngineWorkerHarness =
+ FanOutStreamingEngineWorkerHarness.create(
+ createJobHeader(options, clientId),
+ GetWorkBudget.builder()
+ .setItems(chooseMaxBundlesOutstanding(options))
+ .setBytes(MAX_GET_WORK_FETCH_BYTES)
+ .build(),
+ windmillStreamFactory,
+ (workItem,
+ serializedWorkItemSize,
+ watermarks,
+ processingContext,
+ getWorkStreamLatencies) ->
+ computationStateCache
+ .get(processingContext.computationId())
+ .ifPresent(
+ computationState -> {
+ memoryMonitor.waitForResources("GetWork");
+ streamingWorkScheduler.scheduleWork(
+ computationState,
+ workItem,
+ serializedWorkItemSize,
+ watermarks,
+ processingContext,
+ getWorkStreamLatencies);
+ }),
+ ChannelCachingRemoteStubFactory.create(options.getGcpCredential(),
channelCache),
+ GetWorkBudgetDistributors.distributeEvenly(),
+ Preconditions.checkNotNull(dispatcherClient),
+ commitWorkStream ->
+ StreamingEngineWorkCommitter.builder()
+ // Share the commitByteSemaphore across all created
workCommitters.
+ .setCommitByteSemaphore(maxCommitByteSemaphore)
+
.setBackendWorkerToken(commitWorkStream.backendWorkerToken())
+ .setOnCommitComplete(this::onCompleteCommit)
+
.setNumCommitSenders(Math.max(options.getWindmillServiceCommitThreads(), 1))
+ .setCommitWorkStreamFactory(
+ () -> CloseableStream.create(commitWorkStream, () ->
{}))
+ .build(),
+ getDataMetricTracker);
+ ChannelzServlet channelzServlet =
+ createChannelzServlet(
+ options,
fanOutStreamingEngineWorkerHarness::currentWindmillEndpoints);
+ return StreamingWorkerHarnessFactoryOutput.builder()
+ .setStreamingWorkerHarness(fanOutStreamingEngineWorkerHarness)
+ .setGetDataStatusProvider(getDataMetricTracker::printHtml)
+ .setCurrentActiveCommitBytesProvider(
+ fanOutStreamingEngineWorkerHarness::currentActiveCommitBytes)
+ .setChannelzServlet(channelzServlet)
+ .setChannelCache(channelCache)
+ .build();
+ }
+
+ private StreamingWorkerHarnessFactoryOutput createSingleSourceWorkerHarness(
+ long clientId,
+ DataflowWorkerHarnessOptions options,
+ WindmillServerStub windmillServer,
+ StreamingWorkScheduler streamingWorkScheduler,
+ ThrottlingGetDataMetricTracker getDataMetricTracker,
+ MemoryMonitor memoryMonitor) {
+ Windmill.GetWorkRequest request =
+ Windmill.GetWorkRequest.newBuilder()
+ .setClientId(clientId)
+ .setMaxItems(chooseMaxBundlesOutstanding(options))
+ .setMaxBytes(MAX_GET_WORK_FETCH_BYTES)
+ .build();
+ WindmillStreamPool<GetDataStream> getDataStreamPool =
+ WindmillStreamPool.create(
+ Math.max(1, options.getWindmillGetDataStreamCount()),
+ GET_DATA_STREAM_TIMEOUT,
+ windmillServer::getDataStream);
+ GetDataClient getDataClient =
+ new StreamPoolGetDataClient(getDataMetricTracker, getDataStreamPool);
+ HeartbeatSender heartbeatSender =
+ createStreamingEngineHeartbeatSender(
+ options, windmillServer, getDataStreamPool,
configFetcher.getGlobalConfigHandle());
+ WorkCommitter workCommitter =
+ StreamingEngineWorkCommitter.builder()
+ .setCommitWorkStreamFactory(
+ WindmillStreamPool.create(
+ numCommitThreads, COMMIT_STREAM_TIMEOUT,
windmillServer::commitWorkStream)
+ ::getCloseableStream)
+ .setCommitByteSemaphore(Commits.maxCommitByteSemaphore())
+ .setNumCommitSenders(numCommitThreads)
+ .setOnCommitComplete(this::onCompleteCommit)
+ .build();
+ GetWorkSender getWorkSender =
+ GetWorkSender.forStreamingEngine(
+ receiver -> windmillServer.getWorkStream(request, receiver));
+ ChannelzServlet channelzServlet =
+ createChannelzServlet(options,
windmillServer::getWindmillServiceEndpoints);
+ return StreamingWorkerHarnessFactoryOutput.builder()
+ .setStreamingWorkerHarness(
+ SingleSourceWorkerHarness.builder()
+ .setStreamingWorkScheduler(streamingWorkScheduler)
+ .setWorkCommitter(workCommitter)
+ .setGetDataClient(getDataClient)
+ .setComputationStateFetcher(this.computationStateCache::get)
+ .setWaitForResources(() ->
memoryMonitor.waitForResources("GetWork"))
+ .setHeartbeatSender(heartbeatSender)
+ .setGetWorkSender(getWorkSender)
+ .build())
+ .setGetDataStatusProvider(getDataClient::printHtml)
+
.setCurrentActiveCommitBytesProvider(workCommitter::currentActiveCommitBytes)
+ .setChannelzServlet(channelzServlet)
+ .setChannelCache(null) // SingleSourceWorkerHarness doesn't use
ChannelCache
+ .build();
+ }
+
+ private void switchStreamingWorkerHarness(ConnectivityType connectivityType)
{
+ if ((connectivityType == ConnectivityType.CONNECTIVITY_TYPE_DIRECTPATH
+ && this.streamingWorkerHarness.get() instanceof
FanOutStreamingEngineWorkerHarness)
+ || (connectivityType == ConnectivityType.CONNECTIVITY_TYPE_CLOUDPATH
+ && streamingWorkerHarness.get() instanceof
SingleSourceWorkerHarness)) {
+ return;
+ }
+ // Stop the current status pages before switching the harness.
+ this.statusPages.get().stop();
+ LOG.debug("Stopped StreamingWorkerStatusPages before switching
connectivity type.");
+ StreamingWorkerHarnessFactoryOutput newHarnessFactoryOutput = null;
+ if (connectivityType == ConnectivityType.CONNECTIVITY_TYPE_DIRECTPATH) {
+ LOG.info("Switching connectivity type from CLOUDPATH to DIRECTPATH");
+ LOG.debug("Shutting down to SingleSourceWorkerHarness");
+ this.streamingWorkerHarness.get().shutdown();
+ newHarnessFactoryOutput =
+ createFanOutStreamingEngineWorkerHarness(
+ this.clientId,
+ this.options,
+ this.windmillStreamFactory,
+ this.streamingWorkScheduler,
+ this.getDataMetricTracker,
+ this.memoryMonitor.memoryMonitor(),
+ this.dispatcherClient);
+
this.streamingWorkerHarness.set(newHarnessFactoryOutput.streamingWorkerHarness());
+ streamingWorkerHarness.get().start();
+ LOG.debug("Started FanOutStreamingEngineWorkerHarness");
+ } else if (connectivityType ==
ConnectivityType.CONNECTIVITY_TYPE_CLOUDPATH) {
+ LOG.info("Switching connectivity type from DIRECTPATH to CLOUDPATH");
+ LOG.debug("Shutting down FanOutStreamingEngineWorkerHarness");
+ streamingWorkerHarness.get().shutdown();
+ newHarnessFactoryOutput =
+ createSingleSourceWorkerHarness(
+ this.clientId,
+ this.options,
+ this.windmillServer,
+ this.streamingWorkScheduler,
+ this.getDataMetricTracker,
+ this.memoryMonitor.memoryMonitor());
+
this.streamingWorkerHarness.set(newHarnessFactoryOutput.streamingWorkerHarness());
+ streamingWorkerHarness.get().start();
+ LOG.debug("Started SingleSourceWorkerHarness");
+ }
+ if (newHarnessFactoryOutput != null) {
Review Comment:
I think this condition can be removed: `newHarnessFactoryOutput` should never
be null here, since both the DIRECTPATH and CLOUDPATH branches above assign it.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -215,150 +226,71 @@ private StreamingDataflowWorker(
Executors.newCachedThreadPool());
this.options = options;
this.workUnitExecutor = workUnitExecutor;
+ this.harnessSwitchExecutor =
+ Executors.newSingleThreadExecutor(
+ new
ThreadFactoryBuilder().setNameFormat("HarnessSwtichExecutor").build());
Review Comment:
```suggestion
new
ThreadFactoryBuilder().setNameFormat("HarnessSwitchExecutor").build());
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]