This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 1531743ce8d [Dataflow Streaming] Enable state tag encoding v2 based on backend flag (#37464)
1531743ce8d is described below
commit 1531743ce8dca4da4ec15f4671df9a3378e32fb8
Author: Arun Pandian <[email protected]>
AuthorDate: Wed Feb 4 01:45:56 2026 -0800
[Dataflow Streaming] Enable state tag encoding v2 based on backend flag (#37464)
---
.../worker/StreamingModeExecutionContext.java | 17 +++++-----
.../StreamingEngineComputationConfigFetcher.java | 10 ++++++
.../streaming/config/StreamingGlobalConfig.java | 7 +++-
.../processing/ComputationWorkExecutorFactory.java | 14 +-------
.../worker/StreamingModeExecutionContextTest.java | 35 ++++++++++++++++----
.../dataflow/worker/WorkerCustomSourcesTest.java | 6 ++--
...treamingEngineComputationConfigFetcherTest.java | 38 ++++++++++++++++++++++
7 files changed, 95 insertions(+), 32 deletions(-)
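In short, the change routes a backend-reported state tag encoding version through the streaming global config and into the per-work-item choice of Windmill tag encoding. The sketch below is not part of the commit: it condenses that flow for readability, the names mirror the diff that follows, but the types and signatures are simplified assumptions rather than the actual worker API.

// Sketch only: condensed from the diff below; types and signatures are illustrative assumptions.
final class StateTagEncodingSelectionSketch {

  enum TagEncoding { V1, V2 }

  // The backend reports an integer version; only version 2 flips the global-config
  // boolean (mirrors the check added in StreamingEngineComputationConfigFetcher).
  static boolean enableStateTagEncodingV2(Integer backendVersion) {
    if (backendVersion != null && backendVersion > 2) {
      throw new IllegalStateException("Unsupported state tag encoding version: " + backendVersion);
    }
    return Integer.valueOf(2).equals(backendVersion);
  }

  // The encoding is snapshotted from the current global config at the start of each
  // work item, so a config change takes effect on the next bundle (mirrors
  // StreamingModeExecutionContext.start()).
  static TagEncoding selectEncoding(boolean enableStateTagEncodingV2) {
    return enableStateTagEncodingV2 ? TagEncoding.V2 : TagEncoding.V1;
  }
}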
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
index 09afcadc300..2c936f88e28 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
@@ -50,6 +50,7 @@ import org.apache.beam.runners.dataflow.worker.counters.NameContext;
import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.ProfileScope;
import org.apache.beam.runners.dataflow.worker.streaming.Watermarks;
import org.apache.beam.runners.dataflow.worker.streaming.Work;
+import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig;
import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandle;
import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInput;
import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputState;
@@ -122,7 +123,7 @@ public class StreamingModeExecutionContext extends DataflowExecutionContext<Step
*/
private final Map<TupleTag<?>, Map<BoundedWindow, SideInput<?>>> sideInputCache;
- private final WindmillTagEncoding windmillTagEncoding;
+ private WindmillTagEncoding windmillTagEncoding;
/**
* The current user-facing key for this execution context.
*
@@ -162,8 +163,7 @@ public class StreamingModeExecutionContext extends DataflowExecutionContext<Step
StreamingModeExecutionStateRegistry executionStateRegistry,
StreamingGlobalConfigHandle globalConfigHandle,
long sinkByteLimit,
- boolean throwExceptionOnLargeOutput,
- boolean enableWindmillTagEncodingV2) {
+ boolean throwExceptionOnLargeOutput) {
super(
counterFactory,
metricsContainerRegistry,
@@ -174,10 +174,6 @@ public class StreamingModeExecutionContext extends DataflowExecutionContext<Step
this.readerCache = readerCache;
this.globalConfigHandle = globalConfigHandle;
this.sideInputCache = new HashMap<>();
- this.windmillTagEncoding =
- enableWindmillTagEncodingV2
- ? WindmillTagEncodingV2.instance()
- : WindmillTagEncodingV1.instance();
this.stateNameMap = ImmutableMap.copyOf(stateNameMap);
this.stateCache = stateCache;
this.backlogBytes = UnboundedReader.BACKLOG_UNKNOWN;
@@ -244,8 +240,13 @@ public class StreamingModeExecutionContext extends DataflowExecutionContext<Step
this.work = work;
this.computationKey = WindmillComputationKey.create(computationId, work.getShardedKey());
this.sideInputStateFetcher = sideInputStateFetcher;
+ StreamingGlobalConfig config = globalConfigHandle.getConfig();
// Snapshot the limits for entire bundle processing.
- this.operationalLimits = globalConfigHandle.getConfig().operationalLimits();
+ this.operationalLimits = config.operationalLimits();
+ this.windmillTagEncoding =
+ config.enableStateTagEncodingV2()
+ ? WindmillTagEncodingV2.instance()
+ : WindmillTagEncodingV1.instance();
this.outputBuilder = outputBuilder;
this.sideInputCache.clear();
clearSinkFullHint();
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java
index 73f2afc0422..0f50c04d1f0 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java
@@ -25,6 +25,7 @@ import com.google.api.services.dataflow.model.StreamingComputationConfig;
import com.google.api.services.dataflow.model.StreamingConfigTask;
import com.google.api.services.dataflow.model.WorkItem;
import java.io.IOException;
+import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -61,6 +62,7 @@ import org.slf4j.LoggerFactory;
@Internal
@ThreadSafe
public final class StreamingEngineComputationConfigFetcher implements ComputationConfig.Fetcher {
+
private static final Logger LOG =
LoggerFactory.getLogger(StreamingEngineComputationConfigFetcher.class);
private static final String CONFIG_REFRESHER_THREAD_NAME = "GlobalPipelineConfigRefresher";
@@ -209,6 +211,14 @@ public final class StreamingEngineComputationConfigFetcher implements Computatio
pipelineConfig.setUserWorkerJobSettings(settings);
}
+ Integer tagEncodingVersion = config.getStreamingEngineStateTagEncodingVersion();
+ if (tagEncodingVersion != null) {
+ Preconditions.checkState(tagEncodingVersion <= 2);
+ }
+ if (Objects.equals(2, tagEncodingVersion)) {
+ pipelineConfig.setEnableStateTagEncodingV2(true);
+ }
+
return pipelineConfig.build();
}
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfig.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfig.java
index 8f76f5ec27a..c1347e338c7 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfig.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfig.java
@@ -33,11 +33,14 @@ public abstract class StreamingGlobalConfig {
return new AutoValue_StreamingGlobalConfig.Builder()
.setWindmillServiceEndpoints(ImmutableSet.of())
.setUserWorkerJobSettings(UserWorkerRunnerV1Settings.newBuilder().build())
- .setOperationalLimits(OperationalLimits.builder().build());
+ .setOperationalLimits(OperationalLimits.builder().build())
+ .setEnableStateTagEncodingV2(false);
}
public abstract OperationalLimits operationalLimits();
+ public abstract boolean enableStateTagEncodingV2();
+
public abstract ImmutableSet<HostAndPort> windmillServiceEndpoints();
public abstract UserWorkerRunnerV1Settings userWorkerJobSettings();
@@ -51,6 +54,8 @@ public abstract class StreamingGlobalConfig {
public abstract Builder setUserWorkerJobSettings(UserWorkerRunnerV1Settings settings);
+ public abstract Builder setEnableStateTagEncodingV2(boolean enable);
+
public abstract StreamingGlobalConfig build();
}
}
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/ComputationWorkExecutorFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/ComputationWorkExecutorFactory.java
index 097da87fb01..26979990330 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/ComputationWorkExecutorFactory.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/ComputationWorkExecutorFactory.java
@@ -74,14 +74,6 @@ final class ComputationWorkExecutorFactory {
private static final String THROW_EXCEPTIONS_ON_LARGE_OUTPUT_EXPERIMENT =
"throw_exceptions_on_large_output";
- // Experiment to enable tag encoding v2.
- // Experiment is for testing by dataflow runner developers.
- // Related logic could change anytime without notice.
- // **DO NOT USE** on real workloads.
- // Enabling the experiment could lead to state incompatibilities and broken jobs.
- private static final String UNSTABLE_WINDMILL_TAG_ENCODING_EXPERIMENT =
- "unstable_windmill_tag_encoding_v2";
-
private final DataflowWorkerHarnessOptions options;
private final DataflowMapTaskExecutorFactory mapTaskExecutorFactory;
private final ReaderCache readerCache;
@@ -105,7 +97,6 @@ final class ComputationWorkExecutorFactory {
private final IdGenerator idGenerator;
private final StreamingGlobalConfigHandle globalConfigHandle;
private final boolean throwExceptionOnLargeOutput;
- private final boolean enableWindmillTagEncodingV2;
ComputationWorkExecutorFactory(
DataflowWorkerHarnessOptions options,
@@ -133,8 +124,6 @@ final class ComputationWorkExecutorFactory {
: StreamingDataflowWorker.MAX_SINK_BYTES;
this.throwExceptionOnLargeOutput =
hasExperiment(options, THROW_EXCEPTIONS_ON_LARGE_OUTPUT_EXPERIMENT);
- this.enableWindmillTagEncodingV2 =
- hasExperiment(options, UNSTABLE_WINDMILL_TAG_ENCODING_EXPERIMENT);
}
private static Nodes.ParallelInstructionNode extractReadNode(
@@ -279,8 +268,7 @@ final class ComputationWorkExecutorFactory {
stageInfo.executionStateRegistry(),
globalConfigHandle,
maxSinkBytes,
- throwExceptionOnLargeOutput,
- enableWindmillTagEncodingV2);
+ throwExceptionOnLargeOutput);
}
private DataflowMapTaskExecutor createMapTaskExecutor(
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java
index 8372b33d81c..449d1cc27cc 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java
@@ -22,6 +22,7 @@ import static org.apache.beam.runners.dataflow.worker.counters.DataflowCounterUp
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
@@ -58,14 +59,15 @@ import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.NoopProfi
import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.ProfileScope;
import org.apache.beam.runners.dataflow.worker.streaming.Watermarks;
import org.apache.beam.runners.dataflow.worker.streaming.Work;
-import org.apache.beam.runners.dataflow.worker.streaming.config.FixedGlobalConfigHandle;
+import org.apache.beam.runners.dataflow.worker.streaming.config.FakeGlobalConfigHandle;
import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig;
-import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandle;
import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
import org.apache.beam.runners.dataflow.worker.windmill.client.getdata.FakeGetDataClient;
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache;
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateReader;
+import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncodingV1;
+import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncodingV2;
import org.apache.beam.runners.dataflow.worker.windmill.work.refresh.HeartbeatSender;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.metrics.MetricsContainer;
@@ -92,6 +94,7 @@ import org.mockito.MockitoAnnotations;
/** Tests for {@link StreamingModeExecutionContext}. */
@RunWith(JUnit4.class)
public class StreamingModeExecutionContextTest {
+
@Rule public transient Timeout globalTimeout = Timeout.seconds(600);
@Mock private SideInputStateFetcher sideInputStateFetcher;
@Mock private WindmillStateReader stateReader;
@@ -102,6 +105,7 @@ public class StreamingModeExecutionContextTest {
new StreamingModeExecutionStateRegistry();
private StreamingModeExecutionContext executionContext;
DataflowWorkerHarnessOptions options;
+ private FakeGlobalConfigHandle globalConfigHandle;
@Before
public void setUp() {
@@ -109,8 +113,7 @@ public class StreamingModeExecutionContextTest {
options = PipelineOptionsFactory.as(DataflowWorkerHarnessOptions.class);
CounterSet counterSet = new CounterSet();
ConcurrentHashMap<String, String> stateNameMap = new ConcurrentHashMap<>();
- StreamingGlobalConfigHandle globalConfigHandle =
- new FixedGlobalConfigHandle(StreamingGlobalConfig.builder().build());
+ globalConfigHandle = new FakeGlobalConfigHandle(StreamingGlobalConfig.builder().build());
stateNameMap.put(NameContextsForTests.nameContextForTest().userName(), "testStateFamily");
executionContext =
new StreamingModeExecutionContext(
@@ -133,8 +136,7 @@ public class StreamingModeExecutionContextTest {
executionStateRegistry,
globalConfigHandle,
Long.MAX_VALUE,
- /*throwExceptionOnLargeOutput=*/ false,
- /*enableWindmillTagEncodingV2=*/ false);
+ /*throwExceptionOnLargeOutput=*/ false);
}
private static Work createMockWork(Windmill.WorkItem workItem, Watermarks watermarks) {
@@ -406,4 +408,25 @@ public class StreamingModeExecutionContextTest {
sampler.stop();
}
}
+
+ @Test
+ public void testStateTagEncodingBasedOnConfig() {
+ for (Boolean isV2Encoding : Lists.newArrayList(Boolean.TRUE, Boolean.FALSE)) {
+ Class<?> expectedEncoding =
+ isV2Encoding ? WindmillTagEncodingV2.class : WindmillTagEncodingV1.class;
+ Windmill.WorkItemCommitRequest.Builder outputBuilder =
+ Windmill.WorkItemCommitRequest.newBuilder();
+ globalConfigHandle.setConfig(
+ StreamingGlobalConfig.builder().setEnableStateTagEncodingV2(isV2Encoding).build());
+ executionContext.start(
+ "key",
+ createMockWork(
+ Windmill.WorkItem.newBuilder().setKey(ByteString.EMPTY).setWorkToken(17L).build(),
+ Watermarks.builder().setInputDataWatermark(new Instant(1000)).build()),
+ stateReader,
+ sideInputStateFetcher,
+ outputBuilder);
+ assertEquals(expectedEncoding, executionContext.getWindmillTagEncoding().getClass());
+ }
+ }
}
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
index f7364104f5d..334b9414b26 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
@@ -618,8 +618,7 @@ public class WorkerCustomSourcesTest {
executionStateRegistry,
globalConfigHandle,
Long.MAX_VALUE,
- /*throwExceptionOnLargeOutput=*/ false,
- /*enableWindmillTagEncodingV2=*/ false);
+ /*throwExceptionOnLargeOutput=*/ false);
options.setNumWorkers(5);
int maxElements = 10;
@@ -990,8 +989,7 @@ public class WorkerCustomSourcesTest {
executionStateRegistry,
globalConfigHandle,
Long.MAX_VALUE,
- /*throwExceptionOnLargeOutput=*/ false,
- /*enableWindmillTagEncodingV2=*/ false);
+ /*throwExceptionOnLargeOutput=*/ false);
options.setNumWorkers(5);
int maxElements = 100;
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcherTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcherTest.java
index 3a0ae7bb208..8b181a7abda 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcherTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcherTest.java
@@ -18,6 +18,7 @@
package org.apache.beam.runners.dataflow.worker.streaming.config;
import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
@@ -29,7 +30,9 @@ import com.google.api.services.dataflow.model.StreamingComputationConfig;
import com.google.api.services.dataflow.model.StreamingConfigTask;
import com.google.api.services.dataflow.model.WorkItem;
import java.io.IOException;
+import java.util.Arrays;
import java.util.HashSet;
+import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
@@ -47,6 +50,7 @@ import org.mockito.internal.stubbing.answers.Returns;
@RunWith(JUnit4.class)
public class StreamingEngineComputationConfigFetcherTest {
+
private final WorkUnitClient mockDataflowServiceClient =
mock(WorkUnitClient.class, new Returns(Optional.empty()));
private StreamingEngineComputationConfigFetcher streamingEngineConfigFetcher;
@@ -231,4 +235,38 @@ public class StreamingEngineComputationConfigFetcherTest {
() -> streamingEngineConfigFetcher.fetchConfig("someComputationId"));
assertThat(fetchConfigError).isSameInstanceAs(e);
}
+
+ @Test
+ public void test_streamingEngineStateTagEncodingVersion()
+ throws IOException, InterruptedException {
+ for (Optional<Integer> tagEncoding :
+ Arrays.<Optional<Integer>>asList(Optional.empty(), Optional.of(1), Optional.of(2))) {
+ StreamingConfigTask streamingConfigTask =
+ new StreamingConfigTask().setMaxWorkItemCommitBytes(100L);
+ tagEncoding.ifPresent(
+ version -> streamingConfigTask.setStreamingEngineStateTagEncodingVersion(version));
+ WorkItem initialConfig =
+ new WorkItem().setJobId("job").setStreamingConfigTask(streamingConfigTask);
+ CountDownLatch waitForInitialConfig = new CountDownLatch(1);
+ Set<StreamingGlobalConfig> receivedPipelineConfig = new HashSet<>();
+ when(mockDataflowServiceClient.getGlobalStreamingConfigWorkItem())
+ .thenReturn(Optional.of(initialConfig));
+ StreamingGlobalConfigHandleImpl globalConfigHandle = new StreamingGlobalConfigHandleImpl();
+ globalConfigHandle.registerConfigObserver(
+ config -> {
+ receivedPipelineConfig.add(config);
+ waitForInitialConfig.countDown();
+ });
+ streamingEngineConfigFetcher =
+ createConfigFetcher(/* waitForInitialConfig= */ true, 0, globalConfigHandle);
+ Thread asyncStartConfigLoader = new Thread(streamingEngineConfigFetcher::start);
+ asyncStartConfigLoader.start();
+ waitForInitialConfig.await();
+ asyncStartConfigLoader.join();
+ assertEquals(1, receivedPipelineConfig.size());
+ assertEquals(
+ Objects.equals(2, streamingConfigTask.getStreamingEngineStateTagEncodingVersion()),
+ receivedPipelineConfig.iterator().next().enableStateTagEncodingV2());
+ }
+ }
}