Repository: flink
Updated Branches:
  refs/heads/release-1.0 c0ce1b6d9 -> 603f351e2

[FLINK-3526] [streaming] Fix Processing Time Window Assigner and Trigger

This closes #1727

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/603f351e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/603f351e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/603f351e

Branch: refs/heads/release-1.0
Commit: 603f351e2925aa4263a382b83cf41f12e8c5167f
Parents: c0ce1b6
Author: Stephan Ewen <[email protected]>
Authored: Fri Feb 26 20:06:06 2016 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Fri Feb 26 21:12:27 2016 +0100

----------------------------------------------------------------------
 .../api/windowing/assigners/TumblingProcessingTimeWindows.java  | 3 ++-
 .../streaming/api/windowing/triggers/ProcessingTimeTrigger.java | 2 +-
 2 files changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/603f351e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
index 01de688..83f3d0c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
@@ -52,7 +52,8 @@ public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWi

 	@Override
 	public Collection<TimeWindow> assignWindows(Object element, long timestamp) {
-		long start = timestamp - (timestamp % size);
+		final long now = System.currentTimeMillis();
+		long start = now - (now % size);
 		return Collections.singletonList(new TimeWindow(start, start + size));
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/603f351e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
index 514885f..387e73b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
@@ -33,7 +33,7 @@ public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> {

 	@Override
 	public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
-		ctx.registerProcessingTimeTimer(window.maxTimestamp());
+		ctx.registerProcessingTimeTimer(window.getEnd());
 		return TriggerResult.CONTINUE;
 	}
