[FLINK-4749] [streaming api] Remove redundant processing time timer sets from window operator
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/47e49774 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/47e49774 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/47e49774 Branch: refs/heads/master Commit: 47e4977442f7045baf29ce7dc772b7f7aff65343 Parents: 9d24d51 Author: Stephan Ewen <[email protected]> Authored: Tue Oct 4 23:48:31 2016 +0200 Committer: Stephan Ewen <[email protected]> Committed: Wed Oct 5 20:04:34 2016 +0200 ---------------------------------------------------------------------- .../windowing/EvictingWindowOperator.java | 79 +++++------ .../operators/windowing/WindowOperator.java | 140 +++++++++---------- .../runtime/tasks/TestTimeServiceProvider.java | 72 +++++++++- .../operators/windowing/WindowOperatorTest.java | 24 ++-- 4 files changed, 175 insertions(+), 140 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/47e49774/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java index a838faa..6609e4d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java @@ -262,60 +262,53 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window @Override public void trigger(long time) throws Exception { - boolean fire; + Timer<K, W> timer; - //Remove information about the triggering task - processingTimeTimerFutures.remove(time); - processingTimeTimerTimestamps.remove(time, processingTimeTimerTimestamps.count(time)); + while ((timer = processingTimeTimersQueue.peek()) != null && timer.timestamp <= time) { - do { - Timer<K, W> timer = processingTimeTimersQueue.peek(); - if (timer != null && timer.timestamp <= time) { - fire = true; + processingTimeTimers.remove(timer); + processingTimeTimersQueue.remove(); - processingTimeTimers.remove(timer); - processingTimeTimersQueue.remove(); + context.key = timer.key; + context.window = timer.window; + setKeyContext(timer.key); - context.key = timer.key; - context.window = timer.window; - setKeyContext(timer.key); - - ListState<StreamRecord<IN>> windowState; - MergingWindowSet<W> mergingWindows = null; - - if (windowAssigner instanceof MergingWindowAssigner) { - mergingWindows = getMergingWindowSet(); - W stateWindow = mergingWindows.getStateWindow(context.window); - if (stateWindow == null) { - // then the window is already purged and this is a cleanup - // timer set due to allowed lateness that has nothing to clean, - // so it is safe to just ignore - continue; - } - windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor); - } else { - windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor); - } + ListState<StreamRecord<IN>> windowState; + MergingWindowSet<W> mergingWindows = null; - Iterable<StreamRecord<IN>> contents = windowState.get(); - if (contents == null) { - // if we have no state, there is nothing to do + if (windowAssigner instanceof MergingWindowAssigner) { + mergingWindows = getMergingWindowSet(); + W stateWindow = mergingWindows.getStateWindow(context.window); + if (stateWindow == null) { + // then the window is already purged and this is a cleanup + // timer set due to allowed lateness that has nothing to clean, + // so it is safe to just ignore continue; } + windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor); + } else { + windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor); + } - TriggerResult triggerResult = context.onProcessingTime(timer.timestamp); - if (triggerResult.isFire()) { - fire(context.window, contents); - } + Iterable<StreamRecord<IN>> contents = windowState.get(); + if (contents == null) { + // if we have no state, there is nothing to do + continue; + } - if (triggerResult.isPurge() || (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.timestamp))) { - cleanup(context.window, windowState, mergingWindows); - } + TriggerResult triggerResult = context.onProcessingTime(timer.timestamp); + if (triggerResult.isFire()) { + fire(context.window, contents); + } - } else { - fire = false; + if (triggerResult.isPurge() || (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.timestamp))) { + cleanup(context.window, windowState, mergingWindows); } - } while (fire); + } + + if (timer != null) { + nextTimer = getTimerService().registerTimer(timer.timestamp, this); + } } private void fire(W window, Iterable<StreamRecord<IN>> contents) throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/47e49774/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java index f010822..de316e7 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java @@ -18,8 +18,6 @@ package org.apache.flink.streaming.runtime.operators.windowing; -import com.google.common.collect.HashMultiset; -import com.google.common.collect.Multiset; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.ExecutionConfig; @@ -151,7 +149,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> */ protected transient TimestampedCollector<OUT> timestampedCollector; - protected transient Map<Long, ScheduledFuture<?>> processingTimeTimerFutures; + protected transient ScheduledFuture<?> nextTimer; /** * To keep track of the current watermark so that we can immediately fire if a trigger @@ -172,7 +170,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> */ protected transient PriorityQueue<Timer<K, W>> processingTimeTimersQueue; protected transient Set<Timer<K, W>> processingTimeTimers; - protected transient Multiset<Long> processingTimeTimerTimestamps; /** * Current waiting watermark callbacks. @@ -219,7 +216,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> inputSerializer = (TypeSerializer<IN>) type.createSerializer(executionConfig); } - @SuppressWarnings("unchecked") @Override public final void open() throws Exception { super.open(); @@ -237,13 +233,9 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> } if (processingTimeTimers == null) { processingTimeTimers = new HashSet<>(); - processingTimeTimerTimestamps = HashMultiset.create(); processingTimeTimersQueue = new PriorityQueue<>(100); } - //ScheduledFutures are not checkpointed - processingTimeTimerFutures = new HashMap<>(); - context = new Context(null, null); windowAssignerContext = new WindowAssigner.WindowAssignerContext() { @@ -261,6 +253,12 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> @Override public final void close() throws Exception { super.close(); + + if (nextTimer != null) { + nextTimer.cancel(false); + nextTimer = null; + } + timestampedCollector = null; watermarkTimers = null; watermarkTimersQueue = null; @@ -274,6 +272,12 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> @Override public void dispose() throws Exception { super.dispose(); + + if (nextTimer != null) { + nextTimer.cancel(false); + nextTimer = null; + } + timestampedCollector = null; watermarkTimers = null; watermarkTimersQueue = null; @@ -459,60 +463,53 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> @Override public void trigger(long time) throws Exception { - boolean fire; - - //Remove information about the triggering task - processingTimeTimerFutures.remove(time); - processingTimeTimerTimestamps.remove(time, processingTimeTimerTimestamps.count(time)); + Timer<K, W> timer; - do { - Timer<K, W> timer = processingTimeTimersQueue.peek(); - if (timer != null && timer.timestamp <= time) { - fire = true; + while ((timer = processingTimeTimersQueue.peek()) != null && timer.timestamp <= time) { - processingTimeTimers.remove(timer); - processingTimeTimersQueue.remove(); + processingTimeTimers.remove(timer); + processingTimeTimersQueue.remove(); - context.key = timer.key; - context.window = timer.window; - setKeyContext(timer.key); + context.key = timer.key; + context.window = timer.window; + setKeyContext(timer.key); - AppendingState<IN, ACC> windowState; - MergingWindowSet<W> mergingWindows = null; - - if (windowAssigner instanceof MergingWindowAssigner) { - mergingWindows = getMergingWindowSet(); - W stateWindow = mergingWindows.getStateWindow(context.window); - if (stateWindow == null) { - // then the window is already purged and this is a cleanup - // timer set due to allowed lateness that has nothing to clean, - // so it is safe to just ignore - continue; - } - windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor); - } else { - windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor); - } + AppendingState<IN, ACC> windowState; + MergingWindowSet<W> mergingWindows = null; - ACC contents = windowState.get(); - if (contents == null) { - // if we have no state, there is nothing to do + if (windowAssigner instanceof MergingWindowAssigner) { + mergingWindows = getMergingWindowSet(); + W stateWindow = mergingWindows.getStateWindow(context.window); + if (stateWindow == null) { + // then the window is already purged and this is a cleanup + // timer set due to allowed lateness that has nothing to clean, + // so it is safe to just ignore continue; } + windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor); + } else { + windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor); + } - TriggerResult triggerResult = context.onProcessingTime(timer.timestamp); - if (triggerResult.isFire()) { - fire(context.window, contents); - } + ACC contents = windowState.get(); + if (contents == null) { + // if we have no state, there is nothing to do + continue; + } - if (triggerResult.isPurge() || (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.timestamp))) { - cleanup(context.window, windowState, mergingWindows); - } + TriggerResult triggerResult = context.onProcessingTime(timer.timestamp); + if (triggerResult.isFire()) { + fire(context.window, contents); + } - } else { - fire = false; + if (triggerResult.isPurge() || (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.timestamp))) { + cleanup(context.window, windowState, mergingWindows); } - } while (fire); + } + + if (timer != null) { + nextTimer = getTimerService().registerTimer(timer.timestamp, this); + } } /** @@ -719,14 +716,21 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> @Override public void registerProcessingTimeTimer(long time) { Timer<K, W> timer = new Timer<>(time, key, window); + // make sure we only put one timer per key into the queue if (processingTimeTimers.add(timer)) { + + Timer<K, W> oldHead = processingTimeTimersQueue.peek(); + long nextTriggerTime = oldHead != null ? oldHead.timestamp : Long.MAX_VALUE; + processingTimeTimersQueue.add(timer); - //If this is the first timer added for this timestamp register a TriggerTask - if (processingTimeTimerTimestamps.add(time, 1) == 0) { - ScheduledFuture<?> scheduledFuture = WindowOperator.this.getTimerService() - .registerTimer(time, WindowOperator.this); - processingTimeTimerFutures.put(time, scheduledFuture); + + // check if we need to re-schedule our timer to earlier + if (time < nextTriggerTime) { + if (nextTimer != null) { + nextTimer.cancel(false); + } + nextTimer = getTimerService().registerTimer(time, WindowOperator.this); } } } @@ -746,14 +750,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> if (processingTimeTimers.remove(timer)) { processingTimeTimersQueue.remove(timer); } - - //If there are no timers left for this timestamp, remove it from queue and cancel TriggerTask - if (processingTimeTimerTimestamps.remove(time,1) == 1) { - ScheduledFuture<?> triggerTaskFuture = processingTimeTimerFutures.remove(timer.timestamp); - if (triggerTaskFuture != null && !triggerTaskFuture.isDone()) { - triggerTaskFuture.cancel(false); - } - } } @Override @@ -904,12 +900,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> processingTimeTimers.add(timer); } - int numProcessingTimeTimerTimestamp = in.readInt(); - processingTimeTimerTimestamps = HashMultiset.create(); - for (int i = 0; i< numProcessingTimeTimerTimestamp; i++) { - long timestamp = in.readLong(); - int count = in.readInt(); - processingTimeTimerTimestamps.add(timestamp, count); + if (numProcessingTimeTimers > 0) { + nextTimer = getTimerService().registerTimer(processingTimeTimersQueue.peek().timestamp, this); } } @@ -927,12 +919,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> windowSerializer.serialize(timer.window, out); out.writeLong(timer.timestamp); } - - out.writeInt(processingTimeTimerTimestamps.entrySet().size()); - for (Multiset.Entry<Long> timerTimestampCounts: processingTimeTimerTimestamps.entrySet()) { - out.writeLong(timerTimestampCounts.getElement()); - out.writeInt(timerTimestampCounts.getCount()); - } } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/47e49774/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java index 81faec9..f4bead9 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java @@ -24,7 +24,11 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.TreeMap; +import java.util.concurrent.Delayed; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** * This is a {@link TimeServiceProvider} used <b>strictly for testing</b> the @@ -32,13 +36,14 @@ import java.util.concurrent.ScheduledFuture; * */ public class TestTimeServiceProvider extends TimeServiceProvider { - private long currentTime = 0; + private volatile long currentTime = 0; - private boolean isTerminated = false; + private volatile boolean isTerminated; // sorts the timers by timestamp so that they are processed in the correct order. - private Map<Long, List<Triggerable>> registeredTasks = new TreeMap<>(); + private final Map<Long, List<Triggerable>> registeredTasks = new TreeMap<>(); + public void setCurrentTime(long timestamp) throws Exception { this.currentTime = timestamp; @@ -72,13 +77,26 @@ public class TestTimeServiceProvider extends TimeServiceProvider { @Override public ScheduledFuture<?> registerTimer(long timestamp, Triggerable target) { + if (isTerminated) { + throw new IllegalStateException("terminated"); + } + + if (timestamp <= currentTime) { + try { + target.trigger(timestamp); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + List<Triggerable> tasks = registeredTasks.get(timestamp); if (tasks == null) { tasks = new ArrayList<>(); registeredTasks.put(timestamp, tasks); } tasks.add(target); - return null; + + return new DummyFuture(); } @Override @@ -86,6 +104,11 @@ public class TestTimeServiceProvider extends TimeServiceProvider { return isTerminated; } + @Override + public void shutdownService() throws Exception { + isTerminated = true; + } + public int getNumRegisteredTimers() { int count = 0; for (List<Triggerable> tasks: registeredTasks.values()) { @@ -94,8 +117,43 @@ public class TestTimeServiceProvider extends TimeServiceProvider { return count; } - @Override - public void shutdownService() throws Exception { - this.isTerminated = true; + // ------------------------------------------------------------------------ + + private static class DummyFuture implements ScheduledFuture<Object> { + + @Override + public long getDelay(TimeUnit unit) { + throw new UnsupportedOperationException(); + } + + @Override + public int compareTo(Delayed o) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return true; + } + + @Override + public boolean isCancelled() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isDone() { + throw new UnsupportedOperationException(); + } + + @Override + public Object get() throws InterruptedException, ExecutionException { + throw new UnsupportedOperationException(); + } + + @Override + public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + throw new UnsupportedOperationException(); + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/47e49774/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java index fd73bcc..e98bc91 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java @@ -876,6 +876,9 @@ public class WindowOperatorTest extends TestLogger { final int WINDOW_SLIDE = 1; TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>"); + TestTimeServiceProvider timer = new TestTimeServiceProvider(); + + TestTimeServiceProvider timer = new TestTimeServiceProvider(); ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = new ReducingStateDescriptor<>("window-contents", new SumReducer(), @@ -893,9 +896,10 @@ public class WindowOperatorTest extends TestLogger { OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = - new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); ; - - operator.setInputType(inputType, new ExecutionConfig()); + new KeyedOneInputStreamOperatorTestHarness<>( + operator, new ExecutionConfig(), timer, + new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); + testHarness.open(); WindowOperator.Timer<String, TimeWindow> timer1 = new WindowOperator.Timer<>(1L, "key1", new TimeWindow(1L, 2L)); @@ -907,12 +911,7 @@ public class WindowOperatorTest extends TestLogger { operator.processingTimeTimersQueue.add(timer1); operator.processingTimeTimersQueue.add(timer2); operator.processingTimeTimersQueue.add(timer3); - - operator.processingTimeTimerTimestamps.add(1L, 10); - operator.processingTimeTimerTimestamps.add(2L, 5); - operator.processingTimeTimerTimestamps.add(3L, 1); - - + StreamStateHandle snapshot = testHarness.snapshot(0, 0); WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> otherOperator = new WindowOperator<>( @@ -926,9 +925,9 @@ public class WindowOperatorTest extends TestLogger { 0); OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> otherTestHarness = - new KeyedOneInputStreamOperatorTestHarness<>(otherOperator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); - - otherOperator.setInputType(inputType, new ExecutionConfig()); + new KeyedOneInputStreamOperatorTestHarness<>( + otherOperator, new ExecutionConfig(), timer, + new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); otherTestHarness.setup(); otherTestHarness.restore(snapshot); @@ -936,7 +935,6 @@ public class WindowOperatorTest extends TestLogger { Assert.assertEquals(operator.processingTimeTimers, otherOperator.processingTimeTimers); Assert.assertArrayEquals(operator.processingTimeTimersQueue.toArray(), otherOperator.processingTimeTimersQueue.toArray()); - Assert.assertEquals(operator.processingTimeTimerTimestamps, otherOperator.processingTimeTimerTimestamps); } @Test
