This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new bfff80b6fa4 [Dataflow Streaming] Add WindmillTagEncodingV2. (#37151)
bfff80b6fa4 is described below
commit bfff80b6fa49260366d11576c4f4269759fda32e
Author: Arun Pandian <[email protected]>
AuthorDate: Thu Jan 8 01:51:07 2026 -0800
[Dataflow Streaming] Add WindmillTagEncodingV2. (#37151)
This adds a new way of encoding windmill state tags and timer tags.
The new code is behind an unstable experiment. More work is needed
before it can be used on real workloads.
---
.../apache/beam/runners/core/StateNamespaces.java | 8 +
.../org/apache/beam/runners/core/StateTags.java | 17 +
.../worker/StreamingModeExecutionContext.java | 12 +-
.../dataflow/worker/WindmillTimerInternals.java | 6 +-
.../worker/windmill/state/WindmillTagEncoding.java | 5 +-
.../windmill/state/WindmillTagEncodingV1.java | 3 +-
.../windmill/state/WindmillTagEncodingV2.java | 406 +++++++++++++++
.../processing/ComputationWorkExecutorFactory.java | 14 +-
.../worker/StreamingModeExecutionContextTest.java | 3 +-
.../dataflow/worker/WorkerCustomSourcesTest.java | 6 +-
.../windmill/state/WindmillTagEncodingV2Test.java | 576 +++++++++++++++++++++
11 files changed, 1044 insertions(+), 12 deletions(-)
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateNamespaces.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateNamespaces.java
index a68ab6c913c..e919d12eaac 100644
---
a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateNamespaces.java
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateNamespaces.java
@@ -102,6 +102,10 @@ public class StateNamespaces {
return window;
}
+    /** Returns the window coder held by this namespace. */
+    public Coder<W> getWindowCoder() {
+      return this.windowCoder;
+    }
+
@Override
public String stringKey() {
try {
@@ -170,6 +174,10 @@ public class StateNamespaces {
return window;
}
+    /** Returns the window coder held by this namespace. */
+    public Coder<W> getWindowCoder() {
+      return this.windowCoder;
+    }
+
public int getTriggerIndex() {
return triggerIndex;
}
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
index ba5478be6c7..5d69abe8ffc 100644
---
a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
@@ -144,6 +144,8 @@ public class StateTags {
private interface SystemStateTag<StateT extends State> {
StateTag<StateT> asKind(StateKind kind);
+
+ /** Returns the {@link StateKind} of this tag (compared against {@code StateKind.SYSTEM}). */
+ StateKind getKind();
}
/** Create a state tag for the given id and spec. */
@@ -243,6 +245,16 @@ public class StateTags {
return typedTag.asKind(StateKind.SYSTEM);
}
+  /**
+   * Returns {@code true} if {@code tag} is a system internal tag, i.e. a {@link SystemStateTag}
+   * whose kind is {@link StateKind#SYSTEM}. Any other tag (including tags that do not implement
+   * {@link SystemStateTag}) yields {@code false}.
+   */
+  public static <StateT extends State> boolean isSystemTagInternal(StateTag<StateT> tag) {
+    if (!(tag instanceof SystemStateTag)) {
+      return false;
+    }
+    // Enum constants are singletons, so identity comparison is exact (and null-safe).
+    return ((SystemStateTag<?>) tag).getKind() == StateKind.SYSTEM;
+  }
+
public static <InputT, AccumT, OutputT> StateTag<BagState<AccumT>>
convertToBagTagInternal(
StateTag<CombiningState<InputT, AccumT, OutputT>> combiningTag) {
return new SimpleStateTag<>(
@@ -358,6 +370,11 @@ public class StateTags {
return new SimpleStateTag<>(id.asKind(kind), spec);
}
+    @Override
+    public StateKind getKind() {
+      // The kind travels with the tag's structured id (see asKind, which re-labels the id).
+      return this.id.kind;
+    }
+
@Override
public boolean equals(@Nullable Object other) {
if (!(other instanceof SimpleStateTag)) {
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
index ec996c9ab02..09afcadc300 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
@@ -59,10 +59,12 @@ import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GlobalDataId;
import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GlobalDataRequest;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer;
import
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache;
+import
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache.ForComputation;
import
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateInternals;
import
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateReader;
import
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncoding;
import
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncodingV1;
+import
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillTagEncodingV2;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.UnboundedSource;
@@ -154,13 +156,14 @@ public class StreamingModeExecutionContext extends
DataflowExecutionContext<Step
String computationId,
ReaderCache readerCache,
Map<String, String> stateNameMap,
- WindmillStateCache.ForComputation stateCache,
+ ForComputation stateCache,
MetricsContainerRegistry<StreamingStepMetricsContainer>
metricsContainerRegistry,
DataflowExecutionStateTracker executionStateTracker,
StreamingModeExecutionStateRegistry executionStateRegistry,
StreamingGlobalConfigHandle globalConfigHandle,
long sinkByteLimit,
- boolean throwExceptionOnLargeOutput) {
+ boolean throwExceptionOnLargeOutput,
+ boolean enableWindmillTagEncodingV2) {
super(
counterFactory,
metricsContainerRegistry,
@@ -171,7 +174,10 @@ public class StreamingModeExecutionContext extends
DataflowExecutionContext<Step
this.readerCache = readerCache;
this.globalConfigHandle = globalConfigHandle;
this.sideInputCache = new HashMap<>();
- this.windmillTagEncoding = WindmillTagEncodingV1.instance();
+ this.windmillTagEncoding =
+ enableWindmillTagEncodingV2
+ ? WindmillTagEncodingV2.instance()
+ : WindmillTagEncodingV1.instance();
this.stateNameMap = ImmutableMap.copyOf(stateNameMap);
this.stateCache = stateCache;
this.backlogBytes = UnboundedReader.BACKLOG_UNKNOWN;
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
index cb41aa1ccab..4287188c35b 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
@@ -201,7 +201,7 @@ class WindmillTimerInternals implements TimerInternals {
// Setting a timer, clear any prior hold and set to the new value
outputBuilder
.addWatermarkHoldsBuilder()
- .setTag(windmillTagEncoding.timerHoldTag(prefix, timerData))
+ .setTag(windmillTagEncoding.timerHoldTag(prefix, timerData,
timer.getTag()))
.setStateFamily(stateFamily)
.setReset(true)
.addTimestamps(
@@ -210,7 +210,7 @@ class WindmillTimerInternals implements TimerInternals {
// Clear the hold in case a previous iteration of this timer set
one.
outputBuilder
.addWatermarkHoldsBuilder()
- .setTag(windmillTagEncoding.timerHoldTag(prefix, timerData))
+ .setTag(windmillTagEncoding.timerHoldTag(prefix, timerData,
timer.getTag()))
.setStateFamily(stateFamily)
.setReset(true);
}
@@ -225,7 +225,7 @@ class WindmillTimerInternals implements TimerInternals {
// We are deleting timer; clear the hold
outputBuilder
.addWatermarkHoldsBuilder()
- .setTag(windmillTagEncoding.timerHoldTag(prefix, timerData))
+ .setTag(windmillTagEncoding.timerHoldTag(prefix, timerData,
timer.getTag()))
.setStateFamily(stateFamily)
.setReset(true);
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncoding.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncoding.java
index 59841f67347..a979a1d982c 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncoding.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncoding.java
@@ -54,8 +54,11 @@ public abstract class WindmillTagEncoding {
/**
* Produce a state tag that is guaranteed to be unique for the given timer,
to add a watermark
* hold that is only freed after the timer fires.
+ *
+ * @param prefix namespace prefix of the timer (e.g. system or user).
+ * @param timerData the timer the watermark hold belongs to.
+ * @param timerTag tag of the timer that maps to the hold.
+ *     Some encodings ignore it; others use it directly as the hold tag.
+ * @return the tag under which the watermark hold is written.
*/
- public abstract ByteString timerHoldTag(WindmillNamespacePrefix prefix,
TimerData timerData);
+ public abstract ByteString timerHoldTag(
+ WindmillNamespacePrefix prefix, TimerData timerData, ByteString
timerTag);
/**
* Produce a tag that is guaranteed to be unique for the given prefix,
namespace, domain and
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1.java
index 19e31351a52..14c3f8c0179 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV1.java
@@ -70,7 +70,8 @@ public class WindmillTagEncodingV1 extends
WindmillTagEncoding {
/** {@inheritDoc} */
@Override
- public ByteString timerHoldTag(WindmillNamespacePrefix prefix, TimerData
timerData) {
+ public ByteString timerHoldTag(
+ WindmillNamespacePrefix prefix, TimerData timerData, ByteString
unusedTimerTag) {
String tagString;
if ("".equals(timerData.getTimerFamilyId())) {
tagString =
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2.java
new file mode 100644
index 00000000000..0702c375282
--- /dev/null
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2.java
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.state;
+
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
+
+import java.io.IOException;
+import java.io.InputStream;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.StateNamespaces;
+import org.apache.beam.runners.core.StateNamespaces.GlobalNamespace;
+import org.apache.beam.runners.core.StateNamespaces.WindowAndTriggerNamespace;
+import org.apache.beam.runners.core.StateNamespaces.WindowNamespace;
+import org.apache.beam.runners.core.StateTag;
+import org.apache.beam.runners.core.StateTags;
+import org.apache.beam.runners.core.TimerInternals.TimerData;
+import org.apache.beam.runners.dataflow.worker.WindmillNamespacePrefix;
+import org.apache.beam.runners.dataflow.worker.WindmillTimeUtils;
+import
org.apache.beam.runners.dataflow.worker.util.ThreadLocalByteStringOutputStream;
+import
org.apache.beam.runners.dataflow.worker.util.ThreadLocalByteStringOutputStream.StreamHandle;
+import
org.apache.beam.runners.dataflow.worker.util.common.worker.InternedByteString;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import
org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
+import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
+import org.joda.time.Instant;
+
+/**
+ * Encodes and decodes StateTags and TimerTags from and to windmill bytes.
This encoding scheme
+ * enforces a specific lexicographical order on state tags. The ordering
enables building range
+ * filters using the tags.
+ *
+ * <h2>1. High-Level Tag Formats</h2>
+ *
+ * <p>State tags and Timer tags differ in structure but share common component
encodings.
+ *
+ * <h3>1.1 State Tag Encoding</h3>
+ *
+ * <p>Used for generic state variables (e.g., ValueState, BagState, etc).
+ *
+ * <pre>
+ * Format:
+ * | Encoded Namespace | Encoded Address |
+ * </pre>
+ *
+ * <ul>
+ * <li><b>Encoded Namespace:</b> Encodes the state namespace (see Section
2.1).
+ * <li><b>Encoded Address:</b> Encodes the state variable address (see
Section 2.3).
+ * </ul>
+ *
+ * <h3>1.2 Timer/Timer Hold Tag Encoding</h3>
+ *
+ * <p>Specialized tags, used for timers and automatic watermark holds
associated with the timers.
+ *
+ * <pre>
+ * Format:
+ * | Encoded Namespace | Tag Type | Timer Family Id | Timer Id |
+ *
+ *
+-------------------+-----------------------------------------------------------+
+ * | Field | Format
|
+ *
+-------------------+-----------------------------------------------------------+
+ * | Encoded Namespace | Encoded namespace (see Section 2.1).
|
+ *
+-------------------+-----------------------------------------------------------+
+ * | Tag Type | {@code 0x03} (Single byte): System Timer/Watermark
Hold |
+ * | | {@code 0x04} (Single byte): User Timer/Watermark Hold
|
+ *
+-------------------+-----------------------------------------------------------+
+ * | Timer Family ID | TimerFamilyId encoded via length prefixed
|
+ * | | {@code StringUtf8Coder}.
|
+ *
+-------------------+-----------------------------------------------------------+
+ * | Timer ID | TimerId encoded via length prefixed
|
+ * | | {@code StringUtf8Coder}.
|
+ *
+-------------------+-----------------------------------------------------------+
+ * </pre>
+ *
+ * <h2>2. Component Encodings</h2>
+ *
+ * <h3>2.1 Namespace Encoding</h3>
+ *
+ * <p>Namespaces are prefixed with a byte ID to control sorting order.
+ *
+ * <pre>
+ *
+---------------------------+-------------------------------------------------------------+
+ * | Namespace Type | Format
|
+ *
+---------------------------+-------------------------------------------------------------+
+ * | GlobalNamespace | | {@code 0x01} |
|
+ * | | (Single byte)
|
+ *
+---------------------------+-------------------------------------------------------------+
+ * | WindowNamespace | | {@code 0x10} | Encoded Window | {@code
0x01} | |
+ * | | (See Section 2.2)
|
+ *
+---------------------------+-------------------------------------------------------------+
+ * | WindowAndTriggerNamespace | | {@code 0x10} | Encoded Window | {@code
0x02} | TriggerIndex |
+ * | | (See Section 2.2 for Encoded Window)
|
+ * | | TriggerIndex is encoded by {@code
BigEndianIntegerCoder} |
+ *
+---------------------------+-------------------------------------------------------------+
+ * </pre>
+ *
+ * <h3>2.2 Window Encoding</h3>
+ *
+ * <h4>2.2.1 IntervalWindow</h4>
+ *
+ * <p>IntervalWindows use a custom encoding that is different from the
IntervalWindowCoder.
+ *
+ * <pre>
+ * Format:
+ * | 0x64 | End Time | Start Time |
+ * </pre>
+ *
+ * <ul>
+ * <li><b>Prefix:</b> {@code 0x64}. Single byte identifying Interval windows.
+ * <li><b>End Time:</b> {@code intervalWindow.end()} encoded via {@code
InstantCoder}.
+ * <li><b>Start Time:</b> {@code intervalWindow.start()} encoded via {@code
InstantCoder}.
+ * </ul>
+ *
+ * <p><b>Note:</b> {@code InstantCoder} preserves the sort order. The encoded
IntervalWindow is to
+ * be sorted based on {@code [End Time, Start Time]} directly without needing
to decode.
+ *
+ * <h4>2.2.2 Other Windows</h4>
+ *
+ * <p>All non-IntervalWindows use the standard window coders.
+ *
+ * <pre>
+ * Format:
+ * | 0x02 | Window |
+ * </pre>
+ *
+ * <ul>
+ * <li><b>Prefix:</b> {@code 0x02}. Single byte identifying non-Interval
windows.
+ * <li><b>Window:</b> The window serialized using its {@code windowCoder}.
+ * </ul>
+ *
+ * <h3>2.3 Address Encoding</h3>
+ *
+ * <p>Combines the state type and the state identifier.
+ *
+ * <pre>
+ * Format:
+ * | State Type | Address |
+ *
+ *
+------------+-----------------------------------------------------------------+
+ * | Field | Format
|
+ *
+------------+-----------------------------------------------------------------+
+ * | State Type | {@code 0x01} (Single byte): System State
|
+ * | | {@code 0x02} (Single byte): User State
|
+ *
+------------+-----------------------------------------------------------------+
+ * | Address | The state address (string) is encoded via length prefixed
|
+ * | | {@code StringUtf8Coder}.
|
+ *
+------------+-----------------------------------------------------------------+
+ * </pre>
+ *
+ * <h2>3. Tag Ordering</h2>
+ *
+ * <p>The encoding prefixes are chosen to enforce the following
lexicographical sort order (lowest
+ * to highest):
+ *
+ * <ol>
+ * <li><b>Tags in Global Namespace</b> (Prefix {@code 0x01})
+ * <li><b>Tags in Non-Interval Windows</b> (Prefix {@code 0x1002})
+ * <li><b>Tags in Interval Windows</b> (Prefix {@code 0x1064})
+ * <ul>
+ * <li>Sorted internally by {@code [EndTime, StartTime]}.
+ * </ul>
+ * </ol>
+ */
+@Internal
+@ThreadSafe
+public class WindmillTagEncodingV2 extends WindmillTagEncoding {
+
+  private static final WindmillTagEncodingV2 INSTANCE = new WindmillTagEncodingV2();
+
+  // Single-byte markers. A marker is only compared with markers written at the same position in
+  // a tag, so equal values at different positions (e.g. the several 0x01 markers) never clash.
+  private static final int WINDOW_NAMESPACE_BYTE = 0x01;
+  private static final int WINDOW_AND_TRIGGER_NAMESPACE_BYTE = 0x02;
+  private static final int NON_GLOBAL_NAMESPACE_BYTE = 0x10;
+  private static final int GLOBAL_NAMESPACE_BYTE = 0x01;
+  private static final int SYSTEM_STATE_TAG_BYTE = 0x01;
+  private static final int USER_STATE_TAG_BYTE = 0x02;
+  private static final int SYSTEM_TIMER_BYTE = 0x03;
+  private static final int USER_TIMER_BYTE = 0x04;
+  private static final int INTERVAL_WINDOW_BYTE = 0x64;
+  private static final int OTHER_WINDOW_BYTE = 0x02;
+
+  // Private constructor to prevent instantiations from outside.
+  private WindmillTagEncodingV2() {}
+
+  /** {@inheritDoc} */
+  @Override
+  public InternedByteString stateTag(StateNamespace namespace, StateTag<?> address) {
+    try (StreamHandle streamHandle = ThreadLocalByteStringOutputStream.acquire()) {
+      ByteStringOutputStream stream = streamHandle.stream();
+      encodeNamespace(namespace, stream);
+      encodeAddress(address, stream);
+      return InternedByteString.of(stream.toByteStringAndReset());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>Returns {@code timerTag} unchanged: the hold uses the same encoding as the timer itself.
+   * Timer tags and hold tags are stored in different places, so they do not collide.
+   */
+  @Override
+  public ByteString timerHoldTag(
+      WindmillNamespacePrefix prefix, TimerData timerData, ByteString timerTag) {
+    return timerTag;
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>Layout: {@code | namespace | timer type byte | timer family id | timer id |}.
+   *
+   * @throws IllegalStateException if {@code prefix} is neither the system nor the user prefix.
+   */
+  @Override
+  public ByteString timerTag(WindmillNamespacePrefix prefix, TimerData timerData) {
+    try (StreamHandle streamHandle = ThreadLocalByteStringOutputStream.acquire()) {
+      ByteStringOutputStream stream = streamHandle.stream();
+      encodeNamespace(timerData.getNamespace(), stream);
+      if (WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX.equals(prefix)) {
+        stream.write(SYSTEM_TIMER_BYTE);
+      } else if (WindmillNamespacePrefix.USER_NAMESPACE_PREFIX.equals(prefix)) {
+        stream.write(USER_TIMER_BYTE);
+      } else {
+        throw new IllegalStateException("Unexpected WindmillNamespacePrefix: " + prefix);
+      }
+      StringUtf8Coder.of().encode(timerData.getTimerFamilyId(), stream);
+      StringUtf8Coder.of().encode(timerData.getTimerId(), stream);
+      return stream.toByteStringAndReset();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>The {@code draining} flag is not taken into account yet (TODO:
+   * https://github.com/apache/beam/issues/36884).
+   *
+   * @throws IllegalStateException if the tag is malformed or its timer type byte does not match
+   *     {@code prefix}.
+   */
+  @Override
+  public TimerData windmillTimerToTimerData(
+      WindmillNamespacePrefix prefix,
+      Timer timer,
+      Coder<? extends BoundedWindow> windowCoder,
+      boolean draining) {
+    InputStream stream = timer.getTag().newInput();
+    try {
+      StateNamespace stateNamespace = decodeNamespace(stream, windowCoder);
+      int timerTypeByte = stream.read();
+      if (timerTypeByte == SYSTEM_TIMER_BYTE) {
+        checkState(
+            WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX.equals(prefix),
+            "Timer tag encodes a system timer but prefix is %s",
+            prefix);
+      } else if (timerTypeByte == USER_TIMER_BYTE) {
+        checkState(
+            WindmillNamespacePrefix.USER_NAMESPACE_PREFIX.equals(prefix),
+            "Timer tag encodes a user timer but prefix is %s",
+            prefix);
+      } else {
+        throw new IllegalStateException("Unexpected timer tag byte: " + timerTypeByte);
+      }
+
+      String timerFamilyId = StringUtf8Coder.of().decode(stream);
+      String timerId = StringUtf8Coder.of().decode(stream);
+
+      Instant timestamp = WindmillTimeUtils.windmillToHarnessTimestamp(timer.getTimestamp());
+      Instant outputTimestamp = timestamp;
+      if (timer.hasMetadataTimestamp()) {
+        // We use BoundedWindow.TIMESTAMP_MAX_VALUE+1 to indicate "no output timestamp" so make sure
+        // to change the upper bound.
+        outputTimestamp =
+            WindmillTimeUtils.windmillToHarnessTimestamp(timer.getMetadataTimestamp());
+        if (outputTimestamp.equals(OUTPUT_TIMESTAMP_MAX_WINDMILL_VALUE)) {
+          outputTimestamp = OUTPUT_TIMESTAMP_MAX_VALUE;
+        }
+      }
+
+      return TimerData.of(
+          timerId,
+          timerFamilyId,
+          stateNamespace,
+          timestamp,
+          outputTimestamp,
+          timerTypeToTimeDomain(timer.getType()));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Returns the singleton {@link WindmillTagEncodingV2}. */
+  public static WindmillTagEncodingV2 instance() {
+    return INSTANCE;
+  }
+
+  /** Writes the state type byte (system or user) followed by the length-prefixed tag id. */
+  private void encodeAddress(StateTag<?> tag, ByteStringOutputStream stream) throws IOException {
+    if (StateTags.isSystemTagInternal(tag)) {
+      stream.write(SYSTEM_STATE_TAG_BYTE);
+    } else {
+      stream.write(USER_STATE_TAG_BYTE);
+    }
+    StringUtf8Coder.of().encode(tag.getId(), stream);
+  }
+
+  /** Writes the namespace marker and, for non-global namespaces, the window (+ trigger) parts. */
+  private void encodeNamespace(StateNamespace namespace, ByteStringOutputStream stream)
+      throws IOException {
+    if (namespace instanceof GlobalNamespace) {
+      stream.write(GLOBAL_NAMESPACE_BYTE);
+    } else if (namespace instanceof WindowNamespace) {
+      stream.write(NON_GLOBAL_NAMESPACE_BYTE);
+      encodeWindowNamespace((WindowNamespace<? extends BoundedWindow>) namespace, stream);
+    } else if (namespace instanceof WindowAndTriggerNamespace) {
+      stream.write(NON_GLOBAL_NAMESPACE_BYTE);
+      encodeWindowAndTriggerNamespace(
+          (WindowAndTriggerNamespace<? extends BoundedWindow>) namespace, stream);
+    } else {
+      throw new IllegalStateException("Unsupported namespace type: " + namespace.getClass());
+    }
+  }
+
+  /** Inverse of {@link #encodeNamespace}; reads exactly the namespace portion of the tag. */
+  private StateNamespace decodeNamespace(
+      InputStream stream, Coder<? extends BoundedWindow> windowCoder) throws IOException {
+    int firstByte = stream.read();
+    switch (firstByte) {
+      case GLOBAL_NAMESPACE_BYTE:
+        return StateNamespaces.global();
+      case NON_GLOBAL_NAMESPACE_BYTE:
+        return decodeNonGlobalNamespace(stream, windowCoder);
+      default:
+        throw new IllegalStateException("Invalid first namespace byte: " + firstByte);
+    }
+  }
+
+  /** Decodes the window, then the window / window-and-trigger marker (and trigger index). */
+  private <W extends BoundedWindow> StateNamespace decodeNonGlobalNamespace(
+      InputStream stream, Coder<W> windowCoder) throws IOException {
+    W window = decodeWindow(stream, windowCoder);
+    int namespaceByte = stream.read();
+    switch (namespaceByte) {
+      case WINDOW_NAMESPACE_BYTE:
+        return StateNamespaces.window(windowCoder, window);
+      case WINDOW_AND_TRIGGER_NAMESPACE_BYTE:
+        Integer triggerIndex = BigEndianIntegerCoder.of().decode(stream);
+        return StateNamespaces.windowAndTrigger(windowCoder, window, triggerIndex);
+      default:
+        throw new IllegalStateException("Invalid trigger namespace byte: " + namespaceByte);
+    }
+  }
+
+  /** Inverse of {@link #encodeWindow}. */
+  private <W extends BoundedWindow> W decodeWindow(InputStream stream, Coder<W> windowCoder)
+      throws IOException {
+    int firstByte = stream.read();
+    switch (firstByte) {
+      case INTERVAL_WINDOW_BYTE:
+        {
+          // encodeWindow writes this marker only for an IntervalWindowCoder; assumes the same
+          // coder is supplied when decoding, so W is IntervalWindow here.
+          @SuppressWarnings("unchecked")
+          W window = (W) decodeIntervalWindow(stream);
+          return window;
+        }
+      case OTHER_WINDOW_BYTE:
+        return windowCoder.decode(stream);
+      default:
+        throw new IllegalStateException("Unexpected window first byte: " + firstByte);
+    }
+  }
+
+  /** Reads an interval window stored as {@code [end, start]}. */
+  private IntervalWindow decodeIntervalWindow(InputStream stream) throws IOException {
+    Instant end = InstantCoder.of().decode(stream);
+    Instant start = InstantCoder.of().decode(stream);
+    return new IntervalWindow(start, end);
+  }
+
+  /** Writes the window followed by the window-namespace marker. */
+  private <W extends BoundedWindow> void encodeWindowNamespace(
+      WindowNamespace<W> windowNamespace, ByteStringOutputStream stream) throws IOException {
+    encodeWindow(windowNamespace.getWindow(), windowNamespace.getWindowCoder(), stream);
+    stream.write(WINDOW_NAMESPACE_BYTE);
+  }
+
+  /** Writes the window, the window-and-trigger marker, then the big-endian trigger index. */
+  private <W extends BoundedWindow> void encodeWindowAndTriggerNamespace(
+      WindowAndTriggerNamespace<W> windowAndTriggerNamespace, ByteStringOutputStream stream)
+      throws IOException {
+    encodeWindow(
+        windowAndTriggerNamespace.getWindow(), windowAndTriggerNamespace.getWindowCoder(), stream);
+    stream.write(WINDOW_AND_TRIGGER_NAMESPACE_BYTE);
+    BigEndianIntegerCoder.of().encode(windowAndTriggerNamespace.getTriggerIndex(), stream);
+  }
+
+  /**
+   * Writes an IntervalWindow (when the coder is an {@link IntervalWindowCoder}) as
+   * {@code 0x64 | end | start}; any other window as {@code 0x02 | windowCoder-encoded window}.
+   */
+  private <W extends BoundedWindow> void encodeWindow(
+      W window, Coder<W> windowCoder, ByteStringOutputStream stream) throws IOException {
+    if (windowCoder instanceof IntervalWindowCoder) {
+      stream.write(INTERVAL_WINDOW_BYTE);
+      IntervalWindow intervalWindow = (IntervalWindow) window;
+      // End before start so encoded tags sort by [end, start]; InstantCoder preserves ordering.
+      InstantCoder.of().encode(intervalWindow.end(), stream);
+      InstantCoder.of().encode(intervalWindow.start(), stream);
+    } else {
+      stream.write(OTHER_WINDOW_BYTE);
+      windowCoder.encode(window, stream);
+    }
+  }
+}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/ComputationWorkExecutorFactory.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/ComputationWorkExecutorFactory.java
index 26979990330..097da87fb01 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/ComputationWorkExecutorFactory.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/ComputationWorkExecutorFactory.java
@@ -74,6 +74,14 @@ final class ComputationWorkExecutorFactory {
private static final String THROW_EXCEPTIONS_ON_LARGE_OUTPUT_EXPERIMENT =
"throw_exceptions_on_large_output";
+ // Experiment to enable tag encoding v2.
+ // Experiment is for testing by dataflow runner developers.
+ // Related logic could change anytime without notice.
+ // **DO NOT USE** on real workloads.
+ // Enabling the experiment could lead to state incompatibilities and broken
jobs.
+ private static final String UNSTABLE_WINDMILL_TAG_ENCODING_EXPERIMENT =
+ "unstable_windmill_tag_encoding_v2";
+
private final DataflowWorkerHarnessOptions options;
private final DataflowMapTaskExecutorFactory mapTaskExecutorFactory;
private final ReaderCache readerCache;
@@ -97,6 +105,7 @@ final class ComputationWorkExecutorFactory {
private final IdGenerator idGenerator;
private final StreamingGlobalConfigHandle globalConfigHandle;
private final boolean throwExceptionOnLargeOutput;
+ private final boolean enableWindmillTagEncodingV2;
ComputationWorkExecutorFactory(
DataflowWorkerHarnessOptions options,
@@ -124,6 +133,8 @@ final class ComputationWorkExecutorFactory {
: StreamingDataflowWorker.MAX_SINK_BYTES;
this.throwExceptionOnLargeOutput =
hasExperiment(options, THROW_EXCEPTIONS_ON_LARGE_OUTPUT_EXPERIMENT);
+ this.enableWindmillTagEncodingV2 =
+ hasExperiment(options, UNSTABLE_WINDMILL_TAG_ENCODING_EXPERIMENT);
}
private static Nodes.ParallelInstructionNode extractReadNode(
@@ -268,7 +279,8 @@ final class ComputationWorkExecutorFactory {
stageInfo.executionStateRegistry(),
globalConfigHandle,
maxSinkBytes,
- throwExceptionOnLargeOutput);
+ throwExceptionOnLargeOutput,
+ enableWindmillTagEncodingV2);
}
private DataflowMapTaskExecutor createMapTaskExecutor(
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java
index 93b279f0aec..8372b33d81c 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java
@@ -133,7 +133,8 @@ public class StreamingModeExecutionContextTest {
executionStateRegistry,
globalConfigHandle,
Long.MAX_VALUE,
- /*throwExceptionOnLargeOutput=*/ false);
+ /*throwExceptionOnLargeOutput=*/ false,
+ /*enableWindmillTagEncodingV2=*/ false);
}
private static Work createMockWork(Windmill.WorkItem workItem, Watermarks
watermarks) {
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
index 334b9414b26..f7364104f5d 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
@@ -618,7 +618,8 @@ public class WorkerCustomSourcesTest {
executionStateRegistry,
globalConfigHandle,
Long.MAX_VALUE,
- /*throwExceptionOnLargeOutput=*/ false);
+ /*throwExceptionOnLargeOutput=*/ false,
+ /*enableWindmillTagEncodingV2=*/ false);
options.setNumWorkers(5);
int maxElements = 10;
@@ -989,7 +990,8 @@ public class WorkerCustomSourcesTest {
executionStateRegistry,
globalConfigHandle,
Long.MAX_VALUE,
- /*throwExceptionOnLargeOutput=*/ false);
+ /*throwExceptionOnLargeOutput=*/ false,
+ /*enableWindmillTagEncodingV2=*/ false);
options.setNumWorkers(5);
int maxElements = 100;
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2Test.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2Test.java
new file mode 100644
index 00000000000..af9ef95410d
--- /dev/null
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillTagEncodingV2Test.java
@@ -0,0 +1,576 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.state;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.StateNamespaces;
+import org.apache.beam.runners.core.StateNamespaces.GlobalNamespace;
+import org.apache.beam.runners.core.StateTag;
+import org.apache.beam.runners.core.StateTags;
+import org.apache.beam.runners.core.TimerInternals.TimerData;
+import org.apache.beam.runners.dataflow.worker.WindmillNamespacePrefix;
+import org.apache.beam.runners.dataflow.worker.WindmillTimeUtils;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Enclosed.class)
+public class WindmillTagEncodingV2Test {
+
+ private static final IntervalWindow INTERVAL_WINDOW =
+ new IntervalWindow(new Instant(10), new Instant(20));
+
+ // Same bounds as INTERVAL_WINDOW but coded with CustomWindowCoder; the expected bytes below
+ // encode namespaces over this window with the "non interval window" marker (0x02).
+ private static final CustomWindow CUSTOM_WINDOW = new CustomWindow(INTERVAL_WINDOW);
+
+ private static final int TRIGGER_INDEX = 5;
+
+ private static final StateNamespace GLOBAL_NAMESPACE = new GlobalNamespace();
+
+ private static final StateNamespace INTERVAL_WINDOW_NAMESPACE =
+ StateNamespaces.window(IntervalWindow.getCoder(), INTERVAL_WINDOW);
+ private static final StateNamespace INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE =
+ StateNamespaces.windowAndTrigger(IntervalWindow.getCoder(), INTERVAL_WINDOW, TRIGGER_INDEX);
+
+ private static final StateNamespace OTHER_WINDOW_NAMESPACE =
+ StateNamespaces.window(new CustomWindow.CustomWindowCoder(), CUSTOM_WINDOW);
+ private static final StateNamespace OTHER_WINDOW_AND_TRIGGER_NAMESPACE =
+ StateNamespaces.windowAndTrigger(
+ new CustomWindow.CustomWindowCoder(), CUSTOM_WINDOW, TRIGGER_INDEX);
+
+ // A 300-character tag. StringUtf8Coder length-prefixes the UTF-8 bytes with a VarInt, and any
+ // length >= 128 needs two VarInt bytes, so this exercises a multi-byte length prefix.
+ // IntStream.range (not IntStream.of, which yields only the single element 300) is what makes
+ // the stream produce 300 elements.
+ private static final String TAG =
+ IntStream.range(0, 300).mapToObj(i -> "a").collect(Collectors.joining());
+
+ private static final StateTag<ValueState<Integer>> USER_STATE_TAG =
+ StateTags.value(TAG, VarIntCoder.of());
+ private static final StateTag<ValueState<Integer>> SYSTEM_STATE_TAG =
+ StateTags.makeSystemTagInternal(StateTags.value(TAG, VarIntCoder.of()));
+
+ private static final ByteString TAG_BYTES = encode(StringUtf8Coder.of(), TAG);
+
+ private static final ByteString SYSTEM_STATE_TAG_BYTES =
+ ByteString.copyFrom(new byte[] {1}) // system tag
+ .concat(TAG_BYTES);
+ private static final ByteString USER_STATE_TAG_BYTES =
+ ByteString.copyFrom(new byte[] {2}) // user tag
+ .concat(TAG_BYTES);
+
+ private static final ByteString GLOBAL_NAMESPACE_BYTES =
+ ByteString.copyFrom(new byte[] {0x1}); // global namespace
+
+ // Interval windows are written end time first, then start time; SortOrderTest relies on this
+ // to order interval-window tags by end and then by start.
+ private static final ByteString INTERVAL_WINDOW_BYTES =
+ ByteString.EMPTY
+ .concat(encode(InstantCoder.of(), INTERVAL_WINDOW.end()))
+ .concat(encode(InstantCoder.of(), INTERVAL_WINDOW.start()));
+
+ private static final ByteString INTERVAL_WINDOW_NAMESPACE_BYTES =
+ ByteString.copyFrom(new byte[] {0x10}) // non global namespace
+ .concat(ByteString.copyFrom(new byte[] {0x64})) // interval window
+ .concat(INTERVAL_WINDOW_BYTES)
+ .concat(ByteString.copyFrom(new byte[] {0x01})); // window namespace
+
+ private static final ByteString INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE_BYTES =
+ ByteString.copyFrom(new byte[] {0x10}) // non global namespace
+ .concat(ByteString.copyFrom(new byte[] {0x64})) // interval window
+ .concat(INTERVAL_WINDOW_BYTES)
+ .concat(ByteString.copyFrom(new byte[] {0x02})) // window and trigger namespace
+ // TRIGGER_INDEX (5) as a big-endian 4-byte int
+ .concat(ByteString.copyFrom(new byte[] {0x00, 0x00, 0x00, 0x05}));
+
+ private static final ByteString OTHER_WINDOW_NAMESPACE_BYTES =
+ ByteString.copyFrom(new byte[] {0x10}) // non global namespace
+ .concat(ByteString.copyFrom(new byte[] {0x02})) // non interval window
+ .concat(encode(new CustomWindow.CustomWindowCoder(), CUSTOM_WINDOW))
+ .concat(ByteString.copyFrom(new byte[] {0x01})); // window namespace
+
+ private static final ByteString OTHER_WINDOW_AND_TRIGGER_NAMESPACE_BYTES =
+ ByteString.copyFrom(new byte[] {0x10}) // non global namespace
+ .concat(ByteString.copyFrom(new byte[] {0x02})) // non interval window
+ .concat(encode(new CustomWindow.CustomWindowCoder(), CUSTOM_WINDOW))
+ .concat(ByteString.copyFrom(new byte[] {0x02})) // window and trigger namespace
+ // TRIGGER_INDEX (5) as a big-endian 4-byte int
+ .concat(ByteString.copyFrom(new byte[] {0x00, 0x00, 0x00, 0x05}));
+
+ private static final String TIMER_FAMILY_ID = "timerFamily";
+ private static final ByteString TIMER_FAMILY_ID_BYTES =
+ encode(StringUtf8Coder.of(), TIMER_FAMILY_ID);
+
+ private static final String TIMER_ID = "timerId";
+ private static final ByteString TIMER_ID_BYTES = encode(StringUtf8Coder.of(), TIMER_ID);
+
+ private static final ByteString SYSTEM_TIMER_BYTES =
+ ByteString.copyFrom(new byte[] {0x3}) // system timer
+ .concat(TIMER_FAMILY_ID_BYTES)
+ .concat(TIMER_ID_BYTES);
+
+ private static final ByteString USER_TIMER_BYTES =
+ ByteString.copyFrom(new byte[] {0x4}) // user timer
+ .concat(TIMER_FAMILY_ID_BYTES)
+ .concat(TIMER_ID_BYTES);
+
+ // Timers without a family id are expected to encode an empty family id string.
+ private static final ByteString SYSTEM_TIMER_BYTES_NO_FAMILY_ID =
+ ByteString.copyFrom(new byte[] {0x3}) // system timer
+ .concat(encode(StringUtf8Coder.of(), ""))
+ .concat(TIMER_ID_BYTES);
+
+ private static final ByteString USER_TIMER_BYTES_NO_FAMILY_ID =
+ ByteString.copyFrom(new byte[] {0x4}) // user timer
+ .concat(encode(StringUtf8Coder.of(), ""))
+ .concat(TIMER_ID_BYTES);
+
+ @RunWith(Parameterized.class)
+ public static class EncodeStateTagTest {
+
+ @Parameters(name = "{index}: namespace={0} stateTag={1} expectedBytes={2}")
+ public static Collection<Object[]> data() {
+ return ImmutableList.of(
+ new Object[] {
+ GLOBAL_NAMESPACE, USER_STATE_TAG,
GLOBAL_NAMESPACE_BYTES.concat(USER_STATE_TAG_BYTES)
+ },
+ new Object[] {
+ GLOBAL_NAMESPACE,
+ SYSTEM_STATE_TAG,
+ GLOBAL_NAMESPACE_BYTES.concat(SYSTEM_STATE_TAG_BYTES)
+ },
+ new Object[] {
+ INTERVAL_WINDOW_NAMESPACE,
+ USER_STATE_TAG,
+ INTERVAL_WINDOW_NAMESPACE_BYTES.concat(USER_STATE_TAG_BYTES)
+ },
+ new Object[] {
+ INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE,
+ USER_STATE_TAG,
+
INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(USER_STATE_TAG_BYTES)
+ },
+ new Object[] {
+ OTHER_WINDOW_NAMESPACE,
+ USER_STATE_TAG,
+ OTHER_WINDOW_NAMESPACE_BYTES.concat(USER_STATE_TAG_BYTES)
+ },
+ new Object[] {
+ OTHER_WINDOW_AND_TRIGGER_NAMESPACE,
+ USER_STATE_TAG,
+
OTHER_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(USER_STATE_TAG_BYTES)
+ });
+ }
+
+ @Parameter(0)
+ public StateNamespace namespace;
+
+ @Parameter(1)
+ public StateTag<?> stateTag;
+
+ @Parameter(2)
+ public ByteString expectedBytes;
+
+ @Test
+ public void testStateTag() {
+ assertEquals(
+ expectedBytes,
+ WindmillTagEncodingV2.instance().stateTag(namespace,
stateTag).byteString());
+ }
+ }
+
+ /**
+ * Checks the exact bytes produced by {@code timerTag} for every namespace kind, for user and system
+ * prefixes, for timers with and without a timer family id, and for every {@link TimeDomain}.
+ */
+ @RunWith(Parameterized.class)
+ public static class TimerTagTest {
+
+ // Placeholders must match the @Parameter indices declared below:
+ // 0=namespace, 1=prefix, 2=expectedBytes, 3=includeTimerFamilyId, 4=timeDomain.
+ @Parameters(
+ name =
+ "{index}: namespace={0} prefix={1} expectedBytes={2}"
+ + " includeTimerFamilyId={3} timeDomain={4}")
+ public static Collection<Object[]> data() {
+ List<Object[]> data = new ArrayList<>();
+ for (boolean includeTimerFamilyId : ImmutableList.of(true, false)) {
+ // Timers created without a family id are expected to encode an empty family id string.
+ ByteString expectedSystemTimerBytes =
+ includeTimerFamilyId ? SYSTEM_TIMER_BYTES : SYSTEM_TIMER_BYTES_NO_FAMILY_ID;
+ ByteString expectedUserTimerBytes =
+ includeTimerFamilyId ? USER_TIMER_BYTES : USER_TIMER_BYTES_NO_FAMILY_ID;
+ List<Object[]> tests =
+ ImmutableList.of(
+ new Object[] {
+ GLOBAL_NAMESPACE,
+ WindmillNamespacePrefix.USER_NAMESPACE_PREFIX,
+ GLOBAL_NAMESPACE_BYTES.concat(expectedUserTimerBytes)
+ },
+ new Object[] {
+ GLOBAL_NAMESPACE,
+ WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
+ GLOBAL_NAMESPACE_BYTES.concat(expectedSystemTimerBytes)
+ },
+ new Object[] {
+ INTERVAL_WINDOW_NAMESPACE,
+ WindmillNamespacePrefix.USER_NAMESPACE_PREFIX,
+ INTERVAL_WINDOW_NAMESPACE_BYTES.concat(expectedUserTimerBytes)
+ },
+ new Object[] {
+ INTERVAL_WINDOW_NAMESPACE,
+ WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
+ INTERVAL_WINDOW_NAMESPACE_BYTES.concat(expectedSystemTimerBytes)
+ },
+ new Object[] {
+ OTHER_WINDOW_NAMESPACE,
+ WindmillNamespacePrefix.USER_NAMESPACE_PREFIX,
+ OTHER_WINDOW_NAMESPACE_BYTES.concat(expectedUserTimerBytes)
+ },
+ new Object[] {
+ OTHER_WINDOW_NAMESPACE,
+ WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
+ OTHER_WINDOW_NAMESPACE_BYTES.concat(expectedSystemTimerBytes)
+ },
+ new Object[] {
+ INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE,
+ WindmillNamespacePrefix.USER_NAMESPACE_PREFIX,
+ INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(expectedUserTimerBytes)
+ },
+ new Object[] {
+ INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE,
+ WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
+ INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(expectedSystemTimerBytes)
+ },
+ new Object[] {
+ OTHER_WINDOW_AND_TRIGGER_NAMESPACE,
+ WindmillNamespacePrefix.USER_NAMESPACE_PREFIX,
+ OTHER_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(expectedUserTimerBytes)
+ },
+ new Object[] {
+ OTHER_WINDOW_AND_TRIGGER_NAMESPACE,
+ WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
+ OTHER_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(expectedSystemTimerBytes)
+ });
+
+ // Cross every (namespace, prefix, expectedBytes) row with each TimeDomain.
+ for (Object[] params : tests) {
+ for (TimeDomain timeDomain : TimeDomain.values()) {
+ data.add(
+ new Object[] {params[0], params[1], params[2], includeTimerFamilyId, timeDomain});
+ }
+ }
+ }
+ return data;
+ }
+
+ @Parameter(0)
+ public StateNamespace namespace;
+
+ @Parameter(1)
+ public WindmillNamespacePrefix prefix;
+
+ @Parameter(2)
+ public ByteString expectedBytes;
+
+ @Parameter(3)
+ public boolean includeTimerFamilyId;
+
+ @Parameter(4)
+ public TimeDomain timeDomain;
+
+ @Test
+ public void testTimerTag() {
+ TimerData timerData =
+ includeTimerFamilyId
+ ? TimerData.of(
+ TIMER_ID,
+ TIMER_FAMILY_ID,
+ namespace,
+ new Instant(123),
+ new Instant(456),
+ timeDomain)
+ : TimerData.of(TIMER_ID, namespace, new Instant(123), new Instant(456), timeDomain);
+ assertEquals(expectedBytes, WindmillTagEncodingV2.instance().timerTag(prefix, timerData));
+ }
+ }
+
+ @RunWith(Parameterized.class)
+ public static class TimerDataFromTimerTest {
+
+ @Parameters(name = "{index}: namespace={0} prefix={1} draining={4}
timeDomain={5}")
+ public static Collection<Object[]> data() {
+ List<Object[]> tests =
+ ImmutableList.of(
+ new Object[] {
+ GLOBAL_NAMESPACE,
+ WindmillNamespacePrefix.USER_NAMESPACE_PREFIX,
+ GLOBAL_NAMESPACE_BYTES.concat(USER_TIMER_BYTES),
+ GlobalWindow.Coder.INSTANCE
+ },
+ new Object[] {
+ GLOBAL_NAMESPACE,
+ WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
+ GLOBAL_NAMESPACE_BYTES.concat(SYSTEM_TIMER_BYTES),
+ GlobalWindow.Coder.INSTANCE
+ },
+ new Object[] {
+ INTERVAL_WINDOW_NAMESPACE,
+ WindmillNamespacePrefix.USER_NAMESPACE_PREFIX,
+ INTERVAL_WINDOW_NAMESPACE_BYTES.concat(USER_TIMER_BYTES),
+ IntervalWindow.getCoder()
+ },
+ new Object[] {
+ INTERVAL_WINDOW_NAMESPACE,
+ WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
+ INTERVAL_WINDOW_NAMESPACE_BYTES.concat(SYSTEM_TIMER_BYTES),
+ IntervalWindow.getCoder()
+ },
+ new Object[] {
+ OTHER_WINDOW_NAMESPACE,
+ WindmillNamespacePrefix.USER_NAMESPACE_PREFIX,
+ OTHER_WINDOW_NAMESPACE_BYTES.concat(USER_TIMER_BYTES),
+ new CustomWindow.CustomWindowCoder()
+ },
+ new Object[] {
+ OTHER_WINDOW_NAMESPACE,
+ WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
+ OTHER_WINDOW_NAMESPACE_BYTES.concat(SYSTEM_TIMER_BYTES),
+ new CustomWindow.CustomWindowCoder()
+ },
+ new Object[] {
+ INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE,
+ WindmillNamespacePrefix.USER_NAMESPACE_PREFIX,
+
INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(USER_TIMER_BYTES),
+ IntervalWindow.getCoder()
+ },
+ new Object[] {
+ INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE,
+ WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
+
INTERVAL_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(SYSTEM_TIMER_BYTES),
+ IntervalWindow.getCoder()
+ },
+ new Object[] {
+ OTHER_WINDOW_AND_TRIGGER_NAMESPACE,
+ WindmillNamespacePrefix.USER_NAMESPACE_PREFIX,
+
OTHER_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(USER_TIMER_BYTES),
+ new CustomWindow.CustomWindowCoder()
+ },
+ new Object[] {
+ OTHER_WINDOW_AND_TRIGGER_NAMESPACE,
+ WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
+
OTHER_WINDOW_AND_TRIGGER_NAMESPACE_BYTES.concat(SYSTEM_TIMER_BYTES),
+ new CustomWindow.CustomWindowCoder()
+ });
+
+ List<Object[]> data = new ArrayList<>();
+ for (Object[] params : tests) {
+ for (boolean draining : ImmutableList.of(true, false)) {
+ for (TimeDomain timeDomain : TimeDomain.values()) {
+ data.add(
+ new Object[] {params[0], params[1], params[2], params[3],
draining, timeDomain});
+ }
+ }
+ }
+ return data;
+ }
+
+ @Parameter(0)
+ public StateNamespace namespace;
+
+ @Parameter(1)
+ public WindmillNamespacePrefix prefix;
+
+ @Parameter(2)
+ public ByteString timerTag;
+
+ @Parameter(3)
+ public Coder<? extends BoundedWindow> windowCoder;
+
+ @Parameter(4)
+ public boolean draining;
+
+ @Parameter(5)
+ public TimeDomain timeDomain;
+
+ @Test
+ public void testTimerDataFromTimer() {
+ WindmillTagEncodingV2 encoding = WindmillTagEncodingV2.instance();
+ Instant timestamp = Instant.now();
+ Instant outputTimestamp = timestamp.plus(Duration.standardSeconds(1));
+ TimerData timerData =
+ TimerData.of(
+ TIMER_ID, TIMER_FAMILY_ID, namespace, timestamp,
outputTimestamp, timeDomain);
+ Timer timer =
+ Timer.newBuilder()
+ .setTag(timerTag)
+
.setTimestamp(WindmillTimeUtils.harnessToWindmillTimestamp(timestamp))
+
.setMetadataTimestamp(WindmillTimeUtils.harnessToWindmillTimestamp(outputTimestamp))
+ .setType(timerType(timeDomain))
+ .build();
+ assertEquals(
+ timerData, encoding.windmillTimerToTimerData(prefix, timer,
windowCoder, draining));
+ }
+ }
+
+ /** Checks that {@code timerHoldTag} returns the supplied timer tag unchanged. */
+ @RunWith(JUnit4.class)
+ public static class TimerHoldTagTest {
+
+ @Test
+ public void testTimerHoldTagUsesTimerTag() {
+ TimerData timerData =
+ TimerData.of(
+ TIMER_ID,
+ TIMER_FAMILY_ID,
+ GLOBAL_NAMESPACE,
+ new Instant(123),
+ new Instant(456),
+ TimeDomain.EVENT_TIME);
+ // Arbitrary tag contents: the hold tag must be the supplied timer tag whatever it holds.
+ byte[] bytes = new byte[16];
+ ThreadLocalRandom.current().nextBytes(bytes);
+ ByteString timerTag = ByteString.copyFrom(bytes);
+ // JUnit's assertEquals takes (expected, actual); keep that order so failures read correctly.
+ assertEquals(
+ timerTag,
+ WindmillTagEncodingV2.instance()
+ .timerHoldTag(WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, timerData, timerTag));
+ }
+ }
+
+ @RunWith(JUnit4.class)
+ public static class SortOrderTest {
+
+ @Test
+ public void testSortOrder() {
+ WindmillTagEncodingV2 encoding = WindmillTagEncodingV2.instance();
+
+ Instant baseInstant = Instant.now();
+ // [5, 20)
+ StateNamespace interval5_20 =
+ StateNamespaces.window(
+ IntervalWindow.getCoder(),
+ new IntervalWindow(
+ baseInstant.plus(Duration.millis(5)),
baseInstant.plus(Duration.millis(20))));
+ // [10, 20)
+ StateNamespace interval10_20 =
+ StateNamespaces.window(
+ IntervalWindow.getCoder(),
+ new IntervalWindow(
+ baseInstant.plus(Duration.millis(10)),
baseInstant.plus(Duration.millis(20))));
+ // [20, 30)
+ StateNamespace interval20_30 =
+ StateNamespaces.window(
+ IntervalWindow.getCoder(),
+ new IntervalWindow(
+ baseInstant.plus(Duration.millis(20)),
baseInstant.plus(Duration.millis(30))));
+
+ ByteString globalBytes = encoding.stateTag(GLOBAL_NAMESPACE,
USER_STATE_TAG).byteString();
+ ByteString otherWindowBytes =
+ encoding.stateTag(OTHER_WINDOW_NAMESPACE,
USER_STATE_TAG).byteString();
+ ByteString interval5_20Bytes = encoding.stateTag(interval5_20,
USER_STATE_TAG).byteString();
+ ByteString interval10_20Bytes = encoding.stateTag(interval10_20,
USER_STATE_TAG).byteString();
+ ByteString interval20_30Bytes = encoding.stateTag(interval20_30,
USER_STATE_TAG).byteString();
+
+ // Global < Non-Interval < Interval
+ assertOrdered(globalBytes, otherWindowBytes);
+ assertOrdered(otherWindowBytes, interval5_20Bytes);
+
+ // Interval sorting: EndTime then StartTime
+ // [5, 20) < [10, 20) (Same End=20, Start 5 < 10)
+ assertOrdered(interval5_20Bytes, interval10_20Bytes);
+ // [10, 20) < [20, 30) (End 20 < 30)
+ assertOrdered(interval10_20Bytes, interval20_30Bytes);
+
+ assertTrue(globalBytes.startsWith(ByteString.copyFrom(new byte[]
{0x01})));
+ assertTrue(otherWindowBytes.startsWith(ByteString.copyFrom(new byte[]
{0x10, 0x02})));
+ assertTrue(interval5_20Bytes.startsWith(ByteString.copyFrom(new byte[]
{0x10, 0x64})));
+ assertTrue(interval10_20Bytes.startsWith(ByteString.copyFrom(new byte[]
{0x10, 0x64})));
+ assertTrue(interval20_30Bytes.startsWith(ByteString.copyFrom(new byte[]
{0x10, 0x64})));
+ }
+
+ private void assertOrdered(ByteString smaller, ByteString larger) {
+
assertTrue(ByteString.unsignedLexicographicalComparator().compare(smaller,
larger) < 0);
+ }
+ }
+
+ private static class CustomWindow extends IntervalWindow {
+
+ private CustomWindow(IntervalWindow intervalWindow) {
+ super(intervalWindow.start(), intervalWindow.end());
+ }
+
+ private static class CustomWindowCoder extends Coder<CustomWindow> {
+
+ @Override
+ public void verifyDeterministic() throws NonDeterministicException {
+ IntervalWindowCoder.of().verifyDeterministic();
+ }
+
+ @Override
+ public List<? extends Coder<?>> getCoderArguments() {
+ return IntervalWindowCoder.of().getCoderArguments();
+ }
+
+ @Override
+ public void encode(CustomWindow value, OutputStream outStream) throws
IOException {
+ IntervalWindowCoder.of().encode(value, outStream);
+ }
+
+ @Override
+ public CustomWindow decode(InputStream inStream) throws IOException {
+ return new CustomWindow(IntervalWindowCoder.of().decode(inStream));
+ }
+ }
+ }
+
+ /** Encodes {@code value} with {@code coder}, rethrowing any IOException as unchecked. */
+ private static <T> ByteString encode(Coder<T> coder, T value) {
+ ByteString.Output buffer = ByteString.newOutput();
+ try {
+ coder.encode(value, buffer);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return buffer.toByteString();
+ }
+
+ private static Timer.Type timerType(TimeDomain domain) {
+ switch (domain) {
+ case EVENT_TIME:
+ return Timer.Type.WATERMARK;
+ case PROCESSING_TIME:
+ return Timer.Type.REALTIME;
+ case SYNCHRONIZED_PROCESSING_TIME:
+ return Timer.Type.DEPENDENT_REALTIME;
+ default:
+ throw new IllegalArgumentException("Unrecognized TimeDomain: " +
domain);
+ }
+ }
+}