[hotfix] Add EvictingWindowOperatorContractTest The existing WindowOperatorContractTest is turned into a test base while RegularWindowOperatorContractTest tests WindowOperator and EvictingWindowOperatorContractTest tests EvictingWindowOperator. For this to work, the base tests now always use List windows, and we have specific tests for reducing/folding windows in RegularWindowOperatorContractTest.
This also patches in the missing side output support for EvictingWindowOperator. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3c4b1565 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3c4b1565 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3c4b1565 Branch: refs/heads/table-retraction Commit: 3c4b156527e9ca7cb2dafdda706913e91d688133 Parents: 8319a45 Author: Aljoscha Krettek <[email protected]> Authored: Tue Mar 21 15:00:24 2017 +0100 Committer: Tzu-Li (Gordon) Tai <[email protected]> Committed: Thu Mar 23 23:29:01 2017 +0800 ---------------------------------------------------------------------- .../windowing/EvictingWindowOperator.java | 26 +- .../operators/windowing/WindowOperator.java | 4 +- .../EvictingWindowOperatorContractTest.java | 99 ++++++ .../RegularWindowOperatorContractTest.java | 288 +++++++++++++++++ .../windowing/WindowOperatorContractTest.java | 310 +++++-------------- 5 files changed, 484 insertions(+), 243 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/3c4b1565/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java index 8dfc717..951f661 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java @@ -99,16 +99,15 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> @Override public void 
processElement(StreamRecord<IN> element) throws Exception { - Collection<W> elementWindows = windowAssigner.assignWindows( - element.getValue(), - element.getTimestamp(), - windowAssignerContext); + final Collection<W> elementWindows = windowAssigner.assignWindows( + element.getValue(), element.getTimestamp(), windowAssignerContext); - @SuppressWarnings("unchecked") - final K key = (K) getKeyedStateBackend().getCurrentKey(); + //if element is handled by none of assigned elementWindows + boolean isSkippedElement = true; - if (windowAssigner instanceof MergingWindowAssigner) { + final K key = this.<K>getKeyedStateBackend().getCurrentKey(); + if (windowAssigner instanceof MergingWindowAssigner) { MergingWindowSet<W> mergingWindows = getMergingWindowSet(); for (W window : elementWindows) { @@ -138,11 +137,12 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> } }); - // check if the window is already inactive + // drop if the window is already late if (isWindowLate(actualWindow)) { mergingWindows.retireWindow(actualWindow); continue; } + isSkippedElement = false; W stateWindow = mergingWindows.getStateWindow(actualWindow); if (stateWindow == null) { @@ -174,6 +174,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> registerCleanupTimer(actualWindow); } + // need to make sure to update the merging state in state mergingWindows.persist(); } else { for (W window : elementWindows) { @@ -182,6 +183,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> if (isWindowLate(window)) { continue; } + isSkippedElement = false; evictingWindowState.setCurrentNamespace(window); evictingWindowState.add(element); @@ -208,6 +210,14 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> registerCleanupTimer(window); } } + + // side output input event if + // element not handled by any window + // late arriving tag has been set + // windowAssigner is event time and current timestamp + allowed lateness no less than element 
timestamp + if (isSkippedElement && lateDataOutputTag != null && isElementLate(element)) { + sideOutput(element); + } } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/3c4b1565/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java index 9ce1ae7..b4283d8 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java @@ -138,7 +138,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> * {@code window.maxTimestamp + allowedLateness} is smaller than the current watermark will * be emitted to this. 
*/ - private final OutputTag<IN> lateDataOutputTag; + protected final OutputTag<IN> lateDataOutputTag; // ------------------------------------------------------------------------ // State that is not checkpointed @@ -574,7 +574,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> * * @param element skipped late arriving element to side output */ - private void sideOutput(StreamRecord<IN> element){ + protected void sideOutput(StreamRecord<IN> element){ output.collect(lateDataOutputTag, element); } http://git-wip-us.apache.org/repos/asf/flink/blob/3c4b1565/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorContractTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorContractTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorContractTest.java new file mode 100644 index 0000000..7af4506 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorContractTest.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.runtime.operators.windowing; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; +import org.apache.flink.streaming.api.windowing.evictors.CountEvictor; +import org.apache.flink.streaming.api.windowing.triggers.Trigger; +import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction; +import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; +import org.apache.flink.util.OutputTag; + +/** + * These tests verify that {@link EvictingWindowOperator} correctly interacts with the other + * windowing components: {@link WindowAssigner}, + * {@link Trigger}. + * {@link org.apache.flink.streaming.api.functions.windowing.WindowFunction} and window state. + * + * <p>These tests document the implicit contract that exists between the windowing components. 
+ */ +public class EvictingWindowOperatorContractTest extends WindowOperatorContractTest { + + protected <W extends Window, OUT> KeyedOneInputStreamOperatorTestHarness<Integer, Integer, OUT> createWindowOperator( + WindowAssigner<Integer, W> assigner, + Trigger<Integer, W> trigger, + long allowedLatenss, + InternalWindowFunction<Iterable<Integer>, OUT, Integer, W> windowFunction, + OutputTag<Integer> lateOutputTag) throws Exception { + + KeySelector<Integer, Integer> keySelector = new KeySelector<Integer, Integer>() { + private static final long serialVersionUID = 1L; + + @Override + public Integer getKey(Integer value) throws Exception { + return value; + } + }; + + ListStateDescriptor<StreamRecord<Integer>> intListDescriptor = + new ListStateDescriptor<>( + "int-list", + (TypeSerializer<StreamRecord<Integer>>) new StreamElementSerializer(IntSerializer.INSTANCE)); + + @SuppressWarnings("unchecked") + EvictingWindowOperator<Integer, Integer, OUT, W> operator = new EvictingWindowOperator<>( + assigner, + assigner.getWindowSerializer(new ExecutionConfig()), + keySelector, + IntSerializer.INSTANCE, + intListDescriptor, + windowFunction, + trigger, + CountEvictor.<W>of(100), + allowedLatenss, + lateOutputTag); + + return new KeyedOneInputStreamOperatorTestHarness<>( + operator, + keySelector, + BasicTypeInfo.INT_TYPE_INFO); + } + + protected <W extends Window, OUT> KeyedOneInputStreamOperatorTestHarness<Integer, Integer, OUT> createWindowOperator( + WindowAssigner<Integer, W> assigner, + Trigger<Integer, W> trigger, + long allowedLatenss, + InternalWindowFunction<Iterable<Integer>, OUT, Integer, W> windowFunction) throws Exception { + + return createWindowOperator( + assigner, + trigger, + allowedLatenss, + windowFunction, + null /* late output tag */); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/3c4b1565/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.java 
---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.java new file mode 100644 index 0000000..11508c5 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.java @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.runtime.operators.windowing; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Arrays; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.FoldFunction; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.state.AppendingState; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; +import org.apache.flink.streaming.api.windowing.triggers.Trigger; +import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; +import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.streaming.api.windowing.windows.Window; +import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; +import org.apache.flink.util.OutputTag; +import org.junit.Test; +import org.mockito.Matchers; +import 
org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +/** + * These tests verify that {@link WindowOperator} correctly interacts with the other windowing + * components: {@link WindowAssigner}, + * {@link Trigger}. + * {@link org.apache.flink.streaming.api.functions.windowing.WindowFunction} and window state. + * + * <p>These tests document the implicit contract that exists between the windowing components. + */ +public class RegularWindowOperatorContractTest extends WindowOperatorContractTest { + + @Test + public void testReducingWindow() throws Exception { + + WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner(); + Trigger<Integer, TimeWindow> mockTrigger = mockTrigger(); + InternalWindowFunction<Integer, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); + + + ReducingStateDescriptor<Integer> intReduceSumDescriptor = + new ReducingStateDescriptor<>( + "int-reduce", + new ReduceFunction<Integer>() { + private static final long serialVersionUID = 1L; + + @Override + public Integer reduce(Integer a, Integer b) throws Exception { + return a + b; + } + }, + IntSerializer.INSTANCE); + + final ValueStateDescriptor<String> valueStateDescriptor = + new ValueStateDescriptor<>("string-state", StringSerializer.INSTANCE); + + + KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = + createWindowOperator(mockAssigner, mockTrigger, 0L, intReduceSumDescriptor, mockWindowFunction); + + testHarness.open(); + + when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext())) + .thenReturn(Arrays.asList(new TimeWindow(2, 4), new TimeWindow(0, 2))); + + assertEquals(0, testHarness.getOutput().size()); + assertEquals(0, testHarness.numKeyedStateEntries()); + + // insert two elements without firing + testHarness.processElement(new StreamRecord<>(1, 0L)); + testHarness.processElement(new StreamRecord<>(1, 0L)); + + doAnswer(new Answer<TriggerResult>() { + @Override + public 
TriggerResult answer(InvocationOnMock invocation) throws Exception { + TimeWindow window = (TimeWindow) invocation.getArguments()[2]; + Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[3]; + context.registerEventTimeTimer(window.getEnd()); + context.getPartitionedState(valueStateDescriptor).update("hello"); + return TriggerResult.FIRE; + } + }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext()); + + testHarness.processElement(new StreamRecord<>(1, 0L)); + + verify(mockWindowFunction, times(2)).apply(eq(1), anyTimeWindow(), anyInt(), WindowOperatorContractTest.<Void>anyCollector()); + verify(mockWindowFunction, times(1)).apply(eq(1), eq(new TimeWindow(0, 2)), eq(3), WindowOperatorContractTest.<Void>anyCollector()); + verify(mockWindowFunction, times(1)).apply(eq(1), eq(new TimeWindow(2, 4)), eq(3), WindowOperatorContractTest.<Void>anyCollector()); + + // clear is only called at cleanup time/GC time + verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext()); + + // FIRE should not purge contents + assertEquals(4, testHarness.numKeyedStateEntries()); // window contents plus trigger state + assertEquals(4, testHarness.numEventTimeTimers()); // window timers/gc timers + } + + @Test + public void testFoldingWindow() throws Exception { + + WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner(); + Trigger<Integer, TimeWindow> mockTrigger = mockTrigger(); + InternalWindowFunction<Integer, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); + + FoldingStateDescriptor<Integer, Integer> intFoldSumDescriptor = + new FoldingStateDescriptor<>( + "int-fold", + 0, + new FoldFunction<Integer, Integer>() { + private static final long serialVersionUID = 1L; + + @Override + public Integer fold(Integer accumulator, Integer value) throws Exception { + return accumulator + value; + } + }, + IntSerializer.INSTANCE); + + final 
ValueStateDescriptor<String> valueStateDescriptor = + new ValueStateDescriptor<>("string-state", StringSerializer.INSTANCE); + + KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = + createWindowOperator(mockAssigner, mockTrigger, 0L, intFoldSumDescriptor, mockWindowFunction); + + testHarness.open(); + + when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext())) + .thenReturn(Arrays.asList(new TimeWindow(2, 4), new TimeWindow(0, 2))); + + assertEquals(0, testHarness.getOutput().size()); + assertEquals(0, testHarness.numKeyedStateEntries()); + + // insert two elements without firing + testHarness.processElement(new StreamRecord<>(1, 0L)); + testHarness.processElement(new StreamRecord<>(1, 0L)); + + doAnswer(new Answer<TriggerResult>() { + @Override + public TriggerResult answer(InvocationOnMock invocation) throws Exception { + TimeWindow window = (TimeWindow) invocation.getArguments()[2]; + Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[3]; + context.registerEventTimeTimer(window.getEnd()); + context.getPartitionedState(valueStateDescriptor).update("hello"); + return TriggerResult.FIRE; + } + }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext()); + + testHarness.processElement(new StreamRecord<>(1, 0L)); + + verify(mockWindowFunction, times(2)).apply(eq(1), anyTimeWindow(), anyInt(), WindowOperatorContractTest.<Void>anyCollector()); + verify(mockWindowFunction, times(1)).apply(eq(1), eq(new TimeWindow(0, 2)), eq(3), WindowOperatorContractTest.<Void>anyCollector()); + verify(mockWindowFunction, times(1)).apply(eq(1), eq(new TimeWindow(2, 4)), eq(3), WindowOperatorContractTest.<Void>anyCollector()); + + // clear is only called at cleanup time/GC time + verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext()); + + // FIRE should not purge contents + assertEquals(4, testHarness.numKeyedStateEntries()); // window contents 
plus trigger state + assertEquals(4, testHarness.numEventTimeTimers()); // window timers/gc timers + } + + /** + * Special method for creating a {@link WindowOperator} with a custom {@link StateDescriptor} + * for the window contents state. + */ + private <W extends Window, ACC, OUT> KeyedOneInputStreamOperatorTestHarness<Integer, Integer, OUT> createWindowOperator( + WindowAssigner<Integer, W> assigner, + Trigger<Integer, W> trigger, + long allowedLatenss, + StateDescriptor<? extends AppendingState<Integer, ACC>, ?> stateDescriptor, + InternalWindowFunction<ACC, OUT, Integer, W> windowFunction) throws Exception { + + KeySelector<Integer, Integer> keySelector = new KeySelector<Integer, Integer>() { + private static final long serialVersionUID = 1L; + + @Override + public Integer getKey(Integer value) throws Exception { + return value; + } + }; + + @SuppressWarnings("unchecked") + WindowOperator<Integer, Integer, ACC, OUT, W> operator = new WindowOperator<>( + assigner, + assigner.getWindowSerializer(new ExecutionConfig()), + keySelector, + IntSerializer.INSTANCE, + stateDescriptor, + windowFunction, + trigger, + allowedLatenss, + null /* late output tag */); + + return new KeyedOneInputStreamOperatorTestHarness<>( + operator, + keySelector, + BasicTypeInfo.INT_TYPE_INFO); + } + + @Override + protected <W extends Window, OUT> KeyedOneInputStreamOperatorTestHarness<Integer, Integer, OUT> createWindowOperator( + WindowAssigner<Integer, W> assigner, + Trigger<Integer, W> trigger, + long allowedLatenss, + InternalWindowFunction<Iterable<Integer>, OUT, Integer, W> windowFunction, + OutputTag<Integer> lateOutputTag) throws Exception { + + KeySelector<Integer, Integer> keySelector = new KeySelector<Integer, Integer>() { + private static final long serialVersionUID = 1L; + + @Override + public Integer getKey(Integer value) throws Exception { + return value; + } + }; + + ListStateDescriptor<Integer> intListDescriptor = + new ListStateDescriptor<>("int-list", 
IntSerializer.INSTANCE); + + + @SuppressWarnings("unchecked") + WindowOperator<Integer, Integer, Iterable<Integer>, OUT, W> operator = new WindowOperator<>( + assigner, + assigner.getWindowSerializer(new ExecutionConfig()), + keySelector, + IntSerializer.INSTANCE, + intListDescriptor, + windowFunction, + trigger, + allowedLatenss, + lateOutputTag); + + return new KeyedOneInputStreamOperatorTestHarness<>( + operator, + keySelector, + BasicTypeInfo.INT_TYPE_INFO); + } + + @Override + protected <W extends Window, OUT> KeyedOneInputStreamOperatorTestHarness<Integer, Integer, OUT> createWindowOperator( + WindowAssigner<Integer, W> assigner, + Trigger<Integer, W> trigger, + long allowedLatenss, + InternalWindowFunction<Iterable<Integer>, OUT, Integer, W> windowFunction) throws Exception { + + return createWindowOperator( + assigner, + trigger, + allowedLatenss, + windowFunction, + null /* late output tag */); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/3c4b1565/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java index c542b43..abc7b3e 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java @@ -18,20 +18,32 @@ package org.apache.flink.streaming.runtime.operators.windowing; +import static org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.isStreamRecord; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static 
org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.anyCollection; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.argThat; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + import com.google.common.collect.Lists; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.functions.FoldFunction; -import org.apache.flink.api.common.functions.ReduceFunction; -import org.apache.flink.api.common.state.AppendingState; -import org.apache.flink.api.common.state.FoldingStateDescriptor; -import org.apache.flink.api.common.state.ListStateDescriptor; -import org.apache.flink.api.common.state.ReducingStateDescriptor; -import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.state.ValueStateDescriptor; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.common.typeutils.base.StringSerializer; -import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner; import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; @@ -42,55 +54,38 @@ import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.api.windowing.windows.Window; import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; +import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.util.Collector; import org.apache.flink.util.OutputTag; import org.apache.flink.util.TestLogger; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.mockito.Matchers; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.mockito.verification.VerificationMode; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.List; - -import static org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.isStreamRecord; -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.anyLong; -import static org.mockito.Mockito.*; - /** - * These tests verify that {@link WindowOperator} correctly interacts with the other windowing + * Base for window operator tests that verify correct interaction with the other windowing * components: {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}, * {@link org.apache.flink.streaming.api.windowing.triggers.Trigger}. * {@link org.apache.flink.streaming.api.functions.windowing.WindowFunction} and window state. * * <p>These tests document the implicit contract that exists between the windowing components. 
*/ -public class WindowOperatorContractTest extends TestLogger { +public abstract class WindowOperatorContractTest extends TestLogger { + + @Rule + public ExpectedException expectedException = ExpectedException.none(); private static ValueStateDescriptor<String> valueStateDescriptor = new ValueStateDescriptor<>("string-state", StringSerializer.INSTANCE, null); - private static ListStateDescriptor<Integer> intListDescriptor = - new ListStateDescriptor<>("int-list", IntSerializer.INSTANCE); - - private static ReducingStateDescriptor<Integer> intReduceSumDescriptor = - new ReducingStateDescriptor<>("int-reduce", new Sum(), IntSerializer.INSTANCE); - - private static FoldingStateDescriptor<Integer, Integer> intFoldSumDescriptor = - new FoldingStateDescriptor<>("int-fold", 0, new FoldSum(), IntSerializer.INSTANCE); - static <IN, OUT, KEY, W extends Window> InternalWindowFunction<IN, OUT, KEY, W> mockWindowFunction() throws Exception { @SuppressWarnings("unchecked") InternalWindowFunction<IN, OUT, KEY, W> mockWindowFunction = mock(InternalWindowFunction.class); @@ -313,7 +308,7 @@ public class WindowOperatorContractTest extends TestLogger { InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); OneInputStreamOperatorTestHarness<Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction, lateOutputTag); + createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction, lateOutputTag); testHarness.open(); @@ -338,7 +333,7 @@ public class WindowOperatorContractTest extends TestLogger { InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); OneInputStreamOperatorTestHarness<Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction, lateOutputTag); + createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction, 
lateOutputTag); testHarness.open(); @@ -376,7 +371,7 @@ public class WindowOperatorContractTest extends TestLogger { InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); OneInputStreamOperatorTestHarness<Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction); testHarness.open(); @@ -401,7 +396,7 @@ public class WindowOperatorContractTest extends TestLogger { InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); OneInputStreamOperatorTestHarness<Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction); testHarness.open(); @@ -425,7 +420,7 @@ public class WindowOperatorContractTest extends TestLogger { InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction); testHarness.open(); @@ -472,7 +467,7 @@ public class WindowOperatorContractTest extends TestLogger { InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); OneInputStreamOperatorTestHarness<Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction); testHarness.open(); @@ -488,100 +483,6 @@ public class WindowOperatorContractTest extends TestLogger { } @Test - public void testReducingWindow() throws Exception { - - 
WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner(); - Trigger<Integer, TimeWindow> mockTrigger = mockTrigger(); - InternalWindowFunction<Integer, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); - - KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 0L, intReduceSumDescriptor, mockWindowFunction); - - testHarness.open(); - - when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext())) - .thenReturn(Arrays.asList(new TimeWindow(2, 4), new TimeWindow(0, 2))); - - assertEquals(0, testHarness.getOutput().size()); - assertEquals(0, testHarness.numKeyedStateEntries()); - - // insert two elements without firing - testHarness.processElement(new StreamRecord<>(1, 0L)); - testHarness.processElement(new StreamRecord<>(1, 0L)); - - doAnswer(new Answer<TriggerResult>() { - @Override - public TriggerResult answer(InvocationOnMock invocation) throws Exception { - TimeWindow window = (TimeWindow) invocation.getArguments()[2]; - Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[3]; - context.registerEventTimeTimer(window.getEnd()); - context.getPartitionedState(valueStateDescriptor).update("hello"); - return TriggerResult.FIRE; - } - }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext()); - - testHarness.processElement(new StreamRecord<>(1, 0L)); - - verify(mockWindowFunction, times(2)).apply(eq(1), anyTimeWindow(), anyInt(), WindowOperatorContractTest.<Void>anyCollector()); - verify(mockWindowFunction, times(1)).apply(eq(1), eq(new TimeWindow(0, 2)), eq(3), WindowOperatorContractTest.<Void>anyCollector()); - verify(mockWindowFunction, times(1)).apply(eq(1), eq(new TimeWindow(2, 4)), eq(3), WindowOperatorContractTest.<Void>anyCollector()); - - // clear is only called at cleanup time/GC time - verify(mockTrigger, never()).clear(anyTimeWindow(), 
anyTriggerContext()); - - // FIRE should not purge contents - assertEquals(4, testHarness.numKeyedStateEntries()); // window contents plus trigger state - assertEquals(4, testHarness.numEventTimeTimers()); // window timers/gc timers - } - - @Test - public void testFoldingWindow() throws Exception { - - WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner(); - Trigger<Integer, TimeWindow> mockTrigger = mockTrigger(); - InternalWindowFunction<Integer, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); - - KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 0L, intFoldSumDescriptor, mockWindowFunction); - - testHarness.open(); - - when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext())) - .thenReturn(Arrays.asList(new TimeWindow(2, 4), new TimeWindow(0, 2))); - - assertEquals(0, testHarness.getOutput().size()); - assertEquals(0, testHarness.numKeyedStateEntries()); - - // insert two elements without firing - testHarness.processElement(new StreamRecord<>(1, 0L)); - testHarness.processElement(new StreamRecord<>(1, 0L)); - - doAnswer(new Answer<TriggerResult>() { - @Override - public TriggerResult answer(InvocationOnMock invocation) throws Exception { - TimeWindow window = (TimeWindow) invocation.getArguments()[2]; - Trigger.TriggerContext context = (Trigger.TriggerContext) invocation.getArguments()[3]; - context.registerEventTimeTimer(window.getEnd()); - context.getPartitionedState(valueStateDescriptor).update("hello"); - return TriggerResult.FIRE; - } - }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext()); - - testHarness.processElement(new StreamRecord<>(1, 0L)); - - verify(mockWindowFunction, times(2)).apply(eq(1), anyTimeWindow(), anyInt(), WindowOperatorContractTest.<Void>anyCollector()); - verify(mockWindowFunction, times(1)).apply(eq(1), eq(new TimeWindow(0, 2)), eq(3), 
WindowOperatorContractTest.<Void>anyCollector()); - verify(mockWindowFunction, times(1)).apply(eq(1), eq(new TimeWindow(2, 4)), eq(3), WindowOperatorContractTest.<Void>anyCollector()); - - // clear is only called at cleanup time/GC time - verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext()); - - // FIRE should not purge contents - assertEquals(4, testHarness.numKeyedStateEntries()); // window contents plus trigger state - assertEquals(4, testHarness.numEventTimeTimers()); // window timers/gc timers - } - - @Test public void testEmittingFromWindowFunction() throws Exception { WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner(); @@ -589,7 +490,7 @@ public class WindowOperatorContractTest extends TestLogger { InternalWindowFunction<Iterable<Integer>, String, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); KeyedOneInputStreamOperatorTestHarness<Integer, Integer, String> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction); testHarness.open(); @@ -640,7 +541,7 @@ public class WindowOperatorContractTest extends TestLogger { InternalWindowFunction<Iterable<Integer>, String, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); KeyedOneInputStreamOperatorTestHarness<Integer, Integer, String> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction); testHarness.open(); @@ -683,7 +584,7 @@ public class WindowOperatorContractTest extends TestLogger { InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + 
createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction); testHarness.open(); @@ -725,7 +626,7 @@ public class WindowOperatorContractTest extends TestLogger { InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction); testHarness.open(); @@ -768,7 +669,7 @@ public class WindowOperatorContractTest extends TestLogger { InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction); testHarness.open(); @@ -813,7 +714,7 @@ public class WindowOperatorContractTest extends TestLogger { InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction); testHarness.open(); @@ -866,7 +767,7 @@ public class WindowOperatorContractTest extends TestLogger { InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction); testHarness.open(); @@ -924,7 +825,7 @@ public class WindowOperatorContractTest extends 
TestLogger { InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction); testHarness.open(); @@ -986,7 +887,7 @@ public class WindowOperatorContractTest extends TestLogger { InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction); testHarness.open(); @@ -1047,7 +948,7 @@ public class WindowOperatorContractTest extends TestLogger { InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction); testHarness.open(); @@ -1115,7 +1016,7 @@ public class WindowOperatorContractTest extends TestLogger { mock(InternalWindowFunction.class); KeyedOneInputStreamOperatorTestHarness<Integer, Integer, List<Integer>> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction); testHarness.open(); @@ -1179,7 +1080,7 @@ public class WindowOperatorContractTest extends TestLogger { mock(InternalWindowFunction.class); KeyedOneInputStreamOperatorTestHarness<Integer, Integer, List<Integer>> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, 
mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction); testHarness.open(); @@ -1243,7 +1144,7 @@ public class WindowOperatorContractTest extends TestLogger { mock(InternalWindowFunction.class); KeyedOneInputStreamOperatorTestHarness<Integer, Integer, List<Integer>> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction); testHarness.open(); @@ -1313,7 +1214,7 @@ public class WindowOperatorContractTest extends TestLogger { InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction); testHarness.open(); @@ -1360,7 +1261,7 @@ public class WindowOperatorContractTest extends TestLogger { InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction); testHarness.open(); @@ -1418,7 +1319,7 @@ public class WindowOperatorContractTest extends TestLogger { InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction); testHarness.open(); @@ -1459,7 +1360,7 @@ public class WindowOperatorContractTest extends TestLogger { 
InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction); testHarness.open(); @@ -1501,7 +1402,7 @@ public class WindowOperatorContractTest extends TestLogger { InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction); testHarness.open(); @@ -1598,7 +1499,7 @@ public class WindowOperatorContractTest extends TestLogger { InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction); testHarness.open(); @@ -1681,7 +1582,7 @@ public class WindowOperatorContractTest extends TestLogger { InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction); testHarness.open(); @@ -1725,7 +1626,7 @@ public class WindowOperatorContractTest extends TestLogger { InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); 
KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction); testHarness.open(); @@ -1773,7 +1674,7 @@ public class WindowOperatorContractTest extends TestLogger { assertEquals(Long.MAX_VALUE, GlobalWindow.get().maxTimestamp()); KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction); testHarness.open(); @@ -1798,7 +1699,7 @@ public class WindowOperatorContractTest extends TestLogger { InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 20L, intListDescriptor, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 20L, mockWindowFunction); testHarness.open(); @@ -1827,7 +1728,7 @@ public class WindowOperatorContractTest extends TestLogger { InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 20L, intListDescriptor, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 20L, mockWindowFunction); testHarness.open(); @@ -1873,7 +1774,7 @@ public class WindowOperatorContractTest extends TestLogger { InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 20L, intListDescriptor, mockWindowFunction); + 
createWindowOperator(mockAssigner, mockTrigger, 20L, mockWindowFunction); testHarness.open(); @@ -1918,7 +1819,7 @@ public class WindowOperatorContractTest extends TestLogger { InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction); testHarness.open(); @@ -1974,7 +1875,7 @@ public class WindowOperatorContractTest extends TestLogger { InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 20L, intListDescriptor, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 20L, mockWindowFunction); testHarness.open(); @@ -2044,7 +1945,7 @@ public class WindowOperatorContractTest extends TestLogger { InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 20L, intListDescriptor, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 20L, mockWindowFunction); testHarness.open(); @@ -2115,7 +2016,7 @@ public class WindowOperatorContractTest extends TestLogger { InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 20L, intListDescriptor, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 20L, mockWindowFunction); testHarness.open(); @@ -2181,7 +2082,7 @@ public class 
WindowOperatorContractTest extends TestLogger { InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 20L, intListDescriptor, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 20L, mockWindowFunction); testHarness.open(); @@ -2209,7 +2110,7 @@ public class WindowOperatorContractTest extends TestLogger { InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 20L, intListDescriptor, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 20L, mockWindowFunction); testHarness.open(); @@ -2248,7 +2149,7 @@ public class WindowOperatorContractTest extends TestLogger { InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 20L, intListDescriptor, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 20L, mockWindowFunction); testHarness.open(); @@ -2287,7 +2188,7 @@ public class WindowOperatorContractTest extends TestLogger { InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction); testHarness.open(); @@ -2332,11 +2233,11 @@ public class WindowOperatorContractTest extends TestLogger { }).when(mockTrigger).clear(anyTimeWindow(), anyTriggerContext()); // only fire on the timestamp==0L 
timers, not the gc timers - when(mockTrigger.onEventTime(eq(0L), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.FIRE); + when(mockTrigger.onEventTime(eq(0L), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.FIRE); mockWindowFunction = mockWindowFunction(); - testHarness = createWindowOperator(mockAssigner, mockTrigger, 0L, intListDescriptor, mockWindowFunction); + testHarness = createWindowOperator(mockAssigner, mockTrigger, 0L, mockWindowFunction); testHarness.setup(); testHarness.initializeState(snapshot); @@ -2367,76 +2268,19 @@ public class WindowOperatorContractTest extends TestLogger { assertEquals(0, testHarness.numEventTimeTimers()); } - private <W extends Window, ACC, OUT> KeyedOneInputStreamOperatorTestHarness<Integer, Integer, OUT> createWindowOperator( + protected abstract <W extends Window, OUT> KeyedOneInputStreamOperatorTestHarness<Integer, Integer, OUT> createWindowOperator( WindowAssigner<Integer, W> assigner, Trigger<Integer, W> trigger, - long allowedLatenss, - StateDescriptor<? 
extends AppendingState<Integer, ACC>, ?> stateDescriptor, - InternalWindowFunction<ACC, OUT, Integer, W> windowFunction, - OutputTag<Integer> lateOutputTag) throws Exception { - - KeySelector<Integer, Integer> keySelector = new KeySelector<Integer, Integer>() { - private static final long serialVersionUID = 1L; + long allowedLateness, + InternalWindowFunction<Iterable<Integer>, OUT, Integer, W> windowFunction, + OutputTag<Integer> lateOutputTag) throws Exception; - @Override - public Integer getKey(Integer value) throws Exception { - return value; - } - }; - - @SuppressWarnings("unchecked") - WindowOperator<Integer, Integer, ACC, OUT, W> operator = new WindowOperator<>( - assigner, - assigner.getWindowSerializer(new ExecutionConfig()), - keySelector, - IntSerializer.INSTANCE, - stateDescriptor, - windowFunction, - trigger, - allowedLatenss, - lateOutputTag); - - return new KeyedOneInputStreamOperatorTestHarness<>( - operator, - keySelector, - BasicTypeInfo.INT_TYPE_INFO); - } - - private <W extends Window, ACC, OUT> KeyedOneInputStreamOperatorTestHarness<Integer, Integer, OUT> createWindowOperator( + protected abstract <W extends Window, OUT> KeyedOneInputStreamOperatorTestHarness<Integer, Integer, OUT> createWindowOperator( WindowAssigner<Integer, W> assigner, Trigger<Integer, W> trigger, long allowedLatenss, - StateDescriptor<? 
extends AppendingState<Integer, ACC>, ?> stateDescriptor, - InternalWindowFunction<ACC, OUT, Integer, W> windowFunction) throws Exception { - return createWindowOperator( - assigner, - trigger, - allowedLatenss, - stateDescriptor, - windowFunction, - null /* late output tag */); - } - + InternalWindowFunction<Iterable<Integer>, OUT, Integer, W> windowFunction) throws Exception; - private static class Sum implements ReduceFunction<Integer> { - private static final long serialVersionUID = 1L; - - @Override - public Integer reduce(Integer value1, Integer value2) throws Exception { - return value1 + value2; - } - } - - private static class FoldSum implements FoldFunction<Integer, Integer> { - private static final long serialVersionUID = 1L; - - @Override - public Integer fold( - Integer accumulator, - Integer value) throws Exception { - return accumulator + value; - } - } private interface TimeDomainAdaptor {
