This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 41779ad88d3 [Dataflow Streaming] Update windmill timer classification
logic to work with windmill state tag encoding v2 (#37604)
41779ad88d3 is described below
commit 41779ad88d330ec1f9d8f17e94bf72b6f5d4bf2f
Author: Arun Pandian <[email protected]>
AuthorDate: Thu Feb 19 05:02:28 2026 -0800
[Dataflow Streaming] Update windmill timer classification logic to work with
windmill state tag encoding v2 (#37604)
---
.../worker/StreamingModeExecutionContext.java | 34 ++++-----
.../dataflow/worker/WindmillKeyedWorkItem.java | 16 ++--
.../dataflow/worker/WindmillTimerInternals.java | 28 +++----
...NamespacePrefix.java => WindmillTimerType.java} | 28 +------
.../worker/windmill/state/WindmillTagEncoding.java | 24 +++---
.../windmill/state/WindmillTagEncodingV1.java | 85 +++++++++++++++-------
.../windmill/state/WindmillTagEncodingV2.java | 42 +++++------
.../worker/windmill/state/WindmillTimerData.java | 34 +++++++++
.../worker/StreamingGroupAlsoByWindowFnsTest.java | 2 +-
.../dataflow/worker/WindmillKeyedWorkItemTest.java | 2 +-
.../windmill/state/WindmillTagEncodingV1Test.java | 23 +++---
.../windmill/state/WindmillTagEncodingV2Test.java | 61 ++++++++--------
12 files changed, 208 insertions(+), 171 deletions(-)
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
index 2c936f88e28..b4568877143 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
@@ -66,6 +66,7 @@ import
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateReade
import
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncoding;
import
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncodingV1;
import
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncodingV2;
+import
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTimerData;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.UnboundedSource;
@@ -798,7 +799,7 @@ public class StreamingModeExecutionContext extends
DataflowExecutionContext<Step
this.systemTimerInternals =
new WindmillTimerInternals(
stateFamily,
- WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
+ WindmillTimerType.SYSTEM_TIMER,
processingTime,
watermarks,
windmillTagEncoding,
@@ -807,7 +808,7 @@ public class StreamingModeExecutionContext extends
DataflowExecutionContext<Step
this.userTimerInternals =
new WindmillTimerInternals(
stateFamily,
- WindmillNamespacePrefix.USER_NAMESPACE_PREFIX,
+ WindmillTimerType.USER_TIMER,
processingTime,
watermarks,
windmillTagEncoding,
@@ -832,17 +833,15 @@ public class StreamingModeExecutionContext extends
DataflowExecutionContext<Step
if (cachedFiredSystemTimers == null) {
cachedFiredSystemTimers =
FluentIterable.from(StreamingModeExecutionContext.this.getFiredTimers())
- .filter(
- timer ->
- WindmillTimerInternals.isSystemTimer(timer)
- && timer.getStateFamily().equals(stateFamily))
+ .filter(timer -> timer.getStateFamily().equals(stateFamily))
.transform(
timer ->
windmillTagEncoding.windmillTimerToTimerData(
- WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
- timer,
- windowCoder,
- getDrainMode()))
+ timer, windowCoder, getDrainMode()))
+ .filter(
+ windmillTimerData ->
+ windmillTimerData.getWindmillTimerType() ==
WindmillTimerType.SYSTEM_TIMER)
+ .transform(WindmillTimerData::getTimerData)
.iterator();
}
@@ -895,17 +894,16 @@ public class StreamingModeExecutionContext extends
DataflowExecutionContext<Step
cachedFiredUserTimers =
Iterators.peekingIterator(
FluentIterable.from(StreamingModeExecutionContext.this.getFiredTimers())
- .filter(
- timer ->
- WindmillTimerInternals.isUserTimer(timer)
- && timer.getStateFamily().equals(stateFamily))
+ .filter(timer ->
timer.getStateFamily().equals(stateFamily))
.transform(
timer ->
windmillTagEncoding.windmillTimerToTimerData(
- WindmillNamespacePrefix.USER_NAMESPACE_PREFIX,
- timer,
- windowCoder,
- getDrainMode()))
+ timer, windowCoder, getDrainMode()))
+ .filter(
+ windmillTimerData ->
+ windmillTimerData.getWindmillTimerType()
+ == WindmillTimerType.USER_TIMER)
+ .transform(WindmillTimerData::getTimerData)
.iterator());
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java
index da69f1a2371..59489babf0b 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.runners.dataflow.worker;
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
+
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -31,6 +33,7 @@ import org.apache.beam.runners.core.TimerInternals.TimerData;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer;
import
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncoding;
+import
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTimerData;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StructuredCoder;
@@ -101,12 +104,13 @@ public class WindmillKeyedWorkItem<K, ElemT> implements
KeyedWorkItem<K, ElemT>
return eventTimers
.append(nonEventTimers)
.transform(
- timer ->
- windmillTagEncoding.windmillTimerToTimerData(
- WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
- timer,
- windowCoder,
- drainMode));
+ timer -> {
+ WindmillTimerData windmillTimerData =
+ windmillTagEncoding.windmillTimerToTimerData(timer,
windowCoder, drainMode);
+ checkState(
+ windmillTimerData.getWindmillTimerType() ==
WindmillTimerType.SYSTEM_TIMER);
+ return windmillTimerData.getTimerData();
+ });
}
@Override
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
index c62839333c7..07ce62d5933 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
@@ -60,13 +60,13 @@ class WindmillTimerInternals implements TimerInternals {
private final Watermarks watermarks;
private final Instant processingTime;
private final String stateFamily;
- private final WindmillNamespacePrefix prefix;
+ private final WindmillTimerType type;
private final Consumer<TimerData> onTimerModified;
private final WindmillTagEncoding windmillTagEncoding;
public WindmillTimerInternals(
String stateFamily, // unique identifies a step
- WindmillNamespacePrefix prefix, // partitions user and system namespaces
into "/u" and "/s"
+ WindmillTimerType type,
Instant processingTime,
Watermarks watermarks,
WindmillTagEncoding windmillTagEncoding,
@@ -74,14 +74,14 @@ class WindmillTimerInternals implements TimerInternals {
this.watermarks = watermarks;
this.processingTime = checkNotNull(processingTime);
this.stateFamily = stateFamily;
- this.prefix = prefix;
+ this.type = type;
this.windmillTagEncoding = windmillTagEncoding;
this.onTimerModified = onTimerModified;
}
- public WindmillTimerInternals withPrefix(WindmillNamespacePrefix prefix) {
+ public WindmillTimerInternals withType(WindmillTimerType type) {
return new WindmillTimerInternals(
- stateFamily, prefix, processingTime, watermarks, windmillTagEncoding,
onTimerModified);
+ stateFamily, type, processingTime, watermarks, windmillTagEncoding,
onTimerModified);
}
@Override
@@ -197,7 +197,7 @@ class WindmillTimerInternals implements TimerInternals {
Timer.Builder timer =
windmillTagEncoding.buildWindmillTimerFromTimerData(
- stateFamily, prefix, timerData,
outputBuilder.addOutputTimersBuilder());
+ stateFamily, type, timerData,
outputBuilder.addOutputTimersBuilder());
if (value.getValue()) {
// Setting the timer. If it is a user timer, set a hold.
@@ -210,7 +210,7 @@ class WindmillTimerInternals implements TimerInternals {
// Setting a timer, clear any prior hold and set to the new value
outputBuilder
.addWatermarkHoldsBuilder()
- .setTag(windmillTagEncoding.timerHoldTag(prefix, timerData,
timer.getTag()))
+ .setTag(windmillTagEncoding.timerHoldTag(type, timerData,
timer.getTag()))
.setStateFamily(stateFamily)
.setReset(true)
.addTimestamps(
@@ -219,7 +219,7 @@ class WindmillTimerInternals implements TimerInternals {
// Clear the hold in case a previous iteration of this timer set
one.
outputBuilder
.addWatermarkHoldsBuilder()
- .setTag(windmillTagEncoding.timerHoldTag(prefix, timerData,
timer.getTag()))
+ .setTag(windmillTagEncoding.timerHoldTag(type, timerData,
timer.getTag()))
.setStateFamily(stateFamily)
.setReset(true);
}
@@ -234,7 +234,7 @@ class WindmillTimerInternals implements TimerInternals {
// We are deleting timer; clear the hold
outputBuilder
.addWatermarkHoldsBuilder()
- .setTag(windmillTagEncoding.timerHoldTag(prefix, timerData,
timer.getTag()))
+ .setTag(windmillTagEncoding.timerHoldTag(type, timerData,
timer.getTag()))
.setStateFamily(stateFamily)
.setReset(true);
}
@@ -247,15 +247,7 @@ class WindmillTimerInternals implements TimerInternals {
private boolean needsWatermarkHold(TimerData timerData) {
// If it is a user timer or a system timer with outputTimestamp different
than timestamp
- return WindmillNamespacePrefix.USER_NAMESPACE_PREFIX.equals(prefix)
+ return WindmillTimerType.USER_TIMER.equals(type)
|| !timerData.getTimestamp().isEqual(timerData.getOutputTimestamp());
}
-
- public static boolean isSystemTimer(Windmill.Timer timer) {
- return
timer.getTag().startsWith(WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX.byteString());
- }
-
- public static boolean isUserTimer(Windmill.Timer timer) {
- return
timer.getTag().startsWith(WindmillNamespacePrefix.USER_NAMESPACE_PREFIX.byteString());
- }
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillNamespacePrefix.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerType.java
similarity index 55%
rename from
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillNamespacePrefix.java
rename to
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerType.java
index 4dc95aa1a0c..a421d94a582 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillNamespacePrefix.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerType.java
@@ -18,30 +18,10 @@
package org.apache.beam.runners.dataflow.worker;
import org.apache.beam.sdk.annotations.Internal;
-import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
-/**
- * A prefix for a Windmill state or timer tag to separate user state and
timers from system state
- * and timers.
- */
+/** A type for a Windmill timer to separate user state and timers from system
state and timers. */
@Internal
-public enum WindmillNamespacePrefix {
- USER_NAMESPACE_PREFIX {
- @Override
- public ByteString byteString() {
- return USER_NAMESPACE_BYTESTRING;
- }
- },
-
- SYSTEM_NAMESPACE_PREFIX {
- @Override
- public ByteString byteString() {
- return SYSTEM_NAMESPACE_BYTESTRING;
- }
- };
-
- public abstract ByteString byteString();
-
- private static final ByteString USER_NAMESPACE_BYTESTRING =
ByteString.copyFromUtf8("/u");
- private static final ByteString SYSTEM_NAMESPACE_BYTESTRING =
ByteString.copyFromUtf8("/s");
+public enum WindmillTimerType {
+ USER_TIMER,
+ SYSTEM_TIMER
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncoding.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncoding.java
index a979a1d982c..f9e1e7f8cab 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncoding.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncoding.java
@@ -21,8 +21,8 @@ import javax.annotation.concurrent.ThreadSafe;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.core.TimerInternals.TimerData;
-import org.apache.beam.runners.dataflow.worker.WindmillNamespacePrefix;
import org.apache.beam.runners.dataflow.worker.WindmillTimeUtils;
+import org.apache.beam.runners.dataflow.worker.WindmillTimerType;
import
org.apache.beam.runners.dataflow.worker.util.common.worker.InternedByteString;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer;
import org.apache.beam.sdk.annotations.Internal;
@@ -58,22 +58,18 @@ public abstract class WindmillTagEncoding {
* @param timerTag tag of the timer that maps to the hold.
*/
public abstract ByteString timerHoldTag(
- WindmillNamespacePrefix prefix, TimerData timerData, ByteString
timerTag);
+ WindmillTimerType windmillTimerType, TimerData timerData, ByteString
timerTag);
/**
- * Produce a tag that is guaranteed to be unique for the given prefix,
namespace, domain and
- * timestamp.
+ * Produce a tag that is guaranteed to be unique for the given timer type
and TimerData
*
* <p>This is necessary because Windmill will deduplicate based only on this
tag.
*/
- public abstract ByteString timerTag(WindmillNamespacePrefix prefix,
TimerData timerData);
+ public abstract ByteString timerTag(WindmillTimerType windmillTimerType,
TimerData timerData);
- /** Converts Windmill Timer to beam TimerData */
- public abstract TimerData windmillTimerToTimerData(
- WindmillNamespacePrefix prefix,
- Timer timer,
- Coder<? extends BoundedWindow> windowCoder,
- boolean draining);
+ /** Converts Windmill Timer to TimerData */
+ public abstract WindmillTimerData windmillTimerToTimerData(
+ Timer timer, Coder<? extends BoundedWindow> windowCoder, boolean
draining);
/**
* Uses the given {@link Timer} builder to build a windmill {@link Timer}
from {@link TimerData}.
@@ -82,11 +78,13 @@ public abstract class WindmillTagEncoding {
*/
public Timer.Builder buildWindmillTimerFromTimerData(
@Nullable String stateFamily,
- WindmillNamespacePrefix prefix,
+ WindmillTimerType windmillTimerType,
TimerData timerData,
Timer.Builder builder) {
- builder.setTag(timerTag(prefix,
timerData)).setType(timerType(timerData.getDomain()));
+ builder
+ .setTag(timerTag(windmillTimerType, timerData))
+ .setType(timerType(timerData.getDomain()));
if (stateFamily != null) {
builder.setStateFamily(stateFamily);
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1.java
index 14c3f8c0179..9efdf5e925c 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1.java
@@ -17,16 +17,14 @@
*/
package org.apache.beam.runners.dataflow.worker.windmill.state;
-import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
-
import java.io.IOException;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.core.TimerInternals.TimerData;
-import org.apache.beam.runners.dataflow.worker.WindmillNamespacePrefix;
import org.apache.beam.runners.dataflow.worker.WindmillTimeUtils;
+import org.apache.beam.runners.dataflow.worker.WindmillTimerType;
import
org.apache.beam.runners.dataflow.worker.util.ThreadLocalByteStringOutputStream;
import
org.apache.beam.runners.dataflow.worker.util.ThreadLocalByteStringOutputStream.StreamHandle;
import
org.apache.beam.runners.dataflow.worker.util.common.worker.InternedByteString;
@@ -71,11 +69,11 @@ public class WindmillTagEncodingV1 extends
WindmillTagEncoding {
/** {@inheritDoc} */
@Override
public ByteString timerHoldTag(
- WindmillNamespacePrefix prefix, TimerData timerData, ByteString
unusedTimerTag) {
+ WindmillTimerType windmillTimerType, TimerData timerData, ByteString
unusedTimerTag) {
String tagString;
if ("".equals(timerData.getTimerFamilyId())) {
tagString =
- prefix.byteString().toStringUtf8()
+ namespacePrefixString(windmillTimerType)
+ // this never ends with a slash
TIMER_HOLD_PREFIX
+ // this never ends with a slash
@@ -86,7 +84,7 @@ public class WindmillTagEncodingV1 extends
WindmillTagEncoding {
;
} else {
tagString =
- prefix.byteString().toStringUtf8()
+ namespacePrefixString(windmillTimerType)
+ // this never ends with a slash
TIMER_HOLD_PREFIX
+ // this never ends with a slash
@@ -105,11 +103,11 @@ public class WindmillTagEncodingV1 extends
WindmillTagEncoding {
/** {@inheritDoc} */
@Override
- public ByteString timerTag(WindmillNamespacePrefix prefix, TimerData
timerData) {
+ public ByteString timerTag(WindmillTimerType windmillTimerType, TimerData
timerData) {
String tagString;
if (useNewTimerTagEncoding(timerData)) {
tagString =
- prefix.byteString().toStringUtf8()
+ namespacePrefixString(windmillTimerType)
+ // this never ends with a slash
timerData.getNamespace().stringKey()
+ // this must begin and end with a slash
@@ -121,7 +119,7 @@ public class WindmillTagEncodingV1 extends
WindmillTagEncoding {
} else {
// Timers without timerFamily would have timerFamily would be an empty
string
tagString =
- prefix.byteString().toStringUtf8()
+ namespacePrefixString(windmillTimerType)
+ // this never ends with a slash
timerData.getNamespace().stringKey()
+ // this must begin and end with a slash
@@ -134,11 +132,8 @@ public class WindmillTagEncodingV1 extends
WindmillTagEncoding {
/** {@inheritDoc} */
@Override
- public TimerData windmillTimerToTimerData(
- WindmillNamespacePrefix prefix,
- Timer timer,
- Coder<? extends BoundedWindow> windowCoder,
- boolean draining) {
+ public WindmillTimerData windmillTimerToTimerData(
+ Timer timer, Coder<? extends BoundedWindow> windowCoder, boolean
draining) {
// The tag is a path-structure string but cheaper to parse than a proper
URI. It follows
// this pattern, where no component but the ID can contain a slash
@@ -160,16 +155,20 @@ public class WindmillTagEncodingV1 extends
WindmillTagEncoding {
// - the id is totally arbitrary; currently unescaped though that could
change
ByteString tag = timer.getTag();
- checkArgument(
- tag.startsWith(prefix.byteString()),
- "Expected timer tag %s to start with prefix %s",
- tag,
- prefix.byteString());
+ WindmillTimerType timerType;
+ if
(tag.startsWith(namespacePrefixByteString(WindmillTimerType.SYSTEM_TIMER))) {
+ timerType = WindmillTimerType.SYSTEM_TIMER;
+ } else if
(tag.startsWith(namespacePrefixByteString(WindmillTimerType.USER_TIMER))) {
+ timerType = WindmillTimerType.USER_TIMER;
+ } else {
+ throw new IllegalArgumentException("Unknown timer tag prefix: " +
tag.toStringUtf8());
+ }
Instant timestamp =
WindmillTimeUtils.windmillToHarnessTimestamp(timer.getTimestamp());
// Parse the namespace.
- int namespaceStart = prefix.byteString().size(); // drop the prefix, leave
the begin slash
+ int namespaceStart =
+ namespacePrefixByteString(timerType).size(); // drop the prefix, leave
the begin slash
int namespaceEnd = namespaceStart;
while (namespaceEnd < tag.size() && tag.byteAt(namespaceEnd) != '+') {
namespaceEnd++;
@@ -225,21 +224,51 @@ public class WindmillTagEncodingV1 extends
WindmillTagEncoding {
}
StateNamespace namespace = StateNamespaces.fromString(namespaceString,
windowCoder);
- return TimerData.of(
- timerId,
- timerFamily,
- namespace,
- timestamp,
- outputTimestamp,
- timerTypeToTimeDomain(timer.getType()));
+ return WindmillTimerData.create(
+ timerType,
+ TimerData.of(
+ timerId,
+ timerFamily,
+ namespace,
+ timestamp,
+ outputTimestamp,
+ timerTypeToTimeDomain(timer.getType())));
// todo add draining (https://github.com/apache/beam/issues/36884)
-
}
private static boolean useNewTimerTagEncoding(TimerData timerData) {
return !timerData.getTimerFamilyId().isEmpty();
}
+ private static final String USER_NAMESPACE_STRING = "/u";
+ private static final ByteString USER_NAMESPACE_BYTESTRING =
+ ByteString.copyFromUtf8(USER_NAMESPACE_STRING);
+ private static final String SYSTEM_NAMESPACE_STRING = "/s";
+ private static final ByteString SYSTEM_NAMESPACE_BYTESTRING =
+ ByteString.copyFromUtf8(SYSTEM_NAMESPACE_STRING);
+
+ private static String namespacePrefixString(WindmillTimerType
windmillTimerType) {
+ switch (windmillTimerType) {
+ case USER_TIMER:
+ return USER_NAMESPACE_STRING;
+ case SYSTEM_TIMER:
+ return SYSTEM_NAMESPACE_STRING;
+ default:
+ throw new IllegalStateException("unexpected windmill timer type");
+ }
+ }
+
+ private static ByteString namespacePrefixByteString(WindmillTimerType
windmillTimerType) {
+ switch (windmillTimerType) {
+ case USER_TIMER:
+ return USER_NAMESPACE_BYTESTRING;
+ case SYSTEM_TIMER:
+ return SYSTEM_NAMESPACE_BYTESTRING;
+ default:
+ throw new IllegalStateException("unexpected windmill timer type");
+ }
+ }
+
/** @return the singleton WindmillStateTagUtil */
public static WindmillTagEncodingV1 instance() {
return INSTANCE;
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2.java
index 0702c375282..c607698ad9f 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2.java
@@ -17,8 +17,6 @@
*/
package org.apache.beam.runners.dataflow.worker.windmill.state;
-import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
-
import java.io.IOException;
import java.io.InputStream;
import javax.annotation.concurrent.ThreadSafe;
@@ -30,8 +28,8 @@ import
org.apache.beam.runners.core.StateNamespaces.WindowNamespace;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.core.StateTags;
import org.apache.beam.runners.core.TimerInternals.TimerData;
-import org.apache.beam.runners.dataflow.worker.WindmillNamespacePrefix;
import org.apache.beam.runners.dataflow.worker.WindmillTimeUtils;
+import org.apache.beam.runners.dataflow.worker.WindmillTimerType;
import
org.apache.beam.runners.dataflow.worker.util.ThreadLocalByteStringOutputStream;
import
org.apache.beam.runners.dataflow.worker.util.ThreadLocalByteStringOutputStream.StreamHandle;
import
org.apache.beam.runners.dataflow.worker.util.common.worker.InternedByteString;
@@ -219,7 +217,7 @@ public class WindmillTagEncodingV2 extends
WindmillTagEncoding {
/** {@inheritDoc} */
@Override
public ByteString timerHoldTag(
- WindmillNamespacePrefix prefix, TimerData timerData, ByteString
timerTag) {
+ WindmillTimerType windmillTimerType, TimerData timerData, ByteString
timerTag) {
// Same encoding for timer tag and timer hold tag.
// They are put in different places and won't collide.
return timerTag;
@@ -227,16 +225,16 @@ public class WindmillTagEncodingV2 extends
WindmillTagEncoding {
/** {@inheritDoc} */
@Override
- public ByteString timerTag(WindmillNamespacePrefix prefix, TimerData
timerData) {
+ public ByteString timerTag(WindmillTimerType windmillTimerType, TimerData
timerData) {
try (StreamHandle streamHandle =
ThreadLocalByteStringOutputStream.acquire()) {
ByteStringOutputStream stream = streamHandle.stream();
encodeNamespace(timerData.getNamespace(), stream);
- if (WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX.equals(prefix)) {
+ if (WindmillTimerType.SYSTEM_TIMER.equals(windmillTimerType)) {
stream.write(SYSTEM_TIMER_BYTE);
- } else if (WindmillNamespacePrefix.USER_NAMESPACE_PREFIX.equals(prefix))
{
+ } else if (WindmillTimerType.USER_TIMER.equals(windmillTimerType)) {
stream.write(USER_TIMER_BYTE);
} else {
- throw new IllegalStateException("Unexpected WindmillNamespacePrefix" +
prefix);
+ throw new IllegalStateException("Unexpected WindmillTimerType" +
windmillTimerType);
}
StringUtf8Coder.of().encode(timerData.getTimerFamilyId(), stream);
StringUtf8Coder.of().encode(timerData.getTimerId(), stream);
@@ -248,21 +246,19 @@ public class WindmillTagEncodingV2 extends
WindmillTagEncoding {
/** {@inheritDoc} */
@Override
- public TimerData windmillTimerToTimerData(
- WindmillNamespacePrefix prefix,
- Timer timer,
- Coder<? extends BoundedWindow> windowCoder,
- boolean draining) {
+ public WindmillTimerData windmillTimerToTimerData(
+ Timer timer, Coder<? extends BoundedWindow> windowCoder, boolean
draining) {
InputStream stream = timer.getTag().newInput();
try {
StateNamespace stateNamespace = decodeNamespace(stream, windowCoder);
int nextByte = stream.read();
+ WindmillTimerType timerType;
if (nextByte == SYSTEM_TIMER_BYTE) {
-
checkState(WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX.equals(prefix));
+ timerType = WindmillTimerType.SYSTEM_TIMER;
} else if (nextByte == USER_TIMER_BYTE) {
-
checkState(WindmillNamespacePrefix.USER_NAMESPACE_PREFIX.equals(prefix));
+ timerType = WindmillTimerType.USER_TIMER;
} else {
throw new IllegalStateException("Unexpected timer tag byte: " +
nextByte);
}
@@ -282,13 +278,15 @@ public class WindmillTagEncodingV2 extends
WindmillTagEncoding {
}
}
- return TimerData.of(
- timerId,
- timerFamilyId,
- stateNamespace,
- timestamp,
- outputTimestamp,
- timerTypeToTimeDomain(timer.getType()));
+ return WindmillTimerData.create(
+ timerType,
+ TimerData.of(
+ timerId,
+ timerFamilyId,
+ stateNamespace,
+ timestamp,
+ outputTimestamp,
+ timerTypeToTimeDomain(timer.getType())));
} catch (IOException e) {
throw new RuntimeException(e);
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTimerData.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTimerData.java
new file mode 100644
index 00000000000..f3708f7bcfd
--- /dev/null
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTimerData.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.state;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.runners.core.TimerInternals.TimerData;
+import org.apache.beam.runners.dataflow.worker.WindmillTimerType;
+
+@AutoValue
+public abstract class WindmillTimerData {
+
+ public abstract WindmillTimerType getWindmillTimerType();
+
+ public abstract TimerData getTimerData();
+
+ public static WindmillTimerData create(WindmillTimerType windmillTimerType,
TimerData timerData) {
+ return new AutoValue_WindmillTimerData(windmillTimerType, timerData);
+ }
+}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java
index 35dc19dd781..20dec94de08 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java
@@ -152,7 +152,7 @@ public class StreamingGroupAlsoByWindowFnsTest {
.setTag(
WindmillTagEncodingV1.instance()
.timerTag(
- WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
+ WindmillTimerType.SYSTEM_TIMER,
TimerData.of(
namespace,
timestamp,
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java
index 9062c881096..e5de780ac4e 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java
@@ -198,7 +198,7 @@ public class WindmillKeyedWorkItemTest {
return Windmill.Timer.newBuilder()
.setTag(
windmillTagEncoding.timerTag(
- WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
+ WindmillTimerType.SYSTEM_TIMER,
TimerData.of(
ns,
new Instant(timestamp),
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1Test.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1Test.java
index 48751c57754..fcac966ff96 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1Test.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1Test.java
@@ -29,7 +29,7 @@ import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.core.StateTags;
import org.apache.beam.runners.core.TimerInternals.TimerData;
-import org.apache.beam.runners.dataflow.worker.WindmillNamespacePrefix;
+import org.apache.beam.runners.dataflow.worker.WindmillTimerType;
import
org.apache.beam.runners.dataflow.worker.util.common.worker.InternedByteString;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer;
import org.apache.beam.sdk.coders.Coder;
@@ -140,7 +140,7 @@ public class WindmillTagEncodingV1Test {
StateNamespace namespace = coderAndNamespace.getValue();
for (TimeDomain timeDomain : TimeDomain.values()) {
- for (WindmillNamespacePrefix prefix :
WindmillNamespacePrefix.values()) {
+ for (WindmillTimerType windmillTimerType :
WindmillTimerType.values()) {
for (Instant timestamp : TEST_TIMESTAMPS) {
List<TimerData> anonymousTimers =
ImmutableList.of(
@@ -157,16 +157,17 @@ public class WindmillTagEncodingV1Test {
timer.getOutputTimestamp().isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE)
? BoundedWindow.TIMESTAMP_MIN_VALUE
: timer.getOutputTimestamp();
- TimerData computed =
+ WindmillTimerData windmillTimerData =
WindmillTagEncodingV1.instance()
.windmillTimerToTimerData(
- prefix,
WindmillTagEncodingV1.instance()
.buildWindmillTimerFromTimerData(
- stateFamily, prefix, timer,
Timer.newBuilder())
+ stateFamily, windmillTimerType, timer,
Timer.newBuilder())
.build(),
coder,
false);
+ TimerData timerData = windmillTimerData.getTimerData();
+ assertEquals(windmillTimerType,
windmillTimerData.getWindmillTimerType());
// The function itself bounds output, so we dont expect the
original input as the
// output, we expect it to be bounded
TimerData expected =
@@ -177,7 +178,7 @@ public class WindmillTagEncodingV1Test {
timer.getDomain(),
CausedByDrain.NORMAL);
- assertThat(computed, equalTo(expected));
+ assertThat(timerData, equalTo(expected));
}
for (String timerId : TEST_TIMER_IDS) {
@@ -221,17 +222,17 @@ public class WindmillTagEncodingV1Test {
timer.getTimestamp(),
expectedTimestamp,
timer.getDomain());
- assertThat(
+ WindmillTimerData windmillTimerData =
WindmillTagEncodingV1.instance()
.windmillTimerToTimerData(
- prefix,
WindmillTagEncodingV1.instance()
.buildWindmillTimerFromTimerData(
- stateFamily, prefix, timer,
Timer.newBuilder())
+ stateFamily, windmillTimerType, timer,
Timer.newBuilder())
.build(),
coder,
- false),
- equalTo(expected));
+ false);
+ assertEquals(windmillTimerType,
windmillTimerData.getWindmillTimerType());
+ assertThat(windmillTimerData.getTimerData(),
equalTo(expected));
}
}
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2Test.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2Test.java
index 1284c46a99a..fbbcfed7cf0 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2Test.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2Test.java
@@ -35,8 +35,8 @@ import
org.apache.beam.runners.core.StateNamespaces.GlobalNamespace;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.core.StateTags;
import org.apache.beam.runners.core.TimerInternals.TimerData;
-import org.apache.beam.runners.dataflow.worker.WindmillNamespacePrefix;
import org.apache.beam.runners.dataflow.worker.WindmillTimeUtils;
+import org.apache.beam.runners.dataflow.worker.WindmillTimerType;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.InstantCoder;
@@ -222,7 +222,7 @@ public class WindmillTagEncodingV2Test {
@Parameters(
name =
- "{index}: namespace={0} prefix={1} expectedBytes={2}
includeTimerId={3}"
+ "{index}: namespace={0} windmillTimerType={1} expectedBytes={2}
includeTimerId={3}"
+ " includeTimerFamilyId={4} timeDomain={4}")
public static Collection<Object[]> data() {
List<Object[]> data = new ArrayList<>();
@@ -235,52 +235,52 @@ public class WindmillTagEncodingV2Test {
ImmutableList.of(
new Object[] {
GLOBAL_NAMESPACE,
- WindmillNamespacePrefix.USER_NAMESPACE_PREFIX,
+ WindmillTimerType.USER_TIMER,
GLOBAL_NAMESPACE_BYTES.concat(expectedUserTimerBytes)
},
new Object[] {
GLOBAL_NAMESPACE,
- WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
+ WindmillTimerType.SYSTEM_TIMER,
GLOBAL_NAMESPACE_BYTES.concat(expectedSystemTimerBytes)
},
new Object[] {
INTERVAL_WINDOW_NAMESPACE,
- WindmillNamespacePrefix.USER_NAMESPACE_PREFIX,
+ WindmillTimerType.USER_TIMER,
INTERVAL_WINDOW_NAMESPACE_BYTES.concat(expectedUserTimerBytes)
},
new Object[] {
INTERVAL_WINDOW_NAMESPACE,
- WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
+ WindmillTimerType.SYSTEM_TIMER,
INTERVAL_WINDOW_NAMESPACE_BYTES.concat(expectedSystemTimerBytes)
},
new Object[] {
OTHER_WINDOW_NAMESPACE,
- WindmillNamespacePrefix.USER_NAMESPACE_PREFIX,
+ WindmillTimerType.USER_TIMER,
OTHER_WINDOW_NAMESPACE_BYTES.concat(expectedUserTimerBytes)
},
new Object[] {
OTHER_WINDOW_NAMESPACE,
- WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
+ WindmillTimerType.SYSTEM_TIMER,
OTHER_WINDOW_NAMESPACE_BYTES.concat(expectedSystemTimerBytes)
},
new Object[] {
INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE,
- WindmillNamespacePrefix.USER_NAMESPACE_PREFIX,
+ WindmillTimerType.USER_TIMER,
INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(expectedUserTimerBytes)
},
new Object[] {
INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE,
- WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
+ WindmillTimerType.SYSTEM_TIMER,
INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(expectedSystemTimerBytes)
},
new Object[] {
OTHER_WINDOW_AND_TRIGGER_NAMESPACE,
- WindmillNamespacePrefix.USER_NAMESPACE_PREFIX,
+ WindmillTimerType.USER_TIMER,
OTHER_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(expectedUserTimerBytes)
},
new Object[] {
OTHER_WINDOW_AND_TRIGGER_NAMESPACE,
- WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
+ WindmillTimerType.SYSTEM_TIMER,
OTHER_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(expectedSystemTimerBytes)
});
@@ -298,7 +298,7 @@ public class WindmillTagEncodingV2Test {
public StateNamespace namespace;
@Parameter(1)
- public WindmillNamespacePrefix prefix;
+ public WindmillTimerType windmillTimerType;
@Parameter(2)
public ByteString expectedBytes;
@@ -327,74 +327,75 @@ public class WindmillTagEncodingV2Test {
new Instant(456),
timeDomain,
CausedByDrain.NORMAL);
- assertEquals(expectedBytes,
WindmillTagEncodingV2.instance().timerTag(prefix, timerData));
+ assertEquals(
+ expectedBytes,
WindmillTagEncodingV2.instance().timerTag(windmillTimerType, timerData));
}
}
@RunWith(Parameterized.class)
public static class TimerDataFromTimerTest {
- @Parameters(name = "{index}: namespace={0} prefix={1} draining={4}
timeDomain={5}")
+ @Parameters(name = "{index}: namespace={0} windmillTimerType={1}
draining={4} timeDomain={5}")
public static Collection<Object[]> data() {
List<Object[]> tests =
ImmutableList.of(
new Object[] {
GLOBAL_NAMESPACE,
- WindmillNamespacePrefix.USER_NAMESPACE_PREFIX,
+ WindmillTimerType.USER_TIMER,
GLOBAL_NAMESPACE_BYTES.concat(USER_TIMER_BYTES),
GlobalWindow.Coder.INSTANCE
},
new Object[] {
GLOBAL_NAMESPACE,
- WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
+ WindmillTimerType.SYSTEM_TIMER,
GLOBAL_NAMESPACE_BYTES.concat(SYSTEM_TIMER_BYTES),
GlobalWindow.Coder.INSTANCE
},
new Object[] {
INTERVAL_WINDOW_NAMESPACE,
- WindmillNamespacePrefix.USER_NAMESPACE_PREFIX,
+ WindmillTimerType.USER_TIMER,
INTERVAL_WINDOW_NAMESPACE_BYTES.concat(USER_TIMER_BYTES),
IntervalWindow.getCoder()
},
new Object[] {
INTERVAL_WINDOW_NAMESPACE,
- WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
+ WindmillTimerType.SYSTEM_TIMER,
INTERVAL_WINDOW_NAMESPACE_BYTES.concat(SYSTEM_TIMER_BYTES),
IntervalWindow.getCoder()
},
new Object[] {
OTHER_WINDOW_NAMESPACE,
- WindmillNamespacePrefix.USER_NAMESPACE_PREFIX,
+ WindmillTimerType.USER_TIMER,
OTHER_WINDOW_NAMESPACE_BYTES.concat(USER_TIMER_BYTES),
new CustomWindow.CustomWindowCoder()
},
new Object[] {
OTHER_WINDOW_NAMESPACE,
- WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
+ WindmillTimerType.SYSTEM_TIMER,
OTHER_WINDOW_NAMESPACE_BYTES.concat(SYSTEM_TIMER_BYTES),
new CustomWindow.CustomWindowCoder()
},
new Object[] {
INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE,
- WindmillNamespacePrefix.USER_NAMESPACE_PREFIX,
+ WindmillTimerType.USER_TIMER,
INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(USER_TIMER_BYTES),
IntervalWindow.getCoder()
},
new Object[] {
INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE,
- WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
+ WindmillTimerType.SYSTEM_TIMER,
INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(SYSTEM_TIMER_BYTES),
IntervalWindow.getCoder()
},
new Object[] {
OTHER_WINDOW_AND_TRIGGER_NAMESPACE,
- WindmillNamespacePrefix.USER_NAMESPACE_PREFIX,
+ WindmillTimerType.USER_TIMER,
OTHER_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(USER_TIMER_BYTES),
new CustomWindow.CustomWindowCoder()
},
new Object[] {
OTHER_WINDOW_AND_TRIGGER_NAMESPACE,
- WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
+ WindmillTimerType.SYSTEM_TIMER,
OTHER_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(SYSTEM_TIMER_BYTES),
new CustomWindow.CustomWindowCoder()
});
@@ -415,7 +416,7 @@ public class WindmillTagEncodingV2Test {
public StateNamespace namespace;
@Parameter(1)
- public WindmillNamespacePrefix prefix;
+ public WindmillTimerType windmillTimerType;
@Parameter(2)
public ByteString timerTag;
@@ -444,8 +445,10 @@ public class WindmillTagEncodingV2Test {
.setMetadataTimestamp(WindmillTimeUtils.harnessToWindmillTimestamp(outputTimestamp))
.setType(timerType(timeDomain))
.build();
- assertEquals(
- timerData, encoding.windmillTimerToTimerData(prefix, timer,
windowCoder, draining));
+ WindmillTimerData windmillTimerData =
+ encoding.windmillTimerToTimerData(timer, windowCoder, draining);
+ assertEquals(windmillTimerType,
windmillTimerData.getWindmillTimerType());
+ assertEquals(timerData, windmillTimerData.getTimerData());
}
}
@@ -467,7 +470,7 @@ public class WindmillTagEncodingV2Test {
ByteString timerTag = ByteString.copyFrom(bytes);
assertEquals(
WindmillTagEncodingV2.instance()
- .timerHoldTag(WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
timerData, timerTag),
+ .timerHoldTag(WindmillTimerType.SYSTEM_TIMER, timerData,
timerTag),
timerTag);
}
}