This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 41779ad88d3 [Dataflow Streaming] Update windmill timer classification 
logic to work with windmill state tag encoding v2 (#37604)
41779ad88d3 is described below

commit 41779ad88d330ec1f9d8f17e94bf72b6f5d4bf2f
Author: Arun Pandian <[email protected]>
AuthorDate: Thu Feb 19 05:02:28 2026 -0800

    [Dataflow Streaming] Update windmill timer classification logic to work with 
windmill state tag encoding v2 (#37604)
---
 .../worker/StreamingModeExecutionContext.java      | 34 ++++-----
 .../dataflow/worker/WindmillKeyedWorkItem.java     | 16 ++--
 .../dataflow/worker/WindmillTimerInternals.java    | 28 +++----
 ...NamespacePrefix.java => WindmillTimerType.java} | 28 +------
 .../worker/windmill/state/WindmillTagEncoding.java | 24 +++---
 .../windmill/state/WindmillTagEncodingV1.java      | 85 +++++++++++++++-------
 .../windmill/state/WindmillTagEncodingV2.java      | 42 +++++------
 .../worker/windmill/state/WindmillTimerData.java   | 34 +++++++++
 .../worker/StreamingGroupAlsoByWindowFnsTest.java  |  2 +-
 .../dataflow/worker/WindmillKeyedWorkItemTest.java |  2 +-
 .../windmill/state/WindmillTagEncodingV1Test.java  | 23 +++---
 .../windmill/state/WindmillTagEncodingV2Test.java  | 61 ++++++++--------
 12 files changed, 208 insertions(+), 171 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
index 2c936f88e28..b4568877143 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
@@ -66,6 +66,7 @@ import 
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateReade
 import 
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncoding;
 import 
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncodingV1;
 import 
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncodingV2;
+import 
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTimerData;
 import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.UnboundedSource;
@@ -798,7 +799,7 @@ public class StreamingModeExecutionContext extends 
DataflowExecutionContext<Step
       this.systemTimerInternals =
           new WindmillTimerInternals(
               stateFamily,
-              WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
+              WindmillTimerType.SYSTEM_TIMER,
               processingTime,
               watermarks,
               windmillTagEncoding,
@@ -807,7 +808,7 @@ public class StreamingModeExecutionContext extends 
DataflowExecutionContext<Step
       this.userTimerInternals =
           new WindmillTimerInternals(
               stateFamily,
-              WindmillNamespacePrefix.USER_NAMESPACE_PREFIX,
+              WindmillTimerType.USER_TIMER,
               processingTime,
               watermarks,
               windmillTagEncoding,
@@ -832,17 +833,15 @@ public class StreamingModeExecutionContext extends 
DataflowExecutionContext<Step
       if (cachedFiredSystemTimers == null) {
         cachedFiredSystemTimers =
             
FluentIterable.from(StreamingModeExecutionContext.this.getFiredTimers())
-                .filter(
-                    timer ->
-                        WindmillTimerInternals.isSystemTimer(timer)
-                            && timer.getStateFamily().equals(stateFamily))
+                .filter(timer -> timer.getStateFamily().equals(stateFamily))
                 .transform(
                     timer ->
                         windmillTagEncoding.windmillTimerToTimerData(
-                            WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
-                            timer,
-                            windowCoder,
-                            getDrainMode()))
+                            timer, windowCoder, getDrainMode()))
+                .filter(
+                    windmillTimerData ->
+                        windmillTimerData.getWindmillTimerType() == 
WindmillTimerType.SYSTEM_TIMER)
+                .transform(WindmillTimerData::getTimerData)
                 .iterator();
       }
 
@@ -895,17 +894,16 @@ public class StreamingModeExecutionContext extends 
DataflowExecutionContext<Step
         cachedFiredUserTimers =
             Iterators.peekingIterator(
                 
FluentIterable.from(StreamingModeExecutionContext.this.getFiredTimers())
-                    .filter(
-                        timer ->
-                            WindmillTimerInternals.isUserTimer(timer)
-                                && timer.getStateFamily().equals(stateFamily))
+                    .filter(timer -> 
timer.getStateFamily().equals(stateFamily))
                     .transform(
                         timer ->
                             windmillTagEncoding.windmillTimerToTimerData(
-                                WindmillNamespacePrefix.USER_NAMESPACE_PREFIX,
-                                timer,
-                                windowCoder,
-                                getDrainMode()))
+                                timer, windowCoder, getDrainMode()))
+                    .filter(
+                        windmillTimerData ->
+                            windmillTimerData.getWindmillTimerType()
+                                == WindmillTimerType.USER_TIMER)
+                    .transform(WindmillTimerData::getTimerData)
                     .iterator());
       }
 
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java
index da69f1a2371..59489babf0b 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.runners.dataflow.worker;
 
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -31,6 +33,7 @@ import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer;
 import 
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncoding;
+import 
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTimerData;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StructuredCoder;
@@ -101,12 +104,13 @@ public class WindmillKeyedWorkItem<K, ElemT> implements 
KeyedWorkItem<K, ElemT>
     return eventTimers
         .append(nonEventTimers)
         .transform(
-            timer ->
-                windmillTagEncoding.windmillTimerToTimerData(
-                    WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
-                    timer,
-                    windowCoder,
-                    drainMode));
+            timer -> {
+              WindmillTimerData windmillTimerData =
+                  windmillTagEncoding.windmillTimerToTimerData(timer, 
windowCoder, drainMode);
+              checkState(
+                  windmillTimerData.getWindmillTimerType() == 
WindmillTimerType.SYSTEM_TIMER);
+              return windmillTimerData.getTimerData();
+            });
   }
 
   @Override
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
index c62839333c7..07ce62d5933 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
@@ -60,13 +60,13 @@ class WindmillTimerInternals implements TimerInternals {
   private final Watermarks watermarks;
   private final Instant processingTime;
   private final String stateFamily;
-  private final WindmillNamespacePrefix prefix;
+  private final WindmillTimerType type;
   private final Consumer<TimerData> onTimerModified;
   private final WindmillTagEncoding windmillTagEncoding;
 
   public WindmillTimerInternals(
       String stateFamily, // unique identifies a step
-      WindmillNamespacePrefix prefix, // partitions user and system namespaces 
into "/u" and "/s"
+      WindmillTimerType type,
       Instant processingTime,
       Watermarks watermarks,
       WindmillTagEncoding windmillTagEncoding,
@@ -74,14 +74,14 @@ class WindmillTimerInternals implements TimerInternals {
     this.watermarks = watermarks;
     this.processingTime = checkNotNull(processingTime);
     this.stateFamily = stateFamily;
-    this.prefix = prefix;
+    this.type = type;
     this.windmillTagEncoding = windmillTagEncoding;
     this.onTimerModified = onTimerModified;
   }
 
-  public WindmillTimerInternals withPrefix(WindmillNamespacePrefix prefix) {
+  public WindmillTimerInternals withType(WindmillTimerType type) {
     return new WindmillTimerInternals(
-        stateFamily, prefix, processingTime, watermarks, windmillTagEncoding, 
onTimerModified);
+        stateFamily, type, processingTime, watermarks, windmillTagEncoding, 
onTimerModified);
   }
 
   @Override
@@ -197,7 +197,7 @@ class WindmillTimerInternals implements TimerInternals {
 
       Timer.Builder timer =
           windmillTagEncoding.buildWindmillTimerFromTimerData(
-              stateFamily, prefix, timerData, 
outputBuilder.addOutputTimersBuilder());
+              stateFamily, type, timerData, 
outputBuilder.addOutputTimersBuilder());
 
       if (value.getValue()) {
         // Setting the timer. If it is a user timer, set a hold.
@@ -210,7 +210,7 @@ class WindmillTimerInternals implements TimerInternals {
             // Setting a timer, clear any prior hold and set to the new value
             outputBuilder
                 .addWatermarkHoldsBuilder()
-                .setTag(windmillTagEncoding.timerHoldTag(prefix, timerData, 
timer.getTag()))
+                .setTag(windmillTagEncoding.timerHoldTag(type, timerData, 
timer.getTag()))
                 .setStateFamily(stateFamily)
                 .setReset(true)
                 .addTimestamps(
@@ -219,7 +219,7 @@ class WindmillTimerInternals implements TimerInternals {
             // Clear the hold in case a previous iteration of this timer set 
one.
             outputBuilder
                 .addWatermarkHoldsBuilder()
-                .setTag(windmillTagEncoding.timerHoldTag(prefix, timerData, 
timer.getTag()))
+                .setTag(windmillTagEncoding.timerHoldTag(type, timerData, 
timer.getTag()))
                 .setStateFamily(stateFamily)
                 .setReset(true);
           }
@@ -234,7 +234,7 @@ class WindmillTimerInternals implements TimerInternals {
           // We are deleting timer; clear the hold
           outputBuilder
               .addWatermarkHoldsBuilder()
-              .setTag(windmillTagEncoding.timerHoldTag(prefix, timerData, 
timer.getTag()))
+              .setTag(windmillTagEncoding.timerHoldTag(type, timerData, 
timer.getTag()))
               .setStateFamily(stateFamily)
               .setReset(true);
         }
@@ -247,15 +247,7 @@ class WindmillTimerInternals implements TimerInternals {
 
   private boolean needsWatermarkHold(TimerData timerData) {
     // If it is a user timer or a system timer with outputTimestamp different 
than timestamp
-    return WindmillNamespacePrefix.USER_NAMESPACE_PREFIX.equals(prefix)
+    return WindmillTimerType.USER_TIMER.equals(type)
         || !timerData.getTimestamp().isEqual(timerData.getOutputTimestamp());
   }
-
-  public static boolean isSystemTimer(Windmill.Timer timer) {
-    return 
timer.getTag().startsWith(WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX.byteString());
-  }
-
-  public static boolean isUserTimer(Windmill.Timer timer) {
-    return 
timer.getTag().startsWith(WindmillNamespacePrefix.USER_NAMESPACE_PREFIX.byteString());
-  }
 }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillNamespacePrefix.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerType.java
similarity index 55%
rename from 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillNamespacePrefix.java
rename to 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerType.java
index 4dc95aa1a0c..a421d94a582 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillNamespacePrefix.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerType.java
@@ -18,30 +18,10 @@
 package org.apache.beam.runners.dataflow.worker;
 
 import org.apache.beam.sdk.annotations.Internal;
-import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
 
-/**
- * A prefix for a Windmill state or timer tag to separate user state and 
timers from system state
- * and timers.
- */
+/** A type for a Windmill timer to separate user state and timers from system 
state and timers. */
 @Internal
-public enum WindmillNamespacePrefix {
-  USER_NAMESPACE_PREFIX {
-    @Override
-    public ByteString byteString() {
-      return USER_NAMESPACE_BYTESTRING;
-    }
-  },
-
-  SYSTEM_NAMESPACE_PREFIX {
-    @Override
-    public ByteString byteString() {
-      return SYSTEM_NAMESPACE_BYTESTRING;
-    }
-  };
-
-  public abstract ByteString byteString();
-
-  private static final ByteString USER_NAMESPACE_BYTESTRING = 
ByteString.copyFromUtf8("/u");
-  private static final ByteString SYSTEM_NAMESPACE_BYTESTRING = 
ByteString.copyFromUtf8("/s");
+public enum WindmillTimerType {
+  USER_TIMER,
+  SYSTEM_TIMER
 }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncoding.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncoding.java
index a979a1d982c..f9e1e7f8cab 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncoding.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncoding.java
@@ -21,8 +21,8 @@ import javax.annotation.concurrent.ThreadSafe;
 import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.runners.core.StateTag;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
-import org.apache.beam.runners.dataflow.worker.WindmillNamespacePrefix;
 import org.apache.beam.runners.dataflow.worker.WindmillTimeUtils;
+import org.apache.beam.runners.dataflow.worker.WindmillTimerType;
 import 
org.apache.beam.runners.dataflow.worker.util.common.worker.InternedByteString;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer;
 import org.apache.beam.sdk.annotations.Internal;
@@ -58,22 +58,18 @@ public abstract class WindmillTagEncoding {
    * @param timerTag tag of the timer that maps to the hold.
    */
   public abstract ByteString timerHoldTag(
-      WindmillNamespacePrefix prefix, TimerData timerData, ByteString 
timerTag);
+      WindmillTimerType windmillTimerType, TimerData timerData, ByteString 
timerTag);
 
   /**
-   * Produce a tag that is guaranteed to be unique for the given prefix, 
namespace, domain and
-   * timestamp.
+   * Produce a tag that is guaranteed to be unique for the given timer type 
and TimerData
    *
    * <p>This is necessary because Windmill will deduplicate based only on this 
tag.
    */
-  public abstract ByteString timerTag(WindmillNamespacePrefix prefix, 
TimerData timerData);
+  public abstract ByteString timerTag(WindmillTimerType windmillTimerType, 
TimerData timerData);
 
-  /** Converts Windmill Timer to beam TimerData */
-  public abstract TimerData windmillTimerToTimerData(
-      WindmillNamespacePrefix prefix,
-      Timer timer,
-      Coder<? extends BoundedWindow> windowCoder,
-      boolean draining);
+  /** Converts Windmill Timer to TimerData */
+  public abstract WindmillTimerData windmillTimerToTimerData(
+      Timer timer, Coder<? extends BoundedWindow> windowCoder, boolean 
draining);
 
   /**
    * Uses the given {@link Timer} builder to build a windmill {@link Timer} 
from {@link TimerData}.
@@ -82,11 +78,13 @@ public abstract class WindmillTagEncoding {
    */
   public Timer.Builder buildWindmillTimerFromTimerData(
       @Nullable String stateFamily,
-      WindmillNamespacePrefix prefix,
+      WindmillTimerType windmillTimerType,
       TimerData timerData,
       Timer.Builder builder) {
 
-    builder.setTag(timerTag(prefix, 
timerData)).setType(timerType(timerData.getDomain()));
+    builder
+        .setTag(timerTag(windmillTimerType, timerData))
+        .setType(timerType(timerData.getDomain()));
 
     if (stateFamily != null) {
       builder.setStateFamily(stateFamily);
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1.java
index 14c3f8c0179..9efdf5e925c 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1.java
@@ -17,16 +17,14 @@
  */
 package org.apache.beam.runners.dataflow.worker.windmill.state;
 
-import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
-
 import java.io.IOException;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.runners.core.StateNamespaces;
 import org.apache.beam.runners.core.StateTag;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
-import org.apache.beam.runners.dataflow.worker.WindmillNamespacePrefix;
 import org.apache.beam.runners.dataflow.worker.WindmillTimeUtils;
+import org.apache.beam.runners.dataflow.worker.WindmillTimerType;
 import 
org.apache.beam.runners.dataflow.worker.util.ThreadLocalByteStringOutputStream;
 import 
org.apache.beam.runners.dataflow.worker.util.ThreadLocalByteStringOutputStream.StreamHandle;
 import 
org.apache.beam.runners.dataflow.worker.util.common.worker.InternedByteString;
@@ -71,11 +69,11 @@ public class WindmillTagEncodingV1 extends 
WindmillTagEncoding {
   /** {@inheritDoc} */
   @Override
   public ByteString timerHoldTag(
-      WindmillNamespacePrefix prefix, TimerData timerData, ByteString 
unusedTimerTag) {
+      WindmillTimerType windmillTimerType, TimerData timerData, ByteString 
unusedTimerTag) {
     String tagString;
     if ("".equals(timerData.getTimerFamilyId())) {
       tagString =
-          prefix.byteString().toStringUtf8()
+          namespacePrefixString(windmillTimerType)
               + // this never ends with a slash
               TIMER_HOLD_PREFIX
               + // this never ends with a slash
@@ -86,7 +84,7 @@ public class WindmillTagEncodingV1 extends 
WindmillTagEncoding {
       ;
     } else {
       tagString =
-          prefix.byteString().toStringUtf8()
+          namespacePrefixString(windmillTimerType)
               + // this never ends with a slash
               TIMER_HOLD_PREFIX
               + // this never ends with a slash
@@ -105,11 +103,11 @@ public class WindmillTagEncodingV1 extends 
WindmillTagEncoding {
 
   /** {@inheritDoc} */
   @Override
-  public ByteString timerTag(WindmillNamespacePrefix prefix, TimerData 
timerData) {
+  public ByteString timerTag(WindmillTimerType windmillTimerType, TimerData 
timerData) {
     String tagString;
     if (useNewTimerTagEncoding(timerData)) {
       tagString =
-          prefix.byteString().toStringUtf8()
+          namespacePrefixString(windmillTimerType)
               + // this never ends with a slash
               timerData.getNamespace().stringKey()
               + // this must begin and end with a slash
@@ -121,7 +119,7 @@ public class WindmillTagEncodingV1 extends 
WindmillTagEncoding {
     } else {
       // Timers without timerFamily would have timerFamily would be an empty 
string
       tagString =
-          prefix.byteString().toStringUtf8()
+          namespacePrefixString(windmillTimerType)
               + // this never ends with a slash
               timerData.getNamespace().stringKey()
               + // this must begin and end with a slash
@@ -134,11 +132,8 @@ public class WindmillTagEncodingV1 extends 
WindmillTagEncoding {
 
   /** {@inheritDoc} */
   @Override
-  public TimerData windmillTimerToTimerData(
-      WindmillNamespacePrefix prefix,
-      Timer timer,
-      Coder<? extends BoundedWindow> windowCoder,
-      boolean draining) {
+  public WindmillTimerData windmillTimerToTimerData(
+      Timer timer, Coder<? extends BoundedWindow> windowCoder, boolean 
draining) {
 
     // The tag is a path-structure string but cheaper to parse than a proper 
URI. It follows
     // this pattern, where no component but the ID can contain a slash
@@ -160,16 +155,20 @@ public class WindmillTagEncodingV1 extends 
WindmillTagEncoding {
     //  - the id is totally arbitrary; currently unescaped though that could 
change
 
     ByteString tag = timer.getTag();
-    checkArgument(
-        tag.startsWith(prefix.byteString()),
-        "Expected timer tag %s to start with prefix %s",
-        tag,
-        prefix.byteString());
+    WindmillTimerType timerType;
+    if 
(tag.startsWith(namespacePrefixByteString(WindmillTimerType.SYSTEM_TIMER))) {
+      timerType = WindmillTimerType.SYSTEM_TIMER;
+    } else if 
(tag.startsWith(namespacePrefixByteString(WindmillTimerType.USER_TIMER))) {
+      timerType = WindmillTimerType.USER_TIMER;
+    } else {
+      throw new IllegalArgumentException("Unknown timer tag prefix: " + 
tag.toStringUtf8());
+    }
 
     Instant timestamp = 
WindmillTimeUtils.windmillToHarnessTimestamp(timer.getTimestamp());
 
     // Parse the namespace.
-    int namespaceStart = prefix.byteString().size(); // drop the prefix, leave 
the begin slash
+    int namespaceStart =
+        namespacePrefixByteString(timerType).size(); // drop the prefix, leave 
the begin slash
     int namespaceEnd = namespaceStart;
     while (namespaceEnd < tag.size() && tag.byteAt(namespaceEnd) != '+') {
       namespaceEnd++;
@@ -225,21 +224,51 @@ public class WindmillTagEncodingV1 extends 
WindmillTagEncoding {
     }
 
     StateNamespace namespace = StateNamespaces.fromString(namespaceString, 
windowCoder);
-    return TimerData.of(
-        timerId,
-        timerFamily,
-        namespace,
-        timestamp,
-        outputTimestamp,
-        timerTypeToTimeDomain(timer.getType()));
+    return WindmillTimerData.create(
+        timerType,
+        TimerData.of(
+            timerId,
+            timerFamily,
+            namespace,
+            timestamp,
+            outputTimestamp,
+            timerTypeToTimeDomain(timer.getType())));
     // todo add draining (https://github.com/apache/beam/issues/36884)
-
   }
 
   private static boolean useNewTimerTagEncoding(TimerData timerData) {
     return !timerData.getTimerFamilyId().isEmpty();
   }
 
+  private static final String USER_NAMESPACE_STRING = "/u";
+  private static final ByteString USER_NAMESPACE_BYTESTRING =
+      ByteString.copyFromUtf8(USER_NAMESPACE_STRING);
+  private static final String SYSTEM_NAMESPACE_STRING = "/s";
+  private static final ByteString SYSTEM_NAMESPACE_BYTESTRING =
+      ByteString.copyFromUtf8(SYSTEM_NAMESPACE_STRING);
+
+  private static String namespacePrefixString(WindmillTimerType 
windmillTimerType) {
+    switch (windmillTimerType) {
+      case USER_TIMER:
+        return USER_NAMESPACE_STRING;
+      case SYSTEM_TIMER:
+        return SYSTEM_NAMESPACE_STRING;
+      default:
+        throw new IllegalStateException("unexpected windmill timer type");
+    }
+  }
+
+  private static ByteString namespacePrefixByteString(WindmillTimerType 
windmillTimerType) {
+    switch (windmillTimerType) {
+      case USER_TIMER:
+        return USER_NAMESPACE_BYTESTRING;
+      case SYSTEM_TIMER:
+        return SYSTEM_NAMESPACE_BYTESTRING;
+      default:
+        throw new IllegalStateException("unexpected windmill timer type");
+    }
+  }
+
   /** @return the singleton WindmillStateTagUtil */
   public static WindmillTagEncodingV1 instance() {
     return INSTANCE;
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2.java
index 0702c375282..c607698ad9f 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2.java
@@ -17,8 +17,6 @@
  */
 package org.apache.beam.runners.dataflow.worker.windmill.state;
 
-import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
-
 import java.io.IOException;
 import java.io.InputStream;
 import javax.annotation.concurrent.ThreadSafe;
@@ -30,8 +28,8 @@ import 
org.apache.beam.runners.core.StateNamespaces.WindowNamespace;
 import org.apache.beam.runners.core.StateTag;
 import org.apache.beam.runners.core.StateTags;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
-import org.apache.beam.runners.dataflow.worker.WindmillNamespacePrefix;
 import org.apache.beam.runners.dataflow.worker.WindmillTimeUtils;
+import org.apache.beam.runners.dataflow.worker.WindmillTimerType;
 import 
org.apache.beam.runners.dataflow.worker.util.ThreadLocalByteStringOutputStream;
 import 
org.apache.beam.runners.dataflow.worker.util.ThreadLocalByteStringOutputStream.StreamHandle;
 import 
org.apache.beam.runners.dataflow.worker.util.common.worker.InternedByteString;
@@ -219,7 +217,7 @@ public class WindmillTagEncodingV2 extends 
WindmillTagEncoding {
   /** {@inheritDoc} */
   @Override
   public ByteString timerHoldTag(
-      WindmillNamespacePrefix prefix, TimerData timerData, ByteString 
timerTag) {
+      WindmillTimerType windmillTimerType, TimerData timerData, ByteString 
timerTag) {
     // Same encoding for timer tag and timer hold tag.
     // They are put in different places and won't collide.
     return timerTag;
@@ -227,16 +225,16 @@ public class WindmillTagEncodingV2 extends 
WindmillTagEncoding {
 
   /** {@inheritDoc} */
   @Override
-  public ByteString timerTag(WindmillNamespacePrefix prefix, TimerData 
timerData) {
+  public ByteString timerTag(WindmillTimerType windmillTimerType, TimerData 
timerData) {
     try (StreamHandle streamHandle = 
ThreadLocalByteStringOutputStream.acquire()) {
       ByteStringOutputStream stream = streamHandle.stream();
       encodeNamespace(timerData.getNamespace(), stream);
-      if (WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX.equals(prefix)) {
+      if (WindmillTimerType.SYSTEM_TIMER.equals(windmillTimerType)) {
         stream.write(SYSTEM_TIMER_BYTE);
-      } else if (WindmillNamespacePrefix.USER_NAMESPACE_PREFIX.equals(prefix)) 
{
+      } else if (WindmillTimerType.USER_TIMER.equals(windmillTimerType)) {
         stream.write(USER_TIMER_BYTE);
       } else {
-        throw new IllegalStateException("Unexpected WindmillNamespacePrefix" + 
prefix);
+        throw new IllegalStateException("Unexpected WindmillTimerType" + 
windmillTimerType);
       }
       StringUtf8Coder.of().encode(timerData.getTimerFamilyId(), stream);
       StringUtf8Coder.of().encode(timerData.getTimerId(), stream);
@@ -248,21 +246,19 @@ public class WindmillTagEncodingV2 extends 
WindmillTagEncoding {
 
   /** {@inheritDoc} */
   @Override
-  public TimerData windmillTimerToTimerData(
-      WindmillNamespacePrefix prefix,
-      Timer timer,
-      Coder<? extends BoundedWindow> windowCoder,
-      boolean draining) {
+  public WindmillTimerData windmillTimerToTimerData(
+      Timer timer, Coder<? extends BoundedWindow> windowCoder, boolean 
draining) {
 
     InputStream stream = timer.getTag().newInput();
 
     try {
       StateNamespace stateNamespace = decodeNamespace(stream, windowCoder);
       int nextByte = stream.read();
+      WindmillTimerType timerType;
       if (nextByte == SYSTEM_TIMER_BYTE) {
-        
checkState(WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX.equals(prefix));
+        timerType = WindmillTimerType.SYSTEM_TIMER;
       } else if (nextByte == USER_TIMER_BYTE) {
-        
checkState(WindmillNamespacePrefix.USER_NAMESPACE_PREFIX.equals(prefix));
+        timerType = WindmillTimerType.USER_TIMER;
       } else {
         throw new IllegalStateException("Unexpected timer tag byte: " + 
nextByte);
       }
@@ -282,13 +278,15 @@ public class WindmillTagEncodingV2 extends 
WindmillTagEncoding {
         }
       }
 
-      return TimerData.of(
-          timerId,
-          timerFamilyId,
-          stateNamespace,
-          timestamp,
-          outputTimestamp,
-          timerTypeToTimeDomain(timer.getType()));
+      return WindmillTimerData.create(
+          timerType,
+          TimerData.of(
+              timerId,
+              timerFamilyId,
+              stateNamespace,
+              timestamp,
+              outputTimestamp,
+              timerTypeToTimeDomain(timer.getType())));
 
     } catch (IOException e) {
       throw new RuntimeException(e);
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTimerData.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTimerData.java
new file mode 100644
index 00000000000..f3708f7bcfd
--- /dev/null
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTimerData.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.state;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.runners.core.TimerInternals.TimerData;
+import org.apache.beam.runners.dataflow.worker.WindmillTimerType;
+
+@AutoValue
+public abstract class WindmillTimerData {
+
+  public abstract WindmillTimerType getWindmillTimerType();
+
+  public abstract TimerData getTimerData();
+
+  public static WindmillTimerData create(WindmillTimerType windmillTimerType, TimerData timerData) {
+    return new AutoValue_WindmillTimerData(windmillTimerType, timerData);
+  }
+}
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java
index 35dc19dd781..20dec94de08 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java
@@ -152,7 +152,7 @@ public class StreamingGroupAlsoByWindowFnsTest {
         .setTag(
             WindmillTagEncodingV1.instance()
                 .timerTag(
-                    WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
+                    WindmillTimerType.SYSTEM_TIMER,
                     TimerData.of(
                         namespace,
                         timestamp,
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java
index 9062c881096..e5de780ac4e 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java
@@ -198,7 +198,7 @@ public class WindmillKeyedWorkItemTest {
     return Windmill.Timer.newBuilder()
         .setTag(
             windmillTagEncoding.timerTag(
-                WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
+                WindmillTimerType.SYSTEM_TIMER,
                 TimerData.of(
                     ns,
                     new Instant(timestamp),
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1Test.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1Test.java
index 48751c57754..fcac966ff96 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1Test.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1Test.java
@@ -29,7 +29,7 @@ import org.apache.beam.runners.core.StateNamespaces;
 import org.apache.beam.runners.core.StateTag;
 import org.apache.beam.runners.core.StateTags;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
-import org.apache.beam.runners.dataflow.worker.WindmillNamespacePrefix;
+import org.apache.beam.runners.dataflow.worker.WindmillTimerType;
 import 
org.apache.beam.runners.dataflow.worker.util.common.worker.InternedByteString;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer;
 import org.apache.beam.sdk.coders.Coder;
@@ -140,7 +140,7 @@ public class WindmillTagEncodingV1Test {
         StateNamespace namespace = coderAndNamespace.getValue();
 
         for (TimeDomain timeDomain : TimeDomain.values()) {
-          for (WindmillNamespacePrefix prefix : 
WindmillNamespacePrefix.values()) {
+          for (WindmillTimerType windmillTimerType : 
WindmillTimerType.values()) {
             for (Instant timestamp : TEST_TIMESTAMPS) {
               List<TimerData> anonymousTimers =
                   ImmutableList.of(
@@ -157,16 +157,17 @@ public class WindmillTagEncodingV1Test {
                     
timer.getOutputTimestamp().isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE)
                         ? BoundedWindow.TIMESTAMP_MIN_VALUE
                         : timer.getOutputTimestamp();
-                TimerData computed =
+                WindmillTimerData windmillTimerData =
                     WindmillTagEncodingV1.instance()
                         .windmillTimerToTimerData(
-                            prefix,
                             WindmillTagEncodingV1.instance()
                                 .buildWindmillTimerFromTimerData(
-                                    stateFamily, prefix, timer, 
Timer.newBuilder())
+                                    stateFamily, windmillTimerType, timer, 
Timer.newBuilder())
                                 .build(),
                             coder,
                             false);
+                TimerData timerData = windmillTimerData.getTimerData();
+                assertEquals(windmillTimerType, 
windmillTimerData.getWindmillTimerType());
                 // The function itself bounds output, so we dont expect the 
original input as the
                 // output, we expect it to be bounded
                 TimerData expected =
@@ -177,7 +178,7 @@ public class WindmillTagEncodingV1Test {
                         timer.getDomain(),
                         CausedByDrain.NORMAL);
 
-                assertThat(computed, equalTo(expected));
+                assertThat(timerData, equalTo(expected));
               }
 
               for (String timerId : TEST_TIMER_IDS) {
@@ -221,17 +222,17 @@ public class WindmillTagEncodingV1Test {
                           timer.getTimestamp(),
                           expectedTimestamp,
                           timer.getDomain());
-                  assertThat(
+                  WindmillTimerData windmillTimerData =
                       WindmillTagEncodingV1.instance()
                           .windmillTimerToTimerData(
-                              prefix,
                               WindmillTagEncodingV1.instance()
                                   .buildWindmillTimerFromTimerData(
-                                      stateFamily, prefix, timer, 
Timer.newBuilder())
+                                      stateFamily, windmillTimerType, timer, 
Timer.newBuilder())
                                   .build(),
                               coder,
-                              false),
-                      equalTo(expected));
+                              false);
+                  assertEquals(windmillTimerType, 
windmillTimerData.getWindmillTimerType());
+                  assertThat(windmillTimerData.getTimerData(), 
equalTo(expected));
                 }
               }
             }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2Test.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2Test.java
index 1284c46a99a..fbbcfed7cf0 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2Test.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2Test.java
@@ -35,8 +35,8 @@ import 
org.apache.beam.runners.core.StateNamespaces.GlobalNamespace;
 import org.apache.beam.runners.core.StateTag;
 import org.apache.beam.runners.core.StateTags;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
-import org.apache.beam.runners.dataflow.worker.WindmillNamespacePrefix;
 import org.apache.beam.runners.dataflow.worker.WindmillTimeUtils;
+import org.apache.beam.runners.dataflow.worker.WindmillTimerType;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.InstantCoder;
@@ -222,7 +222,7 @@ public class WindmillTagEncodingV2Test {
 
     @Parameters(
         name =
-            "{index}: namespace={0} prefix={1} expectedBytes={2} 
includeTimerId={3}"
+            "{index}: namespace={0} windmillTimerType={1} expectedBytes={2} 
includeTimerId={3}"
                 + " includeTimerFamilyId={4} timeDomain={4}")
     public static Collection<Object[]> data() {
       List<Object[]> data = new ArrayList<>();
@@ -235,52 +235,52 @@ public class WindmillTagEncodingV2Test {
             ImmutableList.of(
                 new Object[] {
                   GLOBAL_NAMESPACE,
-                  WindmillNamespacePrefix.USER_NAMESPACE_PREFIX,
+                  WindmillTimerType.USER_TIMER,
                   GLOBAL_NAMESPACE_BYTES.concat(expectedUserTimerBytes)
                 },
                 new Object[] {
                   GLOBAL_NAMESPACE,
-                  WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
+                  WindmillTimerType.SYSTEM_TIMER,
                   GLOBAL_NAMESPACE_BYTES.concat(expectedSystemTimerBytes)
                 },
                 new Object[] {
                   INTERVAL_WINDOW_NAMESPACE,
-                  WindmillNamespacePrefix.USER_NAMESPACE_PREFIX,
+                  WindmillTimerType.USER_TIMER,
                   
INTERVAL_WINDOW_NAMESPACE_BYTES.concat(expectedUserTimerBytes)
                 },
                 new Object[] {
                   INTERVAL_WINDOW_NAMESPACE,
-                  WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
+                  WindmillTimerType.SYSTEM_TIMER,
                   
INTERVAL_WINDOW_NAMESPACE_BYTES.concat(expectedSystemTimerBytes)
                 },
                 new Object[] {
                   OTHER_WINDOW_NAMESPACE,
-                  WindmillNamespacePrefix.USER_NAMESPACE_PREFIX,
+                  WindmillTimerType.USER_TIMER,
                   OTHER_WINDOW_NAMESPACE_BYTES.concat(expectedUserTimerBytes)
                 },
                 new Object[] {
                   OTHER_WINDOW_NAMESPACE,
-                  WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
+                  WindmillTimerType.SYSTEM_TIMER,
                   OTHER_WINDOW_NAMESPACE_BYTES.concat(expectedSystemTimerBytes)
                 },
                 new Object[] {
                   INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE,
-                  WindmillNamespacePrefix.USER_NAMESPACE_PREFIX,
+                  WindmillTimerType.USER_TIMER,
                   
INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(expectedUserTimerBytes)
                 },
                 new Object[] {
                   INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE,
-                  WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
+                  WindmillTimerType.SYSTEM_TIMER,
                   
INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(expectedSystemTimerBytes)
                 },
                 new Object[] {
                   OTHER_WINDOW_AND_TRIGGER_NAMESPACE,
-                  WindmillNamespacePrefix.USER_NAMESPACE_PREFIX,
+                  WindmillTimerType.USER_TIMER,
                   
OTHER_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(expectedUserTimerBytes)
                 },
                 new Object[] {
                   OTHER_WINDOW_AND_TRIGGER_NAMESPACE,
-                  WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
+                  WindmillTimerType.SYSTEM_TIMER,
                   
OTHER_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(expectedSystemTimerBytes)
                 });
 
@@ -298,7 +298,7 @@ public class WindmillTagEncodingV2Test {
     public StateNamespace namespace;
 
     @Parameter(1)
-    public WindmillNamespacePrefix prefix;
+    public WindmillTimerType windmillTimerType;
 
     @Parameter(2)
     public ByteString expectedBytes;
@@ -327,74 +327,75 @@ public class WindmillTagEncodingV2Test {
                   new Instant(456),
                   timeDomain,
                   CausedByDrain.NORMAL);
-      assertEquals(expectedBytes, 
WindmillTagEncodingV2.instance().timerTag(prefix, timerData));
+      assertEquals(
+          expectedBytes, 
WindmillTagEncodingV2.instance().timerTag(windmillTimerType, timerData));
     }
   }
 
   @RunWith(Parameterized.class)
   public static class TimerDataFromTimerTest {
 
-    @Parameters(name = "{index}: namespace={0} prefix={1} draining={4} 
timeDomain={5}")
+    @Parameters(name = "{index}: namespace={0} windmillTimerType={1} 
draining={4} timeDomain={5}")
     public static Collection<Object[]> data() {
       List<Object[]> tests =
           ImmutableList.of(
               new Object[] {
                 GLOBAL_NAMESPACE,
-                WindmillNamespacePrefix.USER_NAMESPACE_PREFIX,
+                WindmillTimerType.USER_TIMER,
                 GLOBAL_NAMESPACE_BYTES.concat(USER_TIMER_BYTES),
                 GlobalWindow.Coder.INSTANCE
               },
               new Object[] {
                 GLOBAL_NAMESPACE,
-                WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
+                WindmillTimerType.SYSTEM_TIMER,
                 GLOBAL_NAMESPACE_BYTES.concat(SYSTEM_TIMER_BYTES),
                 GlobalWindow.Coder.INSTANCE
               },
               new Object[] {
                 INTERVAL_WINDOW_NAMESPACE,
-                WindmillNamespacePrefix.USER_NAMESPACE_PREFIX,
+                WindmillTimerType.USER_TIMER,
                 INTERVAL_WINDOW_NAMESPACE_BYTES.concat(USER_TIMER_BYTES),
                 IntervalWindow.getCoder()
               },
               new Object[] {
                 INTERVAL_WINDOW_NAMESPACE,
-                WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
+                WindmillTimerType.SYSTEM_TIMER,
                 INTERVAL_WINDOW_NAMESPACE_BYTES.concat(SYSTEM_TIMER_BYTES),
                 IntervalWindow.getCoder()
               },
               new Object[] {
                 OTHER_WINDOW_NAMESPACE,
-                WindmillNamespacePrefix.USER_NAMESPACE_PREFIX,
+                WindmillTimerType.USER_TIMER,
                 OTHER_WINDOW_NAMESPACE_BYTES.concat(USER_TIMER_BYTES),
                 new CustomWindow.CustomWindowCoder()
               },
               new Object[] {
                 OTHER_WINDOW_NAMESPACE,
-                WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
+                WindmillTimerType.SYSTEM_TIMER,
                 OTHER_WINDOW_NAMESPACE_BYTES.concat(SYSTEM_TIMER_BYTES),
                 new CustomWindow.CustomWindowCoder()
               },
               new Object[] {
                 INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE,
-                WindmillNamespacePrefix.USER_NAMESPACE_PREFIX,
+                WindmillTimerType.USER_TIMER,
                 
INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(USER_TIMER_BYTES),
                 IntervalWindow.getCoder()
               },
               new Object[] {
                 INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE,
-                WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
+                WindmillTimerType.SYSTEM_TIMER,
                 
INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(SYSTEM_TIMER_BYTES),
                 IntervalWindow.getCoder()
               },
               new Object[] {
                 OTHER_WINDOW_AND_TRIGGER_NAMESPACE,
-                WindmillNamespacePrefix.USER_NAMESPACE_PREFIX,
+                WindmillTimerType.USER_TIMER,
                 
OTHER_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(USER_TIMER_BYTES),
                 new CustomWindow.CustomWindowCoder()
               },
               new Object[] {
                 OTHER_WINDOW_AND_TRIGGER_NAMESPACE,
-                WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
+                WindmillTimerType.SYSTEM_TIMER,
                 
OTHER_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(SYSTEM_TIMER_BYTES),
                 new CustomWindow.CustomWindowCoder()
               });
@@ -415,7 +416,7 @@ public class WindmillTagEncodingV2Test {
     public StateNamespace namespace;
 
     @Parameter(1)
-    public WindmillNamespacePrefix prefix;
+    public WindmillTimerType windmillTimerType;
 
     @Parameter(2)
     public ByteString timerTag;
@@ -444,8 +445,10 @@ public class WindmillTagEncodingV2Test {
               
.setMetadataTimestamp(WindmillTimeUtils.harnessToWindmillTimestamp(outputTimestamp))
               .setType(timerType(timeDomain))
               .build();
-      assertEquals(
-          timerData, encoding.windmillTimerToTimerData(prefix, timer, 
windowCoder, draining));
+      WindmillTimerData windmillTimerData =
+          encoding.windmillTimerToTimerData(timer, windowCoder, draining);
+      assertEquals(windmillTimerType, 
windmillTimerData.getWindmillTimerType());
+      assertEquals(timerData, windmillTimerData.getTimerData());
     }
   }
 
@@ -467,7 +470,7 @@ public class WindmillTagEncodingV2Test {
       ByteString timerTag = ByteString.copyFrom(bytes);
       assertEquals(
           WindmillTagEncodingV2.instance()
-              .timerHoldTag(WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, 
timerData, timerTag),
+              .timerHoldTag(WindmillTimerType.SYSTEM_TIMER, timerData, 
timerTag),
           timerTag);
     }
   }

Reply via email to