This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new a5f1c1f [BEAM-10670] Fix impulse to use MIN_TIMESTAMP and not current time.
new 6fe33a5 Merge pull request #13002 from lukecwik/beam10670.3
a5f1c1f is described below
commit a5f1c1f7acf37fe003c5321b4c280d8dd8df8d41
Author: Luke Cwik <[email protected]>
AuthorDate: Fri Oct 2 11:46:49 2020 -0700
[BEAM-10670] Fix impulse to use MIN_TIMESTAMP and not current time.
---
.../beam/runners/samza/translation/SamzaImpulseSystemFactory.java | 4 +---
1 file changed, 1 insertion(+), 3 deletions(-)
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaImpulseSystemFactory.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaImpulseSystemFactory.java
index 02c734b..cb7413d 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaImpulseSystemFactory.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaImpulseSystemFactory.java
@@ -38,7 +38,6 @@ import org.apache.samza.system.SystemFactory;
import org.apache.samza.system.SystemProducer;
import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.system.SystemStreamPartition;
-import org.joda.time.Instant;
/**
* This is a trivial system for generating impulse event in Samza when translating IMPULSE transform
@@ -111,13 +110,12 @@ public class SamzaImpulseSystemFactory implements SystemFactory {
public void register(SystemStreamPartition ssp, String offset) {}
private static List<IncomingMessageEnvelope> constructMessages(SystemStreamPartition ssp) {
- final Instant time = new Instant(System.currentTimeMillis());
final IncomingMessageEnvelope impulseMessage =
new IncomingMessageEnvelope(
ssp,
DUMMY_OFFSET,
/* key */ null,
- OpMessage.ofElement(WindowedValue.timestampedValueInGlobalWindow(new byte[0], time)));
+ OpMessage.ofElement(WindowedValue.valueInGlobalWindow(new byte[0])));
final IncomingMessageEnvelope watermarkMessage =
IncomingMessageEnvelope.buildWatermarkEnvelope(