This is an automated email from the ASF dual-hosted git repository.
reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 01e500c Merge pull request #10422: [BEAM-2535] TimerData signature
update
01e500c is described below
commit 01e500c2dd0d699aea0434154b69fd59d824700f
Author: xubii <[email protected]>
AuthorDate: Sat Dec 28 21:21:22 2019 +0500
Merge pull request #10422: [BEAM-2535] TimerData signature update
---
.../operators/ApexGroupByKeyOperator.java | 4 +-
.../translation/operators/ApexParDoOperator.java | 4 +-
.../translation/operators/ApexTimerInternals.java | 16 ++--
.../operators/ApexTimerInternalsTest.java | 8 +-
.../beam/runners/core/construction/Timer.java | 9 +-
.../beam/runners/core/InMemoryTimerInternals.java | 14 +++-
.../beam/runners/core/KeyedWorkItemCoder.java | 4 +-
.../apache/beam/runners/core/SimpleDoFnRunner.java | 2 +-
.../beam/runners/core/StatefulDoFnRunner.java | 7 +-
.../apache/beam/runners/core/TimerInternals.java | 95 +++++++++++++++++++++-
.../runners/core/InMemoryTimerInternalsTest.java | 8 +-
.../beam/runners/core/SimpleDoFnRunnerTest.java | 2 +
.../beam/runners/core/TimerInternalsTest.java | 8 +-
.../beam/runners/direct/DirectTimerInternals.java | 15 +++-
.../wrappers/streaming/DoFnOperator.java | 19 +++--
.../streaming/ExecutableStageDoFnOperator.java | 2 +
.../worker/StreamingModeExecutionContext.java | 2 +
.../dataflow/worker/StreamingSideInputFetcher.java | 14 +++-
.../dataflow/worker/WindmillTimerInternals.java | 12 ++-
.../dataflow/worker/fn/control/TimerReceiver.java | 3 +-
.../splittabledofn/SDFFeederViaStateAndTimers.java | 7 +-
.../beam/runners/samza/runtime/KeyedInternals.java | 10 ++-
.../beam/runners/samza/runtime/KeyedTimerData.java | 6 +-
.../samza/runtime/SamzaTimerInternalsFactory.java | 10 ++-
.../SparkGroupAlsoByWindowViaWindowSet.java | 6 +-
.../spark/stateful/SparkTimerInternals.java | 11 ++-
26 files changed, 239 insertions(+), 59 deletions(-)
diff --git
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
index 9a56496..6f90f58 100644
---
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
+++
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
@@ -129,8 +129,8 @@ public class ApexGroupByKeyOperator<K, V>
this.keyCoder = ((KvCoder<K, V>) input.getCoder()).getKeyCoder();
this.valueCoder = ((KvCoder<K, V>) input.getCoder()).getValueCoder();
this.stateInternalsFactory =
stateBackend.newStateInternalsFactory(keyCoder);
- TimerInternals.TimerDataCoder timerCoder =
-
TimerInternals.TimerDataCoder.of(windowingStrategy.getWindowFn().windowCoder());
+ TimerInternals.TimerDataCoderV2 timerCoder =
+
TimerInternals.TimerDataCoderV2.of(windowingStrategy.getWindowFn().windowCoder());
this.timerInternals = new ApexTimerInternals<>(timerCoder);
}
diff --git
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
index 9d4b110..79bb6ef 100644
---
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
+++
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
@@ -187,8 +187,8 @@ public class ApexParDoOperator<InputT, OutputT> extends
BaseOperator
this.inputCoder = inputCoder;
this.outputCoders = outputCoders;
- TimerInternals.TimerDataCoder timerCoder =
-
TimerInternals.TimerDataCoder.of(windowingStrategy.getWindowFn().windowCoder());
+ TimerInternals.TimerDataCoderV2 timerCoder =
+
TimerInternals.TimerDataCoderV2.of(windowingStrategy.getWindowFn().windowCoder());
this.currentKeyTimerInternals = new ApexTimerInternals<>(timerCoder);
this.doFnSchemaInformation = doFnSchemaInformation;
this.sideInputMapping = sideInputMapping;
diff --git
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternals.java
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternals.java
index b4028e7..682cbed 100644
---
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternals.java
+++
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternals.java
@@ -57,7 +57,7 @@ class ApexTimerInternals<K> implements TimerInternals,
Serializable {
private transient Instant currentOutputWatermark;
private transient Coder<K> keyCoder;
- public ApexTimerInternals(TimerDataCoder timerDataCoder) {
+ public ApexTimerInternals(TimerDataCoderV2 timerDataCoder) {
this.eventTimeTimeTimers = new TimerSet(timerDataCoder);
this.processingTimeTimers = new TimerSet(timerDataCoder);
}
@@ -77,8 +77,14 @@ class ApexTimerInternals<K> implements TimerInternals,
Serializable {
@Override
public void setTimer(
- StateNamespace namespace, String timerId, Instant target, TimeDomain
timeDomain) {
- TimerData timerData = TimerData.of(timerId, namespace, target, timeDomain);
+ StateNamespace namespace,
+ String timerId,
+ String timerFamilyId,
+ Instant target,
+ Instant outputTimestamp,
+ TimeDomain timeDomain) {
+ TimerData timerData =
+ TimerData.of(timerId, timerFamilyId, namespace, target,
outputTimestamp, timeDomain);
setTimer(timerData);
}
@@ -196,10 +202,10 @@ class ApexTimerInternals<K> implements TimerInternals,
Serializable {
protected static class TimerSet implements Serializable {
private final Map<Slice, Set<Slice>> activeTimers = new HashMap<>();
- private final TimerDataCoder timerDataCoder;
+ private final TimerDataCoderV2 timerDataCoder;
private long minTimestamp = Long.MAX_VALUE;
- protected TimerSet(TimerDataCoder timerDataCoder) {
+ protected TimerSet(TimerDataCoderV2 timerDataCoder) {
this.timerDataCoder = timerDataCoder;
}
diff --git
a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternalsTest.java
b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternalsTest.java
index 0bd890a..1d7e3f8 100644
---
a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternalsTest.java
+++
b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternalsTest.java
@@ -29,7 +29,7 @@ import java.util.Set;
import
org.apache.beam.runners.apex.translation.operators.ApexTimerInternals.TimerProcessor;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.TimerInternals.TimerData;
-import org.apache.beam.runners.core.TimerInternals.TimerDataCoder;
+import org.apache.beam.runners.core.TimerInternals.TimerDataCoderV2;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -46,7 +46,7 @@ public class ApexTimerInternalsTest {
final Map<String, Collection<TimerData>> firedTimers = new HashMap<>();
TimerProcessor<String> timerProcessor = firedTimers::put;
- TimerDataCoder timerDataCoder =
TimerDataCoder.of(GlobalWindow.Coder.INSTANCE);
+ TimerDataCoderV2 timerDataCoder =
TimerDataCoderV2.of(GlobalWindow.Coder.INSTANCE);
String key1 = "key1";
Instant instant0 = new Instant(0);
Instant instant1 = new Instant(1);
@@ -85,7 +85,7 @@ public class ApexTimerInternalsTest {
@Test
public void testDeleteTimer() {
- TimerDataCoder timerDataCoder =
TimerDataCoder.of(GlobalWindow.Coder.INSTANCE);
+ TimerDataCoderV2 timerDataCoder =
TimerDataCoderV2.of(GlobalWindow.Coder.INSTANCE);
String key1 = "key1";
Instant instant0 = new Instant(0);
Instant instant1 = new Instant(1);
@@ -121,7 +121,7 @@ public class ApexTimerInternalsTest {
@Test
public void testSerialization() {
- TimerDataCoder timerDataCoder =
TimerDataCoder.of(GlobalWindow.Coder.INSTANCE);
+ TimerDataCoderV2 timerDataCoder =
TimerDataCoderV2.of(GlobalWindow.Coder.INSTANCE);
TimerData timerData =
TimerData.of(
"arbitrary-id", StateNamespaces.global(), new Instant(0),
TimeDomain.EVENT_TIME);
diff --git
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
index 072dbc7..d994762 100644
---
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
+++
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
@@ -47,9 +47,13 @@ public abstract class Timer<T> {
/** Returns a timer for the given timestamp with a user specified payload. */
public static <T> Timer<T> of(Instant timestamp, @Nullable T payload) {
- return new AutoValue_Timer(timestamp, payload);
+ return new AutoValue_Timer(timestamp, timestamp, payload);
}
+ /** Returns a timer for the given timestamp with a user specified payload
and outputTimestamp. */
+ public static <T> Timer<T> of(Instant timestamp, Instant outputTimestamp,
@Nullable T payload) {
+ return new AutoValue_Timer(timestamp, outputTimestamp, payload);
+ }
/**
* Returns the timestamp of when the timer is scheduled to fire.
*
@@ -58,6 +62,9 @@ public abstract class Timer<T> {
*/
public abstract Instant getTimestamp();
+ /* Returns the outputTimestamps */
+ public abstract Instant getOutputTimestamp();
+
/** A user supplied payload. */
@Nullable
public abstract T getPayload();
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
index 7b01c04..286e60b 100644
---
a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
@@ -103,11 +103,19 @@ public class InMemoryTimerInternals implements
TimerInternals {
@Override
public void setTimer(
- StateNamespace namespace, String timerId, Instant target, TimeDomain
timeDomain) {
- setTimer(TimerData.of(timerId, namespace, target, timeDomain));
+ StateNamespace namespace,
+ String timerId,
+ String timerFamilyId,
+ Instant target,
+ Instant outputTimestamp,
+ TimeDomain timeDomain) {
+ setTimer(TimerData.of(timerId, timerFamilyId, namespace, target,
outputTimestamp, timeDomain));
}
- /** @deprecated use {@link #setTimer(StateNamespace, String, Instant,
TimeDomain)}. */
+ /**
+ * @deprecated use {@link #setTimer(StateNamespace, String, String, Instant,
Instant,
+ * TimeDomain)}.
+ */
@Deprecated
@Override
public void setTimer(TimerData timerData) {
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
index 2949548..fc395de 100644
---
a/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
@@ -22,7 +22,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
import org.apache.beam.runners.core.TimerInternals.TimerData;
-import org.apache.beam.runners.core.TimerInternals.TimerDataCoder;
+import org.apache.beam.runners.core.TimerInternals.TimerDataCoderV2;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.IterableCoder;
@@ -54,7 +54,7 @@ public class KeyedWorkItemCoder<K, ElemT> extends
StructuredCoder<KeyedWorkItem<
this.keyCoder = keyCoder;
this.elemCoder = elemCoder;
this.windowCoder = windowCoder;
- this.timersCoder = IterableCoder.of(TimerDataCoder.of(windowCoder));
+ this.timersCoder = IterableCoder.of(TimerDataCoderV2.of(windowCoder));
this.elemsCoder = IterableCoder.of(FullWindowedValueCoder.of(elemCoder,
windowCoder));
}
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 2b105fb..472a9d2 100644
---
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -992,7 +992,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements
DoFnRunner<InputT, Out
* user has no way to compute a good choice of time.
*/
private void setUnderlyingTimer(Instant target) {
- timerInternals.setTimer(namespace, timerId, target,
spec.getTimeDomain());
+ timerInternals.setTimer(namespace, timerId, "", target, target,
spec.getTimeDomain());
}
private Instant getCurrentTime() {
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
index 14a9502..f69c74a 100644
---
a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
@@ -208,7 +208,12 @@ public class StatefulDoFnRunner<InputT, OutputT, W extends
BoundedWindow>
// make sure this fires after any window.maxTimestamp() timers
gcTime = gcTime.plus(GC_DELAY_MS);
timerInternals.setTimer(
- StateNamespaces.window(windowCoder, window), GC_TIMER_ID, gcTime,
TimeDomain.EVENT_TIME);
+ StateNamespaces.window(windowCoder, window),
+ GC_TIMER_ID,
+ "",
+ gcTime,
+ window.maxTimestamp(),
+ TimeDomain.EVENT_TIME);
}
@Override
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
index a766143..f9f23ca 100644
---
a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
@@ -54,9 +54,18 @@ public interface TimerInternals {
*
* <p>It is an error to set a timer for two different time domains.
*/
- void setTimer(StateNamespace namespace, String timerId, Instant target,
TimeDomain timeDomain);
+ void setTimer(
+ StateNamespace namespace,
+ String timerId,
+ String timerFamilyId,
+ Instant target,
+ Instant outputTimestamp,
+ TimeDomain timeDomain);
- /** @deprecated use {@link #setTimer(StateNamespace, String, Instant,
TimeDomain)}. */
+ /**
+ * @deprecated use {@link #setTimer(StateNamespace, String, String, Instant,
Instant,
+ * TimeDomain)}.
+ */
@Deprecated
void setTimer(TimerData timerData);
@@ -161,10 +170,19 @@ public interface TimerInternals {
public abstract String getTimerId();
+ public abstract String getTimerFamilyId();
+
public abstract StateNamespace getNamespace();
public abstract Instant getTimestamp();
+ /**
+ * Timestamp the timer assigns to outputted elements from {@link
+ * org.apache.beam.sdk.transforms.DoFn.OnTimer} method. For event time
timers, output watermark
+ * is held at this timestamp until the timer fires.
+ */
+ public abstract Instant getOutputTimestamp();
+
public abstract TimeDomain getDomain();
// When adding a new field, make sure to add it to the compareTo() method.
@@ -174,8 +192,25 @@ public interface TimerInternals {
* generated.
*/
public static TimerData of(
+ String timerId,
+ String timerFamilyId,
+ StateNamespace namespace,
+ Instant timestamp,
+ Instant outputTimestamp,
+ TimeDomain domain) {
+ return new AutoValue_TimerInternals_TimerData(
+ timerId, timerFamilyId, namespace, timestamp, outputTimestamp,
domain);
+ }
+
+ /**
+ * Construct a {@link TimerData} for the given parameters, where the timer
ID is automatically
+ * generated. Construct a {@link TimerData} for the given parameters
except for {@code
+ * outputTimestamp}. {@code outputTimestamp} is set to timer {@code
timestamp}.
+ */
+ public static TimerData of(
String timerId, StateNamespace namespace, Instant timestamp,
TimeDomain domain) {
- return new AutoValue_TimerInternals_TimerData(timerId, namespace,
timestamp, domain);
+ return new AutoValue_TimerInternals_TimerData(
+ timerId, timerId, namespace, timestamp, timestamp, domain);
}
/**
@@ -207,8 +242,10 @@ public interface TimerInternals {
ComparisonChain chain =
ComparisonChain.start()
.compare(this.getTimestamp(), that.getTimestamp())
+ .compare(this.getOutputTimestamp(), that.getOutputTimestamp())
.compare(this.getDomain(), that.getDomain())
- .compare(this.getTimerId(), that.getTimerId());
+ .compare(this.getTimerId(), that.getTimerId())
+ .compare(this.getTimerFamilyId(), that.getTimerFamilyId());
if (chain.result() == 0 &&
!this.getNamespace().equals(that.getNamespace())) {
// Obtaining the stringKey may be expensive; only do so if required
chain = chain.compare(getNamespace().stringKey(),
that.getNamespace().stringKey());
@@ -218,6 +255,56 @@ public interface TimerInternals {
}
/** A {@link Coder} for {@link TimerData}. */
+ class TimerDataCoderV2 extends StructuredCoder<TimerData> {
+ private static final StringUtf8Coder STRING_CODER = StringUtf8Coder.of();
+ private static final InstantCoder INSTANT_CODER = InstantCoder.of();
+ private final Coder<? extends BoundedWindow> windowCoder;
+
+ public static TimerDataCoderV2 of(Coder<? extends BoundedWindow>
windowCoder) {
+ return new TimerDataCoderV2(windowCoder);
+ }
+
+ private TimerDataCoderV2(Coder<? extends BoundedWindow> windowCoder) {
+ this.windowCoder = windowCoder;
+ }
+
+ @Override
+ public void encode(TimerData timer, OutputStream outStream) throws
CoderException, IOException {
+ STRING_CODER.encode(timer.getTimerId(), outStream);
+ STRING_CODER.encode(timer.getTimerFamilyId(), outStream);
+ STRING_CODER.encode(timer.getNamespace().stringKey(), outStream);
+ INSTANT_CODER.encode(timer.getTimestamp(), outStream);
+ INSTANT_CODER.encode(timer.getOutputTimestamp(), outStream);
+ STRING_CODER.encode(timer.getDomain().name(), outStream);
+ }
+
+ @Override
+ public TimerData decode(InputStream inStream) throws CoderException,
IOException {
+ String timerId = STRING_CODER.decode(inStream);
+ String timerFamilyId = STRING_CODER.decode(inStream);
+ StateNamespace namespace =
+ StateNamespaces.fromString(STRING_CODER.decode(inStream),
windowCoder);
+ Instant timestamp = INSTANT_CODER.decode(inStream);
+ Instant outputTimestamp = INSTANT_CODER.decode(inStream);
+ TimeDomain domain = TimeDomain.valueOf(STRING_CODER.decode(inStream));
+ return TimerData.of(timerId, timerFamilyId, namespace, timestamp,
outputTimestamp, domain);
+ }
+
+ @Override
+ public List<? extends Coder<?>> getCoderArguments() {
+ return Arrays.asList(windowCoder);
+ }
+
+ @Override
+ public void verifyDeterministic() throws NonDeterministicException {
+ verifyDeterministic(this, "window coder must be deterministic",
windowCoder);
+ }
+ }
+
+ /**
+ * A {@link Coder} for {@link TimerData}. To make it encoding and decoding
backward compatible for
+ * DataFlow
+ */
class TimerDataCoder extends StructuredCoder<TimerData> {
private static final StringUtf8Coder STRING_CODER = StringUtf8Coder.of();
private static final InstantCoder INSTANT_CODER = InstantCoder.of();
diff --git
a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java
b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java
index 6be35b7..dd10632 100644
---
a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java
+++
b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java
@@ -71,8 +71,8 @@ public class InMemoryTimerInternalsTest {
Instant laterTimestamp = new Instant(42);
underTest.advanceInputWatermark(new Instant(0));
- underTest.setTimer(NS1, ID1, earlyTimestamp, TimeDomain.EVENT_TIME);
- underTest.setTimer(NS1, ID1, laterTimestamp, TimeDomain.EVENT_TIME);
+ underTest.setTimer(NS1, ID1, ID1, earlyTimestamp, earlyTimestamp,
TimeDomain.EVENT_TIME);
+ underTest.setTimer(NS1, ID1, ID1, laterTimestamp, laterTimestamp,
TimeDomain.EVENT_TIME);
underTest.advanceInputWatermark(earlyTimestamp.plus(1L));
assertThat(underTest.removeNextEventTimer(), nullValue());
@@ -86,7 +86,7 @@ public class InMemoryTimerInternalsTest {
public void testDeletionIdempotent() throws Exception {
InMemoryTimerInternals underTest = new InMemoryTimerInternals();
Instant timestamp = new Instant(42);
- underTest.setTimer(NS1, ID1, timestamp, TimeDomain.EVENT_TIME);
+ underTest.setTimer(NS1, ID1, ID1, timestamp, timestamp,
TimeDomain.EVENT_TIME);
underTest.deleteTimer(NS1, ID1);
underTest.deleteTimer(NS1, ID1);
}
@@ -97,7 +97,7 @@ public class InMemoryTimerInternalsTest {
Instant timestamp = new Instant(42);
underTest.advanceInputWatermark(new Instant(0));
- underTest.setTimer(NS1, ID1, timestamp, TimeDomain.EVENT_TIME);
+ underTest.setTimer(NS1, ID1, ID1, timestamp, timestamp,
TimeDomain.EVENT_TIME);
underTest.deleteTimer(NS1, ID1);
underTest.advanceInputWatermark(new Instant(43));
diff --git
a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
index b790314..10972d6 100644
---
a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
+++
b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
@@ -156,6 +156,8 @@ public class SimpleDoFnRunnerTest {
.setTimer(
StateNamespaces.window(new GlobalWindows().windowCoder(),
GlobalWindow.INSTANCE),
DoFnWithTimers.TIMER_ID,
+ "",
+ currentTime.plus(DoFnWithTimers.TIMER_OFFSET),
currentTime.plus(DoFnWithTimers.TIMER_OFFSET),
TimeDomain.EVENT_TIME);
}
diff --git
a/runners/core-java/src/test/java/org/apache/beam/runners/core/TimerInternalsTest.java
b/runners/core-java/src/test/java/org/apache/beam/runners/core/TimerInternalsTest.java
index d1e5221..ab2978f 100644
---
a/runners/core-java/src/test/java/org/apache/beam/runners/core/TimerInternalsTest.java
+++
b/runners/core-java/src/test/java/org/apache/beam/runners/core/TimerInternalsTest.java
@@ -22,7 +22,7 @@ import static org.hamcrest.Matchers.lessThan;
import static org.junit.Assert.assertThat;
import org.apache.beam.runners.core.TimerInternals.TimerData;
-import org.apache.beam.runners.core.TimerInternals.TimerDataCoder;
+import org.apache.beam.runners.core.TimerInternals.TimerDataCoderV2;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.testing.CoderProperties;
@@ -40,13 +40,13 @@ public class TimerInternalsTest {
@Test
public void testTimerDataCoder() throws Exception {
CoderProperties.coderDecodeEncodeEqual(
- TimerDataCoder.of(GlobalWindow.Coder.INSTANCE),
+ TimerDataCoderV2.of(GlobalWindow.Coder.INSTANCE),
TimerData.of(
"arbitrary-id", StateNamespaces.global(), new Instant(0),
TimeDomain.EVENT_TIME));
Coder<IntervalWindow> windowCoder = IntervalWindow.getCoder();
CoderProperties.coderDecodeEncodeEqual(
- TimerDataCoder.of(windowCoder),
+ TimerDataCoderV2.of(windowCoder),
TimerData.of(
"another-id",
StateNamespaces.window(
@@ -57,7 +57,7 @@ public class TimerInternalsTest {
@Test
public void testCoderIsSerializableWithWellKnownCoderType() {
-
CoderProperties.coderSerializable(TimerDataCoder.of(GlobalWindow.Coder.INSTANCE));
+
CoderProperties.coderSerializable(TimerDataCoderV2.of(GlobalWindow.Coder.INSTANCE));
}
@Test
diff --git
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
index 8f3ab48..0261bf6 100644
---
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
+++
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
@@ -47,11 +47,20 @@ class DirectTimerInternals implements TimerInternals {
@Override
public void setTimer(
- StateNamespace namespace, String timerId, Instant target, TimeDomain
timeDomain) {
- timerUpdateBuilder.setTimer(TimerData.of(timerId, namespace, target,
timeDomain));
+ StateNamespace namespace,
+ String timerId,
+ String timerFamilyId,
+ Instant target,
+ Instant outputTimestamp,
+ TimeDomain timeDomain) {
+ timerUpdateBuilder.setTimer(
+ TimerData.of(timerId, timerFamilyId, namespace, target,
outputTimestamp, timeDomain));
}
- /** @deprecated use {@link #setTimer(StateNamespace, String, Instant,
TimeDomain)}. */
+ /**
+ * @deprecated use {@link #setTimer(StateNamespace, String, String, Instant,
Instant,
+ * TimeDomain)}.
+ */
@Deprecated
@Override
public void setTimer(TimerData timerData) {
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 822c15c..8e63679 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -171,7 +171,7 @@ public class DoFnOperator<InputT, OutputT> extends
AbstractStreamOperator<Window
final KeySelector<WindowedValue<InputT>, ?> keySelector;
- private final TimerInternals.TimerDataCoder timerCoder;
+ private final TimerInternals.TimerDataCoderV2 timerCoder;
/** Max number of elements to include in a bundle. */
private final long maxBundleSize;
@@ -244,7 +244,7 @@ public class DoFnOperator<InputT, OutputT> extends
AbstractStreamOperator<Window
this.keySelector = keySelector;
this.timerCoder =
-
TimerInternals.TimerDataCoder.of(windowingStrategy.getWindowFn().windowCoder());
+
TimerInternals.TimerDataCoderV2.of(windowingStrategy.getWindowFn().windowCoder());
FlinkPipelineOptions flinkOptions = options.as(FlinkPipelineOptions.class);
@@ -1088,11 +1088,20 @@ public class DoFnOperator<InputT, OutputT> extends
AbstractStreamOperator<Window
@Override
public void setTimer(
- StateNamespace namespace, String timerId, Instant target, TimeDomain
timeDomain) {
- setTimer(TimerData.of(timerId, namespace, target, timeDomain));
+ StateNamespace namespace,
+ String timerId,
+ String timerFamilyId,
+ Instant target,
+ Instant outputTimestamp,
+ TimeDomain timeDomain) {
+ setTimer(
+ TimerData.of(timerId, timerFamilyId, namespace, target,
outputTimestamp, timeDomain));
}
- /** @deprecated use {@link #setTimer(StateNamespace, String, Instant,
TimeDomain)}. */
+ /**
+ * @deprecated use {@link #setTimer(StateNamespace, String, String,
Instant, Instant,
+ * TimeDomain)}.
+ */
@Deprecated
@Override
public void setTimer(TimerData timer) {
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
index ba951b1..891cef8 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
@@ -839,7 +839,9 @@ public class ExecutableStageDoFnOperator<InputT, OutputT>
extends DoFnOperator<I
timerInternals.setTimer(
StateNamespaces.window(windowCoder, window),
GC_TIMER_ID,
+ "",
gcTime,
+ window.maxTimestamp(),
TimeDomain.EVENT_TIME);
} finally {
stateBackendLock.unlock();
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
index a114b6f..76aa8b0 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
@@ -602,6 +602,8 @@ public class StreamingModeExecutionContext extends
DataflowExecutionContext<Step
.setTimer(
StateNamespaces.window(windowCoder, window),
timerId,
+ "",
+ cleanupTime,
cleanupTime,
TimeDomain.EVENT_TIME);
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputFetcher.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputFetcher.java
index 7c5babf..2c00c99 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputFetcher.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputFetcher.java
@@ -32,6 +32,7 @@ import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.core.StateTags;
import org.apache.beam.runners.core.TimerInternals.TimerData;
import org.apache.beam.runners.core.TimerInternals.TimerDataCoder;
+import org.apache.beam.runners.core.TimerInternals.TimerDataCoderV2;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GlobalDataRequest;
import org.apache.beam.sdk.coders.AtomicCoder;
@@ -60,6 +61,7 @@ public class StreamingSideInputFetcher<InputT, W extends
BoundedWindow> {
private final StateTag<BagState<WindowedValue<InputT>>> elementsAddr;
private final StateTag<BagState<TimerData>> timersAddr;
+ private final StateTag<BagState<TimerData>> oldTimersAddr;
private final StateTag<WatermarkHoldState> watermarkHoldingAddr;
private final StateTag<ValueState<Map<W, Set<Windmill.GlobalDataRequest>>>>
blockedMapAddr;
@@ -85,8 +87,11 @@ public class StreamingSideInputFetcher<InputT, W extends
BoundedWindow> {
this.elementsAddr =
StateTags.makeSystemTagInternal(
StateTags.bag("elem", WindowedValue.getFullCoder(inputCoder,
mainWindowCoder)));
- this.timersAddr =
+ this.oldTimersAddr =
StateTags.makeSystemTagInternal(StateTags.bag("timer",
TimerDataCoder.of(mainWindowCoder)));
+ this.timersAddr =
+ StateTags.makeSystemTagInternal(
+ StateTags.bag("timerV2", TimerDataCoderV2.of(mainWindowCoder)));
StateTag<WatermarkHoldState> watermarkTag =
StateTags.watermarkStateInternal(
"holdForSideinput", windowingStrategy.getTimestampCombiner());
@@ -169,6 +174,7 @@ public class StreamingSideInputFetcher<InputT, W extends
BoundedWindow> {
List<BagState<TimerData>> timers = Lists.newArrayList();
for (W window : readyWindows) {
timers.add(timerBag(window).readLater());
+ timers.add(timerOldBag(window).readLater());
}
return timers;
}
@@ -275,6 +281,12 @@ public class StreamingSideInputFetcher<InputT, W extends
BoundedWindow> {
.state(StateNamespaces.window(mainWindowCoder, window), timersAddr);
}
+ BagState<TimerData> timerOldBag(W window) {
+ return stepContext
+ .stateInternals()
+ .state(StateNamespaces.window(mainWindowCoder, window), oldTimersAddr);
+ }
+
private <SideWindowT extends BoundedWindow> Windmill.GlobalDataRequest
buildGlobalDataRequest(
PCollectionView<?> view, BoundedWindow mainWindow) {
@SuppressWarnings("unchecked")
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
index c2deb2f..b2ba62e 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
@@ -94,8 +94,16 @@ class WindmillTimerInternals implements TimerInternals {
@Override
public void setTimer(
- StateNamespace namespace, String timerId, Instant timestamp, TimeDomain
timeDomain) {
- timers.put(timerId, namespace, TimerData.of(timerId, namespace, timestamp,
timeDomain));
+ StateNamespace namespace,
+ String timerId,
+ String timerFamilyId,
+ Instant timestamp,
+ Instant outputTimestamp,
+ TimeDomain timeDomain) {
+ timers.put(
+ timerId,
+ namespace,
+ TimerData.of(timerId, timerFamilyId, namespace, timestamp,
outputTimestamp, timeDomain));
timerStillPresent.put(timerId, namespace, true);
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/TimerReceiver.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/TimerReceiver.java
index 286ea8a..e3d2277 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/TimerReceiver.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/TimerReceiver.java
@@ -119,7 +119,8 @@ public class TimerReceiver {
String timerId = timerSpec.timerId();
TimerInternals timerInternals =
stepContext.namespacedToUser().timerInternals();
- timerInternals.setTimer(namespace, timerId, timer.getTimestamp(),
timeDomain);
+ timerInternals.setTimer(
+ namespace, timerId, "", timer.getTimestamp(),
timer.getOutputTimestamp(), timeDomain);
timerIdToKey.put(timerId, windowedValue.getValue().getKey());
timerIdToPayload.put(timerId, timer.getPayload());
diff --git
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/splittabledofn/SDFFeederViaStateAndTimers.java
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/splittabledofn/SDFFeederViaStateAndTimers.java
index c659601..920dae6 100644
---
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/splittabledofn/SDFFeederViaStateAndTimers.java
+++
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/splittabledofn/SDFFeederViaStateAndTimers.java
@@ -161,7 +161,12 @@ public class SDFFeederViaStateAndTimers<InputT,
RestrictionT> {
// Set a timer to continue processing this element.
timerInternals.setTimer(
- stateNamespace, "sdfContinuation", wakeupTime,
TimeDomain.PROCESSING_TIME);
+ stateNamespace,
+ "sdfContinuation",
+ "sdfContinuation",
+ wakeupTime,
+ wakeupTime,
+ TimeDomain.PROCESSING_TIME);
}
/** Signals that a split happened. */
diff --git
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/KeyedInternals.java
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/KeyedInternals.java
index d504929..330fb24 100644
---
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/KeyedInternals.java
+++
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/KeyedInternals.java
@@ -118,8 +118,14 @@ class KeyedInternals<K> {
@Override
public void setTimer(
- StateNamespace namespace, String timerId, Instant target, TimeDomain
timeDomain) {
- getInternals().setTimer(namespace, timerId, target, timeDomain);
+ StateNamespace namespace,
+ String timerId,
+ String timerFamilyId,
+ Instant target,
+ Instant outputTimestamp,
+ TimeDomain timeDomain) {
+ getInternals()
+ .setTimer(namespace, timerId, timerFamilyId, target,
outputTimestamp, timeDomain);
}
@Override
diff --git
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/KeyedTimerData.java
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/KeyedTimerData.java
index 2f3b809..a6214be 100644
---
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/KeyedTimerData.java
+++
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/KeyedTimerData.java
@@ -120,9 +120,9 @@ public class KeyedTimerData<K> implements
Comparable<KeyedTimerData<K>> {
}
/**
- * Coder for {@link KeyedTimerData}. Note we don't use the {@link
- * org.apache.beam.runners.core.TimerInternals.TimerDataCoder} here directly
since we want to
- * en/decode timestamp first so the timers will be sorted in the state.
+ * Coder for {@link KeyedTimerData}. Note we don't use the {@link
TimerInternals.TimerDataCoderV2}
+ * here directly since we want to en/decode timestamp first so the timers
will be sorted in the
+ * state.
*/
public static class KeyedTimerDataCoder<K> extends
StructuredCoder<KeyedTimerData<K>> {
private static final StringUtf8Coder STRING_CODER = StringUtf8Coder.of();
diff --git
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java
index 676129d..9ac082b 100644
---
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java
+++
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java
@@ -184,8 +184,14 @@ public class SamzaTimerInternalsFactory<K> implements
TimerInternalsFactory<K> {
@Override
public void setTimer(
- StateNamespace namespace, String timerId, Instant target, TimeDomain
timeDomain) {
- setTimer(TimerData.of(timerId, namespace, target, timeDomain));
+ StateNamespace namespace,
+ String timerId,
+ String timerFamilyId,
+ Instant target,
+ Instant outputTimestamp,
+ TimeDomain timeDomain) {
+ setTimer(
+ TimerData.of(timerId, timerFamilyId, namespace, target,
outputTimestamp, timeDomain));
}
@Override
diff --git
a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
index f8ff5e6..b741050 100644
---
a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
+++
b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
@@ -368,7 +368,7 @@ public class SparkGroupAlsoByWindowViaWindowSet implements
Serializable {
private final FullWindowedValueCoder<InputT> wvCoder;
private final Coder<K> keyCoder;
private final List<Integer> sourceIds;
- private final TimerInternals.TimerDataCoder timerDataCoder;
+ private final TimerInternals.TimerDataCoderV2 timerDataCoder;
private final WindowingStrategy<?, W> windowingStrategy;
private final SerializablePipelineOptions options;
private final IterableCoder<WindowedValue<InputT>> itrWvCoder;
@@ -461,9 +461,9 @@ public class SparkGroupAlsoByWindowViaWindowSet implements
Serializable {
return FullWindowedValueCoder.of(KvCoder.of(keyCoder,
IterableCoder.of(iCoder)), wCoder);
}
- private static <W extends BoundedWindow> TimerInternals.TimerDataCoder
timerDataCoderOf(
+ private static <W extends BoundedWindow> TimerInternals.TimerDataCoderV2
timerDataCoderOf(
final WindowingStrategy<?, W> windowingStrategy) {
- return
TimerInternals.TimerDataCoder.of(windowingStrategy.getWindowFn().windowCoder());
+ return
TimerInternals.TimerDataCoderV2.of(windowingStrategy.getWindowFn().windowCoder());
}
private static void checkpointIfNeeded(
diff --git
a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java
index 6cdcef4..1d5b36b 100644
---
a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java
+++
b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java
@@ -155,7 +155,12 @@ public class SparkTimerInternals implements TimerInternals
{
@Override
public void setTimer(
- StateNamespace namespace, String timerId, Instant target, TimeDomain
timeDomain) {
+ StateNamespace namespace,
+ String timerId,
+ String timerFamilyId,
+ Instant target,
+ Instant outputTimestamp,
+ TimeDomain timeDomain) {
throw new UnsupportedOperationException("Setting a timer by ID not yet
supported.");
}
@@ -165,12 +170,12 @@ public class SparkTimerInternals implements
TimerInternals {
}
public static Collection<byte[]> serializeTimers(
- Collection<TimerData> timers, TimerDataCoder timerDataCoder) {
+ Collection<TimerData> timers, TimerDataCoderV2 timerDataCoder) {
return CoderHelpers.toByteArrays(timers, timerDataCoder);
}
public static Iterator<TimerData> deserializeTimers(
- Collection<byte[]> serTimers, TimerDataCoder timerDataCoder) {
+ Collection<byte[]> serTimers, TimerDataCoderV2 timerDataCoder) {
return CoderHelpers.fromByteArrays(serTimers, timerDataCoder).iterator();
}