[FLINK-3200] Fix Triggers by introducing clear() method to clean up state/triggers
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/456d0aba Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/456d0aba Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/456d0aba Branch: refs/heads/master Commit: 456d0abaf7722ab16d91c5e2b52d80c076513921 Parents: e4d05f7 Author: Aljoscha Krettek <[email protected]> Authored: Tue Feb 2 18:03:06 2016 +0100 Committer: Stephan Ewen <[email protected]> Committed: Wed Feb 3 20:28:32 2016 +0100 ---------------------------------------------------------------------- .../examples/windowing/SessionWindowing.java | 19 +++- .../api/windowing/assigners/GlobalWindows.java | 3 + .../triggers/ContinuousEventTimeTrigger.java | 3 + .../ContinuousProcessingTimeTrigger.java | 3 + .../api/windowing/triggers/CountTrigger.java | 5 + .../api/windowing/triggers/DeltaTrigger.java | 5 + .../windowing/triggers/EventTimeTrigger.java | 5 + .../triggers/ProcessingTimeTrigger.java | 5 + .../api/windowing/triggers/PurgingTrigger.java | 5 + .../api/windowing/triggers/Trigger.java | 17 ++++ .../windowing/NonKeyedWindowOperator.java | 25 +++++ .../operators/windowing/WindowOperator.java | 100 ++++++++++++------- 12 files changed, 157 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/456d0aba/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java index 69f61bc..dafe86f 100644 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java @@ -45,7 +45,7 @@ public class SessionWindowing { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); - env.setParallelism(2); + env.setParallelism(1); final List<Tuple3<String, Long, Integer>> input = new ArrayList<>(); @@ -103,7 +103,7 @@ public class SessionWindowing { private final Long sessionTimeout; - private final ValueStateDescriptor<Long> stateDesc = new ValueStateDescriptor<>("last-seen", 1L, + private final ValueStateDescriptor<Long> stateDesc = new ValueStateDescriptor<>("last-seen", -1L, BasicTypeInfo.LONG_TYPE_INFO.createSerializer(new ExecutionConfig())); @@ -120,12 +120,15 @@ public class SessionWindowing { Long timeSinceLastEvent = timestamp - lastSeen; + ctx.deleteEventTimeTimer(lastSeen + sessionTimeout); + // Update the last seen event time lastSeenState.update(timestamp); ctx.registerEventTimeTimer(timestamp + sessionTimeout); - if (timeSinceLastEvent > sessionTimeout) { + if (lastSeen != -1 && timeSinceLastEvent > sessionTimeout) { + System.out.println("FIRING ON ELEMENT: " + element + " ts: " + timestamp + " last " + lastSeen); return TriggerResult.FIRE_AND_PURGE; } else { return TriggerResult.CONTINUE; @@ -138,6 +141,7 @@ public class SessionWindowing { Long lastSeen = lastSeenState.value(); if (time - lastSeen >= sessionTimeout) { + System.out.println("CTX: " + ctx + " Firing Time " + time + " last seen " + lastSeen); return TriggerResult.FIRE_AND_PURGE; } return TriggerResult.CONTINUE; @@ -147,6 +151,15 @@ public class SessionWindowing { public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception { return TriggerResult.CONTINUE; } + + @Override + public void clear(GlobalWindow window, TriggerContext ctx) throws Exception { + ValueState<Long> lastSeenState = ctx.getPartitionedState(stateDesc); + if (lastSeenState.value() != -1) { + ctx.deleteEventTimeTimer(lastSeenState.value() + sessionTimeout); + } + lastSeenState.clear(); + } } // ************************************************************************* http://git-wip-us.apache.org/repos/asf/flink/blob/456d0aba/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java index 99a4962..d3eb2ac 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java @@ -87,6 +87,9 @@ public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> { public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) { return TriggerResult.CONTINUE; } + + @Override + public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {} } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/456d0aba/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java index 17818af..21e35db 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java @@ -74,6 +74,9 @@ public class ContinuousEventTimeTrigger<W extends Window> implements Trigger<Obj } @Override + public void clear(W window, TriggerContext ctx) throws Exception {} + + @Override public String toString() { return "ContinuousProcessingTimeTrigger(" + interval + ")"; } http://git-wip-us.apache.org/repos/asf/flink/blob/456d0aba/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java index 20a2274..10c975f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java @@ -90,6 +90,9 @@ public class ContinuousProcessingTimeTrigger<W extends Window> implements Trigge return TriggerResult.CONTINUE; } + @Override + public void clear(W window, TriggerContext ctx) throws Exception {} + @VisibleForTesting public long getInterval() { return interval; http://git-wip-us.apache.org/repos/asf/flink/blob/456d0aba/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java index e8742d5..3fcfb46 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java @@ -66,6 +66,11 @@ public class CountTrigger<W extends Window> implements Trigger<Object, W> { } @Override + public void clear(W window, TriggerContext ctx) throws Exception { + ctx.getPartitionedState(stateDesc).clear(); + } + + @Override public String toString() { return "CountTrigger(" + maxCount + ")"; } http://git-wip-us.apache.org/repos/asf/flink/blob/456d0aba/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java index 60ada88..3135961 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java @@ -72,6 +72,11 @@ public class DeltaTrigger<T, W extends Window> implements Trigger<T, W> { } @Override + public void clear(W window, TriggerContext ctx) throws Exception { + ctx.getPartitionedState(stateDesc).clear(); + } + + @Override public String toString() { return "DeltaTrigger(" + deltaFunction + ", " + threshold + ")"; } http://git-wip-us.apache.org/repos/asf/flink/blob/456d0aba/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java index 831e360..bbd0a01 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java @@ -47,6 +47,11 @@ public class EventTimeTrigger implements Trigger<Object, TimeWindow> { } @Override + public void clear(TimeWindow window, TriggerContext ctx) throws Exception { + ctx.deleteEventTimeTimer(window.maxTimestamp()); + } + + @Override public String toString() { return "EventTimeTrigger()"; } http://git-wip-us.apache.org/repos/asf/flink/blob/456d0aba/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java index b460c8a..85d6749 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java @@ -45,6 +45,11 @@ public class ProcessingTimeTrigger implements Trigger<Object, TimeWindow> { } @Override + public void clear(TimeWindow window, TriggerContext ctx) throws Exception { + ctx.deleteProcessingTimeTimer(window.maxTimestamp()); + } + + @Override public String toString() { return "ProcessingTimeTrigger()"; } http://git-wip-us.apache.org/repos/asf/flink/blob/456d0aba/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java index cc20296..626906c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java @@ -79,6 +79,11 @@ public class PurgingTrigger<T, W extends Window> implements Trigger<T, W> { } @Override + public void clear(W window, TriggerContext ctx) throws Exception { + nestedTrigger.clear(window, ctx); + } + + @Override public String toString() { return "PurgingTrigger(" + nestedTrigger.toString() + ")"; } http://git-wip-us.apache.org/repos/asf/flink/blob/456d0aba/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java index 8ea50b3..5c71355 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java @@ -73,6 +73,13 @@ public interface Trigger<T, W extends Window> extends Serializable { */ TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception; + /** + * Clears any state that the trigger might still hold for the given window. This is called + * when a window is purged. Timers set using {@link TriggerContext#registerEventTimeTimer(long)} + * and {@link TriggerContext#registerProcessingTimeTimer(long)} should be deleted here as + * well as state acquired using {@link TriggerContext#getPartitionedState(StateDescriptor)}. + */ + void clear(W window, TriggerContext ctx) throws Exception; /** * Result type for trigger methods. This determines what happens with the window. @@ -151,6 +158,16 @@ public interface Trigger<T, W extends Window> extends Serializable { void registerEventTimeTimer(long time); /** + * Delete the processing time trigger for the given time. + */ + void deleteProcessingTimeTimer(long time); + + /** + * Delete the event-time trigger for the given time. + */ + void deleteEventTimeTimer(long time); + + /** * Retrieves an {@link State} object that can be used to interact with * fault-tolerant state that is scoped to the window and key of the current * trigger invocation. http://git-wip-us.apache.org/repos/asf/flink/blob/456d0aba/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java index d9fa9f0..1b712d9 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java @@ -284,6 +284,10 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window> if (triggerResult.isFire()) { emitWindow(context); } + + if (triggerResult.isPurge()) { + context.clear(); + } } @Override @@ -516,6 +520,23 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window> triggers.add(this); } + @Override + public void deleteProcessingTimeTimer(long time) { + Set<Context> triggers = processingTimeTimers.get(time); + if (triggers != null) { + triggers.remove(this); + } + } + + @Override + public void deleteEventTimeTimer(long time) { + Set<Context> triggers = watermarkTimers.get(time); + if (triggers != null) { + triggers.remove(this); + } + + } + public Trigger.TriggerResult onElement(StreamRecord<IN> element) throws Exception { Trigger.TriggerResult onElementResult = trigger.onElement(element.getValue(), element.getTimestamp(), window, this); if (watermarkTimer > 0 && watermarkTimer <= currentWatermark) { @@ -553,6 +574,10 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window> return Trigger.TriggerResult.CONTINUE; } } + + public void clear() throws Exception { + trigger.clear(window, this); + } } /** http://git-wip-us.apache.org/repos/asf/flink/blob/456d0aba/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java index 5109dae..d562925 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java @@ -1,20 +1,20 @@ /** -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.flink.streaming.runtime.operators.windowing; import com.google.common.annotations.VisibleForTesting; @@ -85,8 +85,8 @@ import static java.util.Objects.requireNonNull; * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns. */ public class WindowOperator<K, IN, ACC, OUT, W extends Window> - extends AbstractUdfStreamOperator<OUT, WindowFunction<ACC, OUT, K, W>> - implements OneInputStreamOperator<IN, OUT>, Triggerable, InputTypeConfigurable { + extends AbstractUdfStreamOperator<OUT, WindowFunction<ACC, OUT, K, W>> + implements OneInputStreamOperator<IN, OUT>, Triggerable, InputTypeConfigurable { private static final long serialVersionUID = 1L; @@ -164,12 +164,12 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> * Creates a new {@code WindowOperator} based on the given policies and user functions. */ public WindowOperator(WindowAssigner<? super IN, W> windowAssigner, - TypeSerializer<W> windowSerializer, - KeySelector<IN, K> keySelector, - TypeSerializer<K> keySerializer, - StateDescriptor<? extends MergingState<IN, ACC>> windowStateDescriptor, - WindowFunction<ACC, OUT, K, W> windowFunction, - Trigger<? super IN, ? super W> trigger) { + TypeSerializer<W> windowSerializer, + KeySelector<IN, K> keySelector, + TypeSerializer<K> keySerializer, + StateDescriptor<? extends MergingState<IN, ACC>> windowStateDescriptor, + WindowFunction<ACC, OUT, K, W> windowFunction, + Trigger<? super IN, ? super W> trigger) { super(windowFunction); @@ -258,8 +258,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> if (triggerResult.isFire()) { timestampedCollector.setTimestamp(window.maxTimestamp()); - setKeyContext(key); - MergingState<IN, ACC> windowState = getPartitionedState(window, windowSerializer, windowStateDescriptor); @@ -269,12 +267,13 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> if (triggerResult.isPurge()) { windowState.clear(); + context.clear(); } } else if (triggerResult.isPurge()) { - setKeyContext(key); MergingState<IN, ACC> windowState = getPartitionedState(window, windowSerializer, windowStateDescriptor); windowState.clear(); + context.clear(); } } @@ -293,7 +292,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> context.key = timer.key; context.window = timer.window; - Trigger.TriggerResult triggerResult = context.onEventTime(mark.getTimestamp()); + setKeyContext(timer.key); + Trigger.TriggerResult triggerResult = context.onEventTime(timer.timestamp); processTriggerResult(triggerResult, context.key, context.window); } else { fire = false; @@ -319,7 +319,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> context.key = timer.key; context.window = timer.window; - Trigger.TriggerResult triggerResult = context.onProcessingTime(time); + setKeyContext(timer.key); + Trigger.TriggerResult triggerResult = context.onProcessingTime(timer.timestamp); processTriggerResult(triggerResult, context.key, context.window); } else { fire = false; @@ -410,6 +411,23 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> } } + @Override + public void deleteProcessingTimeTimer(long time) { + Timer<K, W> timer = new Timer<>(time, key, window); + if (processingTimeTimers.remove(timer)) { + processingTimeTimersQueue.remove(timer); + } + } + + @Override + public void deleteEventTimeTimer(long time) { + Timer<K, W> timer = new Timer<>(time, key, window); + if (watermarkTimers.remove(timer)) { + watermarkTimersQueue.remove(timer); + } + + } + public Trigger.TriggerResult onElement(StreamRecord<IN> element) throws Exception { return trigger.onElement(element.getValue(), element.getTimestamp(), window, this); } @@ -421,6 +439,18 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> public Trigger.TriggerResult onEventTime(long time) throws Exception { return trigger.onEventTime(time, window, this); } + + public void clear() throws Exception { + trigger.clear(window, this); + } + + @Override + public String toString() { + return "Context{" + + "key=" + key + + ", window=" + window + + '}'; + } } /** @@ -454,8 +484,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> Timer<?, ?> timer = (Timer<?, ?>) o; return timestamp == timer.timestamp - && key.equals(timer.key) - && window.equals(timer.window); + && key.equals(timer.key) + && window.equals(timer.window); } @@ -470,10 +500,10 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> @Override public String toString() { return "Timer{" + - "timestamp=" + timestamp + - ", key=" + key + - ", window=" + window + - '}'; + "timestamp=" + timestamp + + ", key=" + key + + ", window=" + window + + '}'; } }
