[FLINK-4552] Refactor WindowOperator/Trigger Tests Before, tests for WindowOperator, WindowAssigner, Trigger and WindowFunction were all conflated in WindowOperatorTest. All of these test that a certain combination of a Trigger, WindowAssigner and WindowFunction produces the expected output.
This change modularizes these tests and spreads them out across multiple files, for example, one per trigger/window assigner. The new WindowOperatorContractTest verifies that the interaction between WindowOperator and the various other parts works as expected, that the correct methods on Trigger and WindowFunction are called at the expected time and that snapshotting, timers, cleanup etc. work correctly. These tests also verify that the different state types and WindowFunctions work correctly. For trigger tests this introduces TriggerTestHarness. This can be used to inject elements into Triggers and verify that they fire at the correct times. The actual output of the WindowFunction is not important for these tests. The new tests also make sure that triggers correctly clean up state and timers. WindowAssigner tests verify the behaviour of window assigners in isolation. They also test, for example, whether the offset parameter of time-based windows works correctly. We keep the old WindowOperatorTest because it still provides some level of coverage and doesn't take long to run. 
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d1475ee8 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d1475ee8 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d1475ee8 Branch: refs/heads/master Commit: d1475ee86fb58ab70a6dc785d08f190189ede43d Parents: 0b331a4 Author: Aljoscha Krettek <[email protected]> Authored: Mon Sep 5 12:01:11 2016 +0200 Committer: Aljoscha Krettek <[email protected]> Committed: Tue Jan 24 10:42:34 2017 +0100 ---------------------------------------------------------------------- .../state/heap/HeapKeyedStateBackend.java | 45 + .../runtime/state/MemoryStateBackendTest.java | 43 + .../assigners/EventTimeSessionWindows.java | 4 + .../api/windowing/assigners/GlobalWindows.java | 4 +- .../assigners/ProcessingTimeSessionWindows.java | 4 + .../assigners/SlidingEventTimeWindows.java | 28 +- .../assigners/SlidingProcessingTimeWindows.java | 30 +- .../assigners/TumblingEventTimeWindows.java | 36 +- .../TumblingProcessingTimeWindows.java | 32 +- .../tasks/TestProcessingTimeService.java | 2 +- .../api/operators/TestInternalTimerService.java | 238 ++ .../TestProcessingTimeServiceTest.java | 2 +- .../operators/windowing/CountTriggerTest.java | 166 ++ .../windowing/EventTimeSessionWindowsTest.java | 179 ++ .../windowing/EventTimeTriggerTest.java | 153 ++ .../operators/windowing/GlobalWindowsTest.java | 59 + .../windowing/MergingWindowSetTest.java | 25 + .../ProcessingTimeSessionWindowsTest.java | 184 ++ .../windowing/ProcessingTimeTriggerTest.java | 134 + .../operators/windowing/PurgingTriggerTest.java | 149 + .../windowing/SimpleTriggerTestHarness.java | 41 + .../windowing/SlidingEventTimeWindowsTest.java | 168 ++ .../SlidingProcessingTimeWindowsTest.java | 177 ++ .../windowing/StreamRecordMatchers.java | 179 ++ .../operators/windowing/TriggerTestHarness.java | 381 +++ .../windowing/TumblingEventTimeWindowsTest.java | 113 + .../TumblingProcessingTimeWindowsTest.java | 
129 + .../windowing/WindowOperatorContractTest.java | 2572 ++++++++++++++++++ .../operators/windowing/WindowOperatorTest.java | 171 -- .../operators/windowing/WindowedValue.java | 47 + .../windowing/WindowingTestHarnessTest.java | 230 -- .../util/AbstractStreamOperatorTestHarness.java | 20 + .../KeyedOneInputStreamOperatorTestHarness.java | 17 + .../flink/streaming/util/TestHarnessUtil.java | 15 +- .../streaming/util/WindowingTestHarness.java | 221 -- 35 files changed, 5298 insertions(+), 700 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d1475ee8/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java index b4c2b8b..0fe92e7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.state.heap; import org.apache.commons.io.IOUtils; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.state.AggregatingStateDescriptor; import org.apache.flink.api.common.state.FoldingStateDescriptor; @@ -534,4 +535,48 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { return stateSerializer; } } + + /** + * Returns the total number of state entries across all keys/namespaces. 
+ */ + @VisibleForTesting + @SuppressWarnings("unchecked") + public int numStateEntries() { + int sum = 0; + for (StateTable<K, ?, ?> stateTable : stateTables.values()) { + for (Map namespaceMap : stateTable.getState()) { + if (namespaceMap == null) { + continue; + } + Map<?, Map> typedMap = (Map<?, Map>) namespaceMap; + for (Map entriesMap : typedMap.values()) { + sum += entriesMap.size(); + } + } + } + return sum; + } + + /** + * Returns the total number of state entries across all keys for the given namespace. + */ + @VisibleForTesting + @SuppressWarnings("unchecked") + public <N> int numStateEntries(N namespace) { + int sum = 0; + for (StateTable<K, ?, ?> stateTable : stateTables.values()) { + for (Map namespaceMap : stateTable.getState()) { + if (namespaceMap == null) { + continue; + } + Map<?, Map> typedMap = (Map<?, Map>) namespaceMap; + Map singleNamespace = typedMap.get(namespace); + if (singleNamespace != null) { + sum += singleNamespace.size(); + } + } + } + return sum; + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/d1475ee8/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java index ac6adff..c267afc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java @@ -18,7 +18,12 @@ package org.apache.flink.runtime.state; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.junit.Test; @@ -56,6 +61,44 @@ public class MemoryStateBackendTest extends StateBackendTestBase<MemoryStateBack public void testReducingStateRestoreWithWrongSerializers() {} @Test + @SuppressWarnings("unchecked") + public void testNumStateEntries() throws Exception { + KeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE); + + ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class, null); + kvId.initializeSerializerUnlessSet(new ExecutionConfig()); + + HeapKeyedStateBackend<Integer> heapBackend = (HeapKeyedStateBackend<Integer>) backend; + + assertEquals(0, heapBackend.numStateEntries()); + + ValueState<String> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId); + + backend.setCurrentKey(0); + state.update("hello"); + state.update("ciao"); + + assertEquals(1, heapBackend.numStateEntries()); + + backend.setCurrentKey(42); + state.update("foo"); + + assertEquals(2, heapBackend.numStateEntries()); + + backend.setCurrentKey(0); + state.clear(); + + assertEquals(1, heapBackend.numStateEntries()); + + backend.setCurrentKey(42); + state.clear(); + + assertEquals(0, heapBackend.numStateEntries()); + + backend.dispose(); + } + + @Test public void testOversizedState() { try { MemoryStateBackend backend = new MemoryStateBackend(10); http://git-wip-us.apache.org/repos/asf/flink/blob/d1475ee8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java index e38f617..1703f6c 100644 
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java @@ -47,6 +47,10 @@ public class EventTimeSessionWindows extends MergingWindowAssigner<Object, TimeW protected long sessionTimeout; protected EventTimeSessionWindows(long sessionTimeout) { + if (sessionTimeout <= 0) { + throw new IllegalArgumentException("EventTimeSessionWindows parameters must satisfy 0 < size"); + } + this.sessionTimeout = sessionTimeout; } http://git-wip-us.apache.org/repos/asf/flink/blob/d1475ee8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java index 7ea3158..9e3846d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java @@ -17,6 +17,7 @@ */ package org.apache.flink.streaming.api.windowing.assigners; +import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -70,7 +71,8 @@ public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> { /** * A trigger that never fires, as default Trigger for GlobalWindows. 
*/ - private static class NeverTrigger extends Trigger<Object, GlobalWindow> { + @Internal + public static class NeverTrigger extends Trigger<Object, GlobalWindow> { private static final long serialVersionUID = 1L; @Override http://git-wip-us.apache.org/repos/asf/flink/blob/d1475ee8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/ProcessingTimeSessionWindows.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/ProcessingTimeSessionWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/ProcessingTimeSessionWindows.java index 52d1c03..02c680e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/ProcessingTimeSessionWindows.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/ProcessingTimeSessionWindows.java @@ -47,6 +47,10 @@ public class ProcessingTimeSessionWindows extends MergingWindowAssigner<Object, protected long sessionTimeout; protected ProcessingTimeSessionWindows(long sessionTimeout) { + if (sessionTimeout <= 0) { + throw new IllegalArgumentException("ProcessingTimeSessionWindows parameters must satisfy 0 < size"); + } + this.sessionTimeout = sessionTimeout; } http://git-wip-us.apache.org/repos/asf/flink/blob/d1475ee8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java index 16171a0..ef6ed56 100644 --- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java @@ -55,6 +55,10 @@ public class SlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow> private final long offset; protected SlidingEventTimeWindows(long size, long slide, long offset) { + if (offset < 0 || offset >= slide || size <= 0) { + throw new IllegalArgumentException("SlidingEventTimeWindows parameters must satisfy 0 <= offset < slide and size > 0"); + } + this.size = size; this.slide = slide; this.offset = offset; @@ -109,20 +113,18 @@ public class SlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow> } /** - * Creates a new {@code SlidingEventTimeWindows} {@link WindowAssigner} that assigns - * elements to time windows based on the element timestamp and offset. - *<p> - * For example, if you want window a stream by hour,but window begins at the 15th minutes - * of each hour, you can use {@code of(Time.hours(1),Time.minutes(15))},then you will get - * time windows start at 0:15:00,1:15:00,2:15:00,etc. - *</p> + * Creates a new {@code SlidingEventTimeWindows} {@link WindowAssigner} that assigns + * elements to time windows based on the element timestamp and offset. + * + * <p>For example, if you want window a stream by hour,but window begins at the 15th minutes + * of each hour, you can use {@code of(Time.hours(1),Time.minutes(15))},then you will get + * time windows start at 0:15:00,1:15:00,2:15:00,etc. + * + * <p>Rather than that,if you are living in somewhere which is not using UTC±00:00 time, + * such as China which is using UTC+08:00,and you want a time window with size of one day, + * and window begins at every 00:00:00 of local time,you may use {@code of(Time.days(1),Time.hours(-8))}. + * The parameter of offset is {@code Time.hours(-8))} since UTC+08:00 is 8 hours earlier than UTC time. 
* - * <p> - * Rather than that,if you are living in somewhere which is not using UTC±00:00 time, - * such as China which is using UTC+08:00,and you want a time window with size of one day, - * and window begins at every 00:00:00 of local time,you may use {@code of(Time.days(1),Time.hours(-8))}. - * The parameter of offset is {@code Time.hours(-8))} since UTC+08:00 is 8 hours earlier than UTC time. - * </p> * @param size The size of the generated windows. * @param slide The slide interval of the generated windows. * @param offset The offset which window start would be shifted by. http://git-wip-us.apache.org/repos/asf/flink/blob/d1475ee8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java index e03467f..c11045d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java @@ -51,7 +51,11 @@ public class SlidingProcessingTimeWindows extends WindowAssigner<Object, TimeWin private final long slide; - private SlidingProcessingTimeWindows(long size, long slide, long offset){ + private SlidingProcessingTimeWindows(long size, long slide, long offset) { + if (offset < 0 || offset >= slide || size <= 0) { + throw new IllegalArgumentException("SlidingProcessingTimeWindows parameters must satisfy 0 <= offset < slide and size > 0"); + } + this.size = size; this.slide = slide; this.offset = offset; @@ -101,20 +105,18 @@ public class SlidingProcessingTimeWindows extends WindowAssigner<Object, TimeWin 
} /** - * Creates a new {@code SlidingProcessingTimeWindows} {@link WindowAssigner} that assigns - * elements to time windows based on the element timestamp and offset. - *<p> - * For example, if you want window a stream by hour,but window begins at the 15th minutes - * of each hour, you can use {@code of(Time.hours(1),Time.minutes(15))},then you will get - * time windows start at 0:15:00,1:15:00,2:15:00,etc. - *</p> + * Creates a new {@code SlidingProcessingTimeWindows} {@link WindowAssigner} that assigns + * elements to time windows based on the element timestamp and offset. + * + * <p>For example, if you want window a stream by hour,but window begins at the 15th minutes + * of each hour, you can use {@code of(Time.hours(1),Time.minutes(15))},then you will get + * time windows start at 0:15:00,1:15:00,2:15:00,etc. + * + * <p>Rather than that,if you are living in somewhere which is not using UTC±00:00 time, + * such as China which is using UTC+08:00,and you want a time window with size of one day, + * and window begins at every 00:00:00 of local time,you may use {@code of(Time.days(1),Time.hours(-8))}. + * The parameter of offset is {@code Time.hours(-8))} since UTC+08:00 is 8 hours earlier than UTC time. * - * <p> - * Rather than that,if you are living in somewhere which is not using UTC±00:00 time, - * such as China which is using UTC+08:00,and you want a time window with size of one day, - * and window begins at every 00:00:00 of local time,you may use {@code of(Time.days(1),Time.hours(-8))}. - * The parameter of offset is {@code Time.hours(-8))} since UTC+08:00 is 8 hours earlier than UTC time. - * </p> * @param size The size of the generated windows. * @param slide The slide interval of the generated windows. * @param offset The offset which window start would be shifted by. 
http://git-wip-us.apache.org/repos/asf/flink/blob/d1475ee8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java index b7fa343..d695a0c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java @@ -51,7 +51,11 @@ public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> private final long offset; - protected TumblingEventTimeWindows(long size, long offset){ + protected TumblingEventTimeWindows(long size, long offset) { + if (offset < 0 || offset >= size) { + throw new IllegalArgumentException("TumblingEventTimeWindows parameters must satisfy 0 <= offset < size"); + } + this.size = size; this.offset = offset; } @@ -68,10 +72,6 @@ public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> } } - public long getSize() { - return size; - } - @Override public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) { return EventTimeTrigger.create(); @@ -94,26 +94,24 @@ public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> } /** - * Creates a new {@code TumblingEventTimeWindows} {@link WindowAssigner} that assigns - * elements to time windows based on the element timestamp and offset. 
- *<p> - * For example, if you want window a stream by hour,but window begins at the 15th minutes - * of each hour, you can use {@code of(Time.hours(1),Time.minutes(15))},then you will get - * time windows start at 0:15:00,1:15:00,2:15:00,etc. - *</p> + * Creates a new {@code TumblingEventTimeWindows} {@link WindowAssigner} that assigns + * elements to time windows based on the element timestamp and offset. + * + * <p>For example, if you want window a stream by hour,but window begins at the 15th minutes + * of each hour, you can use {@code of(Time.hours(1),Time.minutes(15))},then you will get + * time windows start at 0:15:00,1:15:00,2:15:00,etc. + * + * <p>Rather than that,if you are living in somewhere which is not using UTC±00:00 time, + * such as China which is using UTC+08:00,and you want a time window with size of one day, + * and window begins at every 00:00:00 of local time,you may use {@code of(Time.days(1),Time.hours(-8))}. + * The parameter of offset is {@code Time.hours(-8))} since UTC+08:00 is 8 hours earlier than UTC time. * - * <p> - * Rather than that,if you are living in somewhere which is not using UTC±00:00 time, - * such as China which is using UTC+08:00,and you want a time window with size of one day, - * and window begins at every 00:00:00 of local time,you may use {@code of(Time.days(1),Time.hours(-8))}. - * The parameter of offset is {@code Time.hours(-8))} since UTC+08:00 is 8 hours earlier than UTC time. - * </p> * @param size The size of the generated windows. * @param offset The offset which window start would be shifted by. * @return The time policy. 
*/ public static TumblingEventTimeWindows of(Time size, Time offset) { - return new TumblingEventTimeWindows(size.toMilliseconds(), offset.toMilliseconds() % size.toMilliseconds()); + return new TumblingEventTimeWindows(size.toMilliseconds(), offset.toMilliseconds()); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/d1475ee8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java index f4fb620..5b39fe0 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java @@ -49,7 +49,11 @@ public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWi private final long offset; - private TumblingProcessingTimeWindows(long size,long offset) { + private TumblingProcessingTimeWindows(long size, long offset) { + if (offset < 0 || offset >= size) { + throw new IllegalArgumentException("TumblingProcessingTimeWindows parameters must satisfy 0 <= offset < size"); + } + this.size = size; this.offset = offset; } @@ -87,26 +91,24 @@ public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWi } /** - * Creates a new {@code TumblingProcessingTimeWindows} {@link WindowAssigner} that assigns - * elements to time windows based on the element timestamp and offset. 
- *<p> - * For example, if you want window a stream by hour,but window begins at the 15th minutes - * of each hour, you can use {@code of(Time.hours(1),Time.minutes(15))},then you will get - * time windows start at 0:15:00,1:15:00,2:15:00,etc. - *</p> + * Creates a new {@code TumblingProcessingTimeWindows} {@link WindowAssigner} that assigns + * elements to time windows based on the element timestamp and offset. + * + * <p>For example, if you want window a stream by hour,but window begins at the 15th minutes + * of each hour, you can use {@code of(Time.hours(1),Time.minutes(15))},then you will get + * time windows start at 0:15:00,1:15:00,2:15:00,etc. + * + * <p>Rather than that,if you are living in somewhere which is not using UTC±00:00 time, + * such as China which is using UTC+08:00,and you want a time window with size of one day, + * and window begins at every 00:00:00 of local time,you may use {@code of(Time.days(1),Time.hours(-8))}. + * The parameter of offset is {@code Time.hours(-8))} since UTC+08:00 is 8 hours earlier than UTC time. * - * <p> - * Rather than that,if you are living in somewhere which is not using UTC±00:00 time, - * such as China which is using UTC+08:00,and you want a time window with size of one day, - * and window begins at every 00:00:00 of local time,you may use {@code of(Time.days(1),Time.hours(-8))}. - * The parameter of offset is {@code Time.hours(-8))} since UTC+08:00 is 8 hours earlier than UTC time. - * </p> * @param size The size of the generated windows. * @param offset The offset which window start would be shifted by. * @return The time policy. 
*/ public static TumblingProcessingTimeWindows of(Time size, Time offset) { - return new TumblingProcessingTimeWindows(size.toMilliseconds(), offset.toMilliseconds() % size.toMilliseconds()); + return new TumblingProcessingTimeWindows(size.toMilliseconds(), offset.toMilliseconds()); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/d1475ee8/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java index b4e7e97..e5fcc1a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java @@ -37,7 +37,7 @@ import java.util.concurrent.atomic.AtomicReference; * */ public class TestProcessingTimeService extends ProcessingTimeService { - private volatile long currentTime = 0L; + private volatile long currentTime = Long.MIN_VALUE; private volatile boolean isTerminated; private volatile boolean isQuiesced; http://git-wip-us.apache.org/repos/asf/flink/blob/d1475ee8/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TestInternalTimerService.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TestInternalTimerService.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TestInternalTimerService.java new file mode 100644 index 0000000..a03a4c5 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TestInternalTimerService.java @@ -0,0 +1,238 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.Internal; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.PriorityQueue; +import java.util.Set; + +/** + * Implementation of {@link InternalTimerService} meant to use for testing. + */ +@Internal +public class TestInternalTimerService<K, N> implements InternalTimerService<N> { + + private long currentProcessingTime = Long.MIN_VALUE; + + private long currentWatermark = Long.MIN_VALUE; + + private final KeyContext keyContext; + + /** + * Processing time timers that are currently in-flight. + */ + private final PriorityQueue<Timer<K, N>> processingTimeTimersQueue; + private final Set<Timer<K, N>> processingTimeTimers; + + /** + * Current waiting watermark callbacks. 
+ */ + private final Set<Timer<K, N>> watermarkTimers; + private final PriorityQueue<Timer<K, N>> watermarkTimersQueue; + + public TestInternalTimerService(KeyContext keyContext) { + this.keyContext = keyContext; + + watermarkTimers = new HashSet<>(); + watermarkTimersQueue = new PriorityQueue<>(100); + processingTimeTimers = new HashSet<>(); + processingTimeTimersQueue = new PriorityQueue<>(100); + } + + @Override + public long currentProcessingTime() { + return currentProcessingTime; + } + + @Override + public long currentWatermark() { + return currentWatermark; + } + + @Override + public void registerProcessingTimeTimer(N namespace, long time) { + @SuppressWarnings("unchecked") + Timer<K, N> timer = new Timer<>(time, (K) keyContext.getCurrentKey(), namespace); + // make sure we only put one timer per key into the queue + if (processingTimeTimers.add(timer)) { + processingTimeTimersQueue.add(timer); + } + } + + @Override + public void registerEventTimeTimer(N namespace, long time) { + @SuppressWarnings("unchecked") + Timer<K, N> timer = new Timer<>(time, (K) keyContext.getCurrentKey(), namespace); + if (watermarkTimers.add(timer)) { + watermarkTimersQueue.add(timer); + } + } + + @Override + public void deleteProcessingTimeTimer(N namespace, long time) { + @SuppressWarnings("unchecked") + Timer<K, N> timer = new Timer<>(time, (K) keyContext.getCurrentKey(), namespace); + + if (processingTimeTimers.remove(timer)) { + processingTimeTimersQueue.remove(timer); + } + } + + @Override + public void deleteEventTimeTimer(N namespace, long time) { + @SuppressWarnings("unchecked") + Timer<K, N> timer = new Timer<>(time, (K) keyContext.getCurrentKey(), namespace); + if (watermarkTimers.remove(timer)) { + watermarkTimersQueue.remove(timer); + } + } + + public Collection<Timer<K, N>> advanceProcessingTime(long time) throws Exception { + List<Timer<K, N>> result = new ArrayList<>(); + + Timer<K, N> timer = processingTimeTimersQueue.peek(); + + while (timer != null && 
timer.timestamp <= time) { + processingTimeTimers.remove(timer); + processingTimeTimersQueue.remove(); + result.add(timer); + timer = processingTimeTimersQueue.peek(); + } + + currentProcessingTime = time; + return result; + } + + public Collection<Timer<K, N>> advanceWatermark(long time) throws Exception { + List<Timer<K, N>> result = new ArrayList<>(); + + Timer<K, N> timer = watermarkTimersQueue.peek(); + + while (timer != null && timer.timestamp <= time) { + watermarkTimers.remove(timer); + watermarkTimersQueue.remove(); + result.add(timer); + timer = watermarkTimersQueue.peek(); + } + + currentWatermark = time; + return result; + } + + /** + * Internal class for keeping track of in-flight timers. + */ + public static class Timer<K, N> implements Comparable<Timer<K, N>> { + private final long timestamp; + private final K key; + private final N namespace; + + public Timer(long timestamp, K key, N namespace) { + this.timestamp = timestamp; + this.key = key; + this.namespace = namespace; + } + + public long getTimestamp() { + return timestamp; + } + + public K getKey() { + return key; + } + + public N getNamespace() { + return namespace; + } + + @Override + public int compareTo(Timer<K, N> o) { + return Long.compare(this.timestamp, o.timestamp); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()){ + return false; + } + + Timer<?, ?> timer = (Timer<?, ?>) o; + + return timestamp == timer.timestamp + && key.equals(timer.key) + && namespace.equals(timer.namespace); + + } + + @Override + public int hashCode() { + int result = (int) (timestamp ^ (timestamp >>> 32)); + result = 31 * result + key.hashCode(); + result = 31 * result + namespace.hashCode(); + return result; + } + + @Override + public String toString() { + return "Timer{" + + "timestamp=" + timestamp + + ", key=" + key + + ", namespace=" + namespace + + '}'; + } + } + + public int numProcessingTimeTimers() { + return 
processingTimeTimers.size(); + } + + public int numEventTimeTimers() { + return watermarkTimers.size(); + } + + public int numProcessingTimeTimers(N namespace) { + int count = 0; + for (Timer<K, N> timer : processingTimeTimers) { + if (timer.getNamespace().equals(namespace)) { + count++; + } + } + + return count; + } + + public int numEventTimeTimers(N namespace) { + int count = 0; + for (Timer<K, N> timer : watermarkTimers) { + if (timer.getNamespace().equals(namespace)) { + count++; + } + } + + return count; + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/d1475ee8/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java index 2903758..4d24b82 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java @@ -52,7 +52,7 @@ public class TestProcessingTimeServiceTest { testHarness.invoke(); - assertEquals(testHarness.getProcessingTimeService().getCurrentProcessingTime(), 0); + assertEquals(Long.MIN_VALUE, testHarness.getProcessingTimeService().getCurrentProcessingTime()); tp.setCurrentTime(11); assertEquals(testHarness.getProcessingTimeService().getCurrentProcessingTime(), 11); http://git-wip-us.apache.org/repos/asf/flink/blob/d1475ee8/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CountTriggerTest.java ---------------------------------------------------------------------- diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CountTriggerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CountTriggerTest.java new file mode 100644 index 0000000..16e353b --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CountTriggerTest.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.runtime.operators.windowing; + +import com.google.common.collect.Lists; +import org.apache.flink.streaming.api.windowing.triggers.CountTrigger; +import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link CountTrigger}. + */ +public class CountTriggerTest { + + /** + * Verify that state of separate windows does not leak into other windows. 
+ */ + @Test + public void testWindowSeparationAndFiring() throws Exception { + TriggerTestHarness<Object, TimeWindow> testHarness = + new TriggerTestHarness<>(CountTrigger.<TimeWindow>of(3), new TimeWindow.Serializer()); + + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2))); + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4))); + + // shouldn't have any timers + assertEquals(0, testHarness.numProcessingTimeTimers()); + assertEquals(0, testHarness.numEventTimeTimers()); + + assertEquals(2, testHarness.numStateEntries()); + assertEquals(1, testHarness.numStateEntries(new TimeWindow(0, 2))); + assertEquals(1, testHarness.numStateEntries(new TimeWindow(2, 4))); + + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2))); + assertEquals(TriggerResult.FIRE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2))); + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4))); + + // right now, CountTrigger will clear its state in onElement when firing + // ideally, this should be moved to onFire() + assertEquals(1, testHarness.numStateEntries()); + assertEquals(0, testHarness.numStateEntries(new TimeWindow(0, 2))); + assertEquals(1, testHarness.numStateEntries(new TimeWindow(2, 4))); + + assertEquals(TriggerResult.FIRE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4))); + + // now all state should be gone + assertEquals(0, testHarness.numStateEntries()); + } + + /** + * Verify that clear() does not leak across windows. 
+ */ + @Test + public void testClear() throws Exception { + TriggerTestHarness<Object, TimeWindow> testHarness = + new TriggerTestHarness<>(CountTrigger.<TimeWindow>of(3), new TimeWindow.Serializer()); + + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2))); + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4))); + + // shouldn't have any timers + assertEquals(0, testHarness.numProcessingTimeTimers()); + assertEquals(0, testHarness.numEventTimeTimers()); + + assertEquals(2, testHarness.numStateEntries()); + assertEquals(1, testHarness.numStateEntries(new TimeWindow(0, 2))); + assertEquals(1, testHarness.numStateEntries(new TimeWindow(2, 4))); + + testHarness.clearTriggerState(new TimeWindow(2, 4)); + + assertEquals(1, testHarness.numStateEntries()); + assertEquals(1, testHarness.numStateEntries(new TimeWindow(0, 2))); + assertEquals(0, testHarness.numStateEntries(new TimeWindow(2, 4))); + + testHarness.clearTriggerState(new TimeWindow(0, 2)); + + assertEquals(0, testHarness.numStateEntries()); + assertEquals(0, testHarness.numStateEntries(new TimeWindow(0, 2))); + assertEquals(0, testHarness.numStateEntries(new TimeWindow(2, 4))); + } + + @Test + public void testMergingWindows() throws Exception { + TriggerTestHarness<Object, TimeWindow> testHarness = + new TriggerTestHarness<>(CountTrigger.<TimeWindow>of(3), new TimeWindow.Serializer()); + + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2))); + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4))); + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(4, 6))); + + // shouldn't have any timers + assertEquals(0, testHarness.numProcessingTimeTimers()); + assertEquals(0, testHarness.numEventTimeTimers()); + + 
assertEquals(3, testHarness.numStateEntries()); + assertEquals(1, testHarness.numStateEntries(new TimeWindow(0, 2))); + assertEquals(1, testHarness.numStateEntries(new TimeWindow(2, 4))); + assertEquals(1, testHarness.numStateEntries(new TimeWindow(4, 6))); + + testHarness.mergeWindows(new TimeWindow(0, 4), Lists.newArrayList(new TimeWindow(0, 2), new TimeWindow(2, 4))); + + assertEquals(2, testHarness.numStateEntries()); + assertEquals(0, testHarness.numStateEntries(new TimeWindow(0, 2))); + assertEquals(0, testHarness.numStateEntries(new TimeWindow(2, 4))); + assertEquals(1, testHarness.numStateEntries(new TimeWindow(0, 4))); + assertEquals(1, testHarness.numStateEntries(new TimeWindow(4, 6))); + + assertEquals(TriggerResult.FIRE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 4))); + + assertEquals(1, testHarness.numStateEntries()); + assertEquals(0, testHarness.numStateEntries(new TimeWindow(0, 4))); + assertEquals(1, testHarness.numStateEntries(new TimeWindow(4, 6))); + + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(4, 6))); + assertEquals(TriggerResult.FIRE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(4, 6))); + + assertEquals(0, testHarness.numStateEntries()); + } + + @Test + public void testMergeSubsumingWindow() throws Exception { + TriggerTestHarness<Object, TimeWindow> testHarness = + new TriggerTestHarness<>(CountTrigger.<TimeWindow>of(3), new TimeWindow.Serializer()); + + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4))); + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(4, 6))); + + // shouldn't have any timers + assertEquals(0, testHarness.numProcessingTimeTimers()); + assertEquals(0, testHarness.numEventTimeTimers()); + + assertEquals(2, testHarness.numStateEntries()); + assertEquals(1, 
testHarness.numStateEntries(new TimeWindow(2, 4))); + assertEquals(1, testHarness.numStateEntries(new TimeWindow(4, 6))); + + testHarness.mergeWindows(new TimeWindow(0, 8), Lists.newArrayList(new TimeWindow(2, 4), new TimeWindow(4, 6))); + + assertEquals(1, testHarness.numStateEntries()); + assertEquals(0, testHarness.numStateEntries(new TimeWindow(2, 4))); + assertEquals(0, testHarness.numStateEntries(new TimeWindow(4, 6))); + assertEquals(1, testHarness.numStateEntries(new TimeWindow(0, 8))); + + assertEquals(TriggerResult.FIRE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 8))); + + assertEquals(0, testHarness.numStateEntries()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d1475ee8/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeSessionWindowsTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeSessionWindowsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeSessionWindowsTest.java new file mode 100644 index 0000000..a46572b --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeSessionWindowsTest.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.runtime.operators.windowing; + + +import com.google.common.collect.Lists; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows; +import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner; +import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.util.TestLogger; +import org.junit.Test; +import org.mockito.Matchers; + +import java.util.Collection; + +import static org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.timeWindow; +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.Assert.*; +import static org.mockito.Matchers.anyCollection; +import static org.mockito.Mockito.*; + +/** + * Tests for {@link EventTimeSessionWindows}. + */ +public class EventTimeSessionWindowsTest extends TestLogger { + + @Test + public void testWindowAssignment() { + final int SESSION_GAP = 5000; + + WindowAssigner.WindowAssignerContext mockContext = + mock(WindowAssigner.WindowAssignerContext.class); + 
+ EventTimeSessionWindows assigner = EventTimeSessionWindows.withGap(Time.milliseconds(SESSION_GAP)); + + assertThat(assigner.assignWindows("String", 0L, mockContext), contains(timeWindow(0, 0 + SESSION_GAP))); + assertThat(assigner.assignWindows("String", 4999L, mockContext), contains(timeWindow(4999, 4999 + SESSION_GAP))); + assertThat(assigner.assignWindows("String", 5000L, mockContext), contains(timeWindow(5000, 5000 + SESSION_GAP))); + } + + @Test + public void testMergeSinglePointWindow() { + MergingWindowAssigner.MergeCallback callback = mock(MergingWindowAssigner.MergeCallback.class); + + EventTimeSessionWindows assigner = EventTimeSessionWindows.withGap(Time.milliseconds(5000)); + + assigner.mergeWindows(Lists.newArrayList(new TimeWindow(0, 0)), callback); + + verify(callback, never()).merge(anyCollection(), Matchers.anyObject()); + } + + @Test + public void testMergeSingleWindow() { + MergingWindowAssigner.MergeCallback callback = mock(MergingWindowAssigner.MergeCallback.class); + + EventTimeSessionWindows assigner = EventTimeSessionWindows.withGap(Time.milliseconds(5000)); + + assigner.mergeWindows(Lists.newArrayList(new TimeWindow(0, 1)), callback); + + verify(callback, never()).merge(anyCollection(), Matchers.anyObject()); + } + + @Test + public void testMergeConsecutiveWindows() { + MergingWindowAssigner.MergeCallback callback = mock(MergingWindowAssigner.MergeCallback.class); + + EventTimeSessionWindows assigner = EventTimeSessionWindows.withGap(Time.milliseconds(5000)); + + assigner.mergeWindows( + Lists.newArrayList( + new TimeWindow(0, 1), + new TimeWindow(1, 2), + new TimeWindow(2, 3), + new TimeWindow(4, 5), + new TimeWindow(5, 6)), + callback); + + verify(callback, times(1)).merge( + (Collection<TimeWindow>) argThat(containsInAnyOrder(new TimeWindow(0, 1), new TimeWindow(1, 2), new TimeWindow(2, 3))), + eq(new TimeWindow(0, 3))); + + verify(callback, times(1)).merge( + (Collection<TimeWindow>) argThat(containsInAnyOrder(new TimeWindow(4, 5), 
new TimeWindow(5, 6))), + eq(new TimeWindow(4, 6))); + + verify(callback, times(2)).merge(anyCollection(), Matchers.anyObject()); + } + + @Test + public void testMergeCoveringWindow() { + MergingWindowAssigner.MergeCallback callback = mock(MergingWindowAssigner.MergeCallback.class); + + EventTimeSessionWindows assigner = EventTimeSessionWindows.withGap(Time.milliseconds(5000)); + + assigner.mergeWindows( + Lists.newArrayList( + new TimeWindow(1, 1), + new TimeWindow(0, 2), + new TimeWindow(4, 7), + new TimeWindow(5, 6)), + callback); + + verify(callback, times(1)).merge( + (Collection<TimeWindow>) argThat(containsInAnyOrder(new TimeWindow(1, 1), new TimeWindow(0, 2))), + eq(new TimeWindow(0, 2))); + + verify(callback, times(1)).merge( + (Collection<TimeWindow>) argThat(containsInAnyOrder(new TimeWindow(5, 6), new TimeWindow(4, 7))), + eq(new TimeWindow(4, 7))); + + verify(callback, times(2)).merge(anyCollection(), Matchers.anyObject()); + } + + @Test + public void testTimeUnits() { + // sanity check with one other time unit + + final int SESSION_GAP = 5000; + + WindowAssigner.WindowAssignerContext mockContext = + mock(WindowAssigner.WindowAssignerContext.class); + + EventTimeSessionWindows assigner = EventTimeSessionWindows.withGap(Time.seconds(SESSION_GAP / 1000)); + + assertThat(assigner.assignWindows("String", 0L, mockContext), contains(timeWindow(0, 0 + SESSION_GAP))); + assertThat(assigner.assignWindows("String", 4999L, mockContext), contains(timeWindow(4999, 4999 + SESSION_GAP))); + assertThat(assigner.assignWindows("String", 5000L, mockContext), contains(timeWindow(5000, 5000 + SESSION_GAP))); + } + + @Test + public void testInvalidParameters() { + try { + EventTimeSessionWindows.withGap(Time.seconds(-1)); + fail("should fail"); + } catch (IllegalArgumentException e) { + assertThat(e.toString(), containsString("0 < size")); + } + + try { + EventTimeSessionWindows.withGap(Time.seconds(0)); + fail("should fail"); + } catch (IllegalArgumentException e) { + 
assertThat(e.toString(), containsString("0 < size")); + } + + } + + @Test + public void testProperties() { + EventTimeSessionWindows assigner = EventTimeSessionWindows.withGap(Time.seconds(5)); + + assertTrue(assigner.isEventTime()); + assertEquals(new TimeWindow.Serializer(), assigner.getWindowSerializer(new ExecutionConfig())); + assertThat(assigner.getDefaultTrigger(mock(StreamExecutionEnvironment.class)), instanceOf(EventTimeTrigger.class)); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d1475ee8/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeTriggerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeTriggerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeTriggerTest.java new file mode 100644 index 0000000..2d93ac0 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeTriggerTest.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.runtime.operators.windowing; + +import com.google.common.collect.Lists; +import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger; +import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Tests for {@link EventTimeTrigger}. + */ +public class EventTimeTriggerTest { + + /** + * Verify that state of separate windows does not leak into other windows. + */ + @Test + public void testWindowSeparationAndFiring() throws Exception { + TriggerTestHarness<Object, TimeWindow> testHarness = + new TriggerTestHarness<>(EventTimeTrigger.create(), new TimeWindow.Serializer()); + + // inject several elements + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2))); + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2))); + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2))); + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4))); + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4))); + + assertEquals(0, testHarness.numStateEntries()); + assertEquals(0, testHarness.numProcessingTimeTimers()); + assertEquals(2, testHarness.numEventTimeTimers()); + assertEquals(1, testHarness.numEventTimeTimers(new TimeWindow(0, 2))); + assertEquals(1, testHarness.numEventTimeTimers(new TimeWindow(2, 4))); + + assertEquals(TriggerResult.FIRE, testHarness.advanceWatermark(2, new TimeWindow(0, 2))); + + assertEquals(0, testHarness.numStateEntries()); + 
assertEquals(0, testHarness.numProcessingTimeTimers()); + assertEquals(1, testHarness.numEventTimeTimers()); + assertEquals(0, testHarness.numEventTimeTimers(new TimeWindow(0, 2))); + assertEquals(1, testHarness.numEventTimeTimers(new TimeWindow(2, 4))); + + assertEquals(TriggerResult.FIRE, testHarness.advanceWatermark(4, new TimeWindow(2, 4))); + + assertEquals(0, testHarness.numStateEntries()); + assertEquals(0, testHarness.numProcessingTimeTimers()); + assertEquals(0, testHarness.numEventTimeTimers()); + } + + /** + * Verify that late elements trigger immediately and also that we don't set a timer + * for those. + */ + @Test + public void testLateElementTriggersImmediately() throws Exception { + TriggerTestHarness<Object, TimeWindow> testHarness = + new TriggerTestHarness<>(EventTimeTrigger.create(), new TimeWindow.Serializer()); + + testHarness.advanceWatermark(2); + + assertEquals(TriggerResult.FIRE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2))); + + assertEquals(0, testHarness.numStateEntries()); + assertEquals(0, testHarness.numProcessingTimeTimers()); + assertEquals(0, testHarness.numEventTimeTimers()); + } + + + /** + * Verify that clear() does not leak across windows. 
+ */ + @Test + public void testClear() throws Exception { + TriggerTestHarness<Object, TimeWindow> testHarness = + new TriggerTestHarness<>(EventTimeTrigger.create(), new TimeWindow.Serializer()); + + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2))); + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4))); + + assertEquals(0, testHarness.numStateEntries()); + assertEquals(0, testHarness.numProcessingTimeTimers()); + assertEquals(2, testHarness.numEventTimeTimers()); + assertEquals(1, testHarness.numEventTimeTimers(new TimeWindow(0, 2))); + assertEquals(1, testHarness.numEventTimeTimers(new TimeWindow(2, 4))); + + testHarness.clearTriggerState(new TimeWindow(2, 4)); + + assertEquals(0, testHarness.numStateEntries()); + assertEquals(0, testHarness.numProcessingTimeTimers()); + assertEquals(1, testHarness.numEventTimeTimers()); + assertEquals(1, testHarness.numEventTimeTimers(new TimeWindow(0, 2))); + assertEquals(0, testHarness.numEventTimeTimers(new TimeWindow(2, 4))); + + testHarness.clearTriggerState(new TimeWindow(0, 2)); + + assertEquals(0, testHarness.numStateEntries()); + assertEquals(0, testHarness.numProcessingTimeTimers()); + assertEquals(0, testHarness.numEventTimeTimers()); + } + + @Test + public void testMergingWindows() throws Exception { + TriggerTestHarness<Object, TimeWindow> testHarness = + new TriggerTestHarness<>(EventTimeTrigger.create(), new TimeWindow.Serializer()); + + assertTrue(EventTimeTrigger.create().canMerge()); + + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2))); + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4))); + + assertEquals(0, testHarness.numStateEntries()); + assertEquals(0, testHarness.numProcessingTimeTimers()); + assertEquals(2, testHarness.numEventTimeTimers()); + 
assertEquals(1, testHarness.numEventTimeTimers(new TimeWindow(0, 2))); + assertEquals(1, testHarness.numEventTimeTimers(new TimeWindow(2, 4))); + + testHarness.mergeWindows(new TimeWindow(0, 4), Lists.newArrayList(new TimeWindow(0, 2), new TimeWindow(2, 4))); + + assertEquals(0, testHarness.numStateEntries()); + assertEquals(0, testHarness.numProcessingTimeTimers()); + assertEquals(1, testHarness.numEventTimeTimers()); + assertEquals(0, testHarness.numEventTimeTimers(new TimeWindow(0, 2))); + assertEquals(0, testHarness.numEventTimeTimers(new TimeWindow(2, 4))); + assertEquals(1, testHarness.numEventTimeTimers(new TimeWindow(0, 4))); + + assertEquals(TriggerResult.FIRE, testHarness.advanceWatermark(4, new TimeWindow(0, 4))); + + assertEquals(0, testHarness.numStateEntries()); + assertEquals(0, testHarness.numProcessingTimeTimers()); + assertEquals(0, testHarness.numEventTimeTimers()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d1475ee8/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/GlobalWindowsTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/GlobalWindowsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/GlobalWindowsTest.java new file mode 100644 index 0000000..37fad7e --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/GlobalWindowsTest.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.runtime.operators.windowing; + + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows; +import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; +import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; +import org.apache.flink.util.TestLogger; +import org.junit.Test; + +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.Assert.*; +import static org.mockito.Mockito.mock; + +/** + * Tests for {@link GlobalWindows}. + */ +public class GlobalWindowsTest extends TestLogger { + + @Test + public void testWindowAssignment() { + WindowAssigner.WindowAssignerContext mockContext = + mock(WindowAssigner.WindowAssignerContext.class); + + GlobalWindows assigner = GlobalWindows.create(); + + assertThat(assigner.assignWindows("String", 0L, mockContext), contains(GlobalWindow.get())); + assertThat(assigner.assignWindows("String", 4999L, mockContext), contains(GlobalWindow.get())); + assertThat(assigner.assignWindows("String", 5000L, mockContext), contains(GlobalWindow.get())); + } + + @Test + public void testProperties() { + GlobalWindows assigner = GlobalWindows.create(); + + assertFalse(assigner.isEventTime()); + assertEquals(new GlobalWindow.Serializer(), assigner.getWindowSerializer(new ExecutionConfig())); + 
assertThat(assigner.getDefaultTrigger(mock(StreamExecutionEnvironment.class)), instanceOf(GlobalWindows.NeverTrigger.class)); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d1475ee8/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java index 46169a8..aa9cb91 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSetTest.java @@ -290,6 +290,31 @@ public class MergingWindowSetTest { } /** + * Test adding a new window that is identical to an existing window. This should not cause + * a merge. 
+ */ + @Test + public void testAddingIdenticalWindows() throws Exception { + @SuppressWarnings("unchecked") + ListState<Tuple2<TimeWindow, TimeWindow>> mockState = mock(ListState.class); + + MergingWindowSet<TimeWindow> windowSet = new MergingWindowSet<>(EventTimeSessionWindows.withGap(Time.milliseconds(3)), mockState); + + TestingMergeFunction mergeFunction = new TestingMergeFunction(); + + mergeFunction.reset(); + assertEquals(new TimeWindow(1, 2), windowSet.addWindow(new TimeWindow(1, 2), mergeFunction)); + assertFalse(mergeFunction.hasMerged()); + assertEquals(new TimeWindow(1, 2), windowSet.getStateWindow(new TimeWindow(1, 2))); + + mergeFunction.reset(); + assertEquals(new TimeWindow(1, 2), windowSet.addWindow(new TimeWindow(1, 2), mergeFunction)); + assertFalse(mergeFunction.hasMerged()); + assertEquals(new TimeWindow(1, 2), windowSet.getStateWindow(new TimeWindow(1, 2))); + } + + + /** * Test merging of a large new window that covers multiple existing windows. */ @Test http://git-wip-us.apache.org/repos/asf/flink/blob/d1475ee8/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeSessionWindowsTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeSessionWindowsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeSessionWindowsTest.java new file mode 100644 index 0000000..461b5fc --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeSessionWindowsTest.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.runtime.operators.windowing; + + +import com.google.common.collect.Lists; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows; +import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner; +import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; +import org.apache.flink.streaming.api.windowing.time.Time; +import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.util.TestLogger; +import org.junit.Test; +import org.mockito.Matchers; + +import java.util.Collection; + +import static org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.timeWindow; +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.*; +import static org.junit.Assert.*; +import static org.mockito.Matchers.anyCollection; +import static org.mockito.Mockito.*; + +/** + * Tests for {@link ProcessingTimeSessionWindows}. + */ +public class ProcessingTimeSessionWindowsTest extends TestLogger { + + @Test + public void testWindowAssignment() { + 
WindowAssigner.WindowAssignerContext mockContext = + mock(WindowAssigner.WindowAssignerContext.class); + + ProcessingTimeSessionWindows assigner = ProcessingTimeSessionWindows.withGap(Time.milliseconds(5000)); + + when(mockContext.getCurrentProcessingTime()).thenReturn(0L); + assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(0, 5000))); + + when(mockContext.getCurrentProcessingTime()).thenReturn(4999L); + assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(4999, 9999))); + + when(mockContext.getCurrentProcessingTime()).thenReturn(5000L); + assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(5000, 10000))); + } + + @Test + public void testMergeSinglePointWindow() { + MergingWindowAssigner.MergeCallback callback = mock(MergingWindowAssigner.MergeCallback.class); + + ProcessingTimeSessionWindows assigner = ProcessingTimeSessionWindows.withGap(Time.milliseconds(5000)); + + assigner.mergeWindows(Lists.newArrayList(new TimeWindow(0, 0)), callback); + + verify(callback, never()).merge(anyCollection(), Matchers.anyObject()); + } + + @Test + public void testMergeSingleWindow() { + MergingWindowAssigner.MergeCallback callback = mock(MergingWindowAssigner.MergeCallback.class); + + ProcessingTimeSessionWindows assigner = ProcessingTimeSessionWindows.withGap(Time.milliseconds(5000)); + + assigner.mergeWindows(Lists.newArrayList(new TimeWindow(0, 1)), callback); + + verify(callback, never()).merge(anyCollection(), Matchers.anyObject()); + } + + @Test + public void testMergeConsecutiveWindows() { + MergingWindowAssigner.MergeCallback callback = mock(MergingWindowAssigner.MergeCallback.class); + + ProcessingTimeSessionWindows assigner = ProcessingTimeSessionWindows.withGap(Time.milliseconds(5000)); + + assigner.mergeWindows( + Lists.newArrayList( + new TimeWindow(0, 1), + new TimeWindow(1, 2), + new TimeWindow(2, 3), + new TimeWindow(4, 5), + new 
TimeWindow(5, 6)), + callback); + + verify(callback, times(1)).merge( + (Collection<TimeWindow>) argThat(containsInAnyOrder(new TimeWindow(0, 1), new TimeWindow(1, 2), new TimeWindow(2, 3))), + eq(new TimeWindow(0, 3))); + + verify(callback, times(1)).merge( + (Collection<TimeWindow>) argThat(containsInAnyOrder(new TimeWindow(4, 5), new TimeWindow(5, 6))), + eq(new TimeWindow(4, 6))); + + verify(callback, times(2)).merge(anyCollection(), Matchers.anyObject()); + } + + @Test + public void testMergeCoveringWindow() { + MergingWindowAssigner.MergeCallback callback = mock(MergingWindowAssigner.MergeCallback.class); + + ProcessingTimeSessionWindows assigner = ProcessingTimeSessionWindows.withGap(Time.milliseconds(5000)); + + assigner.mergeWindows( + Lists.newArrayList( + new TimeWindow(1, 1), + new TimeWindow(0, 2), + new TimeWindow(4, 7), + new TimeWindow(5, 6)), + callback); + + verify(callback, times(1)).merge( + (Collection<TimeWindow>) argThat(containsInAnyOrder(new TimeWindow(1, 1), new TimeWindow(0, 2))), + eq(new TimeWindow(0, 2))); + + verify(callback, times(1)).merge( + (Collection<TimeWindow>) argThat(containsInAnyOrder(new TimeWindow(5, 6), new TimeWindow(4, 7))), + eq(new TimeWindow(4, 7))); + + verify(callback, times(2)).merge(anyCollection(), Matchers.anyObject()); + } + + @Test + public void testTimeUnits() { + // sanity check with one other time unit + + WindowAssigner.WindowAssignerContext mockContext = + mock(WindowAssigner.WindowAssignerContext.class); + + ProcessingTimeSessionWindows assigner = ProcessingTimeSessionWindows.withGap(Time.seconds(5)); + + when(mockContext.getCurrentProcessingTime()).thenReturn(0L); + assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(0, 5000))); + + when(mockContext.getCurrentProcessingTime()).thenReturn(4999L); + assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(4999, 9999))); + + 
when(mockContext.getCurrentProcessingTime()).thenReturn(5000L); + assertThat(assigner.assignWindows("String", Long.MIN_VALUE, mockContext), contains(timeWindow(5000, 10000))); + } + + @Test + public void testInvalidParameters() { + try { + ProcessingTimeSessionWindows.withGap(Time.seconds(-1)); + fail("should fail"); + } catch (IllegalArgumentException e) { + assertThat(e.toString(), containsString("0 < size")); + } + + try { + ProcessingTimeSessionWindows.withGap(Time.seconds(0)); + fail("should fail"); + } catch (IllegalArgumentException e) { + assertThat(e.toString(), containsString("0 < size")); + } + + } + + @Test + public void testProperties() { + ProcessingTimeSessionWindows assigner = ProcessingTimeSessionWindows.withGap(Time.seconds(5)); + + assertFalse(assigner.isEventTime()); + assertEquals(new TimeWindow.Serializer(), assigner.getWindowSerializer(new ExecutionConfig())); + assertThat(assigner.getDefaultTrigger(mock(StreamExecutionEnvironment.class)), instanceOf(ProcessingTimeTrigger.class)); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d1475ee8/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeTriggerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeTriggerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeTriggerTest.java new file mode 100644 index 0000000..a0c2347 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeTriggerTest.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.runtime.operators.windowing; + +import com.google.common.collect.Lists; +import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger; +import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Tests for {@link ProcessingTimeTrigger}. + */ +public class ProcessingTimeTriggerTest { + + /** + * Verify that state of separate windows does not leak into other windows. 
+ */ + @Test + public void testWindowSeparationAndFiring() throws Exception { + TriggerTestHarness<Object, TimeWindow> testHarness = + new TriggerTestHarness<>(ProcessingTimeTrigger.create(), new TimeWindow.Serializer()); + + // inject several elements + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2))); + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2))); + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2))); + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4))); + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4))); + + assertEquals(0, testHarness.numStateEntries()); + assertEquals(0, testHarness.numEventTimeTimers()); + assertEquals(2, testHarness.numProcessingTimeTimers()); + assertEquals(1, testHarness.numProcessingTimeTimers(new TimeWindow(0, 2))); + assertEquals(1, testHarness.numProcessingTimeTimers(new TimeWindow(2, 4))); + + assertEquals(TriggerResult.FIRE, testHarness.advanceProcessingTime(2, new TimeWindow(0, 2))); + + assertEquals(0, testHarness.numStateEntries()); + assertEquals(0, testHarness.numEventTimeTimers()); + assertEquals(1, testHarness.numProcessingTimeTimers()); + assertEquals(0, testHarness.numProcessingTimeTimers(new TimeWindow(0, 2))); + assertEquals(1, testHarness.numProcessingTimeTimers(new TimeWindow(2, 4))); + + assertEquals(TriggerResult.FIRE, testHarness.advanceProcessingTime(4, new TimeWindow(2, 4))); + + assertEquals(0, testHarness.numStateEntries()); + assertEquals(0, testHarness.numProcessingTimeTimers()); + assertEquals(0, testHarness.numEventTimeTimers()); + } + + /** + * Verify that clear() does not leak across windows. 
+ */ + @Test + public void testClear() throws Exception { + TriggerTestHarness<Object, TimeWindow> testHarness = + new TriggerTestHarness<>(ProcessingTimeTrigger.create(), new TimeWindow.Serializer()); + + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2))); + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4))); + + assertEquals(0, testHarness.numStateEntries()); + assertEquals(0, testHarness.numEventTimeTimers()); + assertEquals(2, testHarness.numProcessingTimeTimers()); + assertEquals(1, testHarness.numProcessingTimeTimers(new TimeWindow(0, 2))); + assertEquals(1, testHarness.numProcessingTimeTimers(new TimeWindow(2, 4))); + + testHarness.clearTriggerState(new TimeWindow(2, 4)); + + assertEquals(0, testHarness.numStateEntries()); + assertEquals(0, testHarness.numEventTimeTimers()); + assertEquals(1, testHarness.numProcessingTimeTimers()); + assertEquals(1, testHarness.numProcessingTimeTimers(new TimeWindow(0, 2))); + assertEquals(0, testHarness.numProcessingTimeTimers(new TimeWindow(2, 4))); + + testHarness.clearTriggerState(new TimeWindow(0, 2)); + + assertEquals(0, testHarness.numStateEntries()); + assertEquals(0, testHarness.numProcessingTimeTimers()); + assertEquals(0, testHarness.numEventTimeTimers()); + } + + @Test + public void testMergingWindows() throws Exception { + TriggerTestHarness<Object, TimeWindow> testHarness = + new TriggerTestHarness<>(ProcessingTimeTrigger.create(), new TimeWindow.Serializer()); + + assertTrue(ProcessingTimeTrigger.create().canMerge()); + + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2))); + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(2, 4))); + + assertEquals(0, testHarness.numStateEntries()); + assertEquals(0, testHarness.numEventTimeTimers()); + assertEquals(2, 
testHarness.numProcessingTimeTimers()); + assertEquals(1, testHarness.numProcessingTimeTimers(new TimeWindow(0, 2))); + assertEquals(1, testHarness.numProcessingTimeTimers(new TimeWindow(2, 4))); + + testHarness.mergeWindows(new TimeWindow(0, 4), Lists.newArrayList(new TimeWindow(0, 2), new TimeWindow(2, 4))); + + assertEquals(0, testHarness.numStateEntries()); + assertEquals(0, testHarness.numEventTimeTimers()); + assertEquals(1, testHarness.numProcessingTimeTimers()); + assertEquals(0, testHarness.numProcessingTimeTimers(new TimeWindow(0, 2))); + assertEquals(0, testHarness.numProcessingTimeTimers(new TimeWindow(2, 4))); + assertEquals(1, testHarness.numProcessingTimeTimers(new TimeWindow(0, 4))); + + assertEquals(TriggerResult.FIRE, testHarness.advanceProcessingTime(4, new TimeWindow(0, 4))); + + assertEquals(0, testHarness.numStateEntries()); + assertEquals(0, testHarness.numProcessingTimeTimers()); + assertEquals(0, testHarness.numEventTimeTimers()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d1475ee8/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/PurgingTriggerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/PurgingTriggerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/PurgingTriggerTest.java new file mode 100644 index 0000000..4302d4d --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/PurgingTriggerTest.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.runtime.operators.windowing; + +import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger; +import org.apache.flink.streaming.api.windowing.triggers.Trigger; +import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.junit.Test; +import org.mockito.Matchers; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.lang.reflect.Method; +import java.util.Collections; + +import static org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorContractTest.anyOnMergeContext; +import static org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorContractTest.anyTimeWindow; +import static org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorContractTest.anyTriggerContext; +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.*; + +/** + * Tests for {@link PurgingTrigger}. + */ +public class PurgingTriggerTest { + + /** + * Check if {@link PurgingTrigger} implements all methods of {@link Trigger}, as a sanity + * check. 
+ */ + @Test + public void testAllMethodsImplemented() throws NoSuchMethodException { + for (Method triggerMethod : Trigger.class.getDeclaredMethods()) { + + // try retrieving the method, this will throw an exception if we can't find it + PurgingTrigger.class.getDeclaredMethod(triggerMethod.getName(),triggerMethod.getParameterTypes()); + } + } + + @Test + public void testForwarding() throws Exception { + Trigger<Object, TimeWindow> mockTrigger = mock(Trigger.class); + + TriggerTestHarness<Object, TimeWindow> testHarness = + new TriggerTestHarness<>(PurgingTrigger.of(mockTrigger), new TimeWindow.Serializer()); + + when(mockTrigger.onElement(Matchers.anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.CONTINUE); + assertEquals(TriggerResult.CONTINUE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2))); + + when(mockTrigger.onElement(Matchers.anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.FIRE); + assertEquals(TriggerResult.FIRE_AND_PURGE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2))); + + when(mockTrigger.onElement(Matchers.anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.FIRE_AND_PURGE); + assertEquals(TriggerResult.FIRE_AND_PURGE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2))); + + when(mockTrigger.onElement(Matchers.anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.PURGE); + assertEquals(TriggerResult.PURGE, testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2))); + + doAnswer(new Answer<TriggerResult>() { + @Override + public TriggerResult answer(InvocationOnMock invocation) throws Exception { + Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[3]; + + // register some timers that we can step through to call onEventTime several + // times in a row + 
context.registerEventTimeTimer(1); + context.registerEventTimeTimer(2); + context.registerEventTimeTimer(3); + context.registerEventTimeTimer(4); + return TriggerResult.CONTINUE; + } + }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext()); + + // set up our timers + testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)); + + assertEquals(4, testHarness.numEventTimeTimers(new TimeWindow(0, 2))); + + when(mockTrigger.onEventTime(anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.CONTINUE); + assertEquals(TriggerResult.CONTINUE, testHarness.advanceWatermark(1, new TimeWindow(0, 2))); + + when(mockTrigger.onEventTime(anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.FIRE); + assertEquals(TriggerResult.FIRE_AND_PURGE, testHarness.advanceWatermark(2, new TimeWindow(0, 2))); + + when(mockTrigger.onEventTime(anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.FIRE_AND_PURGE); + assertEquals(TriggerResult.FIRE_AND_PURGE, testHarness.advanceWatermark(3, new TimeWindow(0, 2))); + + when(mockTrigger.onEventTime(anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.PURGE); + assertEquals(TriggerResult.PURGE, testHarness.advanceWatermark(4, new TimeWindow(0, 2))); + + doAnswer(new Answer<TriggerResult>() { + @Override + public TriggerResult answer(InvocationOnMock invocation) throws Exception { + Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[3]; + + // register some timers that we can step through to call onProcessingTime several + // times in a row + context.registerProcessingTimeTimer(1); + context.registerProcessingTimeTimer(2); + context.registerProcessingTimeTimer(3); + context.registerProcessingTimeTimer(4); + return TriggerResult.CONTINUE; + } + }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext()); + + // set up 
our timers + testHarness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)); + + assertEquals(4, testHarness.numProcessingTimeTimers(new TimeWindow(0, 2))); + assertEquals(0, testHarness.numEventTimeTimers(new TimeWindow(0, 2))); + + when(mockTrigger.onProcessingTime(anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.CONTINUE); + assertEquals(TriggerResult.CONTINUE, testHarness.advanceProcessingTime(1, new TimeWindow(0, 2))); + + when(mockTrigger.onProcessingTime(anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.FIRE); + assertEquals(TriggerResult.FIRE_AND_PURGE, testHarness.advanceProcessingTime(2, new TimeWindow(0, 2))); + + when(mockTrigger.onProcessingTime(anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.FIRE_AND_PURGE); + assertEquals(TriggerResult.FIRE_AND_PURGE, testHarness.advanceProcessingTime(3, new TimeWindow(0, 2))); + + when(mockTrigger.onProcessingTime(anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.PURGE); + assertEquals(TriggerResult.PURGE, testHarness.advanceProcessingTime(4, new TimeWindow(0, 2))); + + testHarness.mergeWindows(new TimeWindow(0, 2), Collections.singletonList(new TimeWindow(0, 1))); + verify(mockTrigger, times(1)).onMerge(anyTimeWindow(), anyOnMergeContext()); + + testHarness.clearTriggerState(new TimeWindow(0, 2)); + verify(mockTrigger, times(1)).clear(eq(new TimeWindow(0, 2)), anyTriggerContext()); + } + +}
