Repository: flink Updated Branches: refs/heads/master f057ca9d9 -> 64f32f929
[FLINK-9201] Fixed the same merge window will be fired twice if watermark already passed the merge window Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3afb7b35 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3afb7b35 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3afb7b35 Branch: refs/heads/master Commit: 3afb7b3512353e28cd2fb7f49cd88c8caaa0da0c Parents: f057ca9 Author: yuemeng <[email protected]> Authored: Thu Apr 26 16:55:05 2018 +0800 Committer: Aljoscha Krettek <[email protected]> Committed: Mon May 14 10:09:30 2018 +0200 ---------------------------------------------------------------------- .../streaming/api/windowing/triggers/EventTimeTrigger.java | 8 +++++++- .../api/windowing/triggers/ProcessingTimeTrigger.java | 8 +++++++- 2 files changed, 14 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/3afb7b35/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java index 2f8f16f..2066bba 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java @@ -69,7 +69,13 @@ public class EventTimeTrigger extends Trigger<Object, TimeWindow> { @Override public void onMerge(TimeWindow window, OnMergeContext ctx) { - ctx.registerEventTimeTimer(window.maxTimestamp()); + // only register a timer if the watermark is not yet past the end of the merged window + // this is in line with the logic in onElement(). If the watermark is past the end of + // the window onElement() will fire and setting a timer here would fire the window twice. + long windowMaxTimestamp = window.maxTimestamp(); + if (windowMaxTimestamp > ctx.getCurrentWatermark()) { + ctx.registerEventTimeTimer(windowMaxTimestamp); + } } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/3afb7b35/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java index cd7869e..c8e6e6c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java @@ -60,7 +60,13 @@ public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> { @Override public void onMerge(TimeWindow window, OnMergeContext ctx) { - ctx.registerProcessingTimeTimer(window.maxTimestamp()); + // only register a timer if the time is not yet past the end of the merged window + // this is in line with the logic in onElement(). If the time is past the end of + // the window onElement() will fire and setting a timer here would fire the window twice. + long windowMaxTimestamp = window.maxTimestamp(); + if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) { + ctx.registerProcessingTimeTimer(windowMaxTimestamp); + } } @Override
