Repository: flink Updated Branches: refs/heads/master 8df0bbacb -> fd324ea72
[FLINK-3371] [api-breaking] Move TriggerResult and TriggerContext to dedicated classes This closes #1603 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/50bd65a5 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/50bd65a5 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/50bd65a5 Branch: refs/heads/master Commit: 50bd65a574776817a03dd32fd438cb2327447109 Parents: 8df0bba Author: Stephan Ewen <se...@apache.org> Authored: Sun Feb 7 21:46:16 2016 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Wed Feb 10 22:15:31 2016 +0100 ---------------------------------------------------------------------- .../examples/windowing/SessionWindowing.java | 3 +- .../api/windowing/assigners/GlobalWindows.java | 10 +- .../triggers/ContinuousEventTimeTrigger.java | 7 +- .../ContinuousProcessingTimeTrigger.java | 2 +- .../api/windowing/triggers/CountTrigger.java | 2 +- .../api/windowing/triggers/DeltaTrigger.java | 7 +- .../windowing/triggers/EventTimeTrigger.java | 5 +- .../triggers/ProcessingTimeTrigger.java | 5 +- .../api/windowing/triggers/PurgingTrigger.java | 4 +- .../api/windowing/triggers/Trigger.java | 102 +++++-------------- .../api/windowing/triggers/TriggerResult.java | 96 +++++++++++++++++ .../windowing/EvictingWindowOperator.java | 5 +- .../windowing/NonKeyedWindowOperator.java | 34 ++++--- .../operators/windowing/WindowOperator.java | 18 ++-- 14 files changed, 179 insertions(+), 121 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/50bd65a5/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java index bd82800..e2df160 100644 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java @@ -27,6 +27,7 @@ import org.apache.flink.streaming.api.functions.source.EventTimeSourceFunction; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows; import org.apache.flink.streaming.api.windowing.triggers.Trigger; +import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; import java.util.ArrayList; @@ -95,7 +96,7 @@ public class SessionWindowing { env.execute(); } - private static class SessionTrigger implements Trigger<Tuple3<String, Long, Integer>, GlobalWindow> { + private static class SessionTrigger extends Trigger<Tuple3<String, Long, Integer>, GlobalWindow> { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/flink/blob/50bd65a5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java index d3eb2ac..a4d92cf 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java @@ -21,6 +21,7 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.triggers.Trigger; +import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; import java.util.Collection; @@ -67,15 +68,12 @@ public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> { /** * A trigger that never fires, as default Trigger for GlobalWindows. */ - private static class NeverTrigger implements Trigger<Object, GlobalWindow> { + private static class NeverTrigger extends Trigger<Object, GlobalWindow> { private static final long serialVersionUID = 1L; @Override - public TriggerResult onElement(Object element, - long timestamp, - GlobalWindow window, - TriggerContext ctx) { - return TriggerResult.CONTINUE; + public TriggerResult onElement(Object element, long timestamp, GlobalWindow window, TriggerContext ctx) { + return TriggerResult.CONTINUE; } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/50bd65a5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java index 02a935c..09f3959 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.api.windowing.triggers; import com.google.common.annotations.VisibleForTesting; @@ -33,7 +34,7 @@ import org.apache.flink.streaming.api.windowing.windows.Window; * * @param <W> The type of {@link Window Windows} on which this trigger can operate. */ -public class ContinuousEventTimeTrigger<W extends Window> implements Trigger<Object, W> { +public class ContinuousEventTimeTrigger<W extends Window> extends Trigger<Object, W> { private static final long serialVersionUID = 1L; private final long interval; http://git-wip-us.apache.org/repos/asf/flink/blob/50bd65a5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java index 25d9508..ca7ecb6 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java @@ -32,7 +32,7 @@ import org.apache.flink.streaming.api.windowing.windows.Window; * * @param <W> The type of {@link Window Windows} on which this trigger can operate. */ -public class ContinuousProcessingTimeTrigger<W extends Window> implements Trigger<Object, W> { +public class ContinuousProcessingTimeTrigger<W extends Window> extends Trigger<Object, W> { private static final long serialVersionUID = 1L; private final long interval; http://git-wip-us.apache.org/repos/asf/flink/blob/50bd65a5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java index 725cbf6..5113991 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java @@ -30,7 +30,7 @@ import java.io.IOException; * * @param <W> The type of {@link Window Windows} on which this trigger can operate. */ -public class CountTrigger<W extends Window> implements Trigger<Object, W> { +public class CountTrigger<W extends Window> extends Trigger<Object, W> { private static final long serialVersionUID = 1L; private final long maxCount; http://git-wip-us.apache.org/repos/asf/flink/blob/50bd65a5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java index 55c719a..4a6cde3 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.api.windowing.triggers; import org.apache.flink.api.common.state.ValueState; @@ -33,7 +34,7 @@ import org.apache.flink.streaming.api.windowing.windows.Window; * * @param <W> The type of {@link Window Windows} on which this trigger can operate. */ -public class DeltaTrigger<T, W extends Window> implements Trigger<T, W> { +public class DeltaTrigger<T, W extends Window> extends Trigger<T, W> { private static final long serialVersionUID = 1L; private final DeltaFunction<T> deltaFunction; http://git-wip-us.apache.org/repos/asf/flink/blob/50bd65a5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java index bbd0a01..17d04c2 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.api.windowing.triggers; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; @@ -25,7 +26,7 @@ import org.apache.flink.streaming.api.windowing.windows.TimeWindow; * * @see org.apache.flink.streaming.api.watermark.Watermark */ -public class EventTimeTrigger implements Trigger<Object, TimeWindow> { +public class EventTimeTrigger extends Trigger<Object, TimeWindow> { private static final long serialVersionUID = 1L; private EventTimeTrigger() {} http://git-wip-us.apache.org/repos/asf/flink/blob/50bd65a5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java index 85d6749..ae0b0e5 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.api.windowing.triggers; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; @@ -23,7 +24,7 @@ import org.apache.flink.streaming.api.windowing.windows.TimeWindow; * A {@link Trigger} that fires once the current system time passes the end of the window * to which a pane belongs. */ -public class ProcessingTimeTrigger implements Trigger<Object, TimeWindow> { +public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> { private static final long serialVersionUID = 1L; private ProcessingTimeTrigger() {} http://git-wip-us.apache.org/repos/asf/flink/blob/50bd65a5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java index 626906c..0ec236b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java @@ -25,12 +25,12 @@ import org.apache.flink.streaming.api.windowing.windows.Window; * * <p> * When the nested trigger fires, this will return a {@code FIRE_AND_PURGE} - * {@link org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerResult} + * {@link TriggerResult} * * @param <T> The type of elements on which this trigger can operate. * @param <W> The type of {@link Window Windows} on which this trigger can operate. */ -public class PurgingTrigger<T, W extends Window> implements Trigger<T, W> { +public class PurgingTrigger<T, W extends Window> extends Trigger<T, W> { private static final long serialVersionUID = 1L; private Trigger<T, W> nestedTrigger; http://git-wip-us.apache.org/repos/asf/flink/blob/50bd65a5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java index fb61064..a200e5c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.api.windowing.triggers; import org.apache.flink.api.common.state.State; @@ -39,12 +40,14 @@ import java.io.Serializable; * <p> * Triggers must not maintain state internally since they can be re-created or reused for * different keys. All necessary state should be persisted using the state abstraction - * available on the {@link org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext}. + * available on the {@link TriggerContext}. * * @param <T> The type of elements on which this {@code Trigger} works. * @param <W> The type of {@link Window Windows} on which this {@code Trigger} can operate. */ -public interface Trigger<T, W extends Window> extends Serializable { +public abstract class Trigger<T, W extends Window> implements Serializable { + + private static final long serialVersionUID = -4104633972991191369L; /** * Called for every element that gets added to a pane. The result of this will determine @@ -55,7 +58,7 @@ public interface Trigger<T, W extends Window> extends Serializable { * @param window The window to which this pane belongs. * @param ctx A context object that can be used to register timer callbacks. */ - TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception; + public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception; /** * Called when a processing-time timer that was set using the trigger context fires. @@ -63,7 +66,7 @@ public interface Trigger<T, W extends Window> extends Serializable { * @param time The timestamp at which the timer fired. * @param ctx A context object that can be used to register timer callbacks. */ - TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception; + public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception; /** * Called when an event-time timer that was set using the trigger context fires. @@ -71,102 +74,53 @@ public interface Trigger<T, W extends Window> extends Serializable { * @param time The timestamp at which the timer fired. * @param ctx A context object that can be used to register timer callbacks. */ - TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception; + public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception; /** * Clears any state that the trigger might still hold for the given window. This is called * when a window is purged. Timers set using {@link TriggerContext#registerEventTimeTimer(long)} * and {@link TriggerContext#registerProcessingTimeTimer(long)} should be deleted here as * well as state acquired using {@link TriggerContext#getPartitionedState(StateDescriptor)}. + * + * <p>By default, this method does nothing. */ - void clear(W window, TriggerContext ctx) throws Exception; + public void clear(W window, TriggerContext ctx) throws Exception {} + // ------------------------------------------------------------------------ + /** - * Result type for trigger methods. This determines what happens with the window. - * - * <p> - * On {@code FIRE} the pane is evaluated and results are emitted. The contents of the window - * are kept. {@code FIRE_AND_PURGE} acts like {@code FIRE} but the contents of the pane - * are purged. On {@code CONTINUE} nothing happens, processing continues. On {@code PURGE} - * the contents of the window are discarded and no result is emitted for the window. - */ - enum TriggerResult { - CONTINUE(false, false), FIRE_AND_PURGE(true, true), FIRE(true, false), PURGE(false, true); - - private final boolean fire; - private final boolean purge; - - TriggerResult(boolean fire, boolean purge) { - this.purge = purge; - this.fire = fire; - } - - public boolean isFire() { - return fire; - } - - public boolean isPurge() { - return purge; - } - - /** - * Merges two {@code TriggerResults}. This specifies what should happen if we have - * two results from a Trigger, for example as a result from - * {@link #onElement(Object, long, Window, TriggerContext)} and - * {@link #onEventTime(long, Window, TriggerContext)}. - * - * <p> - * For example, if one result says {@code CONTINUE} while the other says {@code FIRE} - * then {@code FIRE} is the combined result; - */ - public static TriggerResult merge(TriggerResult a, TriggerResult b) { - if (a.purge || b.purge) { - if (a.fire || b.fire) { - return FIRE_AND_PURGE; - } else { - return PURGE; - } - } else if (a.fire || b.fire) { - return FIRE; - } else { - return CONTINUE; - } - } - } - - /** - * A context object that is given to {@code Trigger} methods to allow them to register timer + * A context object that is given to {@link Trigger} methods to allow them to register timer * callbacks and deal with state. */ - interface TriggerContext { - + public interface TriggerContext { + /** * Register a system time callback. When the current system time passes the specified - * time {@link #onProcessingTime(long, Window, TriggerContext)} is called with the time specified here. + * time {@link Trigger#onProcessingTime(long, Window, TriggerContext)} is called with the time specified here. * - * @param time The time at which to invoke {@link #onProcessingTime(long, Window, TriggerContext)} + * @param time The time at which to invoke {@link Trigger#onProcessingTime(long, Window, TriggerContext)} */ void registerProcessingTimeTimer(long time); - + /** * Register an event-time callback. When the current watermark passes the specified - * time {@link #onEventTime(long, Window, TriggerContext)} is called with the time specified here. + * time {@link Trigger#onEventTime(long, Window, TriggerContext)} is called with the time specified here. * - * @param time The watermark at which to invoke {@link #onEventTime(long, Window, TriggerContext)} + * @param time The watermark at which to invoke {@link Trigger#onEventTime(long, Window, TriggerContext)} * @see org.apache.flink.streaming.api.watermark.Watermark */ void registerEventTimeTimer(long time); - + /** * Delete the processing time trigger for the given time. */ void deleteProcessingTimeTimer(long time); - + /** * Delete the event-time trigger for the given time. */ void deleteEventTimeTimer(long time); - + /** * Retrieves an {@link State} object that can be used to interact with * fault-tolerant state that is scoped to the window and key of the current @@ -180,7 +134,7 @@ public interface Trigger<T, W extends Window> extends Serializable { * function (function is not part os a KeyedStream). */ <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor); - + /** * Retrieves a {@link ValueState} object that can be used to interact with * fault-tolerant state that is scoped to the window and key of the current @@ -199,8 +153,8 @@ public interface Trigger<T, W extends Window> extends Serializable { */ @Deprecated <S extends Serializable> ValueState<S> getKeyValueState(String name, Class<S> stateType, S defaultState); - - + + /** * Retrieves a {@link ValueState} object that can be used to interact with * fault-tolerant state that is scoped to the window and key of the current http://git-wip-us.apache.org/repos/asf/flink/blob/50bd65a5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/TriggerResult.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/TriggerResult.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/TriggerResult.java new file mode 100644 index 0000000..2841542 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/TriggerResult.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.windowing.triggers; + +import org.apache.flink.streaming.api.windowing.windows.Window; + +/** + * Result type for trigger methods. This determines what happens with the window, + * for example whether the window function should be called, or the window + * should be discarded. + */ +public enum TriggerResult { + + /** + * No action is taken on the window. + */ + CONTINUE(false, false), + + /** + * {@code FIRE_AND_PURGE} evaluates the window function and emits the window + * result. + */ + FIRE_AND_PURGE(true, true), + + /** + * On {@code FIRE}, the window is evaluated and results are emitted. + * The window is not purged, though, all elements are retained. + */ + FIRE(true, false), + + /** + * All elements in the window are cleared and the window is discarded, + * without evaluating the window function or emitting any elements. + */ + PURGE(false, true); + + // ------------------------------------------------------------------------ + + private final boolean fire; + private final boolean purge; + + TriggerResult(boolean fire, boolean purge) { + this.purge = purge; + this.fire = fire; + } + + public boolean isFire() { + return fire; + } + + public boolean isPurge() { + return purge; + } + + // ------------------------------------------------------------------------ + + /** + * Merges two {@code TriggerResults}. This specifies what should happen if we have + * two results from a Trigger, for example as a result from + * {@link Trigger#onElement(Object, long, Window, Trigger.TriggerContext)} and + * {@link Trigger#onEventTime(long, Window, Trigger.TriggerContext)}. + * + * <p> + * For example, if one result says {@code CONTINUE} while the other says {@code FIRE} + * then {@code FIRE} is the combined result; + */ + public static TriggerResult merge(TriggerResult a, TriggerResult b) { + if (a.purge || b.purge) { + if (a.fire || b.fire) { + return FIRE_AND_PURGE; + } else { + return PURGE; + } + } else if (a.fire || b.fire) { + return FIRE; + } else { + return CONTINUE; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/50bd65a5/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java index 41ec91a..a960ac4 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java @@ -30,6 +30,7 @@ import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; import org.apache.flink.streaming.api.windowing.evictors.Evictor; import org.apache.flink.streaming.api.windowing.triggers.Trigger; +import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -87,7 +88,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window context.key = key; context.window = window; - Trigger.TriggerResult triggerResult = context.onElement(element); + TriggerResult triggerResult = context.onElement(element); processTriggerResult(triggerResult, key, window); } @@ -95,7 +96,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window @Override @SuppressWarnings("unchecked,rawtypes") - protected void processTriggerResult(Trigger.TriggerResult triggerResult, K key, W window) throws Exception { + protected void processTriggerResult(TriggerResult triggerResult, K key, W window) throws Exception { if (!triggerResult.isFire() && !triggerResult.isPurge()) { // do nothing return; http://git-wip-us.apache.org/repos/asf/flink/blob/50bd65a5/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java index d7dbaf5..93761e6 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java @@ -38,6 +38,8 @@ import org.apache.flink.streaming.api.operators.TimestampedCollector; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; import org.apache.flink.streaming.api.windowing.triggers.Trigger; +import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext; +import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.streaming.runtime.operators.Triggerable; import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBuffer; @@ -248,7 +250,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window> windows.put(window, context); } context.windowBuffer.storeElement(element); - Trigger.TriggerResult triggerResult = context.onElement(element); + TriggerResult triggerResult = context.onElement(element); processTriggerResult(triggerResult, window); } } @@ -264,7 +266,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window> } } - private void processTriggerResult(Trigger.TriggerResult triggerResult, W window) throws Exception { + private void processTriggerResult(TriggerResult triggerResult, W window) throws Exception { if (!triggerResult.isFire() && !triggerResult.isPurge()) { // do nothing return; @@ -311,7 +313,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window> // We have to check here whether the entry in the set still reflects the // currently set timer in the Context. if (ctx.watermarkTimer <= mark.getTimestamp()) { - Trigger.TriggerResult triggerResult = ctx.onEventTime(ctx.watermarkTimer); + TriggerResult triggerResult = ctx.onEventTime(ctx.watermarkTimer); processTriggerResult(triggerResult, ctx.window); } } @@ -343,7 +345,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window> // performance reasons. We have to check here whether the entry in the set still // reflects the currently set timer in the Context. if (ctx.processingTimeTimer <= time) { - Trigger.TriggerResult triggerResult = ctx.onProcessingTime(ctx.processingTimeTimer); + TriggerResult triggerResult = ctx.onProcessingTime(ctx.processingTimeTimer); processTriggerResult(triggerResult, ctx.window); } } @@ -360,7 +362,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window> * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. These panes all * have their own instance of the {@code Trigger}. */ - protected class Context implements Trigger.TriggerContext { + protected class Context implements TriggerContext { protected W window; protected WindowBuffer<IN> windowBuffer; @@ -538,41 +540,41 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window> } - public Trigger.TriggerResult onElement(StreamRecord<IN> element) throws Exception { - Trigger.TriggerResult onElementResult = trigger.onElement(element.getValue(), element.getTimestamp(), window, this); + public TriggerResult onElement(StreamRecord<IN> element) throws Exception { + TriggerResult onElementResult = trigger.onElement(element.getValue(), element.getTimestamp(), window, this); if (watermarkTimer > 0 && watermarkTimer <= currentWatermark) { // fire now and don't wait for the next watermark update - Trigger.TriggerResult onEventTimeResult = onEventTime(watermarkTimer); - return Trigger.TriggerResult.merge(onElementResult, onEventTimeResult); + TriggerResult onEventTimeResult = onEventTime(watermarkTimer); + return TriggerResult.merge(onElementResult, onEventTimeResult); } else { return onElementResult; } } - public Trigger.TriggerResult onProcessingTime(long time) throws Exception { + public TriggerResult onProcessingTime(long time) throws Exception { if (time == processingTimeTimer) { processingTimeTimer = -1; return trigger.onProcessingTime(time, window, this); } else { - return Trigger.TriggerResult.CONTINUE; + return TriggerResult.CONTINUE; } } - public Trigger.TriggerResult onEventTime(long time) throws Exception { + public TriggerResult onEventTime(long time) throws Exception { if (time == watermarkTimer) { watermarkTimer = -1; - Trigger.TriggerResult firstTriggerResult = trigger.onEventTime(time, window, this); + TriggerResult firstTriggerResult = trigger.onEventTime(time, window, this); if (watermarkTimer > 0 && watermarkTimer <= currentWatermark) { // fire now and don't wait for the next watermark update - Trigger.TriggerResult secondTriggerResult = onEventTime(watermarkTimer); - return Trigger.TriggerResult.merge(firstTriggerResult, secondTriggerResult); + TriggerResult secondTriggerResult = onEventTime(watermarkTimer); + return TriggerResult.merge(firstTriggerResult, secondTriggerResult); } else { return firstTriggerResult; } } else { - return Trigger.TriggerResult.CONTINUE; + return TriggerResult.CONTINUE; } } http://git-wip-us.apache.org/repos/asf/flink/blob/50bd65a5/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java index eccaeee..5fc89e8 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java @@ -40,6 +40,8 @@ import org.apache.flink.streaming.api.operators.TimestampedCollector; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; import org.apache.flink.streaming.api.windowing.triggers.Trigger; +import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext; +import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.streaming.runtime.operators.Triggerable; import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory; @@ -243,13 +245,13 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> context.key = key; context.window = window; - Trigger.TriggerResult triggerResult = context.onElement(element); + TriggerResult triggerResult = context.onElement(element); processTriggerResult(triggerResult, key, window); } } - protected void processTriggerResult(Trigger.TriggerResult triggerResult, K key, W window) throws Exception { + protected void processTriggerResult(TriggerResult triggerResult, K key, W window) throws Exception { if (!triggerResult.isFire() && !triggerResult.isPurge()) { // do nothing return; @@ -293,7 +295,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> context.key = timer.key; context.window = timer.window; setKeyContext(timer.key); - Trigger.TriggerResult triggerResult = context.onEventTime(timer.timestamp); + TriggerResult triggerResult = context.onEventTime(timer.timestamp); processTriggerResult(triggerResult, context.key, context.window); } else { fire = false; @@ -320,7 +322,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> context.key = timer.key; context.window = timer.window; setKeyContext(timer.key); - Trigger.TriggerResult triggerResult = context.onProcessingTime(timer.timestamp); + TriggerResult triggerResult = context.onProcessingTime(timer.timestamp); processTriggerResult(triggerResult, context.key, context.window); } else { fire = false; @@ -338,7 +340,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> * by setting the {@code key} and {@code window} fields. No internal state must be kept in * the {@code Context} */ - protected class Context implements Trigger.TriggerContext { + protected class Context implements TriggerContext { protected K key; protected W window; @@ -427,15 +429,15 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> } - public Trigger.TriggerResult onElement(StreamRecord<IN> element) throws Exception { + public TriggerResult onElement(StreamRecord<IN> element) throws Exception { return trigger.onElement(element.getValue(), element.getTimestamp(), window, this); } - public Trigger.TriggerResult onProcessingTime(long time) throws Exception { + public TriggerResult onProcessingTime(long time) throws Exception { return trigger.onProcessingTime(time, window, this); } - public Trigger.TriggerResult onEventTime(long time) throws Exception { + public TriggerResult onEventTime(long time) throws Exception { return trigger.onEventTime(time, window, this); }