Repository: flink
Updated Branches:
  refs/heads/master c3b013b9d -> 5c43d2b8a


[FLINK-5363] Fire timers when window state is currently empty

Previously, a window Trigger was not invoked if the window was empty at
the time a timer fired. Now the Trigger is always invoked.

As a side effect, this resolves FLINK-9687.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fdce4b2e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fdce4b2e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fdce4b2e

Branch: refs/heads/master
Commit: fdce4b2ee0b4127c296f2d1e27fe9ecbbedc1676
Parents: c3b013b
Author: minwenjun <minwen...@didichuxing.com>
Authored: Thu Jun 28 17:54:56 2018 +0800
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Wed Jul 11 13:35:46 2018 +0200

----------------------------------------------------------------------
 .../api/windowing/triggers/Trigger.java         | 10 ------
 .../windowing/EvictingWindowOperator.java       | 30 ++++++++--------
 .../operators/windowing/WindowOperator.java     | 36 +++++++++-----------
 .../windowing/WindowOperatorContractTest.java   |  4 +--
 4 files changed, 34 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fdce4b2e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
index f41cce8..ab13095 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
@@ -70,11 +70,6 @@ public abstract class Trigger<T, W extends Window> 
implements Serializable {
        /**
         * Called when a processing-time timer that was set using the trigger 
context fires.
         *
-        * <p>Note: This method is not called in case the window does not 
contain any elements. Thus,
-        * if you return {@code PURGE} from a trigger method and you expect to 
do cleanup in a future
-        * invocation of a timer callback it might be wise to clean any state 
that you would clean
-        * in the timer callback.
-        *
         * @param time The timestamp at which the timer fired.
         * @param window The window for which the timer fired.
         * @param ctx A context object that can be used to register timer 
callbacks.
@@ -84,11 +79,6 @@ public abstract class Trigger<T, W extends Window> 
implements Serializable {
        /**
         * Called when an event-time timer that was set using the trigger 
context fires.
         *
-        * <p>Note: This method is not called in case the window does not 
contain any elements. Thus,
-        * if you return {@code PURGE} from a trigger method and you expect to 
do cleanup in a future
-        * invocation of a timer callback it might be wise to clean any state 
that you would clean
-        * in the timer callback.
-        *
         * @param time The timestamp at which the timer fired.
         * @param window The window for which the timer fired.
         * @param ctx A context object that can be used to register timer 
callbacks.

http://git-wip-us.apache.org/repos/asf/flink/blob/fdce4b2e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index 65cf167..410b280 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -263,16 +263,17 @@ public class EvictingWindowOperator<K, IN, OUT, W extends 
Window>
                        
evictingWindowState.setCurrentNamespace(triggerContext.window);
                }
 
-               Iterable<StreamRecord<IN>> contents = evictingWindowState.get();
+               TriggerResult triggerResult = 
triggerContext.onEventTime(timer.getTimestamp());
 
-               if (contents != null) {
-                       TriggerResult triggerResult = 
triggerContext.onEventTime(timer.getTimestamp());
-                       if (triggerResult.isFire()) {
+               if (triggerResult.isFire()) {
+                       Iterable<StreamRecord<IN>> contents = 
evictingWindowState.get();
+                       if (contents != null) {
                                emitWindowContents(triggerContext.window, 
contents, evictingWindowState);
                        }
-                       if (triggerResult.isPurge()) {
-                               evictingWindowState.clear();
-                       }
+               }
+
+               if (triggerResult.isPurge()) {
+                       evictingWindowState.clear();
                }
 
                if (windowAssigner.isEventTime() && 
isCleanupTime(triggerContext.window, timer.getTimestamp())) {
@@ -309,16 +310,17 @@ public class EvictingWindowOperator<K, IN, OUT, W extends 
Window>
                        
evictingWindowState.setCurrentNamespace(triggerContext.window);
                }
 
-               Iterable<StreamRecord<IN>> contents = evictingWindowState.get();
+               TriggerResult triggerResult = 
triggerContext.onProcessingTime(timer.getTimestamp());
 
-               if (contents != null) {
-                       TriggerResult triggerResult = 
triggerContext.onProcessingTime(timer.getTimestamp());
-                       if (triggerResult.isFire()) {
+               if (triggerResult.isFire()) {
+                       Iterable<StreamRecord<IN>> contents = 
evictingWindowState.get();
+                       if (contents != null) {
                                emitWindowContents(triggerContext.window, 
contents, evictingWindowState);
                        }
-                       if (triggerResult.isPurge()) {
-                               evictingWindowState.clear();
-                       }
+               }
+
+               if (triggerResult.isPurge()) {
+                       evictingWindowState.clear();
                }
 
                if (!windowAssigner.isEventTime() && 
isCleanupTime(triggerContext.window, timer.getTimestamp())) {

http://git-wip-us.apache.org/repos/asf/flink/blob/fdce4b2e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index ecce1fb..1c79f68 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -446,19 +446,17 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
                        mergingWindows = null;
                }
 
-               ACC contents = null;
-               if (windowState != null) {
-                       contents = windowState.get();
-               }
+               TriggerResult triggerResult = 
triggerContext.onEventTime(timer.getTimestamp());
 
-               if (contents != null) {
-                       TriggerResult triggerResult = 
triggerContext.onEventTime(timer.getTimestamp());
-                       if (triggerResult.isFire()) {
+               if (triggerResult.isFire()) {
+                       ACC contents = windowState.get();
+                       if (contents != null) {
                                emitWindowContents(triggerContext.window, 
contents);
                        }
-                       if (triggerResult.isPurge()) {
-                               windowState.clear();
-                       }
+               }
+
+               if (triggerResult.isPurge()) {
+                       windowState.clear();
                }
 
                if (windowAssigner.isEventTime() && 
isCleanupTime(triggerContext.window, timer.getTimestamp())) {
@@ -494,19 +492,17 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
                        mergingWindows = null;
                }
 
-               ACC contents = null;
-               if (windowState != null) {
-                       contents = windowState.get();
-               }
+               TriggerResult triggerResult = 
triggerContext.onProcessingTime(timer.getTimestamp());
 
-               if (contents != null) {
-                       TriggerResult triggerResult = 
triggerContext.onProcessingTime(timer.getTimestamp());
-                       if (triggerResult.isFire()) {
+               if (triggerResult.isFire()) {
+                       ACC contents = windowState.get();
+                       if (contents != null) {
                                emitWindowContents(triggerContext.window, 
contents);
                        }
-                       if (triggerResult.isPurge()) {
-                               windowState.clear();
-                       }
+               }
+
+               if (triggerResult.isPurge()) {
+                       windowState.clear();
                }
 
                if (!windowAssigner.isEventTime() && 
isCleanupTime(triggerContext.window, timer.getTimestamp())) {

http://git-wip-us.apache.org/repos/asf/flink/blob/fdce4b2e/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
index c8368ac..8dee3b0 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
@@ -1110,7 +1110,7 @@ public abstract class WindowOperatorContractTest extends 
TestLogger {
                timeAdaptor.advanceTime(testHarness, 0L);
 
                // trigger is not called if there is no more window (timer is 
silently ignored)
-               timeAdaptor.verifyTriggerCallback(mockTrigger, never(), null, 
null);
+               timeAdaptor.verifyTriggerCallback(mockTrigger, times(1), null, 
null);
 
                verify(mockWindowFunction, never())
                                .process(anyInt(), anyTimeWindow(), 
anyInternalWindowContext(), anyIntIterable(), 
WindowOperatorContractTest.<List<Integer>>anyCollector());
@@ -1174,7 +1174,7 @@ public abstract class WindowOperatorContractTest extends 
TestLogger {
                timeAdaptor.advanceTime(testHarness, 0L);
 
                // trigger is not called if there is no more window (timer is 
silently ignored)
-               timeAdaptor.verifyTriggerCallback(mockTrigger, never(), null, 
null);
+               timeAdaptor.verifyTriggerCallback(mockTrigger, times(1), null, 
null);
 
                verify(mockWindowFunction, never())
                                .process(anyInt(), anyTimeWindow(), 
anyInternalWindowContext(), anyIntIterable(), 
WindowOperatorContractTest.<List<Integer>>anyCollector());

Reply via email to