This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 019efe131f9 [Dataflow Streaming] Add support to read user worker settings from backend (#32408)
019efe131f9 is described below
commit 019efe131f93ad7b56007fca479da003cfe6e2a9
Author: Arun Pandian <[email protected]>
AuthorDate: Wed Sep 18 04:14:56 2024 -0700
[Dataflow Streaming] Add support to read user worker settings from backend (#32408)
---
.../org/apache/beam/gradle/BeamModulePlugin.groovy | 2 +-
.../runners/dataflow/worker/OperationalLimits.java | 39 +--
.../dataflow/worker/StreamingDataflowWorker.java | 75 ++----
.../worker/StreamingModeExecutionContext.java | 12 +-
.../worker/streaming/ComputationWorkExecutor.java | 5 +-
.../worker/streaming/config/ComputationConfig.java | 3 +-
.../streaming/config/FixedGlobalConfigHandle.java | 48 ++++
...StreamingApplianceComputationConfigFetcher.java | 10 +-
.../StreamingEngineComputationConfigFetcher.java | 69 +++--
...elineConfig.java => StreamingGlobalConfig.java} | 41 ++-
.../config/StreamingGlobalConfigHandle.java | 37 +++
.../config/StreamingGlobalConfigHandleImpl.java | 113 ++++++++
.../harness/StreamingWorkerStatusPages.java | 22 +-
.../windmill/client/grpc/GrpcDispatcherClient.java | 10 +
.../processing/ComputationWorkExecutorFactory.java | 7 +-
.../work/processing/StreamingWorkScheduler.java | 25 +-
.../worker/StreamingDataflowWorkerTest.java | 45 +++-
.../worker/StreamingModeExecutionContextTest.java | 8 +-
.../dataflow/worker/WorkerCustomSourcesTest.java | 11 +-
.../config/FixedGlobalConfigHandleTest.java | 84 ++++++
...amingApplianceComputationConfigFetcherTest.java | 4 +-
...treamingEngineComputationConfigFetcherTest.java | 102 ++++---
.../StreamingGlobalConfigHandleImplTest.java | 293 +++++++++++++++++++++
.../worker/windmill/src/main/proto/windmill.proto | 9 +
24 files changed, 867 insertions(+), 207 deletions(-)
diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 0110f71974e..4aec3a796f2 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -738,7 +738,7 @@ class BeamModulePlugin implements Plugin<Project> {
google_api_common :
"com.google.api:api-common", // google_cloud_platform_libraries_bom sets version
google_api_services_bigquery :
"com.google.apis:google-api-services-bigquery:v2-rev20240815-2.0.0", //
[bomupgrader] sets version
google_api_services_cloudresourcemanager :
"com.google.apis:google-api-services-cloudresourcemanager:v1-rev20240310-2.0.0",
// [bomupgrader] sets version
- google_api_services_dataflow :
"com.google.apis:google-api-services-dataflow:v1b3-rev20240624-$google_clients_version",
+ google_api_services_dataflow :
"com.google.apis:google-api-services-dataflow:v1b3-rev20240817-$google_clients_version",
google_api_services_healthcare :
"com.google.apis:google-api-services-healthcare:v1-rev20240130-$google_clients_version",
google_api_services_pubsub :
"com.google.apis:google-api-services-pubsub:v1-rev20220904-$google_clients_version",
google_api_services_storage :
"com.google.apis:google-api-services-storage:v1-rev20240706-2.0.0", //
[bomupgrader] sets version
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OperationalLimits.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OperationalLimits.java
index 47e36e49850..84f41c473fe 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OperationalLimits.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OperationalLimits.java
@@ -17,37 +17,38 @@
*/
package org.apache.beam.runners.dataflow.worker;
-import com.google.auto.value.AutoBuilder;
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.annotations.Internal;
/** Keep track of any operational limits required by the backend. */
-public class OperationalLimits {
+@AutoValue
+@Internal
+public abstract class OperationalLimits {
+
+ private static final long DEFAULT_MAX_WORK_ITEM_COMMIT_BYTES = 180 << 20;
+
// Maximum size of a commit from a single work item.
- public final long maxWorkItemCommitBytes;
+ public abstract long getMaxWorkItemCommitBytes();
// Maximum size of a single output element's serialized key.
- public final long maxOutputKeyBytes;
+ public abstract long getMaxOutputKeyBytes();
// Maximum size of a single output element's serialized value.
- public final long maxOutputValueBytes;
+ public abstract long getMaxOutputValueBytes();
- OperationalLimits(long maxWorkItemCommitBytes, long maxOutputKeyBytes, long
maxOutputValueBytes) {
- this.maxWorkItemCommitBytes = maxWorkItemCommitBytes;
- this.maxOutputKeyBytes = maxOutputKeyBytes;
- this.maxOutputValueBytes = maxOutputValueBytes;
- }
+ @AutoValue.Builder
+ public abstract static class Builder {
- @AutoBuilder(ofClass = OperationalLimits.class)
- public interface Builder {
- Builder setMaxWorkItemCommitBytes(long bytes);
+ public abstract Builder setMaxWorkItemCommitBytes(long bytes);
- Builder setMaxOutputKeyBytes(long bytes);
+ public abstract Builder setMaxOutputKeyBytes(long bytes);
- Builder setMaxOutputValueBytes(long bytes);
+ public abstract Builder setMaxOutputValueBytes(long bytes);
- OperationalLimits build();
+ public abstract OperationalLimits build();
}
- public static Builder builder() {
- return new AutoBuilder_OperationalLimits_Builder()
- .setMaxWorkItemCommitBytes(Long.MAX_VALUE)
+ public static OperationalLimits.Builder builder() {
+ return new AutoValue_OperationalLimits.Builder()
+ .setMaxWorkItemCommitBytes(DEFAULT_MAX_WORK_ITEM_COMMIT_BYTES)
.setMaxOutputKeyBytes(Long.MAX_VALUE)
.setMaxOutputValueBytes(Long.MAX_VALUE);
}
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 1af67738209..0dedd4f34fd 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -34,8 +34,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.beam.runners.core.metrics.MetricsLogger;
@@ -49,9 +47,11 @@ import
org.apache.beam.runners.dataflow.worker.streaming.ComputationStateCache;
import org.apache.beam.runners.dataflow.worker.streaming.StageInfo;
import
org.apache.beam.runners.dataflow.worker.streaming.WorkHeartbeatResponseProcessor;
import
org.apache.beam.runners.dataflow.worker.streaming.config.ComputationConfig;
+import
org.apache.beam.runners.dataflow.worker.streaming.config.FixedGlobalConfigHandle;
import
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingApplianceComputationConfigFetcher;
import
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingEngineComputationConfigFetcher;
-import
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingEnginePipelineConfig;
+import
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig;
+import
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandleImpl;
import
org.apache.beam.runners.dataflow.worker.streaming.harness.SingleSourceWorkerHarness;
import
org.apache.beam.runners.dataflow.worker.streaming.harness.SingleSourceWorkerHarness.GetWorkSender;
import
org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingCounters;
@@ -103,9 +103,7 @@ import
org.apache.beam.sdk.util.construction.CoderTranslation;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheStats;
-import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
-import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.joda.time.Duration;
import org.joda.time.Instant;
@@ -181,7 +179,6 @@ public final class StreamingDataflowWorker {
WorkFailureProcessor workFailureProcessor,
StreamingCounters streamingCounters,
MemoryMonitor memoryMonitor,
- AtomicReference<OperationalLimits> operationalLimits,
GrpcWindmillStreamFactory windmillStreamFactory,
Function<String, ScheduledExecutorService> executorSupplier,
ConcurrentMap<String, StageInfo> stageInfoMap) {
@@ -237,8 +234,8 @@ public final class StreamingDataflowWorker {
streamingCounters,
hotKeyLogger,
sampler,
- operationalLimits,
ID_GENERATOR,
+ configFetcher.getGlobalConfigHandle(),
stageInfoMap);
ThrottlingGetDataMetricTracker getDataMetricTracker =
@@ -298,6 +295,7 @@ public final class StreamingDataflowWorker {
.setCurrentActiveCommitBytes(workCommitter::currentActiveCommitBytes)
.setGetDataStatusProvider(getDataClient::printHtml)
.setWorkUnitExecutor(workUnitExecutor)
+ .setGlobalConfigHandle(configFetcher.getGlobalConfigHandle())
.build();
Windmill.GetWorkRequest request =
@@ -335,8 +333,6 @@ public final class StreamingDataflowWorker {
StreamingCounters streamingCounters = StreamingCounters.create();
WorkUnitClient dataflowServiceClient = new DataflowWorkUnitClient(options,
LOG);
BoundedQueueExecutor workExecutor = createWorkUnitExecutor(options);
- AtomicReference<OperationalLimits> operationalLimits =
- new AtomicReference<>(OperationalLimits.builder().build());
WindmillStateCache windmillStateCache =
WindmillStateCache.builder()
.setSizeMb(options.getWorkerCacheMb())
@@ -354,7 +350,6 @@ public final class StreamingDataflowWorker {
createConfigFetcherComputationStateCacheAndWindmillClient(
options,
dataflowServiceClient,
- operationalLimits,
windmillStreamFactoryBuilder,
configFetcher ->
ComputationStateCache.create(
@@ -412,7 +407,6 @@ public final class StreamingDataflowWorker {
workFailureProcessor,
streamingCounters,
memoryMonitor,
- operationalLimits,
configFetcherComputationStateCacheAndWindmillClient.windmillStreamFactory(),
executorSupplier,
stageInfo);
@@ -428,7 +422,6 @@ public final class StreamingDataflowWorker {
createConfigFetcherComputationStateCacheAndWindmillClient(
DataflowWorkerHarnessOptions options,
WorkUnitClient dataflowServiceClient,
- AtomicReference<OperationalLimits> operationalLimits,
GrpcWindmillStreamFactory.Builder windmillStreamFactoryBuilder,
Function<ComputationConfig.Fetcher, ComputationStateCache>
computationStateCacheFactory) {
ComputationConfig.Fetcher configFetcher;
@@ -440,13 +433,8 @@ public final class StreamingDataflowWorker {
GrpcDispatcherClient.create(createStubFactory(options));
configFetcher =
StreamingEngineComputationConfigFetcher.create(
- options.getGlobalConfigRefreshPeriod().getMillis(),
- dataflowServiceClient,
- config ->
- onPipelineConfig(
- config,
- dispatcherClient::consumeWindmillDispatcherEndpoints,
- operationalLimits::set));
+ options.getGlobalConfigRefreshPeriod().getMillis(),
dataflowServiceClient);
+
configFetcher.getGlobalConfigHandle().registerConfigObserver(dispatcherClient::onJobConfig);
computationStateCache =
computationStateCacheFactory.apply(configFetcher);
windmillStreamFactory =
windmillStreamFactoryBuilder
@@ -474,7 +462,10 @@ public final class StreamingDataflowWorker {
windmillServer = new
JniWindmillApplianceServer(options.getLocalWindmillHostport());
}
- configFetcher = new
StreamingApplianceComputationConfigFetcher(windmillServer::getConfig);
+ configFetcher =
+ new StreamingApplianceComputationConfigFetcher(
+ windmillServer::getConfig,
+ new
FixedGlobalConfigHandle(StreamingGlobalConfig.builder().build()));
computationStateCache =
computationStateCacheFactory.apply(configFetcher);
}
@@ -494,10 +485,9 @@ public final class StreamingDataflowWorker {
HotKeyLogger hotKeyLogger,
Supplier<Instant> clock,
Function<String, ScheduledExecutorService> executorSupplier,
- int localRetryTimeoutMs,
- OperationalLimits limits) {
+ StreamingGlobalConfigHandleImpl globalConfigHandle,
+ int localRetryTimeoutMs) {
ConcurrentMap<String, StageInfo> stageInfo = new ConcurrentHashMap<>();
- AtomicReference<OperationalLimits> operationalLimits = new
AtomicReference<>(limits);
BoundedQueueExecutor workExecutor = createWorkUnitExecutor(options);
WindmillStateCache stateCache =
WindmillStateCache.builder()
@@ -510,13 +500,20 @@ public final class StreamingDataflowWorker {
/* hasReceivedGlobalConfig= */ true,
options.getGlobalConfigRefreshPeriod().getMillis(),
workUnitClient,
- executorSupplier,
- config ->
- onPipelineConfig(
- config,
- windmillServer::setWindmillServiceEndpoints,
- operationalLimits::set))
- : new
StreamingApplianceComputationConfigFetcher(windmillServer::getConfig);
+ globalConfigHandle,
+ executorSupplier)
+ : new StreamingApplianceComputationConfigFetcher(
+ windmillServer::getConfig, globalConfigHandle);
+ configFetcher
+ .getGlobalConfigHandle()
+ .registerConfigObserver(
+ config -> {
+ if (config.windmillServiceEndpoints().isEmpty()) {
+ LOG.warn("Received empty windmill service endpoints");
+ return;
+ }
+
windmillServer.setWindmillServiceEndpoints(config.windmillServiceEndpoints());
+ });
ConcurrentMap<String, String> stateNameMap =
new ConcurrentHashMap<>(prePopulatedStateNameMappings);
ComputationStateCache computationStateCache =
@@ -583,7 +580,6 @@ public final class StreamingDataflowWorker {
workFailureProcessor,
streamingCounters,
memoryMonitor,
- operationalLimits,
options.isEnableStreamingEngine()
? windmillStreamFactory
.setHealthCheckIntervalMillis(
@@ -594,23 +590,6 @@ public final class StreamingDataflowWorker {
stageInfo);
}
- private static void onPipelineConfig(
- StreamingEnginePipelineConfig config,
- Consumer<ImmutableSet<HostAndPort>> consumeWindmillServiceEndpoints,
- Consumer<OperationalLimits> operationalLimits) {
-
- operationalLimits.accept(
- OperationalLimits.builder()
- .setMaxWorkItemCommitBytes(config.maxWorkItemCommitBytes())
- .setMaxOutputKeyBytes(config.maxOutputKeyBytes())
- .setMaxOutputValueBytes(config.maxOutputValueBytes())
- .build());
-
- if (!config.windmillServiceEndpoints().isEmpty()) {
-
consumeWindmillServiceEndpoints.accept(config.windmillServiceEndpoints());
- }
- }
-
private static GrpcWindmillStreamFactory.Builder
createGrpcwindmillStreamFactoryBuilder(
DataflowWorkerHarnessOptions options, long clientId) {
Duration maxBackoff =
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
index f25f6294da8..5ff94884e97 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
@@ -50,6 +50,7 @@ import
org.apache.beam.runners.dataflow.worker.counters.NameContext;
import
org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.ProfileScope;
import org.apache.beam.runners.dataflow.worker.streaming.Watermarks;
import org.apache.beam.runners.dataflow.worker.streaming.Work;
+import
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandle;
import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInput;
import
org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputState;
import
org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher;
@@ -107,6 +108,7 @@ public class StreamingModeExecutionContext extends
DataflowExecutionContext<Step
private final ImmutableMap<String, String> stateNameMap;
private final WindmillStateCache.ForComputation stateCache;
private final ReaderCache readerCache;
+ private final StreamingGlobalConfigHandle globalConfigHandle;
private final boolean throwExceptionOnLargeOutput;
private volatile long backlogBytes;
@@ -153,6 +155,7 @@ public class StreamingModeExecutionContext extends
DataflowExecutionContext<Step
MetricsContainerRegistry<StreamingStepMetricsContainer>
metricsContainerRegistry,
DataflowExecutionStateTracker executionStateTracker,
StreamingModeExecutionStateRegistry executionStateRegistry,
+ StreamingGlobalConfigHandle globalConfigHandle,
long sinkByteLimit,
boolean throwExceptionOnLargeOutput) {
super(
@@ -163,6 +166,7 @@ public class StreamingModeExecutionContext extends
DataflowExecutionContext<Step
sinkByteLimit);
this.computationId = computationId;
this.readerCache = readerCache;
+ this.globalConfigHandle = globalConfigHandle;
this.sideInputCache = new HashMap<>();
this.stateNameMap = ImmutableMap.copyOf(stateNameMap);
this.stateCache = stateCache;
@@ -176,11 +180,11 @@ public class StreamingModeExecutionContext extends
DataflowExecutionContext<Step
}
public long getMaxOutputKeyBytes() {
- return operationalLimits.maxOutputKeyBytes;
+ return operationalLimits.getMaxOutputKeyBytes();
}
public long getMaxOutputValueBytes() {
- return operationalLimits.maxOutputValueBytes;
+ return operationalLimits.getMaxOutputValueBytes();
}
public boolean throwExceptionsForLargeOutput() {
@@ -196,13 +200,13 @@ public class StreamingModeExecutionContext extends
DataflowExecutionContext<Step
Work work,
WindmillStateReader stateReader,
SideInputStateFetcher sideInputStateFetcher,
- OperationalLimits operationalLimits,
Windmill.WorkItemCommitRequest.Builder outputBuilder) {
this.key = key;
this.work = work;
this.computationKey = WindmillComputationKey.create(computationId,
work.getShardedKey());
this.sideInputStateFetcher = sideInputStateFetcher;
- this.operationalLimits = operationalLimits;
+ // Snapshot the limits for entire bundle processing.
+ this.operationalLimits =
globalConfigHandle.getConfig().operationalLimits();
this.outputBuilder = outputBuilder;
this.sideInputCache.clear();
clearSinkFullHint();
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationWorkExecutor.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationWorkExecutor.java
index 8a00194887d..8dc681fc640 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationWorkExecutor.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationWorkExecutor.java
@@ -24,7 +24,6 @@ import javax.annotation.concurrent.NotThreadSafe;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.runners.dataflow.worker.DataflowMapTaskExecutor;
import org.apache.beam.runners.dataflow.worker.DataflowWorkExecutor;
-import org.apache.beam.runners.dataflow.worker.OperationalLimits;
import org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext;
import
org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher;
import
org.apache.beam.runners.dataflow.worker.util.common.worker.ElementCounter;
@@ -73,11 +72,9 @@ public abstract class ComputationWorkExecutor {
Work work,
WindmillStateReader stateReader,
SideInputStateFetcher sideInputStateFetcher,
- OperationalLimits operationalLimits,
Windmill.WorkItemCommitRequest.Builder outputBuilder)
throws Exception {
- context()
- .start(key, work, stateReader, sideInputStateFetcher,
operationalLimits, outputBuilder);
+ context().start(key, work, stateReader, sideInputStateFetcher,
outputBuilder);
workExecutor().execute();
}
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/ComputationConfig.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/ComputationConfig.java
index fb8bcf7edbf..9702751aeb9 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/ComputationConfig.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/ComputationConfig.java
@@ -48,12 +48,13 @@ public abstract class ComputationConfig {
public abstract ImmutableMap<String, String> stateNameMap();
/** Interface to fetch configurations for a specific computation. */
- @FunctionalInterface
public interface Fetcher {
default void start() {}
default void stop() {}
Optional<ComputationConfig> fetchConfig(String computationId);
+
+ StreamingGlobalConfigHandle getGlobalConfigHandle();
}
}
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/FixedGlobalConfigHandle.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/FixedGlobalConfigHandle.java
new file mode 100644
index 00000000000..c244ecb8c7a
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/FixedGlobalConfigHandle.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming.config;
+
+import java.util.function.Consumer;
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.sdk.annotations.Internal;
+
+@Internal
+@ThreadSafe
+/*
+ * StreamingGlobalConfigHandle returning a fixed config
+ * initialized during construction. Used for Appliance and Tests.
+ */
+public class FixedGlobalConfigHandle implements StreamingGlobalConfigHandle {
+
+ private final StreamingGlobalConfig config;
+
+ public FixedGlobalConfigHandle(StreamingGlobalConfig config) {
+ this.config = config;
+ }
+
+ @Override
+ public StreamingGlobalConfig getConfig() {
+ return config;
+ }
+
+ @Override
+ public void registerConfigObserver(@Nonnull Consumer<StreamingGlobalConfig>
callback) {
+ callback.accept(config);
+ }
+}
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingApplianceComputationConfigFetcher.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingApplianceComputationConfigFetcher.java
index 786ded09498..025e66be79c 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingApplianceComputationConfigFetcher.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingApplianceComputationConfigFetcher.java
@@ -48,11 +48,14 @@ public final class
StreamingApplianceComputationConfigFetcher implements Computa
private final ApplianceComputationConfigFetcher
applianceComputationConfigFetcher;
private final ConcurrentHashMap<String, String> systemNameToComputationIdMap;
+ private final StreamingGlobalConfigHandle globalConfigHandle;
public StreamingApplianceComputationConfigFetcher(
- ApplianceComputationConfigFetcher applianceComputationConfigFetcher) {
+ ApplianceComputationConfigFetcher applianceComputationConfigFetcher,
+ StreamingGlobalConfigHandle globalConfigHandle) {
this.applianceComputationConfigFetcher = applianceComputationConfigFetcher;
this.systemNameToComputationIdMap = new ConcurrentHashMap<>();
+ this.globalConfigHandle = globalConfigHandle;
}
/** Returns a {@code Table<ComputationId, TransformUserName,
StateFamilyName>} */
@@ -112,6 +115,11 @@ public final class
StreamingApplianceComputationConfigFetcher implements Computa
.collect(toImmutableMap(NameMapEntry::getUserName,
NameMapEntry::getSystemName)));
}
+ @Override
+ public StreamingGlobalConfigHandle getGlobalConfigHandle() {
+ return globalConfigHandle;
+ }
+
private Optional<ComputationConfig> createComputationConfig(
String serializedMapTask,
Table<String, String, String>
transformUserNameToStateFamilyByComputationId,
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java
index d230aac54c6..22b0dac6eb2 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java
@@ -30,16 +30,18 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.StreamSupport;
import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.runners.dataflow.worker.OperationalLimits;
import org.apache.beam.runners.dataflow.worker.WorkUnitClient;
+import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.UserWorkerRunnerV1Settings;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.BackOffUtils;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.Sleeper;
+import
org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.InvalidProtocolBufferException;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
@@ -72,33 +74,31 @@ public final class StreamingEngineComputationConfigFetcher
implements Computatio
private final long globalConfigRefreshPeriodMillis;
private final WorkUnitClient dataflowServiceClient;
private final ScheduledExecutorService globalConfigRefresher;
- private final Consumer<StreamingEnginePipelineConfig> onStreamingConfig;
+ private final StreamingGlobalConfigHandleImpl globalConfigHandle;
private final AtomicBoolean hasReceivedGlobalConfig;
private StreamingEngineComputationConfigFetcher(
boolean hasReceivedGlobalConfig,
long globalConfigRefreshPeriodMillis,
WorkUnitClient dataflowServiceClient,
- ScheduledExecutorService globalConfigRefresher,
- Consumer<StreamingEnginePipelineConfig> onStreamingConfig) {
+ StreamingGlobalConfigHandleImpl globalConfigHandle,
+ ScheduledExecutorService globalConfigRefresher) {
this.globalConfigRefreshPeriodMillis = globalConfigRefreshPeriodMillis;
this.dataflowServiceClient = dataflowServiceClient;
this.globalConfigRefresher = globalConfigRefresher;
- this.onStreamingConfig = onStreamingConfig;
+ this.globalConfigHandle = globalConfigHandle;
this.hasReceivedGlobalConfig = new AtomicBoolean(hasReceivedGlobalConfig);
}
public static StreamingEngineComputationConfigFetcher create(
- long globalConfigRefreshPeriodMillis,
- WorkUnitClient dataflowServiceClient,
- Consumer<StreamingEnginePipelineConfig> onStreamingConfig) {
+ long globalConfigRefreshPeriodMillis, WorkUnitClient
dataflowServiceClient) {
return new StreamingEngineComputationConfigFetcher(
/* hasReceivedGlobalConfig= */ false,
globalConfigRefreshPeriodMillis,
dataflowServiceClient,
+ new StreamingGlobalConfigHandleImpl(),
Executors.newSingleThreadScheduledExecutor(
- new
ThreadFactoryBuilder().setNameFormat(CONFIG_REFRESHER_THREAD_NAME).build()),
- onStreamingConfig);
+ new
ThreadFactoryBuilder().setNameFormat(CONFIG_REFRESHER_THREAD_NAME).build()));
}
@VisibleForTesting
@@ -106,14 +106,14 @@ public final class
StreamingEngineComputationConfigFetcher implements Computatio
boolean hasReceivedGlobalConfig,
long globalConfigRefreshPeriodMillis,
WorkUnitClient dataflowServiceClient,
- Function<String, ScheduledExecutorService> executorSupplier,
- Consumer<StreamingEnginePipelineConfig> onStreamingConfig) {
+ StreamingGlobalConfigHandleImpl globalConfigHandle,
+ Function<String, ScheduledExecutorService> executorSupplier) {
return new StreamingEngineComputationConfigFetcher(
hasReceivedGlobalConfig,
globalConfigRefreshPeriodMillis,
dataflowServiceClient,
- executorSupplier.apply(CONFIG_REFRESHER_THREAD_NAME),
- onStreamingConfig);
+ globalConfigHandle,
+ executorSupplier.apply(CONFIG_REFRESHER_THREAD_NAME));
}
@VisibleForTesting
@@ -157,11 +157,9 @@ public final class StreamingEngineComputationConfigFetcher
implements Computatio
}
}
- private StreamingEnginePipelineConfig
createPipelineConfig(StreamingConfigTask config) {
- StreamingEnginePipelineConfig.Builder pipelineConfig =
StreamingEnginePipelineConfig.builder();
- if (config.getUserStepToStateFamilyNameMap() != null) {
-
pipelineConfig.setUserStepToStateFamilyNameMap(config.getUserStepToStateFamilyNameMap());
- }
+ private StreamingGlobalConfig createPipelineConfig(StreamingConfigTask
config) {
+ StreamingGlobalConfig.Builder pipelineConfig =
StreamingGlobalConfig.builder();
+ OperationalLimits.Builder operationalLimits = OperationalLimits.builder();
if (config.getWindmillServiceEndpoint() != null
&& !config.getWindmillServiceEndpoint().isEmpty()) {
@@ -184,23 +182,36 @@ public final class
StreamingEngineComputationConfigFetcher implements Computatio
if (config.getMaxWorkItemCommitBytes() != null
&& config.getMaxWorkItemCommitBytes() > 0
&& config.getMaxWorkItemCommitBytes() <= Integer.MAX_VALUE) {
-
pipelineConfig.setMaxWorkItemCommitBytes(config.getMaxWorkItemCommitBytes().intValue());
+
operationalLimits.setMaxWorkItemCommitBytes(config.getMaxWorkItemCommitBytes().intValue());
}
if (config.getOperationalLimits() != null) {
if (config.getOperationalLimits().getMaxKeyBytes() != null
&& config.getOperationalLimits().getMaxKeyBytes() > 0
&& config.getOperationalLimits().getMaxKeyBytes() <=
Integer.MAX_VALUE) {
-
pipelineConfig.setMaxOutputKeyBytes(config.getOperationalLimits().getMaxKeyBytes());
+
operationalLimits.setMaxOutputKeyBytes(config.getOperationalLimits().getMaxKeyBytes());
}
if (config.getOperationalLimits().getMaxProductionOutputBytes() != null
&& config.getOperationalLimits().getMaxProductionOutputBytes() > 0
&& config.getOperationalLimits().getMaxProductionOutputBytes() <=
Integer.MAX_VALUE) {
- pipelineConfig.setMaxOutputValueBytes(
+ operationalLimits.setMaxOutputValueBytes(
config.getOperationalLimits().getMaxProductionOutputBytes());
}
}
+ pipelineConfig.setOperationalLimits(operationalLimits.build());
+
+ byte[] settings_bytes = config.decodeUserWorkerRunnerV1Settings();
+ if (settings_bytes != null) {
+ UserWorkerRunnerV1Settings settings =
UserWorkerRunnerV1Settings.newBuilder().build();
+ try {
+ settings = UserWorkerRunnerV1Settings.parseFrom(settings_bytes);
+ } catch (InvalidProtocolBufferException e) {
+ LOG.error("Parsing UserWorkerRunnerV1Settings failed", e);
+ }
+ pipelineConfig.setUserWorkerJobSettings(settings);
+ }
+
return pipelineConfig.build();
}
@@ -233,6 +244,11 @@ public final class StreamingEngineComputationConfigFetcher
implements Computatio
.flatMap(StreamingEngineComputationConfigFetcher::createComputationConfig);
}
+ @Override
+ public StreamingGlobalConfigHandle getGlobalConfigHandle() {
+ return globalConfigHandle;
+ }
+
@Override
public void stop() {
// We have already shutdown or start has not been called.
@@ -259,7 +275,7 @@ public final class StreamingEngineComputationConfigFetcher
implements Computatio
@SuppressWarnings("FutureReturnValueIgnored")
private void schedulePeriodicGlobalConfigRequests() {
globalConfigRefresher.scheduleWithFixedDelay(
- () -> fetchGlobalConfig().ifPresent(onStreamingConfig),
+ () -> fetchGlobalConfig().ifPresent(globalConfigHandle::setConfig),
0,
globalConfigRefreshPeriodMillis,
TimeUnit.MILLISECONDS);
@@ -272,9 +288,9 @@ public final class StreamingEngineComputationConfigFetcher
implements Computatio
private synchronized void fetchInitialPipelineGlobalConfig() {
while (!hasReceivedGlobalConfig.get()) {
LOG.info("Sending request to get initial global configuration for this worker.");
- Optional<StreamingEnginePipelineConfig> globalConfig =
fetchGlobalConfig();
+ Optional<StreamingGlobalConfig> globalConfig = fetchGlobalConfig();
if (globalConfig.isPresent()) {
- onStreamingConfig.accept(globalConfig.get());
+ globalConfigHandle.setConfig(globalConfig.get());
hasReceivedGlobalConfig.set(true);
break;
}
@@ -285,13 +301,14 @@ public final class
StreamingEngineComputationConfigFetcher implements Computatio
LOG.info("Initial global configuration received, harness is now ready");
}
- private Optional<StreamingEnginePipelineConfig> fetchGlobalConfig() {
+ private Optional<StreamingGlobalConfig> fetchGlobalConfig() {
return
fetchConfigWithRetry(dataflowServiceClient::getGlobalStreamingConfigWorkItem)
.map(config -> createPipelineConfig(config));
}
@FunctionalInterface
private interface ThrowingFetchWorkItemFn {
+
Optional<WorkItem> fetchWorkItem() throws IOException;
}
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEnginePipelineConfig.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfig.java
similarity index 56%
rename from
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEnginePipelineConfig.java
rename to
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfig.java
index 8f1ff93f6a4..8f76f5ec27a 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEnginePipelineConfig.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfig.java
@@ -18,8 +18,8 @@
package org.apache.beam.runners.dataflow.worker.streaming.config;
import com.google.auto.value.AutoValue;
-import java.util.HashMap;
-import java.util.Map;
+import org.apache.beam.runners.dataflow.worker.OperationalLimits;
+import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.UserWorkerRunnerV1Settings;
import org.apache.beam.sdk.annotations.Internal;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
@@ -27,41 +27,30 @@ import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPor
/** Global pipeline config for pipelines running in Streaming Engine mode. */
@AutoValue
@Internal
-public abstract class StreamingEnginePipelineConfig {
+public abstract class StreamingGlobalConfig {
- private static final long DEFAULT_MAX_WORK_ITEM_COMMIT_BYTES = 180 << 20;
-
- public static StreamingEnginePipelineConfig.Builder builder() {
- return new AutoValue_StreamingEnginePipelineConfig.Builder()
- .setMaxWorkItemCommitBytes(DEFAULT_MAX_WORK_ITEM_COMMIT_BYTES)
- .setMaxOutputKeyBytes(Long.MAX_VALUE)
- .setMaxOutputValueBytes(Long.MAX_VALUE)
- .setUserStepToStateFamilyNameMap(new HashMap<>())
- .setWindmillServiceEndpoints(ImmutableSet.of());
+ public static StreamingGlobalConfig.Builder builder() {
+ return new AutoValue_StreamingGlobalConfig.Builder()
+ .setWindmillServiceEndpoints(ImmutableSet.of())
+
.setUserWorkerJobSettings(UserWorkerRunnerV1Settings.newBuilder().build())
+ .setOperationalLimits(OperationalLimits.builder().build());
}
- public abstract long maxWorkItemCommitBytes();
-
- public abstract long maxOutputKeyBytes();
-
- public abstract long maxOutputValueBytes();
-
- public abstract Map<String, String> userStepToStateFamilyNameMap();
+ public abstract OperationalLimits operationalLimits();
public abstract ImmutableSet<HostAndPort> windmillServiceEndpoints();
+ public abstract UserWorkerRunnerV1Settings userWorkerJobSettings();
+
@AutoValue.Builder
public abstract static class Builder {
- public abstract Builder setMaxWorkItemCommitBytes(long value);
-
- public abstract Builder setMaxOutputKeyBytes(long value);
- public abstract Builder setMaxOutputValueBytes(long value);
+ public abstract Builder
setWindmillServiceEndpoints(ImmutableSet<HostAndPort> value);
- public abstract Builder setUserStepToStateFamilyNameMap(Map<String,
String> value);
+ public abstract Builder setOperationalLimits(OperationalLimits
operationalLimits);
- public abstract Builder
setWindmillServiceEndpoints(ImmutableSet<HostAndPort> value);
+ public abstract Builder
setUserWorkerJobSettings(UserWorkerRunnerV1Settings settings);
- public abstract StreamingEnginePipelineConfig build();
+ public abstract StreamingGlobalConfig build();
}
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfigHandle.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfigHandle.java
new file mode 100644
index 00000000000..6f75ba88747
--- /dev/null
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfigHandle.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming.config;
+
+import java.util.function.Consumer;
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.sdk.annotations.Internal;
+
+@Internal
+@ThreadSafe
+public interface StreamingGlobalConfigHandle {
+
+ /** Returns the latest StreamingGlobalConfig */
+ StreamingGlobalConfig getConfig();
+
+ /**
+ * Subscribe to config updates by registering a callback. Callback should be called the first time
+ * with settings, if any. The callback could execute inline before the method returns.
+ */
+ void registerConfigObserver(@Nonnull Consumer<StreamingGlobalConfig>
callback);
+}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfigHandleImpl.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfigHandleImpl.java
new file mode 100644
index 00000000000..9ed5c9fcf39
--- /dev/null
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfigHandleImpl.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming.config;
+
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.sdk.annotations.Internal;
+import
org.apache.beam.vendor.grpc.v1p60p1.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Internal
+@ThreadSafe
+public class StreamingGlobalConfigHandleImpl implements
StreamingGlobalConfigHandle {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(StreamingGlobalConfigHandleImpl.class);
+
+ private final AtomicReference<StreamingGlobalConfig> streamingEngineConfig =
+ new AtomicReference<>();
+
+ private final CopyOnWriteArrayList<ConfigCallback> configCallbacks = new
CopyOnWriteArrayList<>();
+
+ @Override
+ @Nonnull
+ public StreamingGlobalConfig getConfig() {
+ Preconditions.checkState(
+ streamingEngineConfig.get() != null,
+ "Global config should be set before any processing is done");
+ return streamingEngineConfig.get();
+ }
+
+ @Override
+ public void registerConfigObserver(@Nonnull Consumer<StreamingGlobalConfig>
callback) {
+ ConfigCallback configCallback = new ConfigCallback(callback);
+ configCallbacks.add(configCallback);
+ if (streamingEngineConfig.get() != null) {
+ configCallback.run();
+ }
+ }
+
+ void setConfig(@Nonnull StreamingGlobalConfig config) {
+ if (config.equals(streamingEngineConfig.get())) {
+ return;
+ }
+ streamingEngineConfig.set(config);
+ for (ConfigCallback configCallback : configCallbacks) {
+ configCallback.run();
+ }
+ }
+
+ private class ConfigCallback {
+
+ private final AtomicInteger queuedOrRunning = new AtomicInteger(0);
+ private final Consumer<StreamingGlobalConfig> configConsumer;
+
+ private ConfigCallback(Consumer<StreamingGlobalConfig> configConsumer) {
+ this.configConsumer = configConsumer;
+ }
+
+ /**
+ * Runs the passed in callback with the latest config. Overlapping `run()` calls will be
+ * collapsed into one. If the callback is already running a new call will be scheduled to run
+ * after the current execution completes, on the same thread which ran the previous run.
+ */
+ private void run() {
+ // If the callback is already running,
+ // Increment queued and return. The thread running
+ // the callback will run it again with the latest config.
+ if (queuedOrRunning.incrementAndGet() > 1) {
+ return;
+ }
+ // Else run the callback
+ while (true) {
+ try {
+
configConsumer.accept(StreamingGlobalConfigHandleImpl.this.streamingEngineConfig.get());
+ } catch (Exception e) {
+ LOG.error("Exception running GlobalConfig callback", e);
+ }
+ if (queuedOrRunning.updateAndGet(
+ queuedOrRunning -> {
+ if (queuedOrRunning == 1) {
+ // If there are no queued requests stop processing.
+ return 0;
+ }
+ // Else, clear queue, set 1 running and run the callback
+ return 1;
+ })
+ == 0) {
+ break;
+ }
+ }
+ }
+ }
+}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusPages.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusPages.java
index d305e25af7e..6981312eff1 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusPages.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusPages.java
@@ -28,6 +28,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
import javax.servlet.http.HttpServletRequest;
@@ -38,6 +39,8 @@ import
org.apache.beam.runners.dataflow.worker.status.DebugCapture;
import
org.apache.beam.runners.dataflow.worker.status.LastExceptionDataProvider;
import org.apache.beam.runners.dataflow.worker.status.WorkerStatusPages;
import org.apache.beam.runners.dataflow.worker.streaming.ComputationStateCache;
+import
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig;
+import
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandle;
import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
import
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.ChannelzServlet;
import
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillStreamFactory;
@@ -77,6 +80,8 @@ public final class StreamingWorkerStatusPages {
private final DebugCapture.@Nullable Manager debugCapture;
private final @Nullable ChannelzServlet channelzServlet;
+ private final AtomicReference<StreamingGlobalConfig> globalConfig = new
AtomicReference<>();
+
StreamingWorkerStatusPages(
Supplier<Instant> clock,
long clientId,
@@ -90,7 +95,8 @@ public final class StreamingWorkerStatusPages {
@Nullable GrpcWindmillStreamFactory windmillStreamFactory,
Consumer<PrintWriter> getDataStatusProvider,
BoundedQueueExecutor workUnitExecutor,
- ScheduledExecutorService statusPageDumper) {
+ ScheduledExecutorService statusPageDumper,
+ StreamingGlobalConfigHandle globalConfigHandle) {
this.clock = clock;
this.clientId = clientId;
this.isRunning = isRunning;
@@ -104,6 +110,7 @@ public final class StreamingWorkerStatusPages {
this.getDataStatusProvider = getDataStatusProvider;
this.workUnitExecutor = workUnitExecutor;
this.statusPageDumper = statusPageDumper;
+ globalConfigHandle.registerConfigObserver(globalConfig::set);
}
public static StreamingWorkerStatusPages.Builder builder() {
@@ -150,6 +157,17 @@ public final class StreamingWorkerStatusPages {
statusPages.addCapturePage(Preconditions.checkNotNull(channelzServlet));
statusPages.addStatusDataProvider(
"streaming", "Streaming RPCs",
Preconditions.checkNotNull(windmillStreamFactory));
+ statusPages.addStatusDataProvider(
+ "jobSettings",
+ "User Worker Job Settings",
+ writer -> {
+ @Nullable StreamingGlobalConfig config = globalConfig.get();
+ if (config == null) {
+ writer.println("Job Settings not loaded.");
+ return;
+ }
+ writer.println(config.userWorkerJobSettings().toString());
+ });
}
private boolean isStreamingEngine() {
@@ -256,6 +274,8 @@ public final class StreamingWorkerStatusPages {
Builder setStatusPageDumper(ScheduledExecutorService statusPageDumper);
+ Builder setGlobalConfigHandle(StreamingGlobalConfigHandle
globalConfigHandle);
+
StreamingWorkerStatusPages build();
}
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java
index cf2e7260592..412608ea398 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java
@@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
+import
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig;
import
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillMetadataServiceV1Alpha1Grpc;
import
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillMetadataServiceV1Alpha1Grpc.CloudWindmillMetadataServiceV1Alpha1Stub;
import
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc;
@@ -46,6 +47,7 @@ import org.slf4j.LoggerFactory;
/** Manages endpoints and stubs for connecting to the Windmill Dispatcher. */
@ThreadSafe
public class GrpcDispatcherClient {
+
private static final Logger LOG =
LoggerFactory.getLogger(GrpcDispatcherClient.class);
private final WindmillStubFactory windmillStubFactory;
private final CountDownLatch onInitializedEndpoints;
@@ -146,6 +148,14 @@ public class GrpcDispatcherClient {
return dispatcherStubs.get().hasInitializedEndpoints();
}
+ public void onJobConfig(StreamingGlobalConfig config) {
+ if (config.windmillServiceEndpoints().isEmpty()) {
+ LOG.warn("Dispatcher client received empty windmill service endpoints from global config");
+ return;
+ }
+ consumeWindmillDispatcherEndpoints(config.windmillServiceEndpoints());
+ }
+
public synchronized void consumeWindmillDispatcherEndpoints(
ImmutableSet<HostAndPort> dispatcherEndpoints) {
ImmutableSet<HostAndPort> currentDispatcherEndpoints =
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/ComputationWorkExecutorFactory.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/ComputationWorkExecutorFactory.java
index 20c1247b216..d5e0b3a24e2 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/ComputationWorkExecutorFactory.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/ComputationWorkExecutorFactory.java
@@ -47,6 +47,7 @@ import
org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler;
import org.apache.beam.runners.dataflow.worker.streaming.ComputationState;
import
org.apache.beam.runners.dataflow.worker.streaming.ComputationWorkExecutor;
import org.apache.beam.runners.dataflow.worker.streaming.StageInfo;
+import
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandle;
import
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor;
import
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter;
import
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation;
@@ -94,6 +95,7 @@ final class ComputationWorkExecutorFactory {
private final long maxSinkBytes;
private final IdGenerator idGenerator;
+ private final StreamingGlobalConfigHandle globalConfigHandle;
private final boolean throwExceptionOnLargeOutput;
ComputationWorkExecutorFactory(
@@ -103,12 +105,14 @@ final class ComputationWorkExecutorFactory {
Function<String, WindmillStateCache.ForComputation> stateCacheFactory,
DataflowExecutionStateSampler sampler,
CounterSet pendingDeltaCounters,
- IdGenerator idGenerator) {
+ IdGenerator idGenerator,
+ StreamingGlobalConfigHandle globalConfigHandle) {
this.options = options;
this.mapTaskExecutorFactory = mapTaskExecutorFactory;
this.readerCache = readerCache;
this.stateCacheFactory = stateCacheFactory;
this.idGenerator = idGenerator;
+ this.globalConfigHandle = globalConfigHandle;
this.readerRegistry = ReaderRegistry.defaultRegistry();
this.sinkRegistry = SinkRegistry.defaultRegistry();
this.sampler = sampler;
@@ -262,6 +266,7 @@ final class ComputationWorkExecutorFactory {
stageInfo.metricsContainerRegistry(),
executionStateTracker,
stageInfo.executionStateRegistry(),
+ globalConfigHandle,
maxSinkBytes,
throwExceptionOnLargeOutput);
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
index 86f2cffe604..641fd119a42 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
@@ -25,7 +25,6 @@ import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.annotation.concurrent.ThreadSafe;
@@ -33,7 +32,6 @@ import
org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
import org.apache.beam.runners.dataflow.worker.DataflowExecutionStateSampler;
import org.apache.beam.runners.dataflow.worker.DataflowMapTaskExecutorFactory;
import org.apache.beam.runners.dataflow.worker.HotKeyLogger;
-import org.apache.beam.runners.dataflow.worker.OperationalLimits;
import org.apache.beam.runners.dataflow.worker.ReaderCache;
import org.apache.beam.runners.dataflow.worker.WorkItemCancelledException;
import
org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingMDC;
@@ -44,6 +42,7 @@ import
org.apache.beam.runners.dataflow.worker.streaming.KeyCommitTooLargeExcept
import org.apache.beam.runners.dataflow.worker.streaming.StageInfo;
import org.apache.beam.runners.dataflow.worker.streaming.Watermarks;
import org.apache.beam.runners.dataflow.worker.streaming.Work;
+import
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandle;
import
org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingCounters;
import
org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher;
import
org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcherFactory;
@@ -85,7 +84,7 @@ public final class StreamingWorkScheduler {
private final HotKeyLogger hotKeyLogger;
private final ConcurrentMap<String, StageInfo> stageInfoMap;
private final DataflowExecutionStateSampler sampler;
- private final AtomicReference<OperationalLimits> operationalLimits;
+ private final StreamingGlobalConfigHandle globalConfigHandle;
public StreamingWorkScheduler(
DataflowWorkerHarnessOptions options,
@@ -99,7 +98,7 @@ public final class StreamingWorkScheduler {
HotKeyLogger hotKeyLogger,
ConcurrentMap<String, StageInfo> stageInfoMap,
DataflowExecutionStateSampler sampler,
- AtomicReference<OperationalLimits> operationalLimits) {
+ StreamingGlobalConfigHandle globalConfigHandle) {
this.options = options;
this.clock = clock;
this.computationWorkExecutorFactory = computationWorkExecutorFactory;
@@ -111,7 +110,7 @@ public final class StreamingWorkScheduler {
this.hotKeyLogger = hotKeyLogger;
this.stageInfoMap = stageInfoMap;
this.sampler = sampler;
- this.operationalLimits = operationalLimits;
+ this.globalConfigHandle = globalConfigHandle;
}
public static StreamingWorkScheduler create(
@@ -126,8 +125,8 @@ public final class StreamingWorkScheduler {
StreamingCounters streamingCounters,
HotKeyLogger hotKeyLogger,
DataflowExecutionStateSampler sampler,
- AtomicReference<OperationalLimits> operationalLimits,
IdGenerator idGenerator,
+ StreamingGlobalConfigHandle globalConfigHandle,
ConcurrentMap<String, StageInfo> stageInfoMap) {
ComputationWorkExecutorFactory computationWorkExecutorFactory =
new ComputationWorkExecutorFactory(
@@ -137,7 +136,8 @@ public final class StreamingWorkScheduler {
stateCacheFactory,
sampler,
streamingCounters.pendingDeltaCounters(),
- idGenerator);
+ idGenerator,
+ globalConfigHandle);
return new StreamingWorkScheduler(
options,
@@ -151,7 +151,7 @@ public final class StreamingWorkScheduler {
hotKeyLogger,
stageInfoMap,
sampler,
- operationalLimits);
+ globalConfigHandle);
}
private static long computeShuffleBytesRead(Windmill.WorkItem workItem) {
@@ -295,7 +295,7 @@ public final class StreamingWorkScheduler {
Windmill.WorkItemCommitRequest commitRequest,
String computationId,
Windmill.WorkItem workItem) {
- long byteLimit = operationalLimits.get().maxWorkItemCommitBytes;
+ long byteLimit =
globalConfigHandle.getConfig().operationalLimits().getMaxWorkItemCommitBytes();
int commitSize = commitRequest.getSerializedSize();
int estimatedCommitSize = commitSize < 0 ? Integer.MAX_VALUE : commitSize;
@@ -380,12 +380,7 @@ public final class StreamingWorkScheduler {
// Blocks while executing work.
computationWorkExecutor.executeWork(
- executionKey,
- work,
- stateReader,
- localSideInputStateFetcher,
- operationalLimits.get(),
- outputBuilder);
+ executionKey, work, stateReader, localSideInputStateFetcher,
outputBuilder);
if (work.isFailed()) {
throw new WorkItemCancelledException(workItem.getShardingKey());
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
index b41ad391d87..dadf0217123 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
@@ -102,6 +102,8 @@ import
org.apache.beam.runners.dataflow.worker.streaming.ExecutableWork;
import org.apache.beam.runners.dataflow.worker.streaming.ShardedKey;
import org.apache.beam.runners.dataflow.worker.streaming.Watermarks;
import org.apache.beam.runners.dataflow.worker.streaming.Work;
+import
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig;
+import
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandleImpl;
import
org.apache.beam.runners.dataflow.worker.testing.RestoreDataflowLoggingMDC;
import org.apache.beam.runners.dataflow.worker.testing.TestCountingSource;
import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
@@ -275,6 +277,8 @@ public class StreamingDataflowWorkerTest {
@Rule public TestRule restoreMDC = new RestoreDataflowLoggingMDC();
@Rule public ErrorCollector errorCollector = new ErrorCollector();
WorkUnitClient mockWorkUnitClient = mock(WorkUnitClient.class);
+ StreamingGlobalConfigHandleImpl mockGlobalConfigHandle =
+ mock(StreamingGlobalConfigHandleImpl.class);
HotKeyLogger hotKeyLogger = mock(HotKeyLogger.class);
private @Nullable ComputationStateCache computationStateCache = null;
@@ -750,7 +754,9 @@ public class StreamingDataflowWorkerTest {
requestBuilder.append("cache_token: ");
requestBuilder.append(index + 1);
requestBuilder.append(" ");
- if (hasSourceBytesProcessed)
requestBuilder.append("source_bytes_processed: 0 ");
+ if (hasSourceBytesProcessed) {
+ requestBuilder.append("source_bytes_processed: 0 ");
+ }
return requestBuilder;
}
@@ -834,6 +840,8 @@ public class StreamingDataflowWorkerTest {
private StreamingDataflowWorker makeWorker(
StreamingDataflowWorkerTestParams streamingDataflowWorkerTestParams) {
+ when(mockGlobalConfigHandle.getConfig())
+ .thenReturn(streamingDataflowWorkerTestParams.streamingGlobalConfig());
StreamingDataflowWorker worker =
StreamingDataflowWorker.forTesting(
streamingDataflowWorkerTestParams.stateNameMappings(),
@@ -847,8 +855,8 @@ public class StreamingDataflowWorkerTest {
hotKeyLogger,
streamingDataflowWorkerTestParams.clock(),
streamingDataflowWorkerTestParams.executorSupplier(),
- streamingDataflowWorkerTestParams.localRetryTimeoutMs(),
- streamingDataflowWorkerTestParams.operationalLimits());
+ mockGlobalConfigHandle,
+ streamingDataflowWorkerTestParams.localRetryTimeoutMs());
this.computationStateCache = worker.getComputationStateCache();
return worker;
}
@@ -1210,8 +1218,11 @@ public class StreamingDataflowWorkerTest {
makeWorker(
defaultWorkerParams()
.setInstructions(instructions)
- .setOperationalLimits(
-
OperationalLimits.builder().setMaxWorkItemCommitBytes(1000).build())
+ .setStreamingGlobalConfig(
+ StreamingGlobalConfig.builder()
+ .setOperationalLimits(
+
OperationalLimits.builder().setMaxWorkItemCommitBytes(1000).build())
+ .build())
.publishCounters()
.build());
worker.start();
@@ -1282,7 +1293,11 @@ public class StreamingDataflowWorkerTest {
makeWorker(
defaultWorkerParams("--experiments=throw_exceptions_on_large_output")
.setInstructions(instructions)
-
.setOperationalLimits(OperationalLimits.builder().setMaxOutputKeyBytes(15).build())
+ .setStreamingGlobalConfig(
+ StreamingGlobalConfig.builder()
+ .setOperationalLimits(
+
OperationalLimits.builder().setMaxOutputKeyBytes(15).build())
+ .build())
.build());
worker.start();
@@ -1315,8 +1330,11 @@ public class StreamingDataflowWorkerTest {
makeWorker(
defaultWorkerParams("--experiments=throw_exceptions_on_large_output")
.setInstructions(instructions)
- .setOperationalLimits(
-
OperationalLimits.builder().setMaxOutputValueBytes(15).build())
+ .setStreamingGlobalConfig(
+ StreamingGlobalConfig.builder()
+ .setOperationalLimits(
+
OperationalLimits.builder().setMaxOutputValueBytes(15).build())
+ .build())
.build());
worker.start();
@@ -4412,7 +4430,9 @@ public class StreamingDataflowWorkerTest {
}
boolean isActiveWorkRefresh(GetDataRequest request) {
- if (request.getComputationHeartbeatRequestCount() > 0) return true;
+ if (request.getComputationHeartbeatRequestCount() > 0) {
+ return true;
+ }
for (ComputationGetDataRequest computationRequest :
request.getRequestsList()) {
if
(!computationRequest.getComputationId().equals(DEFAULT_COMPUTATION_ID)) {
return false;
@@ -4508,7 +4528,7 @@ public class StreamingDataflowWorkerTest {
.setLocalRetryTimeoutMs(-1)
.setPublishCounters(false)
.setClock(Instant::now)
- .setOperationalLimits(OperationalLimits.builder().build());
+ .setStreamingGlobalConfig(StreamingGlobalConfig.builder().build());
}
abstract ImmutableMap<String, String> stateNameMappings();
@@ -4525,10 +4545,11 @@ public class StreamingDataflowWorkerTest {
abstract int localRetryTimeoutMs();
- abstract OperationalLimits operationalLimits();
+ abstract StreamingGlobalConfig streamingGlobalConfig();
@AutoValue.Builder
abstract static class Builder {
+
abstract Builder setStateNameMappings(ImmutableMap<String, String>
value);
abstract ImmutableMap.Builder<String, String> stateNameMappingsBuilder();
@@ -4559,7 +4580,7 @@ public class StreamingDataflowWorkerTest {
abstract Builder setLocalRetryTimeoutMs(int value);
- abstract Builder setOperationalLimits(OperationalLimits
operationalLimits);
+ abstract Builder setStreamingGlobalConfig(StreamingGlobalConfig config);
abstract StreamingDataflowWorkerTestParams build();
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java
index 86ed8f552d1..a1d4210f3db 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java
@@ -59,6 +59,9 @@ import
org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.NoopProfi
import
org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.ProfileScope;
import org.apache.beam.runners.dataflow.worker.streaming.Watermarks;
import org.apache.beam.runners.dataflow.worker.streaming.Work;
+import
org.apache.beam.runners.dataflow.worker.streaming.config.FixedGlobalConfigHandle;
+import
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig;
+import
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandle;
import
org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
import
org.apache.beam.runners.dataflow.worker.windmill.client.getdata.FakeGetDataClient;
@@ -107,6 +110,8 @@ public class StreamingModeExecutionContextTest {
options = PipelineOptionsFactory.as(DataflowWorkerHarnessOptions.class);
CounterSet counterSet = new CounterSet();
ConcurrentHashMap<String, String> stateNameMap = new ConcurrentHashMap<>();
+ StreamingGlobalConfigHandle globalConfigHandle =
+ new FixedGlobalConfigHandle(StreamingGlobalConfig.builder().build());
stateNameMap.put(NameContextsForTests.nameContextForTest().userName(),
"testStateFamily");
executionContext =
new StreamingModeExecutionContext(
@@ -127,6 +132,7 @@ public class StreamingModeExecutionContextTest {
PipelineOptionsFactory.create(),
"test-work-item-id"),
executionStateRegistry,
+ globalConfigHandle,
Long.MAX_VALUE,
/*throwExceptionOnLargeOutput=*/ false);
}
@@ -158,7 +164,6 @@ public class StreamingModeExecutionContextTest {
Watermarks.builder().setInputDataWatermark(new
Instant(1000)).build()),
stateReader,
sideInputStateFetcher,
- OperationalLimits.builder().build(),
outputBuilder);
TimerInternals timerInternals = stepContext.timerInternals();
@@ -208,7 +213,6 @@ public class StreamingModeExecutionContextTest {
Watermarks.builder().setInputDataWatermark(new
Instant(1000)).build()),
stateReader,
sideInputStateFetcher,
- OperationalLimits.builder().build(),
outputBuilder);
TimerInternals timerInternals = stepContext.timerInternals();
assertTrue(timerTimestamp.isBefore(timerInternals.currentProcessingTime()));
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
index f2e03b453fd..8ad73a5145b 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
@@ -90,6 +90,9 @@ import
org.apache.beam.runners.dataflow.worker.counters.NameContext;
import
org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.NoopProfileScope;
import org.apache.beam.runners.dataflow.worker.streaming.Watermarks;
import org.apache.beam.runners.dataflow.worker.streaming.Work;
+import
org.apache.beam.runners.dataflow.worker.streaming.config.FixedGlobalConfigHandle;
+import
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig;
+import
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandle;
import
org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher;
import org.apache.beam.runners.dataflow.worker.testing.TestCountingSource;
import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader;
@@ -594,6 +597,8 @@ public class WorkerCustomSourcesTest {
StreamingModeExecutionStateRegistry executionStateRegistry =
new StreamingModeExecutionStateRegistry();
ReaderCache readerCache = new ReaderCache(Duration.standardMinutes(1),
Runnable::run);
+ StreamingGlobalConfigHandle globalConfigHandle =
+ new FixedGlobalConfigHandle(StreamingGlobalConfig.builder().build());
StreamingModeExecutionContext context =
new StreamingModeExecutionContext(
counterSet,
@@ -610,6 +615,7 @@ public class WorkerCustomSourcesTest {
PipelineOptionsFactory.create(),
"test-work-item-id"),
executionStateRegistry,
+ globalConfigHandle,
Long.MAX_VALUE,
/*throwExceptionOnLargeOutput=*/ false);
@@ -635,7 +641,6 @@ public class WorkerCustomSourcesTest {
Watermarks.builder().setInputDataWatermark(new
Instant(0)).build()),
mock(WindmillStateReader.class),
mock(SideInputStateFetcher.class),
- OperationalLimits.builder().build(),
Windmill.WorkItemCommitRequest.newBuilder());
@SuppressWarnings({"unchecked", "rawtypes"})
@@ -960,6 +965,8 @@ public class WorkerCustomSourcesTest {
CounterSet counterSet = new CounterSet();
StreamingModeExecutionStateRegistry executionStateRegistry =
new StreamingModeExecutionStateRegistry();
+ StreamingGlobalConfigHandle globalConfigHandle =
+ new FixedGlobalConfigHandle(StreamingGlobalConfig.builder().build());
StreamingModeExecutionContext context =
new StreamingModeExecutionContext(
counterSet,
@@ -979,6 +986,7 @@ public class WorkerCustomSourcesTest {
PipelineOptionsFactory.create(),
"test-work-item-id"),
executionStateRegistry,
+ globalConfigHandle,
Long.MAX_VALUE,
/*throwExceptionOnLargeOutput=*/ false);
@@ -1012,7 +1020,6 @@ public class WorkerCustomSourcesTest {
dummyWork,
mock(WindmillStateReader.class),
mock(SideInputStateFetcher.class),
- OperationalLimits.builder().build(),
Windmill.WorkItemCommitRequest.newBuilder());
@SuppressWarnings({"unchecked", "rawtypes"})
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/FixedGlobalConfigHandleTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/FixedGlobalConfigHandleTest.java
new file mode 100644
index 00000000000..b5cb85a58c1
--- /dev/null
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/FixedGlobalConfigHandleTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming.config;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.beam.runners.dataflow.worker.OperationalLimits;
+import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.UserWorkerRunnerV1Settings;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class FixedGlobalConfigHandleTest {
+
+  /**
+   * Builds the fully populated config used by the tests in this class. Every call runs the
+   * builders again, so callers never share a fixture instance.
+   */
+  private static StreamingGlobalConfig newTestConfig() {
+    return StreamingGlobalConfig.builder()
+        .setOperationalLimits(
+            OperationalLimits.builder()
+                .setMaxOutputValueBytes(123)
+                .setMaxOutputKeyBytes(324)
+                .setMaxWorkItemCommitBytes(456)
+                .build())
+        .setWindmillServiceEndpoints(ImmutableSet.of(HostAndPort.fromHost("windmillHost")))
+        .setUserWorkerJobSettings(
+            UserWorkerRunnerV1Settings.newBuilder()
+                .setUseSeparateWindmillHeartbeatStreams(false)
+                .build())
+        .build();
+  }
+
+  /** The handle must hand back a config equal to the one it was constructed with. */
+  @Test
+  public void getConfig() {
+    StreamingGlobalConfig config = newTestConfig();
+    FixedGlobalConfigHandle globalConfigHandle = new FixedGlobalConfigHandle(config);
+    assertEquals(config, globalConfigHandle.getConfig());
+  }
+
+  /** An observer registered on the handle must be invoked with the handle's config. */
+  @Test
+  public void registerConfigObserver() throws InterruptedException {
+    StreamingGlobalConfig config = newTestConfig();
+    FixedGlobalConfigHandle globalConfigHandle = new FixedGlobalConfigHandle(config);
+    AtomicReference<StreamingGlobalConfig> configFromCallback = new AtomicReference<>();
+    CountDownLatch latch = new CountDownLatch(1);
+    globalConfigHandle.registerConfigObserver(
+        cbConfig -> {
+          configFromCallback.set(cbConfig);
+          latch.countDown();
+        });
+    // Bounded wait: if the observer is never invoked the test fails instead of hanging.
+    assertTrue(latch.await(10, TimeUnit.SECONDS));
+    // assertEquals takes (expected, actual); the value seen by the callback is the actual one.
+    assertEquals(globalConfigHandle.getConfig(), configFromCallback.get());
+  }
+}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingApplianceComputationConfigFetcherTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingApplianceComputationConfigFetcherTest.java
index f39c98c61b1..2586ae2be86 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingApplianceComputationConfigFetcherTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingApplianceComputationConfigFetcherTest.java
@@ -137,6 +137,8 @@ public class StreamingApplianceComputationConfigFetcherTest
{
}
private StreamingApplianceComputationConfigFetcher
createStreamingApplianceConfigLoader() {
- return new
StreamingApplianceComputationConfigFetcher(mockWindmillServer::getConfig);
+ return new StreamingApplianceComputationConfigFetcher(
+ mockWindmillServer::getConfig,
+ new FixedGlobalConfigHandle(StreamingGlobalConfig.builder().build()));
}
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcherTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcherTest.java
index 59fd092adcb..9fa17588c94 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcherTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcherTest.java
@@ -34,7 +34,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
-import java.util.function.Consumer;
+import org.apache.beam.runners.dataflow.worker.OperationalLimits;
import org.apache.beam.runners.dataflow.worker.WorkUnitClient;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
@@ -47,6 +47,7 @@ import org.mockito.internal.stubbing.answers.Returns;
@RunWith(JUnit4.class)
public class StreamingEngineComputationConfigFetcherTest {
+
private final WorkUnitClient mockDataflowServiceClient =
mock(WorkUnitClient.class, new Returns(Optional.empty()));
private StreamingEngineComputationConfigFetcher streamingEngineConfigFetcher;
@@ -54,13 +55,13 @@ public class StreamingEngineComputationConfigFetcherTest {
private StreamingEngineComputationConfigFetcher createConfigFetcher(
boolean waitForInitialConfig,
long globalConfigRefreshPeriod,
- Consumer<StreamingEnginePipelineConfig> onPipelineConfig) {
+ StreamingGlobalConfigHandleImpl globalConfigHandle) {
return StreamingEngineComputationConfigFetcher.forTesting(
!waitForInitialConfig,
globalConfigRefreshPeriod,
mockDataflowServiceClient,
- ignored -> Executors.newSingleThreadScheduledExecutor(),
- onPipelineConfig);
+ globalConfigHandle,
+ ignored -> Executors.newSingleThreadScheduledExecutor());
}
@After
@@ -75,31 +76,33 @@ public class StreamingEngineComputationConfigFetcherTest {
.setJobId("job")
.setStreamingConfigTask(new
StreamingConfigTask().setMaxWorkItemCommitBytes(10L));
CountDownLatch waitForInitialConfig = new CountDownLatch(1);
- Set<StreamingEnginePipelineConfig> receivedPipelineConfig = new
HashSet<>();
+ Set<StreamingGlobalConfig> receivedPipelineConfig = new HashSet<>();
when(mockDataflowServiceClient.getGlobalStreamingConfigWorkItem())
.thenReturn(Optional.of(initialConfig));
+ StreamingGlobalConfigHandleImpl globalConfigHandle = new
StreamingGlobalConfigHandleImpl();
+ globalConfigHandle.registerConfigObserver(
+ config -> {
+ try {
+ receivedPipelineConfig.add(config);
+ waitForInitialConfig.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ });
streamingEngineConfigFetcher =
- createConfigFetcher(
- /* waitForInitialConfig= */ true,
- 0,
- config -> {
- try {
- receivedPipelineConfig.add(config);
- waitForInitialConfig.await();
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- });
+ createConfigFetcher(/* waitForInitialConfig= */ true, 0,
globalConfigHandle);
Thread asyncStartConfigLoader = new
Thread(streamingEngineConfigFetcher::start);
asyncStartConfigLoader.start();
waitForInitialConfig.countDown();
asyncStartConfigLoader.join();
- assertThat(receivedPipelineConfig)
- .containsExactly(
- StreamingEnginePipelineConfig.builder()
- .setMaxWorkItemCommitBytes(
-
initialConfig.getStreamingConfigTask().getMaxWorkItemCommitBytes())
- .build());
+ StreamingGlobalConfig.Builder configBuilder =
+ StreamingGlobalConfig.builder()
+ .setOperationalLimits(
+ OperationalLimits.builder()
+ .setMaxWorkItemCommitBytes(
+
initialConfig.getStreamingConfigTask().getMaxWorkItemCommitBytes())
+ .build());
+ assertThat(receivedPipelineConfig).containsExactly(configBuilder.build());
}
@Test
@@ -117,7 +120,7 @@ public class StreamingEngineComputationConfigFetcherTest {
.setJobId("job")
.setStreamingConfigTask(new
StreamingConfigTask().setMaxWorkItemCommitBytes(100L));
CountDownLatch numExpectedRefreshes = new CountDownLatch(3);
- Set<StreamingEnginePipelineConfig> receivedPipelineConfig = new
HashSet<>();
+ Set<StreamingGlobalConfig> receivedPipelineConfig = new HashSet<>();
when(mockDataflowServiceClient.getGlobalStreamingConfigWorkItem())
.thenReturn(Optional.of(firstConfig))
.thenReturn(Optional.of(secondConfig))
@@ -127,15 +130,15 @@ public class StreamingEngineComputationConfigFetcherTest {
// ConfigFetcher should not do anything with a config that doesn't contain a
// StreamingConfigTask.
.thenReturn(Optional.of(new WorkItem().setJobId("jobId")));
-
+ StreamingGlobalConfigHandleImpl globalConfigHandle = new
StreamingGlobalConfigHandleImpl();
+ globalConfigHandle.registerConfigObserver(
+ config -> {
+ receivedPipelineConfig.add(config);
+ numExpectedRefreshes.countDown();
+ });
streamingEngineConfigFetcher =
createConfigFetcher(
- /* waitForInitialConfig= */ true,
- Duration.millis(100).getMillis(),
- config -> {
- receivedPipelineConfig.add(config);
- numExpectedRefreshes.countDown();
- });
+ /* waitForInitialConfig= */ true,
Duration.millis(100).getMillis(), globalConfigHandle);
Thread asyncStartConfigLoader = new
Thread(streamingEngineConfigFetcher::start);
asyncStartConfigLoader.start();
@@ -143,24 +146,34 @@ public class StreamingEngineComputationConfigFetcherTest {
asyncStartConfigLoader.join();
assertThat(receivedPipelineConfig)
.containsExactly(
- StreamingEnginePipelineConfig.builder()
- .setMaxWorkItemCommitBytes(
-
firstConfig.getStreamingConfigTask().getMaxWorkItemCommitBytes())
+ StreamingGlobalConfig.builder()
+ .setOperationalLimits(
+ OperationalLimits.builder()
+ .setMaxWorkItemCommitBytes(
+
firstConfig.getStreamingConfigTask().getMaxWorkItemCommitBytes())
+ .build())
.build(),
- StreamingEnginePipelineConfig.builder()
- .setMaxWorkItemCommitBytes(
-
secondConfig.getStreamingConfigTask().getMaxWorkItemCommitBytes())
+ StreamingGlobalConfig.builder()
+ .setOperationalLimits(
+ OperationalLimits.builder()
+ .setMaxWorkItemCommitBytes(
+
secondConfig.getStreamingConfigTask().getMaxWorkItemCommitBytes())
+ .build())
.build(),
- StreamingEnginePipelineConfig.builder()
- .setMaxWorkItemCommitBytes(
-
thirdConfig.getStreamingConfigTask().getMaxWorkItemCommitBytes())
+ StreamingGlobalConfig.builder()
+ .setOperationalLimits(
+ OperationalLimits.builder()
+ .setMaxWorkItemCommitBytes(
+
thirdConfig.getStreamingConfigTask().getMaxWorkItemCommitBytes())
+ .build())
.build());
}
@Test
public void testGetComputationConfig() throws IOException {
+ StreamingGlobalConfigHandleImpl globalConfigHandle = new
StreamingGlobalConfigHandleImpl();
streamingEngineConfigFetcher =
- createConfigFetcher(/* waitForInitialConfig= */ false, 0, ignored ->
{});
+ createConfigFetcher(/* waitForInitialConfig= */ false, 0,
globalConfigHandle);
String computationId = "computationId";
String stageName = "stageName";
String systemName = "systemName";
@@ -193,9 +206,11 @@ public class StreamingEngineComputationConfigFetcherTest {
@Test
public void testGetComputationConfig_noComputationPresent() throws
IOException {
- Set<StreamingEnginePipelineConfig> receivedPipelineConfig = new
HashSet<>();
+ Set<StreamingGlobalConfig> receivedPipelineConfig = new HashSet<>();
+ StreamingGlobalConfigHandleImpl globalConfigHandle = new
StreamingGlobalConfigHandleImpl();
+ globalConfigHandle.registerConfigObserver(receivedPipelineConfig::add);
streamingEngineConfigFetcher =
- createConfigFetcher(/* waitForInitialConfig= */ false, 0,
receivedPipelineConfig::add);
+ createConfigFetcher(/* waitForInitialConfig= */ false, 0,
globalConfigHandle);
when(mockDataflowServiceClient.getStreamingConfigWorkItem(anyString()))
.thenReturn(Optional.empty());
Optional<ComputationConfig> pipelineConfig =
@@ -206,8 +221,9 @@ public class StreamingEngineComputationConfigFetcherTest {
@Test
public void testGetComputationConfig_fetchConfigFromDataflowError() throws
IOException {
+ StreamingGlobalConfigHandleImpl globalConfigHandle = new
StreamingGlobalConfigHandleImpl();
streamingEngineConfigFetcher =
- createConfigFetcher(/* waitForInitialConfig= */ false, 0, ignored ->
{});
+ createConfigFetcher(/* waitForInitialConfig= */ false, 0,
globalConfigHandle);
RuntimeException e = new RuntimeException("something bad happened.");
when(mockDataflowServiceClient.getStreamingConfigWorkItem(anyString())).thenThrow(e);
Throwable fetchConfigError =
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfigHandleImplTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfigHandleImplTest.java
new file mode 100644
index 00000000000..059f60731a7
--- /dev/null
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfigHandleImplTest.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming.config;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+import org.apache.beam.runners.dataflow.worker.OperationalLimits;
+import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.UserWorkerRunnerV1Settings;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class StreamingGlobalConfigHandleImplTest {
+
+  /**
+   * Builds the fully populated config used as the common fixture in this class. Every call runs
+   * the builders again, so two calls produce separately built configs with identical contents.
+   */
+  private static StreamingGlobalConfig newTestConfig() {
+    return StreamingGlobalConfig.builder()
+        .setOperationalLimits(
+            OperationalLimits.builder()
+                .setMaxOutputValueBytes(123)
+                .setMaxOutputKeyBytes(324)
+                .setMaxWorkItemCommitBytes(456)
+                .build())
+        .setWindmillServiceEndpoints(ImmutableSet.of(HostAndPort.fromHost("windmillHost")))
+        .setUserWorkerJobSettings(
+            UserWorkerRunnerV1Settings.newBuilder()
+                .setUseSeparateWindmillHeartbeatStreams(false)
+                .build())
+        .build();
+  }
+
+  /** getConfig() must reflect the most recent setConfig() call. */
+  @Test
+  public void getConfig() {
+    StreamingGlobalConfigHandleImpl globalConfigHandle = new StreamingGlobalConfigHandleImpl();
+    StreamingGlobalConfig config = newTestConfig();
+    globalConfigHandle.setConfig(config);
+    assertEquals(config, globalConfigHandle.getConfig());
+
+    // Every field differs from the first config, so a stale value cannot compare equal.
+    StreamingGlobalConfig updatedConfig =
+        StreamingGlobalConfig.builder()
+            .setOperationalLimits(
+                OperationalLimits.builder()
+                    .setMaxOutputValueBytes(324)
+                    .setMaxOutputKeyBytes(456)
+                    .setMaxWorkItemCommitBytes(123)
+                    .build())
+            .setWindmillServiceEndpoints(ImmutableSet.of(HostAndPort.fromHost("windmillHost1")))
+            .setUserWorkerJobSettings(
+                UserWorkerRunnerV1Settings.newBuilder()
+                    .setUseSeparateWindmillHeartbeatStreams(true)
+                    .build())
+            .build();
+    globalConfigHandle.setConfig(updatedConfig);
+    assertEquals(updatedConfig, globalConfigHandle.getConfig());
+  }
+
+  /** Observers registered before the config is set must each receive the config once set. */
+  @Test
+  public void registerConfigObserver_configSetAfterRegisteringCallback()
+      throws InterruptedException {
+    CountDownLatch latch = new CountDownLatch(2);
+    StreamingGlobalConfigHandleImpl globalConfigHandle = new StreamingGlobalConfigHandleImpl();
+    StreamingGlobalConfig configToSet = newTestConfig();
+    AtomicReference<StreamingGlobalConfig> configFromCallback1 = new AtomicReference<>();
+    AtomicReference<StreamingGlobalConfig> configFromCallback2 = new AtomicReference<>();
+    globalConfigHandle.registerConfigObserver(
+        config -> {
+          configFromCallback1.set(config);
+          latch.countDown();
+        });
+    globalConfigHandle.registerConfigObserver(
+        config -> {
+          configFromCallback2.set(config);
+          latch.countDown();
+        });
+    globalConfigHandle.setConfig(configToSet);
+    // Bounded wait: fails the test rather than hanging if an observer is never invoked.
+    assertTrue(latch.await(10, TimeUnit.SECONDS));
+    // assertEquals takes (expected, actual); the callback values are the actual ones.
+    assertEquals(globalConfigHandle.getConfig(), configFromCallback1.get());
+    assertEquals(globalConfigHandle.getConfig(), configFromCallback2.get());
+  }
+
+  /** Observers registered after the config is set must still receive that config. */
+  @Test
+  public void registerConfigObserver_configSetBeforeRegisteringCallback()
+      throws InterruptedException {
+    CountDownLatch latch = new CountDownLatch(2);
+    StreamingGlobalConfigHandleImpl globalConfigHandle = new StreamingGlobalConfigHandleImpl();
+    StreamingGlobalConfig configToSet = newTestConfig();
+    AtomicReference<StreamingGlobalConfig> configFromCallback1 = new AtomicReference<>();
+    AtomicReference<StreamingGlobalConfig> configFromCallback2 = new AtomicReference<>();
+    globalConfigHandle.setConfig(configToSet);
+    globalConfigHandle.registerConfigObserver(
+        config -> {
+          configFromCallback1.set(config);
+          latch.countDown();
+        });
+    globalConfigHandle.registerConfigObserver(
+        config -> {
+          configFromCallback2.set(config);
+          latch.countDown();
+        });
+    assertTrue(latch.await(10, TimeUnit.SECONDS));
+    assertEquals(globalConfigHandle.getConfig(), configFromCallback1.get());
+    assertEquals(globalConfigHandle.getConfig(), configFromCallback2.get());
+  }
+
+  /**
+   * With the config already set, an observer that throws must not prevent a later-registered
+   * observer from receiving the config.
+   */
+  @Test
+  public void
+      registerConfigObserver_configSetBeforeRegisteringCallback_callbackThrowsException()
+          throws InterruptedException {
+    CountDownLatch latch = new CountDownLatch(2);
+    StreamingGlobalConfigHandleImpl globalConfigHandle = new StreamingGlobalConfigHandleImpl();
+    StreamingGlobalConfig configToSet = newTestConfig();
+    AtomicReference<StreamingGlobalConfig> configFromCallback = new AtomicReference<>();
+    globalConfigHandle.setConfig(configToSet);
+    globalConfigHandle.registerConfigObserver(
+        config -> {
+          // Count down first so the latch still records that this observer was invoked.
+          latch.countDown();
+          throw new RuntimeException();
+        });
+    globalConfigHandle.registerConfigObserver(
+        config -> {
+          configFromCallback.set(config);
+          latch.countDown();
+        });
+    assertTrue(latch.await(10, TimeUnit.SECONDS));
+    assertEquals(configToSet, configFromCallback.get());
+  }
+
+  /**
+   * With observers registered first, an observer that throws must not prevent the other observer
+   * from receiving the config once it is set.
+   */
+  @Test
+  public void
+      registerConfigObserver_configSetAfterRegisteringCallback_callbackThrowsException()
+          throws InterruptedException {
+    CountDownLatch latch = new CountDownLatch(2);
+    StreamingGlobalConfigHandleImpl globalConfigHandle = new StreamingGlobalConfigHandleImpl();
+    StreamingGlobalConfig configToSet = newTestConfig();
+    AtomicReference<StreamingGlobalConfig> configFromCallback = new AtomicReference<>();
+    globalConfigHandle.registerConfigObserver(
+        config -> {
+          // Count down first so the latch still records that this observer was invoked.
+          latch.countDown();
+          throw new RuntimeException();
+        });
+    globalConfigHandle.registerConfigObserver(
+        config -> {
+          configFromCallback.set(config);
+          latch.countDown();
+        });
+    globalConfigHandle.setConfig(configToSet);
+    assertTrue(latch.await(10, TimeUnit.SECONDS));
+    assertEquals(configToSet, configFromCallback.get());
+  }
+
+  /** Setting a config equal to the current one must not trigger the observer a second time. */
+  @Test
+  public void registerConfigObserver_shouldNotCallCallbackForIfConfigRemainsSame()
+      throws InterruptedException {
+    CountDownLatch latch = new CountDownLatch(1);
+    AtomicInteger callbackCount = new AtomicInteger(0);
+    StreamingGlobalConfigHandleImpl globalConfigHandle = new StreamingGlobalConfigHandleImpl();
+    // A supplier, so the second setConfig() gets a separately built config with equal contents.
+    Supplier<StreamingGlobalConfig> configToSet =
+        StreamingGlobalConfigHandleImplTest::newTestConfig;
+    globalConfigHandle.registerConfigObserver(
+        config -> {
+          callbackCount.incrementAndGet();
+          latch.countDown();
+        });
+    globalConfigHandle.setConfig(configToSet.get());
+    // call setter again with same config
+    globalConfigHandle.setConfig(configToSet.get());
+    assertTrue(latch.await(10, TimeUnit.SECONDS));
+    // Give a spurious second callback time to arrive before checking the count.
+    // NOTE(review): this fixed 10s sleep dominates the test's runtime; presumably it exists
+    // because callbacks are delivered asynchronously -- confirm against
+    // StreamingGlobalConfigHandleImpl and shorten or replace with a deterministic barrier.
+    Thread.sleep(TimeUnit.SECONDS.toMillis(10));
+    assertEquals(1, callbackCount.get());
+  }
+
+  /**
+   * Calling setConfig() from inside a running observer must result in the observer being invoked
+   * again with the updated config, after the initial one.
+   */
+  @Test
+  public void registerConfigObserver_updateConfigWhenCallbackIsRunning()
+      throws InterruptedException {
+    CountDownLatch latch = new CountDownLatch(2);
+    StreamingGlobalConfigHandleImpl globalConfigHandle = new StreamingGlobalConfigHandleImpl();
+    StreamingGlobalConfig initialConfig =
+        StreamingGlobalConfig.builder()
+            .setOperationalLimits(OperationalLimits.builder().setMaxOutputValueBytes(4569).build())
+            .build();
+    StreamingGlobalConfig updatedConfig = newTestConfig();
+    CopyOnWriteArrayList<StreamingGlobalConfig> configsFromCallback = new CopyOnWriteArrayList<>();
+    globalConfigHandle.registerConfigObserver(
+        config -> {
+          configsFromCallback.add(config);
+          // Only re-enter on the first delivery; otherwise the observer would update forever.
+          if (config.equals(initialConfig)) {
+            globalConfigHandle.setConfig(updatedConfig);
+          }
+          latch.countDown();
+        });
+    globalConfigHandle.setConfig(initialConfig);
+    assertTrue(latch.await(10, TimeUnit.SECONDS));
+    assertEquals(initialConfig, configsFromCallback.get(0));
+    assertEquals(updatedConfig, configsFromCallback.get(1));
+  }
+}
diff --git
a/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
b/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
index 4677ff9dcc9..3b3348dbc3f 100644
---
a/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
+++
b/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
@@ -923,6 +923,15 @@ message WorkerMetadataResponse {
reserved 4;
}
+// Settings to control runtime behavior of the java runner v1 user worker.
+message UserWorkerRunnerV1Settings {
+  // If true, use separate channels for each windmill RPC. Declared default: true.
+  optional bool use_windmill_isolated_channels = 1 [default = true];
+
+  // If true, use a separate streaming RPC for windmill heartbeats and state
+  // reads. Declared default: true.
+  optional bool use_separate_windmill_heartbeat_streams = 2 [default = true];
+}
+
service WindmillAppliance {
// Gets streaming Dataflow work.
rpc GetWork(.windmill.GetWorkRequest) returns (.windmill.GetWorkResponse);