Repository: flink Updated Branches: refs/heads/release-1.1 424fb24c3 -> fc24c30ee
[FLINK-5149] let ContinuousEventTimeTrigger fire at the end of the window This changes the ContinuousEventTimeTrigger to behave like the EventTimeTrigger in the sense that it also triggers at the end of the window. This ensures the trigger fires at least once, even when the first trigger interval ends after the window end. This closes #2860. [typo] fix toString() of ContinuousEventTimeTrigger This closes #2854. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fc24c30e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fc24c30e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fc24c30e Branch: refs/heads/release-1.1 Commit: fc24c30ee0edb3a66196629cece942bb8a0b155c Parents: 424fb24 Author: Maximilian Michels <[email protected]> Authored: Wed Nov 23 16:01:35 2016 +0100 Committer: Maximilian Michels <[email protected]> Committed: Thu Nov 24 11:38:42 2016 +0100 ---------------------------------------------------------------------- .../triggers/ContinuousEventTimeTrigger.java | 22 +++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/fc24c30e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java index cb8cdf5..c562fa9 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java @@ -53,31 +53,39 
@@ public class ContinuousEventTimeTrigger<W extends Window> extends Trigger<Object @Override public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception { - ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc); + if (window.maxTimestamp() <= ctx.getCurrentWatermark()) { + // if the watermark is already past the window fire immediately + return TriggerResult.FIRE; + } else { + ctx.registerEventTimeTimer(window.maxTimestamp()); + } + ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc); if (fireTimestamp.get() == null) { long start = timestamp - (timestamp % interval); long nextFireTimestamp = start + interval; - ctx.registerEventTimeTimer(nextFireTimestamp); - fireTimestamp.add(nextFireTimestamp); - return TriggerResult.CONTINUE; } + return TriggerResult.CONTINUE; } @Override public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception { - ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc); + if (time == window.maxTimestamp()){ + return TriggerResult.FIRE; + } + + ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc); if (fireTimestamp.get().equals(time)) { fireTimestamp.clear(); fireTimestamp.add(time + interval); ctx.registerEventTimeTimer(time + interval); return TriggerResult.FIRE; - } + return TriggerResult.CONTINUE; } @@ -113,7 +121,7 @@ public class ContinuousEventTimeTrigger<W extends Window> extends Trigger<Object @Override public String toString() { - return "ContinuousProcessingTimeTrigger(" + interval + ")"; + return "ContinuousEventTimeTrigger(" + interval + ")"; } @VisibleForTesting
