m-trieu commented on code in PR #31504:
URL: https://github.com/apache/beam/pull/31504#discussion_r1643832609


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -237,32 +244,124 @@ private StreamingDataflowWorker(
     dispatchThread.setName("DispatchThread");
     this.clientId = clientId;
     this.windmillServer = windmillServer;
+
+    WindmillStreamPool<WindmillStream.GetDataStream> getDataStreamPool =
+        WindmillStreamPool.create(
+            Math.max(1, options.getWindmillGetDataStreamCount()),
+            GET_DATA_STREAM_TIMEOUT,
+            windmillServer::getDataStream);
+
     this.metricTrackingWindmillServer =
         MetricTrackingWindmillServerStub.builder(windmillServer, memoryMonitor)
             .setUseStreamingRequests(windmillServiceEnabled)
-            
.setUseSeparateHeartbeatStreams(options.getUseSeparateWindmillHeartbeatStreams())
-            .setNumGetDataStreams(options.getWindmillGetDataStreamCount())
+            .setGetDataStreamPool(getDataStreamPool)
             .build();
 
     // Register standard file systems.
     FileSystems.setDefaultPipelineOptions(options);
 
+    WorkerStatusPages workerStatusPages =
+        WorkerStatusPages.create(DEFAULT_STATUS_PORT, memoryMonitor);
+
+    this.streamingCounters = streamingCounters;
+    this.memoryMonitor = memoryMonitor;
+
+    this.streamingEngineClient = null;
+    this.streamingWorkScheduler =
+        StreamingWorkScheduler.create(
+            options,
+            clock,
+            readerCache,
+            mapTaskExecutorFactory,
+            workUnitExecutor,
+            stateCache::forComputation,
+            request ->
+                streamingEngineClient != null
+                    ? metricTrackingWindmillServer.getSideInputData(
+                        
streamingEngineClient.getGlobalDataStream(request.getDataId().getTag()),
+                        request)
+                    : metricTrackingWindmillServer.getSideInputData(request),
+            failureTracker,
+            workFailureProcessor,
+            streamingCounters,
+            hotKeyLogger,
+            sampler,
+            maxWorkItemCommitBytes,
+            ID_GENERATOR,
+            stageInfoMap);
+
     int stuckCommitDurationMillis =
         windmillServiceEnabled && options.getStuckCommitDurationMillis() > 0
             ? options.getStuckCommitDurationMillis()
             : 0;
+    if (isDirectPathPipeline(options)) {
+      this.streamingEngineClient =

Review Comment:
   I think we can
   
   We can probably just inject the endpoints manually 
   
   might be better to do in a separate PR since we would have to change the way 
we test in StreamingDataflowWorker as that uses FakeWindmillServer and 
StreamingEngineClient does not.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to