http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamMap.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamMap.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamMap.java index 448f95f..00d4722 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamMap.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamMap.java @@ -21,7 +21,6 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.streaming.api.functions.co.CoMapFunction; import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; -import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @Internal @@ -31,12 +30,6 @@ public class CoStreamMap<IN1, IN2, OUT> private static final long serialVersionUID = 1L; - // We keep track of watermarks from both inputs, the combined input is the minimum - // Once the minimum advances we emit a new watermark for downstream operators - private long combinedWatermark = Long.MIN_VALUE; - private long input1Watermark = Long.MIN_VALUE; - private long input2Watermark = Long.MIN_VALUE; - public CoStreamMap(CoMapFunction<IN1, IN2, OUT> mapper) { super(mapper); } @@ -50,24 +43,4 @@ public class CoStreamMap<IN1, IN2, OUT> public void processElement2(StreamRecord<IN2> element) throws Exception { output.collect(element.replace(userFunction.map2(element.getValue()))); } - - @Override - public void processWatermark1(Watermark mark) throws Exception { - input1Watermark = mark.getTimestamp(); - long newMin = Math.min(input1Watermark, input2Watermark); - if (newMin > combinedWatermark) { - combinedWatermark = newMin; - output.emitWatermark(new Watermark(combinedWatermark)); - } - } - - @Override - public void processWatermark2(Watermark mark) throws Exception { - input2Watermark = mark.getTimestamp(); - long newMin = Math.min(input1Watermark, input2Watermark); - if (newMin > combinedWatermark) { - combinedWatermark = newMin; - output.emitWatermark(new Watermark(combinedWatermark)); - } - } }
http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java new file mode 100644 index 0000000..df2320f --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.co; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.streaming.api.SimpleTimerService; +import org.apache.flink.streaming.api.TimeDomain; +import org.apache.flink.streaming.api.TimerService; +import org.apache.flink.streaming.api.functions.co.TimelyCoFlatMapFunction; +import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; +import org.apache.flink.streaming.api.operators.InternalTimer; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.operators.Triggerable; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +@Internal +public class CoStreamTimelyFlatMap<K, IN1, IN2, OUT> + extends AbstractUdfStreamOperator<OUT, TimelyCoFlatMapFunction<IN1, IN2, OUT>> + implements TwoInputStreamOperator<IN1, IN2, OUT>, Triggerable<K, VoidNamespace> { + + private static final long serialVersionUID = 1L; + + private final TypeSerializer<K> keySerializer; + + private transient TimestampedCollector<OUT> collector; + + private transient TimerService timerService; + + public CoStreamTimelyFlatMap( + TypeSerializer<K> keySerializer, + TimelyCoFlatMapFunction<IN1, IN2, OUT> flatMapper) { + super(flatMapper); + + this.keySerializer = keySerializer; + } + + @Override + public void open() throws Exception { + super.open(); + collector = new TimestampedCollector<>(output); + + InternalTimerService<VoidNamespace> internalTimerService = + getInternalTimerService("user-timers", keySerializer, VoidNamespaceSerializer.INSTANCE, this); + + this.timerService = new SimpleTimerService(internalTimerService); + } + + @Override + public void processElement1(StreamRecord<IN1> element) throws Exception { + collector.setTimestamp(element); + userFunction.flatMap1(element.getValue(), timerService, collector); + + } + + @Override + public void processElement2(StreamRecord<IN2> element) throws Exception { + collector.setTimestamp(element); + userFunction.flatMap2(element.getValue(), timerService, collector); + } + + @Override + public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception { + collector.setAbsoluteTimestamp(timer.getTimestamp()); + userFunction.onTimer(timer.getTimestamp(), TimeDomain.EVENT_TIME, timerService, collector); + } + + @Override + public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception { + collector.setAbsoluteTimestamp(timer.getTimestamp()); + userFunction.onTimer(timer.getTimestamp(), TimeDomain.PROCESSING_TIME, timerService, collector); + } + + protected TimestampedCollector<OUT> getCollector() { + return collector; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java index b5500b7..36492d7 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java @@ -30,7 +30,6 @@ import org.apache.flink.runtime.util.ReusingMutableToRegularIteratorWrapper; import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; -import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.InstantiationUtil; import org.slf4j.Logger; @@ -76,6 +75,7 @@ public abstract class GenericWriteAheadSink<IN> extends AbstractStreamOperator<I @Override public void open() throws Exception { + super.open(); committer.setOperatorId(id); committer.setOperatorSubtaskId(getRuntimeContext().getIndexOfThisSubtask()); committer.open(); @@ -113,6 +113,7 @@ public abstract class GenericWriteAheadSink<IN> extends AbstractStreamOperator<I public void snapshotState(FSDataOutputStream out, long checkpointId, long timestamp) throws Exception { + super.snapshotState(out, checkpointId, timestamp); saveHandleInState(checkpointId, timestamp); @@ -121,6 +122,7 @@ public abstract class GenericWriteAheadSink<IN> extends AbstractStreamOperator<I @Override public void restoreState(FSDataInputStream in) throws Exception { + super.restoreState(in); this.state = InstantiationUtil.deserializeObject(in, getUserCodeClassloader()); } @@ -203,11 +205,6 @@ public abstract class GenericWriteAheadSink<IN> extends AbstractStreamOperator<I serializer.serialize(value, new DataOutputViewStreamWrapper(out)); } - @Override - public void processWatermark(Watermark mark) throws Exception { - //don't do anything, since we are a sink - } - /** * This state is used to keep a list of all StateHandles (essentially references to past OperatorStates) that were * used since the last completed checkpoint. http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java index d331d4d..2a77c0a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java @@ -32,7 +32,6 @@ import org.apache.flink.util.MathUtils; import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; import org.apache.flink.streaming.api.operators.TimestampedCollector; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; -import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.runtime.operators.Triggerable; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -208,11 +207,6 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT, } @Override - public void processWatermark(Watermark mark) { - // this operator does not react to watermarks - } - - @Override public void trigger(long timestamp) throws Exception { // first we check if we actually trigger the window function if (timestamp == nextEvaluationTime) { http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java index 79ef4c6..a252ece 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java @@ -66,7 +66,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed // optimized path for single pane case (tumbling window) for (KeyMap.Entry<Key, ArrayList<Type>> entry : latestPane) { Key key = entry.getKey(); - operator.setKeyContext(key); + operator.setCurrentKey(key); function.apply(entry.getKey(), window, entry.getValue(), out); } } @@ -122,7 +122,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed @Override public void keyDone() throws Exception { - contextOperator.setKeyContext(currentKey); + contextOperator.setCurrentKey(currentKey); function.apply(currentKey, window, unionIterator, out); } } http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java index dfa357e..84686a7 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java @@ -93,7 +93,7 @@ public class AggregatingKeyedTimePanes<Type, Key> extends AbstractKeyedTimePanes @Override public void startNewKey(Key key) { currentValue = null; - operator.setKeyContext(key); + operator.setCurrentKey(key); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java index 141b5b8..2f4dbde 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java @@ -29,7 +29,7 @@ import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple1; -import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.api.operators.InternalTimer; import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner; import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; import org.apache.flink.streaming.api.windowing.evictors.Evictor; @@ -204,110 +204,82 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window } @Override - public void processWatermark(Watermark mark) throws Exception { - boolean fire; - do { - Timer<K, W> timer = watermarkTimersQueue.peek(); - if (timer != null && timer.timestamp <= mark.getTimestamp()) { - fire = true; - - watermarkTimers.remove(timer); - watermarkTimersQueue.remove(); - - context.key = timer.key; - context.window = timer.window; - setKeyContext(timer.key); - - ListState<StreamRecord<IN>> windowState; - MergingWindowSet<W> mergingWindows = null; - - if (windowAssigner instanceof MergingWindowAssigner) { - mergingWindows = getMergingWindowSet(); - W stateWindow = mergingWindows.getStateWindow(context.window); - if (stateWindow == null) { - // then the window is already purged and this is a cleanup - // timer set due to allowed lateness that has nothing to clean, - // so it is safe to just ignore - continue; - } - windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor); - } else { - windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor); - } + public void onEventTime(InternalTimer<K, W> timer) throws Exception { - Iterable<StreamRecord<IN>> contents = windowState.get(); - if (contents == null) { - // if we have no state, there is nothing to do - continue; - } - - TriggerResult triggerResult = context.onEventTime(timer.timestamp); - if (triggerResult.isFire()) { - fire(context.window, contents); - } + context.key = timer.getKey(); + context.window = timer.getNamespace(); - if (triggerResult.isPurge() || (windowAssigner.isEventTime() && isCleanupTime(context.window, timer.timestamp))) { - cleanup(context.window, windowState, mergingWindows); - } + ListState<StreamRecord<IN>> windowState; + MergingWindowSet<W> mergingWindows = null; - } else { - fire = false; + if (windowAssigner instanceof MergingWindowAssigner) { + mergingWindows = getMergingWindowSet(); + W stateWindow = mergingWindows.getStateWindow(context.window); + if (stateWindow == null) { + // then the window is already purged and this is a cleanup + // timer set due to allowed lateness that has nothing to clean, + // so it is safe to just ignore + return; } - } while (fire); + windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor); + } else { + windowState = getPartitionedState( + context.window, + windowSerializer, + windowStateDescriptor); + } + + Iterable<StreamRecord<IN>> contents = windowState.get(); + if (contents == null) { + // if we have no state, there is nothing to do + return; + } - output.emitWatermark(mark); + TriggerResult triggerResult = context.onEventTime(timer.getTimestamp()); + if (triggerResult.isFire()) { + fire(context.window, contents); + } - this.currentWatermark = mark.getTimestamp(); + if (triggerResult.isPurge() || (windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp()))) { + cleanup(context.window, windowState, mergingWindows); + } } @Override - public void trigger(long time) throws Exception { - Timer<K, W> timer; - - while ((timer = processingTimeTimersQueue.peek()) != null && timer.timestamp <= time) { - - processingTimeTimers.remove(timer); - processingTimeTimersQueue.remove(); - - context.key = timer.key; - context.window = timer.window; - setKeyContext(timer.key); + public void onProcessingTime(InternalTimer<K, W> timer) throws Exception { + context.key = timer.getKey(); + context.window = timer.getNamespace(); - ListState<StreamRecord<IN>> windowState; - MergingWindowSet<W> mergingWindows = null; + ListState<StreamRecord<IN>> windowState; + MergingWindowSet<W> mergingWindows = null; - if (windowAssigner instanceof MergingWindowAssigner) { - mergingWindows = getMergingWindowSet(); - W stateWindow = mergingWindows.getStateWindow(context.window); - if (stateWindow == null) { - // then the window is already purged and this is a cleanup - // timer set due to allowed lateness that has nothing to clean, - // so it is safe to just ignore - continue; - } - windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor); - } else { - windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor); - } - - Iterable<StreamRecord<IN>> contents = windowState.get(); - if (contents == null) { - // if we have no state, there is nothing to do - continue; + if (windowAssigner instanceof MergingWindowAssigner) { + mergingWindows = getMergingWindowSet(); + W stateWindow = mergingWindows.getStateWindow(context.window); + if (stateWindow == null) { + // then the window is already purged and this is a cleanup + // timer set due to allowed lateness that has nothing to clean, + // so it is safe to just ignore + return; } + windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor); + } else { + windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor); + } - TriggerResult triggerResult = context.onProcessingTime(timer.timestamp); - if (triggerResult.isFire()) { - fire(context.window, contents); - } + Iterable<StreamRecord<IN>> contents = windowState.get(); + if (contents == null) { + // if we have no state, there is nothing to do + return; + } - if (triggerResult.isPurge() || (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.timestamp))) { - cleanup(context.window, windowState, mergingWindows); - } + TriggerResult triggerResult = context.onProcessingTime(timer.getTimestamp()); + if (triggerResult.isFire()) { + fire(context.window, contents); } - if (timer != null) { - nextTimer = getProcessingTimeService().registerTimer(timer.timestamp, this); + if (triggerResult.isPurge() || (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp()))) { + cleanup(context.window, windowState, mergingWindows); } } http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java index 459c679..bc37692 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java @@ -37,36 +37,28 @@ import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.fs.FSDataOutputStream; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataInputViewStreamWrapper; -import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.metrics.MetricGroup; -import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.streaming.api.operators.Triggerable; import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.operators.InternalTimer; +import org.apache.flink.streaming.api.operators.InternalTimerService; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.TimestampedCollector; -import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner; import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; import org.apache.flink.streaming.api.windowing.triggers.Trigger; import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; import org.apache.flink.streaming.api.windowing.windows.Window; -import org.apache.flink.streaming.runtime.operators.Triggerable; import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import java.io.IOException; import java.io.Serializable; import java.util.Collection; import java.util.HashMap; -import java.util.HashSet; import java.util.Map; -import java.util.PriorityQueue; -import java.util.Set; -import java.util.concurrent.ScheduledFuture; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -96,7 +88,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; @Internal public class WindowOperator<K, IN, ACC, OUT, W extends Window> extends AbstractUdfStreamOperator<OUT, InternalWindowFunction<ACC, OUT, K, W>> - implements OneInputStreamOperator<IN, OUT>, Triggerable { + implements OneInputStreamOperator<IN, OUT>, Triggerable<K, W> { private static final long serialVersionUID = 1L; @@ -141,14 +133,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> */ protected transient TimestampedCollector<OUT> timestampedCollector; - protected transient ScheduledFuture<?> nextTimer; - - /** - * To keep track of the current watermark so that we can immediately fire if a trigger - * registers an event time callback for a timestamp that lies in the past. - */ - protected long currentWatermark = Long.MIN_VALUE; - protected transient Context context = new Context(null, null); protected transient WindowAssigner.WindowAssignerContext windowAssignerContext; @@ -157,17 +141,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> // State that needs to be checkpointed // ------------------------------------------------------------------------ - /** - * Processing time timers that are currently in-flight. - */ - protected transient PriorityQueue<Timer<K, W>> processingTimeTimersQueue; - protected transient Set<Timer<K, W>> processingTimeTimers; - - /** - * Current waiting watermark callbacks. - */ - protected transient Set<Timer<K, W>> watermarkTimers; - protected transient PriorityQueue<Timer<K, W>> watermarkTimersQueue; + private transient InternalTimerService<W> internalTimerService; protected transient Map<K, MergingWindowSet<W>> mergingWindowsByKey; @@ -208,49 +182,27 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> timestampedCollector = new TimestampedCollector<>(output); - // these could already be initialized from restoreState() - if (watermarkTimers == null) { - watermarkTimers = new HashSet<>(); - watermarkTimersQueue = new PriorityQueue<>(100); - } - if (processingTimeTimers == null) { - processingTimeTimers = new HashSet<>(); - processingTimeTimersQueue = new PriorityQueue<>(100); - } + internalTimerService = + getInternalTimerService("window-timers", keySerializer, windowSerializer, this); context = new Context(null, null); windowAssignerContext = new WindowAssigner.WindowAssignerContext() { @Override public long getCurrentProcessingTime() { - return WindowOperator.this.getProcessingTimeService().getCurrentProcessingTime(); + return internalTimerService.currentProcessingTime(); } }; if (windowAssigner instanceof MergingWindowAssigner) { mergingWindowsByKey = new HashMap<>(); } - - // re-register the restored timers (if any) - if (processingTimeTimersQueue.size() > 0) { - nextTimer = getProcessingTimeService().registerTimer(processingTimeTimersQueue.peek().timestamp, this); - } } @Override public final void close() throws Exception { super.close(); - - if (nextTimer != null) { - nextTimer.cancel(false); - nextTimer = null; - } - timestampedCollector = null; - watermarkTimers = null; - watermarkTimersQueue = null; - processingTimeTimers = null; - processingTimeTimersQueue = null; context = null; windowAssignerContext = null; mergingWindowsByKey = null; @@ -259,17 +211,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> @Override public void dispose() throws Exception { super.dispose(); - - if (nextTimer != null) { - nextTimer.cancel(false); - nextTimer = null; - } - timestampedCollector = null; - watermarkTimers = null; - watermarkTimersQueue = null; - processingTimeTimers = null; - processingTimeTimersQueue = null; context = null; windowAssignerContext = null; mergingWindowsByKey = null; @@ -392,110 +334,81 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> } @Override - public void processWatermark(Watermark mark) throws Exception { - boolean fire; - do { - Timer<K, W> timer = watermarkTimersQueue.peek(); - if (timer != null && timer.timestamp <= mark.getTimestamp()) { - fire = true; - - watermarkTimers.remove(timer); - watermarkTimersQueue.remove(); - - context.key = timer.key; - context.window = timer.window; - setKeyContext(timer.key); - - AppendingState<IN, ACC> windowState; - MergingWindowSet<W> mergingWindows = null; - - if (windowAssigner instanceof MergingWindowAssigner) { - mergingWindows = getMergingWindowSet(); - W stateWindow = mergingWindows.getStateWindow(context.window); - if (stateWindow == null) { - // then the window is already purged and this is a cleanup - // timer set due to allowed lateness that has nothing to clean, - // so it is safe to just ignore - continue; - } - windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor); - } else { - windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor); - } - - ACC contents = windowState.get(); - if (contents == null) { - // if we have no state, there is nothing to do - continue; - } - - TriggerResult triggerResult = context.onEventTime(timer.timestamp); - if (triggerResult.isFire()) { - fire(context.window, contents); - } + public void onEventTime(InternalTimer<K, W> timer) throws Exception { + context.key = timer.getKey(); + context.window = timer.getNamespace(); - if (triggerResult.isPurge() || (windowAssigner.isEventTime() && isCleanupTime(context.window, timer.timestamp))) { - cleanup(context.window, windowState, mergingWindows); - } + AppendingState<IN, ACC> windowState; + MergingWindowSet<W> mergingWindows = null; - } else { - fire = false; + if (windowAssigner instanceof MergingWindowAssigner) { + mergingWindows = getMergingWindowSet(); + W stateWindow = mergingWindows.getStateWindow(context.window); + if (stateWindow == null) { + // then the window is already purged and this is a cleanup + // timer set due to allowed lateness that has nothing to clean, + // so it is safe to just ignore + return; } - } while (fire); + windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor); + } else { + windowState = getPartitionedState( + context.window, + windowSerializer, + windowStateDescriptor); + } + + ACC contents = windowState.get(); + if (contents == null) { + // if we have no state, there is nothing to do + return; + } - output.emitWatermark(mark); + TriggerResult triggerResult = context.onEventTime(timer.getTimestamp()); + if (triggerResult.isFire()) { + fire(context.window, contents); + } - this.currentWatermark = mark.getTimestamp(); + if (triggerResult.isPurge() || (windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp()))) { + cleanup(context.window, windowState, mergingWindows); + } } @Override - public void trigger(long time) throws Exception { - Timer<K, W> timer; - - while ((timer = processingTimeTimersQueue.peek()) != null && timer.timestamp <= time) { - - processingTimeTimers.remove(timer); - processingTimeTimersQueue.remove(); - - context.key = timer.key; - context.window = timer.window; - setKeyContext(timer.key); - - AppendingState<IN, ACC> windowState; - MergingWindowSet<W> mergingWindows = null; + public void onProcessingTime(InternalTimer<K, W> timer) throws Exception { + context.key = timer.getKey(); + context.window = timer.getNamespace(); - if (windowAssigner instanceof MergingWindowAssigner) { - mergingWindows = getMergingWindowSet(); - W stateWindow = mergingWindows.getStateWindow(context.window); - if (stateWindow == null) { - // then the window is already purged and this is a cleanup - // timer set due to allowed lateness that has nothing to clean, - // so it is safe to just ignore - continue; - } - windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor); - } else { - windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor); - } + AppendingState<IN, ACC> windowState; + MergingWindowSet<W> mergingWindows = null; - ACC contents = windowState.get(); - if (contents == null) { - // if we have no state, there is nothing to do - continue; + if (windowAssigner instanceof MergingWindowAssigner) { + mergingWindows = getMergingWindowSet(); + W stateWindow = mergingWindows.getStateWindow(context.window); + if (stateWindow == null) { + // then the window is already purged and this is a cleanup + // timer set due to allowed lateness that has nothing to clean, + // so it is safe to just ignore + return; } + windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor); + } else { + windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor); + } - TriggerResult triggerResult = context.onProcessingTime(timer.timestamp); - if (triggerResult.isFire()) { - fire(context.window, contents); - } + ACC contents = windowState.get(); + if (contents == null) { + // if we have no state, there is nothing to do + return; + } - if (triggerResult.isPurge() || (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.timestamp))) { - cleanup(context.window, windowState, mergingWindows); - } + TriggerResult triggerResult = context.onProcessingTime(timer.getTimestamp()); + if (triggerResult.isFire()) { + fire(context.window, contents); } - if (timer != null) { - nextTimer = getProcessingTimeService().registerTimer(timer.timestamp, this); + if (triggerResult.isPurge() || (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp()))) { + cleanup(context.window, windowState, mergingWindows); } } @@ -555,7 +468,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> * considered when triggering. */ protected boolean isLate(W window) { - return (windowAssigner.isEventTime() && (cleanupTime(window) <= currentWatermark)); + return (windowAssigner.isEventTime() && (cleanupTime(window) <= internalTimerService.currentWatermark())); } /** @@ -638,7 +551,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> } public long getCurrentWatermark() { - return currentWatermark; + return internalTimerService.currentWatermark(); } @Override @@ -697,54 +610,27 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> @Override public long getCurrentProcessingTime() { - return WindowOperator.this.getProcessingTimeService().getCurrentProcessingTime(); + return internalTimerService.currentProcessingTime(); } @Override public void registerProcessingTimeTimer(long time) { - Timer<K, W> timer = new Timer<>(time, key, window); - - // make sure we only put one timer per key into the queue - if (processingTimeTimers.add(timer)) { - - Timer<K, W> oldHead = processingTimeTimersQueue.peek(); - long nextTriggerTime = oldHead != null ? oldHead.timestamp : Long.MAX_VALUE; - - processingTimeTimersQueue.add(timer); - - // check if we need to re-schedule our timer to earlier - if (time < nextTriggerTime) { - if (nextTimer != null) { - nextTimer.cancel(false); - } - nextTimer = getProcessingTimeService().registerTimer(time, WindowOperator.this); - } - } + internalTimerService.registerProcessingTimeTimer(window, time); } @Override public void registerEventTimeTimer(long time) { - Timer<K, W> timer = new Timer<>(time, key, window); - if (watermarkTimers.add(timer)) { - watermarkTimersQueue.add(timer); - } + internalTimerService.registerEventTimeTimer(window, time); } @Override public void deleteProcessingTimeTimer(long time) { - Timer<K, W> timer = new Timer<>(time, key, window); - - if (processingTimeTimers.remove(timer)) { - processingTimeTimersQueue.remove(timer); - } + internalTimerService.deleteProcessingTimeTimer(window, time); } @Override public void deleteEventTimeTimer(long time) { - Timer<K, W> timer = new Timer<>(time, key, window); - if (watermarkTimers.remove(timer)) { - watermarkTimersQueue.remove(timer); - } + internalTimerService.deleteEventTimeTimer(window, time); } public TriggerResult onElement(StreamRecord<IN> element) throws Exception { @@ -843,67 +729,21 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> TupleSerializer<Tuple2<W, W>> tupleSerializer = new TupleSerializer<>((Class) Tuple2.class, new TypeSerializer[] {windowSerializer, windowSerializer} ); ListStateDescriptor<Tuple2<W, W>> mergeStateDescriptor = new ListStateDescriptor<>("merging-window-set", tupleSerializer); for (Map.Entry<K, MergingWindowSet<W>> key: mergingWindowsByKey.entrySet()) { - setKeyContext(key.getKey()); + setCurrentKey(key.getKey()); ListState<Tuple2<W, W>> mergeState = getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, mergeStateDescriptor); mergeState.clear(); key.getValue().persist(mergeState); } } - snapshotTimers(new DataOutputViewStreamWrapper(out)); - super.snapshotState(out, checkpointId, timestamp); } @Override public void restoreState(FSDataInputStream in) throws Exception { - restoreTimers(new DataInputViewStreamWrapper(in)); - super.restoreState(in); } - private void restoreTimers(DataInputView in ) throws IOException { - int numWatermarkTimers = in.readInt(); - watermarkTimers = new HashSet<>(numWatermarkTimers); - watermarkTimersQueue = new PriorityQueue<>(Math.max(numWatermarkTimers, 1)); - for (int i = 0; i < numWatermarkTimers; i++) { - K key = keySerializer.deserialize(in); - W window = windowSerializer.deserialize(in); - long timestamp = in.readLong(); - Timer<K, W> timer = new Timer<>(timestamp, key, window); - watermarkTimers.add(timer); - watermarkTimersQueue.add(timer); - } - - int numProcessingTimeTimers = in.readInt(); - processingTimeTimersQueue = new PriorityQueue<>(Math.max(numProcessingTimeTimers, 1)); - processingTimeTimers = new HashSet<>(); - for (int i = 0; i < numProcessingTimeTimers; i++) { - K key = keySerializer.deserialize(in); - W window = windowSerializer.deserialize(in); - long timestamp = in.readLong(); - Timer<K, W> timer = new Timer<>(timestamp, key, window); - processingTimeTimersQueue.add(timer); - processingTimeTimers.add(timer); - } - } - - private void snapshotTimers(DataOutputView out) throws IOException { - out.writeInt(watermarkTimersQueue.size()); - for (Timer<K, W> timer : watermarkTimersQueue) { - keySerializer.serialize(timer.key, out); - windowSerializer.serialize(timer.window, out); - out.writeLong(timer.timestamp); - } - - out.writeInt(processingTimeTimers.size()); - for (Timer<K,W> timer : processingTimeTimers) { - keySerializer.serialize(timer.key, out); - windowSerializer.serialize(timer.window, out); - out.writeLong(timer.timestamp); - } - } - // ------------------------------------------------------------------------ // Getters for testing // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java index d2bf133..d0a2ea9 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java @@ -20,9 +20,11 @@ package org.apache.flink.streaming.runtime.tasks; import org.apache.flink.streaming.runtime.operators.Triggerable; import java.util.ArrayList; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.TreeMap; import java.util.concurrent.Delayed; import java.util.concurrent.ExecutionException; @@ -42,7 +44,7 @@ public class TestProcessingTimeService extends ProcessingTimeService { private volatile boolean isQuiesced; // sorts the timers by timestamp so that they are processed in the correct order. - private final Map<Long, List<Triggerable>> registeredTasks = new TreeMap<>(); + private final Map<Long, List<ScheduledTimerFuture>> registeredTasks = new TreeMap<>(); public void setCurrentTime(long timestamp) throws Exception { @@ -53,10 +55,10 @@ public class TestProcessingTimeService extends ProcessingTimeService { // we do not fire them here to be able to accommodate timers // that register other timers. - Iterator<Map.Entry<Long, List<Triggerable>>> it = registeredTasks.entrySet().iterator(); - List<Map.Entry<Long, List<Triggerable>>> toRun = new ArrayList<>(); + Iterator<Map.Entry<Long, List<ScheduledTimerFuture>>> it = registeredTasks.entrySet().iterator(); + List<Map.Entry<Long, List<ScheduledTimerFuture>>> toRun = new ArrayList<>(); while (it.hasNext()) { - Map.Entry<Long, List<Triggerable>> t = it.next(); + Map.Entry<Long, List<ScheduledTimerFuture>> t = it.next(); if (t.getKey() <= this.currentTime) { toRun.add(t); it.remove(); @@ -64,10 +66,10 @@ public class TestProcessingTimeService extends ProcessingTimeService { } // now do the actual firing. - for (Map.Entry<Long, List<Triggerable>> tasks: toRun) { + for (Map.Entry<Long, List<ScheduledTimerFuture>> tasks: toRun) { long now = tasks.getKey(); - for (Triggerable task: tasks.getValue()) { - task.trigger(now); + for (ScheduledTimerFuture task: tasks.getValue()) { + task.getTriggerable().trigger(now); } } } @@ -84,7 +86,7 @@ public class TestProcessingTimeService extends ProcessingTimeService { throw new IllegalStateException("terminated"); } if (isQuiesced) { - return new DummyFuture(); + return new ScheduledTimerFuture(null, -1); } if (timestamp <= currentTime) { @@ -94,14 +96,17 @@ public class TestProcessingTimeService extends ProcessingTimeService { throw new RuntimeException(e); } } - List<Triggerable> tasks = registeredTasks.get(timestamp); + + ScheduledTimerFuture result = new ScheduledTimerFuture(target, timestamp); + + List<ScheduledTimerFuture> tasks = registeredTasks.get(timestamp); if (tasks == null) { tasks = new ArrayList<>(); registeredTasks.put(timestamp, tasks); } - tasks.add(target); + tasks.add(result); - return new DummyFuture(); + return result; } @Override @@ -124,15 +129,34 @@ public class TestProcessingTimeService extends ProcessingTimeService { public int getNumRegisteredTimers() { int count = 0; - for (List<Triggerable> tasks: registeredTasks.values()) { + for (List<ScheduledTimerFuture> tasks: registeredTasks.values()) { count += tasks.size(); } return count; } + public Set<Long> getRegisteredTimerTimestamps() { + Set<Long> actualTimestamps = new HashSet<>(); + for (List<ScheduledTimerFuture> timerFutures : registeredTasks.values()) { + for (ScheduledTimerFuture timer : timerFutures) { + actualTimestamps.add(timer.getTimestamp()); + } + } + return actualTimestamps; + } + // ------------------------------------------------------------------------ - private static class DummyFuture implements ScheduledFuture<Object> { + private class ScheduledTimerFuture implements ScheduledFuture<Object> { + + private final Triggerable triggerable; + + private final long timestamp; + + public ScheduledTimerFuture(Triggerable triggerable, long timestamp) { + this.triggerable = triggerable; + this.timestamp = timestamp; + } @Override public long getDelay(TimeUnit unit) { @@ -146,6 +170,10 @@ public class TestProcessingTimeService extends ProcessingTimeService { @Override public boolean cancel(boolean mayInterruptIfRunning) { + List<ScheduledTimerFuture> scheduledTimerFutures = registeredTasks.get(timestamp); + if (scheduledTimerFutures != null) { + scheduledTimerFutures.remove(this); + } return true; } @@ -168,5 +196,13 @@ public class TestProcessingTimeService extends ProcessingTimeService { public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { throw new UnsupportedOperationException(); } + + public Triggerable getTriggerable() { + return triggerable; + } + + public long getTimestamp() { + return timestamp; + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java new file mode 100644 index 0000000..84af997 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java @@ -0,0 +1,509 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.InputStream; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.*; + +/** + * Tests for {@link HeapInternalTimerService}. + */ +public class HeapInternalTimerServiceTest { + + private static InternalTimer<Integer, String> anyInternalTimer() { + return any(); + } + + /** + * Verify that we only ever have one processing-time task registered at the + * {@link ProcessingTimeService}. + */ + @Test + public void testOnlySetsOnePhysicalProcessingTimeTimer() throws Exception { + @SuppressWarnings("unchecked") + Triggerable<Integer, String> mockTriggerable = mock(Triggerable.class); + + TestKeyContext keyContext = new TestKeyContext(); + + TestProcessingTimeService processingTimeService = new TestProcessingTimeService(); + + HeapInternalTimerService<Integer, String> timerService = + createTimerService(mockTriggerable, keyContext, processingTimeService); + + keyContext.setCurrentKey(0); + + timerService.registerProcessingTimeTimer("ciao", 10); + timerService.registerProcessingTimeTimer("ciao", 20); + timerService.registerProcessingTimeTimer("ciao", 30); + timerService.registerProcessingTimeTimer("hello", 10); + timerService.registerProcessingTimeTimer("hello", 20); + + assertEquals(5, timerService.numProcessingTimeTimers()); + assertEquals(2, timerService.numProcessingTimeTimers("hello")); + assertEquals(3, timerService.numProcessingTimeTimers("ciao")); + + assertEquals(1, processingTimeService.getNumRegisteredTimers()); + assertThat(processingTimeService.getRegisteredTimerTimestamps(), containsInAnyOrder(10L)); + + processingTimeService.setCurrentTime(10); + + assertEquals(3, timerService.numProcessingTimeTimers()); + assertEquals(1, timerService.numProcessingTimeTimers("hello")); + assertEquals(2, timerService.numProcessingTimeTimers("ciao")); + + assertEquals(1, processingTimeService.getNumRegisteredTimers()); + assertThat(processingTimeService.getRegisteredTimerTimestamps(), containsInAnyOrder(20L)); + + processingTimeService.setCurrentTime(20); + + assertEquals(1, timerService.numProcessingTimeTimers()); + assertEquals(0, timerService.numProcessingTimeTimers("hello")); + assertEquals(1, timerService.numProcessingTimeTimers("ciao")); + + assertEquals(1, processingTimeService.getNumRegisteredTimers()); + assertThat(processingTimeService.getRegisteredTimerTimestamps(), containsInAnyOrder(30L)); + + processingTimeService.setCurrentTime(30); + + assertEquals(0, timerService.numProcessingTimeTimers()); + + assertEquals(0, processingTimeService.getNumRegisteredTimers()); + + timerService.registerProcessingTimeTimer("ciao", 40); + + assertEquals(1, processingTimeService.getNumRegisteredTimers()); + } + + /** + * Verify that registering a processing-time timer that is earlier than the existing timers + * removes the one physical timer and creates one for the earlier timestamp + * {@link ProcessingTimeService}. + */ + @Test + public void testRegisterEarlierProcessingTimerMovesPhysicalProcessingTimer() throws Exception { + @SuppressWarnings("unchecked") + Triggerable<Integer, String> mockTriggerable = mock(Triggerable.class); + + TestKeyContext keyContext = new TestKeyContext(); + + TestProcessingTimeService processingTimeService = new TestProcessingTimeService(); + + HeapInternalTimerService<Integer, String> timerService = + createTimerService(mockTriggerable, keyContext, processingTimeService); + + keyContext.setCurrentKey(0); + + timerService.registerProcessingTimeTimer("ciao", 20); + + assertEquals(1, timerService.numProcessingTimeTimers()); + + assertEquals(1, processingTimeService.getNumRegisteredTimers()); + assertThat(processingTimeService.getRegisteredTimerTimestamps(), containsInAnyOrder(20L)); + + timerService.registerProcessingTimeTimer("ciao", 10); + + assertEquals(2, timerService.numProcessingTimeTimers()); + + assertEquals(1, processingTimeService.getNumRegisteredTimers()); + assertThat(processingTimeService.getRegisteredTimerTimestamps(), containsInAnyOrder(10L)); + } + + /** + */ + @Test + public void testRegisteringProcessingTimeTimerInOnProcessingTimeDoesNotLeakPhysicalTimers() throws Exception { + @SuppressWarnings("unchecked") + Triggerable<Integer, String> mockTriggerable = mock(Triggerable.class); + + TestKeyContext keyContext = new TestKeyContext(); + + TestProcessingTimeService processingTimeService = new TestProcessingTimeService(); + + final HeapInternalTimerService<Integer, String> timerService = + createTimerService(mockTriggerable, keyContext, processingTimeService); + + keyContext.setCurrentKey(0); + + timerService.registerProcessingTimeTimer("ciao", 10); + + assertEquals(1, timerService.numProcessingTimeTimers()); + + assertEquals(1, processingTimeService.getNumRegisteredTimers()); + assertThat(processingTimeService.getRegisteredTimerTimestamps(), containsInAnyOrder(10L)); + + doAnswer(new Answer<Object>() { + @Override + public Object answer(InvocationOnMock invocation) throws Exception { + timerService.registerProcessingTimeTimer("ciao", 20); + return null; + } + }).when(mockTriggerable).onProcessingTime(anyInternalTimer()); + + processingTimeService.setCurrentTime(10); + + assertEquals(1, processingTimeService.getNumRegisteredTimers()); + assertThat(processingTimeService.getRegisteredTimerTimestamps(), containsInAnyOrder(20L)); + + doAnswer(new Answer<Object>() { + @Override + public Object answer(InvocationOnMock invocation) throws Exception { + timerService.registerProcessingTimeTimer("ciao", 30); + return null; + } + }).when(mockTriggerable).onProcessingTime(anyInternalTimer()); + + processingTimeService.setCurrentTime(20); + + assertEquals(1, timerService.numProcessingTimeTimers()); + + assertEquals(1, processingTimeService.getNumRegisteredTimers()); + assertThat(processingTimeService.getRegisteredTimerTimestamps(), containsInAnyOrder(30L)); + } + + + @Test + public void testCurrentProcessingTime() throws Exception { + + @SuppressWarnings("unchecked") + Triggerable<Integer, String> mockTriggerable = mock(Triggerable.class); + + TestKeyContext keyContext = new TestKeyContext(); + TestProcessingTimeService processingTimeService = new TestProcessingTimeService(); + HeapInternalTimerService<Integer, String> timerService = + createTimerService(mockTriggerable, keyContext, processingTimeService); + + processingTimeService.setCurrentTime(17L); + assertEquals(17, timerService.currentProcessingTime()); + + processingTimeService.setCurrentTime(42); + assertEquals(42, timerService.currentProcessingTime()); + } + + @Test + public void testCurrentEventTime() throws Exception { + + @SuppressWarnings("unchecked") + Triggerable<Integer, String> mockTriggerable = mock(Triggerable.class); + + TestKeyContext keyContext = new TestKeyContext(); + TestProcessingTimeService processingTimeService = new TestProcessingTimeService(); + HeapInternalTimerService<Integer, String> timerService = + createTimerService(mockTriggerable, keyContext, processingTimeService); + + timerService.advanceWatermark(17); + assertEquals(17, timerService.currentWatermark()); + + timerService.advanceWatermark(42); + assertEquals(42, timerService.currentWatermark()); + } + + /** + * This also verifies that we don't have leakage between keys/namespaces. + */ + @Test + public void testSetAndFireEventTimeTimers() throws Exception { + @SuppressWarnings("unchecked") + Triggerable<Integer, String> mockTriggerable = mock(Triggerable.class); + + TestKeyContext keyContext = new TestKeyContext(); + TestProcessingTimeService processingTimeService = new TestProcessingTimeService(); + HeapInternalTimerService<Integer, String> timerService = + createTimerService(mockTriggerable, keyContext, processingTimeService); + + keyContext.setCurrentKey(0); + + timerService.registerEventTimeTimer("ciao", 10); + timerService.registerEventTimeTimer("hello", 10); + + keyContext.setCurrentKey(1); + + timerService.registerEventTimeTimer("ciao", 10); + timerService.registerEventTimeTimer("hello", 10); + + assertEquals(4, timerService.numEventTimeTimers()); + assertEquals(2, timerService.numEventTimeTimers("hello")); + assertEquals(2, timerService.numEventTimeTimers("ciao")); + + timerService.advanceWatermark(10); + + verify(mockTriggerable, times(4)).onEventTime(anyInternalTimer()); + verify(mockTriggerable, times(1)).onEventTime(eq(new InternalTimer<>(10, 0, "ciao"))); + verify(mockTriggerable, times(1)).onEventTime(eq(new InternalTimer<>(10, 0, "hello"))); + verify(mockTriggerable, times(1)).onEventTime(eq(new InternalTimer<>(10, 1, "ciao"))); + verify(mockTriggerable, times(1)).onEventTime(eq(new InternalTimer<>(10, 1, "hello"))); + + assertEquals(0, timerService.numEventTimeTimers()); + } + + /** + * This also verifies that we don't have leakage between keys/namespaces. + */ + @Test + public void testSetAndFireProcessingTimeTimers() throws Exception { + @SuppressWarnings("unchecked") + Triggerable<Integer, String> mockTriggerable = mock(Triggerable.class); + + TestKeyContext keyContext = new TestKeyContext(); + TestProcessingTimeService processingTimeService = new TestProcessingTimeService(); + HeapInternalTimerService<Integer, String> timerService = + createTimerService(mockTriggerable, keyContext, processingTimeService); + + keyContext.setCurrentKey(0); + + timerService.registerProcessingTimeTimer("ciao", 10); + timerService.registerProcessingTimeTimer("hello", 10); + + keyContext.setCurrentKey(1); + + timerService.registerProcessingTimeTimer("ciao", 10); + timerService.registerProcessingTimeTimer("hello", 10); + + assertEquals(4, timerService.numProcessingTimeTimers()); + assertEquals(2, timerService.numProcessingTimeTimers("hello")); + assertEquals(2, timerService.numProcessingTimeTimers("ciao")); + + processingTimeService.setCurrentTime(10); + + verify(mockTriggerable, times(4)).onProcessingTime(anyInternalTimer()); + verify(mockTriggerable, times(1)).onProcessingTime(eq(new InternalTimer<>(10, 0, "ciao"))); + verify(mockTriggerable, times(1)).onProcessingTime(eq(new InternalTimer<>(10, 0, "hello"))); + verify(mockTriggerable, times(1)).onProcessingTime(eq(new InternalTimer<>(10, 1, "ciao"))); + verify(mockTriggerable, times(1)).onProcessingTime(eq(new InternalTimer<>(10, 1, "hello"))); + + assertEquals(0, timerService.numProcessingTimeTimers()); + } + + /** + * This also verifies that we don't have leakage between keys/namespaces. + * + * <p>This also verifies that deleted timers don't fire. + */ + @Test + public void testDeleteEventTimeTimers() throws Exception { + @SuppressWarnings("unchecked") + Triggerable<Integer, String> mockTriggerable = mock(Triggerable.class); + + TestKeyContext keyContext = new TestKeyContext(); + TestProcessingTimeService processingTimeService = new TestProcessingTimeService(); + HeapInternalTimerService<Integer, String> timerService = + createTimerService(mockTriggerable, keyContext, processingTimeService); + + keyContext.setCurrentKey(0); + + timerService.registerEventTimeTimer("ciao", 10); + timerService.registerEventTimeTimer("hello", 10); + + keyContext.setCurrentKey(1); + + timerService.registerEventTimeTimer("ciao", 10); + timerService.registerEventTimeTimer("hello", 10); + + assertEquals(4, timerService.numEventTimeTimers()); + assertEquals(2, timerService.numEventTimeTimers("hello")); + assertEquals(2, timerService.numEventTimeTimers("ciao")); + + keyContext.setCurrentKey(0); + timerService.deleteEventTimeTimer("hello", 10); + + keyContext.setCurrentKey(1); + timerService.deleteEventTimeTimer("ciao", 10); + + assertEquals(2, timerService.numEventTimeTimers()); + assertEquals(1, timerService.numEventTimeTimers("hello")); + assertEquals(1, timerService.numEventTimeTimers("ciao")); + + timerService.advanceWatermark(10); + + verify(mockTriggerable, times(2)).onEventTime(anyInternalTimer()); + verify(mockTriggerable, times(1)).onEventTime(eq(new InternalTimer<>(10, 0, "ciao"))); + verify(mockTriggerable, times(0)).onEventTime(eq(new InternalTimer<>(10, 0, "hello"))); + verify(mockTriggerable, times(0)).onEventTime(eq(new InternalTimer<>(10, 1, "ciao"))); + verify(mockTriggerable, times(1)).onEventTime(eq(new InternalTimer<>(10, 1, "hello"))); + + assertEquals(0, timerService.numEventTimeTimers()); + } + + /** + * This also verifies that we don't have leakage between keys/namespaces. + * + * <p>This also verifies that deleted timers don't fire. + */ + @Test + public void testDeleteProcessingTimeTimers() throws Exception { + @SuppressWarnings("unchecked") + Triggerable<Integer, String> mockTriggerable = mock(Triggerable.class); + + TestKeyContext keyContext = new TestKeyContext(); + TestProcessingTimeService processingTimeService = new TestProcessingTimeService(); + HeapInternalTimerService<Integer, String> timerService = + createTimerService(mockTriggerable, keyContext, processingTimeService); + + keyContext.setCurrentKey(0); + + timerService.registerProcessingTimeTimer("ciao", 10); + timerService.registerProcessingTimeTimer("hello", 10); + + keyContext.setCurrentKey(1); + + timerService.registerProcessingTimeTimer("ciao", 10); + timerService.registerProcessingTimeTimer("hello", 10); + + assertEquals(4, timerService.numProcessingTimeTimers()); + assertEquals(2, timerService.numProcessingTimeTimers("hello")); + assertEquals(2, timerService.numProcessingTimeTimers("ciao")); + + keyContext.setCurrentKey(0); + timerService.deleteProcessingTimeTimer("hello", 10); + + keyContext.setCurrentKey(1); + timerService.deleteProcessingTimeTimer("ciao", 10); + + assertEquals(2, timerService.numProcessingTimeTimers()); + assertEquals(1, timerService.numProcessingTimeTimers("hello")); + assertEquals(1, timerService.numProcessingTimeTimers("ciao")); + + processingTimeService.setCurrentTime(10); + + verify(mockTriggerable, times(2)).onProcessingTime(anyInternalTimer()); + verify(mockTriggerable, times(1)).onProcessingTime(eq(new InternalTimer<>(10, 0, "ciao"))); + verify(mockTriggerable, times(0)).onProcessingTime(eq(new InternalTimer<>(10, 0, "hello"))); + verify(mockTriggerable, times(0)).onProcessingTime(eq(new InternalTimer<>(10, 1, "ciao"))); + verify(mockTriggerable, times(1)).onProcessingTime(eq(new InternalTimer<>(10, 1, "hello"))); + + assertEquals(0, timerService.numEventTimeTimers()); + } + + @Test + public void testSnapshotAndRestore() throws Exception { + @SuppressWarnings("unchecked") + Triggerable<Integer, String> mockTriggerable = mock(Triggerable.class); + + TestKeyContext keyContext = new TestKeyContext(); + TestProcessingTimeService processingTimeService = new TestProcessingTimeService(); + HeapInternalTimerService<Integer, String> timerService = + createTimerService(mockTriggerable, keyContext, processingTimeService); + + keyContext.setCurrentKey(0); + + timerService.registerProcessingTimeTimer("ciao", 10); + timerService.registerEventTimeTimer("hello", 10); + + keyContext.setCurrentKey(1); + + timerService.registerEventTimeTimer("ciao", 10); + timerService.registerProcessingTimeTimer("hello", 10); + + assertEquals(2, timerService.numProcessingTimeTimers()); + assertEquals(1, timerService.numProcessingTimeTimers("hello")); + assertEquals(1, timerService.numProcessingTimeTimers("ciao")); + assertEquals(2, timerService.numEventTimeTimers()); + assertEquals(1, timerService.numEventTimeTimers("hello")); + assertEquals(1, timerService.numEventTimeTimers("ciao")); + + ByteArrayOutputStream outStream = new ByteArrayOutputStream(); + timerService.snapshotTimers(outStream); + outStream.close(); + + @SuppressWarnings("unchecked") + Triggerable<Integer, String> mockTriggerable2 = mock(Triggerable.class); + + keyContext = new TestKeyContext(); + processingTimeService = new TestProcessingTimeService(); + + timerService = restoreTimerService( + new ByteArrayInputStream(outStream.toByteArray()), + mockTriggerable2, + keyContext, + processingTimeService); + + processingTimeService.setCurrentTime(10); + timerService.advanceWatermark(10); + + verify(mockTriggerable2, times(2)).onProcessingTime(anyInternalTimer()); + verify(mockTriggerable2, times(1)).onProcessingTime(eq(new InternalTimer<>(10, 0, "ciao"))); + verify(mockTriggerable2, times(1)).onProcessingTime(eq(new InternalTimer<>(10, 1, "hello"))); + verify(mockTriggerable2, times(2)).onEventTime(anyInternalTimer()); + verify(mockTriggerable2, times(1)).onEventTime(eq(new InternalTimer<>(10, 0, "hello"))); + verify(mockTriggerable2, times(1)).onEventTime(eq(new InternalTimer<>(10, 1, "ciao"))); + + assertEquals(0, timerService.numEventTimeTimers()); + } + + + private static class TestKeyContext implements KeyContext { + + private Object key; + + @Override + public void setCurrentKey(Object key) { + this.key = key; + } + + @Override + public Object getCurrentKey() { + return key; + } + } + + private static HeapInternalTimerService<Integer, String> createTimerService( + Triggerable<Integer, String> triggerable, + KeyContext keyContext, + ProcessingTimeService processingTimeService) { + return new HeapInternalTimerService<>( + IntSerializer.INSTANCE, + StringSerializer.INSTANCE, + triggerable, + keyContext, + processingTimeService); + } + + private static HeapInternalTimerService<Integer, String> restoreTimerService( + InputStream stateStream, + Triggerable<Integer, String> triggerable, + KeyContext keyContext, + ProcessingTimeService processingTimeService) throws Exception { + HeapInternalTimerService.RestoredTimers<Integer, String> restoredTimers = + new HeapInternalTimerService.RestoredTimers<>( + stateStream, + HeapInternalTimerServiceTest.class.getClassLoader()); + + return new HeapInternalTimerService<>( + IntSerializer.INSTANCE, + StringSerializer.INSTANCE, + triggerable, + keyContext, + processingTimeService, + restoredTimers); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java new file mode 100644 index 0000000..6edf20a --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java @@ -0,0 +1,410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.operators; + + +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.streaming.api.TimeDomain; +import org.apache.flink.streaming.api.TimerService; +import org.apache.flink.streaming.api.functions.RichTimelyFlatMapFunction; +import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.TestHarnessUtil; +import org.apache.flink.util.Collector; +import org.apache.flink.util.TestLogger; +import org.junit.Test; + +import java.util.concurrent.ConcurrentLinkedQueue; + +import static org.junit.Assert.assertEquals; + +/** + * Tests {@link StreamTimelyFlatMap}. + */ +public class TimelyFlatMapTest extends TestLogger { + + @Test + public void testCurrentEventTime() throws Exception { + + StreamTimelyFlatMap<Integer, Integer, String> operator = + new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new QueryingFlatMapFunction(TimeDomain.EVENT_TIME)); + + OneInputStreamOperatorTestHarness<Integer, String> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + testHarness.processWatermark(new Watermark(17)); + testHarness.processElement(new StreamRecord<>(5, 12L)); + + testHarness.processWatermark(new Watermark(42)); + testHarness.processElement(new StreamRecord<>(6, 13L)); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + expectedOutput.add(new Watermark(17L)); + expectedOutput.add(new StreamRecord<>("5TIME:17", 12L)); + expectedOutput.add(new Watermark(42L)); + expectedOutput.add(new StreamRecord<>("6TIME:42", 13L)); + + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.close(); + } + + @Test + public void testCurrentProcessingTime() throws Exception { + + StreamTimelyFlatMap<Integer, Integer, String> operator = + new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new QueryingFlatMapFunction(TimeDomain.PROCESSING_TIME)); + + OneInputStreamOperatorTestHarness<Integer, String> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + testHarness.setProcessingTime(17); + testHarness.processElement(new StreamRecord<>(5)); + + testHarness.setProcessingTime(42); + testHarness.processElement(new StreamRecord<>(6)); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + expectedOutput.add(new StreamRecord<>("5TIME:17")); + expectedOutput.add(new StreamRecord<>("6TIME:42")); + + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.close(); + } + + @Test + public void testEventTimeTimers() throws Exception { + + StreamTimelyFlatMap<Integer, Integer, Integer> operator = + new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new TriggeringFlatMapFunction(TimeDomain.EVENT_TIME)); + + OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + testHarness.processWatermark(new Watermark(0)); + + testHarness.processElement(new StreamRecord<>(17, 42L)); + + testHarness.processWatermark(new Watermark(5)); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + expectedOutput.add(new Watermark(0L)); + expectedOutput.add(new StreamRecord<>(17, 42L)); + expectedOutput.add(new StreamRecord<>(1777, 5L)); + expectedOutput.add(new Watermark(5L)); + + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.close(); + } + + @Test + public void testProcessingTimeTimers() throws Exception { + + StreamTimelyFlatMap<Integer, Integer, Integer> operator = + new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new TriggeringFlatMapFunction(TimeDomain.PROCESSING_TIME)); + + OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + testHarness.processElement(new StreamRecord<>(17)); + + testHarness.setProcessingTime(5); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + expectedOutput.add(new StreamRecord<>(17)); + expectedOutput.add(new StreamRecord<>(1777, 5L)); + + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.close(); + } + + /** + * Verifies that we don't have leakage between different keys. + */ + @Test + public void testEventTimeTimerWithState() throws Exception { + + StreamTimelyFlatMap<Integer, Integer, String> operator = + new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new TriggeringStatefulFlatMapFunction(TimeDomain.EVENT_TIME)); + + OneInputStreamOperatorTestHarness<Integer, String> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + testHarness.processWatermark(new Watermark(1)); + testHarness.processElement(new StreamRecord<>(17, 0L)); // should set timer for 6 + + testHarness.processWatermark(new Watermark(2)); + testHarness.processElement(new StreamRecord<>(42, 1L)); // should set timer for 7 + + testHarness.processWatermark(new Watermark(6)); + testHarness.processWatermark(new Watermark(7)); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + expectedOutput.add(new Watermark(1L)); + expectedOutput.add(new StreamRecord<>("INPUT:17", 0L)); + expectedOutput.add(new Watermark(2L)); + expectedOutput.add(new StreamRecord<>("INPUT:42", 1L)); + expectedOutput.add(new StreamRecord<>("STATE:17", 6L)); + expectedOutput.add(new Watermark(6L)); + expectedOutput.add(new StreamRecord<>("STATE:42", 7L)); + expectedOutput.add(new Watermark(7L)); + + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.close(); + } + + /** + * Verifies that we don't have leakage between different keys. + */ + @Test + public void testProcessingTimeTimerWithState() throws Exception { + + StreamTimelyFlatMap<Integer, Integer, String> operator = + new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new TriggeringStatefulFlatMapFunction(TimeDomain.PROCESSING_TIME)); + + OneInputStreamOperatorTestHarness<Integer, String> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + testHarness.setProcessingTime(1); + testHarness.processElement(new StreamRecord<>(17)); // should set timer for 6 + + testHarness.setProcessingTime(2); + testHarness.processElement(new StreamRecord<>(42)); // should set timer for 7 + + testHarness.setProcessingTime(6); + testHarness.setProcessingTime(7); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + expectedOutput.add(new StreamRecord<>("INPUT:17")); + expectedOutput.add(new StreamRecord<>("INPUT:42")); + expectedOutput.add(new StreamRecord<>("STATE:17", 6L)); + expectedOutput.add(new StreamRecord<>("STATE:42", 7L)); + + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.close(); + } + + @Test + public void testSnapshotAndRestore() throws Exception { + + StreamTimelyFlatMap<Integer, Integer, String> operator = + new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new BothTriggeringFlatMapFunction()); + + OneInputStreamOperatorTestHarness<Integer, String> testHarness = + new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO); + + testHarness.setup(); + testHarness.open(); + + testHarness.processElement(new StreamRecord<>(5, 12L)); + + // snapshot and restore from scratch + StreamStateHandle snapshot = testHarness.snapshotLegacy(0, 0); + + testHarness.close(); + + operator = new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new BothTriggeringFlatMapFunction()); + + testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO); + + testHarness.setup(); + testHarness.restore(snapshot); + testHarness.open(); + + testHarness.setProcessingTime(5); + testHarness.processWatermark(new Watermark(6)); + + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + expectedOutput.add(new StreamRecord<>("PROC:1777", 5L)); + expectedOutput.add(new StreamRecord<>("EVENT:1777", 6L)); + expectedOutput.add(new Watermark(6)); + + System.out.println("GOT: " + testHarness.getOutput()); + + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + + testHarness.close(); + } + + private static class IdentityKeySelector<T> implements KeySelector<T, T> { + private static final long serialVersionUID = 1L; + + @Override + public T getKey(T value) throws Exception { + return value; + } + } + + private static class QueryingFlatMapFunction implements TimelyFlatMapFunction<Integer, String> { + + private static final long serialVersionUID = 1L; + + private final TimeDomain timeDomain; + + public QueryingFlatMapFunction(TimeDomain timeDomain) { + this.timeDomain = timeDomain; + } + + @Override + public void flatMap(Integer value, TimerService timerService, Collector<String> out) throws Exception { + if (timeDomain.equals(TimeDomain.EVENT_TIME)) { + out.collect(value + "TIME:" + timerService.currentWatermark()); + } else { + out.collect(value + "TIME:" + timerService.currentProcessingTime()); + } + } + + @Override + public void onTimer( + long timestamp, + TimeDomain timeDomain, + TimerService timerService, + Collector<String> out) throws Exception { + } + } + + private static class TriggeringFlatMapFunction implements TimelyFlatMapFunction<Integer, Integer> { + + private static final long serialVersionUID = 1L; + + private final TimeDomain timeDomain; + + public TriggeringFlatMapFunction(TimeDomain timeDomain) { + this.timeDomain = timeDomain; + } + + @Override + public void flatMap(Integer value, TimerService timerService, Collector<Integer> out) throws Exception { + out.collect(value); + if (timeDomain.equals(TimeDomain.EVENT_TIME)) { + timerService.registerEventTimeTimer(timerService.currentWatermark() + 5); + } else { + timerService.registerProcessingTimeTimer(timerService.currentProcessingTime() + 5); + } + } + + @Override + public void onTimer( + long timestamp, + TimeDomain timeDomain, + TimerService timerService, + Collector<Integer> out) throws Exception { + + assertEquals(this.timeDomain, timeDomain); + out.collect(1777); + } + } + + private static class TriggeringStatefulFlatMapFunction extends RichTimelyFlatMapFunction<Integer, String> { + + private static final long serialVersionUID = 1L; + + private final ValueStateDescriptor<Integer> state = + new ValueStateDescriptor<>("seen-element", IntSerializer.INSTANCE, null); + + private final TimeDomain timeDomain; + + public TriggeringStatefulFlatMapFunction(TimeDomain timeDomain) { + this.timeDomain = timeDomain; + } + + @Override + public void flatMap(Integer value, TimerService timerService, Collector<String> out) throws Exception { + out.collect("INPUT:" + value); + getRuntimeContext().getState(state).update(value); + if (timeDomain.equals(TimeDomain.EVENT_TIME)) { + timerService.registerEventTimeTimer(timerService.currentWatermark() + 5); + } else { + timerService.registerProcessingTimeTimer(timerService.currentProcessingTime() + 5); + } + } + + @Override + public void onTimer( + long timestamp, + TimeDomain timeDomain, + TimerService timerService, + Collector<String> out) throws Exception { + assertEquals(this.timeDomain, timeDomain); + out.collect("STATE:" + getRuntimeContext().getState(state).value()); + } + } + + private static class BothTriggeringFlatMapFunction implements TimelyFlatMapFunction<Integer, String> { + + private static final long serialVersionUID = 1L; + + @Override + public void flatMap(Integer value, TimerService timerService, Collector<String> out) throws Exception { + timerService.registerProcessingTimeTimer(5); + timerService.registerEventTimeTimer(6); + + } + + @Override + public void onTimer( + long timestamp, + TimeDomain timeDomain, + TimerService timerService, + Collector<String> out) throws Exception { + if (TimeDomain.EVENT_TIME.equals(timeDomain)) { + out.collect("EVENT:1777"); + } else { + out.collect("PROC:1777"); + } + } + } + +}
