This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new aa21e4a84b3 Integrate direct path with StreamingDataflowWorker code
path (#32778)
aa21e4a84b3 is described below
commit aa21e4a84b3594e9ddd6ddb436c9628cb6660552
Author: martin trieu <[email protected]>
AuthorDate: Tue Nov 26 02:51:57 2024 -0600
Integrate direct path with StreamingDataflowWorker code path (#32778)
---
.../options/DataflowStreamingPipelineOptions.java | 15 +-
.../dataflow/worker/StreamingDataflowWorker.java | 594 +++++++++++++--------
.../FanOutStreamingEngineWorkerHarness.java | 1 +
.../harness/SingleSourceWorkerHarness.java | 13 +-
.../streaming/harness/StreamingWorkerHarness.java | 3 +-
.../harness/StreamingWorkerStatusPages.java | 2 +-
.../harness/StreamingWorkerStatusReporter.java | 131 +++--
.../windmill/client/throttling/ThrottleTimer.java | 3 +-
.../client/throttling/ThrottledTimeTracker.java} | 14 +-
.../work/refresh/StreamPoolHeartbeatSender.java | 4 +-
.../worker/StreamingDataflowWorkerTest.java | 42 +-
.../harness/SingleSourceWorkerHarnessTest.java | 2 +
.../harness/StreamingWorkerStatusReporterTest.java | 53 +-
.../refresh/StreamPoolHeartbeatSenderTest.java | 6 +-
14 files changed, 538 insertions(+), 345 deletions(-)
diff --git
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java
index 6a0208f1447..61c38dde2b4 100644
---
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java
+++
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.dataflow.options;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.Hidden;
import org.apache.beam.sdk.options.PipelineOptions;
import org.joda.time.Duration;
@@ -219,10 +220,8 @@ public interface DataflowStreamingPipelineOptions extends
PipelineOptions {
void setWindmillServiceStreamMaxBackoffMillis(int value);
- @Description(
- "If true, Dataflow streaming pipeline will be running in direct path
mode."
- + " VMs must have IPv6 enabled for this to work.")
- @Default.Boolean(false)
+ @Description("Enables direct path mode for streaming engine.")
+ @Default.InstanceFactory(EnableWindmillServiceDirectPathFactory.class)
boolean getIsWindmillServiceDirectPathEnabled();
void setIsWindmillServiceDirectPathEnabled(boolean
isWindmillServiceDirectPathEnabled);
@@ -300,4 +299,12 @@ public interface DataflowStreamingPipelineOptions extends
PipelineOptions {
return streamingOptions.isEnableStreamingEngine() ? Integer.MAX_VALUE :
1;
}
}
+
+ /** Defaults to false unless the "enable_windmill_service_direct_path" experiment is
set. */
+ class EnableWindmillServiceDirectPathFactory implements
DefaultValueFactory<Boolean> {
+ @Override
+ public Boolean create(PipelineOptions options) {
+ return ExperimentalOptions.hasExperiment(options,
"enable_windmill_service_direct_path");
+ }
+ }
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 6ce60283735..088a28e9b2d 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -17,11 +17,11 @@
*/
package org.apache.beam.runners.dataflow.worker;
-import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+import static
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannelFactory.remoteChannel;
-import com.google.api.services.dataflow.model.CounterUpdate;
import com.google.api.services.dataflow.model.MapTask;
import com.google.auto.value.AutoValue;
+import java.io.PrintWriter;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -33,24 +33,28 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
+import javax.annotation.Nullable;
import org.apache.beam.runners.core.metrics.MetricsLogger;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
-import
org.apache.beam.runners.dataflow.worker.counters.DataflowCounterUpdateExtractor;
import org.apache.beam.runners.dataflow.worker.status.DebugCapture;
import org.apache.beam.runners.dataflow.worker.status.WorkerStatusPages;
import org.apache.beam.runners.dataflow.worker.streaming.ComputationState;
import org.apache.beam.runners.dataflow.worker.streaming.ComputationStateCache;
import org.apache.beam.runners.dataflow.worker.streaming.StageInfo;
+import org.apache.beam.runners.dataflow.worker.streaming.WeightedSemaphore;
import
org.apache.beam.runners.dataflow.worker.streaming.WorkHeartbeatResponseProcessor;
import
org.apache.beam.runners.dataflow.worker.streaming.config.ComputationConfig;
import
org.apache.beam.runners.dataflow.worker.streaming.config.FixedGlobalConfigHandle;
import
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingApplianceComputationConfigFetcher;
import
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingEngineComputationConfigFetcher;
import
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig;
+import
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandle;
import
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandleImpl;
+import
org.apache.beam.runners.dataflow.worker.streaming.harness.FanOutStreamingEngineWorkerHarness;
import
org.apache.beam.runners.dataflow.worker.streaming.harness.SingleSourceWorkerHarness;
import
org.apache.beam.runners.dataflow.worker.streaming.harness.SingleSourceWorkerHarness.GetWorkSender;
import
org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingCounters;
@@ -59,12 +63,15 @@ import
org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingWorker
import
org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingWorkerStatusReporter;
import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor;
+import
org.apache.beam.runners.dataflow.worker.windmill.ApplianceWindmillClient;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.JobHeader;
import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub;
import
org.apache.beam.runners.dataflow.worker.windmill.appliance.JniWindmillApplianceServer;
+import org.apache.beam.runners.dataflow.worker.windmill.client.CloseableStream;
import
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream;
import
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamPool;
+import org.apache.beam.runners.dataflow.worker.windmill.client.commits.Commit;
import org.apache.beam.runners.dataflow.worker.windmill.client.commits.Commits;
import
org.apache.beam.runners.dataflow.worker.windmill.client.commits.CompleteCommit;
import
org.apache.beam.runners.dataflow.worker.windmill.client.commits.StreamingApplianceWorkCommitter;
@@ -78,8 +85,16 @@ import
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.ChannelzServ
import
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcDispatcherClient;
import
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillServer;
import
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillStreamFactory;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCache;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCachingRemoteStubFactory;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCachingStubFactory;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.IsolationChannel;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactoryFactory;
import
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactoryFactoryImpl;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.throttling.ThrottledTimeTracker;
import
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache;
+import
org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget;
+import
org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudgetDistributors;
import
org.apache.beam.runners.dataflow.worker.windmill.work.processing.StreamingWorkScheduler;
import
org.apache.beam.runners.dataflow.worker.windmill.work.processing.failures.FailureTracker;
import
org.apache.beam.runners.dataflow.worker.windmill.work.processing.failures.StreamingApplianceFailureTracker;
@@ -89,6 +104,7 @@ import
org.apache.beam.runners.dataflow.worker.windmill.work.refresh.ActiveWorkR
import
org.apache.beam.runners.dataflow.worker.windmill.work.refresh.ApplianceHeartbeatSender;
import
org.apache.beam.runners.dataflow.worker.windmill.work.refresh.HeartbeatSender;
import
org.apache.beam.runners.dataflow.worker.windmill.work.refresh.StreamPoolHeartbeatSender;
+import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.fn.IdGenerator;
import org.apache.beam.sdk.fn.IdGenerators;
import org.apache.beam.sdk.fn.JvmInitializers;
@@ -98,18 +114,25 @@ import org.apache.beam.sdk.io.kafka.KafkaSinkMetrics;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.util.construction.CoderTranslation;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheStats;
-import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/** Implements a Streaming Dataflow worker. */
+/**
+ * <b>For internal use only.</b>
+ *
+ * <p>Implements a Streaming Dataflow worker.
+ */
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
+@Internal
public final class StreamingDataflowWorker {
/**
@@ -142,6 +165,10 @@ public final class StreamingDataflowWorker {
private static final int DEFAULT_STATUS_PORT = 8081;
private static final Random CLIENT_ID_GENERATOR = new Random();
private static final String CHANNELZ_PATH = "/channelz";
+ private static final String BEAM_FN_API_EXPERIMENT = "beam_fn_api";
+ private static final String ENABLE_IPV6_EXPERIMENT =
"enable_private_ipv6_google_access";
+ private static final String
STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_HEARTBEAT_POOL_EXPERIMENT =
+ "streaming_engine_use_job_settings_for_heartbeat_pool";
private final WindmillStateCache stateCache;
private final StreamingWorkerStatusPages statusPages;
@@ -155,9 +182,8 @@ public final class StreamingDataflowWorker {
private final ReaderCache readerCache;
private final DataflowExecutionStateSampler sampler =
DataflowExecutionStateSampler.instance();
private final ActiveWorkRefresher activeWorkRefresher;
- private final WorkCommitter workCommitter;
private final StreamingWorkerStatusReporter workerStatusReporter;
- private final StreamingCounters streamingCounters;
+ private final int numCommitThreads;
private StreamingDataflowWorker(
WindmillServerStub windmillServer,
@@ -170,17 +196,17 @@ public final class StreamingDataflowWorker {
DataflowWorkerHarnessOptions options,
HotKeyLogger hotKeyLogger,
Supplier<Instant> clock,
- StreamingWorkerStatusReporter workerStatusReporter,
+ StreamingWorkerStatusReporterFactory
streamingWorkerStatusReporterFactory,
FailureTracker failureTracker,
WorkFailureProcessor workFailureProcessor,
StreamingCounters streamingCounters,
MemoryMonitor memoryMonitor,
GrpcWindmillStreamFactory windmillStreamFactory,
ScheduledExecutorService activeWorkRefreshExecutorFn,
- ConcurrentMap<String, StageInfo> stageInfoMap) {
+ ConcurrentMap<String, StageInfo> stageInfoMap,
+ @Nullable GrpcDispatcherClient dispatcherClient) {
// Register standard file systems.
FileSystems.setDefaultPipelineOptions(options);
-
this.configFetcher = configFetcher;
this.computationStateCache = computationStateCache;
this.stateCache = windmillStateCache;
@@ -189,35 +215,13 @@ public final class StreamingDataflowWorker {
Duration.standardSeconds(options.getReaderCacheTimeoutSec()),
Executors.newCachedThreadPool());
this.options = options;
-
- boolean windmillServiceEnabled = options.isEnableStreamingEngine();
-
- int numCommitThreads = 1;
- if (windmillServiceEnabled && options.getWindmillServiceCommitThreads() >
0) {
- numCommitThreads = options.getWindmillServiceCommitThreads();
- }
-
- this.workCommitter =
- windmillServiceEnabled
- ? StreamingEngineWorkCommitter.builder()
- .setCommitByteSemaphore(Commits.maxCommitByteSemaphore())
- .setCommitWorkStreamFactory(
- WindmillStreamPool.create(
- numCommitThreads,
- COMMIT_STREAM_TIMEOUT,
- windmillServer::commitWorkStream)
- ::getCloseableStream)
- .setNumCommitSenders(numCommitThreads)
- .setOnCommitComplete(this::onCompleteCommit)
- .build()
- : StreamingApplianceWorkCommitter.create(
- windmillServer::commitWork, this::onCompleteCommit);
-
this.workUnitExecutor = workUnitExecutor;
-
- this.workerStatusReporter = workerStatusReporter;
- this.streamingCounters = streamingCounters;
this.memoryMonitor = BackgroundMemoryMonitor.create(memoryMonitor);
+ this.numCommitThreads =
+ options.isEnableStreamingEngine()
+ ? Math.max(options.getWindmillServiceCommitThreads(), 1)
+ : 1;
+
StreamingWorkScheduler streamingWorkScheduler =
StreamingWorkScheduler.create(
options,
@@ -234,107 +238,200 @@ public final class StreamingDataflowWorker {
ID_GENERATOR,
configFetcher.getGlobalConfigHandle(),
stageInfoMap);
-
ThrottlingGetDataMetricTracker getDataMetricTracker =
new ThrottlingGetDataMetricTracker(memoryMonitor);
- WorkerStatusPages workerStatusPages =
- WorkerStatusPages.create(DEFAULT_STATUS_PORT, memoryMonitor);
- StreamingWorkerStatusPages.Builder statusPagesBuilder =
StreamingWorkerStatusPages.builder();
- int stuckCommitDurationMillis;
- GetDataClient getDataClient;
- HeartbeatSender heartbeatSender;
- if (windmillServiceEnabled) {
- WindmillStreamPool<GetDataStream> getDataStreamPool =
- WindmillStreamPool.create(
- Math.max(1, options.getWindmillGetDataStreamCount()),
- GET_DATA_STREAM_TIMEOUT,
- windmillServer::getDataStream);
- getDataClient = new StreamPoolGetDataClient(getDataMetricTracker,
getDataStreamPool);
- if (options.getUseSeparateWindmillHeartbeatStreams() != null) {
+ // Status page members. Their implementations differ depending on whether the harness
is streaming engine
+ // direct path, streaming engine cloud path, or streaming appliance.
+ @Nullable ChannelzServlet channelzServlet = null;
+ Consumer<PrintWriter> getDataStatusProvider;
+ Supplier<Long> currentActiveCommitBytesProvider;
+ if (isDirectPathPipeline(options)) {
+ WeightedSemaphore<Commit> maxCommitByteSemaphore =
Commits.maxCommitByteSemaphore();
+ FanOutStreamingEngineWorkerHarness fanOutStreamingEngineWorkerHarness =
+ FanOutStreamingEngineWorkerHarness.create(
+ createJobHeader(options, clientId),
+ GetWorkBudget.builder()
+ .setItems(chooseMaxBundlesOutstanding(options))
+ .setBytes(MAX_GET_WORK_FETCH_BYTES)
+ .build(),
+ windmillStreamFactory,
+ (workItem, watermarks, processingContext,
getWorkStreamLatencies) ->
+ computationStateCache
+ .get(processingContext.computationId())
+ .ifPresent(
+ computationState -> {
+ memoryMonitor.waitForResources("GetWork");
+ streamingWorkScheduler.scheduleWork(
+ computationState,
+ workItem,
+ watermarks,
+ processingContext,
+ getWorkStreamLatencies);
+ }),
+ createFanOutStubFactory(options),
+ GetWorkBudgetDistributors.distributeEvenly(),
+ Preconditions.checkNotNull(dispatcherClient),
+ commitWorkStream ->
+ StreamingEngineWorkCommitter.builder()
+ // Share the commitByteSemaphore across all created
workCommitters.
+ .setCommitByteSemaphore(maxCommitByteSemaphore)
+
.setBackendWorkerToken(commitWorkStream.backendWorkerToken())
+ .setOnCommitComplete(this::onCompleteCommit)
+
.setNumCommitSenders(Math.max(options.getWindmillServiceCommitThreads(), 1))
+ .setCommitWorkStreamFactory(
+ () -> CloseableStream.create(commitWorkStream, () ->
{}))
+ .build(),
+ getDataMetricTracker);
+ getDataStatusProvider = getDataMetricTracker::printHtml;
+ currentActiveCommitBytesProvider =
+ fanOutStreamingEngineWorkerHarness::currentActiveCommitBytes;
+ channelzServlet =
+ createChannelzServlet(
+ options,
fanOutStreamingEngineWorkerHarness::currentWindmillEndpoints);
+ this.streamingWorkerHarness = fanOutStreamingEngineWorkerHarness;
+ } else {
+ // Non-direct path pipelines.
+ Windmill.GetWorkRequest request =
+ Windmill.GetWorkRequest.newBuilder()
+ .setClientId(clientId)
+ .setMaxItems(chooseMaxBundlesOutstanding(options))
+ .setMaxBytes(MAX_GET_WORK_FETCH_BYTES)
+ .build();
+ GetDataClient getDataClient;
+ HeartbeatSender heartbeatSender;
+ WorkCommitter workCommitter;
+ GetWorkSender getWorkSender;
+ if (options.isEnableStreamingEngine()) {
+ WindmillStreamPool<GetDataStream> getDataStreamPool =
+ WindmillStreamPool.create(
+ Math.max(1, options.getWindmillGetDataStreamCount()),
+ GET_DATA_STREAM_TIMEOUT,
+ windmillServer::getDataStream);
+ getDataClient = new StreamPoolGetDataClient(getDataMetricTracker,
getDataStreamPool);
heartbeatSender =
- StreamPoolHeartbeatSender.Create(
-
Boolean.TRUE.equals(options.getUseSeparateWindmillHeartbeatStreams())
- ? separateHeartbeatPool(windmillServer)
- : getDataStreamPool);
-
+ createStreamingEngineHeartbeatSender(
+ options, windmillServer, getDataStreamPool,
configFetcher.getGlobalConfigHandle());
+ channelzServlet =
+ createChannelzServlet(options,
windmillServer::getWindmillServiceEndpoints);
+ workCommitter =
+ StreamingEngineWorkCommitter.builder()
+ .setCommitWorkStreamFactory(
+ WindmillStreamPool.create(
+ numCommitThreads,
+ COMMIT_STREAM_TIMEOUT,
+ windmillServer::commitWorkStream)
+ ::getCloseableStream)
+ .setCommitByteSemaphore(Commits.maxCommitByteSemaphore())
+ .setNumCommitSenders(numCommitThreads)
+ .setOnCommitComplete(this::onCompleteCommit)
+ .build();
+ getWorkSender =
+ GetWorkSender.forStreamingEngine(
+ receiver -> windmillServer.getWorkStream(request, receiver));
} else {
- heartbeatSender =
- StreamPoolHeartbeatSender.Create(
- separateHeartbeatPool(windmillServer),
- getDataStreamPool,
- configFetcher.getGlobalConfigHandle());
+ getDataClient = new ApplianceGetDataClient(windmillServer,
getDataMetricTracker);
+ heartbeatSender = new
ApplianceHeartbeatSender(windmillServer::getData);
+ workCommitter =
+ StreamingApplianceWorkCommitter.create(
+ windmillServer::commitWork, this::onCompleteCommit);
+ getWorkSender = GetWorkSender.forAppliance(() ->
windmillServer.getWork(request));
}
- stuckCommitDurationMillis =
- options.getStuckCommitDurationMillis() > 0 ?
options.getStuckCommitDurationMillis() : 0;
- statusPagesBuilder
- .setDebugCapture(
- new DebugCapture.Manager(options,
workerStatusPages.getDebugCapturePages()))
- .setChannelzServlet(
- new ChannelzServlet(
- CHANNELZ_PATH, options,
windmillServer::getWindmillServiceEndpoints))
- .setWindmillStreamFactory(windmillStreamFactory);
- } else {
- getDataClient = new ApplianceGetDataClient(windmillServer,
getDataMetricTracker);
- heartbeatSender = new ApplianceHeartbeatSender(windmillServer::getData);
- stuckCommitDurationMillis = 0;
+ getDataStatusProvider = getDataClient::printHtml;
+ currentActiveCommitBytesProvider =
workCommitter::currentActiveCommitBytes;
+
+ this.streamingWorkerHarness =
+ SingleSourceWorkerHarness.builder()
+ .setStreamingWorkScheduler(streamingWorkScheduler)
+ .setWorkCommitter(workCommitter)
+ .setGetDataClient(getDataClient)
+ .setComputationStateFetcher(this.computationStateCache::get)
+ .setWaitForResources(() ->
memoryMonitor.waitForResources("GetWork"))
+ .setHeartbeatSender(heartbeatSender)
+ .setThrottledTimeTracker(windmillServer::getAndResetThrottleTime)
+ .setGetWorkSender(getWorkSender)
+ .build();
}
+ this.workerStatusReporter =
+
streamingWorkerStatusReporterFactory.createStatusReporter(streamingWorkerHarness);
this.activeWorkRefresher =
new ActiveWorkRefresher(
clock,
options.getActiveWorkRefreshPeriodMillis(),
- stuckCommitDurationMillis,
+ options.isEnableStreamingEngine()
+ ? Math.max(options.getStuckCommitDurationMillis(), 0)
+ : 0,
computationStateCache::getAllPresentComputations,
sampler,
activeWorkRefreshExecutorFn,
getDataMetricTracker::trackHeartbeats);
this.statusPages =
- statusPagesBuilder
+ createStatusPageBuilder(options, windmillStreamFactory, memoryMonitor)
.setClock(clock)
.setClientId(clientId)
.setIsRunning(running)
- .setStatusPages(workerStatusPages)
.setStateCache(stateCache)
.setComputationStateCache(this.computationStateCache)
-
.setCurrentActiveCommitBytes(workCommitter::currentActiveCommitBytes)
- .setGetDataStatusProvider(getDataClient::printHtml)
.setWorkUnitExecutor(workUnitExecutor)
.setGlobalConfigHandle(configFetcher.getGlobalConfigHandle())
+ .setChannelzServlet(channelzServlet)
+ .setGetDataStatusProvider(getDataStatusProvider)
+ .setCurrentActiveCommitBytes(currentActiveCommitBytesProvider)
.build();
- Windmill.GetWorkRequest request =
- Windmill.GetWorkRequest.newBuilder()
- .setClientId(clientId)
- .setMaxItems(chooseMaximumBundlesOutstanding())
- .setMaxBytes(MAX_GET_WORK_FETCH_BYTES)
- .build();
-
- this.streamingWorkerHarness =
- SingleSourceWorkerHarness.builder()
- .setStreamingWorkScheduler(streamingWorkScheduler)
- .setWorkCommitter(workCommitter)
- .setGetDataClient(getDataClient)
- .setComputationStateFetcher(this.computationStateCache::get)
- .setWaitForResources(() ->
memoryMonitor.waitForResources("GetWork"))
- .setHeartbeatSender(heartbeatSender)
- .setGetWorkSender(
- windmillServiceEnabled
- ? GetWorkSender.forStreamingEngine(
- receiver -> windmillServer.getWorkStream(request,
receiver))
- : GetWorkSender.forAppliance(() ->
windmillServer.getWork(request)))
- .build();
-
- LOG.debug("windmillServiceEnabled: {}", windmillServiceEnabled);
+ LOG.debug("isDirectPathEnabled: {}",
options.getIsWindmillServiceDirectPathEnabled());
+ LOG.debug("windmillServiceEnabled: {}", options.isEnableStreamingEngine());
LOG.debug("WindmillServiceEndpoint: {}",
options.getWindmillServiceEndpoint());
LOG.debug("WindmillServicePort: {}", options.getWindmillServicePort());
LOG.debug("LocalWindmillHostport: {}", options.getLocalWindmillHostport());
}
- private static WindmillStreamPool<GetDataStream> separateHeartbeatPool(
- WindmillServerStub windmillServer) {
- return WindmillStreamPool.create(1, GET_DATA_STREAM_TIMEOUT,
windmillServer::getDataStream);
+ private static StreamingWorkerStatusPages.Builder createStatusPageBuilder(
+ DataflowWorkerHarnessOptions options,
+ GrpcWindmillStreamFactory windmillStreamFactory,
+ MemoryMonitor memoryMonitor) {
+ WorkerStatusPages workerStatusPages =
+ WorkerStatusPages.create(DEFAULT_STATUS_PORT, memoryMonitor);
+
+ StreamingWorkerStatusPages.Builder streamingStatusPages =
+ StreamingWorkerStatusPages.builder().setStatusPages(workerStatusPages);
+
+ return options.isEnableStreamingEngine()
+ ? streamingStatusPages
+ .setDebugCapture(
+ new DebugCapture.Manager(options,
workerStatusPages.getDebugCapturePages()))
+ .setWindmillStreamFactory(windmillStreamFactory)
+ : streamingStatusPages;
+ }
+
+ private static ChannelzServlet createChannelzServlet(
+ DataflowWorkerHarnessOptions options,
+ Supplier<ImmutableSet<HostAndPort>> windmillEndpointProvider) {
+ return new ChannelzServlet(CHANNELZ_PATH, options,
windmillEndpointProvider);
+ }
+
+ private static HeartbeatSender createStreamingEngineHeartbeatSender(
+ DataflowWorkerHarnessOptions options,
+ WindmillServerStub windmillClient,
+ WindmillStreamPool<GetDataStream> getDataStreamPool,
+ StreamingGlobalConfigHandle globalConfigHandle) {
+ // The experiment gates this logic until the backend changes are rollback safe.
+ if (!DataflowRunner.hasExperiment(
+ options,
STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_HEARTBEAT_POOL_EXPERIMENT)
+ || options.getUseSeparateWindmillHeartbeatStreams() != null) {
+ return StreamPoolHeartbeatSender.create(
+ Boolean.TRUE.equals(options.getUseSeparateWindmillHeartbeatStreams())
+ ? WindmillStreamPool.create(1, GET_DATA_STREAM_TIMEOUT,
windmillClient::getDataStream)
+ : getDataStreamPool);
+
+ } else {
+ return StreamPoolHeartbeatSender.create(
+ WindmillStreamPool.create(1, GET_DATA_STREAM_TIMEOUT,
windmillClient::getDataStream),
+ getDataStreamPool,
+ globalConfigHandle);
+ }
}
public static StreamingDataflowWorker
fromOptions(DataflowWorkerHarnessOptions options) {
@@ -387,17 +484,21 @@ public final class StreamingDataflowWorker {
failureTracker,
() -> Optional.ofNullable(memoryMonitor.tryToDumpHeap()),
clock);
- StreamingWorkerStatusReporter workerStatusReporter =
- StreamingWorkerStatusReporter.create(
- dataflowServiceClient,
- windmillServer::getAndResetThrottleTime,
- stageInfo::values,
- failureTracker,
- streamingCounters,
- memoryMonitor,
- workExecutor,
- options.getWindmillHarnessUpdateReportingPeriod().getMillis(),
- options.getPerWorkerMetricsUpdateReportingPeriodMillis());
+ StreamingWorkerStatusReporterFactory workerStatusReporterFactory =
+ throttleTimeSupplier ->
+ StreamingWorkerStatusReporter.builder()
+ .setDataflowServiceClient(dataflowServiceClient)
+ .setWindmillQuotaThrottleTime(throttleTimeSupplier)
+ .setAllStageInfo(stageInfo::values)
+ .setFailureTracker(failureTracker)
+ .setStreamingCounters(streamingCounters)
+ .setMemoryMonitor(memoryMonitor)
+ .setWorkExecutor(workExecutor)
+ .setWindmillHarnessUpdateReportingPeriodMillis(
+
options.getWindmillHarnessUpdateReportingPeriod().getMillis())
+ .setPerWorkerMetricsUpdateReportingPeriodMillis(
+ options.getPerWorkerMetricsUpdateReportingPeriodMillis())
+ .build();
return new StreamingDataflowWorker(
windmillServer,
@@ -410,7 +511,7 @@ public final class StreamingDataflowWorker {
options,
new HotKeyLogger(),
clock,
- workerStatusReporter,
+ workerStatusReporterFactory,
failureTracker,
workFailureProcessor,
streamingCounters,
@@ -418,7 +519,8 @@ public final class StreamingDataflowWorker {
configFetcherComputationStateCacheAndWindmillClient.windmillStreamFactory(),
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("RefreshWork").build()),
- stageInfo);
+ stageInfo,
+
configFetcherComputationStateCacheAndWindmillClient.windmillDispatcherClient());
}
/**
@@ -433,53 +535,121 @@ public final class StreamingDataflowWorker {
WorkUnitClient dataflowServiceClient,
GrpcWindmillStreamFactory.Builder windmillStreamFactoryBuilder,
Function<ComputationConfig.Fetcher, ComputationStateCache>
computationStateCacheFactory) {
- ComputationConfig.Fetcher configFetcher;
- WindmillServerStub windmillServer;
- ComputationStateCache computationStateCache;
- GrpcWindmillStreamFactory windmillStreamFactory;
if (options.isEnableStreamingEngine()) {
GrpcDispatcherClient dispatcherClient =
GrpcDispatcherClient.create(options, new
WindmillStubFactoryFactoryImpl(options));
- configFetcher =
+ ComputationConfig.Fetcher configFetcher =
StreamingEngineComputationConfigFetcher.create(
options.getGlobalConfigRefreshPeriod().getMillis(),
dataflowServiceClient);
configFetcher.getGlobalConfigHandle().registerConfigObserver(dispatcherClient::onJobConfig);
- computationStateCache =
computationStateCacheFactory.apply(configFetcher);
- windmillStreamFactory =
+ ComputationStateCache computationStateCache =
+ computationStateCacheFactory.apply(configFetcher);
+ GrpcWindmillStreamFactory windmillStreamFactory =
windmillStreamFactoryBuilder
.setProcessHeartbeatResponses(
new
WorkHeartbeatResponseProcessor(computationStateCache::get))
.setHealthCheckIntervalMillis(
options.getWindmillServiceStreamingRpcHealthCheckPeriodMs())
.build();
- windmillServer = GrpcWindmillServer.create(options,
windmillStreamFactory, dispatcherClient);
- } else {
- if (options.getWindmillServiceEndpoint() != null
- || options.getLocalWindmillHostport().startsWith("grpc:")) {
- windmillStreamFactory =
- windmillStreamFactoryBuilder
- .setHealthCheckIntervalMillis(
-
options.getWindmillServiceStreamingRpcHealthCheckPeriodMs())
- .build();
- windmillServer =
- GrpcWindmillServer.create(
- options,
- windmillStreamFactory,
- GrpcDispatcherClient.create(options, new
WindmillStubFactoryFactoryImpl(options)));
- } else {
- windmillStreamFactory = windmillStreamFactoryBuilder.build();
- windmillServer = new
JniWindmillApplianceServer(options.getLocalWindmillHostport());
+ return ConfigFetcherComputationStateCacheAndWindmillClient.builder()
+ .setWindmillDispatcherClient(dispatcherClient)
+ .setConfigFetcher(configFetcher)
+ .setComputationStateCache(computationStateCache)
+ .setWindmillStreamFactory(windmillStreamFactory)
+ .setWindmillServer(
+ GrpcWindmillServer.create(options, windmillStreamFactory,
dispatcherClient))
+ .build();
+ }
+
+ // Build with local Windmill client.
+ if (options.getWindmillServiceEndpoint() != null
+ || options.getLocalWindmillHostport().startsWith("grpc:")) {
+ GrpcDispatcherClient dispatcherClient =
+ GrpcDispatcherClient.create(options, new
WindmillStubFactoryFactoryImpl(options));
+ GrpcWindmillStreamFactory windmillStreamFactory =
+ windmillStreamFactoryBuilder
+ .setHealthCheckIntervalMillis(
+ options.getWindmillServiceStreamingRpcHealthCheckPeriodMs())
+ .build();
+ GrpcWindmillServer windmillServer =
+ GrpcWindmillServer.create(options, windmillStreamFactory,
dispatcherClient);
+ ComputationConfig.Fetcher configFetcher =
+ createApplianceComputationConfigFetcher(windmillServer);
+ return ConfigFetcherComputationStateCacheAndWindmillClient.builder()
+ .setWindmillDispatcherClient(dispatcherClient)
+ .setWindmillServer(windmillServer)
+ .setWindmillStreamFactory(windmillStreamFactory)
+ .setConfigFetcher(configFetcher)
+
.setComputationStateCache(computationStateCacheFactory.apply(configFetcher))
+ .build();
+ }
+
+ WindmillServerStub windmillServer =
+ new JniWindmillApplianceServer(options.getLocalWindmillHostport());
+ ComputationConfig.Fetcher configFetcher =
+ createApplianceComputationConfigFetcher(windmillServer);
+ return ConfigFetcherComputationStateCacheAndWindmillClient.builder()
+ .setWindmillStreamFactory(windmillStreamFactoryBuilder.build())
+ .setWindmillServer(windmillServer)
+ .setConfigFetcher(configFetcher)
+
.setComputationStateCache(computationStateCacheFactory.apply(configFetcher))
+ .build();
+ }
+
+ private static StreamingApplianceComputationConfigFetcher
createApplianceComputationConfigFetcher(
+ ApplianceWindmillClient windmillClient) {
+ return new StreamingApplianceComputationConfigFetcher(
+ windmillClient::getConfig,
+ new FixedGlobalConfigHandle(StreamingGlobalConfig.builder().build()));
+ }
+
+ private static boolean isDirectPathPipeline(DataflowWorkerHarnessOptions
options) {
+ if (options.isEnableStreamingEngine() &&
options.getIsWindmillServiceDirectPathEnabled()) {
+ boolean isIpV6Enabled =
+ Optional.ofNullable(options.getDataflowServiceOptions())
+ .map(serviceOptions ->
serviceOptions.contains(ENABLE_IPV6_EXPERIMENT))
+ .orElse(false);
+
+ if (isIpV6Enabled) {
+ return true;
}
- configFetcher =
- new StreamingApplianceComputationConfigFetcher(
- windmillServer::getConfig,
- new
FixedGlobalConfigHandle(StreamingGlobalConfig.builder().build()));
- computationStateCache =
computationStateCacheFactory.apply(configFetcher);
+ LOG.warn(
+ "DirectPath is currently only supported with IPv6 networking stack.
This requires setting "
+ + "\"enable_private_ipv6_google_access\" in experimental
pipeline options. "
+ + "For information on how to set experimental pipeline options
see "
+ +
"https://cloud.google.com/dataflow/docs/guides/setting-pipeline-options#experimental.
"
+ + "Defaulting to CloudPath.");
}
- return ConfigFetcherComputationStateCacheAndWindmillClient.create(
- configFetcher, computationStateCache, windmillServer,
windmillStreamFactory);
+ return false;
+ }
+
+ private static void validateWorkerOptions(DataflowWorkerHarnessOptions
options) {
+ Preconditions.checkArgument(
+ options.isStreaming(),
+ "%s instantiated with options indicating batch use",
+ StreamingDataflowWorker.class.getName());
+
+ Preconditions.checkArgument(
+ !DataflowRunner.hasExperiment(options, BEAM_FN_API_EXPERIMENT),
+ "%s cannot be main() class with beam_fn_api enabled",
+ StreamingDataflowWorker.class.getSimpleName());
+ }
+
+ private static ChannelCachingStubFactory createFanOutStubFactory(
+ DataflowWorkerHarnessOptions workerOptions) {
+ return ChannelCachingRemoteStubFactory.create(
+ workerOptions.getGcpCredential(),
+ ChannelCache.create(
+ serviceAddress ->
+ // IsolationChannel will create and manage separate RPC
channels to the same
+ // serviceAddress.
+ IsolationChannel.create(
+ () ->
+ remoteChannel(
+ serviceAddress,
+
workerOptions.getWindmillServiceRpcChannelAliveTimeoutSec()))));
}
@VisibleForTesting
@@ -495,7 +665,9 @@ public final class StreamingDataflowWorker {
Supplier<Instant> clock,
Function<String, ScheduledExecutorService> executorSupplier,
StreamingGlobalConfigHandleImpl globalConfigHandle,
- int localRetryTimeoutMs) {
+ int localRetryTimeoutMs,
+ StreamingCounters streamingCounters,
+ WindmillStubFactoryFactory stubFactory) {
ConcurrentMap<String, StageInfo> stageInfo = new ConcurrentHashMap<>();
BoundedQueueExecutor workExecutor = createWorkUnitExecutor(options);
WindmillStateCache stateCache =
@@ -538,7 +710,6 @@ public final class StreamingDataflowWorker {
stateNameMap,
stateCache.forComputation(mapTask.getStageName())));
MemoryMonitor memoryMonitor = MemoryMonitor.fromOptions(options);
- StreamingCounters streamingCounters = StreamingCounters.create();
FailureTracker failureTracker =
options.isEnableStreamingEngine()
? StreamingEngineFailureTracker.create(
@@ -554,19 +725,23 @@ public final class StreamingDataflowWorker {
() -> Optional.ofNullable(memoryMonitor.tryToDumpHeap()),
clock,
localRetryTimeoutMs);
- StreamingWorkerStatusReporter workerStatusReporter =
- StreamingWorkerStatusReporter.forTesting(
- publishCounters,
- workUnitClient,
- windmillServer::getAndResetThrottleTime,
- stageInfo::values,
- failureTracker,
- streamingCounters,
- memoryMonitor,
- workExecutor,
- executorSupplier,
- options.getWindmillHarnessUpdateReportingPeriod().getMillis(),
- options.getPerWorkerMetricsUpdateReportingPeriodMillis());
+ StreamingWorkerStatusReporterFactory workerStatusReporterFactory =
+ throttleTimeSupplier ->
+ StreamingWorkerStatusReporter.builder()
+ .setPublishCounters(publishCounters)
+ .setDataflowServiceClient(workUnitClient)
+ .setWindmillQuotaThrottleTime(throttleTimeSupplier)
+ .setAllStageInfo(stageInfo::values)
+ .setFailureTracker(failureTracker)
+ .setStreamingCounters(streamingCounters)
+ .setMemoryMonitor(memoryMonitor)
+ .setWorkExecutor(workExecutor)
+ .setExecutorFactory(executorSupplier)
+ .setWindmillHarnessUpdateReportingPeriodMillis(
+ options.getWindmillHarnessUpdateReportingPeriod().getMillis())
+ .setPerWorkerMetricsUpdateReportingPeriodMillis(
+ options.getPerWorkerMetricsUpdateReportingPeriodMillis())
+ .build();
GrpcWindmillStreamFactory.Builder windmillStreamFactory =
createGrpcwindmillStreamFactoryBuilder(options, 1)
@@ -584,7 +759,7 @@ public final class StreamingDataflowWorker {
options,
hotKeyLogger,
clock,
- workerStatusReporter,
+ workerStatusReporterFactory,
failureTracker,
workFailureProcessor,
streamingCounters,
@@ -596,7 +771,8 @@ public final class StreamingDataflowWorker {
.build()
: windmillStreamFactory.build(),
executorSupplier.apply("RefreshWork"),
- stageInfo);
+ stageInfo,
+ GrpcDispatcherClient.create(options, stubFactory));
}
private static GrpcWindmillStreamFactory.Builder
createGrpcwindmillStreamFactoryBuilder(
@@ -605,13 +781,7 @@ public final class StreamingDataflowWorker {
!options.isEnableStreamingEngine() &&
options.getLocalWindmillHostport() != null
? GrpcWindmillServer.LOCALHOST_MAX_BACKOFF
:
Duration.millis(options.getWindmillServiceStreamMaxBackoffMillis());
- return GrpcWindmillStreamFactory.of(
- JobHeader.newBuilder()
- .setJobId(options.getJobId())
- .setProjectId(options.getProject())
- .setWorkerId(options.getWorkerId())
- .setClientId(clientId)
- .build())
+ return GrpcWindmillStreamFactory.of(createJobHeader(options, clientId))
.setWindmillMessagesBetweenIsReadyChecks(options.getWindmillMessagesBetweenIsReadyChecks())
.setMaxBackOffSupplier(() -> maxBackoff)
.setLogEveryNStreamFailures(options.getWindmillServiceStreamingLogEveryNStreamFailures())
@@ -622,6 +792,15 @@ public final class StreamingDataflowWorker {
options,
"streaming_engine_disable_new_heartbeat_requests"));
}
+ private static JobHeader createJobHeader(DataflowWorkerHarnessOptions options, long clientId) {
+ return JobHeader.newBuilder()
+ .setJobId(options.getJobId())
+ .setProjectId(options.getProject())
+ .setWorkerId(options.getWorkerId())
+ .setClientId(clientId)
+ .build();
+ }
+
private static BoundedQueueExecutor
createWorkUnitExecutor(DataflowWorkerHarnessOptions options) {
return new BoundedQueueExecutor(
chooseMaxThreads(options),
@@ -640,15 +819,7 @@ public final class StreamingDataflowWorker {
DataflowWorkerHarnessHelper.initializeGlobalStateAndPipelineOptions(
StreamingDataflowWorker.class, DataflowWorkerHarnessOptions.class);
DataflowWorkerHarnessHelper.configureLogging(options);
- checkArgument(
- options.isStreaming(),
- "%s instantiated with options indicating batch use",
- StreamingDataflowWorker.class.getName());
-
- checkArgument(
- !DataflowRunner.hasExperiment(options, "beam_fn_api"),
- "%s cannot be main() class with beam_fn_api enabled",
- StreamingDataflowWorker.class.getSimpleName());
+ validateWorkerOptions(options);
CoderTranslation.verifyModelCodersRegistered();
@@ -705,21 +876,6 @@ public final class StreamingDataflowWorker {
workerStatusReporter.reportPeriodicWorkerUpdates();
}
- private int chooseMaximumNumberOfThreads() {
- if (options.getNumberOfWorkerHarnessThreads() != 0) {
- return options.getNumberOfWorkerHarnessThreads();
- }
- return MAX_PROCESSING_THREADS;
- }
-
- private int chooseMaximumBundlesOutstanding() {
- int maxBundles = options.getMaxBundlesFromWindmillOutstanding();
- if (maxBundles > 0) {
- return maxBundles;
- }
- return chooseMaximumNumberOfThreads() + 100;
- }
-
@VisibleForTesting
public boolean workExecutorIsEmpty() {
return workUnitExecutor.executorQueueIsEmpty();
@@ -727,7 +883,7 @@ public final class StreamingDataflowWorker {
@VisibleForTesting
int numCommitThreads() {
- return workCommitter.parallelism();
+ return numCommitThreads;
}
@VisibleForTesting
@@ -740,7 +896,6 @@ public final class StreamingDataflowWorker {
return computationStateCache;
}
- @SuppressWarnings("FutureReturnValueIgnored")
public void start() {
running.set(true);
configFetcher.start();
@@ -791,27 +946,17 @@ public final class StreamingDataflowWorker {
completeCommit.shardedKey(), completeCommit.workId()));
}
- @VisibleForTesting
- public Iterable<CounterUpdate> buildCounters() {
- return Iterables.concat(
- streamingCounters
- .pendingDeltaCounters()
- .extractModifiedDeltaUpdates(DataflowCounterUpdateExtractor.INSTANCE),
- streamingCounters
- .pendingCumulativeCounters()
- .extractUpdates(false, DataflowCounterUpdateExtractor.INSTANCE));
+ @FunctionalInterface
+ private interface StreamingWorkerStatusReporterFactory {
+ StreamingWorkerStatusReporter createStatusReporter(ThrottledTimeTracker throttledTimeTracker);
}
@AutoValue
abstract static class ConfigFetcherComputationStateCacheAndWindmillClient {
- private static ConfigFetcherComputationStateCacheAndWindmillClient create(
- ComputationConfig.Fetcher configFetcher,
- ComputationStateCache computationStateCache,
- WindmillServerStub windmillServer,
- GrpcWindmillStreamFactory windmillStreamFactory) {
- return new AutoValue_StreamingDataflowWorker_ConfigFetcherComputationStateCacheAndWindmillClient(
- configFetcher, computationStateCache, windmillServer, windmillStreamFactory);
+ private static Builder builder() {
+ return new AutoValue_StreamingDataflowWorker_ConfigFetcherComputationStateCacheAndWindmillClient
+ .Builder();
}
abstract ComputationConfig.Fetcher configFetcher();
@@ -821,6 +966,23 @@ public final class StreamingDataflowWorker {
abstract WindmillServerStub windmillServer();
abstract GrpcWindmillStreamFactory windmillStreamFactory();
+
+ abstract @Nullable GrpcDispatcherClient windmillDispatcherClient();
+
+ @AutoValue.Builder
+ abstract static class Builder {
+ abstract Builder setConfigFetcher(ComputationConfig.Fetcher value);
+
+ abstract Builder setComputationStateCache(ComputationStateCache value);
+
+ abstract Builder setWindmillServer(WindmillServerStub value);
+
+ abstract Builder setWindmillStreamFactory(GrpcWindmillStreamFactory value);
+
+ abstract Builder setWindmillDispatcherClient(GrpcDispatcherClient value);
+
+ abstract ConfigFetcherComputationStateCacheAndWindmillClient build();
+ }
}
/**
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java
index 1ef1691b081..4c73e8c7f61 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java
@@ -371,6 +371,7 @@ public final class FanOutStreamingEngineWorkerHarness
implements StreamingWorker
}
/** Add up all the throttle times of all streams including
GetWorkerMetadataStream. */
+ @Override
public long getAndResetThrottleTime() {
return backends.get().windmillStreams().values().stream()
.map(WindmillStreamSender::getAndResetThrottleTime)
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java
index 9716b834cac..65203288e16 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java
@@ -37,6 +37,7 @@ import
org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub.Windm
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream;
import
org.apache.beam.runners.dataflow.worker.windmill.client.commits.WorkCommitter;
import
org.apache.beam.runners.dataflow.worker.windmill.client.getdata.GetDataClient;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.throttling.ThrottledTimeTracker;
import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemReceiver;
import
org.apache.beam.runners.dataflow.worker.windmill.work.processing.StreamingWorkScheduler;
import
org.apache.beam.runners.dataflow.worker.windmill.work.refresh.HeartbeatSender;
@@ -66,6 +67,7 @@ public final class SingleSourceWorkerHarness implements
StreamingWorkerHarness {
private final Function<String, Optional<ComputationState>>
computationStateFetcher;
private final ExecutorService workProviderExecutor;
private final GetWorkSender getWorkSender;
+ private final ThrottledTimeTracker throttledTimeTracker;
SingleSourceWorkerHarness(
WorkCommitter workCommitter,
@@ -74,7 +76,8 @@ public final class SingleSourceWorkerHarness implements
StreamingWorkerHarness {
StreamingWorkScheduler streamingWorkScheduler,
Runnable waitForResources,
Function<String, Optional<ComputationState>> computationStateFetcher,
- GetWorkSender getWorkSender) {
+ GetWorkSender getWorkSender,
+ ThrottledTimeTracker throttledTimeTracker) {
this.workCommitter = workCommitter;
this.getDataClient = getDataClient;
this.heartbeatSender = heartbeatSender;
@@ -90,6 +93,7 @@ public final class SingleSourceWorkerHarness implements
StreamingWorkerHarness {
.build());
this.isRunning = new AtomicBoolean(false);
this.getWorkSender = getWorkSender;
+ this.throttledTimeTracker = throttledTimeTracker;
}
public static SingleSourceWorkerHarness.Builder builder() {
@@ -144,6 +148,11 @@ public final class SingleSourceWorkerHarness implements
StreamingWorkerHarness {
workCommitter.stop();
}
+ @Override
+ public long getAndResetThrottleTime() {
+ return throttledTimeTracker.getAndResetThrottleTime();
+ }
+
private void streamingEngineDispatchLoop(
Function<WorkItemReceiver, WindmillStream.GetWorkStream>
getWorkStreamFactory) {
while (isRunning.get()) {
@@ -254,6 +263,8 @@ public final class SingleSourceWorkerHarness implements
StreamingWorkerHarness {
Builder setGetWorkSender(GetWorkSender getWorkSender);
+ Builder setThrottledTimeTracker(ThrottledTimeTracker throttledTimeTracker);
+
SingleSourceWorkerHarness build();
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerHarness.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerHarness.java
index c1b4570e226..731a5a4b1b5 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerHarness.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerHarness.java
@@ -17,11 +17,12 @@
*/
package org.apache.beam.runners.dataflow.worker.streaming.harness;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.throttling.ThrottledTimeTracker;
import org.apache.beam.sdk.annotations.Internal;
/** Provides an interface to start streaming worker processing. */
@Internal
-public interface StreamingWorkerHarness {
+public interface StreamingWorkerHarness extends ThrottledTimeTracker {
void start();
void shutdown();
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusPages.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusPages.java
index 6981312eff1..ddfc6809231 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusPages.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusPages.java
@@ -258,7 +258,7 @@ public final class StreamingWorkerStatusPages {
Builder setDebugCapture(DebugCapture.Manager debugCapture);
- Builder setChannelzServlet(ChannelzServlet channelzServlet);
+ Builder setChannelzServlet(@Nullable ChannelzServlet channelzServlet);
Builder setStateCache(WindmillStateCache stateCache);
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporter.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporter.java
index ba77d8e1ce2..3557f0d193c 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporter.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporter.java
@@ -27,6 +27,7 @@ import
com.google.api.services.dataflow.model.StreamingScalingReportResponse;
import com.google.api.services.dataflow.model.WorkItemStatus;
import com.google.api.services.dataflow.model.WorkerMessage;
import com.google.api.services.dataflow.model.WorkerMessageResponse;
+import com.google.auto.value.AutoBuilder;
import java.io.IOException;
import java.math.RoundingMode;
import java.util.ArrayList;
@@ -51,6 +52,7 @@ import
org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingMDC;
import org.apache.beam.runners.dataflow.worker.streaming.StageInfo;
import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.throttling.ThrottledTimeTracker;
import
org.apache.beam.runners.dataflow.worker.windmill.work.processing.failures.FailureTracker;
import org.apache.beam.sdk.annotations.Internal;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
@@ -78,7 +80,7 @@ public final class StreamingWorkerStatusReporter {
private final int initialMaxThreadCount;
private final int initialMaxBundlesOutstanding;
private final WorkUnitClient dataflowServiceClient;
- private final Supplier<Long> windmillQuotaThrottleTime;
+ private final ThrottledTimeTracker windmillQuotaThrottleTime;
private final Supplier<Collection<StageInfo>> allStageInfo;
private final FailureTracker failureTracker;
private final StreamingCounters streamingCounters;
@@ -97,10 +99,10 @@ public final class StreamingWorkerStatusReporter {
// Used to track the number of WorkerMessages that have been sent without
PerWorkerMetrics.
private final AtomicLong workerMessagesIndex;
- private StreamingWorkerStatusReporter(
+ StreamingWorkerStatusReporter(
boolean publishCounters,
WorkUnitClient dataflowServiceClient,
- Supplier<Long> windmillQuotaThrottleTime,
+ ThrottledTimeTracker windmillQuotaThrottleTime,
Supplier<Collection<StageInfo>> allStageInfo,
FailureTracker failureTracker,
StreamingCounters streamingCounters,
@@ -131,57 +133,13 @@ public final class StreamingWorkerStatusReporter {
this.workerMessagesIndex = new AtomicLong();
}
- public static StreamingWorkerStatusReporter create(
- WorkUnitClient workUnitClient,
- Supplier<Long> windmillQuotaThrottleTime,
- Supplier<Collection<StageInfo>> allStageInfo,
- FailureTracker failureTracker,
- StreamingCounters streamingCounters,
- MemoryMonitor memoryMonitor,
- BoundedQueueExecutor workExecutor,
- long windmillHarnessUpdateReportingPeriodMillis,
- long perWorkerMetricsUpdateReportingPeriodMillis) {
- return new StreamingWorkerStatusReporter(
- /* publishCounters= */ true,
- workUnitClient,
- windmillQuotaThrottleTime,
- allStageInfo,
- failureTracker,
- streamingCounters,
- memoryMonitor,
- workExecutor,
- threadName ->
- Executors.newSingleThreadScheduledExecutor(
- new ThreadFactoryBuilder().setNameFormat(threadName).build()),
- windmillHarnessUpdateReportingPeriodMillis,
- perWorkerMetricsUpdateReportingPeriodMillis);
- }
-
- @VisibleForTesting
- public static StreamingWorkerStatusReporter forTesting(
- boolean publishCounters,
- WorkUnitClient workUnitClient,
- Supplier<Long> windmillQuotaThrottleTime,
- Supplier<Collection<StageInfo>> allStageInfo,
- FailureTracker failureTracker,
- StreamingCounters streamingCounters,
- MemoryMonitor memoryMonitor,
- BoundedQueueExecutor workExecutor,
- Function<String, ScheduledExecutorService> executorFactory,
- long windmillHarnessUpdateReportingPeriodMillis,
- long perWorkerMetricsUpdateReportingPeriodMillis) {
- return new StreamingWorkerStatusReporter(
- publishCounters,
- workUnitClient,
- windmillQuotaThrottleTime,
- allStageInfo,
- failureTracker,
- streamingCounters,
- memoryMonitor,
- workExecutor,
- executorFactory,
- windmillHarnessUpdateReportingPeriodMillis,
- perWorkerMetricsUpdateReportingPeriodMillis);
+ public static Builder builder() {
+ return new AutoBuilder_StreamingWorkerStatusReporter_Builder()
+ .setPublishCounters(true)
+ .setExecutorFactory(
+ threadName ->
+ Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactoryBuilder().setNameFormat(threadName).build()));
}
/**
@@ -228,6 +186,22 @@ public final class StreamingWorkerStatusReporter {
}
}
+ // Calculates the PerWorkerMetrics reporting frequency, ensuring alignment with the
+ // WorkerMessages RPC schedule. The desired reporting period
+ // (perWorkerMetricsUpdateReportingPeriodMillis) is adjusted to the nearest multiple
+ // of the RPC interval (windmillHarnessUpdateReportingPeriodMillis).
+ private static long getPerWorkerMetricsUpdateFrequency(
+ long windmillHarnessUpdateReportingPeriodMillis,
+ long perWorkerMetricsUpdateReportingPeriodMillis) {
+ if (windmillHarnessUpdateReportingPeriodMillis == 0) {
+ return 0;
+ }
+ return LongMath.divide(
+ perWorkerMetricsUpdateReportingPeriodMillis,
+ windmillHarnessUpdateReportingPeriodMillis,
+ RoundingMode.CEILING);
+ }
+
@SuppressWarnings("FutureReturnValueIgnored")
public void start() {
reportHarnessStartup();
@@ -276,27 +250,13 @@ public final class StreamingWorkerStatusReporter {
}
}
- // Calculates the PerWorkerMetrics reporting frequency, ensuring alignment with the
- // WorkerMessages RPC schedule. The desired reporting period
- // (perWorkerMetricsUpdateReportingPeriodMillis) is adjusted to the nearest multiple
- // of the RPC interval (windmillHarnessUpdateReportingPeriodMillis).
- private static long getPerWorkerMetricsUpdateFrequency(
- long windmillHarnessUpdateReportingPeriodMillis,
- long perWorkerMetricsUpdateReportingPeriodMillis) {
- if (windmillHarnessUpdateReportingPeriodMillis == 0) {
- return 0;
- }
- return LongMath.divide(
- perWorkerMetricsUpdateReportingPeriodMillis,
- windmillHarnessUpdateReportingPeriodMillis,
- RoundingMode.CEILING);
- }
-
/** Sends counter updates to Dataflow backend. */
private void sendWorkerUpdatesToDataflowService(
CounterSet deltaCounters, CounterSet cumulativeCounters) throws IOException {
// Throttle time is tracked by the windmillServer but is reported to DFE here.
- streamingCounters.windmillQuotaThrottling().addValue(windmillQuotaThrottleTime.get());
+ streamingCounters
+ .windmillQuotaThrottling()
+ .addValue(windmillQuotaThrottleTime.getAndResetThrottleTime());
if (memoryMonitor.isThrashing()) {
streamingCounters.memoryThrashing().addValue(1);
}
@@ -496,4 +456,33 @@ public final class StreamingWorkerStatusReporter {
.maxOutstandingBundles()
.addValue((long) workExecutor.maximumElementsOutstanding());
}
+
+ @AutoBuilder
+ public interface Builder {
+ Builder setPublishCounters(boolean publishCounters);
+
+ Builder setDataflowServiceClient(WorkUnitClient dataflowServiceClient);
+
+ Builder setWindmillQuotaThrottleTime(ThrottledTimeTracker windmillQuotaThrottledTimeTracker);
+
+ Builder setAllStageInfo(Supplier<Collection<StageInfo>> allStageInfo);
+
+ Builder setFailureTracker(FailureTracker failureTracker);
+
+ Builder setStreamingCounters(StreamingCounters streamingCounters);
+
+ Builder setMemoryMonitor(MemoryMonitor memoryMonitor);
+
+ Builder setWorkExecutor(BoundedQueueExecutor workExecutor);
+
+ Builder setExecutorFactory(Function<String, ScheduledExecutorService> executorFactory);
+
+ Builder setWindmillHarnessUpdateReportingPeriodMillis(
+ long windmillHarnessUpdateReportingPeriodMillis);
+
+ Builder setPerWorkerMetricsUpdateReportingPeriodMillis(
+ long perWorkerMetricsUpdateReportingPeriodMillis);
+
+ StreamingWorkerStatusReporter build();
+ }
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/throttling/ThrottleTimer.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/throttling/ThrottleTimer.java
index f660112721b..fdcb0339d23 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/throttling/ThrottleTimer.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/throttling/ThrottleTimer.java
@@ -25,7 +25,7 @@ import org.joda.time.Instant;
* CommitWork are both blocked for x, totalTime will be 2x. However, if 2
GetWork streams are both
* blocked for x totalTime will be x. All methods are thread safe.
*/
-public final class ThrottleTimer {
+public final class ThrottleTimer implements ThrottledTimeTracker {
// This is -1 if not currently being throttled or the time in
// milliseconds when throttling for this type started.
private long startTime = -1;
@@ -56,6 +56,7 @@ public final class ThrottleTimer {
}
/** Returns the combined total of all throttle times and resets those times
to 0. */
+ @Override
public synchronized long getAndResetThrottleTime() {
if (throttled()) {
stop();
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerHarness.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/throttling/ThrottledTimeTracker.java
similarity index 69%
copy from
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerHarness.java
copy to
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/throttling/ThrottledTimeTracker.java
index c1b4570e226..9bb8fb0a7b5 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerHarness.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/throttling/ThrottledTimeTracker.java
@@ -15,14 +15,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.runners.dataflow.worker.streaming.harness;
+package org.apache.beam.runners.dataflow.worker.windmill.client.throttling;
import org.apache.beam.sdk.annotations.Internal;
-/** Provides an interface to start streaming worker processing. */
+/**
+ * Tracks time spent in a throttled state due to {@code Status.RESOURCE_EXHAUSTED} errors returned
+ * from gRPC calls.
+ */
@Internal
-public interface StreamingWorkerHarness {
- void start();
+@FunctionalInterface
+public interface ThrottledTimeTracker {
- void shutdown();
+ /** Returns the combined total of all throttle times and resets those times to 0. */
+ long getAndResetThrottleTime();
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSender.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSender.java
index fa36b11ffe5..f54091dc2b9 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSender.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSender.java
@@ -42,7 +42,7 @@ public final class StreamPoolHeartbeatSender implements
HeartbeatSender {
this.heartbeatStreamPool.set(heartbeatStreamPool);
}
- public static StreamPoolHeartbeatSender Create(
+ public static StreamPoolHeartbeatSender create(
@Nonnull WindmillStreamPool<WindmillStream.GetDataStream>
heartbeatStreamPool) {
return new StreamPoolHeartbeatSender(heartbeatStreamPool);
}
@@ -55,7 +55,7 @@ public final class StreamPoolHeartbeatSender implements
HeartbeatSender {
* enabled.
* @param getDataPool stream to use when using separate streams for
heartbeat is disabled.
*/
- public static StreamPoolHeartbeatSender Create(
+ public static StreamPoolHeartbeatSender create(
@Nonnull WindmillStreamPool<WindmillStream.GetDataStream>
dedicatedHeartbeatPool,
@Nonnull WindmillStreamPool<WindmillStream.GetDataStream> getDataPool,
@Nonnull StreamingGlobalConfigHandle configHandle) {
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
index dadf0217123..6eeb7bd6bbf 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
@@ -96,6 +96,7 @@ import org.apache.beam.runners.dataflow.util.CloudObject;
import org.apache.beam.runners.dataflow.util.CloudObjects;
import org.apache.beam.runners.dataflow.util.PropertyNames;
import org.apache.beam.runners.dataflow.util.Structs;
+import
org.apache.beam.runners.dataflow.worker.counters.DataflowCounterUpdateExtractor;
import org.apache.beam.runners.dataflow.worker.streaming.ComputationState;
import org.apache.beam.runners.dataflow.worker.streaming.ComputationStateCache;
import org.apache.beam.runners.dataflow.worker.streaming.ExecutableWork;
@@ -104,6 +105,7 @@ import
org.apache.beam.runners.dataflow.worker.streaming.Watermarks;
import org.apache.beam.runners.dataflow.worker.streaming.Work;
import
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig;
import
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandleImpl;
+import
org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingCounters;
import
org.apache.beam.runners.dataflow.worker.testing.RestoreDataflowLoggingMDC;
import org.apache.beam.runners.dataflow.worker.testing.TestCountingSource;
import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
@@ -129,6 +131,9 @@ import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer.Type;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WatermarkHold;
import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItemCommitRequest;
import
org.apache.beam.runners.dataflow.worker.windmill.client.getdata.FakeGetDataClient;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannelFactory;
+import
org.apache.beam.runners.dataflow.worker.windmill.testing.FakeWindmillStubFactory;
+import
org.apache.beam.runners.dataflow.worker.windmill.testing.FakeWindmillStubFactoryFactory;
import
org.apache.beam.runners.dataflow.worker.windmill.work.refresh.HeartbeatSender;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.Coder.Context;
@@ -178,6 +183,7 @@ import
org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.TextFormat;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheStats;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.UnsignedLong;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -285,6 +291,7 @@ public class StreamingDataflowWorkerTest {
private final FakeWindmillServer server =
new FakeWindmillServer(
errorCollector, computationId ->
computationStateCache.get(computationId));
+ private StreamingCounters streamingCounters;
public StreamingDataflowWorkerTest(Boolean streamingEngine) {
this.streamingEngine = streamingEngine;
@@ -304,9 +311,20 @@ public class StreamingDataflowWorkerTest {
return null;
}
+ private Iterable<CounterUpdate> buildCounters() {
+ return Iterables.concat(
+ streamingCounters
+ .pendingDeltaCounters()
+ .extractModifiedDeltaUpdates(DataflowCounterUpdateExtractor.INSTANCE),
+ streamingCounters
+ .pendingCumulativeCounters()
+ .extractUpdates(false, DataflowCounterUpdateExtractor.INSTANCE));
+ }
+
@Before
public void setUp() {
server.clearCommitsReceived();
+ streamingCounters = StreamingCounters.create();
}
@After
@@ -856,7 +874,13 @@ public class StreamingDataflowWorkerTest {
streamingDataflowWorkerTestParams.clock(),
streamingDataflowWorkerTestParams.executorSupplier(),
mockGlobalConfigHandle,
- streamingDataflowWorkerTestParams.localRetryTimeoutMs());
+ streamingDataflowWorkerTestParams.localRetryTimeoutMs(),
+ streamingCounters,
+ new FakeWindmillStubFactoryFactory(
+ new FakeWindmillStubFactory(
+ () ->
+ WindmillChannelFactory.inProcessChannel(
+ "StreamingDataflowWorkerTestChannel"))));
this.computationStateCache = worker.getComputationStateCache();
return worker;
}
@@ -1715,7 +1739,7 @@ public class StreamingDataflowWorkerTest {
intervalWindowBytes(WINDOW_AT_ZERO)));
Map<Long, Windmill.WorkItemCommitRequest> result =
server.waitForAndGetCommits(1);
- Iterable<CounterUpdate> counters = worker.buildCounters();
+ Iterable<CounterUpdate> counters = buildCounters();
// These tags and data are opaque strings and this is a change detector
test.
// The "/u" indicates the user's namespace, versus "/s" for system
namespace
@@ -1836,7 +1860,7 @@ public class StreamingDataflowWorkerTest {
expectedBytesRead += dataBuilder.build().getSerializedSize();
result = server.waitForAndGetCommits(1);
- counters = worker.buildCounters();
+ counters = buildCounters();
actualOutput = result.get(2L);
assertEquals(1, actualOutput.getOutputMessagesCount());
@@ -2004,7 +2028,7 @@ public class StreamingDataflowWorkerTest {
intervalWindowBytes(WINDOW_AT_ZERO)));
Map<Long, Windmill.WorkItemCommitRequest> result =
server.waitForAndGetCommits(1);
- Iterable<CounterUpdate> counters = worker.buildCounters();
+ Iterable<CounterUpdate> counters = buildCounters();
// These tags and data are opaque strings and this is a change detector
test.
// The "/u" indicates the user's namespace, versus "/s" for system
namespace
@@ -2125,7 +2149,7 @@ public class StreamingDataflowWorkerTest {
expectedBytesRead += dataBuilder.build().getSerializedSize();
result = server.waitForAndGetCommits(1);
- counters = worker.buildCounters();
+ counters = buildCounters();
actualOutput = result.get(2L);
assertEquals(1, actualOutput.getOutputMessagesCount());
@@ -2430,7 +2454,7 @@ public class StreamingDataflowWorkerTest {
null));
Map<Long, Windmill.WorkItemCommitRequest> result =
server.waitForAndGetCommits(1);
- Iterable<CounterUpdate> counters = worker.buildCounters();
+ Iterable<CounterUpdate> counters = buildCounters();
Windmill.WorkItemCommitRequest commit = result.get(1L);
UnsignedLong finalizeId =
UnsignedLong.fromLongBits(commit.getSourceStateUpdates().getFinalizeIds(0));
@@ -2492,7 +2516,7 @@ public class StreamingDataflowWorkerTest {
null));
result = server.waitForAndGetCommits(1);
- counters = worker.buildCounters();
+ counters = buildCounters();
commit = result.get(2L);
finalizeId =
UnsignedLong.fromLongBits(commit.getSourceStateUpdates().getFinalizeIds(0));
@@ -2540,7 +2564,7 @@ public class StreamingDataflowWorkerTest {
null));
result = server.waitForAndGetCommits(1);
- counters = worker.buildCounters();
+ counters = buildCounters();
commit = result.get(3L);
finalizeId =
UnsignedLong.fromLongBits(commit.getSourceStateUpdates().getFinalizeIds(0));
@@ -2710,7 +2734,7 @@ public class StreamingDataflowWorkerTest {
server.whenGetWorkCalled().thenReturn(work);
Map<Long, Windmill.WorkItemCommitRequest> result =
server.waitForAndGetCommits(1);
- Iterable<CounterUpdate> counters = worker.buildCounters();
+ Iterable<CounterUpdate> counters = buildCounters();
Windmill.WorkItemCommitRequest commit = result.get(1L);
UnsignedLong finalizeId =
UnsignedLong.fromLongBits(commit.getSourceStateUpdates().getFinalizeIds(0));
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarnessTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarnessTest.java
index 5a2df4baae6..4df3bf7cd82 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarnessTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarnessTest.java
@@ -60,6 +60,8 @@ public class SingleSourceWorkerHarnessTest {
.setWaitForResources(waitForResources)
.setStreamingWorkScheduler(streamingWorkScheduler)
.setComputationStateFetcher(computationStateFetcher)
+ // no-op throttle time supplier.
+ .setThrottledTimeTracker(() -> 0L)
.setGetWorkSender(getWorkSender)
.build();
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporterTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporterTest.java
index 7e65a495638..f348e4cf1bd 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporterTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporterTest.java
@@ -39,14 +39,15 @@ import org.mockito.Mockito;
@RunWith(JUnit4.class)
public class StreamingWorkerStatusReporterTest {
- private final long DEFAULT_WINDMILL_QUOTA_THROTTLE_TIME = 1000;
- private final long DEFAULT_HARNESS_REPORTING_PERIOD = 10000;
- private final long DEFAULT_PER_WORKER_METRICS_PERIOD = 30000;
+ private static final long DEFAULT_WINDMILL_QUOTA_THROTTLE_TIME = 1000;
+ private static final long DEFAULT_HARNESS_REPORTING_PERIOD = 10000;
+ private static final long DEFAULT_PER_WORKER_METRICS_PERIOD = 30000;
private BoundedQueueExecutor mockExecutor;
private WorkUnitClient mockWorkUnitClient;
private FailureTracker mockFailureTracker;
private MemoryMonitor mockMemoryMonitor;
+ private StreamingWorkerStatusReporter reporter;
@Before
public void setUp() {
@@ -54,23 +55,11 @@ public class StreamingWorkerStatusReporterTest {
this.mockWorkUnitClient = mock(WorkUnitClient.class);
this.mockFailureTracker = mock(FailureTracker.class);
this.mockMemoryMonitor = mock(MemoryMonitor.class);
+ this.reporter = buildWorkerStatusReporterForTest();
}
@Test
public void testOverrideMaximumThreadCount() throws Exception {
- StreamingWorkerStatusReporter reporter =
- StreamingWorkerStatusReporter.forTesting(
- true,
- mockWorkUnitClient,
- () -> DEFAULT_WINDMILL_QUOTA_THROTTLE_TIME,
- () -> Collections.emptyList(),
- mockFailureTracker,
- StreamingCounters.create(),
- mockMemoryMonitor,
- mockExecutor,
- (threadName) -> Executors.newSingleThreadScheduledExecutor(),
- DEFAULT_HARNESS_REPORTING_PERIOD,
- DEFAULT_PER_WORKER_METRICS_PERIOD);
StreamingScalingReportResponse streamingScalingReportResponse =
new StreamingScalingReportResponse().setMaximumThreadCount(10);
WorkerMessageResponse workerMessageResponse =
@@ -84,23 +73,25 @@ public class StreamingWorkerStatusReporterTest {
@Test
public void testHandleEmptyWorkerMessageResponse() throws Exception {
- StreamingWorkerStatusReporter reporter =
- StreamingWorkerStatusReporter.forTesting(
- true,
- mockWorkUnitClient,
- () -> DEFAULT_WINDMILL_QUOTA_THROTTLE_TIME,
- () -> Collections.emptyList(),
- mockFailureTracker,
- StreamingCounters.create(),
- mockMemoryMonitor,
- mockExecutor,
- (threadName) -> Executors.newSingleThreadScheduledExecutor(),
- DEFAULT_HARNESS_REPORTING_PERIOD,
- DEFAULT_PER_WORKER_METRICS_PERIOD);
- WorkerMessageResponse workerMessageResponse = new WorkerMessageResponse();
when(mockWorkUnitClient.reportWorkerMessage(any()))
- .thenReturn(Collections.singletonList(workerMessageResponse));
+ .thenReturn(Collections.singletonList(new WorkerMessageResponse()));
reporter.reportPeriodicWorkerMessage();
verify(mockExecutor, Mockito.times(0)).setMaximumPoolSize(anyInt(),
anyInt());
}
+
+ private StreamingWorkerStatusReporter buildWorkerStatusReporterForTest() {
+ return StreamingWorkerStatusReporter.builder()
+ .setPublishCounters(true)
+ .setDataflowServiceClient(mockWorkUnitClient)
+ .setWindmillQuotaThrottleTime(() ->
DEFAULT_WINDMILL_QUOTA_THROTTLE_TIME)
+ .setAllStageInfo(Collections::emptyList)
+ .setFailureTracker(mockFailureTracker)
+ .setStreamingCounters(StreamingCounters.create())
+ .setMemoryMonitor(mockMemoryMonitor)
+ .setWorkExecutor(mockExecutor)
+ .setExecutorFactory((threadName) ->
Executors.newSingleThreadScheduledExecutor())
+
.setWindmillHarnessUpdateReportingPeriodMillis(DEFAULT_HARNESS_REPORTING_PERIOD)
+
.setPerWorkerMetricsUpdateReportingPeriodMillis(DEFAULT_PER_WORKER_METRICS_PERIOD)
+ .build();
+ }
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSenderTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSenderTest.java
index ed915088d0a..acbb3aebbcf 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSenderTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSenderTest.java
@@ -39,7 +39,7 @@ public class StreamPoolHeartbeatSenderTest {
public void sendsHeartbeatsOnStream() {
FakeWindmillServer server = new FakeWindmillServer(new ErrorCollector(), c
-> Optional.empty());
StreamPoolHeartbeatSender heartbeatSender =
- StreamPoolHeartbeatSender.Create(
+ StreamPoolHeartbeatSender.create(
WindmillStreamPool.create(1, Duration.standardSeconds(10),
server::getDataStream));
Heartbeats.Builder heartbeatsBuilder = Heartbeats.builder();
heartbeatsBuilder
@@ -59,7 +59,7 @@ public class StreamPoolHeartbeatSenderTest {
FakeGlobalConfigHandle configHandle =
new
FakeGlobalConfigHandle(getGlobalConfig(/*useSeparateHeartbeatStreams=*/ true));
StreamPoolHeartbeatSender heartbeatSender =
- StreamPoolHeartbeatSender.Create(
+ StreamPoolHeartbeatSender.create(
WindmillStreamPool.create(
1, Duration.standardSeconds(10),
dedicatedServer::getDataStream),
WindmillStreamPool.create(
@@ -104,7 +104,7 @@ public class StreamPoolHeartbeatSenderTest {
FakeGlobalConfigHandle configHandle =
new
FakeGlobalConfigHandle(getGlobalConfig(/*useSeparateHeartbeatStreams=*/ false));
StreamPoolHeartbeatSender heartbeatSender =
- StreamPoolHeartbeatSender.Create(
+ StreamPoolHeartbeatSender.create(
WindmillStreamPool.create(
1, Duration.standardSeconds(10),
dedicatedServer::getDataStream),
WindmillStreamPool.create(