[FLINK-4877] Refactor OperatorTestHarness to always use 
TestProcessingTimeService

Before, this would allow handing in a custom ProcessingTimeService but
this was in reality always TestProcessingTimeService.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/30554758
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/30554758
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/30554758

Branch: refs/heads/master
Commit: 30554758897842ad851dc9b6e1758d452f7d702f
Parents: e112a63
Author: Aljoscha Krettek <[email protected]>
Authored: Wed Sep 28 16:43:40 2016 +0200
Committer: Aljoscha Krettek <[email protected]>
Committed: Fri Oct 21 19:03:04 2016 +0200

----------------------------------------------------------------------
 .../hdfstests/ContinuousFileMonitoringTest.java |  18 +-
 .../fs/bucketing/BucketingSinkTest.java         |  69 ++-
 ...stampsAndPeriodicWatermarksOperatorTest.java |   8 +-
 ...AlignedProcessingTimeWindowOperatorTest.java | 355 ++++++--------
 ...AlignedProcessingTimeWindowOperatorTest.java | 475 ++++++-------------
 .../operators/windowing/CollectingOutput.java   |  86 ----
 .../operators/windowing/NoOpTimerService.java   |  52 --
 .../operators/windowing/WindowOperatorTest.java | 106 +----
 .../KeyedOneInputStreamOperatorTestHarness.java |  20 +-
 .../util/OneInputStreamOperatorTestHarness.java |  50 +-
 .../streaming/util/WindowingTestHarness.java    |  10 +-
 11 files changed, 384 insertions(+), 865 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/30554758/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
----------------------------------------------------------------------
diff --git 
a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
 
b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
index 971d5f8..56d8efc 100644
--- 
a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
+++ 
b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
@@ -34,7 +34,6 @@ import 
org.apache.flink.streaming.api.functions.source.FileProcessingMode;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileUtil;
@@ -127,20 +126,21 @@ public class ContinuousFileMonitoringTest {
                ContinuousFileReaderOperator<String, ?> reader = new 
ContinuousFileReaderOperator<>(format);
                reader.setOutputType(typeInfo, executionConfig);
 
-               final TestProcessingTimeService timeServiceProvider = new 
TestProcessingTimeService();
                final OneInputStreamOperatorTestHarness<FileInputSplit, String> 
tester =
-                       new OneInputStreamOperatorTestHarness<>(reader, 
executionConfig, timeServiceProvider);
+                       new OneInputStreamOperatorTestHarness<>(reader, 
executionConfig);
+
                tester.setTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
                tester.open();
 
                Assert.assertEquals(TimeCharacteristic.IngestionTime, 
tester.getTimeCharacteristic());
 
                // test that watermarks are correctly emitted
 
-               timeServiceProvider.setCurrentTime(201);
-               timeServiceProvider.setCurrentTime(301);
-               timeServiceProvider.setCurrentTime(401);
-               timeServiceProvider.setCurrentTime(501);
+               tester.setProcessingTime(201);
+               tester.setProcessingTime(301);
+               tester.setProcessingTime(401);
+               tester.setProcessingTime(501);
 
                int i = 0;
                for(Object line: tester.getOutput()) {
@@ -170,8 +170,8 @@ public class ContinuousFileMonitoringTest {
                for(FileInputSplit split: splits) {
 
                        // set the next "current processing time".
-                       long nextTimestamp = 
timeServiceProvider.getCurrentProcessingTime() + watermarkInterval;
-                       timeServiceProvider.setCurrentTime(nextTimestamp);
+                       long nextTimestamp = tester.getProcessingTime() + 
watermarkInterval;
+                       tester.setProcessingTime(nextTimestamp);
 
                        // send the next split to be read and wait until it is 
fully read.
                        tester.processElement(new StreamRecord<>(split));

http://git-wip-us.apache.org/repos/asf/flink/blob/30554758/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
 
b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
index 0c0111c..f4b3cd7 100644
--- 
a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
+++ 
b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
@@ -35,8 +35,6 @@ import org.apache.flink.streaming.connectors.fs.Clock;
 import org.apache.flink.streaming.connectors.fs.SequenceFileWriter;
 import org.apache.flink.streaming.connectors.fs.StringWriter;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
-import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.util.NetUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -70,7 +68,7 @@ public class BucketingSinkTest {
        private static org.apache.hadoop.fs.FileSystem dfs;
        private static String hdfsURI;
 
-       private OneInputStreamOperatorTestHarness<String, Object> 
createTestSink(File dataDir, TestTimeServiceProvider clock) {
+       private OneInputStreamOperatorTestHarness<String, Object> 
createTestSink(File dataDir) throws Exception {
                BucketingSink<String> sink = new 
BucketingSink<String>(dataDir.getAbsolutePath())
                        .setBucketer(new Bucketer<String>() {
                                private static final long serialVersionUID = 1L;
@@ -87,12 +85,12 @@ public class BucketingSinkTest {
                        .setInactiveBucketThreshold(5*60*1000L)
                        .setPendingSuffix(".pending");
 
-               return createTestSink(sink, clock);
+               return createTestSink(sink);
        }
 
-       private <T> OneInputStreamOperatorTestHarness<T, Object> 
createTestSink(BucketingSink<T> sink,
-                                                                               
                                                                        
TestTimeServiceProvider clock) {
-               return new OneInputStreamOperatorTestHarness<>(new 
StreamSink<>(sink), new ExecutionConfig(), clock);
+       private <T> OneInputStreamOperatorTestHarness<T, Object> createTestSink(
+                       BucketingSink<T> sink) throws Exception {
+               return new OneInputStreamOperatorTestHarness<>(new 
StreamSink<>(sink), new ExecutionConfig());
        }
 
        @BeforeClass
@@ -121,10 +119,7 @@ public class BucketingSinkTest {
        public void testCheckpointWithoutNotify() throws Exception {
                File dataDir = tempFolder.newFolder();
 
-               TestTimeServiceProvider clock = new TestTimeServiceProvider();
-               clock.setCurrentTime(0L);
-
-               OneInputStreamOperatorTestHarness<String, Object> testHarness = 
createTestSink(dataDir, clock);
+               OneInputStreamOperatorTestHarness<String, Object> testHarness = 
createTestSink(dataDir);
 
                testHarness.setup();
                testHarness.open();
@@ -133,13 +128,13 @@ public class BucketingSinkTest {
                testHarness.processElement(new StreamRecord<>("Hello"));
                testHarness.processElement(new StreamRecord<>("Hello"));
 
-               clock.setCurrentTime(10000L);
+               testHarness.setProcessingTime(10000L);
 
                // snapshot but don't call notify to simulate a notify that 
never
                // arrives, the sink should move pending files in restore() in 
that case
                StreamStateHandle snapshot1 = testHarness.snapshotLegacy(0, 0);
 
-               testHarness = createTestSink(dataDir, clock);
+               testHarness = createTestSink(dataDir);
                testHarness.setup();
                testHarness.restore(snapshot1);
                testHarness.open();
@@ -175,16 +170,15 @@ public class BucketingSinkTest {
 
                final int numElements = 20;
 
-               TestTimeServiceProvider clock = new TestTimeServiceProvider();
-               clock.setCurrentTime(0L);
-
                BucketingSink<String> sink = new BucketingSink<String>(outPath)
                        .setBucketer(new BasePathBucketer<String>())
                        .setPartPrefix("part")
                        .setPendingPrefix("")
                        .setPendingSuffix("");
 
-               OneInputStreamOperatorTestHarness<String, Object> testHarness = 
createTestSink(sink, clock);
+               OneInputStreamOperatorTestHarness<String, Object> testHarness = 
createTestSink(sink);
+
+               testHarness.setProcessingTime(0L);
 
                testHarness.setup();
                testHarness.open();
@@ -217,9 +211,6 @@ public class BucketingSinkTest {
 
                final int numElements = 20;
 
-               TestTimeServiceProvider clock = new TestTimeServiceProvider();
-               clock.setCurrentTime(0L);
-
                BucketingSink<Tuple2<IntWritable, Text>> sink = new 
BucketingSink<Tuple2<IntWritable, Text>>(outPath)
                        .setWriter(new SequenceFileWriter<IntWritable, Text>())
                        .setBucketer(new BasePathBucketer<Tuple2<IntWritable, 
Text>>())
@@ -230,7 +221,9 @@ public class BucketingSinkTest {
                sink.setInputType(TypeInformation.of(new 
TypeHint<Tuple2<IntWritable, Text>>(){}), new ExecutionConfig());
 
                OneInputStreamOperatorTestHarness<Tuple2<IntWritable, Text>, 
Object> testHarness =
-                       createTestSink(sink, clock);
+                       createTestSink(sink);
+
+               testHarness.setProcessingTime(0L);
 
                testHarness.setup();
                testHarness.open();
@@ -271,9 +264,6 @@ public class BucketingSinkTest {
 
                final int numElements = 20;
 
-               TestTimeServiceProvider clock = new TestTimeServiceProvider();
-               clock.setCurrentTime(0L);
-
                Map<String, String> properties = new HashMap<>();
                Schema keySchema = Schema.create(Schema.Type.INT);
                Schema valueSchema = Schema.create(Schema.Type.STRING);
@@ -290,7 +280,9 @@ public class BucketingSinkTest {
                        .setPendingSuffix("");
 
                OneInputStreamOperatorTestHarness<Tuple2<Integer, String>, 
Object> testHarness =
-                       createTestSink(sink, clock);
+                       createTestSink(sink);
+
+               testHarness.setProcessingTime(0L);
 
                testHarness.setup();
                testHarness.open();
@@ -325,8 +317,8 @@ public class BucketingSinkTest {
 
        /**
         * This uses {@link DateTimeBucketer} to
-        * produce rolling files. A custom {@link TimeServiceProvider} is set
-        * to simulate the advancing of time alongside the processing of 
elements.
+        * produce rolling files. We use {@link 
OneInputStreamOperatorTestHarness} to manually
+        * advance processing time.
         */
        @Test
        public void testDateTimeRollingStringWriter() throws Exception {
@@ -334,16 +326,15 @@ public class BucketingSinkTest {
 
                final String outPath = hdfsURI + "/rolling-out";
 
-               TestTimeServiceProvider clock = new TestTimeServiceProvider();
-               clock.setCurrentTime(0L);
-
                BucketingSink<String> sink = new BucketingSink<String>(outPath)
                        .setBucketer(new DateTimeBucketer<String>("ss"))
                        .setPartPrefix("part")
                        .setPendingPrefix("")
                        .setPendingSuffix("");
 
-               OneInputStreamOperatorTestHarness<String, Object> testHarness = 
createTestSink(sink, clock);
+               OneInputStreamOperatorTestHarness<String, Object> testHarness = 
createTestSink(sink);
+
+               testHarness.setProcessingTime(0L);
 
                testHarness.setup();
                testHarness.open();
@@ -351,7 +342,7 @@ public class BucketingSinkTest {
                for (int i = 0; i < numElements; i++) {
                        // Every 5 elements, increase the clock time. We should 
end up with 5 elements per bucket.
                        if (i % 5 == 0) {
-                               clock.setCurrentTime(i * 1000L);
+                               testHarness.setProcessingTime(i * 1000L);
                        }
                        testHarness.processElement(new StreamRecord<>("message 
#" + Integer.toString(i)));
                }
@@ -427,10 +418,9 @@ public class BucketingSinkTest {
                final int numIds = 4;
                final int numElements = 20;
 
-               TestTimeServiceProvider clock = new TestTimeServiceProvider();
-               clock.setCurrentTime(0L);
+               OneInputStreamOperatorTestHarness<String, Object> testHarness = 
createTestSink(dataDir);
 
-               OneInputStreamOperatorTestHarness<String, Object> testHarness = 
createTestSink(dataDir, clock);
+               testHarness.setProcessingTime(0L);
 
                testHarness.setup();
                testHarness.open();
@@ -465,10 +455,9 @@ public class BucketingSinkTest {
                final int step2NumIds = 2;
                final int numElementsPerStep = 20;
 
-               TestTimeServiceProvider clock = new TestTimeServiceProvider();
-               clock.setCurrentTime(0L);
+               OneInputStreamOperatorTestHarness<String, Object> testHarness = 
createTestSink(dataDir);
 
-               OneInputStreamOperatorTestHarness<String, Object> testHarness = 
createTestSink(dataDir, clock);
+               testHarness.setProcessingTime(0L);
 
                testHarness.setup();
                testHarness.open();
@@ -477,13 +466,13 @@ public class BucketingSinkTest {
                        testHarness.processElement(new 
StreamRecord<>(Integer.toString(i % step1NumIds)));
                }
 
-               clock.setCurrentTime(2*60*1000L);
+               testHarness.setProcessingTime(2*60*1000L);
 
                for (int i = 0; i < numElementsPerStep; i++) {
                        testHarness.processElement(new 
StreamRecord<>(Integer.toString(i % step2NumIds)));
                }
 
-               clock.setCurrentTime(6*60*1000L);
+               testHarness.setProcessingTime(6*60*1000L);
 
                for (int i = 0; i < numElementsPerStep; i++) {
                        testHarness.processElement(new 
StreamRecord<>(Integer.toString(i % step2NumIds)));

http://git-wip-us.apache.org/repos/asf/flink/blob/30554758/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java
index af99d0d..febfcde 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java
@@ -43,10 +43,8 @@ public class TimestampsAndPeriodicWatermarksOperatorTest {
                final ExecutionConfig config = new ExecutionConfig();
                config.setAutoWatermarkInterval(50);
 
-               TestProcessingTimeService processingTimeService = new 
TestProcessingTimeService();
-               
                OneInputStreamOperatorTestHarness<Long, Long> testHarness =
-                               new OneInputStreamOperatorTestHarness<Long, 
Long>(operator, config, processingTimeService);
+                               new OneInputStreamOperatorTestHarness<Long, 
Long>(operator, config);
 
                long currentTime = 0;
 
@@ -77,7 +75,7 @@ public class TimestampsAndPeriodicWatermarksOperatorTest {
                                        assertTrue(lastWatermark < 
nextElementValue);
                                } else {
                                        currentTime = currentTime + 10;
-                                       
processingTimeService.setCurrentTime(currentTime);
+                                       
testHarness.setProcessingTime(currentTime);
                                }
                        }
                        
@@ -109,7 +107,7 @@ public class TimestampsAndPeriodicWatermarksOperatorTest {
                                        assertTrue(lastWatermark < 
nextElementValue);
                                } else {
                                        currentTime = currentTime + 10;
-                                       
processingTimeService.setCurrentTime(currentTime);
+                                       
testHarness.setProcessingTime(currentTime);
                                }
                        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/30554758/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
index 128c88b..720258e 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -36,15 +36,10 @@ import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import 
org.apache.flink.streaming.runtime.operators.TestProcessingTimeServiceTest.ReferenceSettingExceptionHandler;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.util.Collector;
@@ -59,7 +54,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -68,7 +62,7 @@ import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-@SuppressWarnings({"serial", 
"SynchronizationOnLocalVariableOrMethodParameter"})
+@SuppressWarnings({"serial"})
 public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 
        @SuppressWarnings("unchecked")
@@ -183,45 +177,57 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
 
        @Test
        public void testWindowTriggerTimeAlignment() throws Exception {
+
                try {
-                       @SuppressWarnings("unchecked")
-                       final Output<StreamRecord<String>> mockOut = 
mock(Output.class);
-                       final ProcessingTimeService timerService = new 
NoOpTimerService();
-                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService);
+                       AccumulatingProcessingTimeWindowOperator<String, 
String, String> op =
+                                       new 
AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
+                                                       
StringSerializer.INSTANCE, StringSerializer.INSTANCE, 5000, 1000);
 
-                       AccumulatingProcessingTimeWindowOperator<String, 
String, String> op;
+                       KeyedOneInputStreamOperatorTestHarness<String, String, 
String> testHarness =
+                                       new 
KeyedOneInputStreamOperatorTestHarness<>(op, mockKeySelector, 
BasicTypeInfo.STRING_TYPE_INFO);
+
+                       testHarness.open();
 
-                       op = new 
AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
-                                       StringSerializer.INSTANCE, 
StringSerializer.INSTANCE, 5000, 1000);
-                       op.setup(mockTask, new StreamConfig(new 
Configuration()), mockOut);
-                       op.open();
                        assertTrue(op.getNextSlideTime() % 1000 == 0);
                        assertTrue(op.getNextEvaluationTime() % 1000 == 0);
-                       op.dispose();
+                       testHarness.close();
 
                        op = new 
AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
                                        StringSerializer.INSTANCE, 
StringSerializer.INSTANCE, 1000, 1000);
-                       op.setup(mockTask, new StreamConfig(new 
Configuration()), mockOut);
-                       op.open();
+
+                       testHarness =
+                                       new 
KeyedOneInputStreamOperatorTestHarness<>(op, mockKeySelector, 
BasicTypeInfo.STRING_TYPE_INFO);
+
+                       testHarness.open();
+
                        assertTrue(op.getNextSlideTime() % 1000 == 0);
                        assertTrue(op.getNextEvaluationTime() % 1000 == 0);
-                       op.dispose();
+                       testHarness.close();
+
 
                        op = new 
AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
                                        StringSerializer.INSTANCE, 
StringSerializer.INSTANCE, 1500, 1000);
-                       op.setup(mockTask, new StreamConfig(new 
Configuration()), mockOut);
-                       op.open();
+
+                       testHarness =
+                                       new 
KeyedOneInputStreamOperatorTestHarness<>(op, mockKeySelector, 
BasicTypeInfo.STRING_TYPE_INFO);
+
+                       testHarness.open();
+
                        assertTrue(op.getNextSlideTime() % 500 == 0);
                        assertTrue(op.getNextEvaluationTime() % 1000 == 0);
-                       op.dispose();
+                       testHarness.close();
 
                        op = new 
AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
                                        StringSerializer.INSTANCE, 
StringSerializer.INSTANCE, 1200, 1100);
-                       op.setup(mockTask, new StreamConfig(new 
Configuration()), mockOut);
-                       op.open();
-                       assertTrue(op.getNextSlideTime() % 100 == 0);
-                       assertTrue(op.getNextEvaluationTime() % 1100 == 0);
-                       op.dispose();
+
+                       testHarness =
+                                       new 
KeyedOneInputStreamOperatorTestHarness<>(op, mockKeySelector, 
BasicTypeInfo.STRING_TYPE_INFO);
+
+                       testHarness.open();
+
+                       assertEquals(0, op.getNextSlideTime() % 100);
+                       assertEquals(0, op.getNextEvaluationTime() % 1100);
+                       testHarness.close();
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -231,16 +237,8 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
 
        @Test
        public void testTumblingWindow() throws Exception {
-               final Object lock = new Object();
-               final AtomicReference<Throwable> error = new 
AtomicReference<>();
-
-               final ProcessingTimeService timerService = new 
SystemProcessingTimeService(
-                               new ReferenceSettingExceptionHandler(error), 
lock);
-
                try {
                        final int windowSize = 50;
-                       final CollectingOutput<Integer> out = new 
CollectingOutput<>(windowSize);
-                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService);
 
                        // tumbling window that triggers every 20 milliseconds
                        AccumulatingProcessingTimeWindowOperator<Integer, 
Integer, Integer> op =
@@ -249,31 +247,23 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                                                        IntSerializer.INSTANCE, 
IntSerializer.INSTANCE,
                                                        windowSize, windowSize);
 
-                       op.setup(mockTask, new StreamConfig(new 
Configuration()), out);
-                       op.open();
-
-                       final int numElements = 1000;
+                       KeyedOneInputStreamOperatorTestHarness<Integer, 
Integer, Integer> testHarness =
+                                       new 
KeyedOneInputStreamOperatorTestHarness<>(op, identitySelector, 
BasicTypeInfo.INT_TYPE_INFO);
 
-                       for (int i = 0; i < numElements; i++) {
-                               synchronized (lock) {
-                                       op.processElement(new 
StreamRecord<Integer>(i));
-                               }
-                               Thread.sleep(1);
-                       }
+                       testHarness.open();
 
-                       // get and verify the result
-                       out.waitForNElements(numElements, 60_000);
+                       final int numElements = 1000;
 
-                       timerService.quiesceAndAwaitPending();
+                       long currentTime = 0;
 
-                       synchronized (lock) {
-                               op.close();
+                       for (int i = 0; i < numElements; i++) {
+                               testHarness.processElement(new 
StreamRecord<>(i));
+                               currentTime = currentTime + 10;
+                               testHarness.setProcessingTime(currentTime);
                        }
 
-                       shutdownTimerServiceAndWait(timerService);
-                       op.dispose();
 
-                       List<Integer> result = out.getElements();
+                       List<Integer> result = 
extractFromStreamRecords(testHarness.extractOutputStreamRecords());
                        assertEquals(numElements, result.size());
 
                        Collections.sort(result);
@@ -281,102 +271,70 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                                assertEquals(i, result.get(i).intValue());
                        }
 
-                       if (error.get() != null) {
-                               throw new Exception(error.get());
-                       }
+                       testHarness.close();
                }
                catch (Exception e) {
                        e.printStackTrace();
                        fail(e.getMessage());
                }
-               finally {
-                       timerService.shutdownService();
-               }
        }
 
        @Test
        public void testSlidingWindow() throws Exception {
-               final Object lock = new Object();
-               final AtomicReference<Throwable> error = new 
AtomicReference<>();
 
-               final ProcessingTimeService timerService = new 
SystemProcessingTimeService(
-                               new ReferenceSettingExceptionHandler(error), 
lock);
+               // tumbling window that triggers every 20 milliseconds
+               AccumulatingProcessingTimeWindowOperator<Integer, Integer, 
Integer> op =
+                               new AccumulatingProcessingTimeWindowOperator<>(
+                                               validatingIdentityFunction, 
identitySelector,
+                                               IntSerializer.INSTANCE, 
IntSerializer.INSTANCE, 150, 50);
 
-               try {
-                       final CollectingOutput<Integer> out = new 
CollectingOutput<>(50);
-                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService);
-                       
-                       // tumbling window that triggers every 20 milliseconds
-                       AccumulatingProcessingTimeWindowOperator<Integer, 
Integer, Integer> op =
-                                       new 
AccumulatingProcessingTimeWindowOperator<>(
-                                                       
validatingIdentityFunction, identitySelector,
-                                                       IntSerializer.INSTANCE, 
IntSerializer.INSTANCE, 150, 50);
+               KeyedOneInputStreamOperatorTestHarness<Integer, Integer, 
Integer> testHarness =
+                               new 
KeyedOneInputStreamOperatorTestHarness<>(op, identitySelector, 
BasicTypeInfo.INT_TYPE_INFO);
 
-                       op.setup(mockTask, new StreamConfig(new 
Configuration()), out);
-                       op.open();
+               testHarness.open();
 
-                       final int numElements = 1000;
-
-                       for (int i = 0; i < numElements; i++) {
-                               synchronized (lock) {
-                                       op.processElement(new 
StreamRecord<Integer>(i));
-                               }
-                               Thread.sleep(1);
-                       }
+               final int numElements = 1000;
 
-                       timerService.quiesceAndAwaitPending();
+               long currentTime = 0;
 
-                       synchronized (lock) {
-                               op.close();
-                       }
+               for (int i = 0; i < numElements; i++) {
+                       testHarness.processElement(new StreamRecord<>(i));
+                       currentTime = currentTime + 10;
+                       testHarness.setProcessingTime(currentTime);
+               }
 
-                       shutdownTimerServiceAndWait(timerService);
-                       op.dispose();
+               // get and verify the result
+               List<Integer> result = 
extractFromStreamRecords(testHarness.extractOutputStreamRecords());
 
-                       // get and verify the result
-                       List<Integer> result = out.getElements();
+               // if we kept this running, each element would be in the result 
three times (for each slide).
+               // we are closing the window before the final panes are through 
three times, so we may have less
+               // elements.
+               if (result.size() < numElements || result.size() > 3 * 
numElements) {
+                       fail("Wrong number of results: " + result.size());
+               }
 
-                       // if we kept this running, each element would be in 
the result three times (for each slide).
-                       // we are closing the window before the final panes are 
through three times, so we may have less
-                       // elements.
-                       if (result.size() < numElements || result.size() > 3 * 
numElements) {
-                               fail("Wrong number of results: " + 
result.size());
-                       }
+               Collections.sort(result);
+               int lastNum = -1;
+               int lastCount = -1;
 
-                       Collections.sort(result);
-                       int lastNum = -1;
-                       int lastCount = -1;
-                       
-                       for (int num : result) {
-                               if (num == lastNum) {
-                                       lastCount++;
-                                       assertTrue(lastCount <= 3);
-                               }
-                               else {
-                                       lastNum = num;
-                                       lastCount = 1;
-                               }
+               for (int num : result) {
+                       if (num == lastNum) {
+                               lastCount++;
+                               assertTrue(lastCount <= 3);
                        }
-
-                       if (error.get() != null) {
-                               throw new Exception(error.get());
+                       else {
+                               lastNum = num;
+                               lastCount = 1;
                        }
-               } finally {
-                       timerService.shutdownService();
                }
+
+               testHarness.close();
        }
 
        @Test
        public void testTumblingWindowSingleElements() throws Exception {
-               final Object lock = new Object();
-               final AtomicReference<Throwable> error = new 
AtomicReference<>();
-
-               final ProcessingTimeService timerService = new 
SystemProcessingTimeService(
-                               new ReferenceSettingExceptionHandler(error), 
lock);
 
                try {
-                       final CollectingOutput<Integer> out = new 
CollectingOutput<>(50);
-                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService);
 
                        // tumbling window that triggers every 20 milliseconds
                        AccumulatingProcessingTimeWindowOperator<Integer, 
Integer, Integer> op =
@@ -384,66 +342,46 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                                                        
validatingIdentityFunction, identitySelector,
                                                        IntSerializer.INSTANCE, 
IntSerializer.INSTANCE, 50, 50);
 
-                       op.setup(mockTask, new StreamConfig(new 
Configuration()), out);
-                       op.open();
+                       KeyedOneInputStreamOperatorTestHarness<Integer, 
Integer, Integer> testHarness =
+                                       new 
KeyedOneInputStreamOperatorTestHarness<>(op, identitySelector, 
BasicTypeInfo.INT_TYPE_INFO);
 
-                       synchronized (lock) {
-                               op.processElement(new StreamRecord<Integer>(1));
-                               op.processElement(new StreamRecord<Integer>(2));
-                       }
-                       out.waitForNElements(2, 60000);
+                       testHarness.open();
 
-                       synchronized (lock) {
-                               op.processElement(new StreamRecord<Integer>(3));
-                               op.processElement(new StreamRecord<Integer>(4));
-                               op.processElement(new StreamRecord<Integer>(5));
-                       }
-                       out.waitForNElements(5, 60000);
+                       testHarness.setProcessingTime(0);
 
-                       synchronized (lock) {
-                               op.processElement(new StreamRecord<Integer>(6));
-                       }
-                       out.waitForNElements(6, 60000);
-                       
-                       List<Integer> result = out.getElements();
-                       assertEquals(6, result.size());
+                       testHarness.processElement(new StreamRecord<>(1));
+                       testHarness.processElement(new StreamRecord<>(2));
 
-                       Collections.sort(result);
-                       assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6), result);
+                       testHarness.setProcessingTime(50);
 
-                       timerService.quiesceAndAwaitPending();
+                       testHarness.processElement(new StreamRecord<>(3));
+                       testHarness.processElement(new StreamRecord<>(4));
+                       testHarness.processElement(new StreamRecord<>(5));
 
-                       synchronized (lock) {
-                               op.close();
-                       }
+                       testHarness.setProcessingTime(100);
 
-                       shutdownTimerServiceAndWait(timerService);
-                       op.dispose();
+                       testHarness.processElement(new StreamRecord<>(6));
 
-                       if (error.get() != null) {
-                               throw new Exception(error.get());
-                       }
+                       testHarness.setProcessingTime(200);
+
+
+                       List<Integer> result = 
extractFromStreamRecords(testHarness.extractOutputStreamRecords());
+                       assertEquals(6, result.size());
+
+                       Collections.sort(result);
+                       assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6), result);
+
+                       testHarness.close();
                }
                catch (Exception e) {
                        e.printStackTrace();
                        fail(e.getMessage());
                }
-               finally {
-                       timerService.shutdownService();
-               }
        }
        
        @Test
        public void testSlidingWindowSingleElements() throws Exception {
-               final Object lock = new Object();
-               final AtomicReference<Throwable> error = new 
AtomicReference<>();
-               
-               final ProcessingTimeService timerService = new 
SystemProcessingTimeService(
-                       new ReferenceSettingExceptionHandler(error), lock);
-
                try {
-                       final CollectingOutput<Integer> out = new 
CollectingOutput<>(50);
-                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService);
 
                        // tumbling window that triggers every 20 milliseconds
                        AccumulatingProcessingTimeWindowOperator<Integer, 
Integer, Integer> op =
@@ -451,44 +389,33 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                                                        
validatingIdentityFunction, identitySelector,
                                                        IntSerializer.INSTANCE, 
IntSerializer.INSTANCE, 150, 50);
 
-                       op.setup(mockTask, new StreamConfig(new 
Configuration()), out);
-                       op.open();
+                       KeyedOneInputStreamOperatorTestHarness<Integer, 
Integer, Integer> testHarness =
+                                       new 
KeyedOneInputStreamOperatorTestHarness<>(op, identitySelector, 
BasicTypeInfo.INT_TYPE_INFO);
 
-                       synchronized (lock) {
-                               op.processElement(new StreamRecord<Integer>(1));
-                               op.processElement(new StreamRecord<Integer>(2));
-                       }
+                       testHarness.setProcessingTime(0);
+
+                       testHarness.open();
+
+                       testHarness.processElement(new StreamRecord<>(1));
+                       testHarness.processElement(new StreamRecord<>(2));
+
+                       testHarness.setProcessingTime(50);
+                       testHarness.setProcessingTime(100);
+                       testHarness.setProcessingTime(150);
+
+                       List<Integer> result = 
extractFromStreamRecords(testHarness.extractOutputStreamRecords());
 
-                       // each element should end up in the output three times
-                       // wait until the elements have arrived 6 times in the 
output
-                       out.waitForNElements(6, 120000);
-                       
-                       List<Integer> result = out.getElements();
                        assertEquals(6, result.size());
                        
                        Collections.sort(result);
                        assertEquals(Arrays.asList(1, 1, 1, 2, 2, 2), result);
 
-                       timerService.quiesceAndAwaitPending();
-
-                       synchronized (lock) {
-                               op.close();
-                       }
-
-                       shutdownTimerServiceAndWait(timerService);
-                       op.dispose();
-
-                       if (error.get() != null) {
-                               throw new Exception(error.get());
-                       }
+                       testHarness.close();
                }
                catch (Exception e) {
                        e.printStackTrace();
                        fail(e.getMessage());
                }
-               finally {
-                       timerService.shutdownService();
-               }
        }
 
        @Test
@@ -503,15 +430,13 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                                                        IntSerializer.INSTANCE, 
IntSerializer.INSTANCE,
                                                        windowSize, windowSize);
 
-                       TestProcessingTimeService timerService = new 
TestProcessingTimeService();
-
                        OneInputStreamOperatorTestHarness<Integer, Integer> 
testHarness =
-                                       new 
OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService);
+                                       new 
OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig());
 
                        testHarness.setup();
                        testHarness.open();
 
-                       timerService.setCurrentTime(0);
+                       testHarness.setProcessingTime(0);
 
                        // inject some elements
                        final int numElementsFirst = 700;
@@ -542,8 +467,7 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                                                        IntSerializer.INSTANCE, 
IntSerializer.INSTANCE,
                                                        windowSize, windowSize);
 
-                       timerService = new TestProcessingTimeService();
-                       testHarness = new 
OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService);
+                       testHarness = new 
OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig());
 
                        testHarness.setup();
                        testHarness.restore(state);
@@ -554,7 +478,7 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                                testHarness.processElement(new 
StreamRecord<>(i));
                        }
 
-                       timerService.setCurrentTime(400);
+                       testHarness.setProcessingTime(400);
 
                        // get and verify the result
                        List<Integer> finalResult = new ArrayList<>();
@@ -568,7 +492,6 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                                assertEquals(i, finalResult.get(i).intValue());
                        }
                        testHarness.close();
-                       op.dispose();
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -583,8 +506,6 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                        final int windowSlide = 50;
                        final int windowSize = factor * windowSlide;
 
-                       TestProcessingTimeService timerService = new 
TestProcessingTimeService();
-
                        // sliding window (200 msecs) every 50 msecs
                        AccumulatingProcessingTimeWindowOperator<Integer, 
Integer, Integer> op =
                                        new 
AccumulatingProcessingTimeWindowOperator<>(
@@ -593,9 +514,9 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                                                        windowSize, 
windowSlide);
 
                        OneInputStreamOperatorTestHarness<Integer, Integer> 
testHarness =
-                                       new 
OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService);
+                                       new 
OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig());
 
-                       timerService.setCurrentTime(0);
+                       testHarness.setProcessingTime(0);
 
                        testHarness.setup();
                        testHarness.open();
@@ -623,7 +544,6 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                        }
 
                        testHarness.close();
-                       op.dispose();
 
                        // re-create the operator and restore the state
                        op = new AccumulatingProcessingTimeWindowOperator<>(
@@ -631,8 +551,7 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                                        IntSerializer.INSTANCE, 
IntSerializer.INSTANCE,
                                        windowSize, windowSlide);
 
-                       timerService = new TestProcessingTimeService();
-                       testHarness = new 
OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService);
+                       testHarness = new 
OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig());
 
                        testHarness.setup();
                        testHarness.restore(state);
@@ -644,13 +563,13 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                                testHarness.processElement(new 
StreamRecord<>(i));
                        }
 
-                       timerService.setCurrentTime(50);
-                       timerService.setCurrentTime(100);
-                       timerService.setCurrentTime(150);
-                       timerService.setCurrentTime(200);
-                       timerService.setCurrentTime(250);
-                       timerService.setCurrentTime(300);
-                       timerService.setCurrentTime(350);
+                       testHarness.setProcessingTime(50);
+                       testHarness.setProcessingTime(100);
+                       testHarness.setProcessingTime(150);
+                       testHarness.setProcessingTime(200);
+                       testHarness.setProcessingTime(250);
+                       testHarness.setProcessingTime(300);
+                       testHarness.setProcessingTime(350);
 
                        // get and verify the result
                        List<Integer> finalResult = new 
ArrayList<>(resultAtSnapshot);
@@ -684,14 +603,12 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                                                        new StatefulFunction(), 
identitySelector,
                                                        IntSerializer.INSTANCE, 
IntSerializer.INSTANCE, 50, 50);
 
-                       TestProcessingTimeService timerService = new 
TestProcessingTimeService();
-
                        OneInputStreamOperatorTestHarness<Integer, Integer> 
testHarness =
-                                       new 
KeyedOneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), 
timerService, identitySelector, BasicTypeInfo.INT_TYPE_INFO);
+                                       new 
KeyedOneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), 
identitySelector, BasicTypeInfo.INT_TYPE_INFO);
 
                        testHarness.open();
 
-                       timerService.setCurrentTime(0);
+                       testHarness.setProcessingTime(0);
 
                        testHarness.processElement(new StreamRecord<>(1));
                        testHarness.processElement(new StreamRecord<>(2));
@@ -703,7 +620,7 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                        op.processElement(new StreamRecord<>(2));
                        op.processElement(new StreamRecord<>(2));
 
-                       timerService.setCurrentTime(1000);
+                       testHarness.setProcessingTime(1000);
 
                        List<Integer> result = 
extractFromStreamRecords(testHarness.getOutput());
                        assertEquals(8, result.size());
@@ -808,7 +725,7 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
        }
 
        @SuppressWarnings({"unchecked", "rawtypes"})
-       private <T> List<T> extractFromStreamRecords(Iterable<Object> input) {
+       private <T> List<T> extractFromStreamRecords(Iterable<?> input) {
                List<T> result = new ArrayList<>();
                for (Object in : input) {
                        if (in instanceof StreamRecord) {
@@ -824,5 +741,5 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                while (!timers.isTerminated()) {
                        Thread.sleep(2);
                }
-       } 
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/30554758/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
index bb64a08..7ca5753 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.common.state.ValueState;
@@ -32,24 +31,13 @@ import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.state.StreamStateHandle;
-import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.Output;
-import 
org.apache.flink.streaming.runtime.operators.TestProcessingTimeServiceTest.ReferenceSettingExceptionHandler;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 
-import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-
 import org.junit.After;
 import org.junit.Test;
 
@@ -57,21 +45,18 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
-@SuppressWarnings({"serial", 
"SynchronizationOnLocalVariableOrMethodParameter"})
+@SuppressWarnings("serial")
 public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 
        @SuppressWarnings("unchecked")
@@ -79,23 +64,23 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
 
        @SuppressWarnings("unchecked")
        private final KeySelector<String, String> mockKeySelector = 
mock(KeySelector.class);
-       
-       private final KeySelector<Tuple2<Integer, Integer>, Integer> 
fieldOneSelector = 
+
+       private final KeySelector<Tuple2<Integer, Integer>, Integer> 
fieldOneSelector =
                        new KeySelector<Tuple2<Integer,Integer>, Integer>() {
                                @Override
                                public Integer getKey(Tuple2<Integer,Integer> 
value) {
                                        return value.f0;
                                }
        };
-       
+
        private final ReduceFunction<Tuple2<Integer, Integer>> sumFunction = 
new ReduceFunction<Tuple2<Integer, Integer>>() {
                @Override
                public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> 
value1, Tuple2<Integer, Integer> value2) {
                        return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
                }
        };
-       
-       private final TypeSerializer<Tuple2<Integer, Integer>> tupleSerializer 
= 
+
+       private final TypeSerializer<Tuple2<Integer, Integer>> tupleSerializer =
                        new TupleTypeInfo<Tuple2<Integer, 
Integer>>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO)
                                        .createSerializer(new 
ExecutionConfig());
 
@@ -107,14 +92,14 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                        return diff0 != 0 ? diff0 : diff1;
                }
        };
-       
+
        // 
------------------------------------------------------------------------
 
        public AggregatingAlignedProcessingTimeWindowOperatorTest() {
                ClosureCleaner.clean(fieldOneSelector, false);
                ClosureCleaner.clean(sumFunction, false);
        }
-       
+
        // 
------------------------------------------------------------------------
 
        @After
@@ -131,9 +116,9 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                assertTrue("Not all trigger threads where properly shut down",
                                StreamTask.TRIGGER_THREAD_GROUP.activeCount() 
== 0);
        }
-       
+
        // 
------------------------------------------------------------------------
-       
+
        @Test
        public void testInvalidParameters() {
                try {
@@ -141,7 +126,7 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                        assertInvalidParameter(10000L, -1L);
                        assertInvalidParameter(-1L, 1000L);
                        assertInvalidParameter(1000L, 2000L);
-                       
+
                        // actual internal slide is too low here:
                        assertInvalidParameter(1000L, 999L);
                }
@@ -150,12 +135,12 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                        fail(e.getMessage());
                }
        }
-       
+
        @Test
        public void testWindowSizeAndSlide() {
                try {
                        AggregatingProcessingTimeWindowOperator<String, String> 
op;
-                       
+
                        op = new 
AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
                                        StringSerializer.INSTANCE, 
StringSerializer.INSTANCE, 5000, 1000);
                        assertEquals(5000, op.getWindowSize());
@@ -193,44 +178,51 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
        @Test
        public void testWindowTriggerTimeAlignment() throws Exception {
                try {
-                       @SuppressWarnings("unchecked")
-                       final Output<StreamRecord<String>> mockOut = 
mock(Output.class);
-                       final ProcessingTimeService timerService = new 
NoOpTimerService();
-                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService);
 
-                       AggregatingProcessingTimeWindowOperator<String, String> 
op;
+                       AggregatingProcessingTimeWindowOperator<String, String> 
op =
+                                       new 
AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
+                                               StringSerializer.INSTANCE, 
StringSerializer.INSTANCE, 5000, 1000);
+
+                       KeyedOneInputStreamOperatorTestHarness<String, String, 
String> testHarness =
+                                       new 
KeyedOneInputStreamOperatorTestHarness<>(op, mockKeySelector, 
BasicTypeInfo.STRING_TYPE_INFO);
+                       testHarness.open();
 
-                       op = new 
AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
-                                       StringSerializer.INSTANCE, 
StringSerializer.INSTANCE, 5000, 1000);
-                       op.setup(mockTask, createTaskConfig(mockKeySelector, 
StringSerializer.INSTANCE, 10), mockOut);
-                       op.open();
                        assertTrue(op.getNextSlideTime() % 1000 == 0);
                        assertTrue(op.getNextEvaluationTime() % 1000 == 0);
-                       op.dispose();
+                       testHarness.close();
 
                        op = new 
AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
                                        StringSerializer.INSTANCE, 
StringSerializer.INSTANCE, 1000, 1000);
-                       op.setup(mockTask, new StreamConfig(new 
Configuration()), mockOut);
-                       op.open();
+
+                       testHarness =
+                                       new 
KeyedOneInputStreamOperatorTestHarness<>(op, mockKeySelector, 
BasicTypeInfo.STRING_TYPE_INFO);
+                       testHarness.open();
+
                        assertTrue(op.getNextSlideTime() % 1000 == 0);
                        assertTrue(op.getNextEvaluationTime() % 1000 == 0);
-                       op.dispose();
+                       testHarness.close();
 
                        op = new 
AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
                                        StringSerializer.INSTANCE, 
StringSerializer.INSTANCE, 1500, 1000);
-                       op.setup(mockTask, new StreamConfig(new 
Configuration()), mockOut);
-                       op.open();
+
+                       testHarness =
+                                       new 
KeyedOneInputStreamOperatorTestHarness<>(op, mockKeySelector, 
BasicTypeInfo.STRING_TYPE_INFO);
+                       testHarness.open();
+
                        assertTrue(op.getNextSlideTime() % 500 == 0);
                        assertTrue(op.getNextEvaluationTime() % 1000 == 0);
-                       op.dispose();
+                       testHarness.close();
 
                        op = new 
AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
                                        StringSerializer.INSTANCE, 
StringSerializer.INSTANCE, 1200, 1100);
-                       op.setup(mockTask, new StreamConfig(new 
Configuration()), mockOut);
-                       op.open();
+
+                       testHarness =
+                                       new 
KeyedOneInputStreamOperatorTestHarness<>(op, mockKeySelector, 
BasicTypeInfo.STRING_TYPE_INFO);
+                       testHarness.open();
+
                        assertTrue(op.getNextSlideTime() % 100 == 0);
                        assertTrue(op.getNextEvaluationTime() % 1100 == 0);
-                       op.dispose();
+                       testHarness.close();
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -240,85 +232,54 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
 
        @Test
        public void testTumblingWindowUniqueElements() throws Exception {
-               final Object lock = new Object();
-               final AtomicReference<Throwable> error = new 
AtomicReference<>();
-
-               final ProcessingTimeService timerService = new 
SystemProcessingTimeService(
-                               new ReferenceSettingExceptionHandler(error), 
lock);
 
                try {
                        final int windowSize = 50;
-                       final CollectingOutput<Tuple2<Integer, Integer>> out = 
new CollectingOutput<>(windowSize);
-                       
+
                        AggregatingProcessingTimeWindowOperator<Integer, 
Tuple2<Integer, Integer>> op =
                                        new 
AggregatingProcessingTimeWindowOperator<>(
                                                        sumFunction, 
fieldOneSelector,
                                                        IntSerializer.INSTANCE, 
tupleSerializer,
                                                        windowSize, windowSize);
 
-                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService);
+                       KeyedOneInputStreamOperatorTestHarness<Integer, 
Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testHarness =
+                                       new 
KeyedOneInputStreamOperatorTestHarness<>(op, fieldOneSelector, 
BasicTypeInfo.INT_TYPE_INFO);
 
-                       op.setup(mockTask, createTaskConfig(fieldOneSelector, 
IntSerializer.INSTANCE, 10), out);
-                       op.open();
+                       testHarness.open();
 
                        final int numElements = 1000;
 
+                       long currentTime = 0;
+
                        for (int i = 0; i < numElements; i++) {
-                               synchronized (lock) {
-                                       StreamRecord<Tuple2<Integer, Integer>> 
next = new StreamRecord<>(new Tuple2<>(i, i));
-                                       op.setKeyContextElement1(next);
-                                       op.processElement(next);
-                               }
-                               Thread.sleep(1);
+                               StreamRecord<Tuple2<Integer, Integer>> next = 
new StreamRecord<>(new Tuple2<>(i, i));
+                               testHarness.processElement(next);
+                               currentTime = currentTime + 10;
+                               testHarness.setProcessingTime(currentTime);
                        }
 
-                       out.waitForNElements(numElements, 60_000);
-
                        // get and verify the result
-                       List<Tuple2<Integer, Integer>> result = 
out.getElements();
+                       List<Tuple2<Integer, Integer>> result = 
extractFromStreamRecords(testHarness.extractOutputStreamRecords());
                        assertEquals(numElements, result.size());
 
-                       timerService.quiesceAndAwaitPending();
-
-                       synchronized (lock) {
-                               op.close();
-                       }
-
-                       shutdownTimerServiceAndWait(timerService);
-                       op.dispose();
-
+                       testHarness.close();
 
                        Collections.sort(result, tupleComparator);
                        for (int i = 0; i < numElements; i++) {
                                assertEquals(i, result.get(i).f0.intValue());
                                assertEquals(i, result.get(i).f1.intValue());
                        }
-
-                       if (error.get() != null) {
-                               throw new Exception(error.get());
-                       }
                }
                catch (Exception e) {
                        e.printStackTrace();
                        fail(e.getMessage());
                }
-               finally {
-                       timerService.shutdownService();
-               }
        }
 
        @Test
        public void testTumblingWindowDuplicateElements() throws Exception {
-               final Object lock = new Object();
-               final AtomicReference<Throwable> error = new 
AtomicReference<>();
-
-               final ProcessingTimeService timerService = new 
SystemProcessingTimeService(
-                               new ReferenceSettingExceptionHandler(error), 
lock);
                try {
                        final int windowSize = 50;
-                       final CollectingOutput<Tuple2<Integer, Integer>> out = 
new CollectingOutput<>(windowSize);
-
-                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService);
 
                        AggregatingProcessingTimeWindowOperator<Integer, 
Tuple2<Integer, Integer>> op =
                                        new 
AggregatingProcessingTimeWindowOperator<>(
@@ -326,43 +287,39 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                                                        IntSerializer.INSTANCE, 
tupleSerializer,
                                                        windowSize, windowSize);
 
-                       op.setup(mockTask, createTaskConfig(fieldOneSelector, 
IntSerializer.INSTANCE, 10), out);
-                       op.open();
+                       KeyedOneInputStreamOperatorTestHarness<Integer, 
Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testHarness =
+                                       new 
KeyedOneInputStreamOperatorTestHarness<>(op, fieldOneSelector, 
BasicTypeInfo.INT_TYPE_INFO);
+
+                       testHarness.setProcessingTime(0);
+                       testHarness.open();
 
                        final int numWindows = 10;
 
                        long previousNextTime = 0;
                        int window = 1;
-                       
-                       while (window <= numWindows) {
-                               synchronized (lock) {
-                                       long nextTime = 
op.getNextEvaluationTime();
-                                       int val = ((int) nextTime) ^ ((int) 
(nextTime >>> 32));
-
-                                       StreamRecord<Tuple2<Integer, Integer>> 
next =  new StreamRecord<>(new Tuple2<>(val, val));
-                                       op.setKeyContextElement1(next);
-                                       op.processElement(next);
-
-                                       if (nextTime != previousNextTime) {
-                                               window++;
-                                               previousNextTime = nextTime;
-                                       }
-                               }
-                               Thread.sleep(1);
-                       }
 
-                       out.waitForNElements(numWindows, 60_000);
+                       long currentTime = 0;
 
-                       List<Tuple2<Integer, Integer>> result = 
out.getElements();
+                       while (window <= numWindows) {
+                               long nextTime = op.getNextEvaluationTime();
+                               int val = ((int) nextTime) ^ ((int) (nextTime 
>>> 32));
 
-                       timerService.quiesceAndAwaitPending();
+                               StreamRecord<Tuple2<Integer, Integer>> next =  
new StreamRecord<>(new Tuple2<>(val, val));
+                               testHarness.processElement(next);
 
-                       synchronized (lock) {
-                               op.close();
+                               if (nextTime != previousNextTime) {
+                                       window++;
+                                       previousNextTime = nextTime;
+                               }
+                               currentTime = currentTime + 1;
+                               testHarness.setProcessingTime(currentTime);
                        }
 
-                       shutdownTimerServiceAndWait(timerService);
-                       op.dispose();
+                       testHarness.setProcessingTime(currentTime + 100);
+
+                       List<Tuple2<Integer, Integer>> result = 
extractFromStreamRecords(testHarness.extractOutputStreamRecords());
+
+                       testHarness.close();
 
                        // we have ideally one element per window. we may have 
more, when we emitted a value into the
                        // successive window (corner case), so we can have 
twice the number of elements, in the worst case.
@@ -371,33 +328,16 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                        // deduplicate for more accurate checks
                        HashSet<Tuple2<Integer, Integer>> set = new 
HashSet<>(result);
                        assertTrue(set.size() == 10);
-
-                       if (error.get() != null) {
-                               throw new Exception(error.get());
-                       }
                }
                catch (Exception e) {
                        e.printStackTrace();
                        fail(e.getMessage());
                }
-               finally {
-                       timerService.shutdownService();
-               }
        }
 
        @Test
        public void testSlidingWindow() throws Exception {
-               final Object lock = new Object();
-               final AtomicReference<Throwable> error = new 
AtomicReference<>();
-
-               final ProcessingTimeService timerService = new 
SystemProcessingTimeService(
-                               new ReferenceSettingExceptionHandler(error), 
lock);
-
                try {
-                       final CollectingOutput<Tuple2<Integer, Integer>> out = 
new CollectingOutput<>(50);
-
-                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService);
-
                        // tumbling window that triggers every 20 milliseconds
                        AggregatingProcessingTimeWindowOperator<Integer, 
Tuple2<Integer, Integer>> op =
                                        new 
AggregatingProcessingTimeWindowOperator<>(
@@ -405,32 +345,27 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                                                        IntSerializer.INSTANCE, 
tupleSerializer,
                                                        150, 50);
 
-                       op.setup(mockTask, createTaskConfig(fieldOneSelector, 
IntSerializer.INSTANCE, 10), out);
-                       op.open();
+                       KeyedOneInputStreamOperatorTestHarness<Integer, 
Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testHarness =
+                                       new 
KeyedOneInputStreamOperatorTestHarness<>(op, fieldOneSelector, 
BasicTypeInfo.INT_TYPE_INFO);
+
+                       testHarness.open();
 
                        final int numElements = 1000;
 
+                       long currentTime = 0;
+
                        for (int i = 0; i < numElements; i++) {
-                               synchronized (lock) {
-                                       StreamRecord<Tuple2<Integer, Integer>> 
next = new StreamRecord<>(new Tuple2<>(i, i));
-                                       op.setKeyContextElement1(next);
-                                       op.processElement(next);
-                               }
-                               Thread.sleep(1);
+                               StreamRecord<Tuple2<Integer, Integer>> next = 
new StreamRecord<>(new Tuple2<>(i, i));
+                               testHarness.processElement(next);
+                               currentTime = currentTime + 1;
+                               testHarness.setProcessingTime(currentTime);
                        }
 
-                       timerService.quiesceAndAwaitPending();
-
-                       synchronized (lock) {
-                               op.close();
-                       }
+                       // get and verify the result
+                       List<Tuple2<Integer, Integer>> result = 
extractFromStreamRecords(testHarness.extractOutputStreamRecords());
 
-                       shutdownTimerServiceAndWait(timerService);
-                       op.dispose();
+                       testHarness.close();
 
-                       // get and verify the result
-                       List<Tuple2<Integer, Integer>> result = 
out.getElements();
-                       
                        // every element can occur between one and three times
                        if (result.size() < numElements || result.size() > 3 * 
numElements) {
                                System.out.println(result);
@@ -440,10 +375,10 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                        Collections.sort(result, tupleComparator);
                        int lastNum = -1;
                        int lastCount = -1;
-                       
+
                        for (Tuple2<Integer, Integer> val : result) {
                                assertEquals(val.f0, val.f1);
-                               
+
                                if (val.f0 == lastNum) {
                                        lastCount++;
                                        assertTrue(lastCount <= 3);
@@ -453,58 +388,42 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                                        lastCount = 1;
                                }
                        }
-
-                       if (error.get() != null) {
-                               throw new Exception(error.get());
-                       }
                }
                catch (Exception e) {
                        e.printStackTrace();
                        fail(e.getMessage());
                }
-               finally {
-                       timerService.shutdownService();
-               }
        }
 
        @Test
        public void testSlidingWindowSingleElements() throws Exception {
-               final Object lock = new Object();
-               final AtomicReference<Throwable> error = new 
AtomicReference<>();
-
-               final ProcessingTimeService timerService = new 
SystemProcessingTimeService(
-                               new ReferenceSettingExceptionHandler(error), 
lock);
-
                try {
-                       final CollectingOutput<Tuple2<Integer, Integer>> out = 
new CollectingOutput<>(50);
-                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService);
-
                        // tumbling window that triggers every 20 milliseconds
                        AggregatingProcessingTimeWindowOperator<Integer, 
Tuple2<Integer, Integer>> op =
                                        new 
AggregatingProcessingTimeWindowOperator<>(
                                                        sumFunction, 
fieldOneSelector,
                                                        IntSerializer.INSTANCE, 
tupleSerializer, 150, 50);
 
-                       op.setup(mockTask, createTaskConfig(fieldOneSelector, 
IntSerializer.INSTANCE, 10), out);
-                       op.open();
-
-                       synchronized (lock) {
-                               StreamRecord<Tuple2<Integer, Integer>> next1 = 
new StreamRecord<>(new Tuple2<>(1, 1));
-                               op.setKeyContextElement1(next1);
-                               op.processElement(next1);
-                               
-                               StreamRecord<Tuple2<Integer, Integer>> next2 = 
new StreamRecord<>(new Tuple2<>(2, 2));
-                               op.setKeyContextElement1(next2);
-                               op.processElement(next2);
-                       }
+                       KeyedOneInputStreamOperatorTestHarness<Integer, 
Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testHarness =
+                                       new 
KeyedOneInputStreamOperatorTestHarness<>(op, fieldOneSelector, 
BasicTypeInfo.INT_TYPE_INFO);
+
+                       testHarness.open();
+
+                       testHarness.setProcessingTime(0);
+
+                       StreamRecord<Tuple2<Integer, Integer>> next1 = new 
StreamRecord<>(new Tuple2<>(1, 1));
+                       testHarness.processElement(next1);
 
-                       // each element should end up in the output three times
-                       // wait until the elements have arrived 6 times in the 
output
-                       out.waitForNElements(6, 120000);
-                       
-                       List<Tuple2<Integer, Integer>> result = 
out.getElements();
+                       StreamRecord<Tuple2<Integer, Integer>> next2 = new 
StreamRecord<>(new Tuple2<>(2, 2));
+                       testHarness.processElement(next2);
+
+                       testHarness.setProcessingTime(50);
+                       testHarness.setProcessingTime(100);
+                       testHarness.setProcessingTime(150);
+
+                       List<Tuple2<Integer, Integer>> result = 
extractFromStreamRecords(testHarness.extractOutputStreamRecords());
                        assertEquals(6, result.size());
-                       
+
                        Collections.sort(result, tupleComparator);
                        assertEquals(Arrays.asList(
                                        new Tuple2<>(1, 1),
@@ -515,40 +434,18 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                                        new Tuple2<>(2, 2)
                        ), result);
 
-                       timerService.quiesceAndAwaitPending();
-
-                       synchronized (lock) {
-                               op.close();
-                       }
-
-                       shutdownTimerServiceAndWait(timerService);
-                       op.dispose();
-
-                       if (error.get() != null) {
-                               throw new Exception(error.get());
-                       }
+                       testHarness.close();
                }
                catch (Exception e) {
                        e.printStackTrace();
                        fail(e.getMessage());
                }
-               finally {
-                       timerService.shutdownService();
-               }
        }
 
        @Test
        public void testPropagateExceptionsFromProcessElement() throws 
Exception {
-               final Object lock = new Object();
-               final AtomicReference<Throwable> error = new 
AtomicReference<>();
-
-               final ProcessingTimeService timerService = new 
SystemProcessingTimeService(
-                               new ReferenceSettingExceptionHandler(error), 
lock);
 
                try {
-                       final CollectingOutput<Tuple2<Integer, Integer>> out = 
new CollectingOutput<>();
-                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService);
-
                        ReduceFunction<Tuple2<Integer, Integer>> 
failingFunction = new FailingFunction(100);
 
                        // the operator has a window time that is so long that 
it will not fire in this test
@@ -559,46 +456,31 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                                                        IntSerializer.INSTANCE, 
tupleSerializer,
                                                        hundredYears, 
hundredYears);
 
-                       op.setup(mockTask, createTaskConfig(fieldOneSelector, 
IntSerializer.INSTANCE, 10), out);
-                       op.open();
+                       KeyedOneInputStreamOperatorTestHarness<Integer, 
Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testHarness =
+                                       new 
KeyedOneInputStreamOperatorTestHarness<>(op, fieldOneSelector, 
BasicTypeInfo.INT_TYPE_INFO);
+
+                       testHarness.open();
 
                        for (int i = 0; i < 100; i++) {
-                               synchronized (lock) {
-                                       StreamRecord<Tuple2<Integer, Integer>> 
next = new StreamRecord<>(new Tuple2<>(1, 1));
-                                       op.setKeyContextElement1(next);
-                                       op.processElement(next);
-                               }
+                               StreamRecord<Tuple2<Integer, Integer>> next = 
new StreamRecord<>(new Tuple2<>(1, 1));
+                               testHarness.processElement(next);
                        }
-                       
+
                        try {
                                StreamRecord<Tuple2<Integer, Integer>> next = 
new StreamRecord<>(new Tuple2<>(1, 1));
-                               op.setKeyContextElement1(next);
-                               op.processElement(next);
+                               testHarness.processElement(next);
                                fail("This fail with an exception");
                        }
                        catch (Exception e) {
                                assertTrue(e.getMessage().contains("Artificial 
Test Exception"));
                        }
 
-                       timerService.quiesceAndAwaitPending();
-                       synchronized (lock) {
-                               op.close();
-                       }
-
-                       shutdownTimerServiceAndWait(timerService);
                        op.dispose();
-
-                       if (error.get() != null) {
-                               throw new Exception(error.get());
-                       }
                }
                catch (Exception e) {
                        e.printStackTrace();
                        fail(e.getMessage());
                }
-               finally {
-                       timerService.shutdownService();
-               }
        }
 
        @Test
@@ -606,8 +488,6 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                try {
                        final int windowSize = 200;
 
-                       TestProcessingTimeService timerService = new 
TestProcessingTimeService();
-
                        // tumbling window that triggers every 50 milliseconds
                        AggregatingProcessingTimeWindowOperator<Integer, 
Tuple2<Integer, Integer>> op =
                                        new 
AggregatingProcessingTimeWindowOperator<>(
@@ -616,9 +496,9 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                                                        windowSize, windowSize);
 
                        OneInputStreamOperatorTestHarness<Tuple2<Integer, 
Integer>, Tuple2<Integer, Integer>> testHarness =
-                                       new 
OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService);
+                                       new 
OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig());
 
-                       timerService.setCurrentTime(0);
+                       testHarness.setProcessingTime(0);
 
                        testHarness.setup();
                        testHarness.open();
@@ -626,7 +506,7 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                        // inject some elements
                        final int numElementsFirst = 700;
                        final int numElements = 1000;
-                       
+
                        for (int i = 0; i < numElementsFirst; i++) {
                                StreamRecord<Tuple2<Integer, Integer>> next = 
new StreamRecord<>(new Tuple2<>(i, i));
                                testHarness.processElement(next);
@@ -656,8 +536,7 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                                        IntSerializer.INSTANCE, tupleSerializer,
                                        windowSize, windowSize);
 
-                       timerService = new TestProcessingTimeService();
-                       testHarness = new 
OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService);
+                       testHarness = new 
OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig());
 
                        testHarness.setup();
                        testHarness.restore(state);
@@ -669,7 +548,7 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                                testHarness.processElement(next);
                        }
 
-                       timerService.setCurrentTime(200);
+                       testHarness.setProcessingTime(200);
 
                        // get and verify the result
                        List<Tuple2<Integer, Integer>> finalResult = new 
ArrayList<>(resultAtSnapshot);
@@ -699,8 +578,6 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                        final int windowSlide = 50;
                        final int windowSize = factor * windowSlide;
 
-                       TestProcessingTimeService timerService = new 
TestProcessingTimeService();
-
                        // sliding window (200 msecs) every 50 msecs
                        AggregatingProcessingTimeWindowOperator<Integer, 
Tuple2<Integer, Integer>> op =
                                        new 
AggregatingProcessingTimeWindowOperator<>(
@@ -708,10 +585,11 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                                                        IntSerializer.INSTANCE, 
tupleSerializer,
                                                        windowSize, 
windowSlide);
 
-                       timerService.setCurrentTime(0);
 
                        OneInputStreamOperatorTestHarness<Tuple2<Integer, 
Integer>, Tuple2<Integer, Integer>> testHarness =
-                                       new 
OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService);
+                                       new 
OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig());
+
+                       testHarness.setProcessingTime(0);
 
                        testHarness.setup();
                        testHarness.open();
@@ -749,8 +627,7 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                                        IntSerializer.INSTANCE, tupleSerializer,
                                        windowSize, windowSlide);
 
-                       timerService = new TestProcessingTimeService();
-                       testHarness = new 
OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService);
+                       testHarness = new 
OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig());
 
                        testHarness.setup();
                        testHarness.restore(state);
@@ -762,14 +639,14 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                                testHarness.processElement(next);
                        }
 
-                       timerService.setCurrentTime(50);
-                       timerService.setCurrentTime(100);
-                       timerService.setCurrentTime(150);
-                       timerService.setCurrentTime(200);
-                       timerService.setCurrentTime(250);
-                       timerService.setCurrentTime(300);
-                       timerService.setCurrentTime(350);
-                       timerService.setCurrentTime(400);
+                       testHarness.setProcessingTime(50);
+                       testHarness.setProcessingTime(100);
+                       testHarness.setProcessingTime(150);
+                       testHarness.setProcessingTime(200);
+                       testHarness.setProcessingTime(250);
+                       testHarness.setProcessingTime(300);
+                       testHarness.setProcessingTime(350);
+                       testHarness.setProcessingTime(400);
 
                        // get and verify the result
                        List<Tuple2<Integer, Integer>> finalResult = new 
ArrayList<>(resultAtSnapshot);
@@ -796,11 +673,9 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
        public void testKeyValueStateInWindowFunctionTumbling() {
                try {
                        final long twoSeconds = 2000;
-                       
-                       TestProcessingTimeService timerService = new 
TestProcessingTimeService();
 
                        StatefulFunction.globalCounts.clear();
-                       
+
                        AggregatingProcessingTimeWindowOperator<Integer, 
Tuple2<Integer, Integer>> op =
                                        new 
AggregatingProcessingTimeWindowOperator<>(
                                                        new StatefulFunction(), 
fieldOneSelector,
@@ -809,16 +684,15 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                        KeyedOneInputStreamOperatorTestHarness<Integer, 
Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testHarness = new 
KeyedOneInputStreamOperatorTestHarness<>(
                                        op,
                                        new ExecutionConfig(),
-                                       timerService,
                                        fieldOneSelector,
                                        BasicTypeInfo.INT_TYPE_INFO);
 
-                       timerService.setCurrentTime(0);
+                       testHarness.setProcessingTime(0);
                        testHarness.open();
 
                        // because the window interval is so large, everything 
should be in one window
                        // and aggregate into one value per key
-                       
+
                        for (int i = 0; i < 10; i++) {
                                StreamRecord<Tuple2<Integer, Integer>> next1 = 
new StreamRecord<>(new Tuple2<>(1, i));
                                testHarness.processElement(next1);
@@ -827,7 +701,7 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                                testHarness.processElement(next2);
                        }
 
-                       timerService.setCurrentTime(1000);
+                       testHarness.setProcessingTime(1000);
 
                        int count1 = StatefulFunction.globalCounts.get(1);
                        int count2 = StatefulFunction.globalCounts.get(2);
@@ -851,32 +725,30 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                        final int windowSlide = 50;
                        final int windowSize = factor * windowSlide;
 
-                       TestProcessingTimeService timerService = new 
TestProcessingTimeService();
-
                        StatefulFunction.globalCounts.clear();
-                       
+
                        AggregatingProcessingTimeWindowOperator<Integer, 
Tuple2<Integer, Integer>> op =
                                        new 
AggregatingProcessingTimeWindowOperator<>(
                                                        new StatefulFunction(), 
fieldOneSelector,
                                                        IntSerializer.INSTANCE, 
tupleSerializer, windowSize, windowSlide);
 
-                       timerService.setCurrentTime(0);
                        KeyedOneInputStreamOperatorTestHarness<Integer, 
Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testHarness = new 
KeyedOneInputStreamOperatorTestHarness<>(
                                        op,
                                        new ExecutionConfig(),
-                                       timerService,
                                        fieldOneSelector,
                                        BasicTypeInfo.INT_TYPE_INFO);
 
+                       testHarness.setProcessingTime(0);
+
                        testHarness.open();
 
                        // because the window interval is so large, everything 
should be in one window
                        // and aggregate into one value per key
                        final int numElements = 100;
-                       
+
                        // because we do not release the lock here, these 
elements
                        for (int i = 0; i < numElements; i++) {
-                               
+
                                StreamRecord<Tuple2<Integer, Integer>> next1 = 
new StreamRecord<>(new Tuple2<>(1, i));
                                StreamRecord<Tuple2<Integer, Integer>> next2 = 
new StreamRecord<>(new Tuple2<>(2, i));
                                StreamRecord<Tuple2<Integer, Integer>> next3 = 
new StreamRecord<>(new Tuple2<>(1, i));
@@ -888,14 +760,14 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                                testHarness.processElement(next4);
                        }
 
-                       timerService.setCurrentTime(50);
-                       timerService.setCurrentTime(100);
-                       timerService.setCurrentTime(150);
-                       timerService.setCurrentTime(200);
+                       testHarness.setProcessingTime(50);
+                       testHarness.setProcessingTime(100);
+                       testHarness.setProcessingTime(150);
+                       testHarness.setProcessingTime(200);
 
                        int count1 = StatefulFunction.globalCounts.get(1);
                        int count2 = StatefulFunction.globalCounts.get(2);
-                       
+
                        assertTrue(count1 >= 2 && count1 <= 2 * numElements);
                        assertEquals(count1, count2);
 
@@ -907,12 +779,12 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                        fail(e.getMessage());
                }
        }
-       
+
        // 
------------------------------------------------------------------------
-       
+
        private void assertInvalidParameter(long windowSize, long windowSlide) {
                try {
-                       new AggregatingProcessingTimeWindowOperator<String, 
String>(
+                       new AggregatingProcessingTimeWindowOperator<>(
                                        mockFunction, mockKeySelector,
                                        StringSerializer.INSTANCE, 
StringSerializer.INSTANCE,
                                        windowSize, windowSlide);
@@ -927,11 +799,11 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
        }
 
        // 
------------------------------------------------------------------------
-       
+
        private static class FailingFunction implements 
ReduceFunction<Tuple2<Integer, Integer>> {
 
                private final int failAfterElements;
-               
+
                private int numElements;
 
                FailingFunction(int failAfterElements) {
@@ -945,7 +817,7 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                        if (numElements >= failAfterElements) {
                                throw new Exception("Artificial Test 
Exception");
                        }
-                       
+
                        return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
                }
        }
@@ -961,7 +833,7 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                @Override
                public void open(Configuration parameters) {
                        assertNotNull(getRuntimeContext());
-                       
+
                        // start with one, so the final count is correct and we 
test that we do not
                        // initialize with 0 always by default
                        state = getRuntimeContext().getState(new 
ValueStateDescriptor<>("totalCount", Integer.class, 1));
@@ -971,44 +843,15 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> 
value1, Tuple2<Integer, Integer> value2) throws Exception {
                        state.update(state.value() + 1);
                        globalCounts.put(value1.f0, state.value());
-                       
+
                        return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
                }
        }
 
        // 
------------------------------------------------------------------------
-       
-       private static StreamTask<?, ?> createMockTask() {
-               Configuration configuration = new Configuration();
-               configuration.setString(ConfigConstants.STATE_BACKEND, 
"jobmanager");
-
-               StreamTask<?, ?> task = mock(StreamTask.class);
-               when(task.getAccumulatorMap()).thenReturn(new HashMap<String, 
Accumulator<?, ?>>());
-               when(task.getName()).thenReturn("Test task name");
-               when(task.getExecutionConfig()).thenReturn(new 
ExecutionConfig());
-
-               final TaskManagerRuntimeInfo mockTaskManagerRuntimeInfo = 
mock(TaskManagerRuntimeInfo.class);
-               
when(mockTaskManagerRuntimeInfo.getConfiguration()).thenReturn(configuration);
-
-               final Environment env = new DummyEnvironment("Test task name", 
1, 0);
-               when(task.getEnvironment()).thenReturn(env);
-
-               return task;
-       }
-
-       private static StreamTask<?, ?> createMockTaskWithTimer(final 
ProcessingTimeService timerService)
-       {
-               StreamTask<?, ?> mockTask = createMockTask();
-               
when(mockTask.getProcessingTimeService()).thenReturn(timerService);
-               return mockTask;
-       }
-
-       private static StreamConfig createTaskConfig(KeySelector<?, ?> 
partitioner, TypeSerializer<?> keySerializer, int numberOfKeGroups) {
-               return new StreamConfig(new Configuration());
-       }
 
        @SuppressWarnings({"unchecked", "rawtypes"})
-       private <T> List<T> extractFromStreamRecords(Iterable<Object> input) {
+       private <T> List<T> extractFromStreamRecords(Iterable<?> input) {
                List<T> result = new ArrayList<>();
                for (Object in : input) {
                        if (in instanceof StreamRecord) {
@@ -1017,12 +860,4 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                }
                return result;
        }
-
-       private static void shutdownTimerServiceAndWait(ProcessingTimeService 
timers) throws Exception {
-               timers.shutdownService();
-
-               while (!timers.isTerminated()) {
-                       Thread.sleep(2);
-               }
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/30554758/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CollectingOutput.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CollectingOutput.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CollectingOutput.java
deleted file mode 100644
index 42be131..0000000
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CollectingOutput.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.windowing;
-
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class CollectingOutput<T> implements Output<StreamRecord<T>> {
-       
-       private final List<T> elements = new ArrayList<>();
-
-       private final int timeStampModulus;
-
-
-       public CollectingOutput() {
-               this.timeStampModulus = 0;
-       }
-       
-       public CollectingOutput(int timeStampModulus) {
-               this.timeStampModulus = timeStampModulus;
-       }
-
-       // 
------------------------------------------------------------------------
-       
-       public List<T> getElements() {
-               return elements;
-       }
-       
-       public void waitForNElements(int n, long timeout) throws 
InterruptedException {
-               long deadline = System.currentTimeMillis() + timeout;
-               synchronized (elements) {
-                       long now;
-                       while (elements.size() < n && (now = 
System.currentTimeMillis()) < deadline) {
-                               elements.wait(deadline - now);
-                       }
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-       
-       @Override
-       public void emitWatermark(Watermark mark) {
-               throw new UnsupportedOperationException("The output should not 
emit watermarks");
-       }
-
-       @Override
-       public void emitLatencyMarker(LatencyMarker latencyMarker) {
-               throw new UnsupportedOperationException("The output should not 
emit latency markers");
-       }
-
-       @Override
-       public void collect(StreamRecord<T> record) {
-               elements.add(record.getValue());
-               
-               if (timeStampModulus != 0 && record.getTimestamp() % 
timeStampModulus != 0) {
-                       throw new IllegalArgumentException("Invalid timestamp");
-               }
-               synchronized (elements) {
-                       elements.notifyAll();
-               }
-       }
-
-       @Override
-       public void close() {}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/30554758/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java
deleted file mode 100644
index a7a71cf..0000000
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.windowing;
-
-import org.apache.flink.streaming.runtime.operators.Triggerable;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
-
-import java.util.concurrent.ScheduledFuture;
-
-class NoOpTimerService extends ProcessingTimeService {
-
-       private volatile boolean terminated;
-
-       @Override
-       public long getCurrentProcessingTime() {
-               return System.currentTimeMillis();
-       }
-
-       @Override
-       public ScheduledFuture<?> registerTimer(long timestamp, Triggerable 
target) {
-               return null;
-       }
-
-       @Override
-       public boolean isTerminated() {
-               return terminated;
-       }
-
-       @Override
-       public void quiesceAndAwaitPending() {}
-
-       @Override
-       public void shutdownService() {
-               terminated = true;
-       }
-}

Reply via email to