scwhittle commented on code in PR #32778: URL: https://github.com/apache/beam/pull/32778#discussion_r1801053010
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########

@@ -118,6 +130,8 @@ public final class StreamingDataflowWorker {
    */
   public static final int MAX_SINK_BYTES = 10_000_000;
+  public static final String STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_HEARTBEAT_POOL =

Review Comment:
   move back where it was and make private?


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########

@@ -234,107 +230,199 @@ private StreamingDataflowWorker(
             ID_GENERATOR,
             configFetcher.getGlobalConfigHandle(),
             stageInfoMap);
-    ThrottlingGetDataMetricTracker getDataMetricTracker = new ThrottlingGetDataMetricTracker(memoryMonitor);
-    WorkerStatusPages workerStatusPages =
-        WorkerStatusPages.create(DEFAULT_STATUS_PORT, memoryMonitor);
-    StreamingWorkerStatusPages.Builder statusPagesBuilder = StreamingWorkerStatusPages.builder();
-    int stuckCommitDurationMillis;
-    GetDataClient getDataClient;
-    HeartbeatSender heartbeatSender;
-    if (windmillServiceEnabled) {
-      WindmillStreamPool<GetDataStream> getDataStreamPool =
-          WindmillStreamPool.create(
-              Math.max(1, options.getWindmillGetDataStreamCount()),
-              GET_DATA_STREAM_TIMEOUT,
-              windmillServer::getDataStream);
-      getDataClient = new StreamPoolGetDataClient(getDataMetricTracker, getDataStreamPool);
-      if (options.getUseSeparateWindmillHeartbeatStreams() != null) {
+    // Status page members. Different implementations on whether the harness is streaming engine
+    // direct path, streaming engine cloud path, or streaming appliance.
+    @Nullable ChannelzServlet channelzServlet = null;
+    Consumer<PrintWriter> getDataStatusProvider;
+    Supplier<Long> currentActiveCommitBytesProvider;
+    if (isDirectPathPipeline(options)) {
+      WeightedSemaphore<Commit> maxCommitByteSemaphore = Commits.maxCommitByteSemaphore();
+      FanOutStreamingEngineWorkerHarness fanOutStreamingEngineWorkerHarness =
+          FanOutStreamingEngineWorkerHarness.create(
+              createJobHeader(options, clientId),
+              GetWorkBudget.builder()
+                  .setItems(chooseMaxBundlesOutstanding(options))
+                  .setBytes(MAX_GET_WORK_FETCH_BYTES)
+                  .build(),
+              windmillStreamFactory,
+              (workItem, watermarks, processingContext, getWorkStreamLatencies) ->
+                  computationStateCache
+                      .get(processingContext.computationId())
+                      .ifPresent(
+                          computationState -> {
+                            memoryMonitor.waitForResources("GetWork");
+                            streamingWorkScheduler.scheduleWork(
+                                computationState,
+                                workItem,
+                                watermarks,
+                                processingContext,
+                                getWorkStreamLatencies);
+                          }),
+              createStubFactory(options),
+              GetWorkBudgetDistributors.distributeEvenly(),
+              Preconditions.checkNotNull(dispatcherClient),
+              commitWorkStream ->
+                  StreamingEngineWorkCommitter.builder()
+                      // Share the commitByteSemaphore across all created workCommitters.
+                      .setCommitByteSemaphore(maxCommitByteSemaphore)
+                      .setBackendWorkerToken(commitWorkStream.backendWorkerToken())
+                      .setOnCommitComplete(this::onCompleteCommit)
+                      .setNumCommitSenders(Math.max(options.getWindmillServiceCommitThreads(), 1))
+                      .setCommitWorkStreamFactory(
+                          () -> CloseableStream.create(commitWorkStream, () -> {}))
+                      .build(),
+              getDataMetricTracker);
+      getDataStatusProvider = getDataMetricTracker::printHtml;
+      currentActiveCommitBytesProvider =
+          fanOutStreamingEngineWorkerHarness::currentActiveCommitBytes;
+      channelzServlet =
+          createChannelZServlet(
+              options, fanOutStreamingEngineWorkerHarness::currentWindmillEndpoints);
+      this.streamingWorkerHarness = fanOutStreamingEngineWorkerHarness;
+    } else {
+      Windmill.GetWorkRequest request =

Review Comment:
   // Not a direct-path pipeline. since if is big
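Following up on the review comment above, a minimal sketch of how the non-direct-path branch could be annotated. The comment wording is illustrative only; the surrounding lines come from the quoted diff.

    } else {
      // Not a direct-path pipeline; fall back to the single-source harness
      // (Streaming Engine cloud path or Appliance) built below.
      Windmill.GetWorkRequest request = createGetWorkRequest(clientId, options);
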
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########

@@ -233,110 +225,192 @@ private StreamingDataflowWorker(
             ID_GENERATOR,
             configFetcher.getGlobalConfigHandle(),
             stageInfoMap);
-    ThrottlingGetDataMetricTracker getDataMetricTracker = new ThrottlingGetDataMetricTracker(memoryMonitor);
-    WorkerStatusPages workerStatusPages =
-        WorkerStatusPages.create(DEFAULT_STATUS_PORT, memoryMonitor);
-    StreamingWorkerStatusPages.Builder statusPagesBuilder = StreamingWorkerStatusPages.builder();
-    int stuckCommitDurationMillis;
-    GetDataClient getDataClient;
-    HeartbeatSender heartbeatSender;
-    if (windmillServiceEnabled) {
-      WindmillStreamPool<GetDataStream> getDataStreamPool =
-          WindmillStreamPool.create(
-              Math.max(1, options.getWindmillGetDataStreamCount()),
-              GET_DATA_STREAM_TIMEOUT,
-              windmillServer::getDataStream);
-      getDataClient = new StreamPoolGetDataClient(getDataMetricTracker, getDataStreamPool);
-      // Experiment gates the logic till backend changes are rollback safe
-      if (!DataflowRunner.hasExperiment(
-              options, STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_HEARTBEAT_POOL)
-          || options.getUseSeparateWindmillHeartbeatStreams() != null) {
+    // Status page members. Different implementations on whether the harness is streaming engine
+    // direct path, streaming engine cloud path, or streaming appliance.
+    @Nullable ChannelzServlet channelzServlet = null;
+    Consumer<PrintWriter> getDataStatusProvider;
+    Supplier<Long> currentActiveCommitBytesProvider;
+    if (isDirectPathPipeline(options)) {
+      FanOutStreamingEngineWorkerHarness fanOutStreamingEngineWorkerHarness =
+          FanOutStreamingEngineWorkerHarness.create(
+              createJobHeader(options, clientId),
+              GetWorkBudget.builder()
+                  .setItems(chooseMaxBundlesOutstanding(options))
+                  .setBytes(MAX_GET_WORK_FETCH_BYTES)
+                  .build(),
+              windmillStreamFactory,
+              (workItem, watermarks, processingContext, getWorkStreamLatencies) ->
+                  computationStateCache
+                      .get(processingContext.computationId())
+                      .ifPresent(
+                          computationState -> {
+                            memoryMonitor.waitForResources("GetWork");
+                            streamingWorkScheduler.scheduleWork(
+                                computationState,
+                                workItem,
+                                watermarks,
+                                processingContext,
+                                getWorkStreamLatencies);
+                          }),
+              createStubFactory(options),
+              // TODO (m-trieu) replace with GetWorkBudgetDistributors.distributeEvenly() once
+              // https://github.com/apache/beam/pull/32775 is merged.
+              GetWorkBudgetDistributors.distributeEvenly(GetWorkBudget::noBudget),
+              Preconditions.checkNotNull(dispatcherClient),
+              commitWorkStream ->
+                  StreamingEngineWorkCommitter.builder()
+                      .setBackendWorkerToken(commitWorkStream.backendWorkerToken())
+                      .setOnCommitComplete(this::onCompleteCommit)
+                      .setNumCommitSenders(Math.max(options.getWindmillServiceCommitThreads(), 1))
+                      .setCommitWorkStreamFactory(
+                          () -> CloseableStream.create(commitWorkStream, () -> {}))
+                      .build(),
+              getDataMetricTracker);
+      getDataStatusProvider = getDataMetricTracker::printHtml;
+      currentActiveCommitBytesProvider =
+          fanOutStreamingEngineWorkerHarness::currentActiveCommitBytes;
+      channelzServlet =
+          createChannelZServlet(
+              options, fanOutStreamingEngineWorkerHarness::currentWindmillEndpoints);
+      this.streamingWorkerHarness = fanOutStreamingEngineWorkerHarness;
+    } else {
+      Windmill.GetWorkRequest request = createGetWorkRequest(clientId, options);
+      GetDataClient getDataClient;
+      HeartbeatSender heartbeatSender;
+      WorkCommitter workCommitter;
+      GetWorkSender getWorkSender;
+      if (options.isEnableStreamingEngine()) {
+        WindmillStreamPool<GetDataStream> getDataStreamPool =
+            WindmillStreamPool.create(
+                Math.max(1, options.getWindmillGetDataStreamCount()),
+                GET_DATA_STREAM_TIMEOUT,
+                windmillServer::getDataStream);
+        getDataClient = new StreamPoolGetDataClient(getDataMetricTracker, getDataStreamPool);
         heartbeatSender =
-            StreamPoolHeartbeatSender.Create(
-                Boolean.TRUE.equals(options.getUseSeparateWindmillHeartbeatStreams())
-                    ? separateHeartbeatPool(windmillServer)
-                    : getDataStreamPool);
-
+            createStreamingEngineHeartbeatSender(
+                options, windmillServer, getDataStreamPool, configFetcher.getGlobalConfigHandle());
+        channelzServlet =
+            createChannelZServlet(options, windmillServer::getWindmillServiceEndpoints);
+        workCommitter =
+            StreamingEngineWorkCommitter.builder()
+                .setCommitWorkStreamFactory(
+                    WindmillStreamPool.create(
+                            numCommitThreads,
+                            COMMIT_STREAM_TIMEOUT,
+                            windmillServer::commitWorkStream)
+                        ::getCloseableStream)
+                .setNumCommitSenders(numCommitThreads)
+                .setOnCommitComplete(this::onCompleteCommit)
+                .build();
+        getWorkSender =
+            GetWorkSender.forStreamingEngine(
+                receiver -> windmillServer.getWorkStream(request, receiver));
       } else {
-        heartbeatSender =
-            StreamPoolHeartbeatSender.Create(
-                separateHeartbeatPool(windmillServer),
-                getDataStreamPool,
-                configFetcher.getGlobalConfigHandle());
+        getDataClient = new ApplianceGetDataClient(windmillServer, getDataMetricTracker);
+        heartbeatSender = new ApplianceHeartbeatSender(windmillServer::getData);
+        workCommitter =
+            StreamingApplianceWorkCommitter.create(
+                windmillServer::commitWork, this::onCompleteCommit);
+        getWorkSender = GetWorkSender.forAppliance(() -> windmillServer.getWork(request));
      }
-      stuckCommitDurationMillis =
-          options.getStuckCommitDurationMillis() > 0 ? options.getStuckCommitDurationMillis() : 0;
-      statusPagesBuilder
-          .setDebugCapture(
-              new DebugCapture.Manager(options, workerStatusPages.getDebugCapturePages()))
-          .setChannelzServlet(
-              new ChannelzServlet(
-                  CHANNELZ_PATH, options, windmillServer::getWindmillServiceEndpoints))
-          .setWindmillStreamFactory(windmillStreamFactory);
-    } else {
-      getDataClient = new ApplianceGetDataClient(windmillServer, getDataMetricTracker);
-      heartbeatSender = new ApplianceHeartbeatSender(windmillServer::getData);
-      stuckCommitDurationMillis = 0;
+      getDataStatusProvider = getDataClient::printHtml;
+      currentActiveCommitBytesProvider = workCommitter::currentActiveCommitBytes;
+
+      this.streamingWorkerHarness =
+          SingleSourceWorkerHarness.builder()
+              .setStreamingWorkScheduler(streamingWorkScheduler)
+              .setWorkCommitter(workCommitter)
+              .setGetDataClient(getDataClient)
+              .setComputationStateFetcher(this.computationStateCache::get)
+              .setWaitForResources(() -> memoryMonitor.waitForResources("GetWork"))
+              .setHeartbeatSender(heartbeatSender)
+              .setThrottleTimeSupplier(windmillServer::getAndResetThrottleTime)
+              .setGetWorkSender(getWorkSender)
+              .build();
    }
+    this.workerStatusReporter =
+        streamingWorkerStatusReporterFactory.createStatusReporter(
+            streamingWorkerHarness::getAndResetThrottleTime);
    this.activeWorkRefresher =
        new ActiveWorkRefresher(
            clock,
            options.getActiveWorkRefreshPeriodMillis(),
-            stuckCommitDurationMillis,
+            options.isEnableStreamingEngine()
+                ? Math.max(options.getStuckCommitDurationMillis(), 0)
+                : 0,
            computationStateCache::getAllPresentComputations,
            sampler,
            executorSupplier.apply("RefreshWork"),
            getDataMetricTracker::trackHeartbeats);
    this.statusPages =
-        statusPagesBuilder
+        createStatusPageBuilder(options, windmillStreamFactory, memoryMonitor)
            .setClock(clock)
            .setClientId(clientId)
            .setIsRunning(running)
-            .setStatusPages(workerStatusPages)
            .setStateCache(stateCache)
            .setComputationStateCache(this.computationStateCache)
-            .setCurrentActiveCommitBytes(workCommitter::currentActiveCommitBytes)
-            .setGetDataStatusProvider(getDataClient::printHtml)
            .setWorkUnitExecutor(workUnitExecutor)
            .setGlobalConfigHandle(configFetcher.getGlobalConfigHandle())
+            .setChannelzServlet(channelzServlet)
+            .setGetDataStatusProvider(getDataStatusProvider)
+            .setCurrentActiveCommitBytes(currentActiveCommitBytesProvider)
            .build();
-    Windmill.GetWorkRequest request =
-        Windmill.GetWorkRequest.newBuilder()
-            .setClientId(clientId)
-            .setMaxItems(chooseMaximumBundlesOutstanding())
-            .setMaxBytes(MAX_GET_WORK_FETCH_BYTES)
-            .build();
-
-    this.streamingWorkerHarness =
-        SingleSourceWorkerHarness.builder()
-            .setStreamingWorkScheduler(streamingWorkScheduler)
-            .setWorkCommitter(workCommitter)
-            .setGetDataClient(getDataClient)
-            .setComputationStateFetcher(this.computationStateCache::get)
-            .setWaitForResources(() -> memoryMonitor.waitForResources("GetWork"))
-            .setHeartbeatSender(heartbeatSender)
-            .setGetWorkSender(
-                windmillServiceEnabled
-                    ? GetWorkSender.forStreamingEngine(
-                        receiver -> windmillServer.getWorkStream(request, receiver))
-                    : GetWorkSender.forAppliance(() -> windmillServer.getWork(request)))
-            .build();
-
-    LOG.debug("windmillServiceEnabled: {}", windmillServiceEnabled);
+    LOG.debug("isDirectPathEnabled: {}", options.getIsWindmillServiceDirectPathEnabled());
+    LOG.debug("windmillServiceEnabled: {}", options.isEnableStreamingEngine());
    LOG.debug("WindmillServiceEndpoint: {}", options.getWindmillServiceEndpoint());
    LOG.debug("WindmillServicePort: {}", options.getWindmillServicePort());
    LOG.debug("LocalWindmillHostport: {}", options.getLocalWindmillHostport());
  }

-  private static WindmillStreamPool<GetDataStream> separateHeartbeatPool(
-      WindmillServerStub windmillServer) {
-    return WindmillStreamPool.create(1, GET_DATA_STREAM_TIMEOUT, windmillServer::getDataStream);
+  private static StreamingWorkerStatusPages.Builder createStatusPageBuilder(
+      DataflowWorkerHarnessOptions options,
+      GrpcWindmillStreamFactory windmillStreamFactory,
+      MemoryMonitor memoryMonitor) {
+    WorkerStatusPages workerStatusPages =
+        WorkerStatusPages.create(DEFAULT_STATUS_PORT, memoryMonitor);
+
+    StreamingWorkerStatusPages.Builder streamingStatusPages =
+        StreamingWorkerStatusPages.builder().setStatusPages(workerStatusPages);
+
+    return options.isEnableStreamingEngine()
+        ? streamingStatusPages
+            .setDebugCapture(
+                new DebugCapture.Manager(options, workerStatusPages.getDebugCapturePages()))
+            .setWindmillStreamFactory(windmillStreamFactory)
+        : streamingStatusPages;
+  }
+
+  private static ChannelzServlet createChannelZServlet(

Review Comment:
   nit: don't capitalize the Z
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########

@@ -478,8 +574,55 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o
       computationStateCache = computationStateCacheFactory.apply(configFetcher);
    }
-    return ConfigFetcherComputationStateCacheAndWindmillClient.create(
-        configFetcher, computationStateCache, windmillServer, windmillStreamFactory);
+    return builder
+        .setConfigFetcher(configFetcher)
+        .setComputationStateCache(computationStateCache)
+        .setWindmillServer(windmillServer)
+        .setWindmillStreamFactory(windmillStreamFactory)
+        .build();
+  }
+
+  private static boolean isDirectPathPipeline(DataflowWorkerHarnessOptions options) {
+    if (options.isEnableStreamingEngine() && options.getIsWindmillServiceDirectPathEnabled()) {
+      boolean isIpV6Enabled =
+          Optional.ofNullable(options.getDataflowServiceOptions())
+              .map(serviceOptions -> serviceOptions.contains(ENABLE_IPV6_EXPERIMENT))
+              .orElse(false);
+      if (isIpV6Enabled) {
+        return true;
+      }
+      LOG.warn(
+          "DirectPath is currently only supported with IPv6 networking stack. Defaulting to" + " CloudPath.");

Review Comment:
   How about mentioning the service option to use or link to docs if some


##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporterTest.java:
##########

@@ -59,18 +59,19 @@ public void setUp() {
   @Test
   public void testOverrideMaximumThreadCount() throws Exception {
     StreamingWorkerStatusReporter reporter =
-        StreamingWorkerStatusReporter.forTesting(
-            true,
-            mockWorkUnitClient,
-            () -> DEFAULT_WINDMILL_QUOTA_THROTTLE_TIME,
-            () -> Collections.emptyList(),
-            mockFailureTracker,
-            StreamingCounters.create(),
-            mockMemoryMonitor,
-            mockExecutor,
-            (threadName) -> Executors.newSingleThreadScheduledExecutor(),
-            DEFAULT_HARNESS_REPORTING_PERIOD,
-            DEFAULT_PER_WORKER_METRICS_PERIOD);
+        StreamingWorkerStatusReporter.builder()

Review Comment:
   can you make a testReporterBuilder() in this test that sets all these defaults?


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########

@@ -441,6 +520,8 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o
     WindmillServerStub windmillServer;
     ComputationStateCache computationStateCache;
     GrpcWindmillStreamFactory windmillStreamFactory;
+    ConfigFetcherComputationStateCacheAndWindmillClient.Builder builder =

Review Comment:
   seems inconsistent to set dispatcher on builder but have the separate variables for the rest. How about removing the variables and setting on the builder?
   It seems you can remove some nesting and return the build() result early in some cases like SE


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########

@@ -478,8 +574,55 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o
       computationStateCache = computationStateCacheFactory.apply(configFetcher);
    }
-    return ConfigFetcherComputationStateCacheAndWindmillClient.create(
-        configFetcher, computationStateCache, windmillServer, windmillStreamFactory);
+    return builder
+        .setConfigFetcher(configFetcher)
+        .setComputationStateCache(computationStateCache)
+        .setWindmillServer(windmillServer)
+        .setWindmillStreamFactory(windmillStreamFactory)
+        .build();
+  }
+
+  private static boolean isDirectPathPipeline(DataflowWorkerHarnessOptions options) {
+    if (options.isEnableStreamingEngine() && options.getIsWindmillServiceDirectPathEnabled()) {
+      boolean isIpV6Enabled =
+          Optional.ofNullable(options.getDataflowServiceOptions())
+              .map(serviceOptions -> serviceOptions.contains(ENABLE_IPV6_EXPERIMENT))
+              .orElse(false);
+      if (isIpV6Enabled) {
+        return true;
+      }
+      LOG.warn(
+          "DirectPath is currently only supported with IPv6 networking stack. Defaulting to" + " CloudPath.");
+    }
+    return false;
+  }
+
+  private static void validateWorkerOptions(DataflowWorkerHarnessOptions options) {
+    Preconditions.checkArgument(
+        options.isStreaming(),
+        "%s instantiated with options indicating batch use",
+        StreamingDataflowWorker.class.getName());
+
+    Preconditions.checkArgument(
+        !DataflowRunner.hasExperiment(options, BEAM_FN_API_EXPERIMENT),
+        "%s cannot be main() class with beam_fn_api enabled",
+        StreamingDataflowWorker.class.getSimpleName());
+  }
+
+  private static ChannelCachingStubFactory createStubFactory(

Review Comment:
   name createFanoutStubFactory since just used there and we don't want unconditional isolation (yet) in other case?
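On the warning-message suggestion for isDirectPathPipeline(...): a minimal sketch of an expanded message. It assumes ENABLE_IPV6_EXPERIMENT holds the name of the Dataflow service option being checked above; the exact wording and any documentation link would still need to be chosen.

      LOG.warn(
          "DirectPath is currently only supported with the IPv6 networking stack. Defaulting to"
              + " CloudPath. To use DirectPath, enable IPv6 by adding the '{}' Dataflow service"
              + " option to the pipeline.",
          ENABLE_IPV6_EXPERIMENT);
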
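On the testReporterBuilder() suggestion for StreamingWorkerStatusReporterTest: a rough sketch of the kind of helper meant. The builder type name and every setter below are placeholders; they would need to be replaced with StreamingWorkerStatusReporter.Builder's actual methods covering the defaults the test previously passed to forTesting(...).

  // Hypothetical helper: one place that applies the shared test defaults, so each test
  // only overrides the field it is exercising before calling build().
  private StreamingWorkerStatusReporter.Builder testReporterBuilder() {
    return StreamingWorkerStatusReporter.builder()
        // Placeholder setters standing in for the real builder API.
        .setDataflowServiceClient(mockWorkUnitClient)
        .setWindmillQuotaThrottleTime(() -> DEFAULT_WINDMILL_QUOTA_THROTTLE_TIME)
        .setFailureTracker(mockFailureTracker)
        .setStreamingCounters(StreamingCounters.create())
        .setMemoryMonitor(mockMemoryMonitor)
        .setExecutor(mockExecutor);
  }

A test would then read roughly: StreamingWorkerStatusReporter reporter = testReporterBuilder().set<overridden field>(...).build();
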
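On the "set everything on the builder and return early" suggestion: a sketch of the shape being described, with the control flow heavily condensed. Only the setters quoted in this review (setConfigFetcher, setComputationStateCache, setWindmillServer, setWindmillStreamFactory, build) are taken from the diff; the builder() factory call and the point at which each value becomes available are assumptions.

    ConfigFetcherComputationStateCacheAndWindmillClient.Builder builder =
        ConfigFetcherComputationStateCacheAndWindmillClient.builder();
    if (options.isEnableStreamingEngine()) {
      // Streaming Engine (SE) case: populate the builder as the pieces are created and
      // return the built result immediately, instead of carrying local variables
      // through the rest of the method.
      return builder
          .setConfigFetcher(configFetcher)
          .setComputationStateCache(computationStateCache)
          .setWindmillServer(windmillServer)
          .setWindmillStreamFactory(windmillStreamFactory)
          .build();
    }
    // Appliance case continues here, also setting fields directly on the builder.
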
-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org