Repository: flink
Updated Branches:
  refs/heads/master f057ca9d9 -> 64f32f929


[FLINK-9201] Fix merged window being fired twice if the watermark has
already passed the end of the merged window


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3afb7b35
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3afb7b35
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3afb7b35

Branch: refs/heads/master
Commit: 3afb7b3512353e28cd2fb7f49cd88c8caaa0da0c
Parents: f057ca9
Author: yuemeng <[email protected]>
Authored: Thu Apr 26 16:55:05 2018 +0800
Committer: Aljoscha Krettek <[email protected]>
Committed: Mon May 14 10:09:30 2018 +0200

----------------------------------------------------------------------
 .../streaming/api/windowing/triggers/EventTimeTrigger.java   | 8 +++++++-
 .../api/windowing/triggers/ProcessingTimeTrigger.java        | 8 +++++++-
 2 files changed, 14 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3afb7b35/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
index 2f8f16f..2066bba 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
@@ -69,7 +69,13 @@ public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
        @Override
        public void onMerge(TimeWindow window,
                        OnMergeContext ctx) {
-               ctx.registerEventTimeTimer(window.maxTimestamp());
+               // only register a timer if the watermark is not yet past the end of the merged window
+               // this is in line with the logic in onElement(). If the watermark is past the end of
+               // the window onElement() will fire and setting a timer here would fire the window twice.
+               long windowMaxTimestamp = window.maxTimestamp();
+               if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
+                       ctx.registerEventTimeTimer(windowMaxTimestamp);
+               }
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/3afb7b35/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
index cd7869e..c8e6e6c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
@@ -60,7 +60,13 @@ public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> {
        @Override
        public void onMerge(TimeWindow window,
                        OnMergeContext ctx) {
-               ctx.registerProcessingTimeTimer(window.maxTimestamp());
+               // only register a timer if the time is not yet past the end of the merged window
+               // this is in line with the logic in onElement(). If the time is past the end of
+               // the window onElement() will fire and setting a timer here would fire the window twice.
+               long windowMaxTimestamp = window.maxTimestamp();
+               if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) {
+                       ctx.registerProcessingTimeTimer(windowMaxTimestamp);
+               }
        }
 
        @Override

Reply via email to