[FLINK-4994] Don't Clear Trigger State and Merging Window Set When Purging

Before, when a Trigger returned TriggerResult.PURGE from any of the on*() methods, the WindowOperator would clear all state of that window (window contents, merging window set) and call Trigger.clear() so that the Trigger could clean up its state/timers.
This was problematic in some cases. For example, with merging windows (session windows), this meant that a late-arriving element would not be put into the session that had previously been built up, but into a completely new session containing only that one element.

The new behaviour is this:
 * Only clean the window contents on PURGE.
 * Register a cleanup timer for every window, and do not delete it on PURGE.
 * When the cleanup timer fires: clean the window state, clean the merging window set, and call Trigger.clear() to allow the Trigger to clean up its state/timers.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0b331a42 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0b331a42 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0b331a42 Branch: refs/heads/master Commit: 0b331a421267a541d91e94f2713534704ed32bed Parents: bcca3fe Author: Aljoscha Krettek <[email protected]> Authored: Wed Nov 2 11:51:07 2016 +0100 Committer: Aljoscha Krettek <[email protected]> Committed: Tue Jan 24 10:42:34 2017 +0100 ---------------------------------------------------------------------- .../windowing/EvictingWindowOperator.java | 119 +++++++------- .../operators/windowing/WindowOperator.java | 155 +++++++++++-------- .../operators/windowing/WindowOperatorTest.java | 11 +- 3 files changed, 158 insertions(+), 127 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/0b331a42/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java index d9c977a..45fea14 100644 ---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -57,20 +57,20 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns. */ @Internal -public class EvictingWindowOperator<K, IN, OUT, W extends Window> +public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends WindowOperator<K, IN, Iterable<IN>, OUT, W> { private static final long serialVersionUID = 1L; // ------------------------------------------------------------------------ - // these fields are set by the API stream graph builder to configure the operator - + // these fields are set by the API stream graph builder to configure the operator + private final Evictor<? super IN, ? super W> evictor; private final StateDescriptor<? 
extends ListState<StreamRecord<IN>>, ?> evictingWindowStateDescriptor; // ------------------------------------------------------------------------ - // the fields below are instantiated once the operator runs in the runtime + // the fields below are instantiated once the operator runs in the runtime private transient EvictorContext evictorContext; @@ -146,7 +146,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> if (stateWindow == null) { throw new IllegalStateException("Window " + window + " is not in in-flight window set."); } - + evictingWindowState.setCurrentNamespace(stateWindow); evictingWindowState.add(element); @@ -163,14 +163,13 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> // if we have no state, there is nothing to do continue; } - fire(actualWindow, contents, evictingWindowState); + emitWindowContents(actualWindow, contents, evictingWindowState); } if (triggerResult.isPurge()) { - cleanup(actualWindow, evictingWindowState, mergingWindows); - } else { - registerCleanupTimer(actualWindow); + evictingWindowState.clear(); } + registerCleanupTimer(actualWindow); } mergingWindows.persist(); @@ -198,14 +197,13 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> // if we have no state, there is nothing to do continue; } - fire(window, contents, evictingWindowState); + emitWindowContents(window, contents, evictingWindowState); } if (triggerResult.isPurge()) { - cleanup(window, evictingWindowState, null); - } else { - registerCleanupTimer(window); + evictingWindowState.clear(); } + registerCleanupTimer(window); } } } @@ -218,37 +216,42 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> evictorContext.key = timer.getKey(); evictorContext.window = timer.getNamespace(); - ListState<StreamRecord<IN>> windowState; MergingWindowSet<W> mergingWindows = null; if (windowAssigner instanceof MergingWindowAssigner) { mergingWindows = getMergingWindowSet(); W stateWindow = 
mergingWindows.getStateWindow(context.window); if (stateWindow == null) { - // then the window is already purged and this is a cleanup - // timer set due to allowed lateness that has nothing to clean, - // so it is safe to just ignore + // Timer firing for non-existent window, this can only happen if a + // trigger did not clean up timers. We have already cleared the merging + // window and therefore the Trigger state, however, so nothing to do. return; + } else { + evictingWindowState.setCurrentNamespace(stateWindow); } - - evictingWindowState.setCurrentNamespace(stateWindow); } else { evictingWindowState.setCurrentNamespace(context.window); } Iterable<StreamRecord<IN>> contents = evictingWindowState.get(); - if (contents == null) { - // if we have no state, there is nothing to do - return; + + if (contents != null) { + TriggerResult triggerResult = context.onEventTime(timer.getTimestamp()); + if (triggerResult.isFire()) { + emitWindowContents(context.window, contents, evictingWindowState); + } + if (triggerResult.isPurge()) { + evictingWindowState.clear(); + } } - TriggerResult triggerResult = context.onEventTime(timer.getTimestamp()); - if (triggerResult.isFire()) { - fire(context.window, contents, evictingWindowState); + if (windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp())) { + clearAllState(context.window, evictingWindowState, mergingWindows); } - if (triggerResult.isPurge() || (windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp()))) { - cleanup(context.window, evictingWindowState, mergingWindows); + if (mergingWindows != null) { + // need to make sure to update the merging state in state + mergingWindows.persist(); } } @@ -259,40 +262,46 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> evictorContext.key = timer.getKey(); evictorContext.window = timer.getNamespace(); - ListState<StreamRecord<IN>> windowState; MergingWindowSet<W> mergingWindows = null; if (windowAssigner 
instanceof MergingWindowAssigner) { mergingWindows = getMergingWindowSet(); W stateWindow = mergingWindows.getStateWindow(context.window); if (stateWindow == null) { - // then the window is already purged and this is a cleanup - // timer set due to allowed lateness that has nothing to clean, - // so it is safe to just ignore + // Timer firing for non-existent window, this can only happen if a + // trigger did not clean up timers. We have already cleared the merging + // window and therefore the Trigger state, however, so nothing to do. return; + } else { + evictingWindowState.setCurrentNamespace(stateWindow); } - evictingWindowState.setCurrentNamespace(stateWindow); } else { evictingWindowState.setCurrentNamespace(context.window); } Iterable<StreamRecord<IN>> contents = evictingWindowState.get(); - if (contents == null) { - // if we have no state, there is nothing to do - return; + + if (contents != null) { + TriggerResult triggerResult = context.onProcessingTime(timer.getTimestamp()); + if (triggerResult.isFire()) { + emitWindowContents(context.window, contents, evictingWindowState); + } + if (triggerResult.isPurge()) { + evictingWindowState.clear(); + } } - TriggerResult triggerResult = context.onProcessingTime(timer.getTimestamp()); - if (triggerResult.isFire()) { - fire(context.window, contents, evictingWindowState); + if (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp())) { + clearAllState(context.window, evictingWindowState, mergingWindows); } - if (triggerResult.isPurge() || (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp()))) { - cleanup(context.window, evictingWindowState, mergingWindows); + if (mergingWindows != null) { + // need to make sure to update the merging state in state + mergingWindows.persist(); } } - private void fire(W window, Iterable<StreamRecord<IN>> contents, ListState<StreamRecord<IN>> windowState) throws Exception { + private void emitWindowContents(W window, 
Iterable<StreamRecord<IN>> contents, ListState<StreamRecord<IN>> windowState) throws Exception { timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp()); // Work around type system restrictions... @@ -326,6 +335,18 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> } } + private void clearAllState( + W window, + ListState<StreamRecord<IN>> windowState, + MergingWindowSet<W> mergingWindows) throws Exception { + + windowState.clear(); + context.clear(); + if (mergingWindows != null) { + mergingWindows.retireWindow(window); + mergingWindows.persist(); + } + } /** * {@code EvictorContext} is a utility for handling {@code Evictor} invocations. It can be reused @@ -372,24 +393,12 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> } } - private void cleanup(W window, - ListState<StreamRecord<IN>> windowState, - MergingWindowSet<W> mergingWindows) throws Exception { - - windowState.clear(); - if (mergingWindows != null) { - mergingWindows.retireWindow(window); - mergingWindows.persist(); - } - context.clear(); - } - @Override public void open() throws Exception { super.open(); evictorContext = new EvictorContext(null,null); - evictingWindowState = (InternalListState<W, StreamRecord<IN>>) + evictingWindowState = (InternalListState<W, StreamRecord<IN>>) getOrCreateKeyedState(windowSerializer, evictingWindowStateDescriptor); } http://git-wip-us.apache.org/repos/asf/flink/blob/0b331a42/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java index 0dbaffd..3c4f397 100644 --- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java @@ -139,8 +139,10 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> /** The state in which the window contents is stored. Each window is a namespace */ private transient InternalAppendingState<W, IN, ACC> windowState; - /** The {@link #windowState}, typed to merging state for merging windows. - * Null if the window state is not mergeable */ + /** + * The {@link #windowState}, typed to merging state for merging windows. + * Null if the window state is not mergeable. + */ private transient InternalMergingState<W, IN, ACC> windowMergingState; /** The state that holds the merging window metadata (the sets that describe what is merged) */ @@ -292,7 +294,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> new ListStateDescriptor<>("merging-window-set", tupleSerializer); // get the state that stores the merging sets - mergingSetsState = (InternalListState<VoidNamespace, Tuple2<W, W>>) + mergingSetsState = (InternalListState<VoidNamespace, Tuple2<W, W>>) getOrCreateKeyedState(VoidNamespaceSerializer.INSTANCE, mergingSetsStateDescriptor); mergingSetsState.setCurrentNamespace(VoidNamespace.INSTANCE); } @@ -320,7 +322,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> public void processElement(StreamRecord<IN> element) throws Exception { final Collection<W> elementWindows = windowAssigner.assignWindows( element.getValue(), element.getTimestamp(), windowAssignerContext); - + final K key = this.<K>getKeyedStateBackend().getCurrentKey(); if (windowAssigner instanceof MergingWindowAssigner) { @@ -376,14 +378,13 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> if (contents == null) { continue; } - fire(actualWindow, contents); + emitWindowContents(actualWindow, contents); } if 
(triggerResult.isPurge()) { - cleanup(actualWindow, windowState, mergingWindows); - } else { - registerCleanupTimer(actualWindow); + windowState.clear(); } + registerCleanupTimer(actualWindow); } // need to make sure to update the merging state in state @@ -409,14 +410,13 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> if (contents == null) { continue; } - fire(window, contents); + emitWindowContents(window, contents); } if (triggerResult.isPurge()) { - cleanup(window, windowState, null); - } else { - registerCleanupTimer(window); + windowState.clear(); } + registerCleanupTimer(window); } } } @@ -432,31 +432,40 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> mergingWindows = getMergingWindowSet(); W stateWindow = mergingWindows.getStateWindow(context.window); if (stateWindow == null) { - // then the window is already purged and this is a cleanup - // timer set due to allowed lateness that has nothing to clean, - // so it is safe to just ignore + // Timer firing for non-existent window, this can only happen if a + // trigger did not clean up timers. We have already cleared the merging + // window and therefore the Trigger state, however, so nothing to do. 
return; + } else { + windowState.setCurrentNamespace(stateWindow); } - - windowState.setCurrentNamespace(stateWindow); } else { windowState.setCurrentNamespace(context.window); mergingWindows = null; } - ACC contents = windowState.get(); - if (contents == null) { - // if we have no state, there is nothing to do - return; + ACC contents = null; + if (windowState != null) { + contents = windowState.get(); } - TriggerResult triggerResult = context.onEventTime(timer.getTimestamp()); - if (triggerResult.isFire()) { - fire(context.window, contents); + if (contents != null) { + TriggerResult triggerResult = context.onEventTime(timer.getTimestamp()); + if (triggerResult.isFire()) { + emitWindowContents(context.window, contents); + } + if (triggerResult.isPurge()) { + windowState.clear(); + } } - if (triggerResult.isPurge() || (windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp()))) { - cleanup(context.window, windowState, mergingWindows); + if (windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp())) { + clearAllState(context.window, windowState, mergingWindows); + } + + if (mergingWindows != null) { + // need to make sure to update the merging state in state + mergingWindows.persist(); } } @@ -471,55 +480,67 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> mergingWindows = getMergingWindowSet(); W stateWindow = mergingWindows.getStateWindow(context.window); if (stateWindow == null) { - // then the window is already purged and this is a cleanup - // timer set due to allowed lateness that has nothing to clean, - // so it is safe to just ignore + // Timer firing for non-existent window, this can only happen if a + // trigger did not clean up timers. We have already cleared the merging + // window and therefore the Trigger state, however, so nothing to do. 
return; + } else { + windowState.setCurrentNamespace(stateWindow); } - windowState.setCurrentNamespace(stateWindow); } else { windowState.setCurrentNamespace(context.window); mergingWindows = null; } - ACC contents = windowState.get(); - if (contents == null) { - // if we have no state, there is nothing to do - return; + ACC contents = null; + if (windowState != null) { + contents = windowState.get(); + } + + if (contents != null) { + TriggerResult triggerResult = context.onProcessingTime(timer.getTimestamp()); + if (triggerResult.isFire()) { + emitWindowContents(context.window, contents); + } + if (triggerResult.isPurge()) { + windowState.clear(); + } } - TriggerResult triggerResult = context.onProcessingTime(timer.getTimestamp()); - if (triggerResult.isFire()) { - fire(context.window, contents); + if (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp())) { + clearAllState(context.window, windowState, mergingWindows); } - if (triggerResult.isPurge() || (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp()))) { - cleanup(context.window, windowState, mergingWindows); + if (mergingWindows != null) { + // need to make sure to update the merging state in state + mergingWindows.persist(); } } /** - * Cleans up the window state if the provided {@link TriggerResult} requires so, or if it - * is time to do so (see {@link #isCleanupTime(Window, long)}). The caller must ensure that the + * Drops all state for the given window and calls + * {@link Trigger#clear(Window, Trigger.TriggerContext)}. + * + * <p>The caller must ensure that the * correct key is set in the state backend and the context object. 
*/ - private void cleanup(W window, - AppendingState<IN, ACC> windowState, - MergingWindowSet<W> mergingWindows) throws Exception { + private void clearAllState( + W window, + AppendingState<IN, ACC> windowState, + MergingWindowSet<W> mergingWindows) throws Exception { windowState.clear(); + context.clear(); if (mergingWindows != null) { mergingWindows.retireWindow(window); mergingWindows.persist(); } - context.clear(); } /** - * Triggers the window computation if the provided {@link TriggerResult} requires so. - * The caller must ensure that the correct key is set in the state backend and the context object. + * Emits the contents of the given window using the {@link InternalWindowFunction}. */ @SuppressWarnings("unchecked") - private void fire(W window, ACC contents) throws Exception { + private void emitWindowContents(W window, ACC contents) throws Exception { timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp()); userFunction.apply(context.key, context.window, contents, timestampedCollector); } @@ -538,12 +559,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> } /** - * Decides if a window is currently late or not, based on the current - * watermark, i.e. the current event time, and the allowed lateness. - * @param window - * The collection of windows returned by the {@link WindowAssigner}. - * @return The windows (among the {@code eligibleWindows}) for which the element should still be - * considered when triggering. + * Returns {@code true} if the watermark is after the end timestamp plus the allowed lateness + * of the given window. 
*/ protected boolean isLate(W window) { return (windowAssigner.isEventTime() && (cleanupTime(window) <= internalTimerService.currentWatermark())); @@ -556,6 +573,11 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> */ protected void registerCleanupTimer(W window) { long cleanupTime = cleanupTime(window); + if (cleanupTime == Long.MAX_VALUE) { + // don't set a GC timer for "end of time" + return; + } + if (windowAssigner.isEventTime()) { context.registerEventTimeTimer(cleanupTime); } else { @@ -570,6 +592,10 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> */ protected void deleteCleanupTimer(W window) { long cleanupTime = cleanupTime(window); + if (cleanupTime == Long.MAX_VALUE) { + // no need to clean up because we didn't set one + return; + } if (windowAssigner.isEventTime()) { context.deleteEventTimeTimer(cleanupTime); } else { @@ -587,24 +613,19 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> * @param window the window whose cleanup time we are computing. */ private long cleanupTime(W window) { - long cleanupTime = window.maxTimestamp() + allowedLateness; - return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE; + if (windowAssigner.isEventTime()) { + long cleanupTime = window.maxTimestamp() + allowedLateness; + return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE; + } else { + return window.maxTimestamp(); + } } /** - * Decides if it is time to clean up the window state. - * Clean up time for a window is: - * <li> if it is event time, after the watermark passes the end of the window plus the user-specified allowed lateness - * <li> if it is processing time, after the processing time at the node passes the end of the window. - * @param window - * the window to clean - * @param time - * the current time (event or processing depending on the {@link WindowAssigner} - * @return {@code true} if it is time to clean up the window state, {@code false} otherwise. 
+ * Returns {@code true} if the given time is the cleanup time for the given window. */ protected final boolean isCleanupTime(W window, long time) { - long cleanupTime = cleanupTime(window); - return cleanupTime == time; + return time == cleanupTime(window); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/0b331a42/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java index 2faa506..6238e6c 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java @@ -1601,20 +1601,21 @@ public class WindowOperatorTest extends TestLogger { expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 11600L, 14600L), 14599)); expected.add(new Watermark(14600)); - // dropped as late testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 10000)); + expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 10000L, 14600L), 14599)); + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 14500)); testHarness.processWatermark(new Watermark(20000)); - expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 14500L, 17500L), 17499)); + expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 10000L, 17500L), 17499)); expected.add(new Watermark(20000)); testHarness.processWatermark(new Watermark(100000)); expected.add(new Watermark(100000)); ConcurrentLinkedQueue<Object> actual = testHarness.getOutput(); - TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new 
Tuple2ResultSortComparator()); + TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple3ResultSortComparator()); testHarness.close(); } @@ -1780,7 +1781,7 @@ public class WindowOperatorTest extends TestLogger { testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 10000)); - expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 10000L, 13000L), 12999)); + expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 1000L, 14600L), 14599)); ConcurrentLinkedQueue<Object> actual = testHarness.getOutput(); TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple3ResultSortComparator()); @@ -1788,7 +1789,7 @@ public class WindowOperatorTest extends TestLogger { testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 14500)); testHarness.processWatermark(new Watermark(20000)); - expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 14500L, 17500L), 17499)); + expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 1000L, 17500L), 17499)); expected.add(new Watermark(20000)); testHarness.processWatermark(new Watermark(100000));
