m-trieu commented on code in PR #32778:
URL: https://github.com/apache/beam/pull/32778#discussion_r1849062614


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -118,6 +130,8 @@ public final class StreamingDataflowWorker {
    */
   public static final int MAX_SINK_BYTES = 10_000_000;
 
+  public static final String STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_HEARTBEAT_POOL =

Review Comment:
   done



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -233,110 +225,192 @@ private StreamingDataflowWorker(
             ID_GENERATOR,
             configFetcher.getGlobalConfigHandle(),
             stageInfoMap);
-
     ThrottlingGetDataMetricTracker getDataMetricTracker =
         new ThrottlingGetDataMetricTracker(memoryMonitor);
-    WorkerStatusPages workerStatusPages =
-        WorkerStatusPages.create(DEFAULT_STATUS_PORT, memoryMonitor);
-    StreamingWorkerStatusPages.Builder statusPagesBuilder = StreamingWorkerStatusPages.builder();
-    int stuckCommitDurationMillis;
-    GetDataClient getDataClient;
-    HeartbeatSender heartbeatSender;
-    if (windmillServiceEnabled) {
-      WindmillStreamPool<GetDataStream> getDataStreamPool =
-          WindmillStreamPool.create(
-              Math.max(1, options.getWindmillGetDataStreamCount()),
-              GET_DATA_STREAM_TIMEOUT,
-              windmillServer::getDataStream);
-      getDataClient = new StreamPoolGetDataClient(getDataMetricTracker, getDataStreamPool);
-      // Experiment gates the logic till backend changes are rollback safe
-      if (!DataflowRunner.hasExperiment(
-              options, STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_HEARTBEAT_POOL)
-          || options.getUseSeparateWindmillHeartbeatStreams() != null) {
+    // Status page members. Different implementations on whether the harness is streaming engine
+    // direct path, streaming engine cloud path, or streaming appliance.
+    @Nullable ChannelzServlet channelzServlet = null;
+    Consumer<PrintWriter> getDataStatusProvider;
+    Supplier<Long> currentActiveCommitBytesProvider;
+    if (isDirectPathPipeline(options)) {
+      FanOutStreamingEngineWorkerHarness fanOutStreamingEngineWorkerHarness =
+          FanOutStreamingEngineWorkerHarness.create(
+              createJobHeader(options, clientId),
+              GetWorkBudget.builder()
+                  .setItems(chooseMaxBundlesOutstanding(options))
+                  .setBytes(MAX_GET_WORK_FETCH_BYTES)
+                  .build(),
+              windmillStreamFactory,
+              (workItem, watermarks, processingContext, getWorkStreamLatencies) ->
+                  computationStateCache
+                      .get(processingContext.computationId())
+                      .ifPresent(
+                          computationState -> {
+                            memoryMonitor.waitForResources("GetWork");
+                            streamingWorkScheduler.scheduleWork(
+                                computationState,
+                                workItem,
+                                watermarks,
+                                processingContext,
+                                getWorkStreamLatencies);
+                          }),
+              createStubFactory(options),
+              // TODO (m-trieu) replace with GetWorkBudgetDistributors.distributeEvenly() once
+              // https://github.com/apache/beam/pull/32775 is merged.
+              GetWorkBudgetDistributors.distributeEvenly(GetWorkBudget::noBudget),
+              Preconditions.checkNotNull(dispatcherClient),
+              commitWorkStream ->
+                  StreamingEngineWorkCommitter.builder()
+                      .setBackendWorkerToken(commitWorkStream.backendWorkerToken())
+                      .setOnCommitComplete(this::onCompleteCommit)
+                      .setNumCommitSenders(Math.max(options.getWindmillServiceCommitThreads(), 1))
+                      .setCommitWorkStreamFactory(
+                          () -> CloseableStream.create(commitWorkStream, () -> {}))
+                      .build(),
+              getDataMetricTracker);
+      getDataStatusProvider = getDataMetricTracker::printHtml;
+      currentActiveCommitBytesProvider =
+          fanOutStreamingEngineWorkerHarness::currentActiveCommitBytes;
+      channelzServlet =
+          createChannelZServlet(
+              options, fanOutStreamingEngineWorkerHarness::currentWindmillEndpoints);
+      this.streamingWorkerHarness = fanOutStreamingEngineWorkerHarness;
+    } else {
+      Windmill.GetWorkRequest request = createGetWorkRequest(clientId, options);
+      GetDataClient getDataClient;
+      HeartbeatSender heartbeatSender;
+      WorkCommitter workCommitter;
+      GetWorkSender getWorkSender;
+      if (options.isEnableStreamingEngine()) {
+        WindmillStreamPool<GetDataStream> getDataStreamPool =
+            WindmillStreamPool.create(
+                Math.max(1, options.getWindmillGetDataStreamCount()),
+                GET_DATA_STREAM_TIMEOUT,
+                windmillServer::getDataStream);
+        getDataClient = new StreamPoolGetDataClient(getDataMetricTracker, getDataStreamPool);
         heartbeatSender =
-            StreamPoolHeartbeatSender.Create(
-                Boolean.TRUE.equals(options.getUseSeparateWindmillHeartbeatStreams())
-                    ? separateHeartbeatPool(windmillServer)
-                    : getDataStreamPool);
-
+            createStreamingEngineHeartbeatSender(
+                options, windmillServer, getDataStreamPool, configFetcher.getGlobalConfigHandle());
+        channelzServlet =
+            createChannelZServlet(options, windmillServer::getWindmillServiceEndpoints);
+        workCommitter =
+            StreamingEngineWorkCommitter.builder()
+                .setCommitWorkStreamFactory(
+                    WindmillStreamPool.create(
+                            numCommitThreads,
+                            COMMIT_STREAM_TIMEOUT,
+                            windmillServer::commitWorkStream)
+                        ::getCloseableStream)
+                .setNumCommitSenders(numCommitThreads)
+                .setOnCommitComplete(this::onCompleteCommit)
+                .build();
+        getWorkSender =
+            GetWorkSender.forStreamingEngine(
+                receiver -> windmillServer.getWorkStream(request, receiver));
       } else {
-        heartbeatSender =
-            StreamPoolHeartbeatSender.Create(
-                separateHeartbeatPool(windmillServer),
-                getDataStreamPool,
-                configFetcher.getGlobalConfigHandle());
+        getDataClient = new ApplianceGetDataClient(windmillServer, getDataMetricTracker);
+        heartbeatSender = new ApplianceHeartbeatSender(windmillServer::getData);
+        workCommitter =
+            StreamingApplianceWorkCommitter.create(
+                windmillServer::commitWork, this::onCompleteCommit);
+        getWorkSender = GetWorkSender.forAppliance(() -> windmillServer.getWork(request));
       }
 
-      stuckCommitDurationMillis =
-          options.getStuckCommitDurationMillis() > 0 ? options.getStuckCommitDurationMillis() : 0;
-      statusPagesBuilder
-          .setDebugCapture(
-              new DebugCapture.Manager(options, workerStatusPages.getDebugCapturePages()))
-          .setChannelzServlet(
-              new ChannelzServlet(
-                  CHANNELZ_PATH, options, windmillServer::getWindmillServiceEndpoints))
-          .setWindmillStreamFactory(windmillStreamFactory);
-    } else {
-      getDataClient = new ApplianceGetDataClient(windmillServer, getDataMetricTracker);
-      heartbeatSender = new ApplianceHeartbeatSender(windmillServer::getData);
-      stuckCommitDurationMillis = 0;
+      getDataStatusProvider = getDataClient::printHtml;
+      currentActiveCommitBytesProvider = workCommitter::currentActiveCommitBytes;
+
+      this.streamingWorkerHarness =
+          SingleSourceWorkerHarness.builder()
+              .setStreamingWorkScheduler(streamingWorkScheduler)
+              .setWorkCommitter(workCommitter)
+              .setGetDataClient(getDataClient)
+              .setComputationStateFetcher(this.computationStateCache::get)
+              .setWaitForResources(() -> memoryMonitor.waitForResources("GetWork"))
+              .setHeartbeatSender(heartbeatSender)
+              .setThrottleTimeSupplier(windmillServer::getAndResetThrottleTime)
+              .setGetWorkSender(getWorkSender)
+              .build();
     }
 
+    this.workerStatusReporter =
+        streamingWorkerStatusReporterFactory.createStatusReporter(
+            streamingWorkerHarness::getAndResetThrottleTime);
     this.activeWorkRefresher =
         new ActiveWorkRefresher(
             clock,
             options.getActiveWorkRefreshPeriodMillis(),
-            stuckCommitDurationMillis,
+            options.isEnableStreamingEngine()
+                ? Math.max(options.getStuckCommitDurationMillis(), 0)
+                : 0,
             computationStateCache::getAllPresentComputations,
             sampler,
             executorSupplier.apply("RefreshWork"),
             getDataMetricTracker::trackHeartbeats);
 
     this.statusPages =
-        statusPagesBuilder
+        createStatusPageBuilder(options, windmillStreamFactory, memoryMonitor)
             .setClock(clock)
             .setClientId(clientId)
             .setIsRunning(running)
-            .setStatusPages(workerStatusPages)
             .setStateCache(stateCache)
             .setComputationStateCache(this.computationStateCache)
-            .setCurrentActiveCommitBytes(workCommitter::currentActiveCommitBytes)
-            .setGetDataStatusProvider(getDataClient::printHtml)
             .setWorkUnitExecutor(workUnitExecutor)
             .setGlobalConfigHandle(configFetcher.getGlobalConfigHandle())
+            .setChannelzServlet(channelzServlet)
+            .setGetDataStatusProvider(getDataStatusProvider)
+            .setCurrentActiveCommitBytes(currentActiveCommitBytesProvider)
             .build();
 
-    Windmill.GetWorkRequest request =
-        Windmill.GetWorkRequest.newBuilder()
-            .setClientId(clientId)
-            .setMaxItems(chooseMaximumBundlesOutstanding())
-            .setMaxBytes(MAX_GET_WORK_FETCH_BYTES)
-            .build();
-
-    this.streamingWorkerHarness =
-        SingleSourceWorkerHarness.builder()
-            .setStreamingWorkScheduler(streamingWorkScheduler)
-            .setWorkCommitter(workCommitter)
-            .setGetDataClient(getDataClient)
-            .setComputationStateFetcher(this.computationStateCache::get)
-            .setWaitForResources(() -> memoryMonitor.waitForResources("GetWork"))
-            .setHeartbeatSender(heartbeatSender)
-            .setGetWorkSender(
-                windmillServiceEnabled
-                    ? GetWorkSender.forStreamingEngine(
-                        receiver -> windmillServer.getWorkStream(request, receiver))
-                    : GetWorkSender.forAppliance(() -> windmillServer.getWork(request)))
-            .build();
-
-    LOG.debug("windmillServiceEnabled: {}", windmillServiceEnabled);
+    LOG.debug("isDirectPathEnabled: {}", options.getIsWindmillServiceDirectPathEnabled());
+    LOG.debug("windmillServiceEnabled: {}", options.isEnableStreamingEngine());
     LOG.debug("WindmillServiceEndpoint: {}", options.getWindmillServiceEndpoint());
     LOG.debug("WindmillServicePort: {}", options.getWindmillServicePort());
     LOG.debug("LocalWindmillHostport: {}", options.getLocalWindmillHostport());
   }
 
-  private static WindmillStreamPool<GetDataStream> separateHeartbeatPool(
-      WindmillServerStub windmillServer) {
-    return WindmillStreamPool.create(1, GET_DATA_STREAM_TIMEOUT, windmillServer::getDataStream);
+  private static StreamingWorkerStatusPages.Builder createStatusPageBuilder(
+      DataflowWorkerHarnessOptions options,
+      GrpcWindmillStreamFactory windmillStreamFactory,
+      MemoryMonitor memoryMonitor) {
+    WorkerStatusPages workerStatusPages =
+        WorkerStatusPages.create(DEFAULT_STATUS_PORT, memoryMonitor);
+
+    StreamingWorkerStatusPages.Builder streamingStatusPages =
+        StreamingWorkerStatusPages.builder().setStatusPages(workerStatusPages);
+
+    return options.isEnableStreamingEngine()
+        ? streamingStatusPages
+            .setDebugCapture(
+                new DebugCapture.Manager(options, workerStatusPages.getDebugCapturePages()))
+            .setWindmillStreamFactory(windmillStreamFactory)
+        : streamingStatusPages;
+  }
+
+  private static ChannelzServlet createChannelZServlet(

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to