parveensania commented on code in PR #35901:
URL: https://github.com/apache/beam/pull/35901#discussion_r2370955640


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -394,6 +327,228 @@ private StreamingDataflowWorker(
     LOG.debug("LocalWindmillHostport: {}", options.getLocalWindmillHostport());
   }
 
+  /**
+   * Builds the harness wiring used when talking to a Windmill appliance.
+   *
+   * <p>The result wraps a {@code SingleSourceWorkerHarness} assembled from an {@code
+   * ApplianceGetDataClient}, an {@code ApplianceHeartbeatSender}, a {@code
+   * StreamingApplianceWorkCommitter} and an appliance {@code GetWorkSender}, all backed by the
+   * given {@code windmillServer}.
+   *
+   * @param clientId client id set on the GetWork request.
+   * @param options worker options; used here to choose the max number of outstanding bundles.
+   * @param windmillServer stub that serves getWork, getData and commitWork calls.
+   * @param streamingWorkScheduler scheduler handed to the harness.
+   * @param getDataMetricTracker tracker handed to the GetData client.
+   * @param memoryMonitor its {@code waitForResources("GetWork")} is installed as the harness's
+   *     wait-for-resources hook.
+   * @return factory output whose ChannelzServlet and ChannelCache are both {@code null}.
+   */
+  private StreamingWorkerHarnessFactoryOutput createApplianceWorkerHarness(
+      long clientId,
+      DataflowWorkerHarnessOptions options,
+      WindmillServerStub windmillServer,
+      StreamingWorkScheduler streamingWorkScheduler,
+      ThrottlingGetDataMetricTracker getDataMetricTracker,
+      MemoryMonitor memoryMonitor) {
+    // Built once; the GetWorkSender lambda below reuses this same request on every call.
+    Windmill.GetWorkRequest request =
+        Windmill.GetWorkRequest.newBuilder()
+            .setClientId(clientId)
+            .setMaxItems(chooseMaxBundlesOutstanding(options))
+            .setMaxBytes(MAX_GET_WORK_FETCH_BYTES)
+            .build();
+
+    GetDataClient getDataClient = new ApplianceGetDataClient(windmillServer, 
getDataMetricTracker);
+    HeartbeatSender heartbeatSender = new 
ApplianceHeartbeatSender(windmillServer::getData);
+    WorkCommitter workCommitter =
+        StreamingApplianceWorkCommitter.create(windmillServer::commitWork, 
this::onCompleteCommit);
+    GetWorkSender getWorkSender = GetWorkSender.forAppliance(() -> 
windmillServer.getWork(request));
+
+    // The status providers below are taken from the same client/committer instances that are
+    // handed to the harness.
+    return StreamingWorkerHarnessFactoryOutput.builder()
+        .setStreamingWorkerHarness(
+            SingleSourceWorkerHarness.builder()
+                .setStreamingWorkScheduler(streamingWorkScheduler)
+                .setWorkCommitter(workCommitter)
+                .setGetDataClient(getDataClient)
+                .setComputationStateFetcher(this.computationStateCache::get)
+                .setWaitForResources(() -> 
memoryMonitor.waitForResources("GetWork"))
+                .setHeartbeatSender(heartbeatSender)
+                .setGetWorkSender(getWorkSender)
+                .build())
+        .setGetDataStatusProvider(getDataClient::printHtml)
+        
.setCurrentActiveCommitBytesProvider(workCommitter::currentActiveCommitBytes)
+        .setChannelzServlet(null) // Appliance doesn't use ChannelzServlet
+        .setChannelCache(null) // Appliance doesn't use ChannelCache
+        .build();
+  }
+
+  /**
+   * Builds the harness wiring backed by a {@code FanOutStreamingEngineWorkerHarness}.
+   *
+   * <p>A new {@code ChannelCache} is created from {@code options} and the worker's {@code
+   * configFetcher}, and a single commit-byte semaphore is created up front and shared by every
+   * work committer the harness creates. The {@code switchStreamingWorkerHarness} method in this
+   * hunk calls this for {@code CONNECTIVITY_TYPE_DIRECTPATH}.
+   *
+   * @param clientId client id passed to {@code createJobHeader}.
+   * @param options worker options; supplies the GetWork item budget, GCP credential and commit
+   *     thread count.
+   * @param windmillStreamFactory stream factory handed to the harness.
+   * @param streamingWorkScheduler scheduler that receives each work item whose computation state
+   *     is found.
+   * @param getDataMetricTracker tracker handed to the harness; also used as the GetData status
+   *     provider.
+   * @param memoryMonitor {@code waitForResources("GetWork")} is called before each work item is
+   *     scheduled.
+   * @param dispatcherClient dispatcher client; passed through {@code
+   *     Preconditions.checkNotNull}.
+   * @return factory output carrying the harness, its ChannelzServlet and the created
+   *     ChannelCache.
+   */
+  private StreamingWorkerHarnessFactoryOutput 
createFanOutStreamingEngineWorkerHarness(
+      long clientId,
+      DataflowWorkerHarnessOptions options,
+      GrpcWindmillStreamFactory windmillStreamFactory,
+      StreamingWorkScheduler streamingWorkScheduler,
+      ThrottlingGetDataMetricTracker getDataMetricTracker,
+      MemoryMonitor memoryMonitor,
+      GrpcDispatcherClient dispatcherClient) {
+    // Created once here so the committer factory lambda below hands the same semaphore to
+    // every committer it builds.
+    WeightedSemaphore<Commit> maxCommitByteSemaphore = 
Commits.maxCommitByteSemaphore();
+    ChannelCache channelCache = createChannelCache(options, configFetcher);
+    FanOutStreamingEngineWorkerHarness fanOutStreamingEngineWorkerHarness =
+        FanOutStreamingEngineWorkerHarness.create(
+            createJobHeader(options, clientId),
+            GetWorkBudget.builder()
+                .setItems(chooseMaxBundlesOutstanding(options))
+                .setBytes(MAX_GET_WORK_FETCH_BYTES)
+                .build(),
+            windmillStreamFactory,
+            // Work item handler: looks up the computation state by computation id and, only if
+            // one is present, waits on the memory monitor and then schedules the work item.
+            // Nothing is done for a work item whose computation state is absent.
+            (workItem,
+                serializedWorkItemSize,
+                watermarks,
+                processingContext,
+                getWorkStreamLatencies) ->
+                computationStateCache
+                    .get(processingContext.computationId())
+                    .ifPresent(
+                        computationState -> {
+                          memoryMonitor.waitForResources("GetWork");
+                          streamingWorkScheduler.scheduleWork(
+                              computationState,
+                              workItem,
+                              serializedWorkItemSize,
+                              watermarks,
+                              processingContext,
+                              getWorkStreamLatencies);
+                        }),
+            ChannelCachingRemoteStubFactory.create(options.getGcpCredential(), 
channelCache),
+            GetWorkBudgetDistributors.distributeEvenly(),
+            Preconditions.checkNotNull(dispatcherClient),
+            commitWorkStream ->
+                StreamingEngineWorkCommitter.builder()
+                    // Share the commitByteSemaphore across all created 
workCommitters.
+                    .setCommitByteSemaphore(maxCommitByteSemaphore)
+                    
.setBackendWorkerToken(commitWorkStream.backendWorkerToken())
+                    .setOnCommitComplete(this::onCompleteCommit)
+                    
.setNumCommitSenders(Math.max(options.getWindmillServiceCommitThreads(), 1))
+                    // The given stream is wrapped with an empty close callback.
+                    // NOTE(review): presumably the stream's lifecycle is owned elsewhere - confirm.
+                    .setCommitWorkStreamFactory(
+                        () -> CloseableStream.create(commitWorkStream, () -> 
{}))
+                    .build(),
+            getDataMetricTracker);
+    // The servlet reads its endpoints from the harness that was just created.
+    ChannelzServlet channelzServlet =
+        createChannelzServlet(
+            options, 
fanOutStreamingEngineWorkerHarness::currentWindmillEndpoints);
+    return StreamingWorkerHarnessFactoryOutput.builder()
+        .setStreamingWorkerHarness(fanOutStreamingEngineWorkerHarness)
+        .setGetDataStatusProvider(getDataMetricTracker::printHtml)
+        .setCurrentActiveCommitBytesProvider(
+            fanOutStreamingEngineWorkerHarness::currentActiveCommitBytes)
+        .setChannelzServlet(channelzServlet)
+        .setChannelCache(channelCache)
+        .build();
+  }
+
+  /**
+   * Builds the harness wiring for a {@code SingleSourceWorkerHarness} that talks to {@code
+   * windmillServer} over pooled streams.
+   *
+   * <p>GetData calls go through a {@code WindmillStreamPool} of GetData streams, commits go
+   * through a second pool of commit streams, and work is fetched with {@code
+   * windmillServer.getWorkStream}. The {@code switchStreamingWorkerHarness} method in this hunk
+   * calls this for {@code CONNECTIVITY_TYPE_CLOUDPATH}.
+   *
+   * @param clientId client id set on the GetWork request.
+   * @param options worker options; supplies the max outstanding bundles and the GetData stream
+   *     count.
+   * @param windmillServer stub that provides the GetData, commit and GetWork streams and the
+   *     service endpoints for the ChannelzServlet.
+   * @param streamingWorkScheduler scheduler handed to the harness.
+   * @param getDataMetricTracker tracker handed to the GetData client.
+   * @param memoryMonitor its {@code waitForResources("GetWork")} is installed as the harness's
+   *     wait-for-resources hook.
+   * @return factory output with a ChannelzServlet and a {@code null} ChannelCache.
+   */
+  private StreamingWorkerHarnessFactoryOutput createSingleSourceWorkerHarness(
+      long clientId,
+      DataflowWorkerHarnessOptions options,
+      WindmillServerStub windmillServer,
+      StreamingWorkScheduler streamingWorkScheduler,
+      ThrottlingGetDataMetricTracker getDataMetricTracker,
+      MemoryMonitor memoryMonitor) {
+    Windmill.GetWorkRequest request =
+        Windmill.GetWorkRequest.newBuilder()
+            .setClientId(clientId)
+            .setMaxItems(chooseMaxBundlesOutstanding(options))
+            .setMaxBytes(MAX_GET_WORK_FETCH_BYTES)
+            .build();
+    // The pool size is clamped to at least 1, whatever the configured stream count is.
+    WindmillStreamPool<GetDataStream> getDataStreamPool =
+        WindmillStreamPool.create(
+            Math.max(1, options.getWindmillGetDataStreamCount()),
+            GET_DATA_STREAM_TIMEOUT,
+            windmillServer::getDataStream);
+    GetDataClient getDataClient =
+        new StreamPoolGetDataClient(getDataMetricTracker, getDataStreamPool);
+    // The heartbeat sender is given the same GetData stream pool as the GetData client.
+    HeartbeatSender heartbeatSender =
+        createStreamingEngineHeartbeatSender(
+            options, windmillServer, getDataStreamPool, 
configFetcher.getGlobalConfigHandle());
+    // The commit stream pool size and the number of commit senders both use numCommitThreads.
+    // This committer gets its own commit-byte semaphore from Commits.maxCommitByteSemaphore().
+    WorkCommitter workCommitter =
+        StreamingEngineWorkCommitter.builder()
+            .setCommitWorkStreamFactory(
+                WindmillStreamPool.create(
+                        numCommitThreads, COMMIT_STREAM_TIMEOUT, 
windmillServer::commitWorkStream)
+                    ::getCloseableStream)
+            .setCommitByteSemaphore(Commits.maxCommitByteSemaphore())
+            .setNumCommitSenders(numCommitThreads)
+            .setOnCommitComplete(this::onCompleteCommit)
+            .build();
+    GetWorkSender getWorkSender =
+        GetWorkSender.forStreamingEngine(
+            receiver -> windmillServer.getWorkStream(request, receiver));
+    ChannelzServlet channelzServlet =
+        createChannelzServlet(options, 
windmillServer::getWindmillServiceEndpoints);
+    return StreamingWorkerHarnessFactoryOutput.builder()
+        .setStreamingWorkerHarness(
+            SingleSourceWorkerHarness.builder()
+                .setStreamingWorkScheduler(streamingWorkScheduler)
+                .setWorkCommitter(workCommitter)
+                .setGetDataClient(getDataClient)
+                .setComputationStateFetcher(this.computationStateCache::get)
+                .setWaitForResources(() -> 
memoryMonitor.waitForResources("GetWork"))
+                .setHeartbeatSender(heartbeatSender)
+                .setGetWorkSender(getWorkSender)
+                .build())
+        .setGetDataStatusProvider(getDataClient::printHtml)
+        
.setCurrentActiveCommitBytesProvider(workCommitter::currentActiveCommitBytes)
+        .setChannelzServlet(channelzServlet)
+        .setChannelCache(null) // SingleSourceWorkerHarness doesn't use 
ChannelCache
+        .build();
+  }
+
+  private void switchStreamingWorkerHarness(ConnectivityType connectivityType) 
{
+    if ((connectivityType == ConnectivityType.CONNECTIVITY_TYPE_DIRECTPATH
+            && this.streamingWorkerHarness.get() instanceof 
FanOutStreamingEngineWorkerHarness)
+        || (connectivityType == ConnectivityType.CONNECTIVITY_TYPE_CLOUDPATH
+            && streamingWorkerHarness.get() instanceof 
SingleSourceWorkerHarness)) {
+      return;
+    }
+    // Stop the current status pages before switching the harness.
+    this.statusPages.get().stop();
+    LOG.debug("Stopped StreamingWorkerStatusPages before switching 
connectivity type.");
+    StreamingWorkerHarnessFactoryOutput newHarnessFactoryOutput = null;
+    if (connectivityType == ConnectivityType.CONNECTIVITY_TYPE_DIRECTPATH) {
+      LOG.info("Switching connectivity type from CLOUDPATH to DIRECTPATH");
+      LOG.debug("Shutting down to SingleSourceWorkerHarness");
+      this.streamingWorkerHarness.get().shutdown();
+      newHarnessFactoryOutput =
+          createFanOutStreamingEngineWorkerHarness(
+              this.clientId,
+              this.options,
+              this.windmillStreamFactory,
+              this.streamingWorkScheduler,
+              this.getDataMetricTracker,
+              this.memoryMonitor.memoryMonitor(),
+              this.dispatcherClient);
+      
this.streamingWorkerHarness.set(newHarnessFactoryOutput.streamingWorkerHarness());
+      streamingWorkerHarness.get().start();
+      LOG.debug("Started FanOutStreamingEngineWorkerHarness");
+    } else if (connectivityType == 
ConnectivityType.CONNECTIVITY_TYPE_CLOUDPATH) {
+      LOG.info("Switching connectivity type from DIRECTPATH to CLOUDPATH");
+      LOG.debug("Shutting down FanOutStreamingEngineWorkerHarness");
+      streamingWorkerHarness.get().shutdown();
+      newHarnessFactoryOutput =
+          createSingleSourceWorkerHarness(
+              this.clientId,
+              this.options,
+              this.windmillServer,
+              this.streamingWorkScheduler,
+              this.getDataMetricTracker,
+              this.memoryMonitor.memoryMonitor());
+      
this.streamingWorkerHarness.set(newHarnessFactoryOutput.streamingWorkerHarness());
+      streamingWorkerHarness.get().start();
+      LOG.debug("Started SingleSourceWorkerHarness");
+    }
+    if (newHarnessFactoryOutput != null) {

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to