This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 01e500c  Merge pull request #10422: [BEAM-2535] TimerData signature 
update
01e500c is described below

commit 01e500c2dd0d699aea0434154b69fd59d824700f
Author: xubii <[email protected]>
AuthorDate: Sat Dec 28 21:21:22 2019 +0500

    Merge pull request #10422: [BEAM-2535] TimerData signature update
---
 .../operators/ApexGroupByKeyOperator.java          |  4 +-
 .../translation/operators/ApexParDoOperator.java   |  4 +-
 .../translation/operators/ApexTimerInternals.java  | 16 ++--
 .../operators/ApexTimerInternalsTest.java          |  8 +-
 .../beam/runners/core/construction/Timer.java      |  9 +-
 .../beam/runners/core/InMemoryTimerInternals.java  | 14 +++-
 .../beam/runners/core/KeyedWorkItemCoder.java      |  4 +-
 .../apache/beam/runners/core/SimpleDoFnRunner.java |  2 +-
 .../beam/runners/core/StatefulDoFnRunner.java      |  7 +-
 .../apache/beam/runners/core/TimerInternals.java   | 95 +++++++++++++++++++++-
 .../runners/core/InMemoryTimerInternalsTest.java   |  8 +-
 .../beam/runners/core/SimpleDoFnRunnerTest.java    |  2 +
 .../beam/runners/core/TimerInternalsTest.java      |  8 +-
 .../beam/runners/direct/DirectTimerInternals.java  | 15 +++-
 .../wrappers/streaming/DoFnOperator.java           | 19 +++--
 .../streaming/ExecutableStageDoFnOperator.java     |  2 +
 .../worker/StreamingModeExecutionContext.java      |  2 +
 .../dataflow/worker/StreamingSideInputFetcher.java | 14 +++-
 .../dataflow/worker/WindmillTimerInternals.java    | 12 ++-
 .../dataflow/worker/fn/control/TimerReceiver.java  |  3 +-
 .../splittabledofn/SDFFeederViaStateAndTimers.java |  7 +-
 .../beam/runners/samza/runtime/KeyedInternals.java | 10 ++-
 .../beam/runners/samza/runtime/KeyedTimerData.java |  6 +-
 .../samza/runtime/SamzaTimerInternalsFactory.java  | 10 ++-
 .../SparkGroupAlsoByWindowViaWindowSet.java        |  6 +-
 .../spark/stateful/SparkTimerInternals.java        | 11 ++-
 26 files changed, 239 insertions(+), 59 deletions(-)

diff --git 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
index 9a56496..6f90f58 100644
--- 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
+++ 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
@@ -129,8 +129,8 @@ public class ApexGroupByKeyOperator<K, V>
     this.keyCoder = ((KvCoder<K, V>) input.getCoder()).getKeyCoder();
     this.valueCoder = ((KvCoder<K, V>) input.getCoder()).getValueCoder();
     this.stateInternalsFactory = 
stateBackend.newStateInternalsFactory(keyCoder);
-    TimerInternals.TimerDataCoder timerCoder =
-        
TimerInternals.TimerDataCoder.of(windowingStrategy.getWindowFn().windowCoder());
+    TimerInternals.TimerDataCoderV2 timerCoder =
+        
TimerInternals.TimerDataCoderV2.of(windowingStrategy.getWindowFn().windowCoder());
     this.timerInternals = new ApexTimerInternals<>(timerCoder);
   }
 
diff --git 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
index 9d4b110..79bb6ef 100644
--- 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
+++ 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
@@ -187,8 +187,8 @@ public class ApexParDoOperator<InputT, OutputT> extends 
BaseOperator
     this.inputCoder = inputCoder;
     this.outputCoders = outputCoders;
 
-    TimerInternals.TimerDataCoder timerCoder =
-        
TimerInternals.TimerDataCoder.of(windowingStrategy.getWindowFn().windowCoder());
+    TimerInternals.TimerDataCoderV2 timerCoder =
+        
TimerInternals.TimerDataCoderV2.of(windowingStrategy.getWindowFn().windowCoder());
     this.currentKeyTimerInternals = new ApexTimerInternals<>(timerCoder);
     this.doFnSchemaInformation = doFnSchemaInformation;
     this.sideInputMapping = sideInputMapping;
diff --git 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternals.java
 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternals.java
index b4028e7..682cbed 100644
--- 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternals.java
+++ 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternals.java
@@ -57,7 +57,7 @@ class ApexTimerInternals<K> implements TimerInternals, 
Serializable {
   private transient Instant currentOutputWatermark;
   private transient Coder<K> keyCoder;
 
-  public ApexTimerInternals(TimerDataCoder timerDataCoder) {
+  public ApexTimerInternals(TimerDataCoderV2 timerDataCoder) {
     this.eventTimeTimeTimers = new TimerSet(timerDataCoder);
     this.processingTimeTimers = new TimerSet(timerDataCoder);
   }
@@ -77,8 +77,14 @@ class ApexTimerInternals<K> implements TimerInternals, 
Serializable {
 
   @Override
   public void setTimer(
-      StateNamespace namespace, String timerId, Instant target, TimeDomain 
timeDomain) {
-    TimerData timerData = TimerData.of(timerId, namespace, target, timeDomain);
+      StateNamespace namespace,
+      String timerId,
+      String timerFamilyId,
+      Instant target,
+      Instant outputTimestamp,
+      TimeDomain timeDomain) {
+    TimerData timerData =
+        TimerData.of(timerId, timerFamilyId, namespace, target, 
outputTimestamp, timeDomain);
     setTimer(timerData);
   }
 
@@ -196,10 +202,10 @@ class ApexTimerInternals<K> implements TimerInternals, 
Serializable {
 
   protected static class TimerSet implements Serializable {
     private final Map<Slice, Set<Slice>> activeTimers = new HashMap<>();
-    private final TimerDataCoder timerDataCoder;
+    private final TimerDataCoderV2 timerDataCoder;
     private long minTimestamp = Long.MAX_VALUE;
 
-    protected TimerSet(TimerDataCoder timerDataCoder) {
+    protected TimerSet(TimerDataCoderV2 timerDataCoder) {
       this.timerDataCoder = timerDataCoder;
     }
 
diff --git 
a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternalsTest.java
 
b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternalsTest.java
index 0bd890a..1d7e3f8 100644
--- 
a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternalsTest.java
+++ 
b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/operators/ApexTimerInternalsTest.java
@@ -29,7 +29,7 @@ import java.util.Set;
 import 
org.apache.beam.runners.apex.translation.operators.ApexTimerInternals.TimerProcessor;
 import org.apache.beam.runners.core.StateNamespaces;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
-import org.apache.beam.runners.core.TimerInternals.TimerDataCoder;
+import org.apache.beam.runners.core.TimerInternals.TimerDataCoderV2;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -46,7 +46,7 @@ public class ApexTimerInternalsTest {
     final Map<String, Collection<TimerData>> firedTimers = new HashMap<>();
     TimerProcessor<String> timerProcessor = firedTimers::put;
 
-    TimerDataCoder timerDataCoder = 
TimerDataCoder.of(GlobalWindow.Coder.INSTANCE);
+    TimerDataCoderV2 timerDataCoder = 
TimerDataCoderV2.of(GlobalWindow.Coder.INSTANCE);
     String key1 = "key1";
     Instant instant0 = new Instant(0);
     Instant instant1 = new Instant(1);
@@ -85,7 +85,7 @@ public class ApexTimerInternalsTest {
 
   @Test
   public void testDeleteTimer() {
-    TimerDataCoder timerDataCoder = 
TimerDataCoder.of(GlobalWindow.Coder.INSTANCE);
+    TimerDataCoderV2 timerDataCoder = 
TimerDataCoderV2.of(GlobalWindow.Coder.INSTANCE);
     String key1 = "key1";
     Instant instant0 = new Instant(0);
     Instant instant1 = new Instant(1);
@@ -121,7 +121,7 @@ public class ApexTimerInternalsTest {
 
   @Test
   public void testSerialization() {
-    TimerDataCoder timerDataCoder = 
TimerDataCoder.of(GlobalWindow.Coder.INSTANCE);
+    TimerDataCoderV2 timerDataCoder = 
TimerDataCoderV2.of(GlobalWindow.Coder.INSTANCE);
     TimerData timerData =
         TimerData.of(
             "arbitrary-id", StateNamespaces.global(), new Instant(0), 
TimeDomain.EVENT_TIME);
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
index 072dbc7..d994762 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Timer.java
@@ -47,9 +47,13 @@ public abstract class Timer<T> {
 
   /** Returns a timer for the given timestamp with a user specified payload. */
   public static <T> Timer<T> of(Instant timestamp, @Nullable T payload) {
-    return new AutoValue_Timer(timestamp, payload);
+    return new AutoValue_Timer(timestamp, timestamp, payload);
   }
 
+  /** Returns a timer for the given timestamp with a user specified payload 
and outputTimestamp. */
+  public static <T> Timer<T> of(Instant timestamp, Instant outputTimestamp, 
@Nullable T payload) {
+    return new AutoValue_Timer(timestamp, outputTimestamp, payload);
+  }
   /**
    * Returns the timestamp of when the timer is scheduled to fire.
    *
@@ -58,6 +62,9 @@ public abstract class Timer<T> {
    */
   public abstract Instant getTimestamp();
 
+  /* Returns the outputTimestamps  */
+  public abstract Instant getOutputTimestamp();
+
   /** A user supplied payload. */
   @Nullable
   public abstract T getPayload();
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
index 7b01c04..286e60b 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
@@ -103,11 +103,19 @@ public class InMemoryTimerInternals implements 
TimerInternals {
 
   @Override
   public void setTimer(
-      StateNamespace namespace, String timerId, Instant target, TimeDomain 
timeDomain) {
-    setTimer(TimerData.of(timerId, namespace, target, timeDomain));
+      StateNamespace namespace,
+      String timerId,
+      String timerFamilyId,
+      Instant target,
+      Instant outputTimestamp,
+      TimeDomain timeDomain) {
+    setTimer(TimerData.of(timerId, timerFamilyId, namespace, target, 
outputTimestamp, timeDomain));
   }
 
-  /** @deprecated use {@link #setTimer(StateNamespace, String, Instant, 
TimeDomain)}. */
+  /**
+   * @deprecated use {@link #setTimer(StateNamespace, String, String, Instant, 
Instant,
+   *     TimeDomain)}.
+   */
   @Deprecated
   @Override
   public void setTimer(TimerData timerData) {
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
index 2949548..fc395de 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/KeyedWorkItemCoder.java
@@ -22,7 +22,7 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.List;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
-import org.apache.beam.runners.core.TimerInternals.TimerDataCoder;
+import org.apache.beam.runners.core.TimerInternals.TimerDataCoderV2;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.IterableCoder;
@@ -54,7 +54,7 @@ public class KeyedWorkItemCoder<K, ElemT> extends 
StructuredCoder<KeyedWorkItem<
     this.keyCoder = keyCoder;
     this.elemCoder = elemCoder;
     this.windowCoder = windowCoder;
-    this.timersCoder = IterableCoder.of(TimerDataCoder.of(windowCoder));
+    this.timersCoder = IterableCoder.of(TimerDataCoderV2.of(windowCoder));
     this.elemsCoder = IterableCoder.of(FullWindowedValueCoder.of(elemCoder, 
windowCoder));
   }
 
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 2b105fb..472a9d2 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -992,7 +992,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
      * user has no way to compute a good choice of time.
      */
     private void setUnderlyingTimer(Instant target) {
-      timerInternals.setTimer(namespace, timerId, target, 
spec.getTimeDomain());
+      timerInternals.setTimer(namespace, timerId, "", target, target, 
spec.getTimeDomain());
     }
 
     private Instant getCurrentTime() {
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
index 14a9502..f69c74a 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
@@ -208,7 +208,12 @@ public class StatefulDoFnRunner<InputT, OutputT, W extends 
BoundedWindow>
       // make sure this fires after any window.maxTimestamp() timers
       gcTime = gcTime.plus(GC_DELAY_MS);
       timerInternals.setTimer(
-          StateNamespaces.window(windowCoder, window), GC_TIMER_ID, gcTime, 
TimeDomain.EVENT_TIME);
+          StateNamespaces.window(windowCoder, window),
+          GC_TIMER_ID,
+          "",
+          gcTime,
+          window.maxTimestamp(),
+          TimeDomain.EVENT_TIME);
     }
 
     @Override
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
index a766143..f9f23ca 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
@@ -54,9 +54,18 @@ public interface TimerInternals {
    *
    * <p>It is an error to set a timer for two different time domains.
    */
-  void setTimer(StateNamespace namespace, String timerId, Instant target, 
TimeDomain timeDomain);
+  void setTimer(
+      StateNamespace namespace,
+      String timerId,
+      String timerFamilyId,
+      Instant target,
+      Instant outputTimestamp,
+      TimeDomain timeDomain);
 
-  /** @deprecated use {@link #setTimer(StateNamespace, String, Instant, 
TimeDomain)}. */
+  /**
+   * @deprecated use {@link #setTimer(StateNamespace, String, String, Instant, 
Instant,
+   *     TimeDomain)}.
+   */
   @Deprecated
   void setTimer(TimerData timerData);
 
@@ -161,10 +170,19 @@ public interface TimerInternals {
 
     public abstract String getTimerId();
 
+    public abstract String getTimerFamilyId();
+
     public abstract StateNamespace getNamespace();
 
     public abstract Instant getTimestamp();
 
+    /**
+     * Timestamp the timer assigns to outputted elements from {@link
+     * org.apache.beam.sdk.transforms.DoFn.OnTimer} method. For event time 
timers, output watermark
+     * is held at this timestamp until the timer fires.
+     */
+    public abstract Instant getOutputTimestamp();
+
     public abstract TimeDomain getDomain();
 
     // When adding a new field, make sure to add it to the compareTo() method.
@@ -174,8 +192,25 @@ public interface TimerInternals {
      * generated.
      */
     public static TimerData of(
+        String timerId,
+        String timerFamilyId,
+        StateNamespace namespace,
+        Instant timestamp,
+        Instant outputTimestamp,
+        TimeDomain domain) {
+      return new AutoValue_TimerInternals_TimerData(
+          timerId, timerFamilyId, namespace, timestamp, outputTimestamp, 
domain);
+    }
+
+    /**
+     * Construct a {@link TimerData} for the given parameters, where the timer 
ID is automatically
+     * generated. Construct a {@link TimerData} for the given parameters 
except for {@code
+     * outputTimestamp}. {@code outputTimestamp} is set to timer {@code 
timestamp}.
+     */
+    public static TimerData of(
         String timerId, StateNamespace namespace, Instant timestamp, 
TimeDomain domain) {
-      return new AutoValue_TimerInternals_TimerData(timerId, namespace, 
timestamp, domain);
+      return new AutoValue_TimerInternals_TimerData(
+          timerId, timerId, namespace, timestamp, timestamp, domain);
     }
 
     /**
@@ -207,8 +242,10 @@ public interface TimerInternals {
       ComparisonChain chain =
           ComparisonChain.start()
               .compare(this.getTimestamp(), that.getTimestamp())
+              .compare(this.getOutputTimestamp(), that.getOutputTimestamp())
               .compare(this.getDomain(), that.getDomain())
-              .compare(this.getTimerId(), that.getTimerId());
+              .compare(this.getTimerId(), that.getTimerId())
+              .compare(this.getTimerFamilyId(), that.getTimerFamilyId());
       if (chain.result() == 0 && 
!this.getNamespace().equals(that.getNamespace())) {
         // Obtaining the stringKey may be expensive; only do so if required
         chain = chain.compare(getNamespace().stringKey(), 
that.getNamespace().stringKey());
@@ -218,6 +255,56 @@ public interface TimerInternals {
   }
 
   /** A {@link Coder} for {@link TimerData}. */
+  class TimerDataCoderV2 extends StructuredCoder<TimerData> {
+    private static final StringUtf8Coder STRING_CODER = StringUtf8Coder.of();
+    private static final InstantCoder INSTANT_CODER = InstantCoder.of();
+    private final Coder<? extends BoundedWindow> windowCoder;
+
+    public static TimerDataCoderV2 of(Coder<? extends BoundedWindow> 
windowCoder) {
+      return new TimerDataCoderV2(windowCoder);
+    }
+
+    private TimerDataCoderV2(Coder<? extends BoundedWindow> windowCoder) {
+      this.windowCoder = windowCoder;
+    }
+
+    @Override
+    public void encode(TimerData timer, OutputStream outStream) throws 
CoderException, IOException {
+      STRING_CODER.encode(timer.getTimerId(), outStream);
+      STRING_CODER.encode(timer.getTimerFamilyId(), outStream);
+      STRING_CODER.encode(timer.getNamespace().stringKey(), outStream);
+      INSTANT_CODER.encode(timer.getTimestamp(), outStream);
+      INSTANT_CODER.encode(timer.getOutputTimestamp(), outStream);
+      STRING_CODER.encode(timer.getDomain().name(), outStream);
+    }
+
+    @Override
+    public TimerData decode(InputStream inStream) throws CoderException, 
IOException {
+      String timerId = STRING_CODER.decode(inStream);
+      String timerFamilyId = STRING_CODER.decode(inStream);
+      StateNamespace namespace =
+          StateNamespaces.fromString(STRING_CODER.decode(inStream), 
windowCoder);
+      Instant timestamp = INSTANT_CODER.decode(inStream);
+      Instant outputTimestamp = INSTANT_CODER.decode(inStream);
+      TimeDomain domain = TimeDomain.valueOf(STRING_CODER.decode(inStream));
+      return TimerData.of(timerId, timerFamilyId, namespace, timestamp, 
outputTimestamp, domain);
+    }
+
+    @Override
+    public List<? extends Coder<?>> getCoderArguments() {
+      return Arrays.asList(windowCoder);
+    }
+
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {
+      verifyDeterministic(this, "window coder must be deterministic", 
windowCoder);
+    }
+  }
+
+  /**
+   * A {@link Coder} for {@link TimerData}. To make it encoding and decoding 
backward compatible for
+   * DataFlow
+   */
   class TimerDataCoder extends StructuredCoder<TimerData> {
     private static final StringUtf8Coder STRING_CODER = StringUtf8Coder.of();
     private static final InstantCoder INSTANT_CODER = InstantCoder.of();
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java
index 6be35b7..dd10632 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java
@@ -71,8 +71,8 @@ public class InMemoryTimerInternalsTest {
     Instant laterTimestamp = new Instant(42);
 
     underTest.advanceInputWatermark(new Instant(0));
-    underTest.setTimer(NS1, ID1, earlyTimestamp, TimeDomain.EVENT_TIME);
-    underTest.setTimer(NS1, ID1, laterTimestamp, TimeDomain.EVENT_TIME);
+    underTest.setTimer(NS1, ID1, ID1, earlyTimestamp, earlyTimestamp, 
TimeDomain.EVENT_TIME);
+    underTest.setTimer(NS1, ID1, ID1, laterTimestamp, laterTimestamp, 
TimeDomain.EVENT_TIME);
     underTest.advanceInputWatermark(earlyTimestamp.plus(1L));
     assertThat(underTest.removeNextEventTimer(), nullValue());
 
@@ -86,7 +86,7 @@ public class InMemoryTimerInternalsTest {
   public void testDeletionIdempotent() throws Exception {
     InMemoryTimerInternals underTest = new InMemoryTimerInternals();
     Instant timestamp = new Instant(42);
-    underTest.setTimer(NS1, ID1, timestamp, TimeDomain.EVENT_TIME);
+    underTest.setTimer(NS1, ID1, ID1, timestamp, timestamp, 
TimeDomain.EVENT_TIME);
     underTest.deleteTimer(NS1, ID1);
     underTest.deleteTimer(NS1, ID1);
   }
@@ -97,7 +97,7 @@ public class InMemoryTimerInternalsTest {
     Instant timestamp = new Instant(42);
 
     underTest.advanceInputWatermark(new Instant(0));
-    underTest.setTimer(NS1, ID1, timestamp, TimeDomain.EVENT_TIME);
+    underTest.setTimer(NS1, ID1, ID1, timestamp, timestamp, 
TimeDomain.EVENT_TIME);
     underTest.deleteTimer(NS1, ID1);
     underTest.advanceInputWatermark(new Instant(43));
 
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
index b790314..10972d6 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
@@ -156,6 +156,8 @@ public class SimpleDoFnRunnerTest {
         .setTimer(
             StateNamespaces.window(new GlobalWindows().windowCoder(), 
GlobalWindow.INSTANCE),
             DoFnWithTimers.TIMER_ID,
+            "",
+            currentTime.plus(DoFnWithTimers.TIMER_OFFSET),
             currentTime.plus(DoFnWithTimers.TIMER_OFFSET),
             TimeDomain.EVENT_TIME);
   }
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/TimerInternalsTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/TimerInternalsTest.java
index d1e5221..ab2978f 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/TimerInternalsTest.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/TimerInternalsTest.java
@@ -22,7 +22,7 @@ import static org.hamcrest.Matchers.lessThan;
 import static org.junit.Assert.assertThat;
 
 import org.apache.beam.runners.core.TimerInternals.TimerData;
-import org.apache.beam.runners.core.TimerInternals.TimerDataCoder;
+import org.apache.beam.runners.core.TimerInternals.TimerDataCoderV2;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.testing.CoderProperties;
@@ -40,13 +40,13 @@ public class TimerInternalsTest {
   @Test
   public void testTimerDataCoder() throws Exception {
     CoderProperties.coderDecodeEncodeEqual(
-        TimerDataCoder.of(GlobalWindow.Coder.INSTANCE),
+        TimerDataCoderV2.of(GlobalWindow.Coder.INSTANCE),
         TimerData.of(
             "arbitrary-id", StateNamespaces.global(), new Instant(0), 
TimeDomain.EVENT_TIME));
 
     Coder<IntervalWindow> windowCoder = IntervalWindow.getCoder();
     CoderProperties.coderDecodeEncodeEqual(
-        TimerDataCoder.of(windowCoder),
+        TimerDataCoderV2.of(windowCoder),
         TimerData.of(
             "another-id",
             StateNamespaces.window(
@@ -57,7 +57,7 @@ public class TimerInternalsTest {
 
   @Test
   public void testCoderIsSerializableWithWellKnownCoderType() {
-    
CoderProperties.coderSerializable(TimerDataCoder.of(GlobalWindow.Coder.INSTANCE));
+    
CoderProperties.coderSerializable(TimerDataCoderV2.of(GlobalWindow.Coder.INSTANCE));
   }
 
   @Test
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
index 8f3ab48..0261bf6 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
@@ -47,11 +47,20 @@ class DirectTimerInternals implements TimerInternals {
 
   @Override
   public void setTimer(
-      StateNamespace namespace, String timerId, Instant target, TimeDomain 
timeDomain) {
-    timerUpdateBuilder.setTimer(TimerData.of(timerId, namespace, target, 
timeDomain));
+      StateNamespace namespace,
+      String timerId,
+      String timerFamilyId,
+      Instant target,
+      Instant outputTimestamp,
+      TimeDomain timeDomain) {
+    timerUpdateBuilder.setTimer(
+        TimerData.of(timerId, timerFamilyId, namespace, target, 
outputTimestamp, timeDomain));
   }
 
-  /** @deprecated use {@link #setTimer(StateNamespace, String, Instant, 
TimeDomain)}. */
+  /**
+   * @deprecated use {@link #setTimer(StateNamespace, String, String, Instant, 
Instant,
+   *     TimeDomain)}.
+   */
   @Deprecated
   @Override
   public void setTimer(TimerData timerData) {
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 822c15c..8e63679 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -171,7 +171,7 @@ public class DoFnOperator<InputT, OutputT> extends 
AbstractStreamOperator<Window
 
   final KeySelector<WindowedValue<InputT>, ?> keySelector;
 
-  private final TimerInternals.TimerDataCoder timerCoder;
+  private final TimerInternals.TimerDataCoderV2 timerCoder;
 
   /** Max number of elements to include in a bundle. */
   private final long maxBundleSize;
@@ -244,7 +244,7 @@ public class DoFnOperator<InputT, OutputT> extends 
AbstractStreamOperator<Window
     this.keySelector = keySelector;
 
     this.timerCoder =
-        
TimerInternals.TimerDataCoder.of(windowingStrategy.getWindowFn().windowCoder());
+        
TimerInternals.TimerDataCoderV2.of(windowingStrategy.getWindowFn().windowCoder());
 
     FlinkPipelineOptions flinkOptions = options.as(FlinkPipelineOptions.class);
 
@@ -1088,11 +1088,20 @@ public class DoFnOperator<InputT, OutputT> extends 
AbstractStreamOperator<Window
 
     @Override
     public void setTimer(
-        StateNamespace namespace, String timerId, Instant target, TimeDomain 
timeDomain) {
-      setTimer(TimerData.of(timerId, namespace, target, timeDomain));
+        StateNamespace namespace,
+        String timerId,
+        String timerFamilyId,
+        Instant target,
+        Instant outputTimestamp,
+        TimeDomain timeDomain) {
+      setTimer(
+          TimerData.of(timerId, timerFamilyId, namespace, target, 
outputTimestamp, timeDomain));
     }
 
-    /** @deprecated use {@link #setTimer(StateNamespace, String, Instant, 
TimeDomain)}. */
+    /**
+     * @deprecated use {@link #setTimer(StateNamespace, String, String, 
Instant, Instant,
+     *     TimeDomain)}.
+     */
     @Deprecated
     @Override
     public void setTimer(TimerData timer) {
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
index ba951b1..891cef8 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
@@ -839,7 +839,9 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> 
extends DoFnOperator<I
         timerInternals.setTimer(
             StateNamespaces.window(windowCoder, window),
             GC_TIMER_ID,
+            "",
             gcTime,
+            window.maxTimestamp(),
             TimeDomain.EVENT_TIME);
       } finally {
         stateBackendLock.unlock();
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
index a114b6f..76aa8b0 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
@@ -602,6 +602,8 @@ public class StreamingModeExecutionContext extends 
DataflowExecutionContext<Step
           .setTimer(
               StateNamespaces.window(windowCoder, window),
               timerId,
+              "",
+              cleanupTime,
               cleanupTime,
               TimeDomain.EVENT_TIME);
     }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputFetcher.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputFetcher.java
index 7c5babf..2c00c99 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputFetcher.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputFetcher.java
@@ -32,6 +32,7 @@ import org.apache.beam.runners.core.StateTag;
 import org.apache.beam.runners.core.StateTags;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.runners.core.TimerInternals.TimerDataCoder;
+import org.apache.beam.runners.core.TimerInternals.TimerDataCoderV2;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GlobalDataRequest;
 import org.apache.beam.sdk.coders.AtomicCoder;
@@ -60,6 +61,7 @@ public class StreamingSideInputFetcher<InputT, W extends 
BoundedWindow> {
 
   private final StateTag<BagState<WindowedValue<InputT>>> elementsAddr;
   private final StateTag<BagState<TimerData>> timersAddr;
+  private final StateTag<BagState<TimerData>> oldTimersAddr;
   private final StateTag<WatermarkHoldState> watermarkHoldingAddr;
   private final StateTag<ValueState<Map<W, Set<Windmill.GlobalDataRequest>>>> 
blockedMapAddr;
 
@@ -85,8 +87,11 @@ public class StreamingSideInputFetcher<InputT, W extends 
BoundedWindow> {
     this.elementsAddr =
         StateTags.makeSystemTagInternal(
             StateTags.bag("elem", WindowedValue.getFullCoder(inputCoder, 
mainWindowCoder)));
-    this.timersAddr =
+    this.oldTimersAddr =
         StateTags.makeSystemTagInternal(StateTags.bag("timer", 
TimerDataCoder.of(mainWindowCoder)));
+    this.timersAddr =
+        StateTags.makeSystemTagInternal(
+            StateTags.bag("timerV2", TimerDataCoderV2.of(mainWindowCoder)));
     StateTag<WatermarkHoldState> watermarkTag =
         StateTags.watermarkStateInternal(
             "holdForSideinput", windowingStrategy.getTimestampCombiner());
@@ -169,6 +174,7 @@ public class StreamingSideInputFetcher<InputT, W extends 
BoundedWindow> {
     List<BagState<TimerData>> timers = Lists.newArrayList();
     for (W window : readyWindows) {
       timers.add(timerBag(window).readLater());
+      timers.add(timerOldBag(window).readLater());
     }
     return timers;
   }
@@ -275,6 +281,12 @@ public class StreamingSideInputFetcher<InputT, W extends 
BoundedWindow> {
         .state(StateNamespaces.window(mainWindowCoder, window), timersAddr);
   }
 
+  BagState<TimerData> timerOldBag(W window) {
+    return stepContext
+        .stateInternals()
+        .state(StateNamespaces.window(mainWindowCoder, window), oldTimersAddr);
+  }
+
   private <SideWindowT extends BoundedWindow> Windmill.GlobalDataRequest 
buildGlobalDataRequest(
       PCollectionView<?> view, BoundedWindow mainWindow) {
     @SuppressWarnings("unchecked")
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
index c2deb2f..b2ba62e 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
@@ -94,8 +94,16 @@ class WindmillTimerInternals implements TimerInternals {
 
   @Override
   public void setTimer(
-      StateNamespace namespace, String timerId, Instant timestamp, TimeDomain 
timeDomain) {
-    timers.put(timerId, namespace, TimerData.of(timerId, namespace, timestamp, 
timeDomain));
+      StateNamespace namespace,
+      String timerId,
+      String timerFamilyId,
+      Instant timestamp,
+      Instant outputTimestamp,
+      TimeDomain timeDomain) {
+    timers.put(
+        timerId,
+        namespace,
+        TimerData.of(timerId, timerFamilyId, namespace, timestamp, 
outputTimestamp, timeDomain));
     timerStillPresent.put(timerId, namespace, true);
   }
 
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/TimerReceiver.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/TimerReceiver.java
index 286ea8a..e3d2277 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/TimerReceiver.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/TimerReceiver.java
@@ -119,7 +119,8 @@ public class TimerReceiver {
         String timerId = timerSpec.timerId();
 
         TimerInternals timerInternals = 
stepContext.namespacedToUser().timerInternals();
-        timerInternals.setTimer(namespace, timerId, timer.getTimestamp(), 
timeDomain);
+        timerInternals.setTimer(
+            namespace, timerId, "", timer.getTimestamp(), 
timer.getOutputTimestamp(), timeDomain);
 
         timerIdToKey.put(timerId, windowedValue.getValue().getKey());
         timerIdToPayload.put(timerId, timer.getPayload());
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/splittabledofn/SDFFeederViaStateAndTimers.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/splittabledofn/SDFFeederViaStateAndTimers.java
index c659601..920dae6 100644
--- 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/splittabledofn/SDFFeederViaStateAndTimers.java
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/splittabledofn/SDFFeederViaStateAndTimers.java
@@ -161,7 +161,12 @@ public class SDFFeederViaStateAndTimers<InputT, 
RestrictionT> {
 
     // Set a timer to continue processing this element.
     timerInternals.setTimer(
-        stateNamespace, "sdfContinuation", wakeupTime, 
TimeDomain.PROCESSING_TIME);
+        stateNamespace,
+        "sdfContinuation",
+        "sdfContinuation",
+        wakeupTime,
+        wakeupTime,
+        TimeDomain.PROCESSING_TIME);
   }
 
   /** Signals that a split happened. */
diff --git 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/KeyedInternals.java
 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/KeyedInternals.java
index d504929..330fb24 100644
--- 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/KeyedInternals.java
+++ 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/KeyedInternals.java
@@ -118,8 +118,14 @@ class KeyedInternals<K> {
 
     @Override
     public void setTimer(
-        StateNamespace namespace, String timerId, Instant target, TimeDomain 
timeDomain) {
-      getInternals().setTimer(namespace, timerId, target, timeDomain);
+        StateNamespace namespace,
+        String timerId,
+        String timerFamilyId,
+        Instant target,
+        Instant outputTimestamp,
+        TimeDomain timeDomain) {
+      getInternals()
+          .setTimer(namespace, timerId, timerFamilyId, target, 
outputTimestamp, timeDomain);
     }
 
     @Override
diff --git 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/KeyedTimerData.java
 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/KeyedTimerData.java
index 2f3b809..a6214be 100644
--- 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/KeyedTimerData.java
+++ 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/KeyedTimerData.java
@@ -120,9 +120,9 @@ public class KeyedTimerData<K> implements 
Comparable<KeyedTimerData<K>> {
   }
 
   /**
-   * Coder for {@link KeyedTimerData}. Note we don't use the {@link
-   * org.apache.beam.runners.core.TimerInternals.TimerDataCoder} here directly 
since we want to
-   * en/decode timestamp first so the timers will be sorted in the state.
+   * Coder for {@link KeyedTimerData}. Note we don't use the {@link 
TimerInternals.TimerDataCoderV2}
+   * here directly since we want to en/decode timestamp first so the timers 
will be sorted in the
+   * state.
    */
   public static class KeyedTimerDataCoder<K> extends 
StructuredCoder<KeyedTimerData<K>> {
     private static final StringUtf8Coder STRING_CODER = StringUtf8Coder.of();
diff --git 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java
 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java
index 676129d..9ac082b 100644
--- 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java
+++ 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java
@@ -184,8 +184,14 @@ public class SamzaTimerInternalsFactory<K> implements 
TimerInternalsFactory<K> {
 
     @Override
     public void setTimer(
-        StateNamespace namespace, String timerId, Instant target, TimeDomain 
timeDomain) {
-      setTimer(TimerData.of(timerId, namespace, target, timeDomain));
+        StateNamespace namespace,
+        String timerId,
+        String timerFamilyId,
+        Instant target,
+        Instant outputTimestamp,
+        TimeDomain timeDomain) {
+      setTimer(
+          TimerData.of(timerId, timerFamilyId, namespace, target, 
outputTimestamp, timeDomain));
     }
 
     @Override
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
index f8ff5e6..b741050 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
@@ -368,7 +368,7 @@ public class SparkGroupAlsoByWindowViaWindowSet implements 
Serializable {
     private final FullWindowedValueCoder<InputT> wvCoder;
     private final Coder<K> keyCoder;
     private final List<Integer> sourceIds;
-    private final TimerInternals.TimerDataCoder timerDataCoder;
+    private final TimerInternals.TimerDataCoderV2 timerDataCoder;
     private final WindowingStrategy<?, W> windowingStrategy;
     private final SerializablePipelineOptions options;
     private final IterableCoder<WindowedValue<InputT>> itrWvCoder;
@@ -461,9 +461,9 @@ public class SparkGroupAlsoByWindowViaWindowSet implements 
Serializable {
     return FullWindowedValueCoder.of(KvCoder.of(keyCoder, 
IterableCoder.of(iCoder)), wCoder);
   }
 
-  private static <W extends BoundedWindow> TimerInternals.TimerDataCoder 
timerDataCoderOf(
+  private static <W extends BoundedWindow> TimerInternals.TimerDataCoderV2 
timerDataCoderOf(
       final WindowingStrategy<?, W> windowingStrategy) {
-    return 
TimerInternals.TimerDataCoder.of(windowingStrategy.getWindowFn().windowCoder());
+    return 
TimerInternals.TimerDataCoderV2.of(windowingStrategy.getWindowFn().windowCoder());
   }
 
   private static void checkpointIfNeeded(
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java
index 6cdcef4..1d5b36b 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java
@@ -155,7 +155,12 @@ public class SparkTimerInternals implements TimerInternals 
{
 
   @Override
   public void setTimer(
-      StateNamespace namespace, String timerId, Instant target, TimeDomain 
timeDomain) {
+      StateNamespace namespace,
+      String timerId,
+      String timerFamilyId,
+      Instant target,
+      Instant outputTimestamp,
+      TimeDomain timeDomain) {
     throw new UnsupportedOperationException("Setting a timer by ID not yet 
supported.");
   }
 
@@ -165,12 +170,12 @@ public class SparkTimerInternals implements 
TimerInternals {
   }
 
   public static Collection<byte[]> serializeTimers(
-      Collection<TimerData> timers, TimerDataCoder timerDataCoder) {
+      Collection<TimerData> timers, TimerDataCoderV2 timerDataCoder) {
     return CoderHelpers.toByteArrays(timers, timerDataCoder);
   }
 
   public static Iterator<TimerData> deserializeTimers(
-      Collection<byte[]> serTimers, TimerDataCoder timerDataCoder) {
+      Collection<byte[]> serTimers, TimerDataCoderV2 timerDataCoder) {
     return CoderHelpers.fromByteArrays(serTimers, timerDataCoder).iterator();
   }
 

Reply via email to