This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 1531743ce8d [Dataflow Streaming] Enable state tag encoding v2 based on backend flag (#37464)
1531743ce8d is described below

commit 1531743ce8dca4da4ec15f4671df9a3378e32fb8
Author: Arun Pandian <[email protected]>
AuthorDate: Wed Feb 4 01:45:56 2026 -0800

    [Dataflow Streaming] Enable state tag encoding v2 based on backend flag (#37464)
---
 .../worker/StreamingModeExecutionContext.java      | 17 +++++-----
 .../StreamingEngineComputationConfigFetcher.java   | 10 ++++++
 .../streaming/config/StreamingGlobalConfig.java    |  7 +++-
 .../processing/ComputationWorkExecutorFactory.java | 14 +-------
 .../worker/StreamingModeExecutionContextTest.java  | 35 ++++++++++++++++----
 .../dataflow/worker/WorkerCustomSourcesTest.java   |  6 ++--
 ...treamingEngineComputationConfigFetcherTest.java | 38 ++++++++++++++++++++++
 7 files changed, 95 insertions(+), 32 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
index 09afcadc300..2c936f88e28 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
@@ -50,6 +50,7 @@ import 
org.apache.beam.runners.dataflow.worker.counters.NameContext;
 import 
org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.ProfileScope;
 import org.apache.beam.runners.dataflow.worker.streaming.Watermarks;
 import org.apache.beam.runners.dataflow.worker.streaming.Work;
+import 
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig;
 import 
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandle;
 import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInput;
 import 
org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputState;
@@ -122,7 +123,7 @@ public class StreamingModeExecutionContext extends 
DataflowExecutionContext<Step
    */
   private final Map<TupleTag<?>, Map<BoundedWindow, SideInput<?>>> 
sideInputCache;
 
-  private final WindmillTagEncoding windmillTagEncoding;
+  private WindmillTagEncoding windmillTagEncoding;
   /**
    * The current user-facing key for this execution context.
    *
@@ -162,8 +163,7 @@ public class StreamingModeExecutionContext extends 
DataflowExecutionContext<Step
       StreamingModeExecutionStateRegistry executionStateRegistry,
       StreamingGlobalConfigHandle globalConfigHandle,
       long sinkByteLimit,
-      boolean throwExceptionOnLargeOutput,
-      boolean enableWindmillTagEncodingV2) {
+      boolean throwExceptionOnLargeOutput) {
     super(
         counterFactory,
         metricsContainerRegistry,
@@ -174,10 +174,6 @@ public class StreamingModeExecutionContext extends 
DataflowExecutionContext<Step
     this.readerCache = readerCache;
     this.globalConfigHandle = globalConfigHandle;
     this.sideInputCache = new HashMap<>();
-    this.windmillTagEncoding =
-        enableWindmillTagEncodingV2
-            ? WindmillTagEncodingV2.instance()
-            : WindmillTagEncodingV1.instance();
     this.stateNameMap = ImmutableMap.copyOf(stateNameMap);
     this.stateCache = stateCache;
     this.backlogBytes = UnboundedReader.BACKLOG_UNKNOWN;
@@ -244,8 +240,13 @@ public class StreamingModeExecutionContext extends 
DataflowExecutionContext<Step
     this.work = work;
     this.computationKey = WindmillComputationKey.create(computationId, 
work.getShardedKey());
     this.sideInputStateFetcher = sideInputStateFetcher;
+    StreamingGlobalConfig config = globalConfigHandle.getConfig();
     // Snapshot the limits for entire bundle processing.
-    this.operationalLimits = 
globalConfigHandle.getConfig().operationalLimits();
+    this.operationalLimits = config.operationalLimits();
+    this.windmillTagEncoding =
+        config.enableStateTagEncodingV2()
+            ? WindmillTagEncodingV2.instance()
+            : WindmillTagEncodingV1.instance();
     this.outputBuilder = outputBuilder;
     this.sideInputCache.clear();
     clearSinkFullHint();
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java
index 73f2afc0422..0f50c04d1f0 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java
@@ -25,6 +25,7 @@ import 
com.google.api.services.dataflow.model.StreamingComputationConfig;
 import com.google.api.services.dataflow.model.StreamingConfigTask;
 import com.google.api.services.dataflow.model.WorkItem;
 import java.io.IOException;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -61,6 +62,7 @@ import org.slf4j.LoggerFactory;
 @Internal
 @ThreadSafe
 public final class StreamingEngineComputationConfigFetcher implements 
ComputationConfig.Fetcher {
+
   private static final Logger LOG =
       LoggerFactory.getLogger(StreamingEngineComputationConfigFetcher.class);
   private static final String CONFIG_REFRESHER_THREAD_NAME = 
"GlobalPipelineConfigRefresher";
@@ -209,6 +211,14 @@ public final class StreamingEngineComputationConfigFetcher 
implements Computatio
       pipelineConfig.setUserWorkerJobSettings(settings);
     }
 
+    Integer tagEncodingVersion = 
config.getStreamingEngineStateTagEncodingVersion();
+    if (tagEncodingVersion != null) {
+      Preconditions.checkState(tagEncodingVersion <= 2);
+    }
+    if (Objects.equals(2, tagEncodingVersion)) {
+      pipelineConfig.setEnableStateTagEncodingV2(true);
+    }
+
     return pipelineConfig.build();
   }
 
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfig.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfig.java
index 8f76f5ec27a..c1347e338c7 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfig.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingGlobalConfig.java
@@ -33,11 +33,14 @@ public abstract class StreamingGlobalConfig {
     return new AutoValue_StreamingGlobalConfig.Builder()
         .setWindmillServiceEndpoints(ImmutableSet.of())
         
.setUserWorkerJobSettings(UserWorkerRunnerV1Settings.newBuilder().build())
-        .setOperationalLimits(OperationalLimits.builder().build());
+        .setOperationalLimits(OperationalLimits.builder().build())
+        .setEnableStateTagEncodingV2(false);
   }
 
   public abstract OperationalLimits operationalLimits();
 
+  public abstract boolean enableStateTagEncodingV2();
+
   public abstract ImmutableSet<HostAndPort> windmillServiceEndpoints();
 
   public abstract UserWorkerRunnerV1Settings userWorkerJobSettings();
@@ -51,6 +54,8 @@ public abstract class StreamingGlobalConfig {
 
     public abstract Builder 
setUserWorkerJobSettings(UserWorkerRunnerV1Settings settings);
 
+    public abstract Builder setEnableStateTagEncodingV2(boolean enable);
+
     public abstract StreamingGlobalConfig build();
   }
 }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/ComputationWorkExecutorFactory.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/ComputationWorkExecutorFactory.java
index 097da87fb01..26979990330 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/ComputationWorkExecutorFactory.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/ComputationWorkExecutorFactory.java
@@ -74,14 +74,6 @@ final class ComputationWorkExecutorFactory {
   private static final String THROW_EXCEPTIONS_ON_LARGE_OUTPUT_EXPERIMENT =
       "throw_exceptions_on_large_output";
 
-  // Experiment to enable tag encoding v2.
-  // Experiment is for testing by dataflow runner developers.
-  // Related logic could change anytime without notice.
-  // **DO NOT USE** on real workloads.
-  // Enabling the experiment could lead to state incompatibilities and broken 
jobs.
-  private static final String UNSTABLE_WINDMILL_TAG_ENCODING_EXPERIMENT =
-      "unstable_windmill_tag_encoding_v2";
-
   private final DataflowWorkerHarnessOptions options;
   private final DataflowMapTaskExecutorFactory mapTaskExecutorFactory;
   private final ReaderCache readerCache;
@@ -105,7 +97,6 @@ final class ComputationWorkExecutorFactory {
   private final IdGenerator idGenerator;
   private final StreamingGlobalConfigHandle globalConfigHandle;
   private final boolean throwExceptionOnLargeOutput;
-  private final boolean enableWindmillTagEncodingV2;
 
   ComputationWorkExecutorFactory(
       DataflowWorkerHarnessOptions options,
@@ -133,8 +124,6 @@ final class ComputationWorkExecutorFactory {
             : StreamingDataflowWorker.MAX_SINK_BYTES;
     this.throwExceptionOnLargeOutput =
         hasExperiment(options, THROW_EXCEPTIONS_ON_LARGE_OUTPUT_EXPERIMENT);
-    this.enableWindmillTagEncodingV2 =
-        hasExperiment(options, UNSTABLE_WINDMILL_TAG_ENCODING_EXPERIMENT);
   }
 
   private static Nodes.ParallelInstructionNode extractReadNode(
@@ -279,8 +268,7 @@ final class ComputationWorkExecutorFactory {
         stageInfo.executionStateRegistry(),
         globalConfigHandle,
         maxSinkBytes,
-        throwExceptionOnLargeOutput,
-        enableWindmillTagEncodingV2);
+        throwExceptionOnLargeOutput);
   }
 
   private DataflowMapTaskExecutor createMapTaskExecutor(
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java
index 8372b33d81c..449d1cc27cc 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java
@@ -22,6 +22,7 @@ import static 
org.apache.beam.runners.dataflow.worker.counters.DataflowCounterUp
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
@@ -58,14 +59,15 @@ import 
org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.NoopProfi
 import 
org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.ProfileScope;
 import org.apache.beam.runners.dataflow.worker.streaming.Watermarks;
 import org.apache.beam.runners.dataflow.worker.streaming.Work;
-import 
org.apache.beam.runners.dataflow.worker.streaming.config.FixedGlobalConfigHandle;
+import 
org.apache.beam.runners.dataflow.worker.streaming.config.FakeGlobalConfigHandle;
 import 
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig;
-import 
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandle;
 import 
org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.getdata.FakeGetDataClient;
 import 
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache;
 import 
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateReader;
+import 
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncodingV1;
+import 
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncodingV2;
 import 
org.apache.beam.runners.dataflow.worker.windmill.work.refresh.HeartbeatSender;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.metrics.MetricsContainer;
@@ -92,6 +94,7 @@ import org.mockito.MockitoAnnotations;
 /** Tests for {@link StreamingModeExecutionContext}. */
 @RunWith(JUnit4.class)
 public class StreamingModeExecutionContextTest {
+
   @Rule public transient Timeout globalTimeout = Timeout.seconds(600);
   @Mock private SideInputStateFetcher sideInputStateFetcher;
   @Mock private WindmillStateReader stateReader;
@@ -102,6 +105,7 @@ public class StreamingModeExecutionContextTest {
       new StreamingModeExecutionStateRegistry();
   private StreamingModeExecutionContext executionContext;
   DataflowWorkerHarnessOptions options;
+  private FakeGlobalConfigHandle globalConfigHandle;
 
   @Before
   public void setUp() {
@@ -109,8 +113,7 @@ public class StreamingModeExecutionContextTest {
     options = PipelineOptionsFactory.as(DataflowWorkerHarnessOptions.class);
     CounterSet counterSet = new CounterSet();
     ConcurrentHashMap<String, String> stateNameMap = new ConcurrentHashMap<>();
-    StreamingGlobalConfigHandle globalConfigHandle =
-        new FixedGlobalConfigHandle(StreamingGlobalConfig.builder().build());
+    globalConfigHandle = new 
FakeGlobalConfigHandle(StreamingGlobalConfig.builder().build());
     stateNameMap.put(NameContextsForTests.nameContextForTest().userName(), 
"testStateFamily");
     executionContext =
         new StreamingModeExecutionContext(
@@ -133,8 +136,7 @@ public class StreamingModeExecutionContextTest {
             executionStateRegistry,
             globalConfigHandle,
             Long.MAX_VALUE,
-            /*throwExceptionOnLargeOutput=*/ false,
-            /*enableWindmillTagEncodingV2=*/ false);
+            /*throwExceptionOnLargeOutput=*/ false);
   }
 
   private static Work createMockWork(Windmill.WorkItem workItem, Watermarks 
watermarks) {
@@ -406,4 +408,25 @@ public class StreamingModeExecutionContextTest {
       sampler.stop();
     }
   }
+
+  @Test
+  public void testStateTagEncodingBasedOnConfig() {
+    for (Boolean isV2Encoding : Lists.newArrayList(Boolean.TRUE, 
Boolean.FALSE)) {
+      Class<?> expectedEncoding =
+          isV2Encoding ? WindmillTagEncodingV2.class : 
WindmillTagEncodingV1.class;
+      Windmill.WorkItemCommitRequest.Builder outputBuilder =
+          Windmill.WorkItemCommitRequest.newBuilder();
+      globalConfigHandle.setConfig(
+          
StreamingGlobalConfig.builder().setEnableStateTagEncodingV2(isV2Encoding).build());
+      executionContext.start(
+          "key",
+          createMockWork(
+              
Windmill.WorkItem.newBuilder().setKey(ByteString.EMPTY).setWorkToken(17L).build(),
+              Watermarks.builder().setInputDataWatermark(new 
Instant(1000)).build()),
+          stateReader,
+          sideInputStateFetcher,
+          outputBuilder);
+      assertEquals(expectedEncoding, 
executionContext.getWindmillTagEncoding().getClass());
+    }
+  }
 }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
index f7364104f5d..334b9414b26 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
@@ -618,8 +618,7 @@ public class WorkerCustomSourcesTest {
             executionStateRegistry,
             globalConfigHandle,
             Long.MAX_VALUE,
-            /*throwExceptionOnLargeOutput=*/ false,
-            /*enableWindmillTagEncodingV2=*/ false);
+            /*throwExceptionOnLargeOutput=*/ false);
 
     options.setNumWorkers(5);
     int maxElements = 10;
@@ -990,8 +989,7 @@ public class WorkerCustomSourcesTest {
             executionStateRegistry,
             globalConfigHandle,
             Long.MAX_VALUE,
-            /*throwExceptionOnLargeOutput=*/ false,
-            /*enableWindmillTagEncodingV2=*/ false);
+            /*throwExceptionOnLargeOutput=*/ false);
 
     options.setNumWorkers(5);
     int maxElements = 100;
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcherTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcherTest.java
index 3a0ae7bb208..8b181a7abda 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcherTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcherTest.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.dataflow.worker.streaming.config;
 
 import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
@@ -29,7 +30,9 @@ import 
com.google.api.services.dataflow.model.StreamingComputationConfig;
 import com.google.api.services.dataflow.model.StreamingConfigTask;
 import com.google.api.services.dataflow.model.WorkItem;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.HashSet;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
@@ -47,6 +50,7 @@ import org.mockito.internal.stubbing.answers.Returns;
 
 @RunWith(JUnit4.class)
 public class StreamingEngineComputationConfigFetcherTest {
+
   private final WorkUnitClient mockDataflowServiceClient =
       mock(WorkUnitClient.class, new Returns(Optional.empty()));
   private StreamingEngineComputationConfigFetcher streamingEngineConfigFetcher;
@@ -231,4 +235,38 @@ public class StreamingEngineComputationConfigFetcherTest {
             () -> 
streamingEngineConfigFetcher.fetchConfig("someComputationId"));
     assertThat(fetchConfigError).isSameInstanceAs(e);
   }
+
+  @Test
+  public void test_streamingEngineStateTagEncodingVersion()
+      throws IOException, InterruptedException {
+    for (Optional<Integer> tagEncoding :
+        Arrays.<Optional<Integer>>asList(Optional.empty(), Optional.of(1), 
Optional.of(2))) {
+      StreamingConfigTask streamingConfigTask =
+          new StreamingConfigTask().setMaxWorkItemCommitBytes(100L);
+      tagEncoding.ifPresent(
+          version -> 
streamingConfigTask.setStreamingEngineStateTagEncodingVersion(version));
+      WorkItem initialConfig =
+          new 
WorkItem().setJobId("job").setStreamingConfigTask(streamingConfigTask);
+      CountDownLatch waitForInitialConfig = new CountDownLatch(1);
+      Set<StreamingGlobalConfig> receivedPipelineConfig = new HashSet<>();
+      when(mockDataflowServiceClient.getGlobalStreamingConfigWorkItem())
+          .thenReturn(Optional.of(initialConfig));
+      StreamingGlobalConfigHandleImpl globalConfigHandle = new 
StreamingGlobalConfigHandleImpl();
+      globalConfigHandle.registerConfigObserver(
+          config -> {
+            receivedPipelineConfig.add(config);
+            waitForInitialConfig.countDown();
+          });
+      streamingEngineConfigFetcher =
+          createConfigFetcher(/* waitForInitialConfig= */ true, 0, 
globalConfigHandle);
+      Thread asyncStartConfigLoader = new 
Thread(streamingEngineConfigFetcher::start);
+      asyncStartConfigLoader.start();
+      waitForInitialConfig.await();
+      asyncStartConfigLoader.join();
+      assertEquals(1, receivedPipelineConfig.size());
+      assertEquals(
+          Objects.equals(2, 
streamingConfigTask.getStreamingEngineStateTagEncodingVersion()),
+          receivedPipelineConfig.iterator().next().enableStateTagEncodingV2());
+    }
+  }
 }

Reply via email to