Repository: flink
Updated Branches:
  refs/heads/release-1.2 7fbb115bc -> b99883430


[hotfix] Add EvictingWindowOperatorContractTest

The existing WindowOperatorContractTest is turned into an abstract test base,
while RegularWindowOperatorContractTest tests WindowOperator and
EvictingWindowOperatorContractTest tests EvictingWindowOperator. For this to
work, the base tests now always use List windows and we have specific
tests for reducing/folding windows in RegularWindowOperatorContractTest.

This also patches in the missing side output support for
EvictingWindowOperator.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e27b6971
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e27b6971
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e27b6971

Branch: refs/heads/release-1.2
Commit: e27b69710c30ee44b0da6c6debadd5bef8d72c07
Parents: 7fbb115
Author: Aljoscha Krettek <[email protected]>
Authored: Tue Mar 21 15:00:24 2017 +0100
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Fri Mar 24 12:24:51 2017 +0800

----------------------------------------------------------------------
 .../windowing/EvictingWindowOperator.java       |  12 +-
 .../EvictingWindowOperatorContractTest.java     |  83 +++++
 .../RegularWindowOperatorContractTest.java      | 269 ++++++++++++++
 .../windowing/WindowOperatorContractTest.java   | 363 ++++++++-----------
 4 files changed, 504 insertions(+), 223 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e27b6971/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index 17b3984..2d28c00 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -85,15 +85,12 @@ public class EvictingWindowOperator<K, IN, OUT, W extends 
Window> extends Window
        @Override
        @SuppressWarnings("unchecked")
        public void processElement(StreamRecord<IN> element) throws Exception {
-               Collection<W> elementWindows = windowAssigner.assignWindows(
-                               element.getValue(),
-                               element.getTimestamp(),
-                               windowAssignerContext);
+               final Collection<W> elementWindows = 
windowAssigner.assignWindows(
+                               element.getValue(), element.getTimestamp(), 
windowAssignerContext);
 
-               final K key = (K) getKeyedStateBackend().getCurrentKey();
+               final K key = this.<K>getKeyedStateBackend().getCurrentKey();
 
                if (windowAssigner instanceof MergingWindowAssigner) {
-
                        MergingWindowSet<W> mergingWindows = 
getMergingWindowSet();
 
                        for (W window : elementWindows) {
@@ -127,7 +124,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends 
Window> extends Window
                                                        }
                                                });
 
-                               // check if the window is already inactive
+                               // drop if the window is already late
                                if (isLate(actualWindow)) {
                                        
mergingWindows.retireWindow(actualWindow);
                                        continue;
@@ -163,6 +160,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends 
Window> extends Window
                                registerCleanupTimer(actualWindow);
                        }
 
+                       // need to make sure to persist the updated merging window set in state
                        mergingWindows.persist();
                } else {
                        for (W window : elementWindows) {

http://git-wip-us.apache.org/repos/asf/flink/blob/e27b6971/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorContractTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorContractTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorContractTest.java
new file mode 100644
index 0000000..50c5950
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorContractTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+
+/**
+ * These tests verify that {@link EvictingWindowOperator} correctly interacts with the other
+ * windowing components: {@link WindowAssigner},
+ * {@link Trigger},
+ * {@link org.apache.flink.streaming.api.functions.windowing.WindowFunction} and window state.
+ *
+ * <p>These tests document the implicit contract that exists between the windowing components.
+ */
+public class EvictingWindowOperatorContractTest extends 
WindowOperatorContractTest {
+
+       @Override
+       protected <W extends Window, OUT> 
KeyedOneInputStreamOperatorTestHarness<Integer, Integer, OUT> 
createWindowOperator(
+               WindowAssigner<Integer, W> assigner,
+               Trigger<Integer, W> trigger,
+               long allowedLatenss,
+               InternalWindowFunction<Iterable<Integer>, OUT, Integer, W> 
windowFunction) throws Exception {
+
+               KeySelector<Integer, Integer> keySelector = new 
KeySelector<Integer, Integer>() {
+                       private static final long serialVersionUID = 1L;
+
+                       @Override
+                       public Integer getKey(Integer value) throws Exception {
+                               return value;
+                       }
+               };
+
+               ListStateDescriptor<StreamRecord<Integer>> intListDescriptor =
+                       new ListStateDescriptor<>(
+                               "int-list",
+                               (TypeSerializer<StreamRecord<Integer>>) new 
StreamElementSerializer(IntSerializer.INSTANCE));
+
+               @SuppressWarnings("unchecked")
+               EvictingWindowOperator<Integer, Integer, OUT, W> operator = new 
EvictingWindowOperator<>(
+                       assigner,
+                       assigner.getWindowSerializer(new ExecutionConfig()),
+                       keySelector,
+                       IntSerializer.INSTANCE,
+                       intListDescriptor,
+                       windowFunction,
+                       trigger,
+                       CountEvictor.<W>of(100),
+                       allowedLatenss);
+
+               return new KeyedOneInputStreamOperatorTestHarness<>(
+                       operator,
+                       keySelector,
+                       BasicTypeInfo.INT_TYPE_INFO);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e27b6971/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.java
new file mode 100644
index 0000000..13b68c0
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.AppendingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * These tests verify that {@link WindowOperator} correctly interacts with the other windowing
+ * components: {@link WindowAssigner},
+ * {@link Trigger},
+ * {@link org.apache.flink.streaming.api.functions.windowing.WindowFunction} and window state.
+ *
+ * <p>These tests document the implicit contract that exists between the windowing components.
+ */
+public class RegularWindowOperatorContractTest extends 
WindowOperatorContractTest {
+
+       @Test
+       public void testReducingWindow() throws Exception {
+
+               WindowAssigner<Integer, TimeWindow> mockAssigner = 
mockTimeWindowAssigner();
+               Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
+               InternalWindowFunction<Integer, Void, Integer, TimeWindow> 
mockWindowFunction = mockWindowFunction();
+
+
+               ReducingStateDescriptor<Integer> intReduceSumDescriptor =
+                               new ReducingStateDescriptor<>(
+                                               "int-reduce",
+                                               new ReduceFunction<Integer>() {
+                                                       private static final 
long serialVersionUID = 1L;
+
+                                                       @Override
+                                                       public Integer 
reduce(Integer a, Integer b) throws Exception {
+                                                               return a + b;
+                                                       }
+                                               },
+                                               IntSerializer.INSTANCE);
+
+               final ValueStateDescriptor<String> valueStateDescriptor =
+                               new ValueStateDescriptor<>("string-state", 
StringSerializer.INSTANCE);
+
+
+               KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> 
testHarness =
+                               createWindowOperator(mockAssigner, mockTrigger, 
0L, intReduceSumDescriptor, mockWindowFunction);
+
+               testHarness.open();
+
+               when(mockAssigner.assignWindows(anyInt(), anyLong(), 
anyAssignerContext()))
+                               .thenReturn(Arrays.asList(new TimeWindow(2, 4), 
new TimeWindow(0, 2)));
+
+               assertEquals(0, testHarness.getOutput().size());
+               assertEquals(0, testHarness.numKeyedStateEntries());
+
+               // insert two elements without firing
+               testHarness.processElement(new StreamRecord<>(1, 0L));
+               testHarness.processElement(new StreamRecord<>(1, 0L));
+
+               doAnswer(new Answer<TriggerResult>() {
+                       @Override
+                       public TriggerResult answer(InvocationOnMock 
invocation) throws Exception {
+                               TimeWindow window = (TimeWindow) 
invocation.getArguments()[2];
+                               Trigger.TriggerContext context = 
(Trigger.TriggerContext) invocation.getArguments()[3];
+                               context.registerEventTimeTimer(window.getEnd());
+                               
context.getPartitionedState(valueStateDescriptor).update("hello");
+                               return TriggerResult.FIRE;
+                       }
+               }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), 
anyLong(), anyTimeWindow(), anyTriggerContext());
+
+               testHarness.processElement(new StreamRecord<>(1, 0L));
+
+               verify(mockWindowFunction, times(2)).apply(eq(1), 
anyTimeWindow(), anyInt(), WindowOperatorContractTest.<Void>anyCollector());
+               verify(mockWindowFunction, times(1)).apply(eq(1), eq(new 
TimeWindow(0, 2)), eq(3), WindowOperatorContractTest.<Void>anyCollector());
+               verify(mockWindowFunction, times(1)).apply(eq(1), eq(new 
TimeWindow(2, 4)), eq(3), WindowOperatorContractTest.<Void>anyCollector());
+
+               // clear is only called at cleanup time/GC time
+               verify(mockTrigger, never()).clear(anyTimeWindow(), 
anyTriggerContext());
+
+               // FIRE should not purge contents
+               assertEquals(4, testHarness.numKeyedStateEntries()); // window 
contents plus trigger state
+               assertEquals(4, testHarness.numEventTimeTimers()); // window 
timers/gc timers
+       }
+
+       @Test
+       public void testFoldingWindow() throws Exception {
+
+               WindowAssigner<Integer, TimeWindow> mockAssigner = 
mockTimeWindowAssigner();
+               Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
+               InternalWindowFunction<Integer, Void, Integer, TimeWindow> 
mockWindowFunction = mockWindowFunction();
+
+               FoldingStateDescriptor<Integer, Integer> intFoldSumDescriptor =
+                       new FoldingStateDescriptor<>(
+                                       "int-fold",
+                                       0,
+                                       new FoldFunction<Integer, Integer>() {
+                                               private static final long 
serialVersionUID = 1L;
+
+                                               @Override
+                                               public Integer fold(Integer 
accumulator, Integer value) throws Exception {
+                                                       return accumulator + 
value;
+                                               }
+                                       },
+                                       IntSerializer.INSTANCE);
+
+               final ValueStateDescriptor<String> valueStateDescriptor =
+                               new ValueStateDescriptor<>("string-state", 
StringSerializer.INSTANCE);
+
+               KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> 
testHarness =
+                               createWindowOperator(mockAssigner, mockTrigger, 
0L, intFoldSumDescriptor, mockWindowFunction);
+
+               testHarness.open();
+
+               when(mockAssigner.assignWindows(anyInt(), anyLong(), 
anyAssignerContext()))
+                               .thenReturn(Arrays.asList(new TimeWindow(2, 4), 
new TimeWindow(0, 2)));
+
+               assertEquals(0, testHarness.getOutput().size());
+               assertEquals(0, testHarness.numKeyedStateEntries());
+
+               // insert two elements without firing
+               testHarness.processElement(new StreamRecord<>(1, 0L));
+               testHarness.processElement(new StreamRecord<>(1, 0L));
+
+               doAnswer(new Answer<TriggerResult>() {
+                       @Override
+                       public TriggerResult answer(InvocationOnMock 
invocation) throws Exception {
+                               TimeWindow window = (TimeWindow) 
invocation.getArguments()[2];
+                               Trigger.TriggerContext context = 
(Trigger.TriggerContext) invocation.getArguments()[3];
+                               context.registerEventTimeTimer(window.getEnd());
+                               
context.getPartitionedState(valueStateDescriptor).update("hello");
+                               return TriggerResult.FIRE;
+                       }
+               }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), 
anyLong(), anyTimeWindow(), anyTriggerContext());
+
+               testHarness.processElement(new StreamRecord<>(1, 0L));
+
+               verify(mockWindowFunction, times(2)).apply(eq(1), 
anyTimeWindow(), anyInt(), WindowOperatorContractTest.<Void>anyCollector());
+               verify(mockWindowFunction, times(1)).apply(eq(1), eq(new 
TimeWindow(0, 2)), eq(3), WindowOperatorContractTest.<Void>anyCollector());
+               verify(mockWindowFunction, times(1)).apply(eq(1), eq(new 
TimeWindow(2, 4)), eq(3), WindowOperatorContractTest.<Void>anyCollector());
+
+               // clear is only called at cleanup time/GC time
+               verify(mockTrigger, never()).clear(anyTimeWindow(), 
anyTriggerContext());
+
+               // FIRE should not purge contents
+               assertEquals(4, testHarness.numKeyedStateEntries()); // window 
contents plus trigger state
+               assertEquals(4, testHarness.numEventTimeTimers()); // window 
timers/gc timers
+       }
+
+       /**
+        * Special method for creating a {@link WindowOperator} with a custom 
{@link StateDescriptor}
+        * for the window contents state.
+        */
+       private <W extends Window, ACC, OUT> 
KeyedOneInputStreamOperatorTestHarness<Integer, Integer, OUT> 
createWindowOperator(
+                       WindowAssigner<Integer, W> assigner,
+                       Trigger<Integer, W> trigger,
+                       long allowedLatenss,
+                       StateDescriptor<? extends AppendingState<Integer, ACC>, 
?> stateDescriptor,
+                       InternalWindowFunction<ACC, OUT, Integer, W> 
windowFunction) throws Exception {
+
+               KeySelector<Integer, Integer> keySelector = new 
KeySelector<Integer, Integer>() {
+                       private static final long serialVersionUID = 1L;
+
+                       @Override
+                       public Integer getKey(Integer value) throws Exception {
+                               return value;
+                       }
+               };
+
+               @SuppressWarnings("unchecked")
+               WindowOperator<Integer, Integer, ACC, OUT, W> operator = new 
WindowOperator<>(
+                               assigner,
+                               assigner.getWindowSerializer(new 
ExecutionConfig()),
+                               keySelector,
+                               IntSerializer.INSTANCE,
+                               stateDescriptor,
+                               windowFunction,
+                               trigger,
+                               allowedLatenss);
+
+               return new KeyedOneInputStreamOperatorTestHarness<>(
+                               operator,
+                               keySelector,
+                               BasicTypeInfo.INT_TYPE_INFO);
+       }
+
+       @Override
+       protected <W extends Window, OUT> 
KeyedOneInputStreamOperatorTestHarness<Integer, Integer, OUT> 
createWindowOperator(
+                       WindowAssigner<Integer, W> assigner,
+                       Trigger<Integer, W> trigger,
+                       long allowedLatenss,
+                       InternalWindowFunction<Iterable<Integer>, OUT, Integer, 
W> windowFunction) throws Exception {
+
+               KeySelector<Integer, Integer> keySelector = new 
KeySelector<Integer, Integer>() {
+                       private static final long serialVersionUID = 1L;
+
+                       @Override
+                       public Integer getKey(Integer value) throws Exception {
+                               return value;
+                       }
+               };
+
+               ListStateDescriptor<Integer> intListDescriptor =
+                               new ListStateDescriptor<>("int-list", 
IntSerializer.INSTANCE);
+
+
+               @SuppressWarnings("unchecked")
+               WindowOperator<Integer, Integer, Iterable<Integer>, OUT, W> 
operator = new WindowOperator<>(
+                               assigner,
+                               assigner.getWindowSerializer(new 
ExecutionConfig()),
+                               keySelector,
+                               IntSerializer.INSTANCE,
+                               intListDescriptor,
+                               windowFunction,
+                               trigger,
+                               allowedLatenss);
+
+               return new KeyedOneInputStreamOperatorTestHarness<>(
+                               operator,
+                               keySelector,
+                               BasicTypeInfo.INT_TYPE_INFO);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e27b6971/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
index 47ead66..1d51b45 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
@@ -18,20 +18,32 @@
 package org.apache.flink.streaming.runtime.operators.windowing;
 
 
+import static 
org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.isStreamRecord;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.anyCollection;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
 import com.google.common.collect.Lists;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.state.AppendingState;
-import org.apache.flink.api.common.state.FoldingStateDescriptor;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.ReducingStateDescriptor;
-import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import 
org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
@@ -42,54 +54,37 @@ import 
org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.mockito.Matchers;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.mockito.verification.VerificationMode;
 
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-
-import static 
org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.isStreamRecord;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Mockito.*;
-
 /**
- * These tests verify that {@link WindowOperator} correctly interacts with the 
other windowing
+ * Base for window operator tests that verify correct interaction with the 
other windowing
  * components: {@link 
org.apache.flink.streaming.api.windowing.assigners.WindowAssigner},
  * {@link org.apache.flink.streaming.api.windowing.triggers.Trigger}.
  * {@link org.apache.flink.streaming.api.functions.windowing.WindowFunction} 
and window state.
  *
  * <p>These tests document the implicit contract that exists between the 
windowing components.
  */
-public class WindowOperatorContractTest extends TestLogger {
+public abstract class WindowOperatorContractTest extends TestLogger {
+
+       @Rule
+       public ExpectedException expectedException = ExpectedException.none();
 
        private static ValueStateDescriptor<String> valueStateDescriptor =
                        new ValueStateDescriptor<>("string-state", 
StringSerializer.INSTANCE, null);
 
-       private static ListStateDescriptor<Integer> intListDescriptor =
-                       new ListStateDescriptor<>("int-list", 
IntSerializer.INSTANCE);
-
-       private static ReducingStateDescriptor<Integer> intReduceSumDescriptor =
-                       new ReducingStateDescriptor<>("int-reduce", new Sum(), 
IntSerializer.INSTANCE);
-
-       private static FoldingStateDescriptor<Integer, Integer> 
intFoldSumDescriptor =
-                       new FoldingStateDescriptor<>("int-fold", 0, new 
FoldSum(), IntSerializer.INSTANCE);
-
        static <IN, OUT, KEY, W extends Window> InternalWindowFunction<IN, OUT, 
KEY, W> mockWindowFunction() throws Exception {
                @SuppressWarnings("unchecked")
                InternalWindowFunction<IN, OUT, KEY, W> mockWindowFunction = 
mock(InternalWindowFunction.class);
@@ -306,7 +301,7 @@ public class WindowOperatorContractTest extends TestLogger {
                InternalWindowFunction<Iterable<Integer>, Void, Integer, 
TimeWindow> mockWindowFunction = mockWindowFunction();
 
                OneInputStreamOperatorTestHarness<Integer, Void> testHarness =
-                               createWindowOperator(mockAssigner, mockTrigger, 
0L, intListDescriptor, mockWindowFunction);
+                               createWindowOperator(mockAssigner, mockTrigger, 
0L, mockWindowFunction);
 
                testHarness.open();
 
@@ -331,7 +326,7 @@ public class WindowOperatorContractTest extends TestLogger {
                InternalWindowFunction<Iterable<Integer>, Void, Integer, 
TimeWindow> mockWindowFunction = mockWindowFunction();
 
                OneInputStreamOperatorTestHarness<Integer, Void> testHarness =
-                               createWindowOperator(mockAssigner, mockTrigger, 
0L, intListDescriptor, mockWindowFunction);
+                               createWindowOperator(mockAssigner, mockTrigger, 
0L, mockWindowFunction);
 
                testHarness.open();
 
@@ -355,7 +350,7 @@ public class WindowOperatorContractTest extends TestLogger {
                InternalWindowFunction<Iterable<Integer>, Void, Integer, 
TimeWindow> mockWindowFunction = mockWindowFunction();
 
                KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> 
testHarness =
-                               createWindowOperator(mockAssigner, mockTrigger, 
0L, intListDescriptor, mockWindowFunction);
+                               createWindowOperator(mockAssigner, mockTrigger, 
0L, mockWindowFunction);
 
                testHarness.open();
 
@@ -402,7 +397,7 @@ public class WindowOperatorContractTest extends TestLogger {
                InternalWindowFunction<Iterable<Integer>, Void, Integer, 
TimeWindow> mockWindowFunction = mockWindowFunction();
 
                OneInputStreamOperatorTestHarness<Integer, Void> testHarness =
-                               createWindowOperator(mockAssigner, mockTrigger, 
0L, intListDescriptor, mockWindowFunction);
+                               createWindowOperator(mockAssigner, mockTrigger, 
0L, mockWindowFunction);
 
                testHarness.open();
 
@@ -418,100 +413,6 @@ public class WindowOperatorContractTest extends 
TestLogger {
        }
 
        @Test
-       public void testReducingWindow() throws Exception {
-
-               WindowAssigner<Integer, TimeWindow> mockAssigner = 
mockTimeWindowAssigner();
-               Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
-               InternalWindowFunction<Integer, Void, Integer, TimeWindow> 
mockWindowFunction = mockWindowFunction();
-
-               KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> 
testHarness =
-                               createWindowOperator(mockAssigner, mockTrigger, 
0L, intReduceSumDescriptor, mockWindowFunction);
-
-               testHarness.open();
-
-               when(mockAssigner.assignWindows(anyInt(), anyLong(), 
anyAssignerContext()))
-                               .thenReturn(Arrays.asList(new TimeWindow(2, 4), 
new TimeWindow(0, 2)));
-
-               assertEquals(0, testHarness.getOutput().size());
-               assertEquals(0, testHarness.numKeyedStateEntries());
-
-               // insert two elements without firing
-               testHarness.processElement(new StreamRecord<>(1, 0L));
-               testHarness.processElement(new StreamRecord<>(1, 0L));
-
-               doAnswer(new Answer<TriggerResult>() {
-                       @Override
-                       public TriggerResult answer(InvocationOnMock 
invocation) throws Exception {
-                               TimeWindow window = (TimeWindow) 
invocation.getArguments()[2];
-                               Trigger.TriggerContext context = 
(Trigger.TriggerContext) invocation.getArguments()[3];
-                               context.registerEventTimeTimer(window.getEnd());
-                               
context.getPartitionedState(valueStateDescriptor).update("hello");
-                               return TriggerResult.FIRE;
-                       }
-               }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), 
anyLong(), anyTimeWindow(), anyTriggerContext());
-
-               testHarness.processElement(new StreamRecord<>(1, 0L));
-
-               verify(mockWindowFunction, times(2)).apply(eq(1), 
anyTimeWindow(), anyInt(), WindowOperatorContractTest.<Void>anyCollector());
-               verify(mockWindowFunction, times(1)).apply(eq(1), eq(new 
TimeWindow(0, 2)), eq(3), WindowOperatorContractTest.<Void>anyCollector());
-               verify(mockWindowFunction, times(1)).apply(eq(1), eq(new 
TimeWindow(2, 4)), eq(3), WindowOperatorContractTest.<Void>anyCollector());
-
-               // clear is only called at cleanup time/GC time
-               verify(mockTrigger, never()).clear(anyTimeWindow(), 
anyTriggerContext());
-
-               // FIRE should not purge contents
-               assertEquals(4, testHarness.numKeyedStateEntries()); // window 
contents plus trigger state
-               assertEquals(4, testHarness.numEventTimeTimers()); // window 
timers/gc timers
-       }
-
-       @Test
-       public void testFoldingWindow() throws Exception {
-
-               WindowAssigner<Integer, TimeWindow> mockAssigner = 
mockTimeWindowAssigner();
-               Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
-               InternalWindowFunction<Integer, Void, Integer, TimeWindow> 
mockWindowFunction = mockWindowFunction();
-
-               KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> 
testHarness =
-                               createWindowOperator(mockAssigner, mockTrigger, 
0L, intFoldSumDescriptor, mockWindowFunction);
-
-               testHarness.open();
-
-               when(mockAssigner.assignWindows(anyInt(), anyLong(), 
anyAssignerContext()))
-                               .thenReturn(Arrays.asList(new TimeWindow(2, 4), 
new TimeWindow(0, 2)));
-
-               assertEquals(0, testHarness.getOutput().size());
-               assertEquals(0, testHarness.numKeyedStateEntries());
-
-               // insert two elements without firing
-               testHarness.processElement(new StreamRecord<>(1, 0L));
-               testHarness.processElement(new StreamRecord<>(1, 0L));
-
-               doAnswer(new Answer<TriggerResult>() {
-                       @Override
-                       public TriggerResult answer(InvocationOnMock 
invocation) throws Exception {
-                               TimeWindow window = (TimeWindow) 
invocation.getArguments()[2];
-                               Trigger.TriggerContext context = 
(Trigger.TriggerContext) invocation.getArguments()[3];
-                               context.registerEventTimeTimer(window.getEnd());
-                               
context.getPartitionedState(valueStateDescriptor).update("hello");
-                               return TriggerResult.FIRE;
-                       }
-               }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), 
anyLong(), anyTimeWindow(), anyTriggerContext());
-
-               testHarness.processElement(new StreamRecord<>(1, 0L));
-
-               verify(mockWindowFunction, times(2)).apply(eq(1), 
anyTimeWindow(), anyInt(), WindowOperatorContractTest.<Void>anyCollector());
-               verify(mockWindowFunction, times(1)).apply(eq(1), eq(new 
TimeWindow(0, 2)), eq(3), WindowOperatorContractTest.<Void>anyCollector());
-               verify(mockWindowFunction, times(1)).apply(eq(1), eq(new 
TimeWindow(2, 4)), eq(3), WindowOperatorContractTest.<Void>anyCollector());
-
-               // clear is only called at cleanup time/GC time
-               verify(mockTrigger, never()).clear(anyTimeWindow(), 
anyTriggerContext());
-
-               // FIRE should not purge contents
-               assertEquals(4, testHarness.numKeyedStateEntries()); // window 
contents plus trigger state
-               assertEquals(4, testHarness.numEventTimeTimers()); // window 
timers/gc timers
-       }
-
-       @Test
        public void testEmittingFromWindowFunction() throws Exception {
 
                WindowAssigner<Integer, TimeWindow> mockAssigner = 
mockTimeWindowAssigner();
@@ -519,7 +420,7 @@ public class WindowOperatorContractTest extends TestLogger {
                InternalWindowFunction<Iterable<Integer>, String, Integer, 
TimeWindow> mockWindowFunction = mockWindowFunction();
 
                KeyedOneInputStreamOperatorTestHarness<Integer, Integer, 
String> testHarness =
-                               createWindowOperator(mockAssigner, mockTrigger, 
0L, intListDescriptor, mockWindowFunction);
+                               createWindowOperator(mockAssigner, mockTrigger, 
0L, mockWindowFunction);
 
                testHarness.open();
 
@@ -570,7 +471,7 @@ public class WindowOperatorContractTest extends TestLogger {
                InternalWindowFunction<Iterable<Integer>, String, Integer, 
TimeWindow> mockWindowFunction = mockWindowFunction();
 
                KeyedOneInputStreamOperatorTestHarness<Integer, Integer, 
String> testHarness =
-                               createWindowOperator(mockAssigner, mockTrigger, 
0L, intListDescriptor, mockWindowFunction);
+                               createWindowOperator(mockAssigner, mockTrigger, 
0L, mockWindowFunction);
 
                testHarness.open();
 
@@ -613,7 +514,7 @@ public class WindowOperatorContractTest extends TestLogger {
                InternalWindowFunction<Iterable<Integer>, Void, Integer, 
TimeWindow> mockWindowFunction = mockWindowFunction();
 
                KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> 
testHarness =
-                               createWindowOperator(mockAssigner, mockTrigger, 
0L, intListDescriptor, mockWindowFunction);
+                               createWindowOperator(mockAssigner, mockTrigger, 
0L, mockWindowFunction);
 
                testHarness.open();
 
@@ -655,7 +556,7 @@ public class WindowOperatorContractTest extends TestLogger {
                InternalWindowFunction<Iterable<Integer>, Void, Integer, 
TimeWindow> mockWindowFunction = mockWindowFunction();
 
                KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> 
testHarness =
-                               createWindowOperator(mockAssigner, mockTrigger, 
0L, intListDescriptor, mockWindowFunction);
+                               createWindowOperator(mockAssigner, mockTrigger, 
0L, mockWindowFunction);
 
                testHarness.open();
 
@@ -698,7 +599,7 @@ public class WindowOperatorContractTest extends TestLogger {
                InternalWindowFunction<Iterable<Integer>, Void, Integer, 
TimeWindow> mockWindowFunction = mockWindowFunction();
 
                KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> 
testHarness =
-                               createWindowOperator(mockAssigner, mockTrigger, 
0L, intListDescriptor, mockWindowFunction);
+                               createWindowOperator(mockAssigner, mockTrigger, 
0L, mockWindowFunction);
 
                testHarness.open();
 
@@ -743,7 +644,7 @@ public class WindowOperatorContractTest extends TestLogger {
                InternalWindowFunction<Iterable<Integer>, Void, Integer, 
TimeWindow> mockWindowFunction = mockWindowFunction();
 
                KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> 
testHarness =
-                               createWindowOperator(mockAssigner, mockTrigger, 
0L, intListDescriptor, mockWindowFunction);
+                               createWindowOperator(mockAssigner, mockTrigger, 
0L, mockWindowFunction);
 
                testHarness.open();
 
@@ -796,7 +697,7 @@ public class WindowOperatorContractTest extends TestLogger {
                InternalWindowFunction<Iterable<Integer>, Void, Integer, 
TimeWindow> mockWindowFunction = mockWindowFunction();
 
                KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> 
testHarness =
-                               createWindowOperator(mockAssigner, mockTrigger, 
0L, intListDescriptor, mockWindowFunction);
+                               createWindowOperator(mockAssigner, mockTrigger, 
0L, mockWindowFunction);
 
                testHarness.open();
 
@@ -854,7 +755,7 @@ public class WindowOperatorContractTest extends TestLogger {
                InternalWindowFunction<Iterable<Integer>, Void, Integer, 
TimeWindow> mockWindowFunction = mockWindowFunction();
 
                KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> 
testHarness =
-                               createWindowOperator(mockAssigner, mockTrigger, 
0L, intListDescriptor, mockWindowFunction);
+                               createWindowOperator(mockAssigner, mockTrigger, 
0L, mockWindowFunction);
 
                testHarness.open();
 
@@ -916,7 +817,7 @@ public class WindowOperatorContractTest extends TestLogger {
                InternalWindowFunction<Iterable<Integer>, Void, Integer, 
TimeWindow> mockWindowFunction = mockWindowFunction();
 
                KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> 
testHarness =
-                               createWindowOperator(mockAssigner, mockTrigger, 
0L, intListDescriptor, mockWindowFunction);
+                               createWindowOperator(mockAssigner, mockTrigger, 
0L, mockWindowFunction);
 
                testHarness.open();
 
@@ -977,7 +878,7 @@ public class WindowOperatorContractTest extends TestLogger {
                InternalWindowFunction<Iterable<Integer>, Void, Integer, 
TimeWindow> mockWindowFunction = mockWindowFunction();
 
                KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> 
testHarness =
-                               createWindowOperator(mockAssigner, mockTrigger, 
0L, intListDescriptor, mockWindowFunction);
+                               createWindowOperator(mockAssigner, mockTrigger, 
0L, mockWindowFunction);
 
                testHarness.open();
 
@@ -1045,7 +946,7 @@ public class WindowOperatorContractTest extends TestLogger 
{
                                mock(InternalWindowFunction.class);
 
                KeyedOneInputStreamOperatorTestHarness<Integer, Integer, 
List<Integer>> testHarness =
-                               createWindowOperator(mockAssigner, mockTrigger, 
0L, intListDescriptor, mockWindowFunction);
+                               createWindowOperator(mockAssigner, mockTrigger, 
0L, mockWindowFunction);
 
                testHarness.open();
 
@@ -1109,7 +1010,7 @@ public class WindowOperatorContractTest extends 
TestLogger {
                                mock(InternalWindowFunction.class);
 
                KeyedOneInputStreamOperatorTestHarness<Integer, Integer, 
List<Integer>> testHarness =
-                               createWindowOperator(mockAssigner, mockTrigger, 
0L, intListDescriptor, mockWindowFunction);
+                               createWindowOperator(mockAssigner, mockTrigger, 
0L, mockWindowFunction);
 
                testHarness.open();
 
@@ -1148,6 +1049,83 @@ public class WindowOperatorContractTest extends 
TestLogger {
        }
 
        @Test
+       public void testNoEventTimeFiringForGarbageCollectedMergingWindow() 
throws Exception {
+               testNoTimerFiringForGarbageCollectedMergingWindow(new 
EventTimeAdaptor());
+       }
+
+       @Test
+       public void 
testNoProcessingTimeFiringForGarbageCollectedMergingWindow() throws Exception {
+               testNoTimerFiringForGarbageCollectedMergingWindow(new 
ProcessingTimeAdaptor());
+       }
+
+
+       /**
+        * Verify that we neither invoke the trigger nor the window function if 
a timer
+        * fires for a merging window that was already garbage collected.
+        */
+       public void testNoTimerFiringForGarbageCollectedMergingWindow(final 
TimeDomainAdaptor timeAdaptor) throws Exception {
+
+               MergingWindowAssigner<Integer, TimeWindow> mockAssigner = 
mockMergingAssigner();
+               timeAdaptor.setIsEventTime(mockAssigner);
+               Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
+
+               @SuppressWarnings("unchecked")
+               InternalWindowFunction<Iterable<Integer>, List<Integer>, 
Integer, TimeWindow> mockWindowFunction =
+                               mock(InternalWindowFunction.class);
+
+               KeyedOneInputStreamOperatorTestHarness<Integer, Integer, 
List<Integer>> testHarness =
+                               createWindowOperator(mockAssigner, mockTrigger, 
0L, mockWindowFunction);
+
+               testHarness.open();
+
+               timeAdaptor.advanceTime(testHarness, Long.MIN_VALUE);
+
+               when(mockAssigner.assignWindows(anyInt(), anyLong(), 
anyAssignerContext()))
+                               .thenReturn(Arrays.asList(new TimeWindow(2, 
4)));
+
+               assertEquals(0, 
testHarness.extractOutputStreamRecords().size());
+               assertEquals(0, testHarness.numKeyedStateEntries());
+
+               doAnswer(new Answer<TriggerResult>() {
+                       @Override
+                       public TriggerResult answer(InvocationOnMock 
invocation) throws Exception {
+                               Trigger.TriggerContext context = 
(Trigger.TriggerContext) invocation.getArguments()[3];
+                               // set a timer for after the GC time
+                               timeAdaptor.registerTimer(context, 10L);
+                               return TriggerResult.CONTINUE;
+                       }
+               }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), 
anyLong(), anyTimeWindow(), anyTriggerContext());
+
+
+               testHarness.processElement(new StreamRecord<>(0, 0L));
+
+               assertEquals(2, testHarness.numKeyedStateEntries()); // window 
contents and merging window set
+               assertEquals(2, timeAdaptor.numTimers(testHarness)); // timer 
and gc timer
+
+               timeAdaptor.shouldContinueOnTime(mockTrigger);
+
+               // this should trigger GC
+               timeAdaptor.advanceTime(testHarness, 4L);
+
+               verify(mockTrigger, times(1)).clear(anyTimeWindow(), 
anyTriggerContext());
+
+               assertEquals(0, testHarness.numKeyedStateEntries());
+               // we still have a dangling timer because our trigger doesn't 
do cleanup
+               assertEquals(1, timeAdaptor.numTimers(testHarness));
+
+               timeAdaptor.verifyTriggerCallback(mockTrigger, times(1), null, 
null);
+
+               verify(mockWindowFunction, never())
+                               .apply(anyInt(), anyTimeWindow(), 
anyIntIterable(), WindowOperatorContractTest.<List<Integer>>anyCollector());
+
+               // now we trigger the dangling timer
+               timeAdaptor.advanceTime(testHarness, 10L);
+
+               // we don't fire again
+               timeAdaptor.verifyTriggerCallback(mockTrigger, times(1), null, 
null);
+       }
+
+       @Test
        public void testEventTimeTimerCreationAndDeletion() throws Exception {
                testTimerCreationAndDeletion(new EventTimeAdaptor());
        }
@@ -1166,7 +1144,7 @@ public class WindowOperatorContractTest extends 
TestLogger {
                InternalWindowFunction<Iterable<Integer>, Void, Integer, 
TimeWindow> mockWindowFunction = mockWindowFunction();
 
                KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> 
testHarness =
-                               createWindowOperator(mockAssigner, mockTrigger, 
0L, intListDescriptor, mockWindowFunction);
+                               createWindowOperator(mockAssigner, mockTrigger, 
0L, mockWindowFunction);
 
                testHarness.open();
 
@@ -1213,7 +1191,7 @@ public class WindowOperatorContractTest extends 
TestLogger {
                InternalWindowFunction<Iterable<Integer>, Void, Integer, 
TimeWindow> mockWindowFunction = mockWindowFunction();
 
                KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> 
testHarness =
-                               createWindowOperator(mockAssigner, mockTrigger, 
0L, intListDescriptor, mockWindowFunction);
+                               createWindowOperator(mockAssigner, mockTrigger, 
0L, mockWindowFunction);
 
                testHarness.open();
 
@@ -1271,7 +1249,7 @@ public class WindowOperatorContractTest extends 
TestLogger {
                InternalWindowFunction<Iterable<Integer>, Void, Integer, 
TimeWindow> mockWindowFunction = mockWindowFunction();
 
                KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> 
testHarness =
-                               createWindowOperator(mockAssigner, mockTrigger, 
0L, intListDescriptor, mockWindowFunction);
+                               createWindowOperator(mockAssigner, mockTrigger, 
0L, mockWindowFunction);
 
                testHarness.open();
 
@@ -1312,7 +1290,7 @@ public class WindowOperatorContractTest extends 
TestLogger {
                InternalWindowFunction<Iterable<Integer>, Void, Integer, 
TimeWindow> mockWindowFunction = mockWindowFunction();
 
                KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> 
testHarness =
-                               createWindowOperator(mockAssigner, mockTrigger, 
0L, intListDescriptor, mockWindowFunction);
+                               createWindowOperator(mockAssigner, mockTrigger, 
0L, mockWindowFunction);
 
                testHarness.open();
 
@@ -1354,7 +1332,7 @@ public class WindowOperatorContractTest extends 
TestLogger {
                InternalWindowFunction<Iterable<Integer>, Void, Integer, 
TimeWindow> mockWindowFunction = mockWindowFunction();
 
                KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> 
testHarness =
-                               createWindowOperator(mockAssigner, mockTrigger, 
0L, intListDescriptor, mockWindowFunction);
+                               createWindowOperator(mockAssigner, mockTrigger, 
0L, mockWindowFunction);
 
                testHarness.open();
 
@@ -1451,7 +1429,7 @@ public class WindowOperatorContractTest extends 
TestLogger {
                InternalWindowFunction<Iterable<Integer>, Void, Integer, 
TimeWindow> mockWindowFunction = mockWindowFunction();
 
                KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> 
testHarness =
-                               createWindowOperator(mockAssigner, mockTrigger, 
0L, intListDescriptor, mockWindowFunction);
+                               createWindowOperator(mockAssigner, mockTrigger, 
0L, mockWindowFunction);
 
                testHarness.open();
 
@@ -1534,7 +1512,7 @@ public class WindowOperatorContractTest extends 
TestLogger {
                InternalWindowFunction<Iterable<Integer>, Void, Integer, 
TimeWindow> mockWindowFunction = mockWindowFunction();
 
                KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> 
testHarness =
-                               createWindowOperator(mockAssigner, mockTrigger, 
0L, intListDescriptor, mockWindowFunction);
+                               createWindowOperator(mockAssigner, mockTrigger, 
0L, mockWindowFunction);
 
                testHarness.open();
 
@@ -1578,7 +1556,7 @@ public class WindowOperatorContractTest extends 
TestLogger {
                InternalWindowFunction<Iterable<Integer>, Void, Integer, 
TimeWindow> mockWindowFunction = mockWindowFunction();
 
                KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> 
testHarness =
-                               createWindowOperator(mockAssigner, mockTrigger, 
0L, intListDescriptor, mockWindowFunction);
+                               createWindowOperator(mockAssigner, mockTrigger, 
0L, mockWindowFunction);
 
                testHarness.open();
 
@@ -1626,7 +1604,7 @@ public class WindowOperatorContractTest extends 
TestLogger {
                assertEquals(Long.MAX_VALUE, GlobalWindow.get().maxTimestamp());
 
                KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> 
testHarness =
-                               createWindowOperator(mockAssigner, mockTrigger, 
0L, intListDescriptor, mockWindowFunction);
+                               createWindowOperator(mockAssigner, mockTrigger, 
0L, mockWindowFunction);
 
                testHarness.open();
 
@@ -1651,7 +1629,7 @@ public class WindowOperatorContractTest extends 
TestLogger {
                InternalWindowFunction<Iterable<Integer>, Void, Integer, 
TimeWindow> mockWindowFunction = mockWindowFunction();
 
                KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> 
testHarness =
-                               createWindowOperator(mockAssigner, mockTrigger, 
20L, intListDescriptor, mockWindowFunction);
+                               createWindowOperator(mockAssigner, mockTrigger, 
20L, mockWindowFunction);
 
                testHarness.open();
 
@@ -1680,7 +1658,7 @@ public class WindowOperatorContractTest extends 
TestLogger {
                InternalWindowFunction<Iterable<Integer>, Void, Integer, 
TimeWindow> mockWindowFunction = mockWindowFunction();
 
                KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> 
testHarness =
-                               createWindowOperator(mockAssigner, mockTrigger, 
20L, intListDescriptor, mockWindowFunction);
+                               createWindowOperator(mockAssigner, mockTrigger, 
20L, mockWindowFunction);
 
                testHarness.open();
 
@@ -1726,7 +1704,7 @@ public class WindowOperatorContractTest extends 
TestLogger {
                InternalWindowFunction<Iterable<Integer>, Void, Integer, 
TimeWindow> mockWindowFunction = mockWindowFunction();
 
                KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> 
testHarness =
-                               createWindowOperator(mockAssigner, mockTrigger, 
20L, intListDescriptor, mockWindowFunction);
+                               createWindowOperator(mockAssigner, mockTrigger, 
20L, mockWindowFunction);
 
                testHarness.open();
 
@@ -1771,7 +1749,7 @@ public class WindowOperatorContractTest extends 
TestLogger {
                InternalWindowFunction<Iterable<Integer>, Void, Integer, 
TimeWindow> mockWindowFunction = mockWindowFunction();
 
                KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> 
testHarness =
-                               createWindowOperator(mockAssigner, mockTrigger, 
0L, intListDescriptor, mockWindowFunction);
+                               createWindowOperator(mockAssigner, mockTrigger, 
0L, mockWindowFunction);
 
                testHarness.open();
 
@@ -1827,7 +1805,7 @@ public class WindowOperatorContractTest extends 
TestLogger {
                InternalWindowFunction<Iterable<Integer>, Void, Integer, 
TimeWindow> mockWindowFunction = mockWindowFunction();
 
                KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> 
testHarness =
-                               createWindowOperator(mockAssigner, mockTrigger, 
20L, intListDescriptor, mockWindowFunction);
+                               createWindowOperator(mockAssigner, mockTrigger, 
20L, mockWindowFunction);
 
                testHarness.open();
 
@@ -1897,7 +1875,7 @@ public class WindowOperatorContractTest extends 
TestLogger {
                InternalWindowFunction<Iterable<Integer>, Void, Integer, 
TimeWindow> mockWindowFunction = mockWindowFunction();
 
                KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> 
testHarness =
-                               createWindowOperator(mockAssigner, mockTrigger, 
20L, intListDescriptor, mockWindowFunction);
+                               createWindowOperator(mockAssigner, mockTrigger, 
20L, mockWindowFunction);
 
                testHarness.open();
 
@@ -1968,7 +1946,7 @@ public class WindowOperatorContractTest extends 
TestLogger {
                InternalWindowFunction<Iterable<Integer>, Void, Integer, 
TimeWindow> mockWindowFunction = mockWindowFunction();
 
                KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> 
testHarness =
-                               createWindowOperator(mockAssigner, mockTrigger, 
20L, intListDescriptor, mockWindowFunction);
+                               createWindowOperator(mockAssigner, mockTrigger, 
20L, mockWindowFunction);
 
                testHarness.open();
 
@@ -2034,7 +2012,7 @@ public class WindowOperatorContractTest extends 
TestLogger {
                InternalWindowFunction<Iterable<Integer>, Void, Integer, 
TimeWindow> mockWindowFunction = mockWindowFunction();
 
                KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> 
testHarness =
-                               createWindowOperator(mockAssigner, mockTrigger, 
20L, intListDescriptor, mockWindowFunction);
+                               createWindowOperator(mockAssigner, mockTrigger, 
20L, mockWindowFunction);
 
                testHarness.open();
 
@@ -2062,7 +2040,7 @@ public class WindowOperatorContractTest extends 
TestLogger {
                InternalWindowFunction<Iterable<Integer>, Void, Integer, 
TimeWindow> mockWindowFunction = mockWindowFunction();
 
                KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> 
testHarness =
-                               createWindowOperator(mockAssigner, mockTrigger, 
20L, intListDescriptor, mockWindowFunction);
+                               createWindowOperator(mockAssigner, mockTrigger, 
20L, mockWindowFunction);
 
                testHarness.open();
 
@@ -2101,7 +2079,7 @@ public class WindowOperatorContractTest extends 
TestLogger {
                InternalWindowFunction<Iterable<Integer>, Void, Integer, 
TimeWindow> mockWindowFunction = mockWindowFunction();
 
                KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> 
testHarness =
-                               createWindowOperator(mockAssigner, mockTrigger, 
20L, intListDescriptor, mockWindowFunction);
+                               createWindowOperator(mockAssigner, mockTrigger, 
20L, mockWindowFunction);
 
                testHarness.open();
 
@@ -2140,7 +2118,7 @@ public class WindowOperatorContractTest extends 
TestLogger {
                InternalWindowFunction<Iterable<Integer>, Void, Integer, 
TimeWindow> mockWindowFunction = mockWindowFunction();
 
                KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> 
testHarness =
-                               createWindowOperator(mockAssigner, mockTrigger, 
0L, intListDescriptor, mockWindowFunction);
+                               createWindowOperator(mockAssigner, mockTrigger, 
0L, mockWindowFunction);
 
                testHarness.open();
 
@@ -2185,11 +2163,11 @@ public class WindowOperatorContractTest extends 
TestLogger {
                }).when(mockTrigger).clear(anyTimeWindow(), 
anyTriggerContext());
 
                // only fire on the timestamp==0L timers, not the gc timers
-               when(mockTrigger.onEventTime(eq(0L), 
Matchers.<TimeWindow>any(), 
anyTriggerContext())).thenReturn(TriggerResult.FIRE);
+               when(mockTrigger.onEventTime(eq(0L), anyTimeWindow(), 
anyTriggerContext())).thenReturn(TriggerResult.FIRE);
 
                mockWindowFunction = mockWindowFunction();
 
-               testHarness = createWindowOperator(mockAssigner, mockTrigger, 
0L, intListDescriptor, mockWindowFunction);
+               testHarness = createWindowOperator(mockAssigner, mockTrigger, 
0L, mockWindowFunction);
 
                testHarness.setup();
                testHarness.initializeState(snapshot);
@@ -2220,59 +2198,12 @@ public class WindowOperatorContractTest extends 
TestLogger {
                assertEquals(0, testHarness.numEventTimeTimers());
        }
 
-       private <W extends Window, ACC, OUT> 
KeyedOneInputStreamOperatorTestHarness<Integer, Integer, OUT> 
createWindowOperator(
+       protected abstract <W extends Window, OUT> 
KeyedOneInputStreamOperatorTestHarness<Integer, Integer, OUT> 
createWindowOperator(
                        WindowAssigner<Integer, W> assigner,
                        Trigger<Integer, W> trigger,
                        long allowedLatenss,
-                       StateDescriptor<? extends AppendingState<Integer, ACC>, 
?> stateDescriptor,
-                       InternalWindowFunction<ACC, OUT, Integer, W> 
windowFunction) throws Exception {
-
-               KeySelector<Integer, Integer> keySelector = new 
KeySelector<Integer, Integer>() {
-                       private static final long serialVersionUID = 1L;
+                       InternalWindowFunction<Iterable<Integer>, OUT, Integer, 
W> windowFunction) throws Exception;
 
-                       @Override
-                       public Integer getKey(Integer value) throws Exception {
-                               return value;
-                       }
-               };
-
-               @SuppressWarnings("unchecked")
-               WindowOperator<Integer, Integer, ACC, OUT, W> operator = new 
WindowOperator<>(
-                               assigner,
-                               assigner.getWindowSerializer(new 
ExecutionConfig()),
-                               keySelector,
-                               IntSerializer.INSTANCE,
-                               stateDescriptor,
-                               windowFunction,
-                               trigger,
-                               allowedLatenss);
-
-               return new KeyedOneInputStreamOperatorTestHarness<>(
-                               operator,
-                               keySelector,
-                               BasicTypeInfo.INT_TYPE_INFO);
-       }
-
-
-       private static class Sum implements ReduceFunction<Integer> {
-               private static final long serialVersionUID = 1L;
-
-               @Override
-               public Integer reduce(Integer value1, Integer value2) throws 
Exception {
-                       return value1 + value2;
-               }
-       }
-
-       private static class FoldSum implements FoldFunction<Integer, Integer> {
-               private static final long serialVersionUID = 1L;
-
-               @Override
-               public Integer fold(
-                               Integer accumulator,
-                               Integer value) throws Exception {
-                       return accumulator + value;
-               }
-       }
 
        private interface TimeDomainAdaptor {
 

Reply via email to