parveensania commented on code in PR #35901: URL: https://github.com/apache/beam/pull/35901#discussion_r2299424341
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java: ########## @@ -394,6 +368,187 @@ private StreamingDataflowWorker( LOG.debug("LocalWindmillHostport: {}", options.getLocalWindmillHostport()); } + private FanOutStreamingEngineWorkerHarness createFanOutStreamingEngineWorkerHarness( + long clientId, + DataflowWorkerHarnessOptions options, + GrpcWindmillStreamFactory windmillStreamFactory, + StreamingWorkScheduler streamingWorkScheduler, + ThrottlingGetDataMetricTracker getDataMetricTracker, + MemoryMonitor memoryMonitor, + GrpcDispatcherClient dispatcherClient) { + WeightedSemaphore<Commit> maxCommitByteSemaphore = Commits.maxCommitByteSemaphore(); + this.channelCache = createChannelCache(options, configFetcher); + this.getDataStatusProvider = getDataMetricTracker::printHtml; + FanOutStreamingEngineWorkerHarness fanOutStreamingEngineWorkerHarness = + FanOutStreamingEngineWorkerHarness.create( + createJobHeader(options, clientId), + GetWorkBudget.builder() + .setItems(chooseMaxBundlesOutstanding(options)) + .setBytes(MAX_GET_WORK_FETCH_BYTES) + .build(), + windmillStreamFactory, + (workItem, + serializedWorkItemSize, + watermarks, + processingContext, + getWorkStreamLatencies) -> + computationStateCache + .get(processingContext.computationId()) + .ifPresent( + computationState -> { + memoryMonitor.waitForResources("GetWork"); + streamingWorkScheduler.scheduleWork( + computationState, + workItem, + serializedWorkItemSize, + watermarks, + processingContext, + getWorkStreamLatencies); + }), + ChannelCachingRemoteStubFactory.create(options.getGcpCredential(), channelCache), + GetWorkBudgetDistributors.distributeEvenly(), + Preconditions.checkNotNull(dispatcherClient), + commitWorkStream -> + StreamingEngineWorkCommitter.builder() + // Share the commitByteSemaphore across all created workCommitters. + .setCommitByteSemaphore(maxCommitByteSemaphore) + .setBackendWorkerToken(commitWorkStream.backendWorkerToken()) + .setOnCommitComplete(this::onCompleteCommit) + .setNumCommitSenders(Math.max(options.getWindmillServiceCommitThreads(), 1)) + .setCommitWorkStreamFactory( + () -> CloseableStream.create(commitWorkStream, () -> {})) + .build(), + getDataMetricTracker); + this.currentActiveCommitBytesProvider = + fanOutStreamingEngineWorkerHarness::currentActiveCommitBytes; + this.channelzServlet = + createChannelzServlet( + options, fanOutStreamingEngineWorkerHarness::currentWindmillEndpoints); + return fanOutStreamingEngineWorkerHarness; + } + + private StreamingWorkerHarness createSingleSourceWorkerHarness( + long clientId, + DataflowWorkerHarnessOptions options, + WindmillServerStub windmillServer, + StreamingWorkScheduler streamingWorkScheduler, + ThrottlingGetDataMetricTracker getDataMetricTracker, + MemoryMonitor memoryMonitor) { + Windmill.GetWorkRequest request = + Windmill.GetWorkRequest.newBuilder() + .setClientId(clientId) + .setMaxItems(chooseMaxBundlesOutstanding(options)) + .setMaxBytes(MAX_GET_WORK_FETCH_BYTES) + .build(); + WindmillStreamPool<GetDataStream> getDataStreamPool = + WindmillStreamPool.create( + Math.max(1, options.getWindmillGetDataStreamCount()), + GET_DATA_STREAM_TIMEOUT, + windmillServer::getDataStream); + GetDataClient getDataClient = + new StreamPoolGetDataClient(getDataMetricTracker, getDataStreamPool); + HeartbeatSender heartbeatSender = + createStreamingEngineHeartbeatSender( + options, windmillServer, getDataStreamPool, configFetcher.getGlobalConfigHandle()); + WorkCommitter workCommitter = + StreamingEngineWorkCommitter.builder() + .setCommitWorkStreamFactory( + WindmillStreamPool.create( + numCommitThreads, COMMIT_STREAM_TIMEOUT, windmillServer::commitWorkStream) + ::getCloseableStream) + .setCommitByteSemaphore(Commits.maxCommitByteSemaphore()) + .setNumCommitSenders(numCommitThreads) + .setOnCommitComplete(this::onCompleteCommit) + .build(); + GetWorkSender getWorkSender = + GetWorkSender.forStreamingEngine( + receiver -> windmillServer.getWorkStream(request, receiver)); + + this.getDataStatusProvider = getDataClient::printHtml; + this.currentActiveCommitBytesProvider = workCommitter::currentActiveCommitBytes; + this.channelzServlet = + createChannelzServlet(options, windmillServer::getWindmillServiceEndpoints); + this.channelCache = null; + + return SingleSourceWorkerHarness.builder() + .setStreamingWorkScheduler(streamingWorkScheduler) + .setWorkCommitter(workCommitter) + .setGetDataClient(getDataClient) + .setComputationStateFetcher(this.computationStateCache::get) + .setWaitForResources(() -> memoryMonitor.waitForResources("GetWork")) + .setHeartbeatSender(heartbeatSender) + .setGetWorkSender(getWorkSender) + .build(); + } + + private void switchStreamingWorkerHarness( + ConnectivityType connectivityType, + long clientId, + DataflowWorkerHarnessOptions options, + GrpcWindmillStreamFactory windmillStreamFactory, + StreamingWorkScheduler streamingWorkScheduler, + ThrottlingGetDataMetricTracker getDataMetricTracker, + MemoryMonitor memoryMonitor, + GrpcDispatcherClient dispatcherClient, + WindmillServerStub windmillServer) { + // Stop the current status pages before switching the harness. + this.statusPages.stop(); Review Comment: Done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org