arunpandianp commented on code in PR #35901:
URL: https://github.com/apache/beam/pull/35901#discussion_r2284098043


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -183,6 +186,13 @@ public final class StreamingDataflowWorker {
   private final ActiveWorkRefresher activeWorkRefresher;
   private final StreamingWorkerStatusReporter workerStatusReporter;
   private final int numCommitThreads;
+  private Consumer<PrintWriter> getDataStatusProvider;
+  private Supplier<Long> currentActiveCommitBytesProvider;
+  private @Nullable ChannelzServlet channelzServlet;
+  private @Nullable ChannelCache channelCache;
+  private Supplier<Instant> clock;

Review Comment:
   I don't think we need these to be variables at StreamingDataflowWorker level.
   
   The fields are currently computed inside the StreamingWorkerHarness factory 
methods and used by the same thread to set fields in statusPages.
    
   One option is to make `createFanOutStreamingEngineWorkerHarness` and 
`createSingleSourceWorkerHarness` return 
StreamingEngineWorkerHarnessFactoryOutput and use it to initialize StatusPages.
   
   ```  @AutoValue
     abstract class StreamingEngineWorkerHarnessFactoryOutput {
       abstract StreamingWorkerHarness streamingEngineWorkerHarness();
       abstract Consumer<PrintWriter> getDataStatusProvider();
       abstract Supplier<Long> currentActiveCommitBytesProvider();
       abstract @Nullable ChannelzServlet channelzServlet();
       abstract @Nullable ChannelCache channelCache();
     }```
   
   
   
   
   



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -170,11 +172,12 @@ public final class StreamingDataflowWorker {
       "windmill_bounded_queue_executor_use_fair_monitor";
 
   private final WindmillStateCache stateCache;
-  private final StreamingWorkerStatusPages statusPages;
+  private StreamingWorkerStatusPages statusPages;

Review Comment:
   Wrap it with an AtomicReference?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -394,6 +368,187 @@ private StreamingDataflowWorker(
     LOG.debug("LocalWindmillHostport: {}", options.getLocalWindmillHostport());
   }
 
+  private FanOutStreamingEngineWorkerHarness 
createFanOutStreamingEngineWorkerHarness(
+      long clientId,
+      DataflowWorkerHarnessOptions options,
+      GrpcWindmillStreamFactory windmillStreamFactory,
+      StreamingWorkScheduler streamingWorkScheduler,
+      ThrottlingGetDataMetricTracker getDataMetricTracker,
+      MemoryMonitor memoryMonitor,
+      GrpcDispatcherClient dispatcherClient) {
+    WeightedSemaphore<Commit> maxCommitByteSemaphore = 
Commits.maxCommitByteSemaphore();
+    this.channelCache = createChannelCache(options, configFetcher);
+    this.getDataStatusProvider = getDataMetricTracker::printHtml;
+    FanOutStreamingEngineWorkerHarness fanOutStreamingEngineWorkerHarness =
+        FanOutStreamingEngineWorkerHarness.create(
+            createJobHeader(options, clientId),
+            GetWorkBudget.builder()
+                .setItems(chooseMaxBundlesOutstanding(options))
+                .setBytes(MAX_GET_WORK_FETCH_BYTES)
+                .build(),
+            windmillStreamFactory,
+            (workItem,
+                serializedWorkItemSize,
+                watermarks,
+                processingContext,
+                getWorkStreamLatencies) ->
+                computationStateCache
+                    .get(processingContext.computationId())
+                    .ifPresent(
+                        computationState -> {
+                          memoryMonitor.waitForResources("GetWork");
+                          streamingWorkScheduler.scheduleWork(
+                              computationState,
+                              workItem,
+                              serializedWorkItemSize,
+                              watermarks,
+                              processingContext,
+                              getWorkStreamLatencies);
+                        }),
+            ChannelCachingRemoteStubFactory.create(options.getGcpCredential(), 
channelCache),
+            GetWorkBudgetDistributors.distributeEvenly(),
+            Preconditions.checkNotNull(dispatcherClient),
+            commitWorkStream ->
+                StreamingEngineWorkCommitter.builder()
+                    // Share the commitByteSemaphore across all created 
workCommitters.
+                    .setCommitByteSemaphore(maxCommitByteSemaphore)
+                    
.setBackendWorkerToken(commitWorkStream.backendWorkerToken())
+                    .setOnCommitComplete(this::onCompleteCommit)
+                    
.setNumCommitSenders(Math.max(options.getWindmillServiceCommitThreads(), 1))
+                    .setCommitWorkStreamFactory(
+                        () -> CloseableStream.create(commitWorkStream, () -> 
{}))
+                    .build(),
+            getDataMetricTracker);
+    this.currentActiveCommitBytesProvider =
+        fanOutStreamingEngineWorkerHarness::currentActiveCommitBytes;
+    this.channelzServlet =
+        createChannelzServlet(
+            options, 
fanOutStreamingEngineWorkerHarness::currentWindmillEndpoints);
+    return fanOutStreamingEngineWorkerHarness;
+  }
+
+  private StreamingWorkerHarness createSingleSourceWorkerHarness(
+      long clientId,
+      DataflowWorkerHarnessOptions options,
+      WindmillServerStub windmillServer,
+      StreamingWorkScheduler streamingWorkScheduler,
+      ThrottlingGetDataMetricTracker getDataMetricTracker,
+      MemoryMonitor memoryMonitor) {
+    Windmill.GetWorkRequest request =
+        Windmill.GetWorkRequest.newBuilder()
+            .setClientId(clientId)
+            .setMaxItems(chooseMaxBundlesOutstanding(options))
+            .setMaxBytes(MAX_GET_WORK_FETCH_BYTES)
+            .build();
+    WindmillStreamPool<GetDataStream> getDataStreamPool =
+        WindmillStreamPool.create(
+            Math.max(1, options.getWindmillGetDataStreamCount()),
+            GET_DATA_STREAM_TIMEOUT,
+            windmillServer::getDataStream);
+    GetDataClient getDataClient =
+        new StreamPoolGetDataClient(getDataMetricTracker, getDataStreamPool);
+    HeartbeatSender heartbeatSender =
+        createStreamingEngineHeartbeatSender(
+            options, windmillServer, getDataStreamPool, 
configFetcher.getGlobalConfigHandle());
+    WorkCommitter workCommitter =
+        StreamingEngineWorkCommitter.builder()
+            .setCommitWorkStreamFactory(
+                WindmillStreamPool.create(
+                        numCommitThreads, COMMIT_STREAM_TIMEOUT, 
windmillServer::commitWorkStream)
+                    ::getCloseableStream)
+            .setCommitByteSemaphore(Commits.maxCommitByteSemaphore())
+            .setNumCommitSenders(numCommitThreads)
+            .setOnCommitComplete(this::onCompleteCommit)
+            .build();
+    GetWorkSender getWorkSender =
+        GetWorkSender.forStreamingEngine(
+            receiver -> windmillServer.getWorkStream(request, receiver));
+
+    this.getDataStatusProvider = getDataClient::printHtml;
+    this.currentActiveCommitBytesProvider = 
workCommitter::currentActiveCommitBytes;
+    this.channelzServlet =
+        createChannelzServlet(options, 
windmillServer::getWindmillServiceEndpoints);
+    this.channelCache = null;
+
+    return SingleSourceWorkerHarness.builder()
+        .setStreamingWorkScheduler(streamingWorkScheduler)
+        .setWorkCommitter(workCommitter)
+        .setGetDataClient(getDataClient)
+        .setComputationStateFetcher(this.computationStateCache::get)
+        .setWaitForResources(() -> memoryMonitor.waitForResources("GetWork"))
+        .setHeartbeatSender(heartbeatSender)
+        .setGetWorkSender(getWorkSender)
+        .build();
+  }
+
+  private void switchStreamingWorkerHarness(
+      ConnectivityType connectivityType,
+      long clientId,
+      DataflowWorkerHarnessOptions options,
+      GrpcWindmillStreamFactory windmillStreamFactory,
+      StreamingWorkScheduler streamingWorkScheduler,
+      ThrottlingGetDataMetricTracker getDataMetricTracker,
+      MemoryMonitor memoryMonitor,
+      GrpcDispatcherClient dispatcherClient,
+      WindmillServerStub windmillServer) {
+    // Stop the current status pages before switching the harness.
+    this.statusPages.stop();
+    LOG.debug("Stopped StreamingWorkerStatusPages before switching 
connectivity type.");
+    if (connectivityType == ConnectivityType.CONNECTIVITY_TYPE_DIRECTPATH) {
+      if (!(this.streamingWorkerHarness.get() instanceof 
FanOutStreamingEngineWorkerHarness)) {
+        LOG.info("Switching connectivity type from CLOUDPATH to DIRECTPATH");
+        LOG.debug("Shutting down to SingleSourceWorkerHarness");
+        this.streamingWorkerHarness.get().shutdown();
+        FanOutStreamingEngineWorkerHarness fanoutStreamingWorkerHarness =
+            createFanOutStreamingEngineWorkerHarness(
+                clientId,
+                options,
+                windmillStreamFactory,
+                streamingWorkScheduler,
+                getDataMetricTracker,
+                memoryMonitor,
+                dispatcherClient);
+        this.streamingWorkerHarness.set(fanoutStreamingWorkerHarness);
+        streamingWorkerHarness.get().start();

Review Comment:
   can do `fanoutStreamingWorkerHarness.start()` instead



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -394,6 +368,187 @@ private StreamingDataflowWorker(
     LOG.debug("LocalWindmillHostport: {}", options.getLocalWindmillHostport());
   }
 
+  private FanOutStreamingEngineWorkerHarness 
createFanOutStreamingEngineWorkerHarness(
+      long clientId,
+      DataflowWorkerHarnessOptions options,
+      GrpcWindmillStreamFactory windmillStreamFactory,
+      StreamingWorkScheduler streamingWorkScheduler,
+      ThrottlingGetDataMetricTracker getDataMetricTracker,
+      MemoryMonitor memoryMonitor,
+      GrpcDispatcherClient dispatcherClient) {
+    WeightedSemaphore<Commit> maxCommitByteSemaphore = 
Commits.maxCommitByteSemaphore();
+    this.channelCache = createChannelCache(options, configFetcher);
+    this.getDataStatusProvider = getDataMetricTracker::printHtml;
+    FanOutStreamingEngineWorkerHarness fanOutStreamingEngineWorkerHarness =
+        FanOutStreamingEngineWorkerHarness.create(
+            createJobHeader(options, clientId),
+            GetWorkBudget.builder()
+                .setItems(chooseMaxBundlesOutstanding(options))
+                .setBytes(MAX_GET_WORK_FETCH_BYTES)
+                .build(),
+            windmillStreamFactory,
+            (workItem,
+                serializedWorkItemSize,
+                watermarks,
+                processingContext,
+                getWorkStreamLatencies) ->
+                computationStateCache
+                    .get(processingContext.computationId())
+                    .ifPresent(
+                        computationState -> {
+                          memoryMonitor.waitForResources("GetWork");
+                          streamingWorkScheduler.scheduleWork(
+                              computationState,
+                              workItem,
+                              serializedWorkItemSize,
+                              watermarks,
+                              processingContext,
+                              getWorkStreamLatencies);
+                        }),
+            ChannelCachingRemoteStubFactory.create(options.getGcpCredential(), 
channelCache),
+            GetWorkBudgetDistributors.distributeEvenly(),
+            Preconditions.checkNotNull(dispatcherClient),
+            commitWorkStream ->
+                StreamingEngineWorkCommitter.builder()
+                    // Share the commitByteSemaphore across all created 
workCommitters.
+                    .setCommitByteSemaphore(maxCommitByteSemaphore)
+                    
.setBackendWorkerToken(commitWorkStream.backendWorkerToken())
+                    .setOnCommitComplete(this::onCompleteCommit)
+                    
.setNumCommitSenders(Math.max(options.getWindmillServiceCommitThreads(), 1))
+                    .setCommitWorkStreamFactory(
+                        () -> CloseableStream.create(commitWorkStream, () -> 
{}))
+                    .build(),
+            getDataMetricTracker);
+    this.currentActiveCommitBytesProvider =
+        fanOutStreamingEngineWorkerHarness::currentActiveCommitBytes;
+    this.channelzServlet =
+        createChannelzServlet(
+            options, 
fanOutStreamingEngineWorkerHarness::currentWindmillEndpoints);
+    return fanOutStreamingEngineWorkerHarness;
+  }
+
+  private StreamingWorkerHarness createSingleSourceWorkerHarness(
+      long clientId,
+      DataflowWorkerHarnessOptions options,
+      WindmillServerStub windmillServer,
+      StreamingWorkScheduler streamingWorkScheduler,
+      ThrottlingGetDataMetricTracker getDataMetricTracker,
+      MemoryMonitor memoryMonitor) {
+    Windmill.GetWorkRequest request =
+        Windmill.GetWorkRequest.newBuilder()
+            .setClientId(clientId)
+            .setMaxItems(chooseMaxBundlesOutstanding(options))
+            .setMaxBytes(MAX_GET_WORK_FETCH_BYTES)
+            .build();
+    WindmillStreamPool<GetDataStream> getDataStreamPool =
+        WindmillStreamPool.create(
+            Math.max(1, options.getWindmillGetDataStreamCount()),
+            GET_DATA_STREAM_TIMEOUT,
+            windmillServer::getDataStream);
+    GetDataClient getDataClient =
+        new StreamPoolGetDataClient(getDataMetricTracker, getDataStreamPool);
+    HeartbeatSender heartbeatSender =
+        createStreamingEngineHeartbeatSender(
+            options, windmillServer, getDataStreamPool, 
configFetcher.getGlobalConfigHandle());
+    WorkCommitter workCommitter =
+        StreamingEngineWorkCommitter.builder()
+            .setCommitWorkStreamFactory(
+                WindmillStreamPool.create(
+                        numCommitThreads, COMMIT_STREAM_TIMEOUT, 
windmillServer::commitWorkStream)
+                    ::getCloseableStream)
+            .setCommitByteSemaphore(Commits.maxCommitByteSemaphore())
+            .setNumCommitSenders(numCommitThreads)
+            .setOnCommitComplete(this::onCompleteCommit)
+            .build();
+    GetWorkSender getWorkSender =
+        GetWorkSender.forStreamingEngine(
+            receiver -> windmillServer.getWorkStream(request, receiver));
+
+    this.getDataStatusProvider = getDataClient::printHtml;
+    this.currentActiveCommitBytesProvider = 
workCommitter::currentActiveCommitBytes;
+    this.channelzServlet =
+        createChannelzServlet(options, 
windmillServer::getWindmillServiceEndpoints);
+    this.channelCache = null;
+
+    return SingleSourceWorkerHarness.builder()
+        .setStreamingWorkScheduler(streamingWorkScheduler)
+        .setWorkCommitter(workCommitter)
+        .setGetDataClient(getDataClient)
+        .setComputationStateFetcher(this.computationStateCache::get)
+        .setWaitForResources(() -> memoryMonitor.waitForResources("GetWork"))
+        .setHeartbeatSender(heartbeatSender)
+        .setGetWorkSender(getWorkSender)
+        .build();
+  }
+
+  private void switchStreamingWorkerHarness(
+      ConnectivityType connectivityType,
+      long clientId,
+      DataflowWorkerHarnessOptions options,
+      GrpcWindmillStreamFactory windmillStreamFactory,
+      StreamingWorkScheduler streamingWorkScheduler,
+      ThrottlingGetDataMetricTracker getDataMetricTracker,
+      MemoryMonitor memoryMonitor,
+      GrpcDispatcherClient dispatcherClient,
+      WindmillServerStub windmillServer) {
+    // Stop the current status pages before switching the harness.
+    this.statusPages.stop();
+    LOG.debug("Stopped StreamingWorkerStatusPages before switching 
connectivity type.");
+    if (connectivityType == ConnectivityType.CONNECTIVITY_TYPE_DIRECTPATH) {
+      if (!(this.streamingWorkerHarness.get() instanceof 
FanOutStreamingEngineWorkerHarness)) {

Review Comment:
   With the early-exit check, this second `instanceof` check won't be needed.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -183,6 +186,13 @@ public final class StreamingDataflowWorker {
   private final ActiveWorkRefresher activeWorkRefresher;
   private final StreamingWorkerStatusReporter workerStatusReporter;
   private final int numCommitThreads;
+  private Consumer<PrintWriter> getDataStatusProvider;
+  private Supplier<Long> currentActiveCommitBytesProvider;
+  private @Nullable ChannelzServlet channelzServlet;
+  private @Nullable ChannelCache channelCache;
+  private Supplier<Instant> clock;

Review Comment:
   can be final



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -394,6 +368,187 @@ private StreamingDataflowWorker(
     LOG.debug("LocalWindmillHostport: {}", options.getLocalWindmillHostport());
   }
 
+  private FanOutStreamingEngineWorkerHarness 
createFanOutStreamingEngineWorkerHarness(
+      long clientId,
+      DataflowWorkerHarnessOptions options,
+      GrpcWindmillStreamFactory windmillStreamFactory,
+      StreamingWorkScheduler streamingWorkScheduler,
+      ThrottlingGetDataMetricTracker getDataMetricTracker,
+      MemoryMonitor memoryMonitor,
+      GrpcDispatcherClient dispatcherClient) {
+    WeightedSemaphore<Commit> maxCommitByteSemaphore = 
Commits.maxCommitByteSemaphore();
+    this.channelCache = createChannelCache(options, configFetcher);
+    this.getDataStatusProvider = getDataMetricTracker::printHtml;
+    FanOutStreamingEngineWorkerHarness fanOutStreamingEngineWorkerHarness =
+        FanOutStreamingEngineWorkerHarness.create(
+            createJobHeader(options, clientId),
+            GetWorkBudget.builder()
+                .setItems(chooseMaxBundlesOutstanding(options))
+                .setBytes(MAX_GET_WORK_FETCH_BYTES)
+                .build(),
+            windmillStreamFactory,
+            (workItem,
+                serializedWorkItemSize,
+                watermarks,
+                processingContext,
+                getWorkStreamLatencies) ->
+                computationStateCache
+                    .get(processingContext.computationId())
+                    .ifPresent(
+                        computationState -> {
+                          memoryMonitor.waitForResources("GetWork");
+                          streamingWorkScheduler.scheduleWork(
+                              computationState,
+                              workItem,
+                              serializedWorkItemSize,
+                              watermarks,
+                              processingContext,
+                              getWorkStreamLatencies);
+                        }),
+            ChannelCachingRemoteStubFactory.create(options.getGcpCredential(), 
channelCache),
+            GetWorkBudgetDistributors.distributeEvenly(),
+            Preconditions.checkNotNull(dispatcherClient),
+            commitWorkStream ->
+                StreamingEngineWorkCommitter.builder()
+                    // Share the commitByteSemaphore across all created 
workCommitters.
+                    .setCommitByteSemaphore(maxCommitByteSemaphore)
+                    
.setBackendWorkerToken(commitWorkStream.backendWorkerToken())
+                    .setOnCommitComplete(this::onCompleteCommit)
+                    
.setNumCommitSenders(Math.max(options.getWindmillServiceCommitThreads(), 1))
+                    .setCommitWorkStreamFactory(
+                        () -> CloseableStream.create(commitWorkStream, () -> 
{}))
+                    .build(),
+            getDataMetricTracker);
+    this.currentActiveCommitBytesProvider =
+        fanOutStreamingEngineWorkerHarness::currentActiveCommitBytes;
+    this.channelzServlet =
+        createChannelzServlet(
+            options, 
fanOutStreamingEngineWorkerHarness::currentWindmillEndpoints);
+    return fanOutStreamingEngineWorkerHarness;
+  }
+
+  private StreamingWorkerHarness createSingleSourceWorkerHarness(
+      long clientId,
+      DataflowWorkerHarnessOptions options,
+      WindmillServerStub windmillServer,
+      StreamingWorkScheduler streamingWorkScheduler,
+      ThrottlingGetDataMetricTracker getDataMetricTracker,
+      MemoryMonitor memoryMonitor) {
+    Windmill.GetWorkRequest request =
+        Windmill.GetWorkRequest.newBuilder()
+            .setClientId(clientId)
+            .setMaxItems(chooseMaxBundlesOutstanding(options))
+            .setMaxBytes(MAX_GET_WORK_FETCH_BYTES)
+            .build();
+    WindmillStreamPool<GetDataStream> getDataStreamPool =
+        WindmillStreamPool.create(
+            Math.max(1, options.getWindmillGetDataStreamCount()),
+            GET_DATA_STREAM_TIMEOUT,
+            windmillServer::getDataStream);
+    GetDataClient getDataClient =
+        new StreamPoolGetDataClient(getDataMetricTracker, getDataStreamPool);
+    HeartbeatSender heartbeatSender =
+        createStreamingEngineHeartbeatSender(
+            options, windmillServer, getDataStreamPool, 
configFetcher.getGlobalConfigHandle());
+    WorkCommitter workCommitter =
+        StreamingEngineWorkCommitter.builder()
+            .setCommitWorkStreamFactory(
+                WindmillStreamPool.create(
+                        numCommitThreads, COMMIT_STREAM_TIMEOUT, 
windmillServer::commitWorkStream)
+                    ::getCloseableStream)
+            .setCommitByteSemaphore(Commits.maxCommitByteSemaphore())
+            .setNumCommitSenders(numCommitThreads)
+            .setOnCommitComplete(this::onCompleteCommit)
+            .build();
+    GetWorkSender getWorkSender =
+        GetWorkSender.forStreamingEngine(
+            receiver -> windmillServer.getWorkStream(request, receiver));
+
+    this.getDataStatusProvider = getDataClient::printHtml;
+    this.currentActiveCommitBytesProvider = 
workCommitter::currentActiveCommitBytes;
+    this.channelzServlet =
+        createChannelzServlet(options, 
windmillServer::getWindmillServiceEndpoints);
+    this.channelCache = null;
+
+    return SingleSourceWorkerHarness.builder()
+        .setStreamingWorkScheduler(streamingWorkScheduler)
+        .setWorkCommitter(workCommitter)
+        .setGetDataClient(getDataClient)
+        .setComputationStateFetcher(this.computationStateCache::get)
+        .setWaitForResources(() -> memoryMonitor.waitForResources("GetWork"))
+        .setHeartbeatSender(heartbeatSender)
+        .setGetWorkSender(getWorkSender)
+        .build();
+  }
+
+  private void switchStreamingWorkerHarness(
+      ConnectivityType connectivityType,
+      long clientId,
+      DataflowWorkerHarnessOptions options,
+      GrpcWindmillStreamFactory windmillStreamFactory,
+      StreamingWorkScheduler streamingWorkScheduler,
+      ThrottlingGetDataMetricTracker getDataMetricTracker,
+      MemoryMonitor memoryMonitor,
+      GrpcDispatcherClient dispatcherClient,
+      WindmillServerStub windmillServer) {
+    // Stop the current status pages before switching the harness.
+    this.statusPages.stop();
+    LOG.debug("Stopped StreamingWorkerStatusPages before switching 
connectivity type.");
+    if (connectivityType == ConnectivityType.CONNECTIVITY_TYPE_DIRECTPATH) {
+      if (!(this.streamingWorkerHarness.get() instanceof 
FanOutStreamingEngineWorkerHarness)) {
+        LOG.info("Switching connectivity type from CLOUDPATH to DIRECTPATH");
+        LOG.debug("Shutting down to SingleSourceWorkerHarness");
+        this.streamingWorkerHarness.get().shutdown();
+        FanOutStreamingEngineWorkerHarness fanoutStreamingWorkerHarness =
+            createFanOutStreamingEngineWorkerHarness(
+                clientId,
+                options,
+                windmillStreamFactory,
+                streamingWorkScheduler,
+                getDataMetricTracker,
+                memoryMonitor,
+                dispatcherClient);
+        this.streamingWorkerHarness.set(fanoutStreamingWorkerHarness);
+        streamingWorkerHarness.get().start();
+        LOG.debug("Started FanOutStreamingEngineWorkerHarness");
+        return;
+      }
+    } else if (connectivityType == 
ConnectivityType.CONNECTIVITY_TYPE_CLOUDPATH) {
+      if (!(streamingWorkerHarness.get() instanceof 
SingleSourceWorkerHarness)) {

Review Comment:
   With the early-exit check, this second `instanceof` check won't be needed.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -907,9 +1062,10 @@ void stop() {
       activeWorkRefresher.stop();
       statusPages.stop();
       running.set(false);
-      streamingWorkerHarness.shutdown();
+      streamingWorkerHarness.get().shutdown();
       memoryMonitor.shutdown();
       workUnitExecutor.shutdown();
+      harnessSwitchExecutor.shutdown();

Review Comment:
   Shut down `harnessSwitchExecutor` before shutting down `streamingWorkerHarness`?
Doing so will make sure a new `streamingWorkerHarness` is not set after the old
one has been shut down here.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -889,7 +1044,7 @@ public void start() {
     running.set(true);
     configFetcher.start();
     memoryMonitor.start();
-    streamingWorkerHarness.start();
+    streamingWorkerHarness.get().start();

Review Comment:
   There is a race between `streamingWorkerHarness.get().start()` here and the
harnessSwitchExecutor swapping in a new `streamingWorkerHarness` and calling
`start()` on it. It could end up calling `start()` on an already-started
`streamingWorkerHarness` and fail a precondition check.

   Registering the harness-switch config observer here, after starting the
existing `streamingWorkerHarness`, should fix the race.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -394,6 +368,187 @@ private StreamingDataflowWorker(
     LOG.debug("LocalWindmillHostport: {}", options.getLocalWindmillHostport());
   }
 
+  private FanOutStreamingEngineWorkerHarness 
createFanOutStreamingEngineWorkerHarness(
+      long clientId,
+      DataflowWorkerHarnessOptions options,
+      GrpcWindmillStreamFactory windmillStreamFactory,
+      StreamingWorkScheduler streamingWorkScheduler,
+      ThrottlingGetDataMetricTracker getDataMetricTracker,
+      MemoryMonitor memoryMonitor,
+      GrpcDispatcherClient dispatcherClient) {
+    WeightedSemaphore<Commit> maxCommitByteSemaphore = 
Commits.maxCommitByteSemaphore();
+    this.channelCache = createChannelCache(options, configFetcher);
+    this.getDataStatusProvider = getDataMetricTracker::printHtml;
+    FanOutStreamingEngineWorkerHarness fanOutStreamingEngineWorkerHarness =
+        FanOutStreamingEngineWorkerHarness.create(
+            createJobHeader(options, clientId),
+            GetWorkBudget.builder()
+                .setItems(chooseMaxBundlesOutstanding(options))
+                .setBytes(MAX_GET_WORK_FETCH_BYTES)
+                .build(),
+            windmillStreamFactory,
+            (workItem,
+                serializedWorkItemSize,
+                watermarks,
+                processingContext,
+                getWorkStreamLatencies) ->
+                computationStateCache
+                    .get(processingContext.computationId())
+                    .ifPresent(
+                        computationState -> {
+                          memoryMonitor.waitForResources("GetWork");
+                          streamingWorkScheduler.scheduleWork(
+                              computationState,
+                              workItem,
+                              serializedWorkItemSize,
+                              watermarks,
+                              processingContext,
+                              getWorkStreamLatencies);
+                        }),
+            ChannelCachingRemoteStubFactory.create(options.getGcpCredential(), 
channelCache),
+            GetWorkBudgetDistributors.distributeEvenly(),
+            Preconditions.checkNotNull(dispatcherClient),
+            commitWorkStream ->
+                StreamingEngineWorkCommitter.builder()
+                    // Share the commitByteSemaphore across all created 
workCommitters.
+                    .setCommitByteSemaphore(maxCommitByteSemaphore)
+                    
.setBackendWorkerToken(commitWorkStream.backendWorkerToken())
+                    .setOnCommitComplete(this::onCompleteCommit)
+                    
.setNumCommitSenders(Math.max(options.getWindmillServiceCommitThreads(), 1))
+                    .setCommitWorkStreamFactory(
+                        () -> CloseableStream.create(commitWorkStream, () -> 
{}))
+                    .build(),
+            getDataMetricTracker);
+    this.currentActiveCommitBytesProvider =
+        fanOutStreamingEngineWorkerHarness::currentActiveCommitBytes;
+    this.channelzServlet =
+        createChannelzServlet(
+            options, 
fanOutStreamingEngineWorkerHarness::currentWindmillEndpoints);
+    return fanOutStreamingEngineWorkerHarness;
+  }
+
+  private StreamingWorkerHarness createSingleSourceWorkerHarness(
+      long clientId,
+      DataflowWorkerHarnessOptions options,
+      WindmillServerStub windmillServer,
+      StreamingWorkScheduler streamingWorkScheduler,
+      ThrottlingGetDataMetricTracker getDataMetricTracker,
+      MemoryMonitor memoryMonitor) {
+    Windmill.GetWorkRequest request =
+        Windmill.GetWorkRequest.newBuilder()
+            .setClientId(clientId)
+            .setMaxItems(chooseMaxBundlesOutstanding(options))
+            .setMaxBytes(MAX_GET_WORK_FETCH_BYTES)
+            .build();
+    WindmillStreamPool<GetDataStream> getDataStreamPool =
+        WindmillStreamPool.create(
+            Math.max(1, options.getWindmillGetDataStreamCount()),
+            GET_DATA_STREAM_TIMEOUT,
+            windmillServer::getDataStream);
+    GetDataClient getDataClient =
+        new StreamPoolGetDataClient(getDataMetricTracker, getDataStreamPool);
+    HeartbeatSender heartbeatSender =
+        createStreamingEngineHeartbeatSender(
+            options, windmillServer, getDataStreamPool, 
configFetcher.getGlobalConfigHandle());
+    WorkCommitter workCommitter =
+        StreamingEngineWorkCommitter.builder()
+            .setCommitWorkStreamFactory(
+                WindmillStreamPool.create(
+                        numCommitThreads, COMMIT_STREAM_TIMEOUT, 
windmillServer::commitWorkStream)
+                    ::getCloseableStream)
+            .setCommitByteSemaphore(Commits.maxCommitByteSemaphore())
+            .setNumCommitSenders(numCommitThreads)
+            .setOnCommitComplete(this::onCompleteCommit)
+            .build();
+    GetWorkSender getWorkSender =
+        GetWorkSender.forStreamingEngine(
+            receiver -> windmillServer.getWorkStream(request, receiver));
+
+    this.getDataStatusProvider = getDataClient::printHtml;
+    this.currentActiveCommitBytesProvider = 
workCommitter::currentActiveCommitBytes;
+    this.channelzServlet =
+        createChannelzServlet(options, 
windmillServer::getWindmillServiceEndpoints);
+    this.channelCache = null;
+
+    return SingleSourceWorkerHarness.builder()
+        .setStreamingWorkScheduler(streamingWorkScheduler)
+        .setWorkCommitter(workCommitter)
+        .setGetDataClient(getDataClient)
+        .setComputationStateFetcher(this.computationStateCache::get)
+        .setWaitForResources(() -> memoryMonitor.waitForResources("GetWork"))
+        .setHeartbeatSender(heartbeatSender)
+        .setGetWorkSender(getWorkSender)
+        .build();
+  }
+
+  private void switchStreamingWorkerHarness(
+      ConnectivityType connectivityType,
+      long clientId,
+      DataflowWorkerHarnessOptions options,
+      GrpcWindmillStreamFactory windmillStreamFactory,
+      StreamingWorkScheduler streamingWorkScheduler,
+      ThrottlingGetDataMetricTracker getDataMetricTracker,
+      MemoryMonitor memoryMonitor,
+      GrpcDispatcherClient dispatcherClient,
+      WindmillServerStub windmillServer) {
+    // Stop the current status pages before switching the harness.
+    this.statusPages.stop();

Review Comment:
   We don't need to stop/start statusPages if the workerHarness didn't change. 
   
   We could add an early exit to switchStreamingWorkerHarness
   ```
   private void switchStreamingWorkerHarness(...) {
   if ((connectivityType == ConnectivityType.CONNECTIVITY_TYPE_DIRECTPATH && 
this.streamingWorkerHarness.get() instanceof 
FanOutStreamingEngineWorkerHarness) || (connectivityType == 
ConnectivityType.CONNECTIVITY_TYPE_CLOUDPATH && streamingWorkerHarness.get() 
instanceof SingleSourceWorkerHarness))  {
   return;
   }
   rest of logic
   
   
   ```



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java:
##########
@@ -179,8 +184,13 @@ private void streamingEngineDispatchLoop(
         // Reconnect every now and again to enable better load balancing.
         // If at any point the server closes the stream, we will reconnect 
immediately; otherwise
         // we half-close the stream after some time and create a new one.
-        if (!stream.awaitTermination(GET_WORK_STREAM_TIMEOUT_MINUTES, 
TimeUnit.MINUTES)) {
-          stream.halfClose();
+        if (getWorkStream != null) {
+          if (!getWorkStream.awaitTermination(GET_WORK_STREAM_TIMEOUT_MINUTES, 
TimeUnit.MINUTES)) {
+            if (getWorkStream
+                != null) { // checking for null again to keep the static 
analyzer happy.
+              getWorkStream.halfClose();

Review Comment:
   `Preconditions.checkNotNull(getWorkStream).halfClose()` should also make the 
null checker happy.



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java:
##########
@@ -4058,6 +4067,106 @@ public void testStuckCommit() throws Exception {
         removeDynamicFields(result.get(1L)));
   }
 
+  @Test
+  public void testSwitchStreamingWorkerHarness() throws Exception {
+    if (!streamingEngine) {
+      return;
+    }
+
+    List<ParallelInstruction> instructions =
+        Arrays.asList(
+            makeSourceInstruction(StringUtf8Coder.of()),
+            makeSinkInstruction(StringUtf8Coder.of(), 0));
+
+    // Start with CloudPath.
+    DataflowWorkerHarnessOptions options =
+        
createTestingPipelineOptions("--isWindmillServiceDirectPathEnabled=false");
+
+    StreamingDataflowWorker worker =
+        makeWorker(
+            defaultWorkerParams()
+                .setOptions(options)
+                .setInstructions(instructions)
+                .publishCounters()
+                .build());
+
+    GrpcDispatcherClient mockDispatcherClient = 
mock(GrpcDispatcherClient.class);
+
+    // FanOutStreamingEngineWorkerHarness creates
+    // CloudWindmillMetadataServiceV1Alpha1Stub and expects the stream to
+    // successfully start. Mocking it here.
+    Channel mockChannel = mock(Channel.class);
+    ClientCall<WorkerMetadataRequest, WorkerMetadataResponse> mockClientCall =
+        mock(ClientCall.class);
+    when(mockChannel.newCall(
+            
eq(CloudWindmillMetadataServiceV1Alpha1Grpc.getGetWorkerMetadataMethod()), 
any()))
+        .thenReturn(mockClientCall);
+    when(mockDispatcherClient.getWindmillMetadataServiceStubBlocking())
+        
.thenReturn(CloudWindmillMetadataServiceV1Alpha1Grpc.newStub(mockChannel));
+    java.lang.reflect.Field dispatcherClientField =
+        StreamingDataflowWorker.class.getDeclaredField("dispatcherClient");
+    dispatcherClientField.setAccessible(true);
+    dispatcherClientField.set(worker, mockDispatcherClient);

Review Comment:
   Prefer fakes/mocks over using reflection directly.
   
   Here we can update FakeWindmillServer.java to setup fake metadata responses. 



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java:
##########
@@ -4058,6 +4067,106 @@ public void testStuckCommit() throws Exception {
         removeDynamicFields(result.get(1L)));
   }
 
+  @Test
+  public void testSwitchStreamingWorkerHarness() throws Exception {
+    if (!streamingEngine) {
+      return;
+    }
+
+    List<ParallelInstruction> instructions =
+        Arrays.asList(
+            makeSourceInstruction(StringUtf8Coder.of()),
+            makeSinkInstruction(StringUtf8Coder.of(), 0));
+
+    // Start with CloudPath.
+    DataflowWorkerHarnessOptions options =
+        
createTestingPipelineOptions("--isWindmillServiceDirectPathEnabled=false");
+
+    StreamingDataflowWorker worker =
+        makeWorker(
+            defaultWorkerParams()
+                .setOptions(options)
+                .setInstructions(instructions)
+                .publishCounters()
+                .build());
+
+    GrpcDispatcherClient mockDispatcherClient = 
mock(GrpcDispatcherClient.class);
+
+    // FanOutStreamingEngineWorkerHarness creates
+    // CloudWindmillMetadataServiceV1Alpha1Stub and expects the stream to
+    // successfully start. Mocking it here.
+    Channel mockChannel = mock(Channel.class);
+    ClientCall<WorkerMetadataRequest, WorkerMetadataResponse> mockClientCall =
+        mock(ClientCall.class);
+    when(mockChannel.newCall(
+            
eq(CloudWindmillMetadataServiceV1Alpha1Grpc.getGetWorkerMetadataMethod()), 
any()))
+        .thenReturn(mockClientCall);
+    when(mockDispatcherClient.getWindmillMetadataServiceStubBlocking())
+        
.thenReturn(CloudWindmillMetadataServiceV1Alpha1Grpc.newStub(mockChannel));
+    java.lang.reflect.Field dispatcherClientField =
+        StreamingDataflowWorker.class.getDeclaredField("dispatcherClient");
+    dispatcherClientField.setAccessible(true);
+    dispatcherClientField.set(worker, mockDispatcherClient);
+
+    // Capture the config observer.
+    ArgumentCaptor<Consumer<StreamingGlobalConfig>> observerCaptor =
+        ArgumentCaptor.forClass(Consumer.class);
+    verify(mockGlobalConfigHandle, 
atLeastOnce()).registerConfigObserver(observerCaptor.capture());
+    List<Consumer<StreamingGlobalConfig>> observers = 
observerCaptor.getAllValues();
+
+    worker.start();
+
+    // Use reflection to check the harness type.
+    java.lang.reflect.Field harnessField =
+        
StreamingDataflowWorker.class.getDeclaredField("streamingWorkerHarness");
+    harnessField.setAccessible(true);
+    AtomicReference<Object> harnessRef = (AtomicReference<Object>) 
harnessField.get(worker);

Review Comment:
   You can expose a package private @VisibleForTesting method returning the 
StreamingWorkerHarness to StreamingDataflowWorker and use it to get the 
StreamingWorkerHarness. 



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusPages.java:
##########
@@ -80,7 +80,7 @@ public final class StreamingWorkerStatusPages {
   private final @Nullable GrpcWindmillStreamFactory windmillStreamFactory;
   private final DebugCapture.@Nullable Manager debugCapture;
   private final @Nullable ChannelzServlet channelzServlet;
-  private final @Nullable ChannelCache channelCache;

Review Comment:
   is this intended?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to