parveensania commented on code in PR #35901: URL: https://github.com/apache/beam/pull/35901#discussion_r2370955640
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java: ########## @@ -394,6 +327,228 @@ private StreamingDataflowWorker( LOG.debug("LocalWindmillHostport: {}", options.getLocalWindmillHostport()); } + private StreamingWorkerHarnessFactoryOutput createApplianceWorkerHarness( + long clientId, + DataflowWorkerHarnessOptions options, + WindmillServerStub windmillServer, + StreamingWorkScheduler streamingWorkScheduler, + ThrottlingGetDataMetricTracker getDataMetricTracker, + MemoryMonitor memoryMonitor) { + Windmill.GetWorkRequest request = + Windmill.GetWorkRequest.newBuilder() + .setClientId(clientId) + .setMaxItems(chooseMaxBundlesOutstanding(options)) + .setMaxBytes(MAX_GET_WORK_FETCH_BYTES) + .build(); + + GetDataClient getDataClient = new ApplianceGetDataClient(windmillServer, getDataMetricTracker); + HeartbeatSender heartbeatSender = new ApplianceHeartbeatSender(windmillServer::getData); + WorkCommitter workCommitter = + StreamingApplianceWorkCommitter.create(windmillServer::commitWork, this::onCompleteCommit); + GetWorkSender getWorkSender = GetWorkSender.forAppliance(() -> windmillServer.getWork(request)); + + return StreamingWorkerHarnessFactoryOutput.builder() + .setStreamingWorkerHarness( + SingleSourceWorkerHarness.builder() + .setStreamingWorkScheduler(streamingWorkScheduler) + .setWorkCommitter(workCommitter) + .setGetDataClient(getDataClient) + .setComputationStateFetcher(this.computationStateCache::get) + .setWaitForResources(() -> memoryMonitor.waitForResources("GetWork")) + .setHeartbeatSender(heartbeatSender) + .setGetWorkSender(getWorkSender) + .build()) + .setGetDataStatusProvider(getDataClient::printHtml) + .setCurrentActiveCommitBytesProvider(workCommitter::currentActiveCommitBytes) + .setChannelzServlet(null) // Appliance doesn't use ChannelzServlet + .setChannelCache(null) // Appliance doesn't use ChannelCache + .build(); + } + + private StreamingWorkerHarnessFactoryOutput createFanOutStreamingEngineWorkerHarness( + long clientId, + DataflowWorkerHarnessOptions options, + GrpcWindmillStreamFactory windmillStreamFactory, + StreamingWorkScheduler streamingWorkScheduler, + ThrottlingGetDataMetricTracker getDataMetricTracker, + MemoryMonitor memoryMonitor, + GrpcDispatcherClient dispatcherClient) { + WeightedSemaphore<Commit> maxCommitByteSemaphore = Commits.maxCommitByteSemaphore(); + ChannelCache channelCache = createChannelCache(options, configFetcher); + FanOutStreamingEngineWorkerHarness fanOutStreamingEngineWorkerHarness = + FanOutStreamingEngineWorkerHarness.create( + createJobHeader(options, clientId), + GetWorkBudget.builder() + .setItems(chooseMaxBundlesOutstanding(options)) + .setBytes(MAX_GET_WORK_FETCH_BYTES) + .build(), + windmillStreamFactory, + (workItem, + serializedWorkItemSize, + watermarks, + processingContext, + getWorkStreamLatencies) -> + computationStateCache + .get(processingContext.computationId()) + .ifPresent( + computationState -> { + memoryMonitor.waitForResources("GetWork"); + streamingWorkScheduler.scheduleWork( + computationState, + workItem, + serializedWorkItemSize, + watermarks, + processingContext, + getWorkStreamLatencies); + }), + ChannelCachingRemoteStubFactory.create(options.getGcpCredential(), channelCache), + GetWorkBudgetDistributors.distributeEvenly(), + Preconditions.checkNotNull(dispatcherClient), + commitWorkStream -> + StreamingEngineWorkCommitter.builder() + // Share the commitByteSemaphore across all created workCommitters. + .setCommitByteSemaphore(maxCommitByteSemaphore) + .setBackendWorkerToken(commitWorkStream.backendWorkerToken()) + .setOnCommitComplete(this::onCompleteCommit) + .setNumCommitSenders(Math.max(options.getWindmillServiceCommitThreads(), 1)) + .setCommitWorkStreamFactory( + () -> CloseableStream.create(commitWorkStream, () -> {})) + .build(), + getDataMetricTracker); + ChannelzServlet channelzServlet = + createChannelzServlet( + options, fanOutStreamingEngineWorkerHarness::currentWindmillEndpoints); + return StreamingWorkerHarnessFactoryOutput.builder() + .setStreamingWorkerHarness(fanOutStreamingEngineWorkerHarness) + .setGetDataStatusProvider(getDataMetricTracker::printHtml) + .setCurrentActiveCommitBytesProvider( + fanOutStreamingEngineWorkerHarness::currentActiveCommitBytes) + .setChannelzServlet(channelzServlet) + .setChannelCache(channelCache) + .build(); + } + + private StreamingWorkerHarnessFactoryOutput createSingleSourceWorkerHarness( + long clientId, + DataflowWorkerHarnessOptions options, + WindmillServerStub windmillServer, + StreamingWorkScheduler streamingWorkScheduler, + ThrottlingGetDataMetricTracker getDataMetricTracker, + MemoryMonitor memoryMonitor) { + Windmill.GetWorkRequest request = + Windmill.GetWorkRequest.newBuilder() + .setClientId(clientId) + .setMaxItems(chooseMaxBundlesOutstanding(options)) + .setMaxBytes(MAX_GET_WORK_FETCH_BYTES) + .build(); + WindmillStreamPool<GetDataStream> getDataStreamPool = + WindmillStreamPool.create( + Math.max(1, options.getWindmillGetDataStreamCount()), + GET_DATA_STREAM_TIMEOUT, + windmillServer::getDataStream); + GetDataClient getDataClient = + new StreamPoolGetDataClient(getDataMetricTracker, getDataStreamPool); + HeartbeatSender heartbeatSender = + createStreamingEngineHeartbeatSender( + options, windmillServer, getDataStreamPool, configFetcher.getGlobalConfigHandle()); + WorkCommitter workCommitter = + StreamingEngineWorkCommitter.builder() + .setCommitWorkStreamFactory( + WindmillStreamPool.create( + numCommitThreads, COMMIT_STREAM_TIMEOUT, windmillServer::commitWorkStream) + ::getCloseableStream) + .setCommitByteSemaphore(Commits.maxCommitByteSemaphore()) + .setNumCommitSenders(numCommitThreads) + .setOnCommitComplete(this::onCompleteCommit) + .build(); + GetWorkSender getWorkSender = + GetWorkSender.forStreamingEngine( + receiver -> windmillServer.getWorkStream(request, receiver)); + ChannelzServlet channelzServlet = + createChannelzServlet(options, windmillServer::getWindmillServiceEndpoints); + return StreamingWorkerHarnessFactoryOutput.builder() + .setStreamingWorkerHarness( + SingleSourceWorkerHarness.builder() + .setStreamingWorkScheduler(streamingWorkScheduler) + .setWorkCommitter(workCommitter) + .setGetDataClient(getDataClient) + .setComputationStateFetcher(this.computationStateCache::get) + .setWaitForResources(() -> memoryMonitor.waitForResources("GetWork")) + .setHeartbeatSender(heartbeatSender) + .setGetWorkSender(getWorkSender) + .build()) + .setGetDataStatusProvider(getDataClient::printHtml) + .setCurrentActiveCommitBytesProvider(workCommitter::currentActiveCommitBytes) + .setChannelzServlet(channelzServlet) + .setChannelCache(null) // SingleSourceWorkerHarness doesn't use ChannelCache + .build(); + } + + private void switchStreamingWorkerHarness(ConnectivityType connectivityType) { + if ((connectivityType == ConnectivityType.CONNECTIVITY_TYPE_DIRECTPATH + && this.streamingWorkerHarness.get() instanceof FanOutStreamingEngineWorkerHarness) + || (connectivityType == ConnectivityType.CONNECTIVITY_TYPE_CLOUDPATH + && streamingWorkerHarness.get() instanceof SingleSourceWorkerHarness)) { + return; + } + // Stop the current status pages before switching the harness. + this.statusPages.get().stop(); + LOG.debug("Stopped StreamingWorkerStatusPages before switching connectivity type."); + StreamingWorkerHarnessFactoryOutput newHarnessFactoryOutput = null; + if (connectivityType == ConnectivityType.CONNECTIVITY_TYPE_DIRECTPATH) { + LOG.info("Switching connectivity type from CLOUDPATH to DIRECTPATH"); + LOG.debug("Shutting down to SingleSourceWorkerHarness"); + this.streamingWorkerHarness.get().shutdown(); + newHarnessFactoryOutput = + createFanOutStreamingEngineWorkerHarness( + this.clientId, + this.options, + this.windmillStreamFactory, + this.streamingWorkScheduler, + this.getDataMetricTracker, + this.memoryMonitor.memoryMonitor(), + this.dispatcherClient); + this.streamingWorkerHarness.set(newHarnessFactoryOutput.streamingWorkerHarness()); + streamingWorkerHarness.get().start(); + LOG.debug("Started FanOutStreamingEngineWorkerHarness"); + } else if (connectivityType == ConnectivityType.CONNECTIVITY_TYPE_CLOUDPATH) { + LOG.info("Switching connectivity type from DIRECTPATH to CLOUDPATH"); + LOG.debug("Shutting down FanOutStreamingEngineWorkerHarness"); + streamingWorkerHarness.get().shutdown(); + newHarnessFactoryOutput = + createSingleSourceWorkerHarness( + this.clientId, + this.options, + this.windmillServer, + this.streamingWorkScheduler, + this.getDataMetricTracker, + this.memoryMonitor.memoryMonitor()); + this.streamingWorkerHarness.set(newHarnessFactoryOutput.streamingWorkerHarness()); + streamingWorkerHarness.get().start(); + LOG.debug("Started SingleSourceWorkerHarness"); + } + if (newHarnessFactoryOutput != null) { Review Comment: Done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org