Repository: flink Updated Branches: refs/heads/master 69eeefabf -> 64519e1c1
[FLINK-3526] [streaming] Fix Processing Time Window Assigner and Trigger This closes #1727 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/64519e1c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/64519e1c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/64519e1c Branch: refs/heads/master Commit: 64519e1c1593471bc21ef78c7c93591fb75a4fcf Parents: 69eeefa Author: Stephan Ewen <[email protected]> Authored: Fri Feb 26 20:06:06 2016 +0100 Committer: Stephan Ewen <[email protected]> Committed: Fri Feb 26 21:10:12 2016 +0100 ---------------------------------------------------------------------- .../api/windowing/assigners/TumblingProcessingTimeWindows.java | 3 ++- .../streaming/api/windowing/triggers/ProcessingTimeTrigger.java | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/64519e1c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java index 01de688..83f3d0c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java @@ -52,7 +52,8 @@ public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWi @Override public Collection<TimeWindow> assignWindows(Object element, long timestamp) { - long start = timestamp - (timestamp % size); + final long now = System.currentTimeMillis(); + long start = now - (now % size); return Collections.singletonList(new TimeWindow(start, start + size)); } http://git-wip-us.apache.org/repos/asf/flink/blob/64519e1c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java index 514885f..387e73b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java @@ -33,7 +33,7 @@ public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> { @Override public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) { - ctx.registerProcessingTimeTimer(window.maxTimestamp()); + ctx.registerProcessingTimeTimer(window.getEnd()); return TriggerResult.CONTINUE; }
