scwhittle commented on code in PR #32778:
URL: https://github.com/apache/beam/pull/32778#discussion_r1801053010


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -118,6 +130,8 @@ public final class StreamingDataflowWorker {
    */
   public static final int MAX_SINK_BYTES = 10_000_000;
 
+  public static final String 
STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_HEARTBEAT_POOL =

Review Comment:
   move back where it was and make private?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -234,107 +230,199 @@ private StreamingDataflowWorker(
             ID_GENERATOR,
             configFetcher.getGlobalConfigHandle(),
             stageInfoMap);
-
     ThrottlingGetDataMetricTracker getDataMetricTracker =
         new ThrottlingGetDataMetricTracker(memoryMonitor);
-    WorkerStatusPages workerStatusPages =
-        WorkerStatusPages.create(DEFAULT_STATUS_PORT, memoryMonitor);
-    StreamingWorkerStatusPages.Builder statusPagesBuilder = 
StreamingWorkerStatusPages.builder();
-    int stuckCommitDurationMillis;
-    GetDataClient getDataClient;
-    HeartbeatSender heartbeatSender;
-    if (windmillServiceEnabled) {
-      WindmillStreamPool<GetDataStream> getDataStreamPool =
-          WindmillStreamPool.create(
-              Math.max(1, options.getWindmillGetDataStreamCount()),
-              GET_DATA_STREAM_TIMEOUT,
-              windmillServer::getDataStream);
-      getDataClient = new StreamPoolGetDataClient(getDataMetricTracker, 
getDataStreamPool);
-      if (options.getUseSeparateWindmillHeartbeatStreams() != null) {
+    // Status page members. Different implementations on whether the harness 
is streaming engine
+    // direct path, streaming engine cloud path, or streaming appliance.
+    @Nullable ChannelzServlet channelzServlet = null;
+    Consumer<PrintWriter> getDataStatusProvider;
+    Supplier<Long> currentActiveCommitBytesProvider;
+    if (isDirectPathPipeline(options)) {
+      WeightedSemaphore<Commit> maxCommitByteSemaphore = 
Commits.maxCommitByteSemaphore();
+      FanOutStreamingEngineWorkerHarness fanOutStreamingEngineWorkerHarness =
+          FanOutStreamingEngineWorkerHarness.create(
+              createJobHeader(options, clientId),
+              GetWorkBudget.builder()
+                  .setItems(chooseMaxBundlesOutstanding(options))
+                  .setBytes(MAX_GET_WORK_FETCH_BYTES)
+                  .build(),
+              windmillStreamFactory,
+              (workItem, watermarks, processingContext, 
getWorkStreamLatencies) ->
+                  computationStateCache
+                      .get(processingContext.computationId())
+                      .ifPresent(
+                          computationState -> {
+                            memoryMonitor.waitForResources("GetWork");
+                            streamingWorkScheduler.scheduleWork(
+                                computationState,
+                                workItem,
+                                watermarks,
+                                processingContext,
+                                getWorkStreamLatencies);
+                          }),
+              createStubFactory(options),
+              GetWorkBudgetDistributors.distributeEvenly(),
+              Preconditions.checkNotNull(dispatcherClient),
+              commitWorkStream ->
+                  StreamingEngineWorkCommitter.builder()
+                      // Share the commitByteSemaphore across all created 
workCommitters.
+                      .setCommitByteSemaphore(maxCommitByteSemaphore)
+                      
.setBackendWorkerToken(commitWorkStream.backendWorkerToken())
+                      .setOnCommitComplete(this::onCompleteCommit)
+                      
.setNumCommitSenders(Math.max(options.getWindmillServiceCommitThreads(), 1))
+                      .setCommitWorkStreamFactory(
+                          () -> CloseableStream.create(commitWorkStream, () -> 
{}))
+                      .build(),
+              getDataMetricTracker);
+      getDataStatusProvider = getDataMetricTracker::printHtml;
+      currentActiveCommitBytesProvider =
+          fanOutStreamingEngineWorkerHarness::currentActiveCommitBytes;
+      channelzServlet =
+          createChannelZServlet(
+              options, 
fanOutStreamingEngineWorkerHarness::currentWindmillEndpoints);
+      this.streamingWorkerHarness = fanOutStreamingEngineWorkerHarness;
+    } else {
+      Windmill.GetWorkRequest request =

Review Comment:
   Consider adding a comment here, e.g. `// Not a direct-path pipeline.`,
   since the preceding `if` branch is big.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -233,110 +225,192 @@ private StreamingDataflowWorker(
             ID_GENERATOR,
             configFetcher.getGlobalConfigHandle(),
             stageInfoMap);
-
     ThrottlingGetDataMetricTracker getDataMetricTracker =
         new ThrottlingGetDataMetricTracker(memoryMonitor);
-    WorkerStatusPages workerStatusPages =
-        WorkerStatusPages.create(DEFAULT_STATUS_PORT, memoryMonitor);
-    StreamingWorkerStatusPages.Builder statusPagesBuilder = 
StreamingWorkerStatusPages.builder();
-    int stuckCommitDurationMillis;
-    GetDataClient getDataClient;
-    HeartbeatSender heartbeatSender;
-    if (windmillServiceEnabled) {
-      WindmillStreamPool<GetDataStream> getDataStreamPool =
-          WindmillStreamPool.create(
-              Math.max(1, options.getWindmillGetDataStreamCount()),
-              GET_DATA_STREAM_TIMEOUT,
-              windmillServer::getDataStream);
-      getDataClient = new StreamPoolGetDataClient(getDataMetricTracker, 
getDataStreamPool);
-      // Experiment gates the logic till backend changes are rollback safe
-      if (!DataflowRunner.hasExperiment(
-              options, STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_HEARTBEAT_POOL)
-          || options.getUseSeparateWindmillHeartbeatStreams() != null) {
+    // Status page members. Different implementations on whether the harness 
is streaming engine
+    // direct path, streaming engine cloud path, or streaming appliance.
+    @Nullable ChannelzServlet channelzServlet = null;
+    Consumer<PrintWriter> getDataStatusProvider;
+    Supplier<Long> currentActiveCommitBytesProvider;
+    if (isDirectPathPipeline(options)) {
+      FanOutStreamingEngineWorkerHarness fanOutStreamingEngineWorkerHarness =
+          FanOutStreamingEngineWorkerHarness.create(
+              createJobHeader(options, clientId),
+              GetWorkBudget.builder()
+                  .setItems(chooseMaxBundlesOutstanding(options))
+                  .setBytes(MAX_GET_WORK_FETCH_BYTES)
+                  .build(),
+              windmillStreamFactory,
+              (workItem, watermarks, processingContext, 
getWorkStreamLatencies) ->
+                  computationStateCache
+                      .get(processingContext.computationId())
+                      .ifPresent(
+                          computationState -> {
+                            memoryMonitor.waitForResources("GetWork");
+                            streamingWorkScheduler.scheduleWork(
+                                computationState,
+                                workItem,
+                                watermarks,
+                                processingContext,
+                                getWorkStreamLatencies);
+                          }),
+              createStubFactory(options),
+              // TODO (m-trieu) replace with 
GetWorkBudgetDistributors.distributeEvenly() once
+              // https://github.com/apache/beam/pull/32775 is merged.
+              
GetWorkBudgetDistributors.distributeEvenly(GetWorkBudget::noBudget),
+              Preconditions.checkNotNull(dispatcherClient),
+              commitWorkStream ->
+                  StreamingEngineWorkCommitter.builder()
+                      
.setBackendWorkerToken(commitWorkStream.backendWorkerToken())
+                      .setOnCommitComplete(this::onCompleteCommit)
+                      
.setNumCommitSenders(Math.max(options.getWindmillServiceCommitThreads(), 1))
+                      .setCommitWorkStreamFactory(
+                          () -> CloseableStream.create(commitWorkStream, () -> 
{}))
+                      .build(),
+              getDataMetricTracker);
+      getDataStatusProvider = getDataMetricTracker::printHtml;
+      currentActiveCommitBytesProvider =
+          fanOutStreamingEngineWorkerHarness::currentActiveCommitBytes;
+      channelzServlet =
+          createChannelZServlet(
+              options, 
fanOutStreamingEngineWorkerHarness::currentWindmillEndpoints);
+      this.streamingWorkerHarness = fanOutStreamingEngineWorkerHarness;
+    } else {
+      Windmill.GetWorkRequest request = createGetWorkRequest(clientId, 
options);
+      GetDataClient getDataClient;
+      HeartbeatSender heartbeatSender;
+      WorkCommitter workCommitter;
+      GetWorkSender getWorkSender;
+      if (options.isEnableStreamingEngine()) {
+        WindmillStreamPool<GetDataStream> getDataStreamPool =
+            WindmillStreamPool.create(
+                Math.max(1, options.getWindmillGetDataStreamCount()),
+                GET_DATA_STREAM_TIMEOUT,
+                windmillServer::getDataStream);
+        getDataClient = new StreamPoolGetDataClient(getDataMetricTracker, 
getDataStreamPool);
         heartbeatSender =
-            StreamPoolHeartbeatSender.Create(
-                
Boolean.TRUE.equals(options.getUseSeparateWindmillHeartbeatStreams())
-                    ? separateHeartbeatPool(windmillServer)
-                    : getDataStreamPool);
-
+            createStreamingEngineHeartbeatSender(
+                options, windmillServer, getDataStreamPool, 
configFetcher.getGlobalConfigHandle());
+        channelzServlet =
+            createChannelZServlet(options, 
windmillServer::getWindmillServiceEndpoints);
+        workCommitter =
+            StreamingEngineWorkCommitter.builder()
+                .setCommitWorkStreamFactory(
+                    WindmillStreamPool.create(
+                            numCommitThreads,
+                            COMMIT_STREAM_TIMEOUT,
+                            windmillServer::commitWorkStream)
+                        ::getCloseableStream)
+                .setNumCommitSenders(numCommitThreads)
+                .setOnCommitComplete(this::onCompleteCommit)
+                .build();
+        getWorkSender =
+            GetWorkSender.forStreamingEngine(
+                receiver -> windmillServer.getWorkStream(request, receiver));
       } else {
-        heartbeatSender =
-            StreamPoolHeartbeatSender.Create(
-                separateHeartbeatPool(windmillServer),
-                getDataStreamPool,
-                configFetcher.getGlobalConfigHandle());
+        getDataClient = new ApplianceGetDataClient(windmillServer, 
getDataMetricTracker);
+        heartbeatSender = new 
ApplianceHeartbeatSender(windmillServer::getData);
+        workCommitter =
+            StreamingApplianceWorkCommitter.create(
+                windmillServer::commitWork, this::onCompleteCommit);
+        getWorkSender = GetWorkSender.forAppliance(() -> 
windmillServer.getWork(request));
       }
 
-      stuckCommitDurationMillis =
-          options.getStuckCommitDurationMillis() > 0 ? 
options.getStuckCommitDurationMillis() : 0;
-      statusPagesBuilder
-          .setDebugCapture(
-              new DebugCapture.Manager(options, 
workerStatusPages.getDebugCapturePages()))
-          .setChannelzServlet(
-              new ChannelzServlet(
-                  CHANNELZ_PATH, options, 
windmillServer::getWindmillServiceEndpoints))
-          .setWindmillStreamFactory(windmillStreamFactory);
-    } else {
-      getDataClient = new ApplianceGetDataClient(windmillServer, 
getDataMetricTracker);
-      heartbeatSender = new ApplianceHeartbeatSender(windmillServer::getData);
-      stuckCommitDurationMillis = 0;
+      getDataStatusProvider = getDataClient::printHtml;
+      currentActiveCommitBytesProvider = 
workCommitter::currentActiveCommitBytes;
+
+      this.streamingWorkerHarness =
+          SingleSourceWorkerHarness.builder()
+              .setStreamingWorkScheduler(streamingWorkScheduler)
+              .setWorkCommitter(workCommitter)
+              .setGetDataClient(getDataClient)
+              .setComputationStateFetcher(this.computationStateCache::get)
+              .setWaitForResources(() -> 
memoryMonitor.waitForResources("GetWork"))
+              .setHeartbeatSender(heartbeatSender)
+              .setThrottleTimeSupplier(windmillServer::getAndResetThrottleTime)
+              .setGetWorkSender(getWorkSender)
+              .build();
     }
 
+    this.workerStatusReporter =
+        streamingWorkerStatusReporterFactory.createStatusReporter(
+            streamingWorkerHarness::getAndResetThrottleTime);
     this.activeWorkRefresher =
         new ActiveWorkRefresher(
             clock,
             options.getActiveWorkRefreshPeriodMillis(),
-            stuckCommitDurationMillis,
+            options.isEnableStreamingEngine()
+                ? Math.max(options.getStuckCommitDurationMillis(), 0)
+                : 0,
             computationStateCache::getAllPresentComputations,
             sampler,
             executorSupplier.apply("RefreshWork"),
             getDataMetricTracker::trackHeartbeats);
 
     this.statusPages =
-        statusPagesBuilder
+        createStatusPageBuilder(options, windmillStreamFactory, memoryMonitor)
             .setClock(clock)
             .setClientId(clientId)
             .setIsRunning(running)
-            .setStatusPages(workerStatusPages)
             .setStateCache(stateCache)
             .setComputationStateCache(this.computationStateCache)
-            
.setCurrentActiveCommitBytes(workCommitter::currentActiveCommitBytes)
-            .setGetDataStatusProvider(getDataClient::printHtml)
             .setWorkUnitExecutor(workUnitExecutor)
             .setGlobalConfigHandle(configFetcher.getGlobalConfigHandle())
+            .setChannelzServlet(channelzServlet)
+            .setGetDataStatusProvider(getDataStatusProvider)
+            .setCurrentActiveCommitBytes(currentActiveCommitBytesProvider)
             .build();
 
-    Windmill.GetWorkRequest request =
-        Windmill.GetWorkRequest.newBuilder()
-            .setClientId(clientId)
-            .setMaxItems(chooseMaximumBundlesOutstanding())
-            .setMaxBytes(MAX_GET_WORK_FETCH_BYTES)
-            .build();
-
-    this.streamingWorkerHarness =
-        SingleSourceWorkerHarness.builder()
-            .setStreamingWorkScheduler(streamingWorkScheduler)
-            .setWorkCommitter(workCommitter)
-            .setGetDataClient(getDataClient)
-            .setComputationStateFetcher(this.computationStateCache::get)
-            .setWaitForResources(() -> 
memoryMonitor.waitForResources("GetWork"))
-            .setHeartbeatSender(heartbeatSender)
-            .setGetWorkSender(
-                windmillServiceEnabled
-                    ? GetWorkSender.forStreamingEngine(
-                        receiver -> windmillServer.getWorkStream(request, 
receiver))
-                    : GetWorkSender.forAppliance(() -> 
windmillServer.getWork(request)))
-            .build();
-
-    LOG.debug("windmillServiceEnabled: {}", windmillServiceEnabled);
+    LOG.debug("isDirectPathEnabled: {}", 
options.getIsWindmillServiceDirectPathEnabled());
+    LOG.debug("windmillServiceEnabled: {}", options.isEnableStreamingEngine());
     LOG.debug("WindmillServiceEndpoint: {}", 
options.getWindmillServiceEndpoint());
     LOG.debug("WindmillServicePort: {}", options.getWindmillServicePort());
     LOG.debug("LocalWindmillHostport: {}", options.getLocalWindmillHostport());
   }
 
-  private static WindmillStreamPool<GetDataStream> separateHeartbeatPool(
-      WindmillServerStub windmillServer) {
-    return WindmillStreamPool.create(1, GET_DATA_STREAM_TIMEOUT, 
windmillServer::getDataStream);
+  private static StreamingWorkerStatusPages.Builder createStatusPageBuilder(
+      DataflowWorkerHarnessOptions options,
+      GrpcWindmillStreamFactory windmillStreamFactory,
+      MemoryMonitor memoryMonitor) {
+    WorkerStatusPages workerStatusPages =
+        WorkerStatusPages.create(DEFAULT_STATUS_PORT, memoryMonitor);
+
+    StreamingWorkerStatusPages.Builder streamingStatusPages =
+        StreamingWorkerStatusPages.builder().setStatusPages(workerStatusPages);
+
+    return options.isEnableStreamingEngine()
+        ? streamingStatusPages
+            .setDebugCapture(
+                new DebugCapture.Manager(options, 
workerStatusPages.getDebugCapturePages()))
+            .setWindmillStreamFactory(windmillStreamFactory)
+        : streamingStatusPages;
+  }
+
+  private static ChannelzServlet createChannelZServlet(

Review Comment:
   nit: don't capitalize the Z



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -478,8 +574,55 @@ public static StreamingDataflowWorker 
fromOptions(DataflowWorkerHarnessOptions o
       computationStateCache = 
computationStateCacheFactory.apply(configFetcher);
     }
 
-    return ConfigFetcherComputationStateCacheAndWindmillClient.create(
-        configFetcher, computationStateCache, windmillServer, 
windmillStreamFactory);
+    return builder
+        .setConfigFetcher(configFetcher)
+        .setComputationStateCache(computationStateCache)
+        .setWindmillServer(windmillServer)
+        .setWindmillStreamFactory(windmillStreamFactory)
+        .build();
+  }
+
+  private static boolean isDirectPathPipeline(DataflowWorkerHarnessOptions 
options) {
+    if (options.isEnableStreamingEngine() && 
options.getIsWindmillServiceDirectPathEnabled()) {
+      boolean isIpV6Enabled =
+          Optional.ofNullable(options.getDataflowServiceOptions())
+              .map(serviceOptions -> 
serviceOptions.contains(ENABLE_IPV6_EXPERIMENT))
+              .orElse(false);
+      if (isIpV6Enabled) {
+        return true;
+      }
+      LOG.warn(
+          "DirectPath is currently only supported with IPv6 networking stack. 
Defaulting to"
+              + " CloudPath.");

Review Comment:
   How about mentioning the service option to use, or linking to the docs if there are any?



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporterTest.java:
##########
@@ -59,18 +59,19 @@ public void setUp() {
   @Test
   public void testOverrideMaximumThreadCount() throws Exception {
     StreamingWorkerStatusReporter reporter =
-        StreamingWorkerStatusReporter.forTesting(
-            true,
-            mockWorkUnitClient,
-            () -> DEFAULT_WINDMILL_QUOTA_THROTTLE_TIME,
-            () -> Collections.emptyList(),
-            mockFailureTracker,
-            StreamingCounters.create(),
-            mockMemoryMonitor,
-            mockExecutor,
-            (threadName) -> Executors.newSingleThreadScheduledExecutor(),
-            DEFAULT_HARNESS_REPORTING_PERIOD,
-            DEFAULT_PER_WORKER_METRICS_PERIOD);
+        StreamingWorkerStatusReporter.builder()

Review Comment:
   can you make a testReporterBuilder() in this test that sets all these 
defaults?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -441,6 +520,8 @@ public static StreamingDataflowWorker 
fromOptions(DataflowWorkerHarnessOptions o
     WindmillServerStub windmillServer;
     ComputationStateCache computationStateCache;
     GrpcWindmillStreamFactory windmillStreamFactory;
+    ConfigFetcherComputationStateCacheAndWindmillClient.Builder builder =

Review Comment:
   seems inconsistent to set dispatcher on builder but have the separate 
variables for the rest.
   
   How about removing the variables and setting them on the builder? It seems you 
can remove some nesting and return the build() result early in some cases like 
SE



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -478,8 +574,55 @@ public static StreamingDataflowWorker 
fromOptions(DataflowWorkerHarnessOptions o
       computationStateCache = 
computationStateCacheFactory.apply(configFetcher);
     }
 
-    return ConfigFetcherComputationStateCacheAndWindmillClient.create(
-        configFetcher, computationStateCache, windmillServer, 
windmillStreamFactory);
+    return builder
+        .setConfigFetcher(configFetcher)
+        .setComputationStateCache(computationStateCache)
+        .setWindmillServer(windmillServer)
+        .setWindmillStreamFactory(windmillStreamFactory)
+        .build();
+  }
+
+  private static boolean isDirectPathPipeline(DataflowWorkerHarnessOptions 
options) {
+    if (options.isEnableStreamingEngine() && 
options.getIsWindmillServiceDirectPathEnabled()) {
+      boolean isIpV6Enabled =
+          Optional.ofNullable(options.getDataflowServiceOptions())
+              .map(serviceOptions -> 
serviceOptions.contains(ENABLE_IPV6_EXPERIMENT))
+              .orElse(false);
+      if (isIpV6Enabled) {
+        return true;
+      }
+      LOG.warn(
+          "DirectPath is currently only supported with IPv6 networking stack. 
Defaulting to"
+              + " CloudPath.");
+    }
+    return false;
+  }
+
+  private static void validateWorkerOptions(DataflowWorkerHarnessOptions 
options) {
+    Preconditions.checkArgument(
+        options.isStreaming(),
+        "%s instantiated with options indicating batch use",
+        StreamingDataflowWorker.class.getName());
+
+    Preconditions.checkArgument(
+        !DataflowRunner.hasExperiment(options, BEAM_FN_API_EXPERIMENT),
+        "%s cannot be main() class with beam_fn_api enabled",
+        StreamingDataflowWorker.class.getSimpleName());
+  }
+
+  private static ChannelCachingStubFactory createStubFactory(

Review Comment:
   Rename to createFanoutStubFactory, since it is only used there and we don't want 
unconditional isolation (yet) in the other case?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to