This is an automated email from the ASF dual-hosted git repository.

bhulette pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 1b985bb  [BEAM-10500] Make KeyedTimerDataCoder encode output timestamp 
(#12535)
1b985bb is described below

commit 1b985bbda2bc87474dec376acb82521a7d2fe776
Author: Brian Hulette <[email protected]>
AuthorDate: Thu Aug 13 13:10:57 2020 -0700

    [BEAM-10500] Make KeyedTimerDataCoder encode output timestamp (#12535)
---
 .../org/apache/beam/runners/samza/runtime/KeyedTimerData.java     | 6 ++++--
 .../org/apache/beam/runners/samza/runtime/KeyedTimerDataTest.java | 8 +++++++-
 2 files changed, 11 insertions(+), 3 deletions(-)

diff --git 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/KeyedTimerData.java
 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/KeyedTimerData.java
index 46dc95a..ef925f0 100644
--- 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/KeyedTimerData.java
+++ 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/KeyedTimerData.java
@@ -154,8 +154,9 @@ public class KeyedTimerData<K> implements 
Comparable<KeyedTimerData<K>> {
         throws CoderException, IOException {
 
       final TimerData timer = value.getTimerData();
-      // encode the timestamp first
+      // encode the timestamps first
       INSTANT_CODER.encode(timer.getTimestamp(), outStream);
+      INSTANT_CODER.encode(timer.getOutputTimestamp(), outStream);
       STRING_CODER.encode(timer.getTimerId(), outStream);
       STRING_CODER.encode(timer.getNamespace().stringKey(), outStream);
       STRING_CODER.encode(timer.getDomain().name(), outStream);
@@ -169,11 +170,12 @@ public class KeyedTimerData<K> implements 
Comparable<KeyedTimerData<K>> {
     public KeyedTimerData<K> decode(InputStream inStream) throws 
CoderException, IOException {
       // decode the timestamp first
       final Instant timestamp = INSTANT_CODER.decode(inStream);
+      final Instant outputTimestamp = INSTANT_CODER.decode(inStream);
       final String timerId = STRING_CODER.decode(inStream);
       final StateNamespace namespace =
           StateNamespaces.fromString(STRING_CODER.decode(inStream), 
windowCoder);
       final TimeDomain domain = 
TimeDomain.valueOf(STRING_CODER.decode(inStream));
-      final TimerData timer = TimerData.of(timerId, namespace, timestamp, 
timestamp, domain);
+      final TimerData timer = TimerData.of(timerId, namespace, timestamp, 
outputTimestamp, domain);
 
       byte[] keyBytes = null;
       K key = null;
diff --git 
a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/KeyedTimerDataTest.java
 
b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/KeyedTimerDataTest.java
index c16b4ce..4cfd051 100644
--- 
a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/KeyedTimerDataTest.java
+++ 
b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/KeyedTimerDataTest.java
@@ -25,18 +25,24 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.testing.CoderProperties;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Test;
 
 /** Tests for {@link KeyedTimerData}. */
 public class KeyedTimerDataTest {
   private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
+  private static final Instant TIMESTAMP =
+      new DateTime(2020, 8, 11, 13, 42, 9, DateTimeZone.UTC).toInstant();
+  private static final Instant OUTPUT_TIMESTAMP = 
TIMESTAMP.plus(Duration.standardSeconds(30));
 
   @Test
   public void testCoder() throws Exception {
     final TimerInternals.TimerData td =
         TimerInternals.TimerData.of(
-            "timer", StateNamespaces.global(), new Instant(), new Instant(), 
TimeDomain.EVENT_TIME);
+            "timer", StateNamespaces.global(), TIMESTAMP, OUTPUT_TIMESTAMP, 
TimeDomain.EVENT_TIME);
 
     final String key = "timer-key";
     final ByteArrayOutputStream baos = new ByteArrayOutputStream();

Reply via email to