m-trieu commented on code in PR #31504:
URL: https://github.com/apache/beam/pull/31504#discussion_r1643832609
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -237,32 +244,124 @@ private StreamingDataflowWorker(
dispatchThread.setName("DispatchThread");
this.clientId = clientId;
this.windmillServer = windmillServer;
+
+ WindmillStreamPool<WindmillStream.GetDataStream> getDataStreamPool =
+ WindmillStreamPool.create(
+ Math.max(1, options.getWindmillGetDataStreamCount()),
+ GET_DATA_STREAM_TIMEOUT,
+ windmillServer::getDataStream);
+
this.metricTrackingWindmillServer =
MetricTrackingWindmillServerStub.builder(windmillServer, memoryMonitor)
.setUseStreamingRequests(windmillServiceEnabled)
-
.setUseSeparateHeartbeatStreams(options.getUseSeparateWindmillHeartbeatStreams())
- .setNumGetDataStreams(options.getWindmillGetDataStreamCount())
+ .setGetDataStreamPool(getDataStreamPool)
.build();
// Register standard file systems.
FileSystems.setDefaultPipelineOptions(options);
+ WorkerStatusPages workerStatusPages =
+ WorkerStatusPages.create(DEFAULT_STATUS_PORT, memoryMonitor);
+
+ this.streamingCounters = streamingCounters;
+ this.memoryMonitor = memoryMonitor;
+
+ this.streamingEngineClient = null;
+ this.streamingWorkScheduler =
+ StreamingWorkScheduler.create(
+ options,
+ clock,
+ readerCache,
+ mapTaskExecutorFactory,
+ workUnitExecutor,
+ stateCache::forComputation,
+ request ->
+ streamingEngineClient != null
+ ? metricTrackingWindmillServer.getSideInputData(
+
streamingEngineClient.getGlobalDataStream(request.getDataId().getTag()),
+ request)
+ : metricTrackingWindmillServer.getSideInputData(request),
+ failureTracker,
+ workFailureProcessor,
+ streamingCounters,
+ hotKeyLogger,
+ sampler,
+ maxWorkItemCommitBytes,
+ ID_GENERATOR,
+ stageInfoMap);
+
int stuckCommitDurationMillis =
windmillServiceEnabled && options.getStuckCommitDurationMillis() > 0
? options.getStuckCommitDurationMillis()
: 0;
+ if (isDirectPathPipeline(options)) {
+ this.streamingEngineClient =
Review Comment:
I think we can probably just inject the endpoints manually. It might be
better to do that in a separate PR, though, since we would have to change
the way we test StreamingDataflowWorker: its tests use FakeWindmillServer,
whereas StreamingEngineClient does not.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]