parveensania commented on code in PR #35901:
URL: https://github.com/apache/beam/pull/35901#discussion_r2299424341


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -394,6 +368,187 @@ private StreamingDataflowWorker(
     LOG.debug("LocalWindmillHostport: {}", options.getLocalWindmillHostport());
   }
 
+  private FanOutStreamingEngineWorkerHarness 
createFanOutStreamingEngineWorkerHarness(
+      long clientId,
+      DataflowWorkerHarnessOptions options,
+      GrpcWindmillStreamFactory windmillStreamFactory,
+      StreamingWorkScheduler streamingWorkScheduler,
+      ThrottlingGetDataMetricTracker getDataMetricTracker,
+      MemoryMonitor memoryMonitor,
+      GrpcDispatcherClient dispatcherClient) {
+    WeightedSemaphore<Commit> maxCommitByteSemaphore = 
Commits.maxCommitByteSemaphore();
+    this.channelCache = createChannelCache(options, configFetcher);
+    this.getDataStatusProvider = getDataMetricTracker::printHtml;
+    FanOutStreamingEngineWorkerHarness fanOutStreamingEngineWorkerHarness =
+        FanOutStreamingEngineWorkerHarness.create(
+            createJobHeader(options, clientId),
+            GetWorkBudget.builder()
+                .setItems(chooseMaxBundlesOutstanding(options))
+                .setBytes(MAX_GET_WORK_FETCH_BYTES)
+                .build(),
+            windmillStreamFactory,
+            (workItem,
+                serializedWorkItemSize,
+                watermarks,
+                processingContext,
+                getWorkStreamLatencies) ->
+                computationStateCache
+                    .get(processingContext.computationId())
+                    .ifPresent(
+                        computationState -> {
+                          memoryMonitor.waitForResources("GetWork");
+                          streamingWorkScheduler.scheduleWork(
+                              computationState,
+                              workItem,
+                              serializedWorkItemSize,
+                              watermarks,
+                              processingContext,
+                              getWorkStreamLatencies);
+                        }),
+            ChannelCachingRemoteStubFactory.create(options.getGcpCredential(), 
channelCache),
+            GetWorkBudgetDistributors.distributeEvenly(),
+            Preconditions.checkNotNull(dispatcherClient),
+            commitWorkStream ->
+                StreamingEngineWorkCommitter.builder()
+                    // Share the commitByteSemaphore across all created 
workCommitters.
+                    .setCommitByteSemaphore(maxCommitByteSemaphore)
+                    
.setBackendWorkerToken(commitWorkStream.backendWorkerToken())
+                    .setOnCommitComplete(this::onCompleteCommit)
+                    
.setNumCommitSenders(Math.max(options.getWindmillServiceCommitThreads(), 1))
+                    .setCommitWorkStreamFactory(
+                        () -> CloseableStream.create(commitWorkStream, () -> 
{}))
+                    .build(),
+            getDataMetricTracker);
+    this.currentActiveCommitBytesProvider =
+        fanOutStreamingEngineWorkerHarness::currentActiveCommitBytes;
+    this.channelzServlet =
+        createChannelzServlet(
+            options, 
fanOutStreamingEngineWorkerHarness::currentWindmillEndpoints);
+    return fanOutStreamingEngineWorkerHarness;
+  }
+
+  private StreamingWorkerHarness createSingleSourceWorkerHarness(
+      long clientId,
+      DataflowWorkerHarnessOptions options,
+      WindmillServerStub windmillServer,
+      StreamingWorkScheduler streamingWorkScheduler,
+      ThrottlingGetDataMetricTracker getDataMetricTracker,
+      MemoryMonitor memoryMonitor) {
+    Windmill.GetWorkRequest request =
+        Windmill.GetWorkRequest.newBuilder()
+            .setClientId(clientId)
+            .setMaxItems(chooseMaxBundlesOutstanding(options))
+            .setMaxBytes(MAX_GET_WORK_FETCH_BYTES)
+            .build();
+    WindmillStreamPool<GetDataStream> getDataStreamPool =
+        WindmillStreamPool.create(
+            Math.max(1, options.getWindmillGetDataStreamCount()),
+            GET_DATA_STREAM_TIMEOUT,
+            windmillServer::getDataStream);
+    GetDataClient getDataClient =
+        new StreamPoolGetDataClient(getDataMetricTracker, getDataStreamPool);
+    HeartbeatSender heartbeatSender =
+        createStreamingEngineHeartbeatSender(
+            options, windmillServer, getDataStreamPool, 
configFetcher.getGlobalConfigHandle());
+    WorkCommitter workCommitter =
+        StreamingEngineWorkCommitter.builder()
+            .setCommitWorkStreamFactory(
+                WindmillStreamPool.create(
+                        numCommitThreads, COMMIT_STREAM_TIMEOUT, 
windmillServer::commitWorkStream)
+                    ::getCloseableStream)
+            .setCommitByteSemaphore(Commits.maxCommitByteSemaphore())
+            .setNumCommitSenders(numCommitThreads)
+            .setOnCommitComplete(this::onCompleteCommit)
+            .build();
+    GetWorkSender getWorkSender =
+        GetWorkSender.forStreamingEngine(
+            receiver -> windmillServer.getWorkStream(request, receiver));
+
+    this.getDataStatusProvider = getDataClient::printHtml;
+    this.currentActiveCommitBytesProvider = 
workCommitter::currentActiveCommitBytes;
+    this.channelzServlet =
+        createChannelzServlet(options, 
windmillServer::getWindmillServiceEndpoints);
+    this.channelCache = null;
+
+    return SingleSourceWorkerHarness.builder()
+        .setStreamingWorkScheduler(streamingWorkScheduler)
+        .setWorkCommitter(workCommitter)
+        .setGetDataClient(getDataClient)
+        .setComputationStateFetcher(this.computationStateCache::get)
+        .setWaitForResources(() -> memoryMonitor.waitForResources("GetWork"))
+        .setHeartbeatSender(heartbeatSender)
+        .setGetWorkSender(getWorkSender)
+        .build();
+  }
+
+  private void switchStreamingWorkerHarness(
+      ConnectivityType connectivityType,
+      long clientId,
+      DataflowWorkerHarnessOptions options,
+      GrpcWindmillStreamFactory windmillStreamFactory,
+      StreamingWorkScheduler streamingWorkScheduler,
+      ThrottlingGetDataMetricTracker getDataMetricTracker,
+      MemoryMonitor memoryMonitor,
+      GrpcDispatcherClient dispatcherClient,
+      WindmillServerStub windmillServer) {
+    // Stop the current status pages before switching the harness.
+    this.statusPages.stop();

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to