Repository: flink Updated Branches: refs/heads/master c36977f76 -> e69693778
[hotfix] Add Window Parameter in Trigger.onEventTime/onProcessingTime Previously, these trigger methods had no information about the window that they are responsible for. This information might be required for implementing more advanced trigger behaviour. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e18cdd04 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e18cdd04 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e18cdd04 Branch: refs/heads/master Commit: e18cdd0498003417c2fe6b0f446f6a943fbc98e3 Parents: c36977f Author: Aljoscha Krettek <[email protected]> Authored: Fri Oct 23 11:28:58 2015 +0200 Committer: Aljoscha Krettek <[email protected]> Committed: Fri Oct 23 15:48:02 2015 +0200 ---------------------------------------------------------------------- .../streaming/examples/windowing/SessionWindowing.java | 5 ++--- .../api/windowing/assigners/GlobalWindows.java | 4 ++-- .../windowing/triggers/ContinuousEventTimeTrigger.java | 5 ++--- .../triggers/ContinuousProcessingTimeTrigger.java | 5 ++--- .../streaming/api/windowing/triggers/CountTrigger.java | 5 ++--- .../streaming/api/windowing/triggers/DeltaTrigger.java | 5 ++--- .../api/windowing/triggers/EventTimeTrigger.java | 12 +++++++----- .../api/windowing/triggers/ProcessingTimeTrigger.java | 5 ++--- .../api/windowing/triggers/PurgingTrigger.java | 8 ++++---- .../flink/streaming/api/windowing/triggers/Trigger.java | 12 ++++++------ .../operators/windowing/NonKeyedWindowOperator.java | 4 ++-- .../runtime/operators/windowing/WindowOperator.java | 4 ++-- .../apache/flink/streaming/util/TestHarnessUtil.java | 1 - 13 files changed, 35 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e18cdd04/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java 
---------------------------------------------------------------------- diff --git a/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java index 3c63156..035727a 100644 --- a/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java +++ b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java @@ -126,7 +126,7 @@ public class SessionWindowing { } @Override - public TriggerResult onEventTime(long time, TriggerContext ctx) throws Exception { + public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception { OperatorState<Long> lastSeenState = ctx.getKeyValueState("last-seen", 1L); Long lastSeen = lastSeenState.value(); @@ -137,8 +137,7 @@ public class SessionWindowing { } @Override - public TriggerResult onProcessingTime(long time, - TriggerContext ctx) throws Exception { + public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception { return TriggerResult.CONTINUE; } } http://git-wip-us.apache.org/repos/asf/flink/blob/e18cdd04/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java index 4d5b9d7..99a4962 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java @@ -79,12 +79,12 @@ public class GlobalWindows extends 
WindowAssigner<Object, GlobalWindow> { } @Override - public TriggerResult onEventTime(long time, TriggerContext ctx) { + public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) { return TriggerResult.CONTINUE; } @Override - public TriggerResult onProcessingTime(long time, TriggerContext ctx) { + public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) { return TriggerResult.CONTINUE; } } http://git-wip-us.apache.org/repos/asf/flink/blob/e18cdd04/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java index ea26309..4b6af8f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java @@ -57,14 +57,13 @@ public class ContinuousEventTimeTrigger<W extends Window> implements Trigger<Obj } @Override - public TriggerResult onEventTime(long time, TriggerContext ctx) { + public TriggerResult onEventTime(long time, W window, TriggerContext ctx) { ctx.registerEventTimeTimer(time + interval); return TriggerResult.FIRE; } @Override - public TriggerResult onProcessingTime(long time, - TriggerContext ctx) throws Exception { + public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception { return TriggerResult.CONTINUE; } http://git-wip-us.apache.org/repos/asf/flink/blob/e18cdd04/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java 
---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java index be56738..66f9bda 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java @@ -63,13 +63,12 @@ public class ContinuousProcessingTimeTrigger<W extends Window> implements Trigge } @Override - public TriggerResult onEventTime(long time, - TriggerContext ctx) throws Exception { + public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception { return TriggerResult.CONTINUE; } @Override - public TriggerResult onProcessingTime(long time, TriggerContext ctx) throws Exception { + public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception { OperatorState<Long> fireState = ctx.getKeyValueState("fire-timestamp", 0L); long nextFireTimestamp = fireState.value(); http://git-wip-us.apache.org/repos/asf/flink/blob/e18cdd04/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java index 8512989..efb62d7 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java @@ -49,13 +49,12 @@ public class 
CountTrigger<W extends Window> implements Trigger<Object, W> { } @Override - public TriggerResult onEventTime(long time, TriggerContext ctx) { + public TriggerResult onEventTime(long time, W window, TriggerContext ctx) { return TriggerResult.CONTINUE; } @Override - public TriggerResult onProcessingTime(long time, - TriggerContext ctx) throws Exception { + public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception { return TriggerResult.CONTINUE; } http://git-wip-us.apache.org/repos/asf/flink/blob/e18cdd04/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java index 1c6523d..d791d28 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java @@ -59,13 +59,12 @@ public class DeltaTrigger<T extends Serializable, W extends Window> implements T } @Override - public TriggerResult onEventTime(long time, TriggerContext ctx) { + public TriggerResult onEventTime(long time, W window, TriggerContext ctx) { return TriggerResult.CONTINUE; } @Override - public TriggerResult onProcessingTime(long time, - TriggerContext ctx) throws Exception { + public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception { return TriggerResult.CONTINUE; } http://git-wip-us.apache.org/repos/asf/flink/blob/e18cdd04/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java ---------------------------------------------------------------------- diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java index 4b6613c..831e360 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java @@ -37,13 +37,12 @@ public class EventTimeTrigger implements Trigger<Object, TimeWindow> { } @Override - public TriggerResult onEventTime(long time, TriggerContext ctx) { + public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) { return TriggerResult.FIRE_AND_PURGE; } @Override - public TriggerResult onProcessingTime(long time, - TriggerContext ctx) throws Exception { + public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception { return TriggerResult.CONTINUE; } @@ -53,10 +52,13 @@ public class EventTimeTrigger implements Trigger<Object, TimeWindow> { } /** - * Creates trigger that fires once the watermark passes the end of the window. + * Creates an event-time trigger that fires once the watermark passes the end of the window. + * + * <p> + * Once the trigger fires all elements are discarded. Elements that arrive late immediately + * trigger window evaluation with just this one element. 
*/ public static EventTimeTrigger create() { return new EventTimeTrigger(); } - } http://git-wip-us.apache.org/repos/asf/flink/blob/e18cdd04/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java index 6278ba6..b460c8a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java @@ -35,13 +35,12 @@ public class ProcessingTimeTrigger implements Trigger<Object, TimeWindow> { } @Override - public TriggerResult onEventTime(long time, - TriggerContext ctx) throws Exception { + public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception { return TriggerResult.CONTINUE; } @Override - public TriggerResult onProcessingTime(long time, TriggerContext ctx) { + public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) { return TriggerResult.FIRE_AND_PURGE; } http://git-wip-us.apache.org/repos/asf/flink/blob/e18cdd04/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java index eaca336..cc20296 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java +++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java @@ -53,8 +53,8 @@ public class PurgingTrigger<T, W extends Window> implements Trigger<T, W> { } @Override - public TriggerResult onEventTime(long time, TriggerContext ctx) throws Exception { - TriggerResult triggerResult = nestedTrigger.onEventTime(time, ctx); + public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception { + TriggerResult triggerResult = nestedTrigger.onEventTime(time, window, ctx); switch (triggerResult) { case FIRE: return TriggerResult.FIRE_AND_PURGE; @@ -66,8 +66,8 @@ public class PurgingTrigger<T, W extends Window> implements Trigger<T, W> { } @Override - public TriggerResult onProcessingTime(long time, TriggerContext ctx) throws Exception { - TriggerResult triggerResult = nestedTrigger.onProcessingTime(time, ctx); + public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception { + TriggerResult triggerResult = nestedTrigger.onProcessingTime(time, window, ctx); switch (triggerResult) { case FIRE: return TriggerResult.FIRE_AND_PURGE; http://git-wip-us.apache.org/repos/asf/flink/blob/e18cdd04/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java index ef8110b..15ccb33 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java @@ -60,7 +60,7 @@ public interface Trigger<T, W extends Window> extends Serializable { * @param time The timestamp at which the timer fired. 
* @param ctx A context object that can be used to register timer callbacks. */ - TriggerResult onProcessingTime(long time, TriggerContext ctx) throws Exception; + TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception; /** * Called when an event-time timer that was set using the trigger context fires. @@ -68,7 +68,7 @@ public interface Trigger<T, W extends Window> extends Serializable { * @param time The timestamp at which the timer fired. * @param ctx A context object that can be used to register timer callbacks. */ - TriggerResult onEventTime(long time, TriggerContext ctx) throws Exception; + TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception; /** @@ -91,19 +91,19 @@ public interface Trigger<T, W extends Window> extends Serializable { /** * Register a system time callback. When the current system time passes the specified - * time {@link #onProcessingTime(long, TriggerContext)} is called with the time specified here. + * time {@link #onProcessingTime(long, Window, TriggerContext)} is called with the time specified here. * - * @param time The time at which to invoke {@link #onProcessingTime(long, TriggerContext)} + * @param time The time at which to invoke {@link #onProcessingTime(long, Window, TriggerContext)} */ void registerProcessingTimeTimer(long time); /** * Register an event-time callback. When the current watermark passes the specified - * time {@link #onEventTime(long, TriggerContext)} is called with the time specified here. + * time {@link #onEventTime(long, Window, TriggerContext)} is called with the time specified here. 
* * @see org.apache.flink.streaming.api.watermark.Watermark * - * @param time The watermark at which to invoke {@link #onEventTime(long, TriggerContext)} + * @param time The watermark at which to invoke {@link #onEventTime(long, Window, TriggerContext)} */ void registerEventTimeTimer(long time); http://git-wip-us.apache.org/repos/asf/flink/blob/e18cdd04/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java index 03e8c4c..2209d5e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java @@ -437,7 +437,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window> public Trigger.TriggerResult onProcessingTime(long time) throws Exception { if (time == processingTimeTimer) { - return trigger.onProcessingTime(time, this); + return trigger.onProcessingTime(time, window, this); } else { return Trigger.TriggerResult.CONTINUE; } @@ -445,7 +445,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window> public Trigger.TriggerResult onEventTime(long time) throws Exception { if (time == watermarkTimer) { - return trigger.onEventTime(time, this); + return trigger.onEventTime(time, window, this); } else { return Trigger.TriggerResult.CONTINUE; } http://git-wip-us.apache.org/repos/asf/flink/blob/e18cdd04/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---------------------------------------------------------------------- diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java index 30ce477..e8e001d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java @@ -510,7 +510,7 @@ public class WindowOperator<K, IN, OUT, W extends Window> public Trigger.TriggerResult onProcessingTime(long time) throws Exception { if (time == processingTimeTimer) { - return trigger.onProcessingTime(time, this); + return trigger.onProcessingTime(time, window, this); } else { return Trigger.TriggerResult.CONTINUE; } @@ -518,7 +518,7 @@ public class WindowOperator<K, IN, OUT, W extends Window> public Trigger.TriggerResult onEventTime(long time) throws Exception { if (time == watermarkTimer) { - return trigger.onEventTime(time, this); + return trigger.onEventTime(time, window, this); } else { return Trigger.TriggerResult.CONTINUE; } http://git-wip-us.apache.org/repos/asf/flink/blob/e18cdd04/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java index 0c5cd8f..889ae37 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java @@ -21,7 +21,6 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.junit.Assert; import java.util.Arrays; -import java.util.Collections; import java.util.Comparator; import 
java.util.LinkedList; import java.util.List;
