This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new aa21e4a84b3 Integrate direct path with StreamingDataflowWorker code 
path (#32778)
aa21e4a84b3 is described below

commit aa21e4a84b3594e9ddd6ddb436c9628cb6660552
Author: martin trieu <[email protected]>
AuthorDate: Tue Nov 26 02:51:57 2024 -0600

    Integrate direct path with StreamingDataflowWorker code path (#32778)
---
 .../options/DataflowStreamingPipelineOptions.java  |  15 +-
 .../dataflow/worker/StreamingDataflowWorker.java   | 594 +++++++++++++--------
 .../FanOutStreamingEngineWorkerHarness.java        |   1 +
 .../harness/SingleSourceWorkerHarness.java         |  13 +-
 .../streaming/harness/StreamingWorkerHarness.java  |   3 +-
 .../harness/StreamingWorkerStatusPages.java        |   2 +-
 .../harness/StreamingWorkerStatusReporter.java     | 131 +++--
 .../windmill/client/throttling/ThrottleTimer.java  |   3 +-
 .../client/throttling/ThrottledTimeTracker.java}   |  14 +-
 .../work/refresh/StreamPoolHeartbeatSender.java    |   4 +-
 .../worker/StreamingDataflowWorkerTest.java        |  42 +-
 .../harness/SingleSourceWorkerHarnessTest.java     |   2 +
 .../harness/StreamingWorkerStatusReporterTest.java |  53 +-
 .../refresh/StreamPoolHeartbeatSenderTest.java     |   6 +-
 14 files changed, 538 insertions(+), 345 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java
index 6a0208f1447..61c38dde2b4 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.dataflow.options;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.DefaultValueFactory;
 import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.ExperimentalOptions;
 import org.apache.beam.sdk.options.Hidden;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.joda.time.Duration;
@@ -219,10 +220,8 @@ public interface DataflowStreamingPipelineOptions extends 
PipelineOptions {
 
   void setWindmillServiceStreamMaxBackoffMillis(int value);
 
-  @Description(
-      "If true, Dataflow streaming pipeline will be running in direct path 
mode."
-          + " VMs must have IPv6 enabled for this to work.")
-  @Default.Boolean(false)
+  @Description("Enables direct path mode for streaming engine.")
+  @Default.InstanceFactory(EnableWindmillServiceDirectPathFactory.class)
   boolean getIsWindmillServiceDirectPathEnabled();
 
   void setIsWindmillServiceDirectPathEnabled(boolean 
isWindmillServiceDirectPathEnabled);
@@ -300,4 +299,12 @@ public interface DataflowStreamingPipelineOptions extends 
PipelineOptions {
       return streamingOptions.isEnableStreamingEngine() ? Integer.MAX_VALUE : 
1;
     }
   }
+
+  /** IsWindmillServiceDirectPathEnabled defaults to false unless the enable_windmill_service_direct_path experiment is 
set. */
+  class EnableWindmillServiceDirectPathFactory implements 
DefaultValueFactory<Boolean> {
+    @Override
+    public Boolean create(PipelineOptions options) {
+      return ExperimentalOptions.hasExperiment(options, 
"enable_windmill_service_direct_path");
+    }
+  }
 }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 6ce60283735..088a28e9b2d 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -17,11 +17,11 @@
  */
 package org.apache.beam.runners.dataflow.worker;
 
-import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+import static 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannelFactory.remoteChannel;
 
-import com.google.api.services.dataflow.model.CounterUpdate;
 import com.google.api.services.dataflow.model.MapTask;
 import com.google.auto.value.AutoValue;
+import java.io.PrintWriter;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -33,24 +33,28 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Supplier;
+import javax.annotation.Nullable;
 import org.apache.beam.runners.core.metrics.MetricsLogger;
 import org.apache.beam.runners.dataflow.DataflowRunner;
 import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
-import 
org.apache.beam.runners.dataflow.worker.counters.DataflowCounterUpdateExtractor;
 import org.apache.beam.runners.dataflow.worker.status.DebugCapture;
 import org.apache.beam.runners.dataflow.worker.status.WorkerStatusPages;
 import org.apache.beam.runners.dataflow.worker.streaming.ComputationState;
 import org.apache.beam.runners.dataflow.worker.streaming.ComputationStateCache;
 import org.apache.beam.runners.dataflow.worker.streaming.StageInfo;
+import org.apache.beam.runners.dataflow.worker.streaming.WeightedSemaphore;
 import 
org.apache.beam.runners.dataflow.worker.streaming.WorkHeartbeatResponseProcessor;
 import 
org.apache.beam.runners.dataflow.worker.streaming.config.ComputationConfig;
 import 
org.apache.beam.runners.dataflow.worker.streaming.config.FixedGlobalConfigHandle;
 import 
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingApplianceComputationConfigFetcher;
 import 
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingEngineComputationConfigFetcher;
 import 
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig;
+import 
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandle;
 import 
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandleImpl;
+import 
org.apache.beam.runners.dataflow.worker.streaming.harness.FanOutStreamingEngineWorkerHarness;
 import 
org.apache.beam.runners.dataflow.worker.streaming.harness.SingleSourceWorkerHarness;
 import 
org.apache.beam.runners.dataflow.worker.streaming.harness.SingleSourceWorkerHarness.GetWorkSender;
 import 
org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingCounters;
@@ -59,12 +63,15 @@ import 
org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingWorker
 import 
org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingWorkerStatusReporter;
 import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
 import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor;
+import 
org.apache.beam.runners.dataflow.worker.windmill.ApplianceWindmillClient;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill.JobHeader;
 import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub;
 import 
org.apache.beam.runners.dataflow.worker.windmill.appliance.JniWindmillApplianceServer;
+import org.apache.beam.runners.dataflow.worker.windmill.client.CloseableStream;
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream;
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamPool;
+import org.apache.beam.runners.dataflow.worker.windmill.client.commits.Commit;
 import org.apache.beam.runners.dataflow.worker.windmill.client.commits.Commits;
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.commits.CompleteCommit;
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.commits.StreamingApplianceWorkCommitter;
@@ -78,8 +85,16 @@ import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.ChannelzServ
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcDispatcherClient;
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillServer;
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillStreamFactory;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCache;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCachingRemoteStubFactory;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCachingStubFactory;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.IsolationChannel;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactoryFactory;
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactoryFactoryImpl;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.throttling.ThrottledTimeTracker;
 import 
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache;
+import 
org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget;
+import 
org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudgetDistributors;
 import 
org.apache.beam.runners.dataflow.worker.windmill.work.processing.StreamingWorkScheduler;
 import 
org.apache.beam.runners.dataflow.worker.windmill.work.processing.failures.FailureTracker;
 import 
org.apache.beam.runners.dataflow.worker.windmill.work.processing.failures.StreamingApplianceFailureTracker;
@@ -89,6 +104,7 @@ import 
org.apache.beam.runners.dataflow.worker.windmill.work.refresh.ActiveWorkR
 import 
org.apache.beam.runners.dataflow.worker.windmill.work.refresh.ApplianceHeartbeatSender;
 import 
org.apache.beam.runners.dataflow.worker.windmill.work.refresh.HeartbeatSender;
 import 
org.apache.beam.runners.dataflow.worker.windmill.work.refresh.StreamPoolHeartbeatSender;
+import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.fn.IdGenerator;
 import org.apache.beam.sdk.fn.IdGenerators;
 import org.apache.beam.sdk.fn.JvmInitializers;
@@ -98,18 +114,25 @@ import org.apache.beam.sdk.io.kafka.KafkaSinkMetrics;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.util.construction.CoderTranslation;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheStats;
-import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/** Implements a Streaming Dataflow worker. */
+/**
+ * <b>For internal use only.</b>
+ *
+ * <p>Implements a Streaming Dataflow worker.
+ */
 @SuppressWarnings({
   "nullness" // TODO(https://github.com/apache/beam/issues/20497)
 })
+@Internal
 public final class StreamingDataflowWorker {
 
   /**
@@ -142,6 +165,10 @@ public final class StreamingDataflowWorker {
   private static final int DEFAULT_STATUS_PORT = 8081;
   private static final Random CLIENT_ID_GENERATOR = new Random();
   private static final String CHANNELZ_PATH = "/channelz";
+  private static final String BEAM_FN_API_EXPERIMENT = "beam_fn_api";
+  private static final String ENABLE_IPV6_EXPERIMENT = 
"enable_private_ipv6_google_access";
+  private static final String 
STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_HEARTBEAT_POOL_EXPERIMENT =
+      "streaming_engine_use_job_settings_for_heartbeat_pool";
 
   private final WindmillStateCache stateCache;
   private final StreamingWorkerStatusPages statusPages;
@@ -155,9 +182,8 @@ public final class StreamingDataflowWorker {
   private final ReaderCache readerCache;
   private final DataflowExecutionStateSampler sampler = 
DataflowExecutionStateSampler.instance();
   private final ActiveWorkRefresher activeWorkRefresher;
-  private final WorkCommitter workCommitter;
   private final StreamingWorkerStatusReporter workerStatusReporter;
-  private final StreamingCounters streamingCounters;
+  private final int numCommitThreads;
 
   private StreamingDataflowWorker(
       WindmillServerStub windmillServer,
@@ -170,17 +196,17 @@ public final class StreamingDataflowWorker {
       DataflowWorkerHarnessOptions options,
       HotKeyLogger hotKeyLogger,
       Supplier<Instant> clock,
-      StreamingWorkerStatusReporter workerStatusReporter,
+      StreamingWorkerStatusReporterFactory 
streamingWorkerStatusReporterFactory,
       FailureTracker failureTracker,
       WorkFailureProcessor workFailureProcessor,
       StreamingCounters streamingCounters,
       MemoryMonitor memoryMonitor,
       GrpcWindmillStreamFactory windmillStreamFactory,
       ScheduledExecutorService activeWorkRefreshExecutorFn,
-      ConcurrentMap<String, StageInfo> stageInfoMap) {
+      ConcurrentMap<String, StageInfo> stageInfoMap,
+      @Nullable GrpcDispatcherClient dispatcherClient) {
     // Register standard file systems.
     FileSystems.setDefaultPipelineOptions(options);
-
     this.configFetcher = configFetcher;
     this.computationStateCache = computationStateCache;
     this.stateCache = windmillStateCache;
@@ -189,35 +215,13 @@ public final class StreamingDataflowWorker {
             Duration.standardSeconds(options.getReaderCacheTimeoutSec()),
             Executors.newCachedThreadPool());
     this.options = options;
-
-    boolean windmillServiceEnabled = options.isEnableStreamingEngine();
-
-    int numCommitThreads = 1;
-    if (windmillServiceEnabled && options.getWindmillServiceCommitThreads() > 
0) {
-      numCommitThreads = options.getWindmillServiceCommitThreads();
-    }
-
-    this.workCommitter =
-        windmillServiceEnabled
-            ? StreamingEngineWorkCommitter.builder()
-                .setCommitByteSemaphore(Commits.maxCommitByteSemaphore())
-                .setCommitWorkStreamFactory(
-                    WindmillStreamPool.create(
-                            numCommitThreads,
-                            COMMIT_STREAM_TIMEOUT,
-                            windmillServer::commitWorkStream)
-                        ::getCloseableStream)
-                .setNumCommitSenders(numCommitThreads)
-                .setOnCommitComplete(this::onCompleteCommit)
-                .build()
-            : StreamingApplianceWorkCommitter.create(
-                windmillServer::commitWork, this::onCompleteCommit);
-
     this.workUnitExecutor = workUnitExecutor;
-
-    this.workerStatusReporter = workerStatusReporter;
-    this.streamingCounters = streamingCounters;
     this.memoryMonitor = BackgroundMemoryMonitor.create(memoryMonitor);
+    this.numCommitThreads =
+        options.isEnableStreamingEngine()
+            ? Math.max(options.getWindmillServiceCommitThreads(), 1)
+            : 1;
+
     StreamingWorkScheduler streamingWorkScheduler =
         StreamingWorkScheduler.create(
             options,
@@ -234,107 +238,200 @@ public final class StreamingDataflowWorker {
             ID_GENERATOR,
             configFetcher.getGlobalConfigHandle(),
             stageInfoMap);
-
     ThrottlingGetDataMetricTracker getDataMetricTracker =
         new ThrottlingGetDataMetricTracker(memoryMonitor);
-    WorkerStatusPages workerStatusPages =
-        WorkerStatusPages.create(DEFAULT_STATUS_PORT, memoryMonitor);
-    StreamingWorkerStatusPages.Builder statusPagesBuilder = 
StreamingWorkerStatusPages.builder();
-    int stuckCommitDurationMillis;
-    GetDataClient getDataClient;
-    HeartbeatSender heartbeatSender;
-    if (windmillServiceEnabled) {
-      WindmillStreamPool<GetDataStream> getDataStreamPool =
-          WindmillStreamPool.create(
-              Math.max(1, options.getWindmillGetDataStreamCount()),
-              GET_DATA_STREAM_TIMEOUT,
-              windmillServer::getDataStream);
-      getDataClient = new StreamPoolGetDataClient(getDataMetricTracker, 
getDataStreamPool);
-      if (options.getUseSeparateWindmillHeartbeatStreams() != null) {
+    // Status page members. Implementations differ depending on whether the harness 
is streaming engine
+    // direct path, streaming engine cloud path, or streaming appliance.
+    @Nullable ChannelzServlet channelzServlet = null;
+    Consumer<PrintWriter> getDataStatusProvider;
+    Supplier<Long> currentActiveCommitBytesProvider;
+    if (isDirectPathPipeline(options)) {
+      WeightedSemaphore<Commit> maxCommitByteSemaphore = 
Commits.maxCommitByteSemaphore();
+      FanOutStreamingEngineWorkerHarness fanOutStreamingEngineWorkerHarness =
+          FanOutStreamingEngineWorkerHarness.create(
+              createJobHeader(options, clientId),
+              GetWorkBudget.builder()
+                  .setItems(chooseMaxBundlesOutstanding(options))
+                  .setBytes(MAX_GET_WORK_FETCH_BYTES)
+                  .build(),
+              windmillStreamFactory,
+              (workItem, watermarks, processingContext, 
getWorkStreamLatencies) ->
+                  computationStateCache
+                      .get(processingContext.computationId())
+                      .ifPresent(
+                          computationState -> {
+                            memoryMonitor.waitForResources("GetWork");
+                            streamingWorkScheduler.scheduleWork(
+                                computationState,
+                                workItem,
+                                watermarks,
+                                processingContext,
+                                getWorkStreamLatencies);
+                          }),
+              createFanOutStubFactory(options),
+              GetWorkBudgetDistributors.distributeEvenly(),
+              Preconditions.checkNotNull(dispatcherClient),
+              commitWorkStream ->
+                  StreamingEngineWorkCommitter.builder()
+                      // Share the commitByteSemaphore across all created 
workCommitters.
+                      .setCommitByteSemaphore(maxCommitByteSemaphore)
+                      
.setBackendWorkerToken(commitWorkStream.backendWorkerToken())
+                      .setOnCommitComplete(this::onCompleteCommit)
+                      
.setNumCommitSenders(Math.max(options.getWindmillServiceCommitThreads(), 1))
+                      .setCommitWorkStreamFactory(
+                          () -> CloseableStream.create(commitWorkStream, () -> 
{}))
+                      .build(),
+              getDataMetricTracker);
+      getDataStatusProvider = getDataMetricTracker::printHtml;
+      currentActiveCommitBytesProvider =
+          fanOutStreamingEngineWorkerHarness::currentActiveCommitBytes;
+      channelzServlet =
+          createChannelzServlet(
+              options, 
fanOutStreamingEngineWorkerHarness::currentWindmillEndpoints);
+      this.streamingWorkerHarness = fanOutStreamingEngineWorkerHarness;
+    } else {
+      // Non-direct path pipelines.
+      Windmill.GetWorkRequest request =
+          Windmill.GetWorkRequest.newBuilder()
+              .setClientId(clientId)
+              .setMaxItems(chooseMaxBundlesOutstanding(options))
+              .setMaxBytes(MAX_GET_WORK_FETCH_BYTES)
+              .build();
+      GetDataClient getDataClient;
+      HeartbeatSender heartbeatSender;
+      WorkCommitter workCommitter;
+      GetWorkSender getWorkSender;
+      if (options.isEnableStreamingEngine()) {
+        WindmillStreamPool<GetDataStream> getDataStreamPool =
+            WindmillStreamPool.create(
+                Math.max(1, options.getWindmillGetDataStreamCount()),
+                GET_DATA_STREAM_TIMEOUT,
+                windmillServer::getDataStream);
+        getDataClient = new StreamPoolGetDataClient(getDataMetricTracker, 
getDataStreamPool);
         heartbeatSender =
-            StreamPoolHeartbeatSender.Create(
-                
Boolean.TRUE.equals(options.getUseSeparateWindmillHeartbeatStreams())
-                    ? separateHeartbeatPool(windmillServer)
-                    : getDataStreamPool);
-
+            createStreamingEngineHeartbeatSender(
+                options, windmillServer, getDataStreamPool, 
configFetcher.getGlobalConfigHandle());
+        channelzServlet =
+            createChannelzServlet(options, 
windmillServer::getWindmillServiceEndpoints);
+        workCommitter =
+            StreamingEngineWorkCommitter.builder()
+                .setCommitWorkStreamFactory(
+                    WindmillStreamPool.create(
+                            numCommitThreads,
+                            COMMIT_STREAM_TIMEOUT,
+                            windmillServer::commitWorkStream)
+                        ::getCloseableStream)
+                .setCommitByteSemaphore(Commits.maxCommitByteSemaphore())
+                .setNumCommitSenders(numCommitThreads)
+                .setOnCommitComplete(this::onCompleteCommit)
+                .build();
+        getWorkSender =
+            GetWorkSender.forStreamingEngine(
+                receiver -> windmillServer.getWorkStream(request, receiver));
       } else {
-        heartbeatSender =
-            StreamPoolHeartbeatSender.Create(
-                separateHeartbeatPool(windmillServer),
-                getDataStreamPool,
-                configFetcher.getGlobalConfigHandle());
+        getDataClient = new ApplianceGetDataClient(windmillServer, 
getDataMetricTracker);
+        heartbeatSender = new 
ApplianceHeartbeatSender(windmillServer::getData);
+        workCommitter =
+            StreamingApplianceWorkCommitter.create(
+                windmillServer::commitWork, this::onCompleteCommit);
+        getWorkSender = GetWorkSender.forAppliance(() -> 
windmillServer.getWork(request));
       }
 
-      stuckCommitDurationMillis =
-          options.getStuckCommitDurationMillis() > 0 ? 
options.getStuckCommitDurationMillis() : 0;
-      statusPagesBuilder
-          .setDebugCapture(
-              new DebugCapture.Manager(options, 
workerStatusPages.getDebugCapturePages()))
-          .setChannelzServlet(
-              new ChannelzServlet(
-                  CHANNELZ_PATH, options, 
windmillServer::getWindmillServiceEndpoints))
-          .setWindmillStreamFactory(windmillStreamFactory);
-    } else {
-      getDataClient = new ApplianceGetDataClient(windmillServer, 
getDataMetricTracker);
-      heartbeatSender = new ApplianceHeartbeatSender(windmillServer::getData);
-      stuckCommitDurationMillis = 0;
+      getDataStatusProvider = getDataClient::printHtml;
+      currentActiveCommitBytesProvider = 
workCommitter::currentActiveCommitBytes;
+
+      this.streamingWorkerHarness =
+          SingleSourceWorkerHarness.builder()
+              .setStreamingWorkScheduler(streamingWorkScheduler)
+              .setWorkCommitter(workCommitter)
+              .setGetDataClient(getDataClient)
+              .setComputationStateFetcher(this.computationStateCache::get)
+              .setWaitForResources(() -> 
memoryMonitor.waitForResources("GetWork"))
+              .setHeartbeatSender(heartbeatSender)
+              .setThrottledTimeTracker(windmillServer::getAndResetThrottleTime)
+              .setGetWorkSender(getWorkSender)
+              .build();
     }
 
+    this.workerStatusReporter =
+        
streamingWorkerStatusReporterFactory.createStatusReporter(streamingWorkerHarness);
     this.activeWorkRefresher =
         new ActiveWorkRefresher(
             clock,
             options.getActiveWorkRefreshPeriodMillis(),
-            stuckCommitDurationMillis,
+            options.isEnableStreamingEngine()
+                ? Math.max(options.getStuckCommitDurationMillis(), 0)
+                : 0,
             computationStateCache::getAllPresentComputations,
             sampler,
             activeWorkRefreshExecutorFn,
             getDataMetricTracker::trackHeartbeats);
 
     this.statusPages =
-        statusPagesBuilder
+        createStatusPageBuilder(options, windmillStreamFactory, memoryMonitor)
             .setClock(clock)
             .setClientId(clientId)
             .setIsRunning(running)
-            .setStatusPages(workerStatusPages)
             .setStateCache(stateCache)
             .setComputationStateCache(this.computationStateCache)
-            
.setCurrentActiveCommitBytes(workCommitter::currentActiveCommitBytes)
-            .setGetDataStatusProvider(getDataClient::printHtml)
             .setWorkUnitExecutor(workUnitExecutor)
             .setGlobalConfigHandle(configFetcher.getGlobalConfigHandle())
+            .setChannelzServlet(channelzServlet)
+            .setGetDataStatusProvider(getDataStatusProvider)
+            .setCurrentActiveCommitBytes(currentActiveCommitBytesProvider)
             .build();
 
-    Windmill.GetWorkRequest request =
-        Windmill.GetWorkRequest.newBuilder()
-            .setClientId(clientId)
-            .setMaxItems(chooseMaximumBundlesOutstanding())
-            .setMaxBytes(MAX_GET_WORK_FETCH_BYTES)
-            .build();
-
-    this.streamingWorkerHarness =
-        SingleSourceWorkerHarness.builder()
-            .setStreamingWorkScheduler(streamingWorkScheduler)
-            .setWorkCommitter(workCommitter)
-            .setGetDataClient(getDataClient)
-            .setComputationStateFetcher(this.computationStateCache::get)
-            .setWaitForResources(() -> 
memoryMonitor.waitForResources("GetWork"))
-            .setHeartbeatSender(heartbeatSender)
-            .setGetWorkSender(
-                windmillServiceEnabled
-                    ? GetWorkSender.forStreamingEngine(
-                        receiver -> windmillServer.getWorkStream(request, 
receiver))
-                    : GetWorkSender.forAppliance(() -> 
windmillServer.getWork(request)))
-            .build();
-
-    LOG.debug("windmillServiceEnabled: {}", windmillServiceEnabled);
+    LOG.debug("isDirectPathEnabled: {}", 
options.getIsWindmillServiceDirectPathEnabled());
+    LOG.debug("windmillServiceEnabled: {}", options.isEnableStreamingEngine());
     LOG.debug("WindmillServiceEndpoint: {}", 
options.getWindmillServiceEndpoint());
     LOG.debug("WindmillServicePort: {}", options.getWindmillServicePort());
     LOG.debug("LocalWindmillHostport: {}", options.getLocalWindmillHostport());
   }
 
-  private static WindmillStreamPool<GetDataStream> separateHeartbeatPool(
-      WindmillServerStub windmillServer) {
-    return WindmillStreamPool.create(1, GET_DATA_STREAM_TIMEOUT, 
windmillServer::getDataStream);
+  private static StreamingWorkerStatusPages.Builder createStatusPageBuilder(
+      DataflowWorkerHarnessOptions options,
+      GrpcWindmillStreamFactory windmillStreamFactory,
+      MemoryMonitor memoryMonitor) {
+    WorkerStatusPages workerStatusPages =
+        WorkerStatusPages.create(DEFAULT_STATUS_PORT, memoryMonitor);
+
+    StreamingWorkerStatusPages.Builder streamingStatusPages =
+        StreamingWorkerStatusPages.builder().setStatusPages(workerStatusPages);
+
+    return options.isEnableStreamingEngine()
+        ? streamingStatusPages
+            .setDebugCapture(
+                new DebugCapture.Manager(options, 
workerStatusPages.getDebugCapturePages()))
+            .setWindmillStreamFactory(windmillStreamFactory)
+        : streamingStatusPages;
+  }
+
+  private static ChannelzServlet createChannelzServlet(
+      DataflowWorkerHarnessOptions options,
+      Supplier<ImmutableSet<HostAndPort>> windmillEndpointProvider) {
+    return new ChannelzServlet(CHANNELZ_PATH, options, 
windmillEndpointProvider);
+  }
+
+  private static HeartbeatSender createStreamingEngineHeartbeatSender(
+      DataflowWorkerHarnessOptions options,
+      WindmillServerStub windmillClient,
+      WindmillStreamPool<GetDataStream> getDataStreamPool,
+      StreamingGlobalConfigHandle globalConfigHandle) {
+    // Experiment gates the logic till backend changes are rollback safe
+    if (!DataflowRunner.hasExperiment(
+            options, 
STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_HEARTBEAT_POOL_EXPERIMENT)
+        || options.getUseSeparateWindmillHeartbeatStreams() != null) {
+      return StreamPoolHeartbeatSender.create(
+          Boolean.TRUE.equals(options.getUseSeparateWindmillHeartbeatStreams())
+              ? WindmillStreamPool.create(1, GET_DATA_STREAM_TIMEOUT, 
windmillClient::getDataStream)
+              : getDataStreamPool);
+
+    } else {
+      return StreamPoolHeartbeatSender.create(
+          WindmillStreamPool.create(1, GET_DATA_STREAM_TIMEOUT, 
windmillClient::getDataStream),
+          getDataStreamPool,
+          globalConfigHandle);
+    }
   }
 
   public static StreamingDataflowWorker 
fromOptions(DataflowWorkerHarnessOptions options) {
@@ -387,17 +484,21 @@ public final class StreamingDataflowWorker {
             failureTracker,
             () -> Optional.ofNullable(memoryMonitor.tryToDumpHeap()),
             clock);
-    StreamingWorkerStatusReporter workerStatusReporter =
-        StreamingWorkerStatusReporter.create(
-            dataflowServiceClient,
-            windmillServer::getAndResetThrottleTime,
-            stageInfo::values,
-            failureTracker,
-            streamingCounters,
-            memoryMonitor,
-            workExecutor,
-            options.getWindmillHarnessUpdateReportingPeriod().getMillis(),
-            options.getPerWorkerMetricsUpdateReportingPeriodMillis());
+    StreamingWorkerStatusReporterFactory workerStatusReporterFactory =
+        throttleTimeSupplier ->
+            StreamingWorkerStatusReporter.builder()
+                .setDataflowServiceClient(dataflowServiceClient)
+                .setWindmillQuotaThrottleTime(throttleTimeSupplier)
+                .setAllStageInfo(stageInfo::values)
+                .setFailureTracker(failureTracker)
+                .setStreamingCounters(streamingCounters)
+                .setMemoryMonitor(memoryMonitor)
+                .setWorkExecutor(workExecutor)
+                .setWindmillHarnessUpdateReportingPeriodMillis(
+                    
options.getWindmillHarnessUpdateReportingPeriod().getMillis())
+                .setPerWorkerMetricsUpdateReportingPeriodMillis(
+                    options.getPerWorkerMetricsUpdateReportingPeriodMillis())
+                .build();
 
     return new StreamingDataflowWorker(
         windmillServer,
@@ -410,7 +511,7 @@ public final class StreamingDataflowWorker {
         options,
         new HotKeyLogger(),
         clock,
-        workerStatusReporter,
+        workerStatusReporterFactory,
         failureTracker,
         workFailureProcessor,
         streamingCounters,
@@ -418,7 +519,8 @@ public final class StreamingDataflowWorker {
         
configFetcherComputationStateCacheAndWindmillClient.windmillStreamFactory(),
         Executors.newSingleThreadScheduledExecutor(
             new ThreadFactoryBuilder().setNameFormat("RefreshWork").build()),
-        stageInfo);
+        stageInfo,
+        
configFetcherComputationStateCacheAndWindmillClient.windmillDispatcherClient());
   }
 
   /**
@@ -433,53 +535,121 @@ public final class StreamingDataflowWorker {
           WorkUnitClient dataflowServiceClient,
           GrpcWindmillStreamFactory.Builder windmillStreamFactoryBuilder,
           Function<ComputationConfig.Fetcher, ComputationStateCache> 
computationStateCacheFactory) {
-    ComputationConfig.Fetcher configFetcher;
-    WindmillServerStub windmillServer;
-    ComputationStateCache computationStateCache;
-    GrpcWindmillStreamFactory windmillStreamFactory;
     if (options.isEnableStreamingEngine()) {
       GrpcDispatcherClient dispatcherClient =
           GrpcDispatcherClient.create(options, new 
WindmillStubFactoryFactoryImpl(options));
-      configFetcher =
+      ComputationConfig.Fetcher configFetcher =
           StreamingEngineComputationConfigFetcher.create(
               options.getGlobalConfigRefreshPeriod().getMillis(), 
dataflowServiceClient);
       
configFetcher.getGlobalConfigHandle().registerConfigObserver(dispatcherClient::onJobConfig);
-      computationStateCache = 
computationStateCacheFactory.apply(configFetcher);
-      windmillStreamFactory =
+      ComputationStateCache computationStateCache =
+          computationStateCacheFactory.apply(configFetcher);
+      GrpcWindmillStreamFactory windmillStreamFactory =
           windmillStreamFactoryBuilder
               .setProcessHeartbeatResponses(
                   new 
WorkHeartbeatResponseProcessor(computationStateCache::get))
               .setHealthCheckIntervalMillis(
                   options.getWindmillServiceStreamingRpcHealthCheckPeriodMs())
               .build();
-      windmillServer = GrpcWindmillServer.create(options, 
windmillStreamFactory, dispatcherClient);
-    } else {
-      if (options.getWindmillServiceEndpoint() != null
-          || options.getLocalWindmillHostport().startsWith("grpc:")) {
-        windmillStreamFactory =
-            windmillStreamFactoryBuilder
-                .setHealthCheckIntervalMillis(
-                    
options.getWindmillServiceStreamingRpcHealthCheckPeriodMs())
-                .build();
-        windmillServer =
-            GrpcWindmillServer.create(
-                options,
-                windmillStreamFactory,
-                GrpcDispatcherClient.create(options, new 
WindmillStubFactoryFactoryImpl(options)));
-      } else {
-        windmillStreamFactory = windmillStreamFactoryBuilder.build();
-        windmillServer = new 
JniWindmillApplianceServer(options.getLocalWindmillHostport());
+      return ConfigFetcherComputationStateCacheAndWindmillClient.builder()
+          .setWindmillDispatcherClient(dispatcherClient)
+          .setConfigFetcher(configFetcher)
+          .setComputationStateCache(computationStateCache)
+          .setWindmillStreamFactory(windmillStreamFactory)
+          .setWindmillServer(
+              GrpcWindmillServer.create(options, windmillStreamFactory, 
dispatcherClient))
+          .build();
+    }
+
+    // Build with local Windmill client.
+    if (options.getWindmillServiceEndpoint() != null
+        || options.getLocalWindmillHostport().startsWith("grpc:")) {
+      GrpcDispatcherClient dispatcherClient =
+          GrpcDispatcherClient.create(options, new 
WindmillStubFactoryFactoryImpl(options));
+      GrpcWindmillStreamFactory windmillStreamFactory =
+          windmillStreamFactoryBuilder
+              .setHealthCheckIntervalMillis(
+                  options.getWindmillServiceStreamingRpcHealthCheckPeriodMs())
+              .build();
+      GrpcWindmillServer windmillServer =
+          GrpcWindmillServer.create(options, windmillStreamFactory, 
dispatcherClient);
+      ComputationConfig.Fetcher configFetcher =
+          createApplianceComputationConfigFetcher(windmillServer);
+      return ConfigFetcherComputationStateCacheAndWindmillClient.builder()
+          .setWindmillDispatcherClient(dispatcherClient)
+          .setWindmillServer(windmillServer)
+          .setWindmillStreamFactory(windmillStreamFactory)
+          .setConfigFetcher(configFetcher)
+          
.setComputationStateCache(computationStateCacheFactory.apply(configFetcher))
+          .build();
+    }
+
+    WindmillServerStub windmillServer =
+        new JniWindmillApplianceServer(options.getLocalWindmillHostport());
+    ComputationConfig.Fetcher configFetcher =
+        createApplianceComputationConfigFetcher(windmillServer);
+    return ConfigFetcherComputationStateCacheAndWindmillClient.builder()
+        .setWindmillStreamFactory(windmillStreamFactoryBuilder.build())
+        .setWindmillServer(windmillServer)
+        .setConfigFetcher(configFetcher)
+        
.setComputationStateCache(computationStateCacheFactory.apply(configFetcher))
+        .build();
+  }
+
+  private static StreamingApplianceComputationConfigFetcher 
createApplianceComputationConfigFetcher(
+      ApplianceWindmillClient windmillClient) {
+    return new StreamingApplianceComputationConfigFetcher(
+        windmillClient::getConfig,
+        new FixedGlobalConfigHandle(StreamingGlobalConfig.builder().build()));
+  }
+
+  private static boolean isDirectPathPipeline(DataflowWorkerHarnessOptions 
options) {
+    if (options.isEnableStreamingEngine() && 
options.getIsWindmillServiceDirectPathEnabled()) {
+      boolean isIpV6Enabled =
+          Optional.ofNullable(options.getDataflowServiceOptions())
+              .map(serviceOptions -> 
serviceOptions.contains(ENABLE_IPV6_EXPERIMENT))
+              .orElse(false);
+
+      if (isIpV6Enabled) {
+        return true;
       }
 
-      configFetcher =
-          new StreamingApplianceComputationConfigFetcher(
-              windmillServer::getConfig,
-              new 
FixedGlobalConfigHandle(StreamingGlobalConfig.builder().build()));
-      computationStateCache = 
computationStateCacheFactory.apply(configFetcher);
+      LOG.warn(
+          "DirectPath is currently only supported with IPv6 networking stack. 
This requires setting "
+              + "\"enable_private_ipv6_google_access\" in experimental 
pipeline options. "
+              + "For information on how to set experimental pipeline options 
see "
+              + 
"https://cloud.google.com/dataflow/docs/guides/setting-pipeline-options#experimental.
 "
+              + "Defaulting to CloudPath.");
     }
 
-    return ConfigFetcherComputationStateCacheAndWindmillClient.create(
-        configFetcher, computationStateCache, windmillServer, 
windmillStreamFactory);
+    return false;
+  }
+
+  private static void validateWorkerOptions(DataflowWorkerHarnessOptions 
options) {
+    Preconditions.checkArgument(
+        options.isStreaming(),
+        "%s instantiated with options indicating batch use",
+        StreamingDataflowWorker.class.getName());
+
+    Preconditions.checkArgument(
+        !DataflowRunner.hasExperiment(options, BEAM_FN_API_EXPERIMENT),
+        "%s cannot be main() class with beam_fn_api enabled",
+        StreamingDataflowWorker.class.getSimpleName());
+  }
+
+  private static ChannelCachingStubFactory createFanOutStubFactory(
+      DataflowWorkerHarnessOptions workerOptions) {
+    return ChannelCachingRemoteStubFactory.create(
+        workerOptions.getGcpCredential(),
+        ChannelCache.create(
+            serviceAddress ->
+                // IsolationChannel will create and manage separate RPC 
channels to the same
+                // serviceAddress.
+                IsolationChannel.create(
+                    () ->
+                        remoteChannel(
+                            serviceAddress,
+                            
workerOptions.getWindmillServiceRpcChannelAliveTimeoutSec()))));
   }
 
   @VisibleForTesting
@@ -495,7 +665,9 @@ public final class StreamingDataflowWorker {
       Supplier<Instant> clock,
       Function<String, ScheduledExecutorService> executorSupplier,
       StreamingGlobalConfigHandleImpl globalConfigHandle,
-      int localRetryTimeoutMs) {
+      int localRetryTimeoutMs,
+      StreamingCounters streamingCounters,
+      WindmillStubFactoryFactory stubFactory) {
     ConcurrentMap<String, StageInfo> stageInfo = new ConcurrentHashMap<>();
     BoundedQueueExecutor workExecutor = createWorkUnitExecutor(options);
     WindmillStateCache stateCache =
@@ -538,7 +710,6 @@ public final class StreamingDataflowWorker {
                 stateNameMap,
                 stateCache.forComputation(mapTask.getStageName())));
     MemoryMonitor memoryMonitor = MemoryMonitor.fromOptions(options);
-    StreamingCounters streamingCounters = StreamingCounters.create();
     FailureTracker failureTracker =
         options.isEnableStreamingEngine()
             ? StreamingEngineFailureTracker.create(
@@ -554,19 +725,23 @@ public final class StreamingDataflowWorker {
             () -> Optional.ofNullable(memoryMonitor.tryToDumpHeap()),
             clock,
             localRetryTimeoutMs);
-    StreamingWorkerStatusReporter workerStatusReporter =
-        StreamingWorkerStatusReporter.forTesting(
-            publishCounters,
-            workUnitClient,
-            windmillServer::getAndResetThrottleTime,
-            stageInfo::values,
-            failureTracker,
-            streamingCounters,
-            memoryMonitor,
-            workExecutor,
-            executorSupplier,
-            options.getWindmillHarnessUpdateReportingPeriod().getMillis(),
-            options.getPerWorkerMetricsUpdateReportingPeriodMillis());
+    StreamingWorkerStatusReporterFactory workerStatusReporterFactory =
+        throttleTimeSupplier ->
+            StreamingWorkerStatusReporter.builder()
+                .setPublishCounters(publishCounters)
+                .setDataflowServiceClient(workUnitClient)
+                .setWindmillQuotaThrottleTime(throttleTimeSupplier)
+                .setAllStageInfo(stageInfo::values)
+                .setFailureTracker(failureTracker)
+                .setStreamingCounters(streamingCounters)
+                .setMemoryMonitor(memoryMonitor)
+                .setWorkExecutor(workExecutor)
+                .setExecutorFactory(executorSupplier)
+                .setWindmillHarnessUpdateReportingPeriodMillis(
+                    
options.getWindmillHarnessUpdateReportingPeriod().getMillis())
+                .setPerWorkerMetricsUpdateReportingPeriodMillis(
+                    options.getPerWorkerMetricsUpdateReportingPeriodMillis())
+                .build();
 
     GrpcWindmillStreamFactory.Builder windmillStreamFactory =
         createGrpcwindmillStreamFactoryBuilder(options, 1)
@@ -584,7 +759,7 @@ public final class StreamingDataflowWorker {
         options,
         hotKeyLogger,
         clock,
-        workerStatusReporter,
+        workerStatusReporterFactory,
         failureTracker,
         workFailureProcessor,
         streamingCounters,
@@ -596,7 +771,8 @@ public final class StreamingDataflowWorker {
                 .build()
             : windmillStreamFactory.build(),
         executorSupplier.apply("RefreshWork"),
-        stageInfo);
+        stageInfo,
+        GrpcDispatcherClient.create(options, stubFactory));
   }
 
   private static GrpcWindmillStreamFactory.Builder 
createGrpcwindmillStreamFactoryBuilder(
@@ -605,13 +781,7 @@ public final class StreamingDataflowWorker {
         !options.isEnableStreamingEngine() && 
options.getLocalWindmillHostport() != null
             ? GrpcWindmillServer.LOCALHOST_MAX_BACKOFF
             : 
Duration.millis(options.getWindmillServiceStreamMaxBackoffMillis());
-    return GrpcWindmillStreamFactory.of(
-            JobHeader.newBuilder()
-                .setJobId(options.getJobId())
-                .setProjectId(options.getProject())
-                .setWorkerId(options.getWorkerId())
-                .setClientId(clientId)
-                .build())
+    return GrpcWindmillStreamFactory.of(createJobHeader(options, clientId))
         
.setWindmillMessagesBetweenIsReadyChecks(options.getWindmillMessagesBetweenIsReadyChecks())
         .setMaxBackOffSupplier(() -> maxBackoff)
         
.setLogEveryNStreamFailures(options.getWindmillServiceStreamingLogEveryNStreamFailures())
@@ -622,6 +792,15 @@ public final class StreamingDataflowWorker {
                     options, 
"streaming_engine_disable_new_heartbeat_requests"));
   }
 
+  private static JobHeader createJobHeader(DataflowWorkerHarnessOptions 
options, long clientId) {
+    return JobHeader.newBuilder()
+        .setJobId(options.getJobId())
+        .setProjectId(options.getProject())
+        .setWorkerId(options.getWorkerId())
+        .setClientId(clientId)
+        .build();
+  }
+
   private static BoundedQueueExecutor 
createWorkUnitExecutor(DataflowWorkerHarnessOptions options) {
     return new BoundedQueueExecutor(
         chooseMaxThreads(options),
@@ -640,15 +819,7 @@ public final class StreamingDataflowWorker {
         DataflowWorkerHarnessHelper.initializeGlobalStateAndPipelineOptions(
             StreamingDataflowWorker.class, DataflowWorkerHarnessOptions.class);
     DataflowWorkerHarnessHelper.configureLogging(options);
-    checkArgument(
-        options.isStreaming(),
-        "%s instantiated with options indicating batch use",
-        StreamingDataflowWorker.class.getName());
-
-    checkArgument(
-        !DataflowRunner.hasExperiment(options, "beam_fn_api"),
-        "%s cannot be main() class with beam_fn_api enabled",
-        StreamingDataflowWorker.class.getSimpleName());
+    validateWorkerOptions(options);
 
     CoderTranslation.verifyModelCodersRegistered();
 
@@ -705,21 +876,6 @@ public final class StreamingDataflowWorker {
     workerStatusReporter.reportPeriodicWorkerUpdates();
   }
 
-  private int chooseMaximumNumberOfThreads() {
-    if (options.getNumberOfWorkerHarnessThreads() != 0) {
-      return options.getNumberOfWorkerHarnessThreads();
-    }
-    return MAX_PROCESSING_THREADS;
-  }
-
-  private int chooseMaximumBundlesOutstanding() {
-    int maxBundles = options.getMaxBundlesFromWindmillOutstanding();
-    if (maxBundles > 0) {
-      return maxBundles;
-    }
-    return chooseMaximumNumberOfThreads() + 100;
-  }
-
   @VisibleForTesting
   public boolean workExecutorIsEmpty() {
     return workUnitExecutor.executorQueueIsEmpty();
@@ -727,7 +883,7 @@ public final class StreamingDataflowWorker {
 
   @VisibleForTesting
   int numCommitThreads() {
-    return workCommitter.parallelism();
+    return numCommitThreads;
   }
 
   @VisibleForTesting
@@ -740,7 +896,6 @@ public final class StreamingDataflowWorker {
     return computationStateCache;
   }
 
-  @SuppressWarnings("FutureReturnValueIgnored")
   public void start() {
     running.set(true);
     configFetcher.start();
@@ -791,27 +946,17 @@ public final class StreamingDataflowWorker {
                     completeCommit.shardedKey(), completeCommit.workId()));
   }
 
-  @VisibleForTesting
-  public Iterable<CounterUpdate> buildCounters() {
-    return Iterables.concat(
-        streamingCounters
-            .pendingDeltaCounters()
-            
.extractModifiedDeltaUpdates(DataflowCounterUpdateExtractor.INSTANCE),
-        streamingCounters
-            .pendingCumulativeCounters()
-            .extractUpdates(false, DataflowCounterUpdateExtractor.INSTANCE));
+  @FunctionalInterface
+  private interface StreamingWorkerStatusReporterFactory {
+    StreamingWorkerStatusReporter createStatusReporter(ThrottledTimeTracker 
throttledTimeTracker);
   }
 
   @AutoValue
   abstract static class ConfigFetcherComputationStateCacheAndWindmillClient {
 
-    private static ConfigFetcherComputationStateCacheAndWindmillClient create(
-        ComputationConfig.Fetcher configFetcher,
-        ComputationStateCache computationStateCache,
-        WindmillServerStub windmillServer,
-        GrpcWindmillStreamFactory windmillStreamFactory) {
-      return new 
AutoValue_StreamingDataflowWorker_ConfigFetcherComputationStateCacheAndWindmillClient(
-          configFetcher, computationStateCache, windmillServer, 
windmillStreamFactory);
+    private static Builder builder() {
+      return new 
AutoValue_StreamingDataflowWorker_ConfigFetcherComputationStateCacheAndWindmillClient
+          .Builder();
     }
 
     abstract ComputationConfig.Fetcher configFetcher();
@@ -821,6 +966,23 @@ public final class StreamingDataflowWorker {
     abstract WindmillServerStub windmillServer();
 
     abstract GrpcWindmillStreamFactory windmillStreamFactory();
+
+    abstract @Nullable GrpcDispatcherClient windmillDispatcherClient();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setConfigFetcher(ComputationConfig.Fetcher value);
+
+      abstract Builder setComputationStateCache(ComputationStateCache value);
+
+      abstract Builder setWindmillServer(WindmillServerStub value);
+
+      abstract Builder setWindmillStreamFactory(GrpcWindmillStreamFactory 
value);
+
+      abstract Builder setWindmillDispatcherClient(GrpcDispatcherClient value);
+
+      abstract ConfigFetcherComputationStateCacheAndWindmillClient build();
+    }
   }
 
   /**
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java
index 1ef1691b081..4c73e8c7f61 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java
@@ -371,6 +371,7 @@ public final class FanOutStreamingEngineWorkerHarness 
implements StreamingWorker
   }
 
   /** Add up all the throttle times of all streams including 
GetWorkerMetadataStream. */
+  @Override
   public long getAndResetThrottleTime() {
     return backends.get().windmillStreams().values().stream()
             .map(WindmillStreamSender::getAndResetThrottleTime)
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java
index 9716b834cac..65203288e16 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java
@@ -37,6 +37,7 @@ import 
org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub.Windm
 import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream;
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.commits.WorkCommitter;
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.getdata.GetDataClient;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.throttling.ThrottledTimeTracker;
 import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemReceiver;
 import 
org.apache.beam.runners.dataflow.worker.windmill.work.processing.StreamingWorkScheduler;
 import 
org.apache.beam.runners.dataflow.worker.windmill.work.refresh.HeartbeatSender;
@@ -66,6 +67,7 @@ public final class SingleSourceWorkerHarness implements 
StreamingWorkerHarness {
   private final Function<String, Optional<ComputationState>> 
computationStateFetcher;
   private final ExecutorService workProviderExecutor;
   private final GetWorkSender getWorkSender;
+  private final ThrottledTimeTracker throttledTimeTracker;
 
   SingleSourceWorkerHarness(
       WorkCommitter workCommitter,
@@ -74,7 +76,8 @@ public final class SingleSourceWorkerHarness implements 
StreamingWorkerHarness {
       StreamingWorkScheduler streamingWorkScheduler,
       Runnable waitForResources,
       Function<String, Optional<ComputationState>> computationStateFetcher,
-      GetWorkSender getWorkSender) {
+      GetWorkSender getWorkSender,
+      ThrottledTimeTracker throttledTimeTracker) {
     this.workCommitter = workCommitter;
     this.getDataClient = getDataClient;
     this.heartbeatSender = heartbeatSender;
@@ -90,6 +93,7 @@ public final class SingleSourceWorkerHarness implements 
StreamingWorkerHarness {
                 .build());
     this.isRunning = new AtomicBoolean(false);
     this.getWorkSender = getWorkSender;
+    this.throttledTimeTracker = throttledTimeTracker;
   }
 
   public static SingleSourceWorkerHarness.Builder builder() {
@@ -144,6 +148,11 @@ public final class SingleSourceWorkerHarness implements 
StreamingWorkerHarness {
     workCommitter.stop();
   }
 
+  @Override
+  public long getAndResetThrottleTime() {
+    return throttledTimeTracker.getAndResetThrottleTime();
+  }
+
   private void streamingEngineDispatchLoop(
       Function<WorkItemReceiver, WindmillStream.GetWorkStream> 
getWorkStreamFactory) {
     while (isRunning.get()) {
@@ -254,6 +263,8 @@ public final class SingleSourceWorkerHarness implements 
StreamingWorkerHarness {
 
     Builder setGetWorkSender(GetWorkSender getWorkSender);
 
+    Builder setThrottledTimeTracker(ThrottledTimeTracker throttledTimeTracker);
+
     SingleSourceWorkerHarness build();
   }
 
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerHarness.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerHarness.java
index c1b4570e226..731a5a4b1b5 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerHarness.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerHarness.java
@@ -17,11 +17,12 @@
  */
 package org.apache.beam.runners.dataflow.worker.streaming.harness;
 
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.throttling.ThrottledTimeTracker;
 import org.apache.beam.sdk.annotations.Internal;
 
 /** Provides an interface to start streaming worker processing. */
 @Internal
-public interface StreamingWorkerHarness {
+public interface StreamingWorkerHarness extends ThrottledTimeTracker {
   void start();
 
   void shutdown();
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusPages.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusPages.java
index 6981312eff1..ddfc6809231 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusPages.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusPages.java
@@ -258,7 +258,7 @@ public final class StreamingWorkerStatusPages {
 
     Builder setDebugCapture(DebugCapture.Manager debugCapture);
 
-    Builder setChannelzServlet(ChannelzServlet channelzServlet);
+    Builder setChannelzServlet(@Nullable ChannelzServlet channelzServlet);
 
     Builder setStateCache(WindmillStateCache stateCache);
 
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporter.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporter.java
index ba77d8e1ce2..3557f0d193c 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporter.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporter.java
@@ -27,6 +27,7 @@ import 
com.google.api.services.dataflow.model.StreamingScalingReportResponse;
 import com.google.api.services.dataflow.model.WorkItemStatus;
 import com.google.api.services.dataflow.model.WorkerMessage;
 import com.google.api.services.dataflow.model.WorkerMessageResponse;
+import com.google.auto.value.AutoBuilder;
 import java.io.IOException;
 import java.math.RoundingMode;
 import java.util.ArrayList;
@@ -51,6 +52,7 @@ import 
org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingMDC;
 import org.apache.beam.runners.dataflow.worker.streaming.StageInfo;
 import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
 import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.throttling.ThrottledTimeTracker;
 import 
org.apache.beam.runners.dataflow.worker.windmill.work.processing.failures.FailureTracker;
 import org.apache.beam.sdk.annotations.Internal;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
@@ -78,7 +80,7 @@ public final class StreamingWorkerStatusReporter {
   private final int initialMaxThreadCount;
   private final int initialMaxBundlesOutstanding;
   private final WorkUnitClient dataflowServiceClient;
-  private final Supplier<Long> windmillQuotaThrottleTime;
+  private final ThrottledTimeTracker windmillQuotaThrottleTime;
   private final Supplier<Collection<StageInfo>> allStageInfo;
   private final FailureTracker failureTracker;
   private final StreamingCounters streamingCounters;
@@ -97,10 +99,10 @@ public final class StreamingWorkerStatusReporter {
   // Used to track the number of WorkerMessages that have been sent without 
PerWorkerMetrics.
   private final AtomicLong workerMessagesIndex;
 
-  private StreamingWorkerStatusReporter(
+  StreamingWorkerStatusReporter(
       boolean publishCounters,
       WorkUnitClient dataflowServiceClient,
-      Supplier<Long> windmillQuotaThrottleTime,
+      ThrottledTimeTracker windmillQuotaThrottleTime,
       Supplier<Collection<StageInfo>> allStageInfo,
       FailureTracker failureTracker,
       StreamingCounters streamingCounters,
@@ -131,57 +133,13 @@ public final class StreamingWorkerStatusReporter {
     this.workerMessagesIndex = new AtomicLong();
   }
 
-  public static StreamingWorkerStatusReporter create(
-      WorkUnitClient workUnitClient,
-      Supplier<Long> windmillQuotaThrottleTime,
-      Supplier<Collection<StageInfo>> allStageInfo,
-      FailureTracker failureTracker,
-      StreamingCounters streamingCounters,
-      MemoryMonitor memoryMonitor,
-      BoundedQueueExecutor workExecutor,
-      long windmillHarnessUpdateReportingPeriodMillis,
-      long perWorkerMetricsUpdateReportingPeriodMillis) {
-    return new StreamingWorkerStatusReporter(
-        /* publishCounters= */ true,
-        workUnitClient,
-        windmillQuotaThrottleTime,
-        allStageInfo,
-        failureTracker,
-        streamingCounters,
-        memoryMonitor,
-        workExecutor,
-        threadName ->
-            Executors.newSingleThreadScheduledExecutor(
-                new ThreadFactoryBuilder().setNameFormat(threadName).build()),
-        windmillHarnessUpdateReportingPeriodMillis,
-        perWorkerMetricsUpdateReportingPeriodMillis);
-  }
-
-  @VisibleForTesting
-  public static StreamingWorkerStatusReporter forTesting(
-      boolean publishCounters,
-      WorkUnitClient workUnitClient,
-      Supplier<Long> windmillQuotaThrottleTime,
-      Supplier<Collection<StageInfo>> allStageInfo,
-      FailureTracker failureTracker,
-      StreamingCounters streamingCounters,
-      MemoryMonitor memoryMonitor,
-      BoundedQueueExecutor workExecutor,
-      Function<String, ScheduledExecutorService> executorFactory,
-      long windmillHarnessUpdateReportingPeriodMillis,
-      long perWorkerMetricsUpdateReportingPeriodMillis) {
-    return new StreamingWorkerStatusReporter(
-        publishCounters,
-        workUnitClient,
-        windmillQuotaThrottleTime,
-        allStageInfo,
-        failureTracker,
-        streamingCounters,
-        memoryMonitor,
-        workExecutor,
-        executorFactory,
-        windmillHarnessUpdateReportingPeriodMillis,
-        perWorkerMetricsUpdateReportingPeriodMillis);
+  public static Builder builder() {
+    return new AutoBuilder_StreamingWorkerStatusReporter_Builder()
+        .setPublishCounters(true)
+        .setExecutorFactory(
+            threadName ->
+                Executors.newSingleThreadScheduledExecutor(
+                    new 
ThreadFactoryBuilder().setNameFormat(threadName).build()));
   }
 
   /**
@@ -228,6 +186,22 @@ public final class StreamingWorkerStatusReporter {
     }
   }
 
+  // Calculates the PerWorkerMetrics reporting frequency, ensuring alignment 
with the
+  // WorkerMessages RPC schedule. The desired reporting period
+  // (perWorkerMetricsUpdateReportingPeriodMillis) is adjusted to the nearest 
multiple
+  // of the RPC interval (windmillHarnessUpdateReportingPeriodMillis).
+  private static long getPerWorkerMetricsUpdateFrequency(
+      long windmillHarnessUpdateReportingPeriodMillis,
+      long perWorkerMetricsUpdateReportingPeriodMillis) {
+    if (windmillHarnessUpdateReportingPeriodMillis == 0) {
+      return 0;
+    }
+    return LongMath.divide(
+        perWorkerMetricsUpdateReportingPeriodMillis,
+        windmillHarnessUpdateReportingPeriodMillis,
+        RoundingMode.CEILING);
+  }
+
   @SuppressWarnings("FutureReturnValueIgnored")
   public void start() {
     reportHarnessStartup();
@@ -276,27 +250,13 @@ public final class StreamingWorkerStatusReporter {
     }
   }
 
-  // Calculates the PerWorkerMetrics reporting frequency, ensuring alignment 
with the
-  // WorkerMessages RPC schedule. The desired reporting period
-  // (perWorkerMetricsUpdateReportingPeriodMillis) is adjusted to the nearest 
multiple
-  // of the RPC interval (windmillHarnessUpdateReportingPeriodMillis).
-  private static long getPerWorkerMetricsUpdateFrequency(
-      long windmillHarnessUpdateReportingPeriodMillis,
-      long perWorkerMetricsUpdateReportingPeriodMillis) {
-    if (windmillHarnessUpdateReportingPeriodMillis == 0) {
-      return 0;
-    }
-    return LongMath.divide(
-        perWorkerMetricsUpdateReportingPeriodMillis,
-        windmillHarnessUpdateReportingPeriodMillis,
-        RoundingMode.CEILING);
-  }
-
   /** Sends counter updates to Dataflow backend. */
   private void sendWorkerUpdatesToDataflowService(
       CounterSet deltaCounters, CounterSet cumulativeCounters) throws 
IOException {
     // Throttle time is tracked by the windmillServer but is reported to DFE 
here.
-    
streamingCounters.windmillQuotaThrottling().addValue(windmillQuotaThrottleTime.get());
+    streamingCounters
+        .windmillQuotaThrottling()
+        .addValue(windmillQuotaThrottleTime.getAndResetThrottleTime());
     if (memoryMonitor.isThrashing()) {
       streamingCounters.memoryThrashing().addValue(1);
     }
@@ -496,4 +456,33 @@ public final class StreamingWorkerStatusReporter {
         .maxOutstandingBundles()
         .addValue((long) workExecutor.maximumElementsOutstanding());
   }
+
+  @AutoBuilder
+  public interface Builder {
+    Builder setPublishCounters(boolean publishCounters);
+
+    Builder setDataflowServiceClient(WorkUnitClient dataflowServiceClient);
+
+    Builder setWindmillQuotaThrottleTime(ThrottledTimeTracker 
windmillQuotaThrottledTimeTracker);
+
+    Builder setAllStageInfo(Supplier<Collection<StageInfo>> allStageInfo);
+
+    Builder setFailureTracker(FailureTracker failureTracker);
+
+    Builder setStreamingCounters(StreamingCounters streamingCounters);
+
+    Builder setMemoryMonitor(MemoryMonitor memoryMonitor);
+
+    Builder setWorkExecutor(BoundedQueueExecutor workExecutor);
+
+    Builder setExecutorFactory(Function<String, ScheduledExecutorService> 
executorFactory);
+
+    Builder setWindmillHarnessUpdateReportingPeriodMillis(
+        long windmillHarnessUpdateReportingPeriodMillis);
+
+    Builder setPerWorkerMetricsUpdateReportingPeriodMillis(
+        long perWorkerMetricsUpdateReportingPeriodMillis);
+
+    StreamingWorkerStatusReporter build();
+  }
 }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/throttling/ThrottleTimer.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/throttling/ThrottleTimer.java
index f660112721b..fdcb0339d23 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/throttling/ThrottleTimer.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/throttling/ThrottleTimer.java
@@ -25,7 +25,7 @@ import org.joda.time.Instant;
  * CommitWork are both blocked for x, totalTime will be 2x. However, if 2 GetWork streams are both
  * blocked for x totalTime will be x. All methods are thread safe.
  */
-public final class ThrottleTimer {
+public final class ThrottleTimer implements ThrottledTimeTracker {
   // This is -1 if not currently being throttled or the time in
   // milliseconds when throttling for this type started.
   private long startTime = -1;
@@ -56,6 +56,7 @@ public final class ThrottleTimer {
   }
 
   /** Returns the combined total of all throttle times and resets those times to 0. */
+  @Override
   public synchronized long getAndResetThrottleTime() {
     if (throttled()) {
       stop();
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerHarness.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/throttling/ThrottledTimeTracker.java
similarity index 69%
copy from 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerHarness.java
copy to 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/throttling/ThrottledTimeTracker.java
index c1b4570e226..9bb8fb0a7b5 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerHarness.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/throttling/ThrottledTimeTracker.java
@@ -15,14 +15,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.dataflow.worker.streaming.harness;
+package org.apache.beam.runners.dataflow.worker.windmill.client.throttling;
 
 import org.apache.beam.sdk.annotations.Internal;
 
-/** Provides an interface to start streaming worker processing. */
+/**
+ * Tracks time spent in a throttled state due to {@code Status.RESOURCE_EXHAUSTED} errors returned
+ * from gRPC calls.
+ */
 @Internal
-public interface StreamingWorkerHarness {
-  void start();
+@FunctionalInterface
+public interface ThrottledTimeTracker {
 
-  void shutdown();
+  /** Returns the combined total of all throttle times and resets those times to 0. */
+  long getAndResetThrottleTime();
 }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSender.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSender.java
index fa36b11ffe5..f54091dc2b9 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSender.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSender.java
@@ -42,7 +42,7 @@ public final class StreamPoolHeartbeatSender implements 
HeartbeatSender {
     this.heartbeatStreamPool.set(heartbeatStreamPool);
   }
 
-  public static StreamPoolHeartbeatSender Create(
+  public static StreamPoolHeartbeatSender create(
       @Nonnull WindmillStreamPool<WindmillStream.GetDataStream> 
heartbeatStreamPool) {
     return new StreamPoolHeartbeatSender(heartbeatStreamPool);
   }
@@ -55,7 +55,7 @@ public final class StreamPoolHeartbeatSender implements 
HeartbeatSender {
    *     enabled.
    * @param getDataPool stream to use when using separate streams for heartbeat is disabled.
    */
-  public static StreamPoolHeartbeatSender Create(
+  public static StreamPoolHeartbeatSender create(
       @Nonnull WindmillStreamPool<WindmillStream.GetDataStream> 
dedicatedHeartbeatPool,
       @Nonnull WindmillStreamPool<WindmillStream.GetDataStream> getDataPool,
       @Nonnull StreamingGlobalConfigHandle configHandle) {
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
index dadf0217123..6eeb7bd6bbf 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
@@ -96,6 +96,7 @@ import org.apache.beam.runners.dataflow.util.CloudObject;
 import org.apache.beam.runners.dataflow.util.CloudObjects;
 import org.apache.beam.runners.dataflow.util.PropertyNames;
 import org.apache.beam.runners.dataflow.util.Structs;
+import 
org.apache.beam.runners.dataflow.worker.counters.DataflowCounterUpdateExtractor;
 import org.apache.beam.runners.dataflow.worker.streaming.ComputationState;
 import org.apache.beam.runners.dataflow.worker.streaming.ComputationStateCache;
 import org.apache.beam.runners.dataflow.worker.streaming.ExecutableWork;
@@ -104,6 +105,7 @@ import 
org.apache.beam.runners.dataflow.worker.streaming.Watermarks;
 import org.apache.beam.runners.dataflow.worker.streaming.Work;
 import 
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig;
 import 
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandleImpl;
+import 
org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingCounters;
 import 
org.apache.beam.runners.dataflow.worker.testing.RestoreDataflowLoggingMDC;
 import org.apache.beam.runners.dataflow.worker.testing.TestCountingSource;
 import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
@@ -129,6 +131,9 @@ import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer.Type;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WatermarkHold;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItemCommitRequest;
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.getdata.FakeGetDataClient;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannelFactory;
+import 
org.apache.beam.runners.dataflow.worker.windmill.testing.FakeWindmillStubFactory;
+import 
org.apache.beam.runners.dataflow.worker.windmill.testing.FakeWindmillStubFactoryFactory;
 import 
org.apache.beam.runners.dataflow.worker.windmill.work.refresh.HeartbeatSender;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.Context;
@@ -178,6 +183,7 @@ import 
org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.TextFormat;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheStats;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.UnsignedLong;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -285,6 +291,7 @@ public class StreamingDataflowWorkerTest {
   private final FakeWindmillServer server =
       new FakeWindmillServer(
           errorCollector, computationId -> 
computationStateCache.get(computationId));
+  private StreamingCounters streamingCounters;
 
   public StreamingDataflowWorkerTest(Boolean streamingEngine) {
     this.streamingEngine = streamingEngine;
@@ -304,9 +311,20 @@ public class StreamingDataflowWorkerTest {
     return null;
   }
 
+  private Iterable<CounterUpdate> buildCounters() {
+    return Iterables.concat(
+        streamingCounters
+            .pendingDeltaCounters()
+            
.extractModifiedDeltaUpdates(DataflowCounterUpdateExtractor.INSTANCE),
+        streamingCounters
+            .pendingCumulativeCounters()
+            .extractUpdates(false, DataflowCounterUpdateExtractor.INSTANCE));
+  }
+
   @Before
   public void setUp() {
     server.clearCommitsReceived();
+    streamingCounters = StreamingCounters.create();
   }
 
   @After
@@ -856,7 +874,13 @@ public class StreamingDataflowWorkerTest {
             streamingDataflowWorkerTestParams.clock(),
             streamingDataflowWorkerTestParams.executorSupplier(),
             mockGlobalConfigHandle,
-            streamingDataflowWorkerTestParams.localRetryTimeoutMs());
+            streamingDataflowWorkerTestParams.localRetryTimeoutMs(),
+            streamingCounters,
+            new FakeWindmillStubFactoryFactory(
+                new FakeWindmillStubFactory(
+                    () ->
+                        WindmillChannelFactory.inProcessChannel(
+                            "StreamingDataflowWorkerTestChannel"))));
     this.computationStateCache = worker.getComputationStateCache();
     return worker;
   }
@@ -1715,7 +1739,7 @@ public class StreamingDataflowWorkerTest {
                 intervalWindowBytes(WINDOW_AT_ZERO)));
 
     Map<Long, Windmill.WorkItemCommitRequest> result = 
server.waitForAndGetCommits(1);
-    Iterable<CounterUpdate> counters = worker.buildCounters();
+    Iterable<CounterUpdate> counters = buildCounters();
 
     // These tags and data are opaque strings and this is a change detector test.
     // The "/u" indicates the user's namespace, versus "/s" for system namespace
@@ -1836,7 +1860,7 @@ public class StreamingDataflowWorkerTest {
     expectedBytesRead += dataBuilder.build().getSerializedSize();
 
     result = server.waitForAndGetCommits(1);
-    counters = worker.buildCounters();
+    counters = buildCounters();
     actualOutput = result.get(2L);
 
     assertEquals(1, actualOutput.getOutputMessagesCount());
@@ -2004,7 +2028,7 @@ public class StreamingDataflowWorkerTest {
                 intervalWindowBytes(WINDOW_AT_ZERO)));
 
     Map<Long, Windmill.WorkItemCommitRequest> result = 
server.waitForAndGetCommits(1);
-    Iterable<CounterUpdate> counters = worker.buildCounters();
+    Iterable<CounterUpdate> counters = buildCounters();
 
     // These tags and data are opaque strings and this is a change detector test.
     // The "/u" indicates the user's namespace, versus "/s" for system namespace
@@ -2125,7 +2149,7 @@ public class StreamingDataflowWorkerTest {
     expectedBytesRead += dataBuilder.build().getSerializedSize();
 
     result = server.waitForAndGetCommits(1);
-    counters = worker.buildCounters();
+    counters = buildCounters();
     actualOutput = result.get(2L);
 
     assertEquals(1, actualOutput.getOutputMessagesCount());
@@ -2430,7 +2454,7 @@ public class StreamingDataflowWorkerTest {
                 null));
 
     Map<Long, Windmill.WorkItemCommitRequest> result = 
server.waitForAndGetCommits(1);
-    Iterable<CounterUpdate> counters = worker.buildCounters();
+    Iterable<CounterUpdate> counters = buildCounters();
     Windmill.WorkItemCommitRequest commit = result.get(1L);
     UnsignedLong finalizeId =
         
UnsignedLong.fromLongBits(commit.getSourceStateUpdates().getFinalizeIds(0));
@@ -2492,7 +2516,7 @@ public class StreamingDataflowWorkerTest {
                 null));
 
     result = server.waitForAndGetCommits(1);
-    counters = worker.buildCounters();
+    counters = buildCounters();
 
     commit = result.get(2L);
     finalizeId = 
UnsignedLong.fromLongBits(commit.getSourceStateUpdates().getFinalizeIds(0));
@@ -2540,7 +2564,7 @@ public class StreamingDataflowWorkerTest {
                 null));
 
     result = server.waitForAndGetCommits(1);
-    counters = worker.buildCounters();
+    counters = buildCounters();
 
     commit = result.get(3L);
     finalizeId = 
UnsignedLong.fromLongBits(commit.getSourceStateUpdates().getFinalizeIds(0));
@@ -2710,7 +2734,7 @@ public class StreamingDataflowWorkerTest {
     server.whenGetWorkCalled().thenReturn(work);
 
     Map<Long, Windmill.WorkItemCommitRequest> result = 
server.waitForAndGetCommits(1);
-    Iterable<CounterUpdate> counters = worker.buildCounters();
+    Iterable<CounterUpdate> counters = buildCounters();
     Windmill.WorkItemCommitRequest commit = result.get(1L);
     UnsignedLong finalizeId =
         
UnsignedLong.fromLongBits(commit.getSourceStateUpdates().getFinalizeIds(0));
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarnessTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarnessTest.java
index 5a2df4baae6..4df3bf7cd82 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarnessTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarnessTest.java
@@ -60,6 +60,8 @@ public class SingleSourceWorkerHarnessTest {
         .setWaitForResources(waitForResources)
         .setStreamingWorkScheduler(streamingWorkScheduler)
         .setComputationStateFetcher(computationStateFetcher)
+        // no-op throttle time supplier.
+        .setThrottledTimeTracker(() -> 0L)
         .setGetWorkSender(getWorkSender)
         .build();
   }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporterTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporterTest.java
index 7e65a495638..f348e4cf1bd 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporterTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporterTest.java
@@ -39,14 +39,15 @@ import org.mockito.Mockito;
 
 @RunWith(JUnit4.class)
 public class StreamingWorkerStatusReporterTest {
-  private final long DEFAULT_WINDMILL_QUOTA_THROTTLE_TIME = 1000;
-  private final long DEFAULT_HARNESS_REPORTING_PERIOD = 10000;
-  private final long DEFAULT_PER_WORKER_METRICS_PERIOD = 30000;
+  private static final long DEFAULT_WINDMILL_QUOTA_THROTTLE_TIME = 1000;
+  private static final long DEFAULT_HARNESS_REPORTING_PERIOD = 10000;
+  private static final long DEFAULT_PER_WORKER_METRICS_PERIOD = 30000;
 
   private BoundedQueueExecutor mockExecutor;
   private WorkUnitClient mockWorkUnitClient;
   private FailureTracker mockFailureTracker;
   private MemoryMonitor mockMemoryMonitor;
+  private StreamingWorkerStatusReporter reporter;
 
   @Before
   public void setUp() {
@@ -54,23 +55,11 @@ public class StreamingWorkerStatusReporterTest {
     this.mockWorkUnitClient = mock(WorkUnitClient.class);
     this.mockFailureTracker = mock(FailureTracker.class);
     this.mockMemoryMonitor = mock(MemoryMonitor.class);
+    this.reporter = buildWorkerStatusReporterForTest();
   }
 
   @Test
   public void testOverrideMaximumThreadCount() throws Exception {
-    StreamingWorkerStatusReporter reporter =
-        StreamingWorkerStatusReporter.forTesting(
-            true,
-            mockWorkUnitClient,
-            () -> DEFAULT_WINDMILL_QUOTA_THROTTLE_TIME,
-            () -> Collections.emptyList(),
-            mockFailureTracker,
-            StreamingCounters.create(),
-            mockMemoryMonitor,
-            mockExecutor,
-            (threadName) -> Executors.newSingleThreadScheduledExecutor(),
-            DEFAULT_HARNESS_REPORTING_PERIOD,
-            DEFAULT_PER_WORKER_METRICS_PERIOD);
     StreamingScalingReportResponse streamingScalingReportResponse =
         new StreamingScalingReportResponse().setMaximumThreadCount(10);
     WorkerMessageResponse workerMessageResponse =
@@ -84,23 +73,25 @@ public class StreamingWorkerStatusReporterTest {
 
   @Test
   public void testHandleEmptyWorkerMessageResponse() throws Exception {
-    StreamingWorkerStatusReporter reporter =
-        StreamingWorkerStatusReporter.forTesting(
-            true,
-            mockWorkUnitClient,
-            () -> DEFAULT_WINDMILL_QUOTA_THROTTLE_TIME,
-            () -> Collections.emptyList(),
-            mockFailureTracker,
-            StreamingCounters.create(),
-            mockMemoryMonitor,
-            mockExecutor,
-            (threadName) -> Executors.newSingleThreadScheduledExecutor(),
-            DEFAULT_HARNESS_REPORTING_PERIOD,
-            DEFAULT_PER_WORKER_METRICS_PERIOD);
-    WorkerMessageResponse workerMessageResponse = new WorkerMessageResponse();
     when(mockWorkUnitClient.reportWorkerMessage(any()))
-        .thenReturn(Collections.singletonList(workerMessageResponse));
+        .thenReturn(Collections.singletonList(new WorkerMessageResponse()));
     reporter.reportPeriodicWorkerMessage();
     verify(mockExecutor, Mockito.times(0)).setMaximumPoolSize(anyInt(), 
anyInt());
   }
+
+  private StreamingWorkerStatusReporter buildWorkerStatusReporterForTest() {
+    return StreamingWorkerStatusReporter.builder()
+        .setPublishCounters(true)
+        .setDataflowServiceClient(mockWorkUnitClient)
+        .setWindmillQuotaThrottleTime(() -> 
DEFAULT_WINDMILL_QUOTA_THROTTLE_TIME)
+        .setAllStageInfo(Collections::emptyList)
+        .setFailureTracker(mockFailureTracker)
+        .setStreamingCounters(StreamingCounters.create())
+        .setMemoryMonitor(mockMemoryMonitor)
+        .setWorkExecutor(mockExecutor)
+        .setExecutorFactory((threadName) -> 
Executors.newSingleThreadScheduledExecutor())
+        
.setWindmillHarnessUpdateReportingPeriodMillis(DEFAULT_HARNESS_REPORTING_PERIOD)
+        
.setPerWorkerMetricsUpdateReportingPeriodMillis(DEFAULT_PER_WORKER_METRICS_PERIOD)
+        .build();
+  }
 }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSenderTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSenderTest.java
index ed915088d0a..acbb3aebbcf 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSenderTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSenderTest.java
@@ -39,7 +39,7 @@ public class StreamPoolHeartbeatSenderTest {
   public void sendsHeartbeatsOnStream() {
     FakeWindmillServer server = new FakeWindmillServer(new ErrorCollector(), c 
-> Optional.empty());
     StreamPoolHeartbeatSender heartbeatSender =
-        StreamPoolHeartbeatSender.Create(
+        StreamPoolHeartbeatSender.create(
             WindmillStreamPool.create(1, Duration.standardSeconds(10), 
server::getDataStream));
     Heartbeats.Builder heartbeatsBuilder = Heartbeats.builder();
     heartbeatsBuilder
@@ -59,7 +59,7 @@ public class StreamPoolHeartbeatSenderTest {
     FakeGlobalConfigHandle configHandle =
         new 
FakeGlobalConfigHandle(getGlobalConfig(/*useSeparateHeartbeatStreams=*/ true));
     StreamPoolHeartbeatSender heartbeatSender =
-        StreamPoolHeartbeatSender.Create(
+        StreamPoolHeartbeatSender.create(
             WindmillStreamPool.create(
                 1, Duration.standardSeconds(10), 
dedicatedServer::getDataStream),
             WindmillStreamPool.create(
@@ -104,7 +104,7 @@ public class StreamPoolHeartbeatSenderTest {
     FakeGlobalConfigHandle configHandle =
         new 
FakeGlobalConfigHandle(getGlobalConfig(/*useSeparateHeartbeatStreams=*/ false));
     StreamPoolHeartbeatSender heartbeatSender =
-        StreamPoolHeartbeatSender.Create(
+        StreamPoolHeartbeatSender.create(
             WindmillStreamPool.create(
                 1, Duration.standardSeconds(10), 
dedicatedServer::getDataStream),
             WindmillStreamPool.create(

Reply via email to