This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 6ec1fb23ece move heartbeat processor to where it is being used (#31298)
6ec1fb23ece is described below
commit 6ec1fb23ece81721806bdc1323ffb23fa7ce55a0
Author: martin trieu <[email protected]>
AuthorDate: Fri Jun 28 02:31:11 2024 -0700
move heartbeat processor to where it is being used (#31298)
---
.../dataflow/worker/StreamingDataflowWorker.java | 172 +++++++++++++--------
.../windmill/client/grpc/GrpcWindmillServer.java | 53 +++----
.../client/grpc/GrpcWindmillStreamFactory.java | 101 +++++++-----
.../client/grpc/StreamingEngineClient.java | 22 +--
.../windmill/client/grpc/WindmillStreamSender.java | 19 +--
.../client/grpc/StreamingEngineClientTest.java | 5 +-
.../client/grpc/WindmillStreamSenderTest.java | 14 +-
.../budget/EvenGetWorkBudgetDistributorTest.java | 3 +-
8 files changed, 210 insertions(+), 179 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 98015e2ea71..fc1be2cd137 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -22,6 +22,7 @@ import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Pr
import com.google.api.services.dataflow.model.CounterUpdate;
import com.google.api.services.dataflow.model.MapTask;
+import com.google.auto.value.AutoValue;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -38,7 +39,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
-import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.Pair;
import org.apache.beam.runners.core.metrics.MetricsLogger;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
@@ -103,7 +103,6 @@ import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.*;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles;
-import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
@@ -121,11 +120,6 @@ public class StreamingDataflowWorker {
MetricName.named(
"org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl",
"throttling-msecs");
- // Maximum number of threads for processing. Currently each thread processes one key at a time.
- static final int MAX_PROCESSING_THREADS = 300;
- static final long THREAD_EXPIRATION_TIME_SEC = 60;
- static final int GET_WORK_STREAM_TIMEOUT_MINUTES = 3;
- static final Duration COMMIT_STREAM_TIMEOUT = Duration.standardMinutes(1);
/**
* Sinks are marked 'full' in {@link StreamingModeExecutionContext} once the
amount of data sinked
@@ -135,13 +129,20 @@ public class StreamingDataflowWorker {
*/
public static final int MAX_SINK_BYTES = 10_000_000;
+ // Maximum number of threads for processing. Currently, each thread processes one key at a time.
+ static final int MAX_PROCESSING_THREADS = 300;
+ static final long THREAD_EXPIRATION_TIME_SEC = 60;
+ static final int GET_WORK_STREAM_TIMEOUT_MINUTES = 3;
+ static final Duration COMMIT_STREAM_TIMEOUT = Duration.standardMinutes(1);
private static final Logger LOG =
LoggerFactory.getLogger(StreamingDataflowWorker.class);
+
/** The idGenerator to generate unique id globally. */
private static final IdGenerator ID_GENERATOR =
IdGenerators.decrementingLongs();
private static final int DEFAULT_STATUS_PORT = 8081;
// Maximum size of the result of a GetWork request.
private static final long MAX_GET_WORK_FETCH_BYTES = 64L << 20; // 64m
+
/** Maximum number of failure stacktraces to report in each update sent to backend. */
private static final int MAX_FAILURES_TO_REPORT_IN_UPDATE = 1000;
@@ -328,39 +329,27 @@ public class StreamingDataflowWorker {
threadName ->
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat(threadName).build());
- GrpcWindmillStreamFactory windmillStreamFactory =
- createWindmillStreamFactory(options, clientId);
- GrpcDispatcherClient dispatcherClient =
GrpcDispatcherClient.create(createStubFactory(options));
-
- // If ComputationConfig.Fetcher is the Streaming Appliance implementation,
WindmillServerStub
- // can be created without a heartbeat response processor, as appliance
does not send heartbeats.
- Pair<ComputationConfig.Fetcher, Optional<WindmillServerStub>>
configFetcherAndWindmillClient =
- createConfigFetcherAndWindmillClient(
- options,
- dataflowServiceClient,
- dispatcherClient,
- maxWorkItemCommitBytes,
- windmillStreamFactory);
+ GrpcWindmillStreamFactory.Builder windmillStreamFactoryBuilder =
+ createGrpcwindmillStreamFactoryBuilder(options, clientId);
+
+ ConfigFetcherComputationStateCacheAndWindmillClient
+ configFetcherComputationStateCacheAndWindmillClient =
+ createConfigFetcherComputationStateCacheAndWindmillClient(
+ options,
+ dataflowServiceClient,
+ maxWorkItemCommitBytes,
+ windmillStreamFactoryBuilder,
+ configFetcher ->
+ ComputationStateCache.create(
+ configFetcher,
+ workExecutor,
+ windmillStateCache::forComputation,
+ ID_GENERATOR));
ComputationStateCache computationStateCache =
- ComputationStateCache.create(
- configFetcherAndWindmillClient.getLeft(),
- workExecutor,
- windmillStateCache::forComputation,
- ID_GENERATOR);
-
- // If WindmillServerStub is not present, it is a Streaming Engine job. We
now have all the
- // components created to initialize the GrpcWindmillServer.
+
configFetcherComputationStateCacheAndWindmillClient.computationStateCache();
WindmillServerStub windmillServer =
- configFetcherAndWindmillClient
- .getRight()
- .orElseGet(
- () ->
- GrpcWindmillServer.create(
- options,
- windmillStreamFactory,
- dispatcherClient,
- new
WorkHeartbeatResponseProcessor(computationStateCache::get)));
+ configFetcherComputationStateCacheAndWindmillClient.windmillServer();
FailureTracker failureTracker =
options.isEnableStreamingEngine()
@@ -393,7 +382,7 @@ public class StreamingDataflowWorker {
return new StreamingDataflowWorker(
windmillServer,
clientId,
- configFetcherAndWindmillClient.getLeft(),
+ configFetcherComputationStateCacheAndWindmillClient.configFetcher(),
computationStateCache,
windmillStateCache,
workExecutor,
@@ -407,20 +396,29 @@ public class StreamingDataflowWorker {
streamingCounters,
memoryMonitor,
maxWorkItemCommitBytes,
- windmillStreamFactory,
+
configFetcherComputationStateCacheAndWindmillClient.windmillStreamFactory(),
executorSupplier,
stageInfo);
}
- private static Pair<ComputationConfig.Fetcher, Optional<WindmillServerStub>>
- createConfigFetcherAndWindmillClient(
+ /**
+ * {@link ComputationConfig.Fetcher}, {@link ComputationStateCache}, and {@link
+ * WindmillServerStub} are constructed in different orders due to cyclic dependencies depending on
+ * the underlying implementation. This method simplifies creating them and returns an object with
+ * all of these dependencies initialized.
+ */
+ private static ConfigFetcherComputationStateCacheAndWindmillClient
+ createConfigFetcherComputationStateCacheAndWindmillClient(
DataflowWorkerHarnessOptions options,
WorkUnitClient dataflowServiceClient,
- GrpcDispatcherClient dispatcherClient,
AtomicInteger maxWorkItemCommitBytes,
- GrpcWindmillStreamFactory windmillStreamFactory) {
+ GrpcWindmillStreamFactory.Builder windmillStreamFactoryBuilder,
+ Function<ComputationConfig.Fetcher, ComputationStateCache>
computationStateCacheFactory) {
ComputationConfig.Fetcher configFetcher;
- @Nullable WindmillServerStub windmillServer = null;
+ WindmillServerStub windmillServer;
+ ComputationStateCache computationStateCache;
+ GrpcDispatcherClient dispatcherClient =
GrpcDispatcherClient.create(createStubFactory(options));
+ GrpcWindmillStreamFactory windmillStreamFactory;
if (options.isEnableStreamingEngine()) {
configFetcher =
StreamingEngineComputationConfigFetcher.create(
@@ -431,13 +429,36 @@ public class StreamingDataflowWorker {
config,
dispatcherClient::consumeWindmillDispatcherEndpoints,
maxWorkItemCommitBytes));
+ computationStateCache =
computationStateCacheFactory.apply(configFetcher);
+ windmillStreamFactory =
+ windmillStreamFactoryBuilder
+ .setProcessHeartbeatResponses(
+ new
WorkHeartbeatResponseProcessor(computationStateCache::get))
+ .setHealthCheckIntervalMillis(
+ options.getWindmillServiceStreamingRpcHealthCheckPeriodMs())
+ .build();
+ windmillServer = GrpcWindmillServer.create(options,
windmillStreamFactory, dispatcherClient);
} else {
- windmillServer =
- createWindmillServerStub(options, windmillStreamFactory,
dispatcherClient, ignored -> {});
+ if (options.getWindmillServiceEndpoint() != null
+ || options.getLocalWindmillHostport().startsWith("grpc:")) {
+ windmillStreamFactory =
+ windmillStreamFactoryBuilder
+ .setHealthCheckIntervalMillis(
+
options.getWindmillServiceStreamingRpcHealthCheckPeriodMs())
+ .build();
+ windmillServer =
+ GrpcWindmillServer.create(options, windmillStreamFactory,
dispatcherClient);
+ } else {
+ windmillStreamFactory = windmillStreamFactoryBuilder.build();
+ windmillServer = new
JniWindmillApplianceServer(options.getLocalWindmillHostport());
+ }
+
configFetcher = new
StreamingApplianceComputationConfigFetcher(windmillServer::getConfig);
+ computationStateCache =
computationStateCacheFactory.apply(configFetcher);
}
- return Pair.of(configFetcher, Optional.ofNullable(windmillServer));
+ return ConfigFetcherComputationStateCacheAndWindmillClient.create(
+ configFetcher, computationStateCache, windmillServer,
windmillStreamFactory);
}
@VisibleForTesting
@@ -516,6 +537,11 @@ public class StreamingDataflowWorker {
options.getWindmillHarnessUpdateReportingPeriod().getMillis(),
options.getPerWorkerMetricsUpdateReportingPeriodMillis());
+ GrpcWindmillStreamFactory.Builder windmillStreamFactory =
+ createGrpcwindmillStreamFactoryBuilder(options, 1)
+ .setProcessHeartbeatResponses(
+ new
WorkHeartbeatResponseProcessor(computationStateCache::get));
+
return new StreamingDataflowWorker(
windmillServer,
1L,
@@ -533,7 +559,12 @@ public class StreamingDataflowWorker {
streamingCounters,
memoryMonitor,
maxWorkItemCommitBytes,
- createWindmillStreamFactory(options, 1),
+ options.isEnableStreamingEngine()
+ ? windmillStreamFactory
+ .setHealthCheckIntervalMillis(
+
options.getWindmillServiceStreamingRpcHealthCheckPeriodMs())
+ .build()
+ : windmillStreamFactory.build(),
executorSupplier,
stageInfo);
}
@@ -552,7 +583,7 @@ public class StreamingDataflowWorker {
}
}
- private static GrpcWindmillStreamFactory createWindmillStreamFactory(
+ private static GrpcWindmillStreamFactory.Builder
createGrpcwindmillStreamFactoryBuilder(
DataflowWorkerHarnessOptions options, long clientId) {
Duration maxBackoff =
!options.isEnableStreamingEngine() &&
options.getLocalWindmillHostport() != null
@@ -569,7 +600,10 @@ public class StreamingDataflowWorker {
.setMaxBackOffSupplier(() -> maxBackoff)
.setLogEveryNStreamFailures(options.getWindmillServiceStreamingLogEveryNStreamFailures())
.setStreamingRpcBatchLimit(options.getWindmillServiceStreamingRpcBatchLimit())
- .build();
+ .setSendKeyedGetDataRequests(
+ !options.isEnableStreamingEngine()
+ || !DataflowRunner.hasExperiment(
+ options, "streaming_engine_send_new_heartbeat_requests"));
}
private static BoundedQueueExecutor
createWorkUnitExecutor(DataflowWorkerHarnessOptions options) {
@@ -619,23 +653,6 @@ public class StreamingDataflowWorker {
worker.start();
}
- private static WindmillServerStub createWindmillServerStub(
- DataflowWorkerHarnessOptions options,
- GrpcWindmillStreamFactory windmillStreamFactory,
- GrpcDispatcherClient dispatcherClient,
- Consumer<List<Windmill.ComputationHeartbeatResponse>>
processHeartbeatResponses) {
- if (options.getWindmillServiceEndpoint() != null
- || options.isEnableStreamingEngine()
- || options.getLocalWindmillHostport().startsWith("grpc:")) {
- windmillStreamFactory.scheduleHealthChecks(
- options.getWindmillServiceStreamingRpcHealthCheckPeriodMs());
- return GrpcWindmillServer.create(
- options, windmillStreamFactory, dispatcherClient,
processHeartbeatResponses);
- } else {
- return new
JniWindmillApplianceServer(options.getLocalWindmillHostport());
- }
- }
-
private static ChannelCachingStubFactory createStubFactory(
DataflowWorkerHarnessOptions workerOptions) {
Function<WindmillServiceAddress, ManagedChannel> channelFactory =
@@ -895,4 +912,25 @@ public class StreamingDataflowWorker {
.pendingCumulativeCounters()
.extractUpdates(false, DataflowCounterUpdateExtractor.INSTANCE));
}
+
+ @AutoValue
+ abstract static class ConfigFetcherComputationStateCacheAndWindmillClient {
+
+ private static ConfigFetcherComputationStateCacheAndWindmillClient create(
+ ComputationConfig.Fetcher configFetcher,
+ ComputationStateCache computationStateCache,
+ WindmillServerStub windmillServer,
+ GrpcWindmillStreamFactory windmillStreamFactory) {
+ return new
AutoValue_StreamingDataflowWorker_ConfigFetcherComputationStateCacheAndWindmillClient(
+ configFetcher, computationStateCache, windmillServer,
windmillStreamFactory);
+ }
+
+ abstract ComputationConfig.Fetcher configFetcher();
+
+ abstract ComputationStateCache computationStateCache();
+
+ abstract WindmillServerStub windmillServer();
+
+ abstract GrpcWindmillStreamFactory windmillStreamFactory();
+ }
}
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java
index 69807c523ed..abf85d98548 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java
@@ -28,7 +28,6 @@ import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import java.util.function.Consumer;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.beam.runners.dataflow.DataflowRunner;
@@ -40,7 +39,6 @@ import
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Al
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.CommitWorkRequest;
import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.CommitWorkResponse;
-import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.ComputationHeartbeatResponse;
import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetConfigRequest;
import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetConfigResponse;
import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetDataRequest;
@@ -96,29 +94,19 @@ public final class GrpcWindmillServer extends
WindmillServerStub {
private final GrpcDispatcherClient dispatcherClient;
private final DataflowWorkerHarnessOptions options;
private final StreamingEngineThrottleTimers throttleTimers;
+ private final GrpcWindmillStreamFactory windmillStreamFactory;
private Duration maxBackoff;
private @Nullable WindmillApplianceGrpc.WindmillApplianceBlockingStub
syncApplianceStub;
- // If true, then active work refreshes will be sent as KeyedGetDataRequests.
Otherwise, use the
- // newer ComputationHeartbeatRequests.
- private final boolean sendKeyedGetDataRequests;
- private final Consumer<List<ComputationHeartbeatResponse>>
processHeartbeatResponses;
- private final GrpcWindmillStreamFactory windmillStreamFactory;
private GrpcWindmillServer(
DataflowWorkerHarnessOptions options,
GrpcWindmillStreamFactory grpcWindmillStreamFactory,
- GrpcDispatcherClient grpcDispatcherClient,
- Consumer<List<Windmill.ComputationHeartbeatResponse>>
processHeartbeatResponses) {
+ GrpcDispatcherClient grpcDispatcherClient) {
this.options = options;
this.throttleTimers = StreamingEngineThrottleTimers.create();
this.maxBackoff =
Duration.millis(options.getWindmillServiceStreamMaxBackoffMillis());
this.dispatcherClient = grpcDispatcherClient;
this.syncApplianceStub = null;
- this.sendKeyedGetDataRequests =
- !options.isEnableStreamingEngine()
- || !DataflowRunner.hasExperiment(
- options, "streaming_engine_send_new_heartbeat_requests");
- this.processHeartbeatResponses = processHeartbeatResponses;
this.windmillStreamFactory = grpcWindmillStreamFactory;
}
@@ -148,11 +136,9 @@ public final class GrpcWindmillServer extends
WindmillServerStub {
public static GrpcWindmillServer create(
DataflowWorkerHarnessOptions workerOptions,
GrpcWindmillStreamFactory grpcWindmillStreamFactory,
- GrpcDispatcherClient dispatcherClient,
- Consumer<List<Windmill.ComputationHeartbeatResponse>>
processHeartbeatResponses) {
+ GrpcDispatcherClient dispatcherClient) {
GrpcWindmillServer grpcWindmillServer =
- new GrpcWindmillServer(
- workerOptions, grpcWindmillStreamFactory, dispatcherClient,
processHeartbeatResponses);
+ new GrpcWindmillServer(workerOptions, grpcWindmillStreamFactory,
dispatcherClient);
if (workerOptions.getWindmillServiceEndpoint() != null) {
grpcWindmillServer.configureWindmillServiceEndpoints();
} else if (!workerOptions.isEnableStreamingEngine()
@@ -188,11 +174,18 @@ public final class GrpcWindmillServer extends
WindmillServerStub {
DataflowWorkerHarnessOptions testOptions =
testOptions(/* enableStreamingEngine= */ true, experiments);
+ boolean sendKeyedGetDataRequests =
+ !testOptions.isEnableStreamingEngine()
+ || !DataflowRunner.hasExperiment(
+ testOptions, "streaming_engine_send_new_heartbeat_requests");
GrpcWindmillStreamFactory windmillStreamFactory =
- GrpcWindmillStreamFactory.of(createJobHeader(testOptions,
clientId)).build();
- windmillStreamFactory.scheduleHealthChecks(
- testOptions.getWindmillServiceStreamingRpcHealthCheckPeriodMs());
- return new GrpcWindmillServer(testOptions, windmillStreamFactory,
dispatcherClient, noop -> {});
+ GrpcWindmillStreamFactory.of(createJobHeader(testOptions, clientId))
+ .setSendKeyedGetDataRequests(sendKeyedGetDataRequests)
+ .setHealthCheckIntervalMillis(
+
testOptions.getWindmillServiceStreamingRpcHealthCheckPeriodMs())
+ .build();
+
+ return new GrpcWindmillServer(testOptions, windmillStreamFactory,
dispatcherClient);
}
@VisibleForTesting
@@ -205,8 +198,7 @@ public final class GrpcWindmillServer extends
WindmillServerStub {
options,
GrpcWindmillStreamFactory.of(createJobHeader(options, 1)).build(),
// No-op, Appliance does not use Dispatcher to call Streaming
Engine.
- GrpcDispatcherClient.create(windmillStubFactory),
- noop -> {});
+ GrpcDispatcherClient.create(windmillStubFactory));
testServer.syncApplianceStub =
createWindmillApplianceStubWithDeadlineInterceptor(channel);
return testServer;
}
@@ -253,13 +245,13 @@ public final class GrpcWindmillServer extends
WindmillServerStub {
}
@Override
- public void setWindmillServiceEndpoints(Set<HostAndPort> endpoints) {
-
dispatcherClient.consumeWindmillDispatcherEndpoints(ImmutableSet.copyOf(endpoints));
+ public ImmutableSet<HostAndPort> getWindmillServiceEndpoints() {
+ return dispatcherClient.getDispatcherEndpoints();
}
@Override
- public ImmutableSet<HostAndPort> getWindmillServiceEndpoints() {
- return dispatcherClient.getDispatcherEndpoints();
+ public void setWindmillServiceEndpoints(Set<HostAndPort> endpoints) {
+
dispatcherClient.consumeWindmillDispatcherEndpoints(ImmutableSet.copyOf(endpoints));
}
@Override
@@ -357,10 +349,7 @@ public final class GrpcWindmillServer extends
WindmillServerStub {
@Override
public GetDataStream getDataStream() {
return windmillStreamFactory.createGetDataStream(
- dispatcherClient.getWindmillServiceStub(),
- throttleTimers.getDataThrottleTimer(),
- sendKeyedGetDataRequests,
- this.processHeartbeatResponses);
+ dispatcherClient.getWindmillServiceStub(),
throttleTimers.getDataThrottleTimer());
}
@Override
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillStreamFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillStreamFactory.java
index c652e98e556..14866f3f586 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillStreamFactory.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillStreamFactory.java
@@ -68,6 +68,7 @@ public class GrpcWindmillStreamFactory implements
StatusDataProvider {
private static final int DEFAULT_LOG_EVERY_N_STREAM_FAILURES = 1;
private static final int DEFAULT_STREAMING_RPC_BATCH_LIMIT =
Integer.MAX_VALUE;
private static final int DEFAULT_WINDMILL_MESSAGES_BETWEEN_IS_READY_CHECKS =
1;
+ private static final int NO_HEALTH_CHECKS = -1;
private final JobHeader jobHeader;
private final int logEveryNStreamFailures;
@@ -76,12 +77,18 @@ public class GrpcWindmillStreamFactory implements
StatusDataProvider {
private final Supplier<BackOff> grpcBackOff;
private final Set<AbstractWindmillStream<?, ?>> streamRegistry;
private final AtomicLong streamIdGenerator;
+ // If true, then active work refreshes will be sent as KeyedGetDataRequests. Otherwise, use the
+ // newer ComputationHeartbeatRequests.
+ private final boolean sendKeyedGetDataRequests;
+ private final Consumer<List<ComputationHeartbeatResponse>>
processHeartbeatResponses;
- GrpcWindmillStreamFactory(
+ private GrpcWindmillStreamFactory(
JobHeader jobHeader,
int logEveryNStreamFailures,
int streamingRpcBatchLimit,
int windmillMessagesBetweenIsReadyChecks,
+ boolean sendKeyedGetDataRequests,
+ Consumer<List<ComputationHeartbeatResponse>> processHeartbeatResponses,
Supplier<Duration> maxBackOffSupplier) {
this.jobHeader = jobHeader;
this.logEveryNStreamFailures = logEveryNStreamFailures;
@@ -96,9 +103,53 @@ public class GrpcWindmillStreamFactory implements
StatusDataProvider {
.withMaxBackoff(maxBackOffSupplier.get())
.backoff());
this.streamRegistry = ConcurrentHashMap.newKeySet();
+ this.sendKeyedGetDataRequests = sendKeyedGetDataRequests;
+ this.processHeartbeatResponses = processHeartbeatResponses;
this.streamIdGenerator = new AtomicLong();
}
+ /** @implNote Used for {@link AutoBuilder} {@link Builder} class, do not call directly. */
+ static GrpcWindmillStreamFactory create(
+ JobHeader jobHeader,
+ int logEveryNStreamFailures,
+ int streamingRpcBatchLimit,
+ int windmillMessagesBetweenIsReadyChecks,
+ boolean sendKeyedGetDataRequests,
+ Consumer<List<ComputationHeartbeatResponse>> processHeartbeatResponses,
+ Supplier<Duration> maxBackOffSupplier,
+ int healthCheckIntervalMillis) {
+ GrpcWindmillStreamFactory streamFactory =
+ new GrpcWindmillStreamFactory(
+ jobHeader,
+ logEveryNStreamFailures,
+ streamingRpcBatchLimit,
+ windmillMessagesBetweenIsReadyChecks,
+ sendKeyedGetDataRequests,
+ processHeartbeatResponses,
+ maxBackOffSupplier);
+
+ if (healthCheckIntervalMillis >= 0) {
+ // Health checks are run on background daemon thread, which will only be cleaned up on JVM
+ // shutdown.
+ new Timer("WindmillHealthCheckTimer")
+ .schedule(
+ new TimerTask() {
+ @Override
+ public void run() {
+ Instant reportThreshold =
+
Instant.now().minus(Duration.millis(healthCheckIntervalMillis));
+ for (AbstractWindmillStream<?, ?> stream :
streamFactory.streamRegistry) {
+ stream.maybeSendHealthCheck(reportThreshold);
+ }
+ }
+ },
+ 0,
+ healthCheckIntervalMillis);
+ }
+
+ return streamFactory;
+ }
+
/**
* Returns a new {@link Builder} for {@link GrpcWindmillStreamFactory} with
default values set for
* the given {@link JobHeader}.
@@ -109,7 +160,10 @@ public class GrpcWindmillStreamFactory implements
StatusDataProvider {
.setWindmillMessagesBetweenIsReadyChecks(DEFAULT_WINDMILL_MESSAGES_BETWEEN_IS_READY_CHECKS)
.setMaxBackOffSupplier(() -> DEFAULT_MAX_BACKOFF)
.setLogEveryNStreamFailures(DEFAULT_LOG_EVERY_N_STREAM_FAILURES)
- .setStreamingRpcBatchLimit(DEFAULT_STREAMING_RPC_BATCH_LIMIT);
+ .setStreamingRpcBatchLimit(DEFAULT_STREAMING_RPC_BATCH_LIMIT)
+ .setHealthCheckIntervalMillis(NO_HEALTH_CHECKS)
+ .setSendKeyedGetDataRequests(true)
+ .setProcessHeartbeatResponses(ignored -> {});
}
private static <T extends AbstractStub<T>> T withDefaultDeadline(T stub) {
@@ -156,10 +210,7 @@ public class GrpcWindmillStreamFactory implements
StatusDataProvider {
}
public GetDataStream createGetDataStream(
- CloudWindmillServiceV1Alpha1Stub stub,
- ThrottleTimer getDataThrottleTimer,
- boolean sendKeyedGetDataRequests,
- Consumer<List<ComputationHeartbeatResponse>> processHeartbeatResponses) {
+ CloudWindmillServiceV1Alpha1Stub stub, ThrottleTimer
getDataThrottleTimer) {
return GrpcGetDataStream.create(
responseObserver ->
withDefaultDeadline(stub).getDataStream(responseObserver),
grpcBackOff.get(),
@@ -174,11 +225,6 @@ public class GrpcWindmillStreamFactory implements
StatusDataProvider {
processHeartbeatResponses);
}
- public GetDataStream createGetDataStream(
- CloudWindmillServiceV1Alpha1Stub stub, ThrottleTimer
getDataThrottleTimer) {
- return createGetDataStream(stub, getDataThrottleTimer, false, (response)
-> {});
- }
-
public CommitWorkStream createCommitWorkStream(
CloudWindmillServiceV1Alpha1Stub stub, ThrottleTimer
commitWorkThrottleTimer) {
return GrpcCommitWorkStream.create(
@@ -214,30 +260,6 @@ public class GrpcWindmillStreamFactory implements
StatusDataProvider {
DEFAULT_STREAM_RPC_DEADLINE_SECONDS * 2,
windmillMessagesBetweenIsReadyChecks);
}
- /**
- * Schedules streaming RPC health checks to run on a background daemon
thread, which will be
- * cleaned up when the JVM shutdown.
- */
- public void scheduleHealthChecks(int healthCheckInterval) {
- if (healthCheckInterval < 0) {
- return;
- }
-
- new Timer("WindmillHealthCheckTimer")
- .schedule(
- new TimerTask() {
- @Override
- public void run() {
- Instant reportThreshold =
Instant.now().minus(Duration.millis(healthCheckInterval));
- for (AbstractWindmillStream<?, ?> stream : streamRegistry) {
- stream.maybeSendHealthCheck(reportThreshold);
- }
- }
- },
- 0,
- healthCheckInterval);
- }
-
@Override
public void appendSummaryHtml(PrintWriter writer) {
writer.write("Active Streams:<br>");
@@ -248,7 +270,7 @@ public class GrpcWindmillStreamFactory implements
StatusDataProvider {
}
@Internal
- @AutoBuilder(ofClass = GrpcWindmillStreamFactory.class)
+ @AutoBuilder(callMethod = "create")
public interface Builder {
Builder setJobHeader(JobHeader jobHeader);
@@ -260,6 +282,13 @@ public class GrpcWindmillStreamFactory implements
StatusDataProvider {
Builder setMaxBackOffSupplier(Supplier<Duration> maxBackOff);
+ Builder setSendKeyedGetDataRequests(boolean sendKeyedGetDataRequests);
+
+ Builder setProcessHeartbeatResponses(
+ Consumer<List<ComputationHeartbeatResponse>>
processHeartbeatResponses);
+
+ Builder setHealthCheckIntervalMillis(int healthCheckIntervalMillis);
+
GrpcWindmillStreamFactory build();
}
}
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/StreamingEngineClient.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/StreamingEngineClient.java
index a9ca749ff1c..4760062c575 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/StreamingEngineClient.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/StreamingEngineClient.java
@@ -30,13 +30,11 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.annotation.CheckReturnValue;
import javax.annotation.concurrent.ThreadSafe;
import
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub;
-import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkRequest;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.JobHeader;
import org.apache.beam.runners.dataflow.worker.windmill.WindmillConnection;
@@ -93,7 +91,6 @@ public final class StreamingEngineClient {
private final Supplier<GetWorkerMetadataStream> getWorkerMetadataStream;
private final Queue<WindmillEndpoints> newWindmillEndpoints;
private final Function<WindmillStream.CommitWorkStream, WorkCommitter>
workCommitterFactory;
- private final Consumer<List<Windmill.ComputationHeartbeatResponse>>
heartbeatResponseProcessor;
/** Writes are guarded by synchronization, reads are lock free. */
private final AtomicReference<StreamingEngineConnectionState> connections;
@@ -110,8 +107,7 @@ public final class StreamingEngineClient {
GetWorkBudgetDistributor getWorkBudgetDistributor,
GrpcDispatcherClient dispatcherClient,
long clientId,
- Function<WindmillStream.CommitWorkStream, WorkCommitter>
workCommitterFactory,
- Consumer<List<Windmill.ComputationHeartbeatResponse>>
heartbeatResponseProcessor) {
+ Function<WindmillStream.CommitWorkStream, WorkCommitter>
workCommitterFactory) {
this.jobHeader = jobHeader;
this.started = false;
this.streamFactory = streamFactory;
@@ -147,7 +143,6 @@ public final class StreamingEngineClient {
newWorkerMetadataPublisher.submit(
() -> newWindmillEndpoints.add(endpoints))));
this.workCommitterFactory = workCommitterFactory;
- this.heartbeatResponseProcessor = heartbeatResponseProcessor;
}
private static ExecutorService singleThreadedExecutorServiceOf(String
threadName) {
@@ -176,8 +171,7 @@ public final class StreamingEngineClient {
ChannelCachingStubFactory channelCachingStubFactory,
GetWorkBudgetDistributor getWorkBudgetDistributor,
GrpcDispatcherClient dispatcherClient,
- Function<WindmillStream.CommitWorkStream, WorkCommitter>
workCommitterFactory,
- Consumer<List<Windmill.ComputationHeartbeatResponse>>
heartbeatProcessor) {
+ Function<WindmillStream.CommitWorkStream, WorkCommitter>
workCommitterFactory) {
return new StreamingEngineClient(
jobHeader,
totalGetWorkBudget,
@@ -187,8 +181,7 @@ public final class StreamingEngineClient {
getWorkBudgetDistributor,
dispatcherClient,
/* clientId= */ new Random().nextLong(),
- workCommitterFactory,
- heartbeatProcessor);
+ workCommitterFactory);
}
@VisibleForTesting
@@ -201,8 +194,7 @@ public final class StreamingEngineClient {
GetWorkBudgetDistributor getWorkBudgetDistributor,
GrpcDispatcherClient dispatcherClient,
long clientId,
- Function<WindmillStream.CommitWorkStream, WorkCommitter>
workCommitterFactory,
- Consumer<List<Windmill.ComputationHeartbeatResponse>>
heartbeatResponseProcessor) {
+ Function<WindmillStream.CommitWorkStream, WorkCommitter>
workCommitterFactory) {
StreamingEngineClient streamingEngineClient =
new StreamingEngineClient(
jobHeader,
@@ -213,8 +205,7 @@ public final class StreamingEngineClient {
getWorkBudgetDistributor,
dispatcherClient,
clientId,
- workCommitterFactory,
- heartbeatResponseProcessor);
+ workCommitterFactory);
streamingEngineClient.start();
return streamingEngineClient;
}
@@ -409,8 +400,7 @@ public final class StreamingEngineClient {
GetWorkBudget.noBudget(),
streamFactory,
workItemScheduler,
- workCommitterFactory,
- heartbeatResponseProcessor);
+ workCommitterFactory);
windmillStreamSender.startStreams();
return windmillStreamSender;
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/WindmillStreamSender.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/WindmillStreamSender.java
index ff9ddc00c3f..e9f008eb522 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/WindmillStreamSender.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/WindmillStreamSender.java
@@ -17,15 +17,12 @@
*/
package org.apache.beam.runners.dataflow.worker.windmill.client.grpc;
-import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.annotation.concurrent.ThreadSafe;
import
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub;
-import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkRequest;
import
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.CommitWorkStream;
import
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream;
@@ -73,24 +70,20 @@ public class WindmillStreamSender {
AtomicReference<GetWorkBudget> getWorkBudget,
GrpcWindmillStreamFactory streamingEngineStreamFactory,
WorkItemScheduler workItemScheduler,
- Function<CommitWorkStream, WorkCommitter> workCommitterFactory,
- Consumer<List<Windmill.ComputationHeartbeatResponse>>
heartbeatResponseProcessor) {
+ Function<CommitWorkStream, WorkCommitter> workCommitterFactory) {
this.started = new AtomicBoolean(false);
this.getWorkBudget = getWorkBudget;
this.streamingEngineThrottleTimers =
StreamingEngineThrottleTimers.create();
// All streams are memoized/cached since they are expensive to create and
some implementations
// perform side effects on construction (i.e. sending initial requests to
the stream server to
- // initiate the streaming RPC connection). Stream instances
connect/reconnect internally so we
+ // initiate the streaming RPC connection). Stream instances
connect/reconnect internally, so we
// can reuse the same instance through the entire lifecycle of
WindmillStreamSender.
this.getDataStream =
Suppliers.memoize(
() ->
streamingEngineStreamFactory.createGetDataStream(
- stub,
- streamingEngineThrottleTimers.getDataThrottleTimer(),
- false,
- heartbeatResponseProcessor));
+ stub,
streamingEngineThrottleTimers.getDataThrottleTimer()));
this.commitWorkStream =
Suppliers.memoize(
() ->
@@ -116,16 +109,14 @@ public class WindmillStreamSender {
GetWorkBudget getWorkBudget,
GrpcWindmillStreamFactory streamingEngineStreamFactory,
WorkItemScheduler workItemScheduler,
- Function<CommitWorkStream, WorkCommitter> workCommitterFactory,
- Consumer<List<Windmill.ComputationHeartbeatResponse>>
heartbeatResponseProcessor) {
+ Function<CommitWorkStream, WorkCommitter> workCommitterFactory) {
return new WindmillStreamSender(
stub,
getWorkRequest,
new AtomicReference<>(getWorkBudget),
streamingEngineStreamFactory,
workItemScheduler,
- workCommitterFactory,
- heartbeatResponseProcessor);
+ workCommitterFactory);
}
private static GetWorkRequest withRequestBudget(GetWorkRequest request,
GetWorkBudget budget) {
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/StreamingEngineClientTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/StreamingEngineClientTest.java
index 9822daa9156..bc3afaff1b3 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/StreamingEngineClientTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/StreamingEngineClientTest.java
@@ -181,8 +181,7 @@ public class StreamingEngineClientTest {
getWorkBudgetDistributor,
dispatcherClient,
CLIENT_ID,
- ignored -> mock(WorkCommitter.class),
- ignored -> {});
+ ignored -> mock(WorkCommitter.class));
}
@Test
@@ -238,7 +237,7 @@ public class StreamingEngineClientTest {
.createDirectGetWorkStream(
any(), eq(getWorkRequest(0, 0)), any(), any(), any(),
eq(noOpProcessWorkItemFn()));
- verify(streamFactory, times(2)).createGetDataStream(any(), any(),
eq(false), any());
+ verify(streamFactory, times(2)).createGetDataStream(any(), any());
verify(streamFactory, times(2)).createCommitWorkStream(any(), any());
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/WindmillStreamSenderTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/WindmillStreamSenderTest.java
index 496f69dc52d..162c69509ae 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/WindmillStreamSenderTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/WindmillStreamSenderTest.java
@@ -107,7 +107,7 @@ public class WindmillStreamSenderTest {
any(),
eq(workItemScheduler));
- verify(streamFactory).createGetDataStream(eq(stub),
any(ThrottleTimer.class), eq(false), any());
+ verify(streamFactory).createGetDataStream(eq(stub),
any(ThrottleTimer.class));
verify(streamFactory).createCommitWorkStream(eq(stub),
any(ThrottleTimer.class));
}
@@ -138,8 +138,7 @@ public class WindmillStreamSenderTest {
any(),
eq(workItemScheduler));
- verify(streamFactory, times(1))
- .createGetDataStream(eq(stub), any(ThrottleTimer.class), eq(false),
any());
+ verify(streamFactory, times(1)).createGetDataStream(eq(stub),
any(ThrottleTimer.class));
verify(streamFactory, times(1)).createCommitWorkStream(eq(stub),
any(ThrottleTimer.class));
}
@@ -173,8 +172,7 @@ public class WindmillStreamSenderTest {
any(),
eq(workItemScheduler));
- verify(streamFactory, times(1))
- .createGetDataStream(eq(stub), any(ThrottleTimer.class), eq(false),
any());
+ verify(streamFactory, times(1)).createGetDataStream(eq(stub),
any(ThrottleTimer.class));
verify(streamFactory, times(1)).createCommitWorkStream(eq(stub),
any(ThrottleTimer.class));
}
@@ -208,8 +206,7 @@ public class WindmillStreamSenderTest {
eq(workItemScheduler)))
.thenReturn(mockGetWorkStream);
- when(mockStreamFactory.createGetDataStream(
- eq(stub), any(ThrottleTimer.class), eq(false), any()))
+ when(mockStreamFactory.createGetDataStream(eq(stub),
any(ThrottleTimer.class)))
.thenReturn(mockGetDataStream);
when(mockStreamFactory.createCommitWorkStream(eq(stub),
any(ThrottleTimer.class)))
.thenReturn(mockCommitWorkStream);
@@ -239,7 +236,6 @@ public class WindmillStreamSenderTest {
budget,
streamFactory,
workItemScheduler,
- ignored -> mock(WorkCommitter.class),
- ignored -> {});
+ ignored -> mock(WorkCommitter.class));
}
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/EvenGetWorkBudgetDistributorTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/EvenGetWorkBudgetDistributorTest.java
index 4fa424412ee..83ae8aa22ce 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/EvenGetWorkBudgetDistributorTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/EvenGetWorkBudgetDistributorTest.java
@@ -259,7 +259,6 @@ public class EvenGetWorkBudgetDistributorTest {
.build())
.build(),
(workItem, watermarks, processingContext, ackWorkItemQueued,
getWorkStreamLatencies) -> {},
- ignored -> mock(WorkCommitter.class),
- ignored -> {});
+ ignored -> mock(WorkCommitter.class));
}
}