This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new a5f1c1f  [BEAM-10670] Fix impulse to use MIN_TIMESTAMP and not current 
time.
     new 6fe33a5  Merge pull request #13002 from lukecwik/beam10670.3
a5f1c1f is described below

commit a5f1c1f7acf37fe003c5321b4c280d8dd8df8d41
Author: Luke Cwik <[email protected]>
AuthorDate: Fri Oct 2 11:46:49 2020 -0700

    [BEAM-10670] Fix impulse to use MIN_TIMESTAMP and not current time.
---
 .../beam/runners/samza/translation/SamzaImpulseSystemFactory.java     | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaImpulseSystemFactory.java
 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaImpulseSystemFactory.java
index 02c734b..cb7413d 100644
--- 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaImpulseSystemFactory.java
+++ 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaImpulseSystemFactory.java
@@ -38,7 +38,6 @@ import org.apache.samza.system.SystemFactory;
 import org.apache.samza.system.SystemProducer;
 import org.apache.samza.system.SystemStreamMetadata;
 import org.apache.samza.system.SystemStreamPartition;
-import org.joda.time.Instant;
 
 /**
  * This is a trivial system for generating impulse event in Samza when 
translating IMPULSE transform
@@ -111,13 +110,12 @@ public class SamzaImpulseSystemFactory implements 
SystemFactory {
     public void register(SystemStreamPartition ssp, String offset) {}
 
     private static List<IncomingMessageEnvelope> 
constructMessages(SystemStreamPartition ssp) {
-      final Instant time = new Instant(System.currentTimeMillis());
       final IncomingMessageEnvelope impulseMessage =
           new IncomingMessageEnvelope(
               ssp,
               DUMMY_OFFSET,
               /* key */ null,
-              
OpMessage.ofElement(WindowedValue.timestampedValueInGlobalWindow(new byte[0], 
time)));
+              OpMessage.ofElement(WindowedValue.valueInGlobalWindow(new 
byte[0])));
 
       final IncomingMessageEnvelope watermarkMessage =
           IncomingMessageEnvelope.buildWatermarkEnvelope(

Reply via email to