scwhittle commented on code in PR #32778:
URL: https://github.com/apache/beam/pull/32778#discussion_r1801053010
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -118,6 +130,8 @@ public final class StreamingDataflowWorker {
*/
public static final int MAX_SINK_BYTES = 10_000_000;
+ public static final String
STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_HEARTBEAT_POOL =
Review Comment:
Move this back to where it was and make it private?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -234,107 +230,199 @@ private StreamingDataflowWorker(
ID_GENERATOR,
configFetcher.getGlobalConfigHandle(),
stageInfoMap);
-
ThrottlingGetDataMetricTracker getDataMetricTracker =
new ThrottlingGetDataMetricTracker(memoryMonitor);
- WorkerStatusPages workerStatusPages =
- WorkerStatusPages.create(DEFAULT_STATUS_PORT, memoryMonitor);
- StreamingWorkerStatusPages.Builder statusPagesBuilder =
StreamingWorkerStatusPages.builder();
- int stuckCommitDurationMillis;
- GetDataClient getDataClient;
- HeartbeatSender heartbeatSender;
- if (windmillServiceEnabled) {
- WindmillStreamPool<GetDataStream> getDataStreamPool =
- WindmillStreamPool.create(
- Math.max(1, options.getWindmillGetDataStreamCount()),
- GET_DATA_STREAM_TIMEOUT,
- windmillServer::getDataStream);
- getDataClient = new StreamPoolGetDataClient(getDataMetricTracker,
getDataStreamPool);
- if (options.getUseSeparateWindmillHeartbeatStreams() != null) {
+ // Status page members. Different implementations on whether the harness
is streaming engine
+ // direct path, streaming engine cloud path, or streaming appliance.
+ @Nullable ChannelzServlet channelzServlet = null;
+ Consumer<PrintWriter> getDataStatusProvider;
+ Supplier<Long> currentActiveCommitBytesProvider;
+ if (isDirectPathPipeline(options)) {
+ WeightedSemaphore<Commit> maxCommitByteSemaphore =
Commits.maxCommitByteSemaphore();
+ FanOutStreamingEngineWorkerHarness fanOutStreamingEngineWorkerHarness =
+ FanOutStreamingEngineWorkerHarness.create(
+ createJobHeader(options, clientId),
+ GetWorkBudget.builder()
+ .setItems(chooseMaxBundlesOutstanding(options))
+ .setBytes(MAX_GET_WORK_FETCH_BYTES)
+ .build(),
+ windmillStreamFactory,
+ (workItem, watermarks, processingContext,
getWorkStreamLatencies) ->
+ computationStateCache
+ .get(processingContext.computationId())
+ .ifPresent(
+ computationState -> {
+ memoryMonitor.waitForResources("GetWork");
+ streamingWorkScheduler.scheduleWork(
+ computationState,
+ workItem,
+ watermarks,
+ processingContext,
+ getWorkStreamLatencies);
+ }),
+ createStubFactory(options),
+ GetWorkBudgetDistributors.distributeEvenly(),
+ Preconditions.checkNotNull(dispatcherClient),
+ commitWorkStream ->
+ StreamingEngineWorkCommitter.builder()
+ // Share the commitByteSemaphore across all created
workCommitters.
+ .setCommitByteSemaphore(maxCommitByteSemaphore)
+
.setBackendWorkerToken(commitWorkStream.backendWorkerToken())
+ .setOnCommitComplete(this::onCompleteCommit)
+
.setNumCommitSenders(Math.max(options.getWindmillServiceCommitThreads(), 1))
+ .setCommitWorkStreamFactory(
+ () -> CloseableStream.create(commitWorkStream, () ->
{}))
+ .build(),
+ getDataMetricTracker);
+ getDataStatusProvider = getDataMetricTracker::printHtml;
+ currentActiveCommitBytesProvider =
+ fanOutStreamingEngineWorkerHarness::currentActiveCommitBytes;
+ channelzServlet =
+ createChannelZServlet(
+ options,
fanOutStreamingEngineWorkerHarness::currentWindmillEndpoints);
+ this.streamingWorkerHarness = fanOutStreamingEngineWorkerHarness;
+ } else {
+ Windmill.GetWorkRequest request =
Review Comment:
Consider adding a comment here like `// Not a direct-path pipeline.`,
since the if block is big.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -233,110 +225,192 @@ private StreamingDataflowWorker(
ID_GENERATOR,
configFetcher.getGlobalConfigHandle(),
stageInfoMap);
-
ThrottlingGetDataMetricTracker getDataMetricTracker =
new ThrottlingGetDataMetricTracker(memoryMonitor);
- WorkerStatusPages workerStatusPages =
- WorkerStatusPages.create(DEFAULT_STATUS_PORT, memoryMonitor);
- StreamingWorkerStatusPages.Builder statusPagesBuilder =
StreamingWorkerStatusPages.builder();
- int stuckCommitDurationMillis;
- GetDataClient getDataClient;
- HeartbeatSender heartbeatSender;
- if (windmillServiceEnabled) {
- WindmillStreamPool<GetDataStream> getDataStreamPool =
- WindmillStreamPool.create(
- Math.max(1, options.getWindmillGetDataStreamCount()),
- GET_DATA_STREAM_TIMEOUT,
- windmillServer::getDataStream);
- getDataClient = new StreamPoolGetDataClient(getDataMetricTracker,
getDataStreamPool);
- // Experiment gates the logic till backend changes are rollback safe
- if (!DataflowRunner.hasExperiment(
- options, STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_HEARTBEAT_POOL)
- || options.getUseSeparateWindmillHeartbeatStreams() != null) {
+ // Status page members. Different implementations on whether the harness
is streaming engine
+ // direct path, streaming engine cloud path, or streaming appliance.
+ @Nullable ChannelzServlet channelzServlet = null;
+ Consumer<PrintWriter> getDataStatusProvider;
+ Supplier<Long> currentActiveCommitBytesProvider;
+ if (isDirectPathPipeline(options)) {
+ FanOutStreamingEngineWorkerHarness fanOutStreamingEngineWorkerHarness =
+ FanOutStreamingEngineWorkerHarness.create(
+ createJobHeader(options, clientId),
+ GetWorkBudget.builder()
+ .setItems(chooseMaxBundlesOutstanding(options))
+ .setBytes(MAX_GET_WORK_FETCH_BYTES)
+ .build(),
+ windmillStreamFactory,
+ (workItem, watermarks, processingContext,
getWorkStreamLatencies) ->
+ computationStateCache
+ .get(processingContext.computationId())
+ .ifPresent(
+ computationState -> {
+ memoryMonitor.waitForResources("GetWork");
+ streamingWorkScheduler.scheduleWork(
+ computationState,
+ workItem,
+ watermarks,
+ processingContext,
+ getWorkStreamLatencies);
+ }),
+ createStubFactory(options),
+ // TODO (m-trieu) replace with
GetWorkBudgetDistributors.distributeEvenly() once
+ // https://github.com/apache/beam/pull/32775 is merged.
+
GetWorkBudgetDistributors.distributeEvenly(GetWorkBudget::noBudget),
+ Preconditions.checkNotNull(dispatcherClient),
+ commitWorkStream ->
+ StreamingEngineWorkCommitter.builder()
+
.setBackendWorkerToken(commitWorkStream.backendWorkerToken())
+ .setOnCommitComplete(this::onCompleteCommit)
+
.setNumCommitSenders(Math.max(options.getWindmillServiceCommitThreads(), 1))
+ .setCommitWorkStreamFactory(
+ () -> CloseableStream.create(commitWorkStream, () ->
{}))
+ .build(),
+ getDataMetricTracker);
+ getDataStatusProvider = getDataMetricTracker::printHtml;
+ currentActiveCommitBytesProvider =
+ fanOutStreamingEngineWorkerHarness::currentActiveCommitBytes;
+ channelzServlet =
+ createChannelZServlet(
+ options,
fanOutStreamingEngineWorkerHarness::currentWindmillEndpoints);
+ this.streamingWorkerHarness = fanOutStreamingEngineWorkerHarness;
+ } else {
+ Windmill.GetWorkRequest request = createGetWorkRequest(clientId,
options);
+ GetDataClient getDataClient;
+ HeartbeatSender heartbeatSender;
+ WorkCommitter workCommitter;
+ GetWorkSender getWorkSender;
+ if (options.isEnableStreamingEngine()) {
+ WindmillStreamPool<GetDataStream> getDataStreamPool =
+ WindmillStreamPool.create(
+ Math.max(1, options.getWindmillGetDataStreamCount()),
+ GET_DATA_STREAM_TIMEOUT,
+ windmillServer::getDataStream);
+ getDataClient = new StreamPoolGetDataClient(getDataMetricTracker,
getDataStreamPool);
heartbeatSender =
- StreamPoolHeartbeatSender.Create(
-
Boolean.TRUE.equals(options.getUseSeparateWindmillHeartbeatStreams())
- ? separateHeartbeatPool(windmillServer)
- : getDataStreamPool);
-
+ createStreamingEngineHeartbeatSender(
+ options, windmillServer, getDataStreamPool,
configFetcher.getGlobalConfigHandle());
+ channelzServlet =
+ createChannelZServlet(options,
windmillServer::getWindmillServiceEndpoints);
+ workCommitter =
+ StreamingEngineWorkCommitter.builder()
+ .setCommitWorkStreamFactory(
+ WindmillStreamPool.create(
+ numCommitThreads,
+ COMMIT_STREAM_TIMEOUT,
+ windmillServer::commitWorkStream)
+ ::getCloseableStream)
+ .setNumCommitSenders(numCommitThreads)
+ .setOnCommitComplete(this::onCompleteCommit)
+ .build();
+ getWorkSender =
+ GetWorkSender.forStreamingEngine(
+ receiver -> windmillServer.getWorkStream(request, receiver));
} else {
- heartbeatSender =
- StreamPoolHeartbeatSender.Create(
- separateHeartbeatPool(windmillServer),
- getDataStreamPool,
- configFetcher.getGlobalConfigHandle());
+ getDataClient = new ApplianceGetDataClient(windmillServer,
getDataMetricTracker);
+ heartbeatSender = new
ApplianceHeartbeatSender(windmillServer::getData);
+ workCommitter =
+ StreamingApplianceWorkCommitter.create(
+ windmillServer::commitWork, this::onCompleteCommit);
+ getWorkSender = GetWorkSender.forAppliance(() ->
windmillServer.getWork(request));
}
- stuckCommitDurationMillis =
- options.getStuckCommitDurationMillis() > 0 ?
options.getStuckCommitDurationMillis() : 0;
- statusPagesBuilder
- .setDebugCapture(
- new DebugCapture.Manager(options,
workerStatusPages.getDebugCapturePages()))
- .setChannelzServlet(
- new ChannelzServlet(
- CHANNELZ_PATH, options,
windmillServer::getWindmillServiceEndpoints))
- .setWindmillStreamFactory(windmillStreamFactory);
- } else {
- getDataClient = new ApplianceGetDataClient(windmillServer,
getDataMetricTracker);
- heartbeatSender = new ApplianceHeartbeatSender(windmillServer::getData);
- stuckCommitDurationMillis = 0;
+ getDataStatusProvider = getDataClient::printHtml;
+ currentActiveCommitBytesProvider =
workCommitter::currentActiveCommitBytes;
+
+ this.streamingWorkerHarness =
+ SingleSourceWorkerHarness.builder()
+ .setStreamingWorkScheduler(streamingWorkScheduler)
+ .setWorkCommitter(workCommitter)
+ .setGetDataClient(getDataClient)
+ .setComputationStateFetcher(this.computationStateCache::get)
+ .setWaitForResources(() ->
memoryMonitor.waitForResources("GetWork"))
+ .setHeartbeatSender(heartbeatSender)
+ .setThrottleTimeSupplier(windmillServer::getAndResetThrottleTime)
+ .setGetWorkSender(getWorkSender)
+ .build();
}
+ this.workerStatusReporter =
+ streamingWorkerStatusReporterFactory.createStatusReporter(
+ streamingWorkerHarness::getAndResetThrottleTime);
this.activeWorkRefresher =
new ActiveWorkRefresher(
clock,
options.getActiveWorkRefreshPeriodMillis(),
- stuckCommitDurationMillis,
+ options.isEnableStreamingEngine()
+ ? Math.max(options.getStuckCommitDurationMillis(), 0)
+ : 0,
computationStateCache::getAllPresentComputations,
sampler,
executorSupplier.apply("RefreshWork"),
getDataMetricTracker::trackHeartbeats);
this.statusPages =
- statusPagesBuilder
+ createStatusPageBuilder(options, windmillStreamFactory, memoryMonitor)
.setClock(clock)
.setClientId(clientId)
.setIsRunning(running)
- .setStatusPages(workerStatusPages)
.setStateCache(stateCache)
.setComputationStateCache(this.computationStateCache)
-
.setCurrentActiveCommitBytes(workCommitter::currentActiveCommitBytes)
- .setGetDataStatusProvider(getDataClient::printHtml)
.setWorkUnitExecutor(workUnitExecutor)
.setGlobalConfigHandle(configFetcher.getGlobalConfigHandle())
+ .setChannelzServlet(channelzServlet)
+ .setGetDataStatusProvider(getDataStatusProvider)
+ .setCurrentActiveCommitBytes(currentActiveCommitBytesProvider)
.build();
- Windmill.GetWorkRequest request =
- Windmill.GetWorkRequest.newBuilder()
- .setClientId(clientId)
- .setMaxItems(chooseMaximumBundlesOutstanding())
- .setMaxBytes(MAX_GET_WORK_FETCH_BYTES)
- .build();
-
- this.streamingWorkerHarness =
- SingleSourceWorkerHarness.builder()
- .setStreamingWorkScheduler(streamingWorkScheduler)
- .setWorkCommitter(workCommitter)
- .setGetDataClient(getDataClient)
- .setComputationStateFetcher(this.computationStateCache::get)
- .setWaitForResources(() ->
memoryMonitor.waitForResources("GetWork"))
- .setHeartbeatSender(heartbeatSender)
- .setGetWorkSender(
- windmillServiceEnabled
- ? GetWorkSender.forStreamingEngine(
- receiver -> windmillServer.getWorkStream(request,
receiver))
- : GetWorkSender.forAppliance(() ->
windmillServer.getWork(request)))
- .build();
-
- LOG.debug("windmillServiceEnabled: {}", windmillServiceEnabled);
+ LOG.debug("isDirectPathEnabled: {}",
options.getIsWindmillServiceDirectPathEnabled());
+ LOG.debug("windmillServiceEnabled: {}", options.isEnableStreamingEngine());
LOG.debug("WindmillServiceEndpoint: {}",
options.getWindmillServiceEndpoint());
LOG.debug("WindmillServicePort: {}", options.getWindmillServicePort());
LOG.debug("LocalWindmillHostport: {}", options.getLocalWindmillHostport());
}
- private static WindmillStreamPool<GetDataStream> separateHeartbeatPool(
- WindmillServerStub windmillServer) {
- return WindmillStreamPool.create(1, GET_DATA_STREAM_TIMEOUT,
windmillServer::getDataStream);
+ private static StreamingWorkerStatusPages.Builder createStatusPageBuilder(
+ DataflowWorkerHarnessOptions options,
+ GrpcWindmillStreamFactory windmillStreamFactory,
+ MemoryMonitor memoryMonitor) {
+ WorkerStatusPages workerStatusPages =
+ WorkerStatusPages.create(DEFAULT_STATUS_PORT, memoryMonitor);
+
+ StreamingWorkerStatusPages.Builder streamingStatusPages =
+ StreamingWorkerStatusPages.builder().setStatusPages(workerStatusPages);
+
+ return options.isEnableStreamingEngine()
+ ? streamingStatusPages
+ .setDebugCapture(
+ new DebugCapture.Manager(options,
workerStatusPages.getDebugCapturePages()))
+ .setWindmillStreamFactory(windmillStreamFactory)
+ : streamingStatusPages;
+ }
+
+ private static ChannelzServlet createChannelZServlet(
Review Comment:
nit: don't capitalize the Z (i.e. rename to `createChannelzServlet`)
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -478,8 +574,55 @@ public static StreamingDataflowWorker
fromOptions(DataflowWorkerHarnessOptions o
computationStateCache =
computationStateCacheFactory.apply(configFetcher);
}
- return ConfigFetcherComputationStateCacheAndWindmillClient.create(
- configFetcher, computationStateCache, windmillServer,
windmillStreamFactory);
+ return builder
+ .setConfigFetcher(configFetcher)
+ .setComputationStateCache(computationStateCache)
+ .setWindmillServer(windmillServer)
+ .setWindmillStreamFactory(windmillStreamFactory)
+ .build();
+ }
+
+ private static boolean isDirectPathPipeline(DataflowWorkerHarnessOptions
options) {
+ if (options.isEnableStreamingEngine() &&
options.getIsWindmillServiceDirectPathEnabled()) {
+ boolean isIpV6Enabled =
+ Optional.ofNullable(options.getDataflowServiceOptions())
+ .map(serviceOptions ->
serviceOptions.contains(ENABLE_IPV6_EXPERIMENT))
+ .orElse(false);
+ if (isIpV6Enabled) {
+ return true;
+ }
+ LOG.warn(
+ "DirectPath is currently only supported with IPv6 networking stack.
Defaulting to"
+ + " CloudPath.");
Review Comment:
How about mentioning the service option to use, or linking to docs if some exist?
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporterTest.java:
##########
@@ -59,18 +59,19 @@ public void setUp() {
@Test
public void testOverrideMaximumThreadCount() throws Exception {
StreamingWorkerStatusReporter reporter =
- StreamingWorkerStatusReporter.forTesting(
- true,
- mockWorkUnitClient,
- () -> DEFAULT_WINDMILL_QUOTA_THROTTLE_TIME,
- () -> Collections.emptyList(),
- mockFailureTracker,
- StreamingCounters.create(),
- mockMemoryMonitor,
- mockExecutor,
- (threadName) -> Executors.newSingleThreadScheduledExecutor(),
- DEFAULT_HARNESS_REPORTING_PERIOD,
- DEFAULT_PER_WORKER_METRICS_PERIOD);
+ StreamingWorkerStatusReporter.builder()
Review Comment:
Can you add a testReporterBuilder() helper in this test that sets all these
defaults?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -441,6 +520,8 @@ public static StreamingDataflowWorker
fromOptions(DataflowWorkerHarnessOptions o
WindmillServerStub windmillServer;
ComputationStateCache computationStateCache;
GrpcWindmillStreamFactory windmillStreamFactory;
+ ConfigFetcherComputationStateCacheAndWindmillClient.Builder builder =
Review Comment:
It seems inconsistent to set the dispatcher on the builder but keep separate
variables for the rest.
How about removing the variables and setting everything on the builder? It
seems you could remove some nesting and return the build() result early in some
cases, like SE.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -478,8 +574,55 @@ public static StreamingDataflowWorker
fromOptions(DataflowWorkerHarnessOptions o
computationStateCache =
computationStateCacheFactory.apply(configFetcher);
}
- return ConfigFetcherComputationStateCacheAndWindmillClient.create(
- configFetcher, computationStateCache, windmillServer,
windmillStreamFactory);
+ return builder
+ .setConfigFetcher(configFetcher)
+ .setComputationStateCache(computationStateCache)
+ .setWindmillServer(windmillServer)
+ .setWindmillStreamFactory(windmillStreamFactory)
+ .build();
+ }
+
+ private static boolean isDirectPathPipeline(DataflowWorkerHarnessOptions
options) {
+ if (options.isEnableStreamingEngine() &&
options.getIsWindmillServiceDirectPathEnabled()) {
+ boolean isIpV6Enabled =
+ Optional.ofNullable(options.getDataflowServiceOptions())
+ .map(serviceOptions ->
serviceOptions.contains(ENABLE_IPV6_EXPERIMENT))
+ .orElse(false);
+ if (isIpV6Enabled) {
+ return true;
+ }
+ LOG.warn(
+ "DirectPath is currently only supported with IPv6 networking stack.
Defaulting to"
+ + " CloudPath.");
+ }
+ return false;
+ }
+
+ private static void validateWorkerOptions(DataflowWorkerHarnessOptions
options) {
+ Preconditions.checkArgument(
+ options.isStreaming(),
+ "%s instantiated with options indicating batch use",
+ StreamingDataflowWorker.class.getName());
+
+ Preconditions.checkArgument(
+ !DataflowRunner.hasExperiment(options, BEAM_FN_API_EXPERIMENT),
+ "%s cannot be main() class with beam_fn_api enabled",
+ StreamingDataflowWorker.class.getSimpleName());
+ }
+
+ private static ChannelCachingStubFactory createStubFactory(
Review Comment:
Name this createFanoutStubFactory, since it's only used there and we don't
want unconditional isolation (yet) in the other case?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]