Repository: flink
Updated Branches:
  refs/heads/release-1.0 c0ce1b6d9 -> 603f351e2


[FLINK-3526] [streaming] Fix Processing Time Window Assigner and Trigger

This closes #1727


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/603f351e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/603f351e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/603f351e

Branch: refs/heads/release-1.0
Commit: 603f351e2925aa4263a382b83cf41f12e8c5167f
Parents: c0ce1b6
Author: Stephan Ewen <[email protected]>
Authored: Fri Feb 26 20:06:06 2016 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Fri Feb 26 21:12:27 2016 +0100

----------------------------------------------------------------------
 .../api/windowing/assigners/TumblingProcessingTimeWindows.java    | 3 ++-
 .../streaming/api/windowing/triggers/ProcessingTimeTrigger.java   | 2 +-
 2 files changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/603f351e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
index 01de688..83f3d0c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
@@ -52,7 +52,8 @@ public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWi
 
        @Override
        public Collection<TimeWindow> assignWindows(Object element, long timestamp) {
-               long start = timestamp - (timestamp % size);
+               final long now = System.currentTimeMillis();
+               long start = now - (now % size);
                return Collections.singletonList(new TimeWindow(start, start + size));
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/603f351e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
index 514885f..387e73b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
@@ -33,7 +33,7 @@ public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> {
 
        @Override
        public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
-               ctx.registerProcessingTimeTimer(window.maxTimestamp());
+               ctx.registerProcessingTimeTimer(window.getEnd());
                return TriggerResult.CONTINUE;
        }
 

Reply via email to