This is an automated email from the ASF dual-hosted git repository.
bhulette pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 1b985bb [BEAM-10500] Make KeyedTimerDataCoder encode output timestamp (#12535)
1b985bb is described below
commit 1b985bbda2bc87474dec376acb82521a7d2fe776
Author: Brian Hulette <[email protected]>
AuthorDate: Thu Aug 13 13:10:57 2020 -0700
[BEAM-10500] Make KeyedTimerDataCoder encode output timestamp (#12535)
---
.../org/apache/beam/runners/samza/runtime/KeyedTimerData.java | 6 ++++--
.../org/apache/beam/runners/samza/runtime/KeyedTimerDataTest.java | 8 +++++++-
2 files changed, 11 insertions(+), 3 deletions(-)
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/KeyedTimerData.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/KeyedTimerData.java
index 46dc95a..ef925f0 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/KeyedTimerData.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/KeyedTimerData.java
@@ -154,8 +154,9 @@ public class KeyedTimerData<K> implements
Comparable<KeyedTimerData<K>> {
throws CoderException, IOException {
final TimerData timer = value.getTimerData();
- // encode the timestamp first
+ // encode the timestamps first
INSTANT_CODER.encode(timer.getTimestamp(), outStream);
+ INSTANT_CODER.encode(timer.getOutputTimestamp(), outStream);
STRING_CODER.encode(timer.getTimerId(), outStream);
STRING_CODER.encode(timer.getNamespace().stringKey(), outStream);
STRING_CODER.encode(timer.getDomain().name(), outStream);
@@ -169,11 +170,12 @@ public class KeyedTimerData<K> implements
Comparable<KeyedTimerData<K>> {
public KeyedTimerData<K> decode(InputStream inStream) throws
CoderException, IOException {
// decode the timestamp first
final Instant timestamp = INSTANT_CODER.decode(inStream);
+ final Instant outputTimestamp = INSTANT_CODER.decode(inStream);
final String timerId = STRING_CODER.decode(inStream);
final StateNamespace namespace =
StateNamespaces.fromString(STRING_CODER.decode(inStream),
windowCoder);
final TimeDomain domain =
TimeDomain.valueOf(STRING_CODER.decode(inStream));
- final TimerData timer = TimerData.of(timerId, namespace, timestamp,
timestamp, domain);
+ final TimerData timer = TimerData.of(timerId, namespace, timestamp,
outputTimestamp, domain);
byte[] keyBytes = null;
K key = null;
diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/KeyedTimerDataTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/KeyedTimerDataTest.java
index c16b4ce..4cfd051 100644
--- a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/KeyedTimerDataTest.java
+++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/KeyedTimerDataTest.java
@@ -25,18 +25,24 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.testing.CoderProperties;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Test;
/** Tests for {@link KeyedTimerData}. */
public class KeyedTimerDataTest {
private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
+ private static final Instant TIMESTAMP =
+ new DateTime(2020, 8, 11, 13, 42, 9, DateTimeZone.UTC).toInstant();
+ private static final Instant OUTPUT_TIMESTAMP =
TIMESTAMP.plus(Duration.standardSeconds(30));
@Test
public void testCoder() throws Exception {
final TimerInternals.TimerData td =
TimerInternals.TimerData.of(
- "timer", StateNamespaces.global(), new Instant(), new Instant(),
TimeDomain.EVENT_TIME);
+ "timer", StateNamespaces.global(), TIMESTAMP, OUTPUT_TIMESTAMP,
TimeDomain.EVENT_TIME);
final String key = "timer-key";
final ByteArrayOutputStream baos = new ByteArrayOutputStream();