This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 019efe131f9 [Dataflow Streaming] Add support to read user worker settings from backend (#32408)
019efe131f9 is described below

commit 019efe131f93ad7b56007fca479da003cfe6e2a9
Author: Arun Pandian <[email protected]>
AuthorDate: Wed Sep 18 04:14:56 2024 -0700

    [Dataflow Streaming] Add support to read user worker settings from backend (#32408)
---
 .../org/apache/beam/gradle/BeamModulePlugin.groovy |   2 +-
 .../runners/dataflow/worker/OperationalLimits.java |  39 +--
 .../dataflow/worker/StreamingDataflowWorker.java   |  75 ++----
 .../worker/StreamingModeExecutionContext.java      |  12 +-
 .../worker/streaming/ComputationWorkExecutor.java  |   5 +-
 .../worker/streaming/config/ComputationConfig.java |   3 +-
 .../streaming/config/FixedGlobalConfigHandle.java  |  48 ++++
 ...StreamingApplianceComputationConfigFetcher.java |  10 +-
 .../StreamingEngineComputationConfigFetcher.java   |  69 +++--
 ...elineConfig.java => StreamingGlobalConfig.java} |  41 ++-
 .../config/StreamingGlobalConfigHandle.java        |  37 +++
 .../config/StreamingGlobalConfigHandleImpl.java    | 113 ++++++++
 .../harness/StreamingWorkerStatusPages.java        |  22 +-
 .../windmill/client/grpc/GrpcDispatcherClient.java |  10 +
 .../processing/ComputationWorkExecutorFactory.java |   7 +-
 .../work/processing/StreamingWorkScheduler.java    |  25 +-
 .../worker/StreamingDataflowWorkerTest.java        |  45 +++-
 .../worker/StreamingModeExecutionContextTest.java  |   8 +-
 .../dataflow/worker/WorkerCustomSourcesTest.java   |  11 +-
 .../config/FixedGlobalConfigHandleTest.java        |  84 ++++++
 ...amingApplianceComputationConfigFetcherTest.java |   4 +-
 ...treamingEngineComputationConfigFetcherTest.java | 102 ++++---
 .../StreamingGlobalConfigHandleImplTest.java       | 293 +++++++++++++++++++++
 .../worker/windmill/src/main/proto/windmill.proto  |   9 +
 24 files changed, 867 insertions(+), 207 deletions(-)

diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 0110f71974e..4aec3a796f2 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -738,7 +738,7 @@ class BeamModulePlugin implements Plugin<Project> {
         google_api_common                           : 
"com.google.api:api-common", // google_cloud_platform_libraries_bom sets version
         google_api_services_bigquery                : 
"com.google.apis:google-api-services-bigquery:v2-rev20240815-2.0.0",  // 
[bomupgrader] sets version
         google_api_services_cloudresourcemanager    : 
"com.google.apis:google-api-services-cloudresourcemanager:v1-rev20240310-2.0.0",
  // [bomupgrader] sets version
-        google_api_services_dataflow                : 
"com.google.apis:google-api-services-dataflow:v1b3-rev20240624-$google_clients_version",
+        google_api_services_dataflow                : 
"com.google.apis:google-api-services-dataflow:v1b3-rev20240817-$google_clients_version",
         google_api_services_healthcare              : 
"com.google.apis:google-api-services-healthcare:v1-rev20240130-$google_clients_version",
         google_api_services_pubsub                  : 
"com.google.apis:google-api-services-pubsub:v1-rev20220904-$google_clients_version",
         google_api_services_storage                 : 
"com.google.apis:google-api-services-storage:v1-rev20240706-2.0.0",  // 
[bomupgrader] sets version
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OperationalLimits.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OperationalLimits.java
index 47e36e49850..84f41c473fe 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OperationalLimits.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OperationalLimits.java
@@ -17,37 +17,38 @@
  */
 package org.apache.beam.runners.dataflow.worker;
 
-import com.google.auto.value.AutoBuilder;
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.annotations.Internal;
 
 /** Keep track of any operational limits required by the backend. */
-public class OperationalLimits {
+@AutoValue
+@Internal
+public abstract class OperationalLimits {
+
+  private static final long DEFAULT_MAX_WORK_ITEM_COMMIT_BYTES = 180 << 20;
+
   // Maximum size of a commit from a single work item.
-  public final long maxWorkItemCommitBytes;
+  public abstract long getMaxWorkItemCommitBytes();
   // Maximum size of a single output element's serialized key.
-  public final long maxOutputKeyBytes;
+  public abstract long getMaxOutputKeyBytes();
   // Maximum size of a single output element's serialized value.
-  public final long maxOutputValueBytes;
+  public abstract long getMaxOutputValueBytes();
 
-  OperationalLimits(long maxWorkItemCommitBytes, long maxOutputKeyBytes, long 
maxOutputValueBytes) {
-    this.maxWorkItemCommitBytes = maxWorkItemCommitBytes;
-    this.maxOutputKeyBytes = maxOutputKeyBytes;
-    this.maxOutputValueBytes = maxOutputValueBytes;
-  }
+  @AutoValue.Builder
+  public abstract static class Builder {
 
-  @AutoBuilder(ofClass = OperationalLimits.class)
-  public interface Builder {
-    Builder setMaxWorkItemCommitBytes(long bytes);
+    public abstract Builder setMaxWorkItemCommitBytes(long bytes);
 
-    Builder setMaxOutputKeyBytes(long bytes);
+    public abstract Builder setMaxOutputKeyBytes(long bytes);
 
-    Builder setMaxOutputValueBytes(long bytes);
+    public abstract Builder setMaxOutputValueBytes(long bytes);
 
-    OperationalLimits build();
+    public abstract OperationalLimits build();
   }
 
-  public static Builder builder() {
-    return new AutoBuilder_OperationalLimits_Builder()
-        .setMaxWorkItemCommitBytes(Long.MAX_VALUE)
+  public static OperationalLimits.Builder builder() {
+    return new AutoValue_OperationalLimits.Builder()
+        .setMaxWorkItemCommitBytes(DEFAULT_MAX_WORK_ITEM_COMMIT_BYTES)
         .setMaxOutputKeyBytes(Long.MAX_VALUE)
         .setMaxOutputValueBytes(Long.MAX_VALUE);
   }
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 1af67738209..0dedd4f34fd 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -34,8 +34,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import org.apache.beam.runners.core.metrics.MetricsLogger;
@@ -49,9 +47,11 @@ import 
org.apache.beam.runners.dataflow.worker.streaming.ComputationStateCache;
 import org.apache.beam.runners.dataflow.worker.streaming.StageInfo;
 import 
org.apache.beam.runners.dataflow.worker.streaming.WorkHeartbeatResponseProcessor;
 import 
org.apache.beam.runners.dataflow.worker.streaming.config.ComputationConfig;
+import 
org.apache.beam.runners.dataflow.worker.streaming.config.FixedGlobalConfigHandle;
 import 
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingApplianceComputationConfigFetcher;
 import 
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingEngineComputationConfigFetcher;
-import 
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingEnginePipelineConfig;
+import 
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig;
+import 
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandleImpl;
 import 
org.apache.beam.runners.dataflow.worker.streaming.harness.SingleSourceWorkerHarness;
 import 
org.apache.beam.runners.dataflow.worker.streaming.harness.SingleSourceWorkerHarness.GetWorkSender;
 import 
org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingCounters;
@@ -103,9 +103,7 @@ import 
org.apache.beam.sdk.util.construction.CoderTranslation;
 import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheStats;
-import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
-import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
@@ -181,7 +179,6 @@ public final class StreamingDataflowWorker {
       WorkFailureProcessor workFailureProcessor,
       StreamingCounters streamingCounters,
       MemoryMonitor memoryMonitor,
-      AtomicReference<OperationalLimits> operationalLimits,
       GrpcWindmillStreamFactory windmillStreamFactory,
       Function<String, ScheduledExecutorService> executorSupplier,
       ConcurrentMap<String, StageInfo> stageInfoMap) {
@@ -237,8 +234,8 @@ public final class StreamingDataflowWorker {
             streamingCounters,
             hotKeyLogger,
             sampler,
-            operationalLimits,
             ID_GENERATOR,
+            configFetcher.getGlobalConfigHandle(),
             stageInfoMap);
 
     ThrottlingGetDataMetricTracker getDataMetricTracker =
@@ -298,6 +295,7 @@ public final class StreamingDataflowWorker {
             
.setCurrentActiveCommitBytes(workCommitter::currentActiveCommitBytes)
             .setGetDataStatusProvider(getDataClient::printHtml)
             .setWorkUnitExecutor(workUnitExecutor)
+            .setGlobalConfigHandle(configFetcher.getGlobalConfigHandle())
             .build();
 
     Windmill.GetWorkRequest request =
@@ -335,8 +333,6 @@ public final class StreamingDataflowWorker {
     StreamingCounters streamingCounters = StreamingCounters.create();
     WorkUnitClient dataflowServiceClient = new DataflowWorkUnitClient(options, 
LOG);
     BoundedQueueExecutor workExecutor = createWorkUnitExecutor(options);
-    AtomicReference<OperationalLimits> operationalLimits =
-        new AtomicReference<>(OperationalLimits.builder().build());
     WindmillStateCache windmillStateCache =
         WindmillStateCache.builder()
             .setSizeMb(options.getWorkerCacheMb())
@@ -354,7 +350,6 @@ public final class StreamingDataflowWorker {
             createConfigFetcherComputationStateCacheAndWindmillClient(
                 options,
                 dataflowServiceClient,
-                operationalLimits,
                 windmillStreamFactoryBuilder,
                 configFetcher ->
                     ComputationStateCache.create(
@@ -412,7 +407,6 @@ public final class StreamingDataflowWorker {
         workFailureProcessor,
         streamingCounters,
         memoryMonitor,
-        operationalLimits,
         
configFetcherComputationStateCacheAndWindmillClient.windmillStreamFactory(),
         executorSupplier,
         stageInfo);
@@ -428,7 +422,6 @@ public final class StreamingDataflowWorker {
       createConfigFetcherComputationStateCacheAndWindmillClient(
           DataflowWorkerHarnessOptions options,
           WorkUnitClient dataflowServiceClient,
-          AtomicReference<OperationalLimits> operationalLimits,
           GrpcWindmillStreamFactory.Builder windmillStreamFactoryBuilder,
           Function<ComputationConfig.Fetcher, ComputationStateCache> 
computationStateCacheFactory) {
     ComputationConfig.Fetcher configFetcher;
@@ -440,13 +433,8 @@ public final class StreamingDataflowWorker {
           GrpcDispatcherClient.create(createStubFactory(options));
       configFetcher =
           StreamingEngineComputationConfigFetcher.create(
-              options.getGlobalConfigRefreshPeriod().getMillis(),
-              dataflowServiceClient,
-              config ->
-                  onPipelineConfig(
-                      config,
-                      dispatcherClient::consumeWindmillDispatcherEndpoints,
-                      operationalLimits::set));
+              options.getGlobalConfigRefreshPeriod().getMillis(), 
dataflowServiceClient);
+      
configFetcher.getGlobalConfigHandle().registerConfigObserver(dispatcherClient::onJobConfig);
       computationStateCache = 
computationStateCacheFactory.apply(configFetcher);
       windmillStreamFactory =
           windmillStreamFactoryBuilder
@@ -474,7 +462,10 @@ public final class StreamingDataflowWorker {
         windmillServer = new 
JniWindmillApplianceServer(options.getLocalWindmillHostport());
       }
 
-      configFetcher = new 
StreamingApplianceComputationConfigFetcher(windmillServer::getConfig);
+      configFetcher =
+          new StreamingApplianceComputationConfigFetcher(
+              windmillServer::getConfig,
+              new 
FixedGlobalConfigHandle(StreamingGlobalConfig.builder().build()));
       computationStateCache = 
computationStateCacheFactory.apply(configFetcher);
     }
 
@@ -494,10 +485,9 @@ public final class StreamingDataflowWorker {
       HotKeyLogger hotKeyLogger,
       Supplier<Instant> clock,
       Function<String, ScheduledExecutorService> executorSupplier,
-      int localRetryTimeoutMs,
-      OperationalLimits limits) {
+      StreamingGlobalConfigHandleImpl globalConfigHandle,
+      int localRetryTimeoutMs) {
     ConcurrentMap<String, StageInfo> stageInfo = new ConcurrentHashMap<>();
-    AtomicReference<OperationalLimits> operationalLimits = new 
AtomicReference<>(limits);
     BoundedQueueExecutor workExecutor = createWorkUnitExecutor(options);
     WindmillStateCache stateCache =
         WindmillStateCache.builder()
@@ -510,13 +500,20 @@ public final class StreamingDataflowWorker {
                 /* hasReceivedGlobalConfig= */ true,
                 options.getGlobalConfigRefreshPeriod().getMillis(),
                 workUnitClient,
-                executorSupplier,
-                config ->
-                    onPipelineConfig(
-                        config,
-                        windmillServer::setWindmillServiceEndpoints,
-                        operationalLimits::set))
-            : new 
StreamingApplianceComputationConfigFetcher(windmillServer::getConfig);
+                globalConfigHandle,
+                executorSupplier)
+            : new StreamingApplianceComputationConfigFetcher(
+                windmillServer::getConfig, globalConfigHandle);
+    configFetcher
+        .getGlobalConfigHandle()
+        .registerConfigObserver(
+            config -> {
+              if (config.windmillServiceEndpoints().isEmpty()) {
+                LOG.warn("Received empty windmill service endpoints");
+                return;
+              }
+              
windmillServer.setWindmillServiceEndpoints(config.windmillServiceEndpoints());
+            });
     ConcurrentMap<String, String> stateNameMap =
         new ConcurrentHashMap<>(prePopulatedStateNameMappings);
     ComputationStateCache computationStateCache =
@@ -583,7 +580,6 @@ public final class StreamingDataflowWorker {
         workFailureProcessor,
         streamingCounters,
         memoryMonitor,
-        operationalLimits,
         options.isEnableStreamingEngine()
             ? windmillStreamFactory
                 .setHealthCheckIntervalMillis(
@@ -594,23 +590,6 @@ public final class StreamingDataflowWorker {
         stageInfo);
   }
 
-  private static void onPipelineConfig(
-      StreamingEnginePipelineConfig config,
-      Consumer<ImmutableSet<HostAndPort>> consumeWindmillServiceEndpoints,
-      Consumer<OperationalLimits> operationalLimits) {
-
-    operationalLimits.accept(
-        OperationalLimits.builder()
-            .setMaxWorkItemCommitBytes(config.maxWorkItemCommitBytes())
-            .setMaxOutputKeyBytes(config.maxOutputKeyBytes())
-            .setMaxOutputValueBytes(config.maxOutputValueBytes())
-            .build());
-
-    if (!config.windmillServiceEndpoints().isEmpty()) {
-      
consumeWindmillServiceEndpoints.accept(config.windmillServiceEndpoints());
-    }
-  }
-
   private static GrpcWindmillStreamFactory.Builder 
createGrpcwindmillStreamFactoryBuilder(
       DataflowWorkerHarnessOptions options, long clientId) {
     Duration maxBackoff =
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
index f25f6294da8..5ff94884e97 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
@@ -50,6 +50,7 @@ import 
org.apache.beam.runners.dataflow.worker.counters.NameContext;
 import 
org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.ProfileScope;
 import org.apache.beam.runners.dataflow.worker.streaming.Watermarks;
 import org.apache.beam.runners.dataflow.worker.streaming.Work;
+import 
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandle;
 import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInput;
 import 
org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputState;
 import 
org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher;
@@ -107,6 +108,7 @@ public class StreamingModeExecutionContext extends 
DataflowExecutionContext<Step
   private final ImmutableMap<String, String> stateNameMap;
   private final WindmillStateCache.ForComputation stateCache;
   private final ReaderCache readerCache;
+  private final StreamingGlobalConfigHandle globalConfigHandle;
   private final boolean throwExceptionOnLargeOutput;
   private volatile long backlogBytes;
 
@@ -153,6 +155,7 @@ public class StreamingModeExecutionContext extends 
DataflowExecutionContext<Step
       MetricsContainerRegistry<StreamingStepMetricsContainer> 
metricsContainerRegistry,
       DataflowExecutionStateTracker executionStateTracker,
       StreamingModeExecutionStateRegistry executionStateRegistry,
+      StreamingGlobalConfigHandle globalConfigHandle,
       long sinkByteLimit,
       boolean throwExceptionOnLargeOutput) {
     super(
@@ -163,6 +166,7 @@ public class StreamingModeExecutionContext extends 
DataflowExecutionContext<Step
         sinkByteLimit);
     this.computationId = computationId;
     this.readerCache = readerCache;
+    this.globalConfigHandle = globalConfigHandle;
     this.sideInputCache = new HashMap<>();
     this.stateNameMap = ImmutableMap.copyOf(stateNameMap);
     this.stateCache = stateCache;
@@ -176,11 +180,11 @@ public class StreamingModeExecutionContext extends 
DataflowExecutionContext<Step
   }
 
   public long getMaxOutputKeyBytes() {
-    return operationalLimits.maxOutputKeyBytes;
+    return operationalLimits.getMaxOutputKeyBytes();
   }
 
   public long getMaxOutputValueBytes() {
-    return operationalLimits.maxOutputValueBytes;
+    return operationalLimits.getMaxOutputValueBytes();
   }
 
   public boolean throwExceptionsForLargeOutput() {
@@ -196,13 +200,13 @@ public class StreamingModeExecutionContext extends 
DataflowExecutionContext<Step
       Work work,
       WindmillStateReader stateReader,
       SideInputStateFetcher sideInputStateFetcher,
-      OperationalLimits operationalLimits,
       Windmill.WorkItemCommitRequest.Builder outputBuilder) {
     this.key = key;
     this.work = work;
     this.computationKey = WindmillComputationKey.create(computationId, 
work.getShardedKey());
     this.sideInputStateFetcher = sideInputStateFetcher;
-    this.operationalLimits = operationalLimits;
+    // Snapshot the limits for entire bundle processing.
+    this.operationalLimits = 
globalConfigHandle.getConfig().operationalLimits();
     this.outputBuilder = outputBuilder;
     this.sideInputCache.clear();
     clearSinkFullHint();
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationWorkExecutor.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationWorkExecutor.java
index 8a00194887d..8dc681fc640 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationWorkExecutor.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationWorkExecutor.java
@@ -24,7 +24,6 @@ import javax.annotation.concurrent.NotThreadSafe;
 import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
 import org.apache.beam.runners.dataflow.worker.DataflowMapTaskExecutor;
 import org.apache.beam.runners.dataflow.worker.DataflowWorkExecutor;
-import org.apache.beam.runners.dataflow.worker.OperationalLimits;
 import org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext;
 import 
org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher;
 import 
org.apache.beam.runners.dataflow.worker.util.common.worker.ElementCounter;
@@ -73,11 +72,9 @@ public abstract class ComputationWorkExecutor {
       Work work,
       WindmillStateReader stateReader,
       SideInputStateFetcher sideInputStateFetcher,
-      OperationalLimits operationalLimits,
       Windmill.WorkItemCommitRequest.Builder outputBuilder)
       throws Exception {
-    context()
-        .start(key, work, stateReader, sideInputStateFetcher, 
operationalLimits, outputBuilder);
+    context().start(key, work, stateReader, sideInputStateFetcher, 
outputBuilder);
     workExecutor().execute();
   }
 
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/ComputationConfig.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/ComputationConfig.java
index fb8bcf7edbf..9702751aeb9 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/ComputationConfig.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/ComputationConfig.java
@@ -48,12 +48,13 @@ public abstract class ComputationConfig {
   public abstract ImmutableMap<String, String> stateNameMap();
 
   /** Interface to fetch configurations for a specific computation. */
-  @FunctionalInterface
   public interface Fetcher {
     default void start() {}
 
     default void stop() {}
 
     Optional<ComputationConfig> fetchConfig(String computationId);
+
+    StreamingGlobalConfigHandle getGlobalConfigHandle();
   }
 }
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/FixedGlobalConfigHandle.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/FixedGlobalConfigHandle.java
new file mode 100644
index 00000000000..c244ecb8c7a
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/FixedGlobalConfigHandle.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming.config;
+
+import java.util.function.Consumer;
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.sdk.annotations.Internal;
+
+@Internal
+@ThreadSafe
+/*
+ *  StreamingGlobalConfigHandle returning a fixed config
+ *  initialized during construction. Used for Appliance and Tests.
+ */
+public class FixedGlobalConfigHandle implements StreamingGlobalConfigHandle {
+
+  private final StreamingGlobalConfig config;
+
+  public FixedGlobalConfigHandle(StreamingGlobalConfig config) {
+    this.config = config;
+  }
+
+  @Override
+  public StreamingGlobalConfig getConfig() {
+    return config;
+  }
+
+  @Override
+  public void registerConfigObserver(@Nonnull Consumer<StreamingGlobalConfig> 
callback) {
+    callback.accept(config);
+  }
+}
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingApplianceComputationConfigFetcher.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingApplianceComputationConfigFetcher.java
index 786ded09498..025e66be79c 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingApplianceComputationConfigFetcher.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingApplianceComputationConfigFetcher.java
@@ -48,11 +48,14 @@ public final class 
StreamingApplianceComputationConfigFetcher implements Computa
 
   private final ApplianceComputationConfigFetcher 
applianceComputationConfigFetcher;
   private final ConcurrentHashMap<String, String> systemNameToComputationIdMap;
+  private final StreamingGlobalConfigHandle globalConfigHandle;
 
   public StreamingApplianceComputationConfigFetcher(
-      ApplianceComputationConfigFetcher applianceComputationConfigFetcher) {
+      ApplianceComputationConfigFetcher applianceComputationConfigFetcher,
+      StreamingGlobalConfigHandle globalConfigHandle) {
     this.applianceComputationConfigFetcher = applianceComputationConfigFetcher;
     this.systemNameToComputationIdMap = new ConcurrentHashMap<>();
+    this.globalConfigHandle = globalConfigHandle;
   }
 
   /** Returns a {@code Table<ComputationId, TransformUserName, 
StateFamilyName>} */
@@ -112,6 +115,11 @@ public final class 
StreamingApplianceComputationConfigFetcher implements Computa
             .collect(toImmutableMap(NameMapEntry::getUserName, 
NameMapEntry::getSystemName)));
   }
 
+  @Override
+  public StreamingGlobalConfigHandle getGlobalConfigHandle() {
+    return globalConfigHandle;
+  }
+
   private Optional<ComputationConfig> createComputationConfig(
       String serializedMapTask,
       Table<String, String, String> 
transformUserNameToStateFamilyByComputationId,
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java
index d230aac54c6..22b0dac6eb2 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java
@@ -30,16 +30,18 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.stream.StreamSupport;
 import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.runners.dataflow.worker.OperationalLimits;
 import org.apache.beam.runners.dataflow.worker.WorkUnitClient;
+import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.UserWorkerRunnerV1Settings;
 import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.util.BackOff;
 import org.apache.beam.sdk.util.BackOffUtils;
 import org.apache.beam.sdk.util.FluentBackoff;
 import org.apache.beam.sdk.util.Sleeper;
+import 
org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.InvalidProtocolBufferException;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
@@ -72,33 +74,31 @@ public final class StreamingEngineComputationConfigFetcher 
implements Computatio
   private final long globalConfigRefreshPeriodMillis;
   private final WorkUnitClient dataflowServiceClient;
   private final ScheduledExecutorService globalConfigRefresher;
-  private final Consumer<StreamingEnginePipelineConfig> onStreamingConfig;
+  private final StreamingGlobalConfigHandleImpl globalConfigHandle;
   private final AtomicBoolean hasReceivedGlobalConfig;
 
   private StreamingEngineComputationConfigFetcher(
       boolean hasReceivedGlobalConfig,
       long globalConfigRefreshPeriodMillis,
       WorkUnitClient dataflowServiceClient,
-      ScheduledExecutorService globalConfigRefresher,
-      Consumer<StreamingEnginePipelineConfig> onStreamingConfig) {
+      StreamingGlobalConfigHandleImpl globalConfigHandle,
+      ScheduledExecutorService globalConfigRefresher) {
     this.globalConfigRefreshPeriodMillis = globalConfigRefreshPeriodMillis;
     this.dataflowServiceClient = dataflowServiceClient;
     this.globalConfigRefresher = globalConfigRefresher;
-    this.onStreamingConfig = onStreamingConfig;
+    this.globalConfigHandle = globalConfigHandle;
     this.hasReceivedGlobalConfig = new AtomicBoolean(hasReceivedGlobalConfig);
   }
 
   public static StreamingEngineComputationConfigFetcher create(
-      long globalConfigRefreshPeriodMillis,
-      WorkUnitClient dataflowServiceClient,
-      Consumer<StreamingEnginePipelineConfig> onStreamingConfig) {
+      long globalConfigRefreshPeriodMillis, WorkUnitClient 
dataflowServiceClient) {
     return new StreamingEngineComputationConfigFetcher(
         /* hasReceivedGlobalConfig= */ false,
         globalConfigRefreshPeriodMillis,
         dataflowServiceClient,
+        new StreamingGlobalConfigHandleImpl(),
         Executors.newSingleThreadScheduledExecutor(
-            new 
ThreadFactoryBuilder().setNameFormat(CONFIG_REFRESHER_THREAD_NAME).build()),
-        onStreamingConfig);
+            new 
ThreadFactoryBuilder().setNameFormat(CONFIG_REFRESHER_THREAD_NAME).build()));
   }
 
   @VisibleForTesting
@@ -106,14 +106,14 @@ public final class 
StreamingEngineComputationConfigFetcher implements Computatio
       boolean hasReceivedGlobalConfig,
       long globalConfigRefreshPeriodMillis,
       WorkUnitClient dataflowServiceClient,
-      Function<String, ScheduledExecutorService> executorSupplier,
-      Consumer<StreamingEnginePipelineConfig> onStreamingConfig) {
+      StreamingGlobalConfigHandleImpl globalConfigHandle,
+      Function<String, ScheduledExecutorService> executorSupplier) {
     return new StreamingEngineComputationConfigFetcher(
         hasReceivedGlobalConfig,
         globalConfigRefreshPeriodMillis,
         dataflowServiceClient,
-        executorSupplier.apply(CONFIG_REFRESHER_THREAD_NAME),
-        onStreamingConfig);
+        globalConfigHandle,
+        executorSupplier.apply(CONFIG_REFRESHER_THREAD_NAME));
   }
 
   @VisibleForTesting
@@ -157,11 +157,9 @@ public final class StreamingEngineComputationConfigFetcher 
implements Computatio
     }
   }
 
-  private StreamingEnginePipelineConfig 
createPipelineConfig(StreamingConfigTask config) {
-    StreamingEnginePipelineConfig.Builder pipelineConfig = 
StreamingEnginePipelineConfig.builder();
-    if (config.getUserStepToStateFamilyNameMap() != null) {
-      
pipelineConfig.setUserStepToStateFamilyNameMap(config.getUserStepToStateFamilyNameMap());
-    }
+  private StreamingGlobalConfig createPipelineConfig(StreamingConfigTask 
config) {
+    StreamingGlobalConfig.Builder pipelineConfig = 
StreamingGlobalConfig.builder();
+    OperationalLimits.Builder operationalLimits = OperationalLimits.builder();
 
     if (config.getWindmillServiceEndpoint() != null
         && !config.getWindmillServiceEndpoint().isEmpty()) {
@@ -184,23 +182,36 @@ public final class 
StreamingEngineComputationConfigFetcher implements Computatio
     if (config.getMaxWorkItemCommitBytes() != null
         && config.getMaxWorkItemCommitBytes() > 0
         && config.getMaxWorkItemCommitBytes() <= Integer.MAX_VALUE) {
-      
pipelineConfig.setMaxWorkItemCommitBytes(config.getMaxWorkItemCommitBytes().intValue());
+      
operationalLimits.setMaxWorkItemCommitBytes(config.getMaxWorkItemCommitBytes().intValue());
     }
 
     if (config.getOperationalLimits() != null) {
       if (config.getOperationalLimits().getMaxKeyBytes() != null
           && config.getOperationalLimits().getMaxKeyBytes() > 0
           && config.getOperationalLimits().getMaxKeyBytes() <= 
Integer.MAX_VALUE) {
-        
pipelineConfig.setMaxOutputKeyBytes(config.getOperationalLimits().getMaxKeyBytes());
+        
operationalLimits.setMaxOutputKeyBytes(config.getOperationalLimits().getMaxKeyBytes());
       }
       if (config.getOperationalLimits().getMaxProductionOutputBytes() != null
           && config.getOperationalLimits().getMaxProductionOutputBytes() > 0
           && config.getOperationalLimits().getMaxProductionOutputBytes() <= 
Integer.MAX_VALUE) {
-        pipelineConfig.setMaxOutputValueBytes(
+        operationalLimits.setMaxOutputValueBytes(
             config.getOperationalLimits().getMaxProductionOutputBytes());
       }
     }
 
+    pipelineConfig.setOperationalLimits(operationalLimits.build());
+
+    byte[] settings_bytes = config.decodeUserWorkerRunnerV1Settings();
+    if (settings_bytes != null) {
+      UserWorkerRunnerV1Settings settings = 
UserWorkerRunnerV1Settings.newBuilder().build();
+      try {
+        settings = UserWorkerRunnerV1Settings.parseFrom(settings_bytes);
+      } catch (InvalidProtocolBufferException e) {
+        LOG.error("Parsing UserWorkerRunnerV1Settings failed", e);
+      }
+      pipelineConfig.setUserWorkerJobSettings(settings);
+    }
+
     return pipelineConfig.build();
   }
 
@@ -233,6 +244,11 @@ public final class StreamingEngineComputationConfigFetcher 
implements Computatio
         
.flatMap(StreamingEngineComputationConfigFetcher::createComputationConfig);
   }
 
+  @Override
+  public StreamingGlobalConfigHandle getGlobalConfigHandle() {
+    return globalConfigHandle;
+  }
+
   @Override
   public void stop() {
     // We have already shutdown or start has not been called.
@@ -259,7 +275,7 @@ public final class StreamingEngineComputationConfigFetcher 
implements Computatio
   @SuppressWarnings("FutureReturnValueIgnored")
   private void schedulePeriodicGlobalConfigRequests() {
     globalConfigRefresher.scheduleWithFixedDelay(
-        () -> fetchGlobalConfig().ifPresent(onStreamingConfig),
+        () -> fetchGlobalConfig().ifPresent(globalConfigHandle::setConfig),
         0,
         globalConfigRefreshPeriodMillis,
         TimeUnit.MILLISECONDS);
@@ -272,9 +288,9 @@ public final class StreamingEngineComputationConfigFetcher 
implements Computatio
   private synchronized void fetchInitialPipelineGlobalConfig() {
     while (!hasReceivedGlobalConfig.get()) {
       LOG.info("Sending request to get initial global configuration for this 
worker.");
-      Optional<StreamingEnginePipelineConfig> globalConfig = 
fetchGlobalConfig();
+      Optional<StreamingGlobalConfig> globalConfig = fetchGlobalConfig();
       if (globalConfig.isPresent()) {
-        onStreamingConfig.accept(globalConfig.get());
+        globalConfigHandle.setConfig(globalConfig.get());
         hasReceivedGlobalConfig.set(true);
         break;
       }
@@ -285,13 +301,14 @@ public final class 
StreamingEngineComputationConfigFetcher implements Computatio
     LOG.info("Initial global configuration received, harness is now ready");
   }
 
-  private Optional<StreamingEnginePipelineConfig> fetchGlobalConfig() {
+  private Optional<StreamingGlobalConfig> fetchGlobalConfig() {
     return 
fetchConfigWithRetry(dataflowServiceClient::getGlobalStreamingConfigWorkItem)
         .map(config -> createPipelineConfig(config));
   }
 
   @FunctionalInterface
   private interface ThrowingFetchWorkItemFn {
+
     Optional<WorkItem> fetchWorkItem() throws IOException;
   }
 }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEnginePipelineConfig.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfig.java
similarity index 56%
rename from 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEnginePipelineConfig.java
rename to 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfig.java
index 8f1ff93f6a4..8f76f5ec27a 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEnginePipelineConfig.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfig.java
@@ -18,8 +18,8 @@
 package org.apache.beam.runners.dataflow.worker.streaming.config;
 
 import com.google.auto.value.AutoValue;
-import java.util.HashMap;
-import java.util.Map;
+import org.apache.beam.runners.dataflow.worker.OperationalLimits;
+import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.UserWorkerRunnerV1Settings;
 import org.apache.beam.sdk.annotations.Internal;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
@@ -27,41 +27,30 @@ import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPor
 /** Global pipeline config for pipelines running in Streaming Engine mode. */
 @AutoValue
 @Internal
-public abstract class StreamingEnginePipelineConfig {
+public abstract class StreamingGlobalConfig {
 
-  private static final long DEFAULT_MAX_WORK_ITEM_COMMIT_BYTES = 180 << 20;
-
-  public static StreamingEnginePipelineConfig.Builder builder() {
-    return new AutoValue_StreamingEnginePipelineConfig.Builder()
-        .setMaxWorkItemCommitBytes(DEFAULT_MAX_WORK_ITEM_COMMIT_BYTES)
-        .setMaxOutputKeyBytes(Long.MAX_VALUE)
-        .setMaxOutputValueBytes(Long.MAX_VALUE)
-        .setUserStepToStateFamilyNameMap(new HashMap<>())
-        .setWindmillServiceEndpoints(ImmutableSet.of());
+  public static StreamingGlobalConfig.Builder builder() {
+    return new AutoValue_StreamingGlobalConfig.Builder()
+        .setWindmillServiceEndpoints(ImmutableSet.of())
+        
.setUserWorkerJobSettings(UserWorkerRunnerV1Settings.newBuilder().build())
+        .setOperationalLimits(OperationalLimits.builder().build());
   }
 
-  public abstract long maxWorkItemCommitBytes();
-
-  public abstract long maxOutputKeyBytes();
-
-  public abstract long maxOutputValueBytes();
-
-  public abstract Map<String, String> userStepToStateFamilyNameMap();
+  public abstract OperationalLimits operationalLimits();
 
   public abstract ImmutableSet<HostAndPort> windmillServiceEndpoints();
 
+  public abstract UserWorkerRunnerV1Settings userWorkerJobSettings();
+
   @AutoValue.Builder
   public abstract static class Builder {
-    public abstract Builder setMaxWorkItemCommitBytes(long value);
-
-    public abstract Builder setMaxOutputKeyBytes(long value);
 
-    public abstract Builder setMaxOutputValueBytes(long value);
+    public abstract Builder 
setWindmillServiceEndpoints(ImmutableSet<HostAndPort> value);
 
-    public abstract Builder setUserStepToStateFamilyNameMap(Map<String, 
String> value);
+    public abstract Builder setOperationalLimits(OperationalLimits 
operationalLimits);
 
-    public abstract Builder 
setWindmillServiceEndpoints(ImmutableSet<HostAndPort> value);
+    public abstract Builder 
setUserWorkerJobSettings(UserWorkerRunnerV1Settings settings);
 
-    public abstract StreamingEnginePipelineConfig build();
+    public abstract StreamingGlobalConfig build();
   }
 }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfigHandle.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfigHandle.java
new file mode 100644
index 00000000000..6f75ba88747
--- /dev/null
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfigHandle.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming.config;
+
+import java.util.function.Consumer;
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.sdk.annotations.Internal;
+
+@Internal
+@ThreadSafe
+public interface StreamingGlobalConfigHandle {
+
+  /** Returns the latest {@link StreamingGlobalConfig}. */
+  StreamingGlobalConfig getConfig();
+
+  /**
+   * Subscribe to config updates by registering a callback. If a config is already available, the
+   * callback is invoked with it on registration, possibly inline before this method returns.
+   */
+  void registerConfigObserver(@Nonnull Consumer<StreamingGlobalConfig> 
callback);
+}
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfigHandleImpl.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfigHandleImpl.java
new file mode 100644
index 00000000000..9ed5c9fcf39
--- /dev/null
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfigHandleImpl.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming.config;
+
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.sdk.annotations.Internal;
+import 
org.apache.beam.vendor.grpc.v1p60p1.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Internal
+@ThreadSafe
+public class StreamingGlobalConfigHandleImpl implements 
StreamingGlobalConfigHandle {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(StreamingGlobalConfigHandleImpl.class);
+
+  private final AtomicReference<StreamingGlobalConfig> streamingEngineConfig =
+      new AtomicReference<>();
+
+  private final CopyOnWriteArrayList<ConfigCallback> configCallbacks = new 
CopyOnWriteArrayList<>();
+
+  @Override
+  @Nonnull
+  public StreamingGlobalConfig getConfig() {
+    Preconditions.checkState(
+        streamingEngineConfig.get() != null,
+        "Global config should be set before any processing is done");
+    return streamingEngineConfig.get();
+  }
+
+  @Override
+  public void registerConfigObserver(@Nonnull Consumer<StreamingGlobalConfig> 
callback) {
+    ConfigCallback configCallback = new ConfigCallback(callback);
+    configCallbacks.add(configCallback);
+    if (streamingEngineConfig.get() != null) {
+      configCallback.run();
+    }
+  }
+
+  void setConfig(@Nonnull StreamingGlobalConfig config) {
+    if (config.equals(streamingEngineConfig.get())) {
+      return;
+    }
+    streamingEngineConfig.set(config);
+    for (ConfigCallback configCallback : configCallbacks) {
+      configCallback.run();
+    }
+  }
+
+  private class ConfigCallback {
+
+    private final AtomicInteger queuedOrRunning = new AtomicInteger(0);
+    private final Consumer<StreamingGlobalConfig> configConsumer;
+
+    private ConfigCallback(Consumer<StreamingGlobalConfig> configConsumer) {
+      this.configConsumer = configConsumer;
+    }
+
+    /**
+     * Runs the passed-in callback with the latest config. Overlapping {@code run()} calls are
+     * collapsed into one: if the callback is already running, one more execution is performed
+     * after the current one completes, on the same thread that is running the callback.
+     */
+    private void run() {
+      // If the callback is already running, increment the queued count
+      // and return. The thread running the callback will run it again
+      // with the latest config.
+      if (queuedOrRunning.incrementAndGet() > 1) {
+        return;
+      }
+      // Else run the callback
+      while (true) {
+        try {
+          
configConsumer.accept(StreamingGlobalConfigHandleImpl.this.streamingEngineConfig.get());
+        } catch (Exception e) {
+          LOG.error("Exception running GlobalConfig callback", e);
+        }
+        if (queuedOrRunning.updateAndGet(
+                queuedOrRunning -> {
+                  if (queuedOrRunning == 1) {
+                    // If there are no queued requests stop processing.
+                    return 0;
+                  }
+                  // Else, clear queue, set 1 running and run the callback
+                  return 1;
+                })
+            == 0) {
+          break;
+        }
+      }
+    }
+  }
+}
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusPages.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusPages.java
index d305e25af7e..6981312eff1 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusPages.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusPages.java
@@ -28,6 +28,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 import javax.servlet.http.HttpServletRequest;
@@ -38,6 +39,8 @@ import 
org.apache.beam.runners.dataflow.worker.status.DebugCapture;
 import 
org.apache.beam.runners.dataflow.worker.status.LastExceptionDataProvider;
 import org.apache.beam.runners.dataflow.worker.status.WorkerStatusPages;
 import org.apache.beam.runners.dataflow.worker.streaming.ComputationStateCache;
+import 
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig;
+import 
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandle;
 import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.ChannelzServlet;
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillStreamFactory;
@@ -77,6 +80,8 @@ public final class StreamingWorkerStatusPages {
   private final DebugCapture.@Nullable Manager debugCapture;
   private final @Nullable ChannelzServlet channelzServlet;
 
+  private final AtomicReference<StreamingGlobalConfig> globalConfig = new 
AtomicReference<>();
+
   StreamingWorkerStatusPages(
       Supplier<Instant> clock,
       long clientId,
@@ -90,7 +95,8 @@ public final class StreamingWorkerStatusPages {
       @Nullable GrpcWindmillStreamFactory windmillStreamFactory,
       Consumer<PrintWriter> getDataStatusProvider,
       BoundedQueueExecutor workUnitExecutor,
-      ScheduledExecutorService statusPageDumper) {
+      ScheduledExecutorService statusPageDumper,
+      StreamingGlobalConfigHandle globalConfigHandle) {
     this.clock = clock;
     this.clientId = clientId;
     this.isRunning = isRunning;
@@ -104,6 +110,7 @@ public final class StreamingWorkerStatusPages {
     this.getDataStatusProvider = getDataStatusProvider;
     this.workUnitExecutor = workUnitExecutor;
     this.statusPageDumper = statusPageDumper;
+    globalConfigHandle.registerConfigObserver(globalConfig::set);
   }
 
   public static StreamingWorkerStatusPages.Builder builder() {
@@ -150,6 +157,17 @@ public final class StreamingWorkerStatusPages {
     statusPages.addCapturePage(Preconditions.checkNotNull(channelzServlet));
     statusPages.addStatusDataProvider(
         "streaming", "Streaming RPCs", 
Preconditions.checkNotNull(windmillStreamFactory));
+    statusPages.addStatusDataProvider(
+        "jobSettings",
+        "User Worker Job Settings",
+        writer -> {
+          @Nullable StreamingGlobalConfig config = globalConfig.get();
+          if (config == null) {
+            writer.println("Job Settings not loaded.");
+            return;
+          }
+          writer.println(config.userWorkerJobSettings().toString());
+        });
   }
 
   private boolean isStreamingEngine() {
@@ -256,6 +274,8 @@ public final class StreamingWorkerStatusPages {
 
     Builder setStatusPageDumper(ScheduledExecutorService statusPageDumper);
 
+    Builder setGlobalConfigHandle(StreamingGlobalConfigHandle 
globalConfigHandle);
+
     StreamingWorkerStatusPages build();
   }
 }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java
index cf2e7260592..412608ea398 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java
@@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.concurrent.GuardedBy;
 import javax.annotation.concurrent.ThreadSafe;
+import 
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig;
 import 
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillMetadataServiceV1Alpha1Grpc;
 import 
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillMetadataServiceV1Alpha1Grpc.CloudWindmillMetadataServiceV1Alpha1Stub;
 import 
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc;
@@ -46,6 +47,7 @@ import org.slf4j.LoggerFactory;
 /** Manages endpoints and stubs for connecting to the Windmill Dispatcher. */
 @ThreadSafe
 public class GrpcDispatcherClient {
+
   private static final Logger LOG = 
LoggerFactory.getLogger(GrpcDispatcherClient.class);
   private final WindmillStubFactory windmillStubFactory;
   private final CountDownLatch onInitializedEndpoints;
@@ -146,6 +148,14 @@ public class GrpcDispatcherClient {
     return dispatcherStubs.get().hasInitializedEndpoints();
   }
 
+  public void onJobConfig(StreamingGlobalConfig config) {
+    if (config.windmillServiceEndpoints().isEmpty()) {
+      LOG.warn("Dispatcher client received empty windmill service endpoints 
from global config");
+      return;
+    }
+    consumeWindmillDispatcherEndpoints(config.windmillServiceEndpoints());
+  }
+
   public synchronized void consumeWindmillDispatcherEndpoints(
       ImmutableSet<HostAndPort> dispatcherEndpoints) {
     ImmutableSet<HostAndPort> currentDispatcherEndpoints =
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/ComputationWorkExecutorFactory.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/ComputationWorkExecutorFactory.java
index 20c1247b216..d5e0b3a24e2 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/ComputationWorkExecutorFactory.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/ComputationWorkExecutorFactory.java
@@ -47,6 +47,7 @@ import 
org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler;
 import org.apache.beam.runners.dataflow.worker.streaming.ComputationState;
 import 
org.apache.beam.runners.dataflow.worker.streaming.ComputationWorkExecutor;
 import org.apache.beam.runners.dataflow.worker.streaming.StageInfo;
+import 
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandle;
 import 
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor;
 import 
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter;
 import 
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation;
@@ -94,6 +95,7 @@ final class ComputationWorkExecutorFactory {
 
   private final long maxSinkBytes;
   private final IdGenerator idGenerator;
+  private final StreamingGlobalConfigHandle globalConfigHandle;
   private final boolean throwExceptionOnLargeOutput;
 
   ComputationWorkExecutorFactory(
@@ -103,12 +105,14 @@ final class ComputationWorkExecutorFactory {
       Function<String, WindmillStateCache.ForComputation> stateCacheFactory,
       DataflowExecutionStateSampler sampler,
       CounterSet pendingDeltaCounters,
-      IdGenerator idGenerator) {
+      IdGenerator idGenerator,
+      StreamingGlobalConfigHandle globalConfigHandle) {
     this.options = options;
     this.mapTaskExecutorFactory = mapTaskExecutorFactory;
     this.readerCache = readerCache;
     this.stateCacheFactory = stateCacheFactory;
     this.idGenerator = idGenerator;
+    this.globalConfigHandle = globalConfigHandle;
     this.readerRegistry = ReaderRegistry.defaultRegistry();
     this.sinkRegistry = SinkRegistry.defaultRegistry();
     this.sampler = sampler;
@@ -262,6 +266,7 @@ final class ComputationWorkExecutorFactory {
         stageInfo.metricsContainerRegistry(),
         executionStateTracker,
         stageInfo.executionStateRegistry(),
+        globalConfigHandle,
         maxSinkBytes,
         throwExceptionOnLargeOutput);
   }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
index 86f2cffe604..641fd119a42 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
@@ -25,7 +25,6 @@ import java.util.Collection;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import javax.annotation.concurrent.ThreadSafe;
@@ -33,7 +32,6 @@ import 
org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
 import org.apache.beam.runners.dataflow.worker.DataflowExecutionStateSampler;
 import org.apache.beam.runners.dataflow.worker.DataflowMapTaskExecutorFactory;
 import org.apache.beam.runners.dataflow.worker.HotKeyLogger;
-import org.apache.beam.runners.dataflow.worker.OperationalLimits;
 import org.apache.beam.runners.dataflow.worker.ReaderCache;
 import org.apache.beam.runners.dataflow.worker.WorkItemCancelledException;
 import 
org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingMDC;
@@ -44,6 +42,7 @@ import 
org.apache.beam.runners.dataflow.worker.streaming.KeyCommitTooLargeExcept
 import org.apache.beam.runners.dataflow.worker.streaming.StageInfo;
 import org.apache.beam.runners.dataflow.worker.streaming.Watermarks;
 import org.apache.beam.runners.dataflow.worker.streaming.Work;
+import 
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandle;
 import 
org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingCounters;
 import 
org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher;
 import 
org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcherFactory;
@@ -85,7 +84,7 @@ public final class StreamingWorkScheduler {
   private final HotKeyLogger hotKeyLogger;
   private final ConcurrentMap<String, StageInfo> stageInfoMap;
   private final DataflowExecutionStateSampler sampler;
-  private final AtomicReference<OperationalLimits> operationalLimits;
+  private final StreamingGlobalConfigHandle globalConfigHandle;
 
   public StreamingWorkScheduler(
       DataflowWorkerHarnessOptions options,
@@ -99,7 +98,7 @@ public final class StreamingWorkScheduler {
       HotKeyLogger hotKeyLogger,
       ConcurrentMap<String, StageInfo> stageInfoMap,
       DataflowExecutionStateSampler sampler,
-      AtomicReference<OperationalLimits> operationalLimits) {
+      StreamingGlobalConfigHandle globalConfigHandle) {
     this.options = options;
     this.clock = clock;
     this.computationWorkExecutorFactory = computationWorkExecutorFactory;
@@ -111,7 +110,7 @@ public final class StreamingWorkScheduler {
     this.hotKeyLogger = hotKeyLogger;
     this.stageInfoMap = stageInfoMap;
     this.sampler = sampler;
-    this.operationalLimits = operationalLimits;
+    this.globalConfigHandle = globalConfigHandle;
   }
 
   public static StreamingWorkScheduler create(
@@ -126,8 +125,8 @@ public final class StreamingWorkScheduler {
       StreamingCounters streamingCounters,
       HotKeyLogger hotKeyLogger,
       DataflowExecutionStateSampler sampler,
-      AtomicReference<OperationalLimits> operationalLimits,
       IdGenerator idGenerator,
+      StreamingGlobalConfigHandle globalConfigHandle,
       ConcurrentMap<String, StageInfo> stageInfoMap) {
     ComputationWorkExecutorFactory computationWorkExecutorFactory =
         new ComputationWorkExecutorFactory(
@@ -137,7 +136,8 @@ public final class StreamingWorkScheduler {
             stateCacheFactory,
             sampler,
             streamingCounters.pendingDeltaCounters(),
-            idGenerator);
+            idGenerator,
+            globalConfigHandle);
 
     return new StreamingWorkScheduler(
         options,
@@ -151,7 +151,7 @@ public final class StreamingWorkScheduler {
         hotKeyLogger,
         stageInfoMap,
         sampler,
-        operationalLimits);
+        globalConfigHandle);
   }
 
   private static long computeShuffleBytesRead(Windmill.WorkItem workItem) {
@@ -295,7 +295,7 @@ public final class StreamingWorkScheduler {
       Windmill.WorkItemCommitRequest commitRequest,
       String computationId,
       Windmill.WorkItem workItem) {
-    long byteLimit = operationalLimits.get().maxWorkItemCommitBytes;
+    long byteLimit = 
globalConfigHandle.getConfig().operationalLimits().getMaxWorkItemCommitBytes();
     int commitSize = commitRequest.getSerializedSize();
     int estimatedCommitSize = commitSize < 0 ? Integer.MAX_VALUE : commitSize;
 
@@ -380,12 +380,7 @@ public final class StreamingWorkScheduler {
 
       // Blocks while executing work.
       computationWorkExecutor.executeWork(
-          executionKey,
-          work,
-          stateReader,
-          localSideInputStateFetcher,
-          operationalLimits.get(),
-          outputBuilder);
+          executionKey, work, stateReader, localSideInputStateFetcher, 
outputBuilder);
 
       if (work.isFailed()) {
         throw new WorkItemCancelledException(workItem.getShardingKey());
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
index b41ad391d87..dadf0217123 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
@@ -102,6 +102,8 @@ import org.apache.beam.runners.dataflow.worker.streaming.ExecutableWork;
 import org.apache.beam.runners.dataflow.worker.streaming.ShardedKey;
 import org.apache.beam.runners.dataflow.worker.streaming.Watermarks;
 import org.apache.beam.runners.dataflow.worker.streaming.Work;
+import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig;
+import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandleImpl;
 import org.apache.beam.runners.dataflow.worker.testing.RestoreDataflowLoggingMDC;
 import org.apache.beam.runners.dataflow.worker.testing.TestCountingSource;
 import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
@@ -275,6 +277,8 @@ public class StreamingDataflowWorkerTest {
   @Rule public TestRule restoreMDC = new RestoreDataflowLoggingMDC();
   @Rule public ErrorCollector errorCollector = new ErrorCollector();
   WorkUnitClient mockWorkUnitClient = mock(WorkUnitClient.class);
+  StreamingGlobalConfigHandleImpl mockGlobalConfigHandle =
+      mock(StreamingGlobalConfigHandleImpl.class);
   HotKeyLogger hotKeyLogger = mock(HotKeyLogger.class);
 
   private @Nullable ComputationStateCache computationStateCache = null;
@@ -750,7 +754,9 @@ public class StreamingDataflowWorkerTest {
     requestBuilder.append("cache_token: ");
     requestBuilder.append(index + 1);
     requestBuilder.append(" ");
-    if (hasSourceBytesProcessed) requestBuilder.append("source_bytes_processed: 0 ");
+    if (hasSourceBytesProcessed) {
+      requestBuilder.append("source_bytes_processed: 0 ");
+    }
 
     return requestBuilder;
   }
@@ -834,6 +840,8 @@ public class StreamingDataflowWorkerTest {
 
   private StreamingDataflowWorker makeWorker(
       StreamingDataflowWorkerTestParams streamingDataflowWorkerTestParams) {
+    when(mockGlobalConfigHandle.getConfig())
+        .thenReturn(streamingDataflowWorkerTestParams.streamingGlobalConfig());
     StreamingDataflowWorker worker =
         StreamingDataflowWorker.forTesting(
             streamingDataflowWorkerTestParams.stateNameMappings(),
@@ -847,8 +855,8 @@ public class StreamingDataflowWorkerTest {
             hotKeyLogger,
             streamingDataflowWorkerTestParams.clock(),
             streamingDataflowWorkerTestParams.executorSupplier(),
-            streamingDataflowWorkerTestParams.localRetryTimeoutMs(),
-            streamingDataflowWorkerTestParams.operationalLimits());
+            mockGlobalConfigHandle,
+            streamingDataflowWorkerTestParams.localRetryTimeoutMs());
     this.computationStateCache = worker.getComputationStateCache();
     return worker;
   }
@@ -1210,8 +1218,11 @@ public class StreamingDataflowWorkerTest {
         makeWorker(
             defaultWorkerParams()
                 .setInstructions(instructions)
-                .setOperationalLimits(
-                    OperationalLimits.builder().setMaxWorkItemCommitBytes(1000).build())
+                .setStreamingGlobalConfig(
+                    StreamingGlobalConfig.builder()
+                        .setOperationalLimits(
+                            OperationalLimits.builder().setMaxWorkItemCommitBytes(1000).build())
+                        .build())
                 .publishCounters()
                 .build());
     worker.start();
@@ -1282,7 +1293,11 @@ public class StreamingDataflowWorkerTest {
         makeWorker(
             defaultWorkerParams("--experiments=throw_exceptions_on_large_output")
                 .setInstructions(instructions)
-                .setOperationalLimits(OperationalLimits.builder().setMaxOutputKeyBytes(15).build())
+                .setStreamingGlobalConfig(
+                    StreamingGlobalConfig.builder()
+                        .setOperationalLimits(
+                            OperationalLimits.builder().setMaxOutputKeyBytes(15).build())
+                        .build())
                 .build());
     worker.start();
 
@@ -1315,8 +1330,11 @@ public class StreamingDataflowWorkerTest {
         makeWorker(
             defaultWorkerParams("--experiments=throw_exceptions_on_large_output")
                 .setInstructions(instructions)
-                .setOperationalLimits(
-                    OperationalLimits.builder().setMaxOutputValueBytes(15).build())
+                .setStreamingGlobalConfig(
+                    StreamingGlobalConfig.builder()
+                        .setOperationalLimits(
+                            OperationalLimits.builder().setMaxOutputValueBytes(15).build())
+                        .build())
                 .build());
     worker.start();
 
@@ -4412,7 +4430,9 @@ public class StreamingDataflowWorkerTest {
     }
 
     boolean isActiveWorkRefresh(GetDataRequest request) {
-      if (request.getComputationHeartbeatRequestCount() > 0) return true;
+      if (request.getComputationHeartbeatRequestCount() > 0) {
+        return true;
+      }
       for (ComputationGetDataRequest computationRequest : request.getRequestsList()) {
         if (!computationRequest.getComputationId().equals(DEFAULT_COMPUTATION_ID)) {
           return false;
@@ -4508,7 +4528,7 @@ public class StreamingDataflowWorkerTest {
           .setLocalRetryTimeoutMs(-1)
           .setPublishCounters(false)
           .setClock(Instant::now)
-          .setOperationalLimits(OperationalLimits.builder().build());
+          .setStreamingGlobalConfig(StreamingGlobalConfig.builder().build());
     }
 
     abstract ImmutableMap<String, String> stateNameMappings();
@@ -4525,10 +4545,11 @@ public class StreamingDataflowWorkerTest {
 
     abstract int localRetryTimeoutMs();
 
-    abstract OperationalLimits operationalLimits();
+    abstract StreamingGlobalConfig streamingGlobalConfig();
 
     @AutoValue.Builder
     abstract static class Builder {
+
       abstract Builder setStateNameMappings(ImmutableMap<String, String> value);
 
       abstract ImmutableMap.Builder<String, String> stateNameMappingsBuilder();
@@ -4559,7 +4580,7 @@ public class StreamingDataflowWorkerTest {
 
       abstract Builder setLocalRetryTimeoutMs(int value);
 
-      abstract Builder setOperationalLimits(OperationalLimits operationalLimits);
+      abstract Builder setStreamingGlobalConfig(StreamingGlobalConfig config);
 
       abstract StreamingDataflowWorkerTestParams build();
     }
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java
index 86ed8f552d1..a1d4210f3db 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java
@@ -59,6 +59,9 @@ import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.NoopProfi
 import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.ProfileScope;
 import org.apache.beam.runners.dataflow.worker.streaming.Watermarks;
 import org.apache.beam.runners.dataflow.worker.streaming.Work;
+import org.apache.beam.runners.dataflow.worker.streaming.config.FixedGlobalConfigHandle;
+import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig;
+import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandle;
 import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
 import org.apache.beam.runners.dataflow.worker.windmill.client.getdata.FakeGetDataClient;
@@ -107,6 +110,8 @@ public class StreamingModeExecutionContextTest {
     options = PipelineOptionsFactory.as(DataflowWorkerHarnessOptions.class);
     CounterSet counterSet = new CounterSet();
     ConcurrentHashMap<String, String> stateNameMap = new ConcurrentHashMap<>();
+    StreamingGlobalConfigHandle globalConfigHandle =
+        new FixedGlobalConfigHandle(StreamingGlobalConfig.builder().build());
     stateNameMap.put(NameContextsForTests.nameContextForTest().userName(), "testStateFamily");
     executionContext =
         new StreamingModeExecutionContext(
@@ -127,6 +132,7 @@ public class StreamingModeExecutionContextTest {
                 PipelineOptionsFactory.create(),
                 "test-work-item-id"),
             executionStateRegistry,
+            globalConfigHandle,
             Long.MAX_VALUE,
             /*throwExceptionOnLargeOutput=*/ false);
   }
@@ -158,7 +164,6 @@ public class StreamingModeExecutionContextTest {
             Watermarks.builder().setInputDataWatermark(new Instant(1000)).build()),
         stateReader,
         sideInputStateFetcher,
-        OperationalLimits.builder().build(),
         outputBuilder);
 
     TimerInternals timerInternals = stepContext.timerInternals();
@@ -208,7 +213,6 @@ public class StreamingModeExecutionContextTest {
             Watermarks.builder().setInputDataWatermark(new Instant(1000)).build()),
         stateReader,
         sideInputStateFetcher,
-        OperationalLimits.builder().build(),
         outputBuilder);
     TimerInternals timerInternals = stepContext.timerInternals();
     assertTrue(timerTimestamp.isBefore(timerInternals.currentProcessingTime()));
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
index f2e03b453fd..8ad73a5145b 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
@@ -90,6 +90,9 @@ import org.apache.beam.runners.dataflow.worker.counters.NameContext;
 import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.NoopProfileScope;
 import org.apache.beam.runners.dataflow.worker.streaming.Watermarks;
 import org.apache.beam.runners.dataflow.worker.streaming.Work;
+import org.apache.beam.runners.dataflow.worker.streaming.config.FixedGlobalConfigHandle;
+import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig;
+import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandle;
 import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher;
 import org.apache.beam.runners.dataflow.worker.testing.TestCountingSource;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader;
@@ -594,6 +597,8 @@ public class WorkerCustomSourcesTest {
     StreamingModeExecutionStateRegistry executionStateRegistry =
         new StreamingModeExecutionStateRegistry();
     ReaderCache readerCache = new ReaderCache(Duration.standardMinutes(1), Runnable::run);
+    StreamingGlobalConfigHandle globalConfigHandle =
+        new FixedGlobalConfigHandle(StreamingGlobalConfig.builder().build());
     StreamingModeExecutionContext context =
         new StreamingModeExecutionContext(
             counterSet,
@@ -610,6 +615,7 @@ public class WorkerCustomSourcesTest {
                 PipelineOptionsFactory.create(),
                 "test-work-item-id"),
             executionStateRegistry,
+            globalConfigHandle,
             Long.MAX_VALUE,
             /*throwExceptionOnLargeOutput=*/ false);
 
@@ -635,7 +641,6 @@ public class WorkerCustomSourcesTest {
               Watermarks.builder().setInputDataWatermark(new Instant(0)).build()),
           mock(WindmillStateReader.class),
           mock(SideInputStateFetcher.class),
-          OperationalLimits.builder().build(),
           Windmill.WorkItemCommitRequest.newBuilder());
 
       @SuppressWarnings({"unchecked", "rawtypes"})
@@ -960,6 +965,8 @@ public class WorkerCustomSourcesTest {
     CounterSet counterSet = new CounterSet();
     StreamingModeExecutionStateRegistry executionStateRegistry =
         new StreamingModeExecutionStateRegistry();
+    StreamingGlobalConfigHandle globalConfigHandle =
+        new FixedGlobalConfigHandle(StreamingGlobalConfig.builder().build());
     StreamingModeExecutionContext context =
         new StreamingModeExecutionContext(
             counterSet,
@@ -979,6 +986,7 @@ public class WorkerCustomSourcesTest {
                 PipelineOptionsFactory.create(),
                 "test-work-item-id"),
             executionStateRegistry,
+            globalConfigHandle,
             Long.MAX_VALUE,
             /*throwExceptionOnLargeOutput=*/ false);
 
@@ -1012,7 +1020,6 @@ public class WorkerCustomSourcesTest {
         dummyWork,
         mock(WindmillStateReader.class),
         mock(SideInputStateFetcher.class),
-        OperationalLimits.builder().build(),
         Windmill.WorkItemCommitRequest.newBuilder());
 
     @SuppressWarnings({"unchecked", "rawtypes"})
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/FixedGlobalConfigHandleTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/FixedGlobalConfigHandleTest.java
new file mode 100644
index 00000000000..b5cb85a58c1
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/FixedGlobalConfigHandleTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming.config;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.beam.runners.dataflow.worker.OperationalLimits;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill.UserWorkerRunnerV1Settings;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class FixedGlobalConfigHandleTest {
+
+  @Test
+  public void getConfig() {
+    StreamingGlobalConfig config =
+        StreamingGlobalConfig.builder()
+            .setOperationalLimits(
+                OperationalLimits.builder()
+                    .setMaxOutputValueBytes(123)
+                    .setMaxOutputKeyBytes(324)
+                    .setMaxWorkItemCommitBytes(456)
+                    .build())
+            .setWindmillServiceEndpoints(ImmutableSet.of(HostAndPort.fromHost("windmillHost")))
+            .setUserWorkerJobSettings(
+                UserWorkerRunnerV1Settings.newBuilder()
+                    .setUseSeparateWindmillHeartbeatStreams(false)
+                    .build())
+            .build();
+    FixedGlobalConfigHandle globalConfigHandle = new FixedGlobalConfigHandle(config);
+    assertEquals(config, globalConfigHandle.getConfig());
+  }
+
+  @Test
+  public void registerConfigObserver() throws InterruptedException {
+    StreamingGlobalConfig config =
+        StreamingGlobalConfig.builder()
+            .setOperationalLimits(
+                OperationalLimits.builder()
+                    .setMaxOutputValueBytes(123)
+                    .setMaxOutputKeyBytes(324)
+                    .setMaxWorkItemCommitBytes(456)
+                    .build())
+            .setWindmillServiceEndpoints(ImmutableSet.of(HostAndPort.fromHost("windmillHost")))
+            .setUserWorkerJobSettings(
+                UserWorkerRunnerV1Settings.newBuilder()
+                    .setUseSeparateWindmillHeartbeatStreams(false)
+                    .build())
+            .build();
+    FixedGlobalConfigHandle globalConfigHandle = new FixedGlobalConfigHandle(config);
+    AtomicReference<StreamingGlobalConfig> configFromCallback = new AtomicReference<>();
+    CountDownLatch latch = new CountDownLatch(1);
+    globalConfigHandle.registerConfigObserver(
+        cbConfig -> {
+          configFromCallback.set(cbConfig);
+          latch.countDown();
+        });
+    assertTrue(latch.await(10, TimeUnit.SECONDS));
+    assertEquals(configFromCallback.get(), globalConfigHandle.getConfig());
+  }
+}
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingApplianceComputationConfigFetcherTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingApplianceComputationConfigFetcherTest.java
index f39c98c61b1..2586ae2be86 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingApplianceComputationConfigFetcherTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingApplianceComputationConfigFetcherTest.java
@@ -137,6 +137,8 @@ public class StreamingApplianceComputationConfigFetcherTest {
   }
 
   private StreamingApplianceComputationConfigFetcher createStreamingApplianceConfigLoader() {
-    return new StreamingApplianceComputationConfigFetcher(mockWindmillServer::getConfig);
+    return new StreamingApplianceComputationConfigFetcher(
+        mockWindmillServer::getConfig,
+        new FixedGlobalConfigHandle(StreamingGlobalConfig.builder().build()));
   }
 }
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcherTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcherTest.java
index 59fd092adcb..9fa17588c94 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcherTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcherTest.java
@@ -34,7 +34,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
-import java.util.function.Consumer;
+import org.apache.beam.runners.dataflow.worker.OperationalLimits;
 import org.apache.beam.runners.dataflow.worker.WorkUnitClient;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
@@ -47,6 +47,7 @@ import org.mockito.internal.stubbing.answers.Returns;
 
 @RunWith(JUnit4.class)
 public class StreamingEngineComputationConfigFetcherTest {
+
   private final WorkUnitClient mockDataflowServiceClient =
       mock(WorkUnitClient.class, new Returns(Optional.empty()));
   private StreamingEngineComputationConfigFetcher streamingEngineConfigFetcher;
@@ -54,13 +55,13 @@ public class StreamingEngineComputationConfigFetcherTest {
   private StreamingEngineComputationConfigFetcher createConfigFetcher(
       boolean waitForInitialConfig,
       long globalConfigRefreshPeriod,
-      Consumer<StreamingEnginePipelineConfig> onPipelineConfig) {
+      StreamingGlobalConfigHandleImpl globalConfigHandle) {
     return StreamingEngineComputationConfigFetcher.forTesting(
         !waitForInitialConfig,
         globalConfigRefreshPeriod,
         mockDataflowServiceClient,
-        ignored -> Executors.newSingleThreadScheduledExecutor(),
-        onPipelineConfig);
+        globalConfigHandle,
+        ignored -> Executors.newSingleThreadScheduledExecutor());
   }
 
   @After
@@ -75,31 +76,33 @@ public class StreamingEngineComputationConfigFetcherTest {
             .setJobId("job")
             .setStreamingConfigTask(new StreamingConfigTask().setMaxWorkItemCommitBytes(10L));
     CountDownLatch waitForInitialConfig = new CountDownLatch(1);
-    Set<StreamingEnginePipelineConfig> receivedPipelineConfig = new HashSet<>();
+    Set<StreamingGlobalConfig> receivedPipelineConfig = new HashSet<>();
     when(mockDataflowServiceClient.getGlobalStreamingConfigWorkItem())
         .thenReturn(Optional.of(initialConfig));
+    StreamingGlobalConfigHandleImpl globalConfigHandle = new StreamingGlobalConfigHandleImpl();
+    globalConfigHandle.registerConfigObserver(
+        config -> {
+          try {
+            receivedPipelineConfig.add(config);
+            waitForInitialConfig.await();
+          } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+          }
+        });
     streamingEngineConfigFetcher =
-        createConfigFetcher(
-            /* waitForInitialConfig= */ true,
-            0,
-            config -> {
-              try {
-                receivedPipelineConfig.add(config);
-                waitForInitialConfig.await();
-              } catch (InterruptedException e) {
-                throw new RuntimeException(e);
-              }
-            });
+        createConfigFetcher(/* waitForInitialConfig= */ true, 0, globalConfigHandle);
     Thread asyncStartConfigLoader = new Thread(streamingEngineConfigFetcher::start);
     asyncStartConfigLoader.start();
     waitForInitialConfig.countDown();
     asyncStartConfigLoader.join();
-    assertThat(receivedPipelineConfig)
-        .containsExactly(
-            StreamingEnginePipelineConfig.builder()
-                .setMaxWorkItemCommitBytes(
-                    initialConfig.getStreamingConfigTask().getMaxWorkItemCommitBytes())
-                .build());
+    StreamingGlobalConfig.Builder configBuilder =
+        StreamingGlobalConfig.builder()
+            .setOperationalLimits(
+                OperationalLimits.builder()
+                    .setMaxWorkItemCommitBytes(
+                        initialConfig.getStreamingConfigTask().getMaxWorkItemCommitBytes())
+                    .build());
+    assertThat(receivedPipelineConfig).containsExactly(configBuilder.build());
   }
 
   @Test
@@ -117,7 +120,7 @@ public class StreamingEngineComputationConfigFetcherTest {
             .setJobId("job")
             .setStreamingConfigTask(new StreamingConfigTask().setMaxWorkItemCommitBytes(100L));
     CountDownLatch numExpectedRefreshes = new CountDownLatch(3);
-    Set<StreamingEnginePipelineConfig> receivedPipelineConfig = new HashSet<>();
+    Set<StreamingGlobalConfig> receivedPipelineConfig = new HashSet<>();
     when(mockDataflowServiceClient.getGlobalStreamingConfigWorkItem())
         .thenReturn(Optional.of(firstConfig))
         .thenReturn(Optional.of(secondConfig))
@@ -127,15 +130,15 @@ public class StreamingEngineComputationConfigFetcherTest {
         // ConfigFetcher should not do anything with a config that doesn't contain a
         // StreamingConfigTask.
         .thenReturn(Optional.of(new WorkItem().setJobId("jobId")));
-
+    StreamingGlobalConfigHandleImpl globalConfigHandle = new StreamingGlobalConfigHandleImpl();
+    globalConfigHandle.registerConfigObserver(
+        config -> {
+          receivedPipelineConfig.add(config);
+          numExpectedRefreshes.countDown();
+        });
     streamingEngineConfigFetcher =
         createConfigFetcher(
-            /* waitForInitialConfig= */ true,
-            Duration.millis(100).getMillis(),
-            config -> {
-              receivedPipelineConfig.add(config);
-              numExpectedRefreshes.countDown();
-            });
+            /* waitForInitialConfig= */ true, Duration.millis(100).getMillis(), globalConfigHandle);
 
     Thread asyncStartConfigLoader = new Thread(streamingEngineConfigFetcher::start);
     asyncStartConfigLoader.start();
@@ -143,24 +146,34 @@ public class StreamingEngineComputationConfigFetcherTest {
     asyncStartConfigLoader.join();
     assertThat(receivedPipelineConfig)
         .containsExactly(
-            StreamingEnginePipelineConfig.builder()
-                .setMaxWorkItemCommitBytes(
-                    firstConfig.getStreamingConfigTask().getMaxWorkItemCommitBytes())
+            StreamingGlobalConfig.builder()
+                .setOperationalLimits(
+                    OperationalLimits.builder()
+                        .setMaxWorkItemCommitBytes(
+                            firstConfig.getStreamingConfigTask().getMaxWorkItemCommitBytes())
+                        .build())
                 .build(),
-            StreamingEnginePipelineConfig.builder()
-                .setMaxWorkItemCommitBytes(
-                    secondConfig.getStreamingConfigTask().getMaxWorkItemCommitBytes())
+            StreamingGlobalConfig.builder()
+                .setOperationalLimits(
+                    OperationalLimits.builder()
+                        .setMaxWorkItemCommitBytes(
+                            secondConfig.getStreamingConfigTask().getMaxWorkItemCommitBytes())
+                        .build())
                 .build(),
-            StreamingEnginePipelineConfig.builder()
-                .setMaxWorkItemCommitBytes(
-                    thirdConfig.getStreamingConfigTask().getMaxWorkItemCommitBytes())
+            StreamingGlobalConfig.builder()
+                .setOperationalLimits(
+                    OperationalLimits.builder()
+                        .setMaxWorkItemCommitBytes(
+                            thirdConfig.getStreamingConfigTask().getMaxWorkItemCommitBytes())
+                        .build())
                 .build());
   }
 
   @Test
   public void testGetComputationConfig() throws IOException {
+    StreamingGlobalConfigHandleImpl globalConfigHandle = new StreamingGlobalConfigHandleImpl();
     streamingEngineConfigFetcher =
-        createConfigFetcher(/* waitForInitialConfig= */ false, 0, ignored -> {});
+        createConfigFetcher(/* waitForInitialConfig= */ false, 0, globalConfigHandle);
     String computationId = "computationId";
     String stageName = "stageName";
     String systemName = "systemName";
@@ -193,9 +206,11 @@ public class StreamingEngineComputationConfigFetcherTest {
 
   @Test
   public void testGetComputationConfig_noComputationPresent() throws IOException {
-    Set<StreamingEnginePipelineConfig> receivedPipelineConfig = new HashSet<>();
+    Set<StreamingGlobalConfig> receivedPipelineConfig = new HashSet<>();
+    StreamingGlobalConfigHandleImpl globalConfigHandle = new StreamingGlobalConfigHandleImpl();
+    globalConfigHandle.registerConfigObserver(receivedPipelineConfig::add);
     streamingEngineConfigFetcher =
-        createConfigFetcher(/* waitForInitialConfig= */ false, 0, receivedPipelineConfig::add);
+        createConfigFetcher(/* waitForInitialConfig= */ false, 0, globalConfigHandle);
     when(mockDataflowServiceClient.getStreamingConfigWorkItem(anyString()))
         .thenReturn(Optional.empty());
     Optional<ComputationConfig> pipelineConfig =
@@ -206,8 +221,9 @@ public class StreamingEngineComputationConfigFetcherTest {
 
   @Test
   public void testGetComputationConfig_fetchConfigFromDataflowError() throws IOException {
+    StreamingGlobalConfigHandleImpl globalConfigHandle = new StreamingGlobalConfigHandleImpl();
     streamingEngineConfigFetcher =
-        createConfigFetcher(/* waitForInitialConfig= */ false, 0, ignored -> {});
+        createConfigFetcher(/* waitForInitialConfig= */ false, 0, globalConfigHandle);
     RuntimeException e = new RuntimeException("something bad happened.");
     when(mockDataflowServiceClient.getStreamingConfigWorkItem(anyString())).thenThrow(e);
     Throwable fetchConfigError =
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfigHandleImplTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfigHandleImplTest.java
new file mode 100644
index 00000000000..059f60731a7
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfigHandleImplTest.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming.config;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+import org.apache.beam.runners.dataflow.worker.OperationalLimits;
+import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.UserWorkerRunnerV1Settings;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class StreamingGlobalConfigHandleImplTest {
+
+  /**
+   * Checks that {@code getConfig()} returns the config most recently handed to {@code
+   * setConfig()}, both for the first config and after it has been replaced by a different one.
+   */
+  @Test
+  public void getConfig() {
+    StreamingGlobalConfigHandleImpl handle = new StreamingGlobalConfigHandleImpl();
+
+    // First config: set it and read it straight back.
+    OperationalLimits firstLimits =
+        OperationalLimits.builder()
+            .setMaxOutputValueBytes(123)
+            .setMaxOutputKeyBytes(324)
+            .setMaxWorkItemCommitBytes(456)
+            .build();
+    UserWorkerRunnerV1Settings firstSettings =
+        UserWorkerRunnerV1Settings.newBuilder()
+            .setUseSeparateWindmillHeartbeatStreams(false)
+            .build();
+    StreamingGlobalConfig firstConfig =
+        StreamingGlobalConfig.builder()
+            .setOperationalLimits(firstLimits)
+            .setWindmillServiceEndpoints(ImmutableSet.of(HostAndPort.fromHost("windmillHost")))
+            .setUserWorkerJobSettings(firstSettings)
+            .build();
+    handle.setConfig(firstConfig);
+    assertEquals(firstConfig, handle.getConfig());
+
+    // Second config differs in every field; it must replace the first one.
+    OperationalLimits secondLimits =
+        OperationalLimits.builder()
+            .setMaxOutputValueBytes(324)
+            .setMaxOutputKeyBytes(456)
+            .setMaxWorkItemCommitBytes(123)
+            .build();
+    UserWorkerRunnerV1Settings secondSettings =
+        UserWorkerRunnerV1Settings.newBuilder()
+            .setUseSeparateWindmillHeartbeatStreams(true)
+            .build();
+    StreamingGlobalConfig secondConfig =
+        StreamingGlobalConfig.builder()
+            .setOperationalLimits(secondLimits)
+            .setWindmillServiceEndpoints(ImmutableSet.of(HostAndPort.fromHost("windmillHost1")))
+            .setUserWorkerJobSettings(secondSettings)
+            .build();
+    handle.setConfig(secondConfig);
+    assertEquals(secondConfig, handle.getConfig());
+  }
+
+  /**
+   * Registers two observers first, then sets a config, and verifies that both observers are
+   * invoked with exactly the config that was set.
+   */
+  @Test
+  public void registerConfigObserver_configSetAfterRegisteringCallback()
+      throws InterruptedException {
+    // One count per observer; the test only proceeds once both have run.
+    CountDownLatch latch = new CountDownLatch(2);
+    StreamingGlobalConfigHandleImpl globalConfigHandle = new StreamingGlobalConfigHandleImpl();
+    StreamingGlobalConfig configToSet =
+        StreamingGlobalConfig.builder()
+            .setOperationalLimits(
+                OperationalLimits.builder()
+                    .setMaxOutputValueBytes(123)
+                    .setMaxOutputKeyBytes(324)
+                    .setMaxWorkItemCommitBytes(456)
+                    .build())
+            .setWindmillServiceEndpoints(ImmutableSet.of(HostAndPort.fromHost("windmillHost")))
+            .setUserWorkerJobSettings(
+                UserWorkerRunnerV1Settings.newBuilder()
+                    .setUseSeparateWindmillHeartbeatStreams(false)
+                    .build())
+            .build();
+    AtomicReference<StreamingGlobalConfig> configFromCallback1 = new AtomicReference<>();
+    AtomicReference<StreamingGlobalConfig> configFromCallback2 = new AtomicReference<>();
+    globalConfigHandle.registerConfigObserver(
+        config -> {
+          configFromCallback1.set(config);
+          latch.countDown();
+        });
+    globalConfigHandle.registerConfigObserver(
+        config -> {
+          configFromCallback2.set(config);
+          latch.countDown();
+        });
+    globalConfigHandle.setConfig(configToSet);
+    assertTrue(latch.await(10, TimeUnit.SECONDS));
+    // Assert against the config that was set (expected value first), so the test cannot pass
+    // merely because the handle and the observers agree on some other value.
+    assertEquals(configToSet, globalConfigHandle.getConfig());
+    assertEquals(configToSet, configFromCallback1.get());
+    assertEquals(configToSet, configFromCallback2.get());
+  }
+
+  /**
+   * Sets a config first, then registers two observers, and verifies that both observers are still
+   * invoked with the config that was already set.
+   */
+  @Test
+  public void registerConfigObserver_configSetBeforeRegisteringCallback()
+      throws InterruptedException {
+    // One count per observer; the test only proceeds once both have run.
+    CountDownLatch latch = new CountDownLatch(2);
+    StreamingGlobalConfigHandleImpl globalConfigHandle = new StreamingGlobalConfigHandleImpl();
+    StreamingGlobalConfig configToSet =
+        StreamingGlobalConfig.builder()
+            .setOperationalLimits(
+                OperationalLimits.builder()
+                    .setMaxOutputValueBytes(123)
+                    .setMaxOutputKeyBytes(324)
+                    .setMaxWorkItemCommitBytes(456)
+                    .build())
+            .setWindmillServiceEndpoints(ImmutableSet.of(HostAndPort.fromHost("windmillHost")))
+            .setUserWorkerJobSettings(
+                UserWorkerRunnerV1Settings.newBuilder()
+                    .setUseSeparateWindmillHeartbeatStreams(false)
+                    .build())
+            .build();
+    AtomicReference<StreamingGlobalConfig> configFromCallback1 = new AtomicReference<>();
+    AtomicReference<StreamingGlobalConfig> configFromCallback2 = new AtomicReference<>();
+    // The config is set before any observer exists.
+    globalConfigHandle.setConfig(configToSet);
+    globalConfigHandle.registerConfigObserver(
+        config -> {
+          configFromCallback1.set(config);
+          latch.countDown();
+        });
+    globalConfigHandle.registerConfigObserver(
+        config -> {
+          configFromCallback2.set(config);
+          latch.countDown();
+        });
+    assertTrue(latch.await(10, TimeUnit.SECONDS));
+    // Assert against the config that was set (expected value first), so the test cannot pass
+    // merely because the handle and the observers agree on some other value.
+    assertEquals(configToSet, globalConfigHandle.getConfig());
+    assertEquals(configToSet, configFromCallback1.get());
+    assertEquals(configToSet, configFromCallback2.get());
+  }
+
+  /**
+   * Sets a config, then registers an observer that throws followed by a well-behaved observer,
+   * and verifies that the second observer still receives the config.
+   */
+  @Test
+  public void registerConfigObserver_configSetBeforeRegisteringCallback_callbackThrowsException()
+      throws InterruptedException {
+    // One count per observer; the throwing observer counts down before it throws.
+    CountDownLatch latch = new CountDownLatch(2);
+    StreamingGlobalConfigHandleImpl globalConfigHandle = new StreamingGlobalConfigHandleImpl();
+    StreamingGlobalConfig configToSet =
+        StreamingGlobalConfig.builder()
+            .setOperationalLimits(
+                OperationalLimits.builder()
+                    .setMaxOutputValueBytes(123)
+                    .setMaxOutputKeyBytes(324)
+                    .setMaxWorkItemCommitBytes(456)
+                    .build())
+            .setWindmillServiceEndpoints(ImmutableSet.of(HostAndPort.fromHost("windmillHost")))
+            .setUserWorkerJobSettings(
+                UserWorkerRunnerV1Settings.newBuilder()
+                    .setUseSeparateWindmillHeartbeatStreams(false)
+                    .build())
+            .build();
+    AtomicReference<StreamingGlobalConfig> configFromCallback = new AtomicReference<>();
+    globalConfigHandle.setConfig(configToSet);
+    globalConfigHandle.registerConfigObserver(
+        config -> {
+          latch.countDown();
+          throw new RuntimeException();
+        });
+    globalConfigHandle.registerConfigObserver(
+        config -> {
+          configFromCallback.set(config);
+          latch.countDown();
+        });
+    assertTrue(latch.await(10, TimeUnit.SECONDS));
+    // Expected value first, so a failure message reports expected/actual the right way round.
+    assertEquals(configToSet, configFromCallback.get());
+  }
+
+  /**
+   * Registers an observer that throws followed by a well-behaved observer, then sets a config,
+   * and verifies that the second observer still receives the config.
+   */
+  @Test
+  public void registerConfigObserver_configSetAfterRegisteringCallback_callbackThrowsException()
+      throws InterruptedException {
+    // One count per observer; the throwing observer counts down before it throws.
+    CountDownLatch latch = new CountDownLatch(2);
+    StreamingGlobalConfigHandleImpl globalConfigHandle = new StreamingGlobalConfigHandleImpl();
+    StreamingGlobalConfig configToSet =
+        StreamingGlobalConfig.builder()
+            .setOperationalLimits(
+                OperationalLimits.builder()
+                    .setMaxOutputValueBytes(123)
+                    .setMaxOutputKeyBytes(324)
+                    .setMaxWorkItemCommitBytes(456)
+                    .build())
+            .setWindmillServiceEndpoints(ImmutableSet.of(HostAndPort.fromHost("windmillHost")))
+            .setUserWorkerJobSettings(
+                UserWorkerRunnerV1Settings.newBuilder()
+                    .setUseSeparateWindmillHeartbeatStreams(false)
+                    .build())
+            .build();
+    AtomicReference<StreamingGlobalConfig> configFromCallback = new AtomicReference<>();
+    globalConfigHandle.registerConfigObserver(
+        config -> {
+          latch.countDown();
+          throw new RuntimeException();
+        });
+    globalConfigHandle.registerConfigObserver(
+        config -> {
+          configFromCallback.set(config);
+          latch.countDown();
+        });
+    globalConfigHandle.setConfig(configToSet);
+    assertTrue(latch.await(10, TimeUnit.SECONDS));
+    // Expected value first, so a failure message reports expected/actual the right way round.
+    assertEquals(configToSet, configFromCallback.get());
+  }
+
+  /**
+   * Sets two separately built but identically populated configs and verifies that the registered
+   * observer is invoked only once.
+   */
+  @Test
+  public void 
registerConfigObserver_shouldNotCallCallbackForIfConfigRemainsSame()
+      throws InterruptedException {
+    CountDownLatch latch = new CountDownLatch(1);
+    AtomicInteger callbackCount = new AtomicInteger(0);
+    StreamingGlobalConfigHandleImpl globalConfigHandle = new 
StreamingGlobalConfigHandleImpl();
+    // A supplier is used so that each setConfig call below receives a freshly built instance
+    // with the same field values.
+    Supplier<StreamingGlobalConfig> configToSet =
+        () ->
+            StreamingGlobalConfig.builder()
+                .setOperationalLimits(
+                    OperationalLimits.builder()
+                        .setMaxOutputValueBytes(123)
+                        .setMaxOutputKeyBytes(324)
+                        .setMaxWorkItemCommitBytes(456)
+                        .build())
+                
.setWindmillServiceEndpoints(ImmutableSet.of(HostAndPort.fromHost("windmillHost")))
+                .setUserWorkerJobSettings(
+                    UserWorkerRunnerV1Settings.newBuilder()
+                        .setUseSeparateWindmillHeartbeatStreams(false)
+                        .build())
+                .build();
+    globalConfigHandle.registerConfigObserver(
+        config -> {
+          callbackCount.incrementAndGet();
+          latch.countDown();
+        });
+    globalConfigHandle.setConfig(configToSet.get());
+    // call setter again with same config
+    globalConfigHandle.setConfig(configToSet.get());
+    // Wait for the first (expected) callback.
+    assertTrue(latch.await(10, TimeUnit.SECONDS));
+    // Leave time for a second, unwanted callback to show up before checking the count.
+    // NOTE(review): this unconditional sleep makes the test take at least 10 seconds.
+    Thread.sleep(TimeUnit.SECONDS.toMillis(10));
+    assertEquals(1, callbackCount.get());
+  }
+
+  /**
+   * Calls {@code setConfig} from inside a running observer callback and verifies that the
+   * observer sees the initial config first and the updated config second.
+   */
+  @Test
+  public void registerConfigObserver_updateConfigWhenCallbackIsRunning()
+      throws InterruptedException {
+    // Two callback invocations are expected: one per config.
+    CountDownLatch latch = new CountDownLatch(2);
+    StreamingGlobalConfigHandleImpl globalConfigHandle = new StreamingGlobalConfigHandleImpl();
+    StreamingGlobalConfig initialConfig =
+        StreamingGlobalConfig.builder()
+            .setOperationalLimits(OperationalLimits.builder().setMaxOutputValueBytes(4569).build())
+            .build();
+    StreamingGlobalConfig updatedConfig =
+        StreamingGlobalConfig.builder()
+            .setOperationalLimits(
+                OperationalLimits.builder()
+                    .setMaxOutputValueBytes(123)
+                    .setMaxOutputKeyBytes(324)
+                    .setMaxWorkItemCommitBytes(456)
+                    .build())
+            .setWindmillServiceEndpoints(ImmutableSet.of(HostAndPort.fromHost("windmillHost")))
+            .setUserWorkerJobSettings(
+                UserWorkerRunnerV1Settings.newBuilder()
+                    .setUseSeparateWindmillHeartbeatStreams(false)
+                    .build())
+            .build();
+    CopyOnWriteArrayList<StreamingGlobalConfig> configsFromCallback = new CopyOnWriteArrayList<>();
+    globalConfigHandle.registerConfigObserver(
+        config -> {
+          configsFromCallback.add(config);
+          // Re-enter the handle only for the initial config so the update happens exactly once.
+          if (config.equals(initialConfig)) {
+            globalConfigHandle.setConfig(updatedConfig);
+          }
+          latch.countDown();
+        });
+    globalConfigHandle.setConfig(initialConfig);
+    assertTrue(latch.await(10, TimeUnit.SECONDS));
+    // Expected value first, so a failure message reports expected/actual the right way round.
+    assertEquals(initialConfig, configsFromCallback.get(0));
+    assertEquals(updatedConfig, configsFromCallback.get(1));
+  }
+}
diff --git a/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto b/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
index 4677ff9dcc9..3b3348dbc3f 100644
--- a/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
+++ b/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
@@ -923,6 +923,15 @@ message WorkerMetadataResponse {
   reserved 4;
 }
 
+// Settings to control runtime behavior of the java runner v1 user worker.
+message UserWorkerRunnerV1Settings {
+  // If true, use separate channels for each windmill RPC. Defaults to true.
+  optional bool use_windmill_isolated_channels = 1 [default = true];
+
+  // If true, use separate streaming RPC for windmill heartbeats and state
+  // reads. Defaults to true.
+  optional bool use_separate_windmill_heartbeat_streams = 2 [default = true];
+}
+
 service WindmillAppliance {
   // Gets streaming Dataflow work.
   rpc GetWork(.windmill.GetWorkRequest) returns (.windmill.GetWorkResponse);

Reply via email to