Repository: flink Updated Branches: refs/heads/release-1.2 7fbb115bc -> b99883430
[hotfix] Add EvictingWindowOperatorContractTest The existing WindowOperatorContractTest is turned into a test base while RegularWindowOperatorContract test tests WindowOperator and EvictingWindowOperatorTest tests EvictingWindowOperator. For this to work, the base tests now always use List windows and we have specific tests for reducing/folding windows in RegularWindowOperatorContractTest. This also patches in the missing side output support for EvictingWindowOperator. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e27b6971 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e27b6971 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e27b6971 Branch: refs/heads/release-1.2 Commit: e27b69710c30ee44b0da6c6debadd5bef8d72c07 Parents: 7fbb115 Author: Aljoscha Krettek <[email protected]> Authored: Tue Mar 21 15:00:24 2017 +0100 Committer: Tzu-Li (Gordon) Tai <[email protected]> Committed: Fri Mar 24 12:24:51 2017 +0800 ---------------------------------------------------------------------- .../windowing/EvictingWindowOperator.java | 12 +- .../EvictingWindowOperatorContractTest.java | 83 +++++ .../RegularWindowOperatorContractTest.java | 269 ++++++++++++++ .../windowing/WindowOperatorContractTest.java | 363 ++++++++----------- 4 files changed, 504 insertions(+), 223 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e27b6971/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java index 17b3984..2d28c00 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java @@ -85,15 +85,12 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window @Override @SuppressWarnings("unchecked") public void processElement(StreamRecord<IN> element) throws Exception { - Collection<W> elementWindows = windowAssigner.assignWindows( - element.getValue(), - element.getTimestamp(), - windowAssignerContext); + final Collection<W> elementWindows = windowAssigner.assignWindows( + element.getValue(), element.getTimestamp(), windowAssignerContext); - final K key = (K) getKeyedStateBackend().getCurrentKey(); + final K key = this.<K>getKeyedStateBackend().getCurrentKey(); if (windowAssigner instanceof MergingWindowAssigner) { - MergingWindowSet<W> mergingWindows = getMergingWindowSet(); for (W window : elementWindows) { @@ -127,7 +124,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window } }); - // check if the window is already inactive + // drop if the window is already late if (isLate(actualWindow)) { mergingWindows.retireWindow(actualWindow); continue; @@ -163,6 +160,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window registerCleanupTimer(actualWindow); } + // need to make sure to update the merging state in state mergingWindows.persist(); } else { for (W window : elementWindows) { http://git-wip-us.apache.org/repos/asf/flink/blob/e27b6971/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorContractTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorContractTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorContractTest.java new file mode 100644 index 0000000..50c5950 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorContractTest.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.runtime.operators.windowing; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; +import org.apache.flink.streaming.api.windowing.evictors.CountEvictor; +import org.apache.flink.streaming.api.windowing.triggers.Trigger; +import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction; +import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; + +/** + * These tests verify that {@link EvictingWindowOperator} correctly interacts with the other + * windowing components: {@link WindowAssigner}, + * {@link Trigger}. + * {@link org.apache.flink.streaming.api.functions.windowing.WindowFunction} and window state. + * + * <p>These tests document the implicit contract that exists between the windowing components. + */ +public class EvictingWindowOperatorContractTest extends WindowOperatorContractTest { + + @Override + protected <W extends Window, OUT> KeyedOneInputStreamOperatorTestHarness<Integer, Integer, OUT> createWindowOperator( + WindowAssigner<Integer, W> assigner, + Trigger<Integer, W> trigger, + long allowedLatenss, + InternalWindowFunction<Iterable<Integer>, OUT, Integer, W> windowFunction) throws Exception { + + KeySelector<Integer, Integer> keySelector = new KeySelector<Integer, Integer>() { + private static final long serialVersionUID = 1L; + + @Override + public Integer getKey(Integer value) throws Exception { + return value; + } + }; + + ListStateDescriptor<StreamRecord<Integer>> intListDescriptor = + new ListStateDescriptor<>( + "int-list", + (TypeSerializer<StreamRecord<Integer>>) new StreamElementSerializer(IntSerializer.INSTANCE)); + + @SuppressWarnings("unchecked") + EvictingWindowOperator<Integer, Integer, OUT, W> operator = new EvictingWindowOperator<>( + assigner, + assigner.getWindowSerializer(new ExecutionConfig()), + keySelector, + IntSerializer.INSTANCE, + intListDescriptor, + windowFunction, + trigger, + CountEvictor.<W>of(100), + allowedLatenss); + + return new KeyedOneInputStreamOperatorTestHarness<>( + operator, + keySelector, + BasicTypeInfo.INT_TYPE_INFO); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e27b6971/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.java new file mode 100644 index 0000000..13b68c0 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.java @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.runtime.operators.windowing; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Arrays; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.FoldFunction; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.state.AppendingState; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; +import org.apache.flink.streaming.api.windowing.triggers.Trigger; +import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; +import org.junit.Test; +import org.mockito.Matchers; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +/** + * These tests verify that {@link WindowOperator} correctly interacts with the other windowing + * components: {@link WindowAssigner}, + * {@link Trigger}. + * {@link org.apache.flink.streaming.api.functions.windowing.WindowFunction} and window state. + * + * <p>These tests document the implicit contract that exists between the windowing components. + */ +public class RegularWindowOperatorContractTest extends WindowOperatorContractTest { + + @Test + public void testReducingWindow() throws Exception { + + WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner(); + Trigger<Integer, TimeWindow> mockTrigger = mockTrigger(); + InternalWindowFunction<Integer, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); + + + ReducingStateDescriptor<Integer> intReduceSumDescriptor = + new ReducingStateDescriptor<>( + "int-reduce", + new ReduceFunction<Integer>() { + private static final long serialVersionUID = 1L; + + @Override + public Integer reduce(Integer a, Integer b) throws Exception { + return a + b; + } + }, + IntSerializer.INSTANCE); + + final ValueStateDescriptor<String> valueStateDescriptor = + new ValueStateDescriptor<>("string-state", StringSerializer.INSTANCE); + + + KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = + createWindowOperator(mockAssigner, mockTrigger, 0L, intReduceSumDescriptor, mockWindowFunction); + + testHarness.open(); + + when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext())) + .thenReturn(Arrays.asList(new TimeWindow(2, 4), new TimeWindow(0, 2))); + + assertEquals(0, testHarness.getOutput().size()); + assertEquals(0, testHarness.numKeyedStateEntries()); + + // insert two elements without firing + testHarness.processElement(new StreamRecord<>(1, 0L)); + testHarness.processElement(new StreamRecord<>(1, 0L)); + + doAnswer(new Answer<TriggerResult>() { + @Override + public TriggerResult answer(InvocationOnMock invocation) throws Exception { + TimeWindow window = (TimeWindow) invocation.getArguments()[2]; + Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[3]; + context.registerEventTimeTimer(window.getEnd()); + context.getPartitionedState(valueStateDescriptor).update("hello"); + return TriggerResult.FIRE; + } + }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext()); + + testHarness.processElement(new StreamRecord<>(1, 0L)); + + verify(mockWindowFunction, times(2)).apply(eq(1), anyTimeWindow(), anyInt(), WindowOperatorContractTest.<Void>anyCollector()); + verify(mockWindowFunction, times(1)).apply(eq(1), eq(new TimeWindow(0, 2)), eq(3), WindowOperatorContractTest.<Void>anyCollector()); + verify(mockWindowFunction, times(1)).apply(eq(1), eq(new TimeWindow(2, 4)), eq(3), WindowOperatorContractTest.<Void>anyCollector()); + + // clear is only called at cleanup time/GC time + verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext()); + + // FIRE should not purge contents + assertEquals(4, testHarness.numKeyedStateEntries()); // window contents plus trigger state + assertEquals(4, testHarness.numEventTimeTimers()); // window timers/gc timers + } + + @Test + public void testFoldingWindow() throws Exception { + + WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner(); + Trigger<Integer, TimeWindow> mockTrigger = mockTrigger(); + InternalWindowFunction<Integer, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); + + FoldingStateDescriptor<Integer, Integer> intFoldSumDescriptor = + new FoldingStateDescriptor<>( + "int-fold", + 0, + new FoldFunction<Integer, Integer>() { + private static final long serialVersionUID = 1L; + + @Override + public Integer fold(Integer accumulator, Integer value) throws Exception { + return accumulator + value; + } + }, + IntSerializer.INSTANCE); + + final ValueStateDescriptor<String> valueStateDescriptor = + new ValueStateDescriptor<>("string-state", StringSerializer.INSTANCE); + + KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = + createWindowOperator(mockAssigner, mockTrigger, 0L, intFoldSumDescriptor, mockWindowFunction); + + testHarness.open(); + + when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext())) + .thenReturn(Arrays.asList(new TimeWindow(2, 4), new TimeWindow(0, 2))); + + assertEquals(0, testHarness.getOutput().size()); + assertEquals(0, testHarness.numKeyedStateEntries()); + + // insert two elements without firing + testHarness.processElement(new StreamRecord<>(1, 0L)); + testHarness.processElement(new StreamRecord<>(1, 0L)); + + doAnswer(new Answer<TriggerResult>() { + @Override + public TriggerResult answer(InvocationOnMock invocation) throws Exception { + TimeWindow window = (TimeWindow) invocation.getArguments()[2]; + Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[3]; + context.registerEventTimeTimer(window.getEnd()); + context.getPartitionedState(valueStateDescriptor).update("hello"); + return TriggerResult.FIRE; + } + }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext()); + + testHarness.processElement(new StreamRecord<>(1, 0L)); + + verify(mockWindowFunction, times(2)).apply(eq(1), anyTimeWindow(), anyInt(), WindowOperatorContractTest.<Void>anyCollector()); + verify(mockWindowFunction, times(1)).apply(eq(1), eq(new TimeWindow(0, 2)), eq(3), WindowOperatorContractTest.<Void>anyCollector()); + verify(mockWindowFunction, times(1)).apply(eq(1), eq(new TimeWindow(2, 4)), eq(3), WindowOperatorContractTest.<Void>anyCollector()); + + // clear is only called at cleanup time/GC time + verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext()); + + // FIRE should not purge contents + assertEquals(4, testHarness.numKeyedStateEntries()); // window contents plus trigger state + assertEquals(4, testHarness.numEventTimeTimers()); // window timers/gc timers + } + + /** + * Special method for creating a {@link WindowOperator} with a custom {@link StateDescriptor} + * for the window contents state. + */ + private <W extends Window, ACC, OUT> KeyedOneInputStreamOperatorTestHarness<Integer, Integer, OUT> createWindowOperator( + WindowAssigner<Integer, W> assigner, + Trigger<Integer, W> trigger, + long allowedLatenss, + StateDescriptor<? extends AppendingState<Integer, ACC>, ?> stateDescriptor, + InternalWindowFunction<ACC, OUT, Integer, W> windowFunction) throws Exception { + + KeySelector<Integer, Integer> keySelector = new KeySelector<Integer, Integer>() { + private static final long serialVersionUID = 1L; + + @Override + public Integer getKey(Integer value) throws Exception { + return value; + } + }; + + @SuppressWarnings("unchecked") + WindowOperator<Integer, Integer, ACC, OUT, W> operator = new WindowOperator<>( + assigner, + assigner.getWindowSerializer(new ExecutionConfig()), + keySelector, + IntSerializer.INSTANCE, + stateDescriptor, + windowFunction, + trigger, + allowedLatenss); + + return new KeyedOneInputStreamOperatorTestHarness<>( + operator, + keySelector, + BasicTypeInfo.INT_TYPE_INFO); + } + + @Override + protected <W extends Window, OUT> KeyedOneInputStreamOperatorTestHarness<Integer, Integer, OUT> createWindowOperator( + WindowAssigner<Integer, W> assigner, + Trigger<Integer, W> trigger, + long allowedLatenss, + InternalWindowFunction<Iterable<Integer>, OUT, Integer, W> windowFunction) throws Exception { + + KeySelector<Integer, Integer> keySelector = new KeySelector<Integer, Integer>() { + private static final long serialVersionUID = 1L; + + @Override + public Integer getKey(Integer value) throws Exception { + return value; + } + }; + + ListStateDescriptor<Integer> intListDescriptor = + new ListStateDescriptor<>("int-list", IntSerializer.INSTANCE); + + + @SuppressWarnings("unchecked") + WindowOperator<Integer, Integer, Iterable<Integer>, OUT, W> operator = new WindowOperator<>( + assigner, + assigner.getWindowSerializer(new ExecutionConfig()), + keySelector, + IntSerializer.INSTANCE, + intListDescriptor, + windowFunction, + trigger, + allowedLatenss); + + return new KeyedOneInputStreamOperatorTestHarness<>( + operator, + keySelector, + BasicTypeInfo.INT_TYPE_INFO); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e27b6971/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java index 47ead66..1d51b45 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java @@ -18,20 +18,32 @@ package org.apache.flink.streaming.runtime.operators.windowing; +import static org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.isStreamRecord; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.anyCollection; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.argThat; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + import com.google.common.collect.Lists; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.functions.FoldFunction; -import org.apache.flink.api.common.functions.ReduceFunction; -import org.apache.flink.api.common.state.AppendingState; -import org.apache.flink.api.common.state.FoldingStateDescriptor; -import org.apache.flink.api.common.state.ListStateDescriptor; -import org.apache.flink.api.common.state.ReducingStateDescriptor; -import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.state.ValueStateDescriptor; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.common.typeutils.base.StringSerializer; -import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner; import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; @@ -42,54 +54,37 @@ import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; +import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.util.Collector; import org.apache.flink.util.TestLogger; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.mockito.Matchers; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.mockito.verification.VerificationMode; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.List; - -import static org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.isStreamRecord; -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.anyLong; -import static org.mockito.Mockito.*; - /** - * These tests verify that {@link WindowOperator} correctly interacts with the other windowing + * Base for window operator tests that verify correct interaction with the other windowing * components: {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}, * {@link org.apache.flink.streaming.api.windowing.triggers.Trigger}. * {@link org.apache.flink.streaming.api.functions.windowing.WindowFunction} and window state. * * <p>These tests document the implicit contract that exists between the windowing components. */ -public class WindowOperatorContractTest extends TestLogger { +public abstract class WindowOperatorContractTest extends TestLogger { + + @Rule + public ExpectedException expectedException = ExpectedException.none(); private static ValueStateDescriptor<String> valueStateDescriptor = new ValueStateDescriptor<>("string-state", StringSerializer.INSTANCE, null); - private static ListStateDescriptor<Integer> intListDescriptor = - new ListStateDescriptor<>("int-list", IntSerializer.INSTANCE); - - private static ReducingStateDescriptor<Integer> intReduceSumDescriptor = - new ReducingStateDescriptor<>("int-reduce", new Sum(), IntSerializer.INSTANCE); - - private static FoldingStateDescriptor<Integer, Integer> intFoldSumDescriptor = - new FoldingStateDescriptor<>("int-fold", 0, new FoldSum(), IntSerializer.INSTANCE); - static <IN, OUT, KEY, W extends Window> InternalWindowFunction<IN, OUT, KEY, W> mockWindowFunction() throws Exception { @SuppressWarnings("unchecked") InternalWindowFunction<IN, OUT, KEY, W> mockWindowFunction = mock(InternalWindowFunction.class); @@ -306,7 +301,7 @@ public class WindowOperatorContractTest extends TestLogger { InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); OneInputStreamOperatorTestHarness<Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction); testHarness.open(); @@ -331,7 +326,7 @@ public class WindowOperatorContractTest extends TestLogger { InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); OneInputStreamOperatorTestHarness<Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction); testHarness.open(); @@ -355,7 +350,7 @@ public class WindowOperatorContractTest extends TestLogger { InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction); testHarness.open(); @@ -402,7 +397,7 @@ public class WindowOperatorContractTest extends TestLogger { InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); OneInputStreamOperatorTestHarness<Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction); testHarness.open(); @@ -418,100 +413,6 @@ public class WindowOperatorContractTest extends TestLogger { } @Test - public void testReducingWindow() throws Exception { - - WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner(); - Trigger<Integer, TimeWindow> mockTrigger = mockTrigger(); - InternalWindowFunction<Integer, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); - - KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 0L, intReduceSumDescriptor, mockWindowFunction); - - testHarness.open(); - - when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext())) - .thenReturn(Arrays.asList(new TimeWindow(2, 4), new TimeWindow(0, 2))); - - assertEquals(0, testHarness.getOutput().size()); - assertEquals(0, testHarness.numKeyedStateEntries()); - - // insert two elements without firing - testHarness.processElement(new StreamRecord<>(1, 0L)); - testHarness.processElement(new StreamRecord<>(1, 0L)); - - doAnswer(new Answer<TriggerResult>() { - @Override - public TriggerResult answer(InvocationOnMock invocation) throws Exception { - TimeWindow window = (TimeWindow) invocation.getArguments()[2]; - Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[3]; - context.registerEventTimeTimer(window.getEnd()); - context.getPartitionedState(valueStateDescriptor).update("hello"); - return TriggerResult.FIRE; - } - }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext()); - - testHarness.processElement(new StreamRecord<>(1, 0L)); - - verify(mockWindowFunction, times(2)).apply(eq(1), anyTimeWindow(), anyInt(), WindowOperatorContractTest.<Void>anyCollector()); - verify(mockWindowFunction, times(1)).apply(eq(1), eq(new TimeWindow(0, 2)), eq(3), WindowOperatorContractTest.<Void>anyCollector()); - verify(mockWindowFunction, times(1)).apply(eq(1), eq(new TimeWindow(2, 4)), eq(3), WindowOperatorContractTest.<Void>anyCollector()); - - // clear is only called at cleanup time/GC time - verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext()); - - // FIRE should not purge contents - assertEquals(4, testHarness.numKeyedStateEntries()); // window contents plus trigger state - assertEquals(4, testHarness.numEventTimeTimers()); // window timers/gc timers - } - - @Test - public void testFoldingWindow() throws Exception { - - WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner(); - Trigger<Integer, TimeWindow> mockTrigger = mockTrigger(); - InternalWindowFunction<Integer, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); - - KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 0L, intFoldSumDescriptor, mockWindowFunction); - - testHarness.open(); - - when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext())) - .thenReturn(Arrays.asList(new TimeWindow(2, 4), new TimeWindow(0, 2))); - - assertEquals(0, testHarness.getOutput().size()); - assertEquals(0, testHarness.numKeyedStateEntries()); - - // insert two elements without firing - testHarness.processElement(new StreamRecord<>(1, 0L)); - testHarness.processElement(new StreamRecord<>(1, 0L)); - - doAnswer(new Answer<TriggerResult>() { - @Override - public TriggerResult answer(InvocationOnMock invocation) throws Exception { - TimeWindow window = (TimeWindow) invocation.getArguments()[2]; - Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[3]; - context.registerEventTimeTimer(window.getEnd()); - context.getPartitionedState(valueStateDescriptor).update("hello"); - return TriggerResult.FIRE; - } - }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext()); - - testHarness.processElement(new StreamRecord<>(1, 0L)); - - verify(mockWindowFunction, times(2)).apply(eq(1), anyTimeWindow(), anyInt(), WindowOperatorContractTest.<Void>anyCollector()); - verify(mockWindowFunction, times(1)).apply(eq(1), eq(new TimeWindow(0, 2)), eq(3), WindowOperatorContractTest.<Void>anyCollector()); - verify(mockWindowFunction, times(1)).apply(eq(1), eq(new TimeWindow(2, 4)), eq(3), WindowOperatorContractTest.<Void>anyCollector()); - - // clear is only called at cleanup time/GC time - verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext()); - - // FIRE should not purge contents - assertEquals(4, testHarness.numKeyedStateEntries()); // window contents plus trigger state - assertEquals(4, testHarness.numEventTimeTimers()); // window timers/gc timers - } - - @Test public void testEmittingFromWindowFunction() throws Exception { WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner(); @@ -519,7 +420,7 @@ public class WindowOperatorContractTest extends TestLogger { InternalWindowFunction<Iterable<Integer>, String, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); KeyedOneInputStreamOperatorTestHarness<Integer, Integer, String> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction); testHarness.open(); @@ -570,7 +471,7 @@ public class WindowOperatorContractTest extends TestLogger { InternalWindowFunction<Iterable<Integer>, String, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); KeyedOneInputStreamOperatorTestHarness<Integer, Integer, String> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction); testHarness.open(); @@ -613,7 +514,7 @@ public class WindowOperatorContractTest extends TestLogger { InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction); testHarness.open(); @@ -655,7 +556,7 @@ public class WindowOperatorContractTest extends TestLogger { InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction); testHarness.open(); @@ -698,7 +599,7 @@ public class WindowOperatorContractTest extends TestLogger { InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction); testHarness.open(); @@ -743,7 +644,7 @@ public class WindowOperatorContractTest extends TestLogger { InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction); testHarness.open(); @@ -796,7 +697,7 @@ public class WindowOperatorContractTest extends TestLogger { InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction); testHarness.open(); @@ -854,7 +755,7 @@ public class WindowOperatorContractTest extends TestLogger { InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction); testHarness.open(); @@ -916,7 +817,7 @@ public class WindowOperatorContractTest extends TestLogger { InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction); testHarness.open(); @@ -977,7 +878,7 @@ public class WindowOperatorContractTest extends TestLogger { InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction); testHarness.open(); @@ -1045,7 +946,7 @@ public class WindowOperatorContractTest extends TestLogger { mock(InternalWindowFunction.class); KeyedOneInputStreamOperatorTestHarness<Integer, Integer, List<Integer>> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction); testHarness.open(); @@ -1109,7 +1010,7 @@ public class WindowOperatorContractTest extends TestLogger { mock(InternalWindowFunction.class); KeyedOneInputStreamOperatorTestHarness<Integer, Integer, List<Integer>> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction); testHarness.open(); @@ -1148,6 +1049,83 @@ public class WindowOperatorContractTest extends TestLogger { } @Test + public void testNoEventTimeFiringForGarbageCollectedMergingWindow() throws Exception { + testNoTimerFiringForGarbageCollectedMergingWindow(new EventTimeAdaptor()); + } + + @Test + public void testNoProcessingTimeFiringForGarbageCollectedMergingWindow() throws Exception { + testNoTimerFiringForGarbageCollectedMergingWindow(new ProcessingTimeAdaptor()); + } + + + /** + * Verify that we neither invoke the trigger nor the window function if a timer + * fires for a merging window that was already garbage collected. + */ + public void testNoTimerFiringForGarbageCollectedMergingWindow(final TimeDomainAdaptor timeAdaptor) throws Exception { + + MergingWindowAssigner<Integer, TimeWindow> mockAssigner = mockMergingAssigner(); + timeAdaptor.setIsEventTime(mockAssigner); + Trigger<Integer, TimeWindow> mockTrigger = mockTrigger(); + + @SuppressWarnings("unchecked") + InternalWindowFunction<Iterable<Integer>, List<Integer>, Integer, TimeWindow> mockWindowFunction = + mock(InternalWindowFunction.class); + + KeyedOneInputStreamOperatorTestHarness<Integer, Integer, List<Integer>> testHarness = + createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction); + + testHarness.open(); + + timeAdaptor.advanceTime(testHarness, Long.MIN_VALUE); + + when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext())) + .thenReturn(Arrays.asList(new TimeWindow(2, 4))); + + assertEquals(0, testHarness.extractOutputStreamRecords().size()); + assertEquals(0, testHarness.numKeyedStateEntries()); + + doAnswer(new Answer<TriggerResult>() { + @Override + public TriggerResult answer(InvocationOnMock invocation) throws Exception { + Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[3]; + // set a timer for after the GC time + timeAdaptor.registerTimer(context, 10L); + return TriggerResult.CONTINUE; + } + }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext()); + + + testHarness.processElement(new StreamRecord<>(0, 0L)); + + assertEquals(2, testHarness.numKeyedStateEntries()); // window contents and merging window set + assertEquals(2, timeAdaptor.numTimers(testHarness)); // timer and gc timer + + timeAdaptor.shouldContinueOnTime(mockTrigger); + + // this should trigger GC + timeAdaptor.advanceTime(testHarness, 4L); + + verify(mockTrigger, times(1)).clear(anyTimeWindow(), anyTriggerContext()); + + assertEquals(0, testHarness.numKeyedStateEntries()); + // we still have a dangling timer because our trigger doesn't do cleanup + assertEquals(1, timeAdaptor.numTimers(testHarness)); + + timeAdaptor.verifyTriggerCallback(mockTrigger, times(1), null, null); + + verify(mockWindowFunction, never()) + .apply(anyInt(), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<List<Integer>>anyCollector()); + + // now we trigger the dangling timer + timeAdaptor.advanceTime(testHarness, 10L); + + // we don't fire again + timeAdaptor.verifyTriggerCallback(mockTrigger, times(1), null, null); + } + + @Test public void testEventTimeTimerCreationAndDeletion() throws Exception { testTimerCreationAndDeletion(new EventTimeAdaptor()); } @@ -1166,7 +1144,7 @@ public class WindowOperatorContractTest extends TestLogger { InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction); testHarness.open(); @@ -1213,7 +1191,7 @@ public class WindowOperatorContractTest extends TestLogger { InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction); testHarness.open(); @@ -1271,7 +1249,7 @@ public class WindowOperatorContractTest extends TestLogger { InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction); testHarness.open(); @@ -1312,7 +1290,7 @@ public class WindowOperatorContractTest extends TestLogger { InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction); testHarness.open(); @@ -1354,7 +1332,7 @@ public class WindowOperatorContractTest extends TestLogger { InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction); testHarness.open(); @@ -1451,7 +1429,7 @@ public class WindowOperatorContractTest extends TestLogger { InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction); testHarness.open(); @@ -1534,7 +1512,7 @@ public class WindowOperatorContractTest extends TestLogger { InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction); testHarness.open(); @@ -1578,7 +1556,7 @@ public class WindowOperatorContractTest extends TestLogger { InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction); testHarness.open(); @@ -1626,7 +1604,7 @@ public class WindowOperatorContractTest extends TestLogger { assertEquals(Long.MAX_VALUE, GlobalWindow.get().maxTimestamp()); KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction); testHarness.open(); @@ -1651,7 +1629,7 @@ public class WindowOperatorContractTest extends TestLogger { InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 20L, intListDescriptor, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 20L, mockWindowFunction); testHarness.open(); @@ -1680,7 +1658,7 @@ public class WindowOperatorContractTest extends TestLogger { InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 20L, intListDescriptor, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 20L, mockWindowFunction); testHarness.open(); @@ -1726,7 +1704,7 @@ public class WindowOperatorContractTest extends TestLogger { InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 20L, intListDescriptor, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 20L, mockWindowFunction); testHarness.open(); @@ -1771,7 +1749,7 @@ public class WindowOperatorContractTest extends TestLogger { InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction); testHarness.open(); @@ -1827,7 +1805,7 @@ public class WindowOperatorContractTest extends TestLogger { InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 20L, intListDescriptor, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 20L, mockWindowFunction); testHarness.open(); @@ -1897,7 +1875,7 @@ public class WindowOperatorContractTest extends TestLogger { InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 20L, intListDescriptor, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 20L, mockWindowFunction); testHarness.open(); @@ -1968,7 +1946,7 @@ public class WindowOperatorContractTest extends TestLogger { InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 20L, intListDescriptor, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 20L, mockWindowFunction); testHarness.open(); @@ -2034,7 +2012,7 @@ public class WindowOperatorContractTest extends TestLogger { InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 20L, intListDescriptor, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 20L, mockWindowFunction); testHarness.open(); @@ -2062,7 +2040,7 @@ public class WindowOperatorContractTest extends TestLogger { InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 20L, intListDescriptor, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 20L, mockWindowFunction); testHarness.open(); @@ -2101,7 +2079,7 @@ public class WindowOperatorContractTest extends TestLogger { InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 20L, intListDescriptor, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 20L, mockWindowFunction); testHarness.open(); @@ -2140,7 +2118,7 @@ public class WindowOperatorContractTest extends TestLogger { InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction); testHarness.open(); @@ -2185,11 +2163,11 @@ public class WindowOperatorContractTest extends TestLogger { }).when(mockTrigger).clear(anyTimeWindow(), anyTriggerContext()); // only fire on the timestamp==0L timers, not the gc timers - when(mockTrigger.onEventTime(eq(0L), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.FIRE); + when(mockTrigger.onEventTime(eq(0L), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.FIRE); mockWindowFunction = mockWindowFunction(); - testHarness = createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + testHarness = createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction); testHarness.setup(); testHarness.initializeState(snapshot); @@ -2220,59 +2198,12 @@ public class WindowOperatorContractTest extends TestLogger { assertEquals(0, testHarness.numEventTimeTimers()); } - private <W extends Window, ACC, OUT> KeyedOneInputStreamOperatorTestHarness<Integer, Integer, OUT> createWindowOperator( + protected abstract <W extends Window, OUT> KeyedOneInputStreamOperatorTestHarness<Integer, Integer, OUT> createWindowOperator( WindowAssigner<Integer, W> assigner, Trigger<Integer, W> trigger, long allowedLatenss, - StateDescriptor<? extends AppendingState<Integer, ACC>, ?> stateDescriptor, - InternalWindowFunction<ACC, OUT, Integer, W> windowFunction) throws Exception { - - KeySelector<Integer, Integer> keySelector = new KeySelector<Integer, Integer>() { - private static final long serialVersionUID = 1L; + InternalWindowFunction<Iterable<Integer>, OUT, Integer, W> windowFunction) throws Exception; - @Override - public Integer getKey(Integer value) throws Exception { - return value; - } - }; - - @SuppressWarnings("unchecked") - WindowOperator<Integer, Integer, ACC, OUT, W> operator = new WindowOperator<>( - assigner, - assigner.getWindowSerializer(new ExecutionConfig()), - keySelector, - IntSerializer.INSTANCE, - stateDescriptor, - windowFunction, - trigger, - allowedLatenss); - - return new KeyedOneInputStreamOperatorTestHarness<>( - operator, - keySelector, - BasicTypeInfo.INT_TYPE_INFO); - } - - - private static class Sum implements ReduceFunction<Integer> { - private static final long serialVersionUID = 1L; - - @Override - public Integer reduce(Integer value1, Integer value2) throws Exception { - return value1 + value2; - } - } - - private static class FoldSum implements FoldFunction<Integer, Integer> { - private static final long serialVersionUID = 1L; - - @Override - public Integer fold( - Integer accumulator, - Integer value) throws Exception { - return accumulator + value; - } - } private interface TimeDomainAdaptor {
