arunpandianp commented on code in PR #35901: URL: https://github.com/apache/beam/pull/35901#discussion_r2284098043
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java: ########## @@ -183,6 +186,13 @@ public final class StreamingDataflowWorker { private final ActiveWorkRefresher activeWorkRefresher; private final StreamingWorkerStatusReporter workerStatusReporter; private final int numCommitThreads; + private Consumer<PrintWriter> getDataStatusProvider; + private Supplier<Long> currentActiveCommitBytesProvider; + private @Nullable ChannelzServlet channelzServlet; + private @Nullable ChannelCache channelCache; + private Supplier<Instant> clock; Review Comment: I don't think these need to be fields at the StreamingDataflowWorker level. They are currently computed inside the StreamingWorkerHarness factory methods and used by the same thread to set fields in statusPages. One option is to make `createFanOutStreamingEngineWorkerHarness` and `createSingleSourceWorkerHarness` return a StreamingEngineWorkerHarnessFactoryOutput and use it to initialize StatusPages. ``` @AutoValue abstract class StreamingEngineWorkerHarnessFactoryOutput { abstract StreamingWorkerHarness streamingEngineWorkerHarness(); abstract Consumer<PrintWriter> getDataStatusProvider(); abstract Supplier<Long> currentActiveCommitBytesProvider(); abstract @Nullable ChannelzServlet channelzServlet(); abstract @Nullable ChannelCache channelCache(); }``` ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java: ########## @@ -170,11 +172,12 @@ public final class StreamingDataflowWorker { "windmill_bounded_queue_executor_use_fair_monitor"; private final WindmillStateCache stateCache; - private final StreamingWorkerStatusPages statusPages; + private StreamingWorkerStatusPages statusPages; Review Comment: Wrap it in an AtomicReference? 
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java: ########## @@ -394,6 +368,187 @@ private StreamingDataflowWorker( LOG.debug("LocalWindmillHostport: {}", options.getLocalWindmillHostport()); } + private FanOutStreamingEngineWorkerHarness createFanOutStreamingEngineWorkerHarness( + long clientId, + DataflowWorkerHarnessOptions options, + GrpcWindmillStreamFactory windmillStreamFactory, + StreamingWorkScheduler streamingWorkScheduler, + ThrottlingGetDataMetricTracker getDataMetricTracker, + MemoryMonitor memoryMonitor, + GrpcDispatcherClient dispatcherClient) { + WeightedSemaphore<Commit> maxCommitByteSemaphore = Commits.maxCommitByteSemaphore(); + this.channelCache = createChannelCache(options, configFetcher); + this.getDataStatusProvider = getDataMetricTracker::printHtml; + FanOutStreamingEngineWorkerHarness fanOutStreamingEngineWorkerHarness = + FanOutStreamingEngineWorkerHarness.create( + createJobHeader(options, clientId), + GetWorkBudget.builder() + .setItems(chooseMaxBundlesOutstanding(options)) + .setBytes(MAX_GET_WORK_FETCH_BYTES) + .build(), + windmillStreamFactory, + (workItem, + serializedWorkItemSize, + watermarks, + processingContext, + getWorkStreamLatencies) -> + computationStateCache + .get(processingContext.computationId()) + .ifPresent( + computationState -> { + memoryMonitor.waitForResources("GetWork"); + streamingWorkScheduler.scheduleWork( + computationState, + workItem, + serializedWorkItemSize, + watermarks, + processingContext, + getWorkStreamLatencies); + }), + ChannelCachingRemoteStubFactory.create(options.getGcpCredential(), channelCache), + GetWorkBudgetDistributors.distributeEvenly(), + Preconditions.checkNotNull(dispatcherClient), + commitWorkStream -> + StreamingEngineWorkCommitter.builder() + // Share the commitByteSemaphore across all created workCommitters. 
+ .setCommitByteSemaphore(maxCommitByteSemaphore) + .setBackendWorkerToken(commitWorkStream.backendWorkerToken()) + .setOnCommitComplete(this::onCompleteCommit) + .setNumCommitSenders(Math.max(options.getWindmillServiceCommitThreads(), 1)) + .setCommitWorkStreamFactory( + () -> CloseableStream.create(commitWorkStream, () -> {})) + .build(), + getDataMetricTracker); + this.currentActiveCommitBytesProvider = + fanOutStreamingEngineWorkerHarness::currentActiveCommitBytes; + this.channelzServlet = + createChannelzServlet( + options, fanOutStreamingEngineWorkerHarness::currentWindmillEndpoints); + return fanOutStreamingEngineWorkerHarness; + } + + private StreamingWorkerHarness createSingleSourceWorkerHarness( + long clientId, + DataflowWorkerHarnessOptions options, + WindmillServerStub windmillServer, + StreamingWorkScheduler streamingWorkScheduler, + ThrottlingGetDataMetricTracker getDataMetricTracker, + MemoryMonitor memoryMonitor) { + Windmill.GetWorkRequest request = + Windmill.GetWorkRequest.newBuilder() + .setClientId(clientId) + .setMaxItems(chooseMaxBundlesOutstanding(options)) + .setMaxBytes(MAX_GET_WORK_FETCH_BYTES) + .build(); + WindmillStreamPool<GetDataStream> getDataStreamPool = + WindmillStreamPool.create( + Math.max(1, options.getWindmillGetDataStreamCount()), + GET_DATA_STREAM_TIMEOUT, + windmillServer::getDataStream); + GetDataClient getDataClient = + new StreamPoolGetDataClient(getDataMetricTracker, getDataStreamPool); + HeartbeatSender heartbeatSender = + createStreamingEngineHeartbeatSender( + options, windmillServer, getDataStreamPool, configFetcher.getGlobalConfigHandle()); + WorkCommitter workCommitter = + StreamingEngineWorkCommitter.builder() + .setCommitWorkStreamFactory( + WindmillStreamPool.create( + numCommitThreads, COMMIT_STREAM_TIMEOUT, windmillServer::commitWorkStream) + ::getCloseableStream) + .setCommitByteSemaphore(Commits.maxCommitByteSemaphore()) + .setNumCommitSenders(numCommitThreads) + 
.setOnCommitComplete(this::onCompleteCommit) + .build(); + GetWorkSender getWorkSender = + GetWorkSender.forStreamingEngine( + receiver -> windmillServer.getWorkStream(request, receiver)); + + this.getDataStatusProvider = getDataClient::printHtml; + this.currentActiveCommitBytesProvider = workCommitter::currentActiveCommitBytes; + this.channelzServlet = + createChannelzServlet(options, windmillServer::getWindmillServiceEndpoints); + this.channelCache = null; + + return SingleSourceWorkerHarness.builder() + .setStreamingWorkScheduler(streamingWorkScheduler) + .setWorkCommitter(workCommitter) + .setGetDataClient(getDataClient) + .setComputationStateFetcher(this.computationStateCache::get) + .setWaitForResources(() -> memoryMonitor.waitForResources("GetWork")) + .setHeartbeatSender(heartbeatSender) + .setGetWorkSender(getWorkSender) + .build(); + } + + private void switchStreamingWorkerHarness( + ConnectivityType connectivityType, + long clientId, + DataflowWorkerHarnessOptions options, + GrpcWindmillStreamFactory windmillStreamFactory, + StreamingWorkScheduler streamingWorkScheduler, + ThrottlingGetDataMetricTracker getDataMetricTracker, + MemoryMonitor memoryMonitor, + GrpcDispatcherClient dispatcherClient, + WindmillServerStub windmillServer) { + // Stop the current status pages before switching the harness. 
+ this.statusPages.stop(); + LOG.debug("Stopped StreamingWorkerStatusPages before switching connectivity type."); + if (connectivityType == ConnectivityType.CONNECTIVITY_TYPE_DIRECTPATH) { + if (!(this.streamingWorkerHarness.get() instanceof FanOutStreamingEngineWorkerHarness)) { + LOG.info("Switching connectivity type from CLOUDPATH to DIRECTPATH"); + LOG.debug("Shutting down to SingleSourceWorkerHarness"); + this.streamingWorkerHarness.get().shutdown(); + FanOutStreamingEngineWorkerHarness fanoutStreamingWorkerHarness = + createFanOutStreamingEngineWorkerHarness( + clientId, + options, + windmillStreamFactory, + streamingWorkScheduler, + getDataMetricTracker, + memoryMonitor, + dispatcherClient); + this.streamingWorkerHarness.set(fanoutStreamingWorkerHarness); + streamingWorkerHarness.get().start(); Review Comment: You can call `fanoutStreamingWorkerHarness.start()` directly instead. ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java: ########## @@ -394,6 +368,187 @@ private StreamingDataflowWorker( LOG.debug("LocalWindmillHostport: {}", options.getLocalWindmillHostport()); } + private FanOutStreamingEngineWorkerHarness createFanOutStreamingEngineWorkerHarness( + long clientId, + DataflowWorkerHarnessOptions options, + GrpcWindmillStreamFactory windmillStreamFactory, + StreamingWorkScheduler streamingWorkScheduler, + ThrottlingGetDataMetricTracker getDataMetricTracker, + MemoryMonitor memoryMonitor, + GrpcDispatcherClient dispatcherClient) { + WeightedSemaphore<Commit> maxCommitByteSemaphore = Commits.maxCommitByteSemaphore(); + this.channelCache = createChannelCache(options, configFetcher); + this.getDataStatusProvider = getDataMetricTracker::printHtml; + FanOutStreamingEngineWorkerHarness fanOutStreamingEngineWorkerHarness = + FanOutStreamingEngineWorkerHarness.create( + createJobHeader(options, clientId), + GetWorkBudget.builder() + .setItems(chooseMaxBundlesOutstanding(options)) + 
.setBytes(MAX_GET_WORK_FETCH_BYTES) + .build(), + windmillStreamFactory, + (workItem, + serializedWorkItemSize, + watermarks, + processingContext, + getWorkStreamLatencies) -> + computationStateCache + .get(processingContext.computationId()) + .ifPresent( + computationState -> { + memoryMonitor.waitForResources("GetWork"); + streamingWorkScheduler.scheduleWork( + computationState, + workItem, + serializedWorkItemSize, + watermarks, + processingContext, + getWorkStreamLatencies); + }), + ChannelCachingRemoteStubFactory.create(options.getGcpCredential(), channelCache), + GetWorkBudgetDistributors.distributeEvenly(), + Preconditions.checkNotNull(dispatcherClient), + commitWorkStream -> + StreamingEngineWorkCommitter.builder() + // Share the commitByteSemaphore across all created workCommitters. + .setCommitByteSemaphore(maxCommitByteSemaphore) + .setBackendWorkerToken(commitWorkStream.backendWorkerToken()) + .setOnCommitComplete(this::onCompleteCommit) + .setNumCommitSenders(Math.max(options.getWindmillServiceCommitThreads(), 1)) + .setCommitWorkStreamFactory( + () -> CloseableStream.create(commitWorkStream, () -> {})) + .build(), + getDataMetricTracker); + this.currentActiveCommitBytesProvider = + fanOutStreamingEngineWorkerHarness::currentActiveCommitBytes; + this.channelzServlet = + createChannelzServlet( + options, fanOutStreamingEngineWorkerHarness::currentWindmillEndpoints); + return fanOutStreamingEngineWorkerHarness; + } + + private StreamingWorkerHarness createSingleSourceWorkerHarness( + long clientId, + DataflowWorkerHarnessOptions options, + WindmillServerStub windmillServer, + StreamingWorkScheduler streamingWorkScheduler, + ThrottlingGetDataMetricTracker getDataMetricTracker, + MemoryMonitor memoryMonitor) { + Windmill.GetWorkRequest request = + Windmill.GetWorkRequest.newBuilder() + .setClientId(clientId) + .setMaxItems(chooseMaxBundlesOutstanding(options)) + .setMaxBytes(MAX_GET_WORK_FETCH_BYTES) + .build(); + WindmillStreamPool<GetDataStream> 
getDataStreamPool = + WindmillStreamPool.create( + Math.max(1, options.getWindmillGetDataStreamCount()), + GET_DATA_STREAM_TIMEOUT, + windmillServer::getDataStream); + GetDataClient getDataClient = + new StreamPoolGetDataClient(getDataMetricTracker, getDataStreamPool); + HeartbeatSender heartbeatSender = + createStreamingEngineHeartbeatSender( + options, windmillServer, getDataStreamPool, configFetcher.getGlobalConfigHandle()); + WorkCommitter workCommitter = + StreamingEngineWorkCommitter.builder() + .setCommitWorkStreamFactory( + WindmillStreamPool.create( + numCommitThreads, COMMIT_STREAM_TIMEOUT, windmillServer::commitWorkStream) + ::getCloseableStream) + .setCommitByteSemaphore(Commits.maxCommitByteSemaphore()) + .setNumCommitSenders(numCommitThreads) + .setOnCommitComplete(this::onCompleteCommit) + .build(); + GetWorkSender getWorkSender = + GetWorkSender.forStreamingEngine( + receiver -> windmillServer.getWorkStream(request, receiver)); + + this.getDataStatusProvider = getDataClient::printHtml; + this.currentActiveCommitBytesProvider = workCommitter::currentActiveCommitBytes; + this.channelzServlet = + createChannelzServlet(options, windmillServer::getWindmillServiceEndpoints); + this.channelCache = null; + + return SingleSourceWorkerHarness.builder() + .setStreamingWorkScheduler(streamingWorkScheduler) + .setWorkCommitter(workCommitter) + .setGetDataClient(getDataClient) + .setComputationStateFetcher(this.computationStateCache::get) + .setWaitForResources(() -> memoryMonitor.waitForResources("GetWork")) + .setHeartbeatSender(heartbeatSender) + .setGetWorkSender(getWorkSender) + .build(); + } + + private void switchStreamingWorkerHarness( + ConnectivityType connectivityType, + long clientId, + DataflowWorkerHarnessOptions options, + GrpcWindmillStreamFactory windmillStreamFactory, + StreamingWorkScheduler streamingWorkScheduler, + ThrottlingGetDataMetricTracker getDataMetricTracker, + MemoryMonitor memoryMonitor, + GrpcDispatcherClient dispatcherClient, + 
WindmillServerStub windmillServer) { + // Stop the current status pages before switching the harness. + this.statusPages.stop(); + LOG.debug("Stopped StreamingWorkerStatusPages before switching connectivity type."); + if (connectivityType == ConnectivityType.CONNECTIVITY_TYPE_DIRECTPATH) { + if (!(this.streamingWorkerHarness.get() instanceof FanOutStreamingEngineWorkerHarness)) { Review Comment: We won't need this second check once the early-exit check is added. ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java: ########## @@ -183,6 +186,13 @@ public final class StreamingDataflowWorker { private final ActiveWorkRefresher activeWorkRefresher; private final StreamingWorkerStatusReporter workerStatusReporter; private final int numCommitThreads; + private Consumer<PrintWriter> getDataStatusProvider; + private Supplier<Long> currentActiveCommitBytesProvider; + private @Nullable ChannelzServlet channelzServlet; + private @Nullable ChannelCache channelCache; + private Supplier<Instant> clock; Review Comment: This can be `final`. ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java: ########## @@ -394,6 +368,187 @@ private StreamingDataflowWorker( LOG.debug("LocalWindmillHostport: {}", options.getLocalWindmillHostport()); } + private FanOutStreamingEngineWorkerHarness createFanOutStreamingEngineWorkerHarness( + long clientId, + DataflowWorkerHarnessOptions options, + GrpcWindmillStreamFactory windmillStreamFactory, + StreamingWorkScheduler streamingWorkScheduler, + ThrottlingGetDataMetricTracker getDataMetricTracker, + MemoryMonitor memoryMonitor, + GrpcDispatcherClient dispatcherClient) { + WeightedSemaphore<Commit> maxCommitByteSemaphore = Commits.maxCommitByteSemaphore(); + this.channelCache = createChannelCache(options, configFetcher); + this.getDataStatusProvider = getDataMetricTracker::printHtml; + 
FanOutStreamingEngineWorkerHarness fanOutStreamingEngineWorkerHarness = + FanOutStreamingEngineWorkerHarness.create( + createJobHeader(options, clientId), + GetWorkBudget.builder() + .setItems(chooseMaxBundlesOutstanding(options)) + .setBytes(MAX_GET_WORK_FETCH_BYTES) + .build(), + windmillStreamFactory, + (workItem, + serializedWorkItemSize, + watermarks, + processingContext, + getWorkStreamLatencies) -> + computationStateCache + .get(processingContext.computationId()) + .ifPresent( + computationState -> { + memoryMonitor.waitForResources("GetWork"); + streamingWorkScheduler.scheduleWork( + computationState, + workItem, + serializedWorkItemSize, + watermarks, + processingContext, + getWorkStreamLatencies); + }), + ChannelCachingRemoteStubFactory.create(options.getGcpCredential(), channelCache), + GetWorkBudgetDistributors.distributeEvenly(), + Preconditions.checkNotNull(dispatcherClient), + commitWorkStream -> + StreamingEngineWorkCommitter.builder() + // Share the commitByteSemaphore across all created workCommitters. 
+ .setCommitByteSemaphore(maxCommitByteSemaphore) + .setBackendWorkerToken(commitWorkStream.backendWorkerToken()) + .setOnCommitComplete(this::onCompleteCommit) + .setNumCommitSenders(Math.max(options.getWindmillServiceCommitThreads(), 1)) + .setCommitWorkStreamFactory( + () -> CloseableStream.create(commitWorkStream, () -> {})) + .build(), + getDataMetricTracker); + this.currentActiveCommitBytesProvider = + fanOutStreamingEngineWorkerHarness::currentActiveCommitBytes; + this.channelzServlet = + createChannelzServlet( + options, fanOutStreamingEngineWorkerHarness::currentWindmillEndpoints); + return fanOutStreamingEngineWorkerHarness; + } + + private StreamingWorkerHarness createSingleSourceWorkerHarness( + long clientId, + DataflowWorkerHarnessOptions options, + WindmillServerStub windmillServer, + StreamingWorkScheduler streamingWorkScheduler, + ThrottlingGetDataMetricTracker getDataMetricTracker, + MemoryMonitor memoryMonitor) { + Windmill.GetWorkRequest request = + Windmill.GetWorkRequest.newBuilder() + .setClientId(clientId) + .setMaxItems(chooseMaxBundlesOutstanding(options)) + .setMaxBytes(MAX_GET_WORK_FETCH_BYTES) + .build(); + WindmillStreamPool<GetDataStream> getDataStreamPool = + WindmillStreamPool.create( + Math.max(1, options.getWindmillGetDataStreamCount()), + GET_DATA_STREAM_TIMEOUT, + windmillServer::getDataStream); + GetDataClient getDataClient = + new StreamPoolGetDataClient(getDataMetricTracker, getDataStreamPool); + HeartbeatSender heartbeatSender = + createStreamingEngineHeartbeatSender( + options, windmillServer, getDataStreamPool, configFetcher.getGlobalConfigHandle()); + WorkCommitter workCommitter = + StreamingEngineWorkCommitter.builder() + .setCommitWorkStreamFactory( + WindmillStreamPool.create( + numCommitThreads, COMMIT_STREAM_TIMEOUT, windmillServer::commitWorkStream) + ::getCloseableStream) + .setCommitByteSemaphore(Commits.maxCommitByteSemaphore()) + .setNumCommitSenders(numCommitThreads) + 
.setOnCommitComplete(this::onCompleteCommit) + .build(); + GetWorkSender getWorkSender = + GetWorkSender.forStreamingEngine( + receiver -> windmillServer.getWorkStream(request, receiver)); + + this.getDataStatusProvider = getDataClient::printHtml; + this.currentActiveCommitBytesProvider = workCommitter::currentActiveCommitBytes; + this.channelzServlet = + createChannelzServlet(options, windmillServer::getWindmillServiceEndpoints); + this.channelCache = null; + + return SingleSourceWorkerHarness.builder() + .setStreamingWorkScheduler(streamingWorkScheduler) + .setWorkCommitter(workCommitter) + .setGetDataClient(getDataClient) + .setComputationStateFetcher(this.computationStateCache::get) + .setWaitForResources(() -> memoryMonitor.waitForResources("GetWork")) + .setHeartbeatSender(heartbeatSender) + .setGetWorkSender(getWorkSender) + .build(); + } + + private void switchStreamingWorkerHarness( + ConnectivityType connectivityType, + long clientId, + DataflowWorkerHarnessOptions options, + GrpcWindmillStreamFactory windmillStreamFactory, + StreamingWorkScheduler streamingWorkScheduler, + ThrottlingGetDataMetricTracker getDataMetricTracker, + MemoryMonitor memoryMonitor, + GrpcDispatcherClient dispatcherClient, + WindmillServerStub windmillServer) { + // Stop the current status pages before switching the harness. 
+ this.statusPages.stop(); + LOG.debug("Stopped StreamingWorkerStatusPages before switching connectivity type."); + if (connectivityType == ConnectivityType.CONNECTIVITY_TYPE_DIRECTPATH) { + if (!(this.streamingWorkerHarness.get() instanceof FanOutStreamingEngineWorkerHarness)) { + LOG.info("Switching connectivity type from CLOUDPATH to DIRECTPATH"); + LOG.debug("Shutting down to SingleSourceWorkerHarness"); + this.streamingWorkerHarness.get().shutdown(); + FanOutStreamingEngineWorkerHarness fanoutStreamingWorkerHarness = + createFanOutStreamingEngineWorkerHarness( + clientId, + options, + windmillStreamFactory, + streamingWorkScheduler, + getDataMetricTracker, + memoryMonitor, + dispatcherClient); + this.streamingWorkerHarness.set(fanoutStreamingWorkerHarness); + streamingWorkerHarness.get().start(); + LOG.debug("Started FanOutStreamingEngineWorkerHarness"); + return; + } + } else if (connectivityType == ConnectivityType.CONNECTIVITY_TYPE_CLOUDPATH) { + if (!(streamingWorkerHarness.get() instanceof SingleSourceWorkerHarness)) { Review Comment: We won't need this second check once the early-exit check is added. ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java: ########## @@ -907,9 +1062,10 @@ void stop() { activeWorkRefresher.stop(); statusPages.stop(); running.set(false); - streamingWorkerHarness.shutdown(); + streamingWorkerHarness.get().shutdown(); memoryMonitor.shutdown(); workUnitExecutor.shutdown(); + harnessSwitchExecutor.shutdown(); Review Comment: Should we shut down harnessSwitchExecutor before shutting down streamingWorkerHarness? Doing so ensures that a new streamingWorkerHarness cannot be set after the old one is shut down here. 
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java: ########## @@ -889,7 +1044,7 @@ public void start() { running.set(true); configFetcher.start(); memoryMonitor.start(); - streamingWorkerHarness.start(); + streamingWorkerHarness.get().start(); Review Comment: There is a race between `streamingWorkerHarness.get().start()` here and the harnessSwitchExecutor swapping in a new `streamingWorkerHarness` and calling `start()` on it. This can end up calling `start()` on an already-started `streamingWorkerHarness`, which triggers a check failure. Registering the harness-switch config observer (via `registerConfigObserver`) here, after the existing `streamingWorkerHarness` has been started, should fix the race. ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java: ########## @@ -394,6 +368,187 @@ private StreamingDataflowWorker( LOG.debug("LocalWindmillHostport: {}", options.getLocalWindmillHostport()); } + private FanOutStreamingEngineWorkerHarness createFanOutStreamingEngineWorkerHarness( + long clientId, + DataflowWorkerHarnessOptions options, + GrpcWindmillStreamFactory windmillStreamFactory, + StreamingWorkScheduler streamingWorkScheduler, + ThrottlingGetDataMetricTracker getDataMetricTracker, + MemoryMonitor memoryMonitor, + GrpcDispatcherClient dispatcherClient) { + WeightedSemaphore<Commit> maxCommitByteSemaphore = Commits.maxCommitByteSemaphore(); + this.channelCache = createChannelCache(options, configFetcher); + this.getDataStatusProvider = getDataMetricTracker::printHtml; + FanOutStreamingEngineWorkerHarness fanOutStreamingEngineWorkerHarness = + FanOutStreamingEngineWorkerHarness.create( + createJobHeader(options, clientId), + GetWorkBudget.builder() + .setItems(chooseMaxBundlesOutstanding(options)) + .setBytes(MAX_GET_WORK_FETCH_BYTES) + .build(), + windmillStreamFactory, + (workItem, + serializedWorkItemSize, + watermarks, + processingContext, + 
getWorkStreamLatencies) -> + computationStateCache + .get(processingContext.computationId()) + .ifPresent( + computationState -> { + memoryMonitor.waitForResources("GetWork"); + streamingWorkScheduler.scheduleWork( + computationState, + workItem, + serializedWorkItemSize, + watermarks, + processingContext, + getWorkStreamLatencies); + }), + ChannelCachingRemoteStubFactory.create(options.getGcpCredential(), channelCache), + GetWorkBudgetDistributors.distributeEvenly(), + Preconditions.checkNotNull(dispatcherClient), + commitWorkStream -> + StreamingEngineWorkCommitter.builder() + // Share the commitByteSemaphore across all created workCommitters. + .setCommitByteSemaphore(maxCommitByteSemaphore) + .setBackendWorkerToken(commitWorkStream.backendWorkerToken()) + .setOnCommitComplete(this::onCompleteCommit) + .setNumCommitSenders(Math.max(options.getWindmillServiceCommitThreads(), 1)) + .setCommitWorkStreamFactory( + () -> CloseableStream.create(commitWorkStream, () -> {})) + .build(), + getDataMetricTracker); + this.currentActiveCommitBytesProvider = + fanOutStreamingEngineWorkerHarness::currentActiveCommitBytes; + this.channelzServlet = + createChannelzServlet( + options, fanOutStreamingEngineWorkerHarness::currentWindmillEndpoints); + return fanOutStreamingEngineWorkerHarness; + } + + private StreamingWorkerHarness createSingleSourceWorkerHarness( + long clientId, + DataflowWorkerHarnessOptions options, + WindmillServerStub windmillServer, + StreamingWorkScheduler streamingWorkScheduler, + ThrottlingGetDataMetricTracker getDataMetricTracker, + MemoryMonitor memoryMonitor) { + Windmill.GetWorkRequest request = + Windmill.GetWorkRequest.newBuilder() + .setClientId(clientId) + .setMaxItems(chooseMaxBundlesOutstanding(options)) + .setMaxBytes(MAX_GET_WORK_FETCH_BYTES) + .build(); + WindmillStreamPool<GetDataStream> getDataStreamPool = + WindmillStreamPool.create( + Math.max(1, options.getWindmillGetDataStreamCount()), + GET_DATA_STREAM_TIMEOUT, + 
windmillServer::getDataStream); + GetDataClient getDataClient = + new StreamPoolGetDataClient(getDataMetricTracker, getDataStreamPool); + HeartbeatSender heartbeatSender = + createStreamingEngineHeartbeatSender( + options, windmillServer, getDataStreamPool, configFetcher.getGlobalConfigHandle()); + WorkCommitter workCommitter = + StreamingEngineWorkCommitter.builder() + .setCommitWorkStreamFactory( + WindmillStreamPool.create( + numCommitThreads, COMMIT_STREAM_TIMEOUT, windmillServer::commitWorkStream) + ::getCloseableStream) + .setCommitByteSemaphore(Commits.maxCommitByteSemaphore()) + .setNumCommitSenders(numCommitThreads) + .setOnCommitComplete(this::onCompleteCommit) + .build(); + GetWorkSender getWorkSender = + GetWorkSender.forStreamingEngine( + receiver -> windmillServer.getWorkStream(request, receiver)); + + this.getDataStatusProvider = getDataClient::printHtml; + this.currentActiveCommitBytesProvider = workCommitter::currentActiveCommitBytes; + this.channelzServlet = + createChannelzServlet(options, windmillServer::getWindmillServiceEndpoints); + this.channelCache = null; + + return SingleSourceWorkerHarness.builder() + .setStreamingWorkScheduler(streamingWorkScheduler) + .setWorkCommitter(workCommitter) + .setGetDataClient(getDataClient) + .setComputationStateFetcher(this.computationStateCache::get) + .setWaitForResources(() -> memoryMonitor.waitForResources("GetWork")) + .setHeartbeatSender(heartbeatSender) + .setGetWorkSender(getWorkSender) + .build(); + } + + private void switchStreamingWorkerHarness( + ConnectivityType connectivityType, + long clientId, + DataflowWorkerHarnessOptions options, + GrpcWindmillStreamFactory windmillStreamFactory, + StreamingWorkScheduler streamingWorkScheduler, + ThrottlingGetDataMetricTracker getDataMetricTracker, + MemoryMonitor memoryMonitor, + GrpcDispatcherClient dispatcherClient, + WindmillServerStub windmillServer) { + // Stop the current status pages before switching the harness. 
+ this.statusPages.stop(); Review Comment: We don't need to stop/start statusPages if the workerHarness didn't change. We could add an early exit to switchStreamingWorkerHarness ``` private void switchStreamingWorkerHarness(...) { if ((connectivityType == ConnectivityType.CONNECTIVITY_TYPE_DIRECTPATH && this.streamingWorkerHarness.get() instanceof FanOutStreamingEngineWorkerHarness) || (connectivityType == ConnectivityType.CONNECTIVITY_TYPE_CLOUDPATH && streamingWorkerHarness.get() instanceof SingleSourceWorkerHarness)) { return; } rest of logic ``` ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java: ########## @@ -179,8 +184,13 @@ private void streamingEngineDispatchLoop( // Reconnect every now and again to enable better load balancing. // If at any point the server closes the stream, we will reconnect immediately; otherwise // we half-close the stream after some time and create a new one. - if (!stream.awaitTermination(GET_WORK_STREAM_TIMEOUT_MINUTES, TimeUnit.MINUTES)) { - stream.halfClose(); + if (getWorkStream != null) { + if (!getWorkStream.awaitTermination(GET_WORK_STREAM_TIMEOUT_MINUTES, TimeUnit.MINUTES)) { + if (getWorkStream + != null) { // checking for null again to keep the static analyzer happy. + getWorkStream.halfClose(); Review Comment: `Preconditions.checkNotNull(getWorkStream).halfClose()` should also make the null checker happy. 
########## runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java: ########## @@ -4058,6 +4067,106 @@ public void testStuckCommit() throws Exception { removeDynamicFields(result.get(1L))); } + @Test + public void testSwitchStreamingWorkerHarness() throws Exception { + if (!streamingEngine) { + return; + } + + List<ParallelInstruction> instructions = + Arrays.asList( + makeSourceInstruction(StringUtf8Coder.of()), + makeSinkInstruction(StringUtf8Coder.of(), 0)); + + // Start with CloudPath. + DataflowWorkerHarnessOptions options = + createTestingPipelineOptions("--isWindmillServiceDirectPathEnabled=false"); + + StreamingDataflowWorker worker = + makeWorker( + defaultWorkerParams() + .setOptions(options) + .setInstructions(instructions) + .publishCounters() + .build()); + + GrpcDispatcherClient mockDispatcherClient = mock(GrpcDispatcherClient.class); + + // FanOutStreamingEngineWorkerHarness creates + // CloudWindmillMetadataServiceV1Alpha1Stub and expects the stream to + // successfully start. Mocking it here. + Channel mockChannel = mock(Channel.class); + ClientCall<WorkerMetadataRequest, WorkerMetadataResponse> mockClientCall = + mock(ClientCall.class); + when(mockChannel.newCall( + eq(CloudWindmillMetadataServiceV1Alpha1Grpc.getGetWorkerMetadataMethod()), any())) + .thenReturn(mockClientCall); + when(mockDispatcherClient.getWindmillMetadataServiceStubBlocking()) + .thenReturn(CloudWindmillMetadataServiceV1Alpha1Grpc.newStub(mockChannel)); + java.lang.reflect.Field dispatcherClientField = + StreamingDataflowWorker.class.getDeclaredField("dispatcherClient"); + dispatcherClientField.setAccessible(true); + dispatcherClientField.set(worker, mockDispatcherClient); Review Comment: Prefer fakes/mocks over using reflection directly. Here we can update FakeWindmillServer.java to set up fake metadata responses instead. 
########## runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java: ########## @@ -4058,6 +4067,106 @@ public void testStuckCommit() throws Exception { removeDynamicFields(result.get(1L))); } + @Test + public void testSwitchStreamingWorkerHarness() throws Exception { + if (!streamingEngine) { + return; + } + + List<ParallelInstruction> instructions = + Arrays.asList( + makeSourceInstruction(StringUtf8Coder.of()), + makeSinkInstruction(StringUtf8Coder.of(), 0)); + + // Start with CloudPath. + DataflowWorkerHarnessOptions options = + createTestingPipelineOptions("--isWindmillServiceDirectPathEnabled=false"); + + StreamingDataflowWorker worker = + makeWorker( + defaultWorkerParams() + .setOptions(options) + .setInstructions(instructions) + .publishCounters() + .build()); + + GrpcDispatcherClient mockDispatcherClient = mock(GrpcDispatcherClient.class); + + // FanOutStreamingEngineWorkerHarness creates + // CloudWindmillMetadataServiceV1Alpha1Stub and expects the stream to + // successfully start. Mocking it here. + Channel mockChannel = mock(Channel.class); + ClientCall<WorkerMetadataRequest, WorkerMetadataResponse> mockClientCall = + mock(ClientCall.class); + when(mockChannel.newCall( + eq(CloudWindmillMetadataServiceV1Alpha1Grpc.getGetWorkerMetadataMethod()), any())) + .thenReturn(mockClientCall); + when(mockDispatcherClient.getWindmillMetadataServiceStubBlocking()) + .thenReturn(CloudWindmillMetadataServiceV1Alpha1Grpc.newStub(mockChannel)); + java.lang.reflect.Field dispatcherClientField = + StreamingDataflowWorker.class.getDeclaredField("dispatcherClient"); + dispatcherClientField.setAccessible(true); + dispatcherClientField.set(worker, mockDispatcherClient); + + // Capture the config observer. 
+ ArgumentCaptor<Consumer<StreamingGlobalConfig>> observerCaptor = + ArgumentCaptor.forClass(Consumer.class); + verify(mockGlobalConfigHandle, atLeastOnce()).registerConfigObserver(observerCaptor.capture()); + List<Consumer<StreamingGlobalConfig>> observers = observerCaptor.getAllValues(); + + worker.start(); + + // Use reflection to check the harness type. + java.lang.reflect.Field harnessField = + StreamingDataflowWorker.class.getDeclaredField("streamingWorkerHarness"); + harnessField.setAccessible(true); + AtomicReference<Object> harnessRef = (AtomicReference<Object>) harnessField.get(worker); Review Comment: You can add a package-private `@VisibleForTesting` method to StreamingDataflowWorker that returns the StreamingWorkerHarness, and use it here instead of reflection. ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusPages.java: ########## @@ -80,7 +80,7 @@ public final class StreamingWorkerStatusPages { private final @Nullable GrpcWindmillStreamFactory windmillStreamFactory; private final DebugCapture.@Nullable Manager debugCapture; private final @Nullable ChannelzServlet channelzServlet; - private final @Nullable ChannelCache channelCache; Review Comment: Is this intended? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org