Repository: flink
Updated Branches:
  refs/heads/master 6b55e2ca3 -> d1475ee86


http://git-wip-us.apache.org/repos/asf/flink/blob/d1475ee8/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index 6238e6c..104bc7b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -66,7 +66,6 @@ import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.TestHarnessUtil;
-import org.apache.flink.streaming.util.WindowingTestHarness;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 import org.junit.Assert;
@@ -2460,174 +2459,4 @@ public class WindowOperatorTest extends TestLogger {
                        return "EventTimeTrigger()";
                }
        }
-
-       @Test
-       public void testEventTimeTumblingWindowsWithOffset() throws Exception {
-               final int WINDOW_SIZE = 2000;
-               final int OFFSET = 100;
-               TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
-
-               TumblingEventTimeWindows windowAssigner = TumblingEventTimeWindows.of(Time.milliseconds(WINDOW_SIZE),Time.milliseconds(OFFSET));
-
-               WindowingTestHarness<String, Tuple2<String, Integer>, TimeWindow> testHarness = new WindowingTestHarness<>(
-                       windowAssigner,
-                       BasicTypeInfo.STRING_TYPE_INFO,
-                       inputType,
-                       new TupleKeySelector(),
-                       EventTimeTrigger.create(),
-                       0);
-
-               // normal element
-               testHarness.processElement(new Tuple2<>("key2", 1), 1000);
-               testHarness.processWatermark(1985);
-
-               testHarness.addExpectedWatermark(1985);
-
-               testHarness.processElement(new Tuple2<>("key2", 2), 1980);
-               testHarness.processElement(new Tuple2<>("key2", 3), 1998);
-               testHarness.processElement(new Tuple2<>("key2", 4), 2001);
-
-               // verify that this does not yet fire our windows, as it would without offsets
-               testHarness.processWatermark(2010);
-               testHarness.addExpectedWatermark(2010);
-
-               testHarness.processWatermark(2999);
-
-               testHarness.addExpectedElement(new Tuple2<>("key2", 1), 1999 + OFFSET);
-               testHarness.addExpectedElement(new Tuple2<>("key2", 2), 1999 + OFFSET);
-               testHarness.addExpectedElement(new Tuple2<>("key2", 3), 1999 + OFFSET);
-               testHarness.addExpectedElement(new Tuple2<>("key2", 4), 1999 + OFFSET);
-
-               testHarness.addExpectedWatermark(2999);
-
-               testHarness.processWatermark(3999);
-               testHarness.addExpectedWatermark(3999);
-
-               testHarness.compareActualToExpectedOutput("Output is not correct");
-
-               testHarness.close();
-       }
-
-       @Test
-       public void testEventTimeSlidingWindowsWithOffset() throws Exception {
-               final int WINDOW_SIZE = 2000;
-               final int SLIDE = 500;
-               final int OFFSET = 10;
-               TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
-
-               SlidingEventTimeWindows windowAssigner = SlidingEventTimeWindows.of(Time.milliseconds(WINDOW_SIZE),Time.milliseconds(SLIDE),Time.milliseconds(OFFSET));
-
-               WindowingTestHarness<String, Tuple2<String, Integer>, TimeWindow> testHarness = new WindowingTestHarness<>(
-                       windowAssigner,
-                       BasicTypeInfo.STRING_TYPE_INFO,
-                       inputType,
-                       new TupleKeySelector(),
-                       EventTimeTrigger.create(),
-                       0);
-
-               testHarness.processElement(new Tuple2<>("key2", 1), 333);
-               testHarness.processWatermark(6666);
-
-               testHarness.addExpectedElement(new Tuple2<>("key2",1),499 + OFFSET);
-               testHarness.addExpectedElement(new Tuple2<>("key2",1),999 + OFFSET);
-               testHarness.addExpectedElement(new Tuple2<>("key2",1),1499 + OFFSET);
-               testHarness.addExpectedElement(new Tuple2<>("key2",1),1999 + OFFSET);
-               testHarness.addExpectedWatermark(6666);
-               testHarness.compareActualToExpectedOutput("Output is not correct");
-
-               testHarness.close();
-       }
-
-       @Test
-       public void testProcessingTimeTumblingWindowsWithOffset() throws Exception {
-               final int WINDOW_SIZE = 3000;
-               final int OFFSET = 1000;
-
-               TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
-
-               TumblingProcessingTimeWindows windowAssigner = TumblingProcessingTimeWindows.of(Time.milliseconds(WINDOW_SIZE),
-                       Time.milliseconds(OFFSET));
-
-               WindowingTestHarness<String, Tuple2<String, Integer>, TimeWindow> testHarness = new WindowingTestHarness<>(
-                       windowAssigner,
-                       BasicTypeInfo.STRING_TYPE_INFO,
-                       inputType,
-                       new TupleKeySelector(),
-                       ProcessingTimeTrigger.create(),
-                       0);
-
-               testHarness.setProcessingTime(3);
-
-               // timestamp is ignored in processing time
-               testHarness.processElement(new Tuple2<>("key2", 1), Long.MAX_VALUE);
-               testHarness.processElement(new Tuple2<>("key2", 1), 7000);
-               testHarness.processElement(new Tuple2<>("key2", 1), 7000);
-
-               testHarness.processElement(new Tuple2<>("key1", 1), 7000);
-               testHarness.processElement(new Tuple2<>("key1", 1), 7000);
-
-               testHarness.setProcessingTime(5000);
-
-               testHarness.addExpectedElement(new Tuple2<>("key2", 1), 999);
-               testHarness.addExpectedElement(new Tuple2<>("key2", 1), 999);
-               testHarness.addExpectedElement(new Tuple2<>("key2", 1), 999);
-               testHarness.addExpectedElement(new Tuple2<>("key1", 1), 999);
-               testHarness.addExpectedElement(new Tuple2<>("key1", 1), 999);
-
-               testHarness.compareActualToExpectedOutput("Output was not correct.");
-
-               testHarness.processElement(new Tuple2<>("key1", 1), 7000);
-               testHarness.processElement(new Tuple2<>("key1", 1), 7000);
-               testHarness.processElement(new Tuple2<>("key1", 1), 7000);
-
-               testHarness.setProcessingTime(7000);
-
-               testHarness.addExpectedElement(new Tuple2<>("key1", 1), 6999);
-               testHarness.addExpectedElement(new Tuple2<>("key1", 1), 6999);
-               testHarness.addExpectedElement(new Tuple2<>("key1", 1), 6999);
-
-               testHarness.compareActualToExpectedOutput("Output was not correct.");
-
-               testHarness.close();
-       }
-
-       @Test
-       public void testProcessingTimeSlidingWindowsWithOffset() throws Exception {
-               final int WINDOW_SIZE = 3000;
-               final int SLIDING = 1000;
-               final int OFFSET = 10;
-
-               TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
-
-               SlidingProcessingTimeWindows windowAssigner = SlidingProcessingTimeWindows.of(Time.milliseconds(WINDOW_SIZE),
-                       Time.milliseconds(SLIDING),Time.milliseconds(OFFSET));
-
-               WindowingTestHarness<String, Tuple2<String, Integer>, TimeWindow> testHarness = new WindowingTestHarness<>(
-                       windowAssigner,
-                       BasicTypeInfo.STRING_TYPE_INFO,
-                       inputType,
-                       new TupleKeySelector(),
-                       ProcessingTimeTrigger.create(),
-                       0);
-
-               testHarness.setProcessingTime(3);
-
-               // timestamp is ignored in processing time
-               testHarness.processElement(new Tuple2<>("key2", 1), Long.MAX_VALUE);
-
-               testHarness.setProcessingTime(1111);
-
-               testHarness.addExpectedElement(new Tuple2<>("key2", 1), OFFSET - 1);
-               testHarness.addExpectedElement(new Tuple2<>("key2", 1), OFFSET + 999);
-
-               testHarness.processElement(new Tuple2<>("key2", 2),Long.MIN_VALUE);
-               testHarness.setProcessingTime(2222);
-
-               testHarness.addExpectedElement(new Tuple2<>("key2", 1), OFFSET + 1999);
-               testHarness.addExpectedElement(new Tuple2<>("key2", 2), OFFSET + 1999);
-
-               testHarness.compareActualToExpectedOutput("Output was not correct.");
-
-               testHarness.close();
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d1475ee8/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowedValue.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowedValue.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowedValue.java
new file mode 100644
index 0000000..449d54b
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowedValue.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+/**
+ * Helper class for emitting a value along with the window information from
+ * a {@link org.apache.flink.streaming.api.functions.windowing.WindowFunction}.
+ */
+public class WindowedValue<T, W extends Window> {
+  private final T value;
+  private final W window;
+
+  public WindowedValue(T value, W window) {
+    this.value = value;
+    this.window = window;
+  }
+
+  public T value() {
+    return value;
+  }
+
+  public W window() {
+    return window;
+  }
+
+  @Override
+  public String toString() {
+    return "WindowedValue(" + value + ", " + window + ")";
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d1475ee8/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowingTestHarnessTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowingTestHarnessTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowingTestHarnessTest.java
deleted file mode 100644
index 82c3d71..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowingTestHarnessTest.java
+++ /dev/null
@@ -1,230 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.windowing;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
-import org.apache.flink.runtime.state.StreamStateHandle;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
-import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
-import org.apache.flink.streaming.util.WindowingTestHarness;
-import org.junit.Test;
-
-public class WindowingTestHarnessTest {
-
-       @Test
-       public void testEventTimeTumblingWindows() throws Exception {
-               final int WINDOW_SIZE = 2000;
-
-               TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
-
-               TumblingEventTimeWindows windowAssigner = TumblingEventTimeWindows.of(Time.milliseconds(WINDOW_SIZE));
-
-               WindowingTestHarness<String, Tuple2<String, Integer>, TimeWindow> testHarness = new WindowingTestHarness<>(
-                       windowAssigner,
-                       BasicTypeInfo.STRING_TYPE_INFO,
-                       inputType,
-                       new TupleKeySelector(),
-                       EventTimeTrigger.create(),
-                       0);
-
-               // normal element
-               testHarness.processElement(new Tuple2<>("key2", 1), 1000);
-               testHarness.processWatermark(1985);
-
-               testHarness.addExpectedWatermark(1985);
-
-               // this will not be dropped because window.maxTimestamp() + allowedLateness > currentWatermark
-               testHarness.processElement(new Tuple2<>("key2", 1), 1980);
-
-               // dropped as late
-               testHarness.processElement(new Tuple2<>("key2", 1), 1998);
-
-               testHarness.processElement(new Tuple2<>("key2", 1), 2001);
-               testHarness.processWatermark(2999);
-
-               testHarness.addExpectedElement(new Tuple2<>("key2", 1), 1999);
-               testHarness.addExpectedElement(new Tuple2<>("key2", 1), 1999);
-               testHarness.addExpectedElement(new Tuple2<>("key2", 1), 1999);
-               testHarness.addExpectedWatermark(2999);
-
-               testHarness.addExpectedElement(new Tuple2<>("key2", 1), 3999);
-
-               testHarness.processWatermark(3999);
-               testHarness.addExpectedWatermark(3999);
-
-               testHarness.compareActualToExpectedOutput("Output is not correct");
-
-               testHarness.close();
-       }
-
-       @Test
-       public void testProcessingTimeTumblingWindows() throws Exception {
-               final int WINDOW_SIZE = 3000;
-
-               TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
-
-               TumblingProcessingTimeWindows windowAssigner = TumblingProcessingTimeWindows.of(Time.milliseconds(WINDOW_SIZE));
-
-               WindowingTestHarness<String, Tuple2<String, Integer>, TimeWindow> testHarness = new WindowingTestHarness<>(
-                       windowAssigner,
-                       BasicTypeInfo.STRING_TYPE_INFO,
-                       inputType,
-                       new TupleKeySelector(),
-                       ProcessingTimeTrigger.create(),
-                       0);
-
-               testHarness.setProcessingTime(3);
-
-               // timestamp is ignored in processing time
-               testHarness.processElement(new Tuple2<>("key2", 1), Long.MAX_VALUE);
-               testHarness.processElement(new Tuple2<>("key2", 1), 7000);
-               testHarness.processElement(new Tuple2<>("key2", 1), 7000);
-
-               testHarness.processElement(new Tuple2<>("key1", 1), 7000);
-               testHarness.processElement(new Tuple2<>("key1", 1), 7000);
-
-               testHarness.setProcessingTime(5000);
-
-               testHarness.addExpectedElement(new Tuple2<>("key2", 1), 2999);
-               testHarness.addExpectedElement(new Tuple2<>("key2", 1), 2999);
-               testHarness.addExpectedElement(new Tuple2<>("key2", 1), 2999);
-               testHarness.addExpectedElement(new Tuple2<>("key1", 1), 2999);
-               testHarness.addExpectedElement(new Tuple2<>("key1", 1), 2999);
-
-               testHarness.compareActualToExpectedOutput("Output was not correct.");
-
-               testHarness.processElement(new Tuple2<>("key1", 1), 7000);
-               testHarness.processElement(new Tuple2<>("key1", 1), 7000);
-               testHarness.processElement(new Tuple2<>("key1", 1), 7000);
-
-               testHarness.setProcessingTime(7000);
-
-               testHarness.addExpectedElement(new Tuple2<>("key1", 1), 5999);
-               testHarness.addExpectedElement(new Tuple2<>("key1", 1), 5999);
-               testHarness.addExpectedElement(new Tuple2<>("key1", 1), 5999);
-
-               testHarness.compareActualToExpectedOutput("Output was not correct.");
-
-               testHarness.close();
-       }
-
-       @Test
-       public void testSnapshotingAndRecovery() throws Exception {
-
-               final int WINDOW_SIZE = 3000;
-
-               TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
-
-               TumblingEventTimeWindows windowAssigner = TumblingEventTimeWindows.of(Time.milliseconds(WINDOW_SIZE));
-
-               WindowingTestHarness<String, Tuple2<String, Integer>, TimeWindow> testHarness = new WindowingTestHarness<>(
-                       windowAssigner,
-                       BasicTypeInfo.STRING_TYPE_INFO,
-                       inputType,
-                       new TupleKeySelector(),
-                       EventTimeTrigger.create(),
-                       0);
-
-               // add elements out-of-order
-               testHarness.processElement(new Tuple2<>("key2", 1), 3999);
-               testHarness.processElement(new Tuple2<>("key2", 1), 3000);
-
-               testHarness.processElement(new Tuple2<>("key1", 1), 20);
-               testHarness.processElement(new Tuple2<>("key1", 1), 0);
-               testHarness.processElement(new Tuple2<>("key1", 1), 999);
-
-               testHarness.processElement(new Tuple2<>("key2", 1), 1998);
-               testHarness.processElement(new Tuple2<>("key2", 1), 1999);
-               testHarness.processElement(new Tuple2<>("key2", 1), 1000);
-
-               testHarness.processWatermark(999);
-               testHarness.addExpectedWatermark(999);
-
-               testHarness.compareActualToExpectedOutput("Output was not correct.");
-
-               testHarness.processWatermark(1999);
-               testHarness.addExpectedWatermark(1999);
-               testHarness.compareActualToExpectedOutput("Output was not correct.");
-
-               // do a snapshot, close and restore again
-               OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
-               testHarness.close();
-               testHarness.restore(snapshot);
-
-               testHarness.processWatermark(2999);
-
-               testHarness.addExpectedElement(new Tuple2<>("key1", 1), 2999);
-               testHarness.addExpectedElement(new Tuple2<>("key1", 1), 2999);
-               testHarness.addExpectedElement(new Tuple2<>("key1", 1), 2999);
-
-               testHarness.addExpectedElement(new Tuple2<>("key2", 1), 2999);
-               testHarness.addExpectedElement(new Tuple2<>("key2", 1), 2999);
-               testHarness.addExpectedElement(new Tuple2<>("key2", 1), 2999);
-
-               testHarness.addExpectedWatermark(2999);
-
-               testHarness.compareActualToExpectedOutput("Output was not correct.");
-
-               testHarness.processWatermark(3999);
-               testHarness.addExpectedWatermark(3999);
-
-               testHarness.compareActualToExpectedOutput("Output was not correct.");
-
-               testHarness.processWatermark(4999);
-               testHarness.addExpectedWatermark(4999);
-
-               testHarness.compareActualToExpectedOutput("Output was not correct.");
-
-               testHarness.processWatermark(5999);
-
-               testHarness.addExpectedElement(new Tuple2<>("key2", 1), 5999);
-               testHarness.addExpectedElement(new Tuple2<>("key2", 1), 5999);
-               testHarness.addExpectedWatermark(5999);
-
-               testHarness.compareActualToExpectedOutput("Output was not correct.");
-
-
-               // those don't have any effect...
-               testHarness.processWatermark(6999);
-               testHarness.processWatermark(7999);
-
-               testHarness.addExpectedWatermark(6999);
-               testHarness.addExpectedWatermark(7999);
-
-               testHarness.compareActualToExpectedOutput("Output was not correct.");
-       }
-
-       private static class TupleKeySelector implements KeySelector<Tuple2<String, Integer>, String> {
-               private static final long serialVersionUID = 1L;
-
-               @Override
-               public String getKey(Tuple2<String, Integer> value) throws Exception {
-                       return value.f0;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d1475ee8/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 7fe4ebc..568410a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.streaming.util;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -46,6 +47,7 @@ import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperatorTest;
 import org.apache.flink.streaming.api.operators.OperatorSnapshotResult;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
 import org.apache.flink.streaming.api.operators.StreamOperator;
@@ -554,6 +556,24 @@ public class AbstractStreamOperatorTestHarness<OUT> {
                return wasFailedExternally;
        }
 
+       @VisibleForTesting
+       public int numProcessingTimeTimers() {
+               if (operator instanceof AbstractStreamOperator) {
+                       return ((AbstractStreamOperator) operator).numProcessingTimeTimers();
+               } else {
+                       throw new UnsupportedOperationException();
+               }
+       }
+
+       @VisibleForTesting
+       public int numEventTimeTimers() {
+               if (operator instanceof AbstractStreamOperator) {
+                       return ((AbstractStreamOperator) operator).numEventTimeTimers();
+               } else {
+                       throw new UnsupportedOperationException();
+               }
+       }
+
        private class MockOutput implements Output<StreamRecord<OUT>> {
 
                private TypeSerializer<OUT> outputSerializer;

http://git-wip-us.apache.org/repos/asf/flink/blob/d1475ee8/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
index 4abb6e2..cde5780 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -188,6 +189,22 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
                return false;
        }
 
+       public int numKeyedStateEntries() {
+               if (keyedStateBackend instanceof HeapKeyedStateBackend) {
+                       return ((HeapKeyedStateBackend) keyedStateBackend).numStateEntries();
+               } else {
+                       throw new UnsupportedOperationException();
+               }
+       }
+
+       public <N> int numKeyedStateEntries(N namespace) {
+               if (keyedStateBackend instanceof HeapKeyedStateBackend) {
+                       return ((HeapKeyedStateBackend) keyedStateBackend).numStateEntries(namespace);
+               } else {
+                       throw new UnsupportedOperationException();
+               }
+       }
+
        @Override
        public void initializeState(OperatorStateHandles operatorStateHandles) throws Exception {
                if (operatorStateHandles != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/d1475ee8/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
index 58e8c6b..4b6925d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
@@ -36,26 +36,13 @@ import static org.junit.Assert.assertEquals;
  * Utils for working with the various test harnesses.
  */
 public class TestHarnessUtil {
-       /**
-        * Extracts the StreamRecords from the given output list.
-        */
-       @SuppressWarnings("unchecked")
-       public static <OUT> List<StreamRecord<OUT>> getStreamRecordsFromOutput(List<Object> output) {
-               List<StreamRecord<OUT>> resultElements = new LinkedList<StreamRecord<OUT>>();
-               for (Object e: output) {
-                       if (e instanceof StreamRecord) {
-                               resultElements.add((StreamRecord<OUT>) e);
-                       }
-               }
-               return resultElements;
-       }
 
        /**
         * Extracts the raw elements from the given output list.
         */
        @SuppressWarnings("unchecked")
        public static <OUT> List<OUT> getRawElementsFromOutput(Queue<Object> output) {
-               List<OUT> resultElements = new LinkedList<OUT>();
+               List<OUT> resultElements = new LinkedList<>();
                for (Object e: output) {
                        if (e instanceof StreamRecord) {
                                resultElements.add(((StreamRecord<OUT>) e).getValue());

http://git-wip-us.apache.org/repos/asf/flink/blob/d1475ee8/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
deleted file mode 100644
index efb0d7e..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.util;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
-import org.apache.flink.streaming.api.windowing.triggers.Trigger;
-import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
-import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.Preconditions;
-
-import java.util.Comparator;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-/**
- * A utility class that facilitates the testing of custom {@link Trigger 
Triggers} and
- * {@link WindowAssigner WindowAssigners}.
- *
- * <p>For examples on how to use this class, see
- * {@link 
org.apache.flink.streaming.runtime.operators.windowing.WindowingTestHarnessTest}.
- *
- * <p>The input elements of type {@code IN} must implement the {@code 
equals()} method because
- * it is used to compare the expected output to the actual output.
- */
-public class WindowingTestHarness<K, IN, W extends Window> {
-
-       private final OneInputStreamOperatorTestHarness<IN, IN> testHarness;
-
-       private final ConcurrentLinkedQueue<Object> expectedOutputs = new 
ConcurrentLinkedQueue<>();
-
-       private volatile boolean isOpen = false;
-
-       public WindowingTestHarness(WindowAssigner<? super IN, W> 
windowAssigner,
-                                                               
TypeInformation<K> keyType,
-                                                               
TypeInformation<IN> inputType,
-                                                               KeySelector<IN, 
K> keySelector,
-                                                               Trigger<? super 
IN, ? super W> trigger,
-                                                               long 
allowedLateness) throws Exception {
-
-               ListStateDescriptor<IN> windowStateDesc =
-                               new ListStateDescriptor<>("window-contents", 
inputType.createSerializer(new ExecutionConfig()));
-
-               WindowOperator<K, IN, Iterable<IN>, IN, W> operator =
-                       new WindowOperator<>(
-                               windowAssigner,
-                               windowAssigner.getWindowSerializer(new 
ExecutionConfig()),
-                               keySelector,
-                               keyType.createSerializer(new ExecutionConfig()),
-                               windowStateDesc,
-                               new InternalIterableWindowFunction<>(new 
PassThroughFunction()),
-                               trigger,
-                               allowedLateness);
-
-               testHarness = new 
KeyedOneInputStreamOperatorTestHarness<>(operator, keySelector, keyType);
-       }
-
-       /**
-        * Simulates the processing of a new incoming element.
-        */
-       public void processElement(IN element, long timestamp) throws Exception 
{
-               openOperator();
-               testHarness.processElement(new StreamRecord<>(element, 
timestamp));
-       }
-
-       /**
-        * Simulates the processing of a new incoming watermark.
-        */
-       public void processWatermark(long timestamp) throws Exception {
-               openOperator();
-               testHarness.processWatermark(new Watermark(timestamp));
-       }
-
-       /**
-        * Sets the current processing time to {@code timestamp}.
-        * This is useful when working on processing time.
-        */
-       public void setProcessingTime(long timestamp) throws Exception {
-               openOperator();
-               testHarness.setProcessingTime(timestamp);
-       }
-
-       /**
-        * Gets the current output of the windowing operator, as produced by the
-        * synergies between the window assigner and the trigger. This will also
-        * contain the received watermarks.
-        */
-       public ConcurrentLinkedQueue<Object> getOutput() throws Exception {
-               return testHarness.getOutput();
-       }
-
-       /**
-        * Closes the testing window operator.
-        */
-       public void close() throws Exception {
-               if (isOpen) {
-                       testHarness.close();
-                       isOpen = false;
-               }
-       }
-
-       /**
-        * Adds a watermark to the expected output.
-        *
-        * <p>The expected output should contain the elements and watermarks 
that we expect the output of the operator to
-        * contain, in the correct order. This will be used to check if the 
produced output is the expected one, and
-        * thus determine the success or failure of the test.
-        */
-       public void addExpectedWatermark(long timestamp) {
-               expectedOutputs.add(new Watermark(timestamp));
-       }
-
-       /**
-        * Adds an element to the expected output.
-        *
-        * <p>The expected output should contain the elements and watermarks 
that we expect the output of the operator to
-        * contain, in the correct order. This will be used to check if the 
produced output is the expected one, and
-        * thus determine the success or failure of the test.
-        */
-       public void addExpectedElement(IN element, long timestamp) {
-               expectedOutputs.add(new StreamRecord<>(element, timestamp));
-       }
-
-       /**
-        * Compares the current produced output with the expected one. The 
latter contains elements and watermarks added
-        * using the {@link #addExpectedElement(Object, long)} and {@link 
#addExpectedWatermark(long)} methods.
-        *
-        * <p><b>NOTE:</b> This methods uses an {@code assert()} internally, 
thus failing the test if the {@code expected} output
-     * does not match the {@code actual} one.
-        */
-       public void compareActualToExpectedOutput(String errorMessage) {
-               TestHarnessUtil.assertOutputEqualsSorted(errorMessage, 
expectedOutputs, testHarness.getOutput(), new StreamRecordComparator());
-       }
-
-       /**
-        * Takes a snapshot of the current state of the operator. This can be 
used to test fault-tolerance.
-        */
-       public OperatorStateHandles snapshot(long checkpointId, long timestamp) 
throws Exception {
-               return testHarness.snapshot(checkpointId, timestamp);
-       }
-
-       /**
-        * Resumes execution from the provided {@link OperatorStateHandles}. 
This is used to test recovery after a failure.
-        */
-       public void restore(OperatorStateHandles stateHandles) throws Exception 
{
-               Preconditions.checkArgument(!isOpen,
-                       "You are trying to restore() while the operator is 
still open. " +
-                               "Please call close() first.");
-
-               testHarness.setup();
-               testHarness.initializeState(stateHandles);
-               openOperator();
-       }
-
-       private void openOperator() throws Exception {
-               if (!isOpen) {
-                       testHarness.open();
-                       isOpen = true;
-               }
-       }
-
-       private class PassThroughFunction implements WindowFunction<IN, IN, K, 
W> {
-               private static final long serialVersionUID = 1L;
-
-               @Override
-               public void apply(K k, W window, Iterable<IN> input, 
Collector<IN> out) throws Exception {
-                       for (IN in: input) {
-                               out.collect(in);
-                       }
-               }
-       }
-
-       /**
-        * {@link Comparator} for sorting the expected and actual output by 
timestamp.
-        */
-       @SuppressWarnings("unchecked")
-       private class StreamRecordComparator implements Comparator<Object> {
-               @Override
-               public int compare(Object o1, Object o2) {
-                       if (o1 instanceof Watermark || o2 instanceof Watermark) 
{
-                               return 0;
-                       } else {
-                               StreamRecord<Tuple2<String, Integer>> sr0 = 
(StreamRecord<Tuple2<String, Integer>>) o1;
-                               StreamRecord<Tuple2<String, Integer>> sr1 = 
(StreamRecord<Tuple2<String, Integer>>) o2;
-                               if (sr0.getTimestamp() != sr1.getTimestamp()) {
-                                       return (int) (sr0.getTimestamp() - 
sr1.getTimestamp());
-                               }
-                               int comparison = 
sr0.getValue().f0.compareTo(sr1.getValue().f0);
-                               if (comparison != 0) {
-                                       return comparison;
-                               } else {
-                                       return sr0.getValue().f1 - 
sr1.getValue().f1;
-                               }
-                       }
-               }
-       }
-}

Reply via email to