Repository: flink Updated Branches: refs/heads/master c3b013b9d -> 5c43d2b8a
[FLINK-5363] Fire timers when window state is currently empty Before, a window Trigger would not be invoked if the window is empty at the time of the timer firing. Now the Trigger is always invoked. As a side effect, this resolves FLINK-9687. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fdce4b2e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fdce4b2e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fdce4b2e Branch: refs/heads/master Commit: fdce4b2ee0b4127c296f2d1e27fe9ecbbedc1676 Parents: c3b013b Author: minwenjun <minwen...@didichuxing.com> Authored: Thu Jun 28 17:54:56 2018 +0800 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Wed Jul 11 13:35:46 2018 +0200 ---------------------------------------------------------------------- .../api/windowing/triggers/Trigger.java | 10 ------ .../windowing/EvictingWindowOperator.java | 30 ++++++++-------- .../operators/windowing/WindowOperator.java | 36 +++++++++----------- .../windowing/WindowOperatorContractTest.java | 4 +-- 4 files changed, 34 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/fdce4b2e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java index f41cce8..ab13095 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java @@ -70,11 +70,6 @@ public abstract class Trigger<T, W extends Window> implements Serializable { /** * Called when a processing-time timer that was set using the trigger context fires. * - * <p>Note: This method is not called in case the window does not contain any elements. Thus, - * if you return {@code PURGE} from a trigger method and you expect to do cleanup in a future - * invocation of a timer callback it might be wise to clean any state that you would clean - * in the timer callback. - * * @param time The timestamp at which the timer fired. * @param window The window for which the timer fired. * @param ctx A context object that can be used to register timer callbacks. @@ -84,11 +79,6 @@ public abstract class Trigger<T, W extends Window> implements Serializable { /** * Called when an event-time timer that was set using the trigger context fires. * - * <p>Note: This method is not called in case the window does not contain any elements. Thus, - * if you return {@code PURGE} from a trigger method and you expect to do cleanup in a future - * invocation of a timer callback it might be wise to clean any state that you would clean - * in the timer callback. - * * @param time The timestamp at which the timer fired. * @param window The window for which the timer fired. * @param ctx A context object that can be used to register timer callbacks. http://git-wip-us.apache.org/repos/asf/flink/blob/fdce4b2e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java index 65cf167..410b280 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java @@ -263,16 +263,17 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> evictingWindowState.setCurrentNamespace(triggerContext.window); } - Iterable<StreamRecord<IN>> contents = evictingWindowState.get(); + TriggerResult triggerResult = triggerContext.onEventTime(timer.getTimestamp()); - if (contents != null) { - TriggerResult triggerResult = triggerContext.onEventTime(timer.getTimestamp()); - if (triggerResult.isFire()) { + if (triggerResult.isFire()) { + Iterable<StreamRecord<IN>> contents = evictingWindowState.get(); + if (contents != null) { emitWindowContents(triggerContext.window, contents, evictingWindowState); } - if (triggerResult.isPurge()) { - evictingWindowState.clear(); - } + } + + if (triggerResult.isPurge()) { + evictingWindowState.clear(); } if (windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) { @@ -309,16 +310,17 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> evictingWindowState.setCurrentNamespace(triggerContext.window); } - Iterable<StreamRecord<IN>> contents = evictingWindowState.get(); + TriggerResult triggerResult = triggerContext.onProcessingTime(timer.getTimestamp()); - if (contents != null) { - TriggerResult triggerResult = triggerContext.onProcessingTime(timer.getTimestamp()); - if (triggerResult.isFire()) { + if (triggerResult.isFire()) { + Iterable<StreamRecord<IN>> contents = evictingWindowState.get(); + if (contents != null) { emitWindowContents(triggerContext.window, contents, evictingWindowState); } - if (triggerResult.isPurge()) { - evictingWindowState.clear(); - } + } + + if (triggerResult.isPurge()) { + evictingWindowState.clear(); } if (!windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) { http://git-wip-us.apache.org/repos/asf/flink/blob/fdce4b2e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java index ecce1fb..1c79f68 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java @@ -446,19 +446,17 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> mergingWindows = null; } - ACC contents = null; - if (windowState != null) { - contents = windowState.get(); - } + TriggerResult triggerResult = triggerContext.onEventTime(timer.getTimestamp()); - if (contents != null) { - TriggerResult triggerResult = triggerContext.onEventTime(timer.getTimestamp()); - if (triggerResult.isFire()) { + if (triggerResult.isFire()) { + ACC contents = windowState.get(); + if (contents != null) { emitWindowContents(triggerContext.window, contents); } - if (triggerResult.isPurge()) { - windowState.clear(); - } + } + + if (triggerResult.isPurge()) { + windowState.clear(); } if (windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) { @@ -494,19 +492,17 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> mergingWindows = null; } - ACC contents = null; - if (windowState != null) { - contents = windowState.get(); - } + TriggerResult triggerResult = triggerContext.onProcessingTime(timer.getTimestamp()); - if (contents != null) { - TriggerResult triggerResult = triggerContext.onProcessingTime(timer.getTimestamp()); - if (triggerResult.isFire()) { + if (triggerResult.isFire()) { + ACC contents = windowState.get(); + if (contents != null) { emitWindowContents(triggerContext.window, contents); } - if (triggerResult.isPurge()) { - windowState.clear(); - } + } + + if (triggerResult.isPurge()) { + windowState.clear(); } if (!windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) { http://git-wip-us.apache.org/repos/asf/flink/blob/fdce4b2e/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java index c8368ac..8dee3b0 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java @@ -1110,7 +1110,7 @@ public abstract class WindowOperatorContractTest extends TestLogger { timeAdaptor.advanceTime(testHarness, 0L); // trigger is not called if there is no more window (timer is silently ignored) - timeAdaptor.verifyTriggerCallback(mockTrigger, never(), null, null); + timeAdaptor.verifyTriggerCallback(mockTrigger, times(1), null, null); verify(mockWindowFunction, never()) .process(anyInt(), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<List<Integer>>anyCollector()); @@ -1174,7 +1174,7 @@ public abstract class WindowOperatorContractTest extends TestLogger { timeAdaptor.advanceTime(testHarness, 0L); // trigger is not called if there is no more window (timer is silently ignored) - timeAdaptor.verifyTriggerCallback(mockTrigger, never(), null, null); + timeAdaptor.verifyTriggerCallback(mockTrigger, times(1), null, null); verify(mockWindowFunction, never()) .process(anyInt(), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<List<Integer>>anyCollector());