Repository: flink
Updated Branches:
  refs/heads/master 69eeefabf -> 64519e1c1


[FLINK-3526] [streaming] Fix Processing Time Window Assigner and Trigger

This closes #1727


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/64519e1c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/64519e1c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/64519e1c

Branch: refs/heads/master
Commit: 64519e1c1593471bc21ef78c7c93591fb75a4fcf
Parents: 69eeefa
Author: Stephan Ewen <[email protected]>
Authored: Fri Feb 26 20:06:06 2016 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Fri Feb 26 21:10:12 2016 +0100

----------------------------------------------------------------------
 .../api/windowing/assigners/TumblingProcessingTimeWindows.java    | 3 ++-
 .../streaming/api/windowing/triggers/ProcessingTimeTrigger.java   | 2 +-
 2 files changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/64519e1c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
index 01de688..83f3d0c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
@@ -52,7 +52,8 @@ public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWi
 
        @Override
        public Collection<TimeWindow> assignWindows(Object element, long timestamp) {
-               long start = timestamp - (timestamp % size);
+               final long now = System.currentTimeMillis();
+               long start = now - (now % size);
                return Collections.singletonList(new TimeWindow(start, start + size));
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/64519e1c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
index 514885f..387e73b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
@@ -33,7 +33,7 @@ public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> {
 
        @Override
        public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
-               ctx.registerProcessingTimeTimer(window.maxTimestamp());
+               ctx.registerProcessingTimeTimer(window.getEnd());
                return TriggerResult.CONTINUE;
        }
 

Reply via email to