http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestTimeProviderTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestTimeProviderTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestTimeProviderTest.java
deleted file mode 100644
index a8f2dc4..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestTimeProviderTest.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.StreamMap;
-import org.apache.flink.streaming.runtime.tasks.AsyncExceptionHandler;
-import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
-import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
-import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.junit.Assert.assertEquals;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({ResultPartitionWriter.class})
-@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
-public class TestTimeProviderTest {
-
-       @Test
-       public void testCustomTimeServiceProvider() throws Throwable {
-               TestTimeServiceProvider tp = new TestTimeServiceProvider();
-
-               final OneInputStreamTask<String, String> mapTask = new 
OneInputStreamTask<>();
-               mapTask.setTimeService(tp);
-
-               final OneInputStreamTaskTestHarness<String, String> testHarness 
= new OneInputStreamTaskTestHarness<>(
-                       mapTask, BasicTypeInfo.STRING_TYPE_INFO, 
BasicTypeInfo.STRING_TYPE_INFO);
-
-               StreamConfig streamConfig = testHarness.getStreamConfig();
-
-               StreamMap<String, String> mapOperator = new StreamMap<>(new 
StreamTaskTimerTest.DummyMapFunction<String>());
-               streamConfig.setStreamOperator(mapOperator);
-
-               testHarness.invoke();
-
-               
assertEquals(testHarness.getTimerService().getCurrentProcessingTime(), 0);
-
-               tp.setCurrentTime(11);
-               
assertEquals(testHarness.getTimerService().getCurrentProcessingTime(), 11);
-
-               tp.setCurrentTime(15);
-               tp.setCurrentTime(16);
-               
assertEquals(testHarness.getTimerService().getCurrentProcessingTime(), 16);
-
-               // register 2 tasks
-               mapTask.getTimerService().registerTimer(30, new Triggerable() {
-                       @Override
-                       public void trigger(long timestamp) {
-
-                       }
-               });
-
-               mapTask.getTimerService().registerTimer(40, new Triggerable() {
-                       @Override
-                       public void trigger(long timestamp) {
-
-                       }
-               });
-
-               assertEquals(2, tp.getNumRegisteredTimers());
-
-               tp.setCurrentTime(35);
-               assertEquals(1, tp.getNumRegisteredTimers());
-
-               tp.setCurrentTime(40);
-               assertEquals(0, tp.getNumRegisteredTimers());
-
-               tp.shutdownService();
-       }
-
-       // ------------------------------------------------------------------------
-
-       public static class ReferenceSettingExceptionHandler implements 
AsyncExceptionHandler {
-
-               private final AtomicReference<Throwable> errorReference;
-
-               public 
ReferenceSettingExceptionHandler(AtomicReference<Throwable> errorReference) {
-                       this.errorReference = errorReference;
-               }
-
-               @Override
-               public void handleAsyncException(String message, Throwable 
exception) {
-                       errorReference.compareAndSet(null, exception);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java
index 4d5c881..af99d0d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 
 import org.junit.Test;
@@ -41,9 +42,13 @@ public class TimestampsAndPeriodicWatermarksOperatorTest {
 
                final ExecutionConfig config = new ExecutionConfig();
                config.setAutoWatermarkInterval(50);
+
+               TestProcessingTimeService processingTimeService = new 
TestProcessingTimeService();
                
                OneInputStreamOperatorTestHarness<Long, Long> testHarness =
-                               new OneInputStreamOperatorTestHarness<Long, 
Long>(operator, config);
+                               new OneInputStreamOperatorTestHarness<Long, 
Long>(operator, config, processingTimeService);
+
+               long currentTime = 0;
 
                testHarness.open();
                
@@ -71,7 +76,8 @@ public class TimestampsAndPeriodicWatermarksOperatorTest {
                                        // check the invariant
                                        assertTrue(lastWatermark < 
nextElementValue);
                                } else {
-                                       Thread.sleep(10);
+                                       currentTime = currentTime + 10;
+                                       
processingTimeService.setCurrentTime(currentTime);
                                }
                        }
                        
@@ -102,7 +108,8 @@ public class TimestampsAndPeriodicWatermarksOperatorTest {
                                        // check the invariant
                                        assertTrue(lastWatermark < 
nextElementValue);
                                } else {
-                                       Thread.sleep(10);
+                                       currentTime = currentTime + 10;
+                                       
processingTimeService.setCurrentTime(currentTime);
                                }
                        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
index e96109e..128c88b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -39,12 +39,12 @@ import 
org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import 
org.apache.flink.streaming.runtime.operators.TestTimeProviderTest.ReferenceSettingExceptionHandler;
+import 
org.apache.flink.streaming.runtime.operators.TestProcessingTimeServiceTest.ReferenceSettingExceptionHandler;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
-import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.util.Collector;
@@ -186,8 +186,8 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                try {
                        @SuppressWarnings("unchecked")
                        final Output<StreamRecord<String>> mockOut = 
mock(Output.class);
-                       final TimeServiceProvider timerService = new 
NoOpTimerService();
-                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService, new Object());
+                       final ProcessingTimeService timerService = new 
NoOpTimerService();
+                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService);
 
                        AccumulatingProcessingTimeWindowOperator<String, 
String, String> op;
 
@@ -234,13 +234,13 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                final Object lock = new Object();
                final AtomicReference<Throwable> error = new 
AtomicReference<>();
 
-               final TimeServiceProvider timerService = new 
DefaultTimeServiceProvider(
+               final ProcessingTimeService timerService = new 
SystemProcessingTimeService(
                                new ReferenceSettingExceptionHandler(error), 
lock);
 
                try {
                        final int windowSize = 50;
                        final CollectingOutput<Integer> out = new 
CollectingOutput<>(windowSize);
-                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService, lock);
+                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService);
 
                        // tumbling window that triggers every 20 milliseconds
                        AccumulatingProcessingTimeWindowOperator<Integer, 
Integer, Integer> op =
@@ -299,12 +299,12 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                final Object lock = new Object();
                final AtomicReference<Throwable> error = new 
AtomicReference<>();
 
-               final TimeServiceProvider timerService = new 
DefaultTimeServiceProvider(
+               final ProcessingTimeService timerService = new 
SystemProcessingTimeService(
                                new ReferenceSettingExceptionHandler(error), 
lock);
 
                try {
                        final CollectingOutput<Integer> out = new 
CollectingOutput<>(50);
-                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService, lock);
+                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService);
                        
                        // tumbling window that triggers every 20 milliseconds
                        AccumulatingProcessingTimeWindowOperator<Integer, 
Integer, Integer> op =
@@ -371,12 +371,12 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                final Object lock = new Object();
                final AtomicReference<Throwable> error = new 
AtomicReference<>();
 
-               final TimeServiceProvider timerService = new 
DefaultTimeServiceProvider(
+               final ProcessingTimeService timerService = new 
SystemProcessingTimeService(
                                new ReferenceSettingExceptionHandler(error), 
lock);
 
                try {
                        final CollectingOutput<Integer> out = new 
CollectingOutput<>(50);
-                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService, lock);
+                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService);
 
                        // tumbling window that triggers every 20 milliseconds
                        AccumulatingProcessingTimeWindowOperator<Integer, 
Integer, Integer> op =
@@ -438,12 +438,12 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                final Object lock = new Object();
                final AtomicReference<Throwable> error = new 
AtomicReference<>();
                
-               final TimeServiceProvider timerService = new 
DefaultTimeServiceProvider(
+               final ProcessingTimeService timerService = new 
SystemProcessingTimeService(
                        new ReferenceSettingExceptionHandler(error), lock);
 
                try {
                        final CollectingOutput<Integer> out = new 
CollectingOutput<>(50);
-                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService, lock);
+                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService);
 
                        // tumbling window that triggers every 20 milliseconds
                        AccumulatingProcessingTimeWindowOperator<Integer, 
Integer, Integer> op =
@@ -503,7 +503,7 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                                                        IntSerializer.INSTANCE, 
IntSerializer.INSTANCE,
                                                        windowSize, windowSize);
 
-                       TestTimeServiceProvider timerService = new 
TestTimeServiceProvider();
+                       TestProcessingTimeService timerService = new 
TestProcessingTimeService();
 
                        OneInputStreamOperatorTestHarness<Integer, Integer> 
testHarness =
                                        new 
OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService);
@@ -542,7 +542,7 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                                                        IntSerializer.INSTANCE, 
IntSerializer.INSTANCE,
                                                        windowSize, windowSize);
 
-                       timerService = new TestTimeServiceProvider();
+                       timerService = new TestProcessingTimeService();
                        testHarness = new 
OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService);
 
                        testHarness.setup();
@@ -583,7 +583,7 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                        final int windowSlide = 50;
                        final int windowSize = factor * windowSlide;
 
-                       TestTimeServiceProvider timerService = new 
TestTimeServiceProvider();
+                       TestProcessingTimeService timerService = new 
TestProcessingTimeService();
 
                        // sliding window (200 msecs) every 50 msecs
                        AccumulatingProcessingTimeWindowOperator<Integer, 
Integer, Integer> op =
@@ -631,7 +631,7 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                                        IntSerializer.INSTANCE, 
IntSerializer.INSTANCE,
                                        windowSize, windowSlide);
 
-                       timerService = new TestTimeServiceProvider();
+                       timerService = new TestProcessingTimeService();
                        testHarness = new 
OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService);
 
                        testHarness.setup();
@@ -684,7 +684,7 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                                                        new StatefulFunction(), 
identitySelector,
                                                        IntSerializer.INSTANCE, 
IntSerializer.INSTANCE, 50, 50);
 
-                       TestTimeServiceProvider timerService = new 
TestTimeServiceProvider();
+                       TestProcessingTimeService timerService = new 
TestProcessingTimeService();
 
                        OneInputStreamOperatorTestHarness<Integer, Integer> 
testHarness =
                                        new 
KeyedOneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), 
timerService, identitySelector, BasicTypeInfo.INT_TYPE_INFO);
@@ -777,7 +777,7 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
 
        // ------------------------------------------------------------------------
 
-       private static StreamTask<?, ?> createMockTask(Object lock) {
+       private static StreamTask<?, ?> createMockTask() {
                Configuration configuration = new Configuration();
                configuration.setString(ConfigConstants.STATE_BACKEND, 
"jobmanager");
 
@@ -785,7 +785,6 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                when(task.getAccumulatorMap()).thenReturn(new HashMap<String, 
Accumulator<?, ?>>());
                when(task.getName()).thenReturn("Test task name");
                when(task.getExecutionConfig()).thenReturn(new 
ExecutionConfig());
-               when(task.getCheckpointLock()).thenReturn(lock);
 
                final TaskManagerRuntimeInfo mockTaskManagerRuntimeInfo = 
mock(TaskManagerRuntimeInfo.class);
                
when(mockTaskManagerRuntimeInfo.getConfiguration()).thenReturn(configuration);
@@ -801,10 +800,10 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
        }
 
        private static StreamTask<?, ?> createMockTaskWithTimer(
-               final TimeServiceProvider timerService, final Object lock)
+               final ProcessingTimeService timerService)
        {
-               StreamTask<?, ?> mockTask = createMockTask(lock);
-               when(mockTask.getTimerService()).thenReturn(timerService);
+               StreamTask<?, ?> mockTask = createMockTask();
+               
when(mockTask.getProcessingTimeService()).thenReturn(timerService);
                return mockTask;
        }
 
@@ -819,7 +818,7 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                return result;
        }
 
-       private static void shutdownTimerServiceAndWait(TimeServiceProvider 
timers) throws Exception {
+       private static void shutdownTimerServiceAndWait(ProcessingTimeService 
timers) throws Exception {
                timers.shutdownService();
 
                while (!timers.isTerminated()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
index 802329b..bb64a08 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
@@ -40,12 +40,13 @@ import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.Output;
-import 
org.apache.flink.streaming.runtime.operators.TestTimeProviderTest.ReferenceSettingExceptionHandler;
+import 
org.apache.flink.streaming.runtime.operators.TestProcessingTimeServiceTest.ReferenceSettingExceptionHandler;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
-import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
+
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 
@@ -194,8 +195,8 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                try {
                        @SuppressWarnings("unchecked")
                        final Output<StreamRecord<String>> mockOut = 
mock(Output.class);
-                       final TimeServiceProvider timerService = new 
NoOpTimerService();
-                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService, new Object());
+                       final ProcessingTimeService timerService = new 
NoOpTimerService();
+                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService);
 
                        AggregatingProcessingTimeWindowOperator<String, String> 
op;
 
@@ -242,7 +243,7 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                final Object lock = new Object();
                final AtomicReference<Throwable> error = new 
AtomicReference<>();
 
-               final TimeServiceProvider timerService = new 
DefaultTimeServiceProvider(
+               final ProcessingTimeService timerService = new 
SystemProcessingTimeService(
                                new ReferenceSettingExceptionHandler(error), 
lock);
 
                try {
@@ -255,7 +256,7 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                                                        IntSerializer.INSTANCE, 
tupleSerializer,
                                                        windowSize, windowSize);
 
-                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService, lock);
+                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService);
 
                        op.setup(mockTask, createTaskConfig(fieldOneSelector, 
IntSerializer.INSTANCE, 10), out);
                        op.open();
@@ -311,13 +312,13 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                final Object lock = new Object();
                final AtomicReference<Throwable> error = new 
AtomicReference<>();
 
-               final TimeServiceProvider timerService = new 
DefaultTimeServiceProvider(
+               final ProcessingTimeService timerService = new 
SystemProcessingTimeService(
                                new ReferenceSettingExceptionHandler(error), 
lock);
                try {
                        final int windowSize = 50;
                        final CollectingOutput<Tuple2<Integer, Integer>> out = 
new CollectingOutput<>(windowSize);
 
-                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService, lock);
+                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService);
 
                        AggregatingProcessingTimeWindowOperator<Integer, 
Tuple2<Integer, Integer>> op =
                                        new 
AggregatingProcessingTimeWindowOperator<>(
@@ -389,13 +390,13 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                final Object lock = new Object();
                final AtomicReference<Throwable> error = new 
AtomicReference<>();
 
-               final TimeServiceProvider timerService = new 
DefaultTimeServiceProvider(
+               final ProcessingTimeService timerService = new 
SystemProcessingTimeService(
                                new ReferenceSettingExceptionHandler(error), 
lock);
 
                try {
                        final CollectingOutput<Tuple2<Integer, Integer>> out = 
new CollectingOutput<>(50);
 
-                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService, lock);
+                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService);
 
                        // tumbling window that triggers every 20 milliseconds
                        AggregatingProcessingTimeWindowOperator<Integer, 
Tuple2<Integer, Integer>> op =
@@ -471,12 +472,12 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                final Object lock = new Object();
                final AtomicReference<Throwable> error = new 
AtomicReference<>();
 
-               final TimeServiceProvider timerService = new 
DefaultTimeServiceProvider(
+               final ProcessingTimeService timerService = new 
SystemProcessingTimeService(
                                new ReferenceSettingExceptionHandler(error), 
lock);
 
                try {
                        final CollectingOutput<Tuple2<Integer, Integer>> out = 
new CollectingOutput<>(50);
-                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService, lock);
+                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService);
 
                        // tumbling window that triggers every 20 milliseconds
                        AggregatingProcessingTimeWindowOperator<Integer, 
Tuple2<Integer, Integer>> op =
@@ -541,12 +542,12 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                final Object lock = new Object();
                final AtomicReference<Throwable> error = new 
AtomicReference<>();
 
-               final TimeServiceProvider timerService = new 
DefaultTimeServiceProvider(
+               final ProcessingTimeService timerService = new 
SystemProcessingTimeService(
                                new ReferenceSettingExceptionHandler(error), 
lock);
 
                try {
                        final CollectingOutput<Tuple2<Integer, Integer>> out = 
new CollectingOutput<>();
-                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService, lock);
+                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService);
 
                        ReduceFunction<Tuple2<Integer, Integer>> 
failingFunction = new FailingFunction(100);
 
@@ -605,7 +606,7 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                try {
                        final int windowSize = 200;
 
-                       TestTimeServiceProvider timerService = new 
TestTimeServiceProvider();
+                       TestProcessingTimeService timerService = new 
TestProcessingTimeService();
 
                        // tumbling window that triggers every 50 milliseconds
                        AggregatingProcessingTimeWindowOperator<Integer, 
Tuple2<Integer, Integer>> op =
@@ -655,7 +656,7 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                                        IntSerializer.INSTANCE, tupleSerializer,
                                        windowSize, windowSize);
 
-                       timerService = new TestTimeServiceProvider();
+                       timerService = new TestProcessingTimeService();
                        testHarness = new 
OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService);
 
                        testHarness.setup();
@@ -698,7 +699,7 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                        final int windowSlide = 50;
                        final int windowSize = factor * windowSlide;
 
-                       TestTimeServiceProvider timerService = new 
TestTimeServiceProvider();
+                       TestProcessingTimeService timerService = new 
TestProcessingTimeService();
 
                        // sliding window (200 msecs) every 50 msecs
                        AggregatingProcessingTimeWindowOperator<Integer, 
Tuple2<Integer, Integer>> op =
@@ -748,7 +749,7 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                                        IntSerializer.INSTANCE, tupleSerializer,
                                        windowSize, windowSlide);
 
-                       timerService = new TestTimeServiceProvider();
+                       timerService = new TestProcessingTimeService();
                        testHarness = new 
OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService);
 
                        testHarness.setup();
@@ -796,7 +797,7 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                try {
                        final long twoSeconds = 2000;
                        
-                       TestTimeServiceProvider timerService = new 
TestTimeServiceProvider();
+                       TestProcessingTimeService timerService = new 
TestProcessingTimeService();
 
                        StatefulFunction.globalCounts.clear();
                        
@@ -850,7 +851,7 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                        final int windowSlide = 50;
                        final int windowSize = factor * windowSlide;
 
-                       TestTimeServiceProvider timerService = new 
TestTimeServiceProvider();
+                       TestProcessingTimeService timerService = new 
TestProcessingTimeService();
 
                        StatefulFunction.globalCounts.clear();
                        
@@ -977,7 +978,7 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
 
        // 
------------------------------------------------------------------------
        
-       private static StreamTask<?, ?> createMockTask(Object lock) {
+       private static StreamTask<?, ?> createMockTask() {
                Configuration configuration = new Configuration();
                configuration.setString(ConfigConstants.STATE_BACKEND, 
"jobmanager");
 
@@ -985,7 +986,6 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                when(task.getAccumulatorMap()).thenReturn(new HashMap<String, 
Accumulator<?, ?>>());
                when(task.getName()).thenReturn("Test task name");
                when(task.getExecutionConfig()).thenReturn(new 
ExecutionConfig());
-               when(task.getCheckpointLock()).thenReturn(lock);
 
                final TaskManagerRuntimeInfo mockTaskManagerRuntimeInfo = 
mock(TaskManagerRuntimeInfo.class);
                
when(mockTaskManagerRuntimeInfo.getConfiguration()).thenReturn(configuration);
@@ -996,10 +996,10 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                return task;
        }
 
-       private static StreamTask<?, ?> createMockTaskWithTimer(final 
TimeServiceProvider timerService, final Object lock)
+       private static StreamTask<?, ?> createMockTaskWithTimer(final 
ProcessingTimeService timerService)
        {
-               StreamTask<?, ?> mockTask = createMockTask(lock);
-               when(mockTask.getTimerService()).thenReturn(timerService);
+               StreamTask<?, ?> mockTask = createMockTask();
+               
when(mockTask.getProcessingTimeService()).thenReturn(timerService);
                return mockTask;
        }
 
@@ -1018,7 +1018,7 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                return result;
        }
 
-       private static void shutdownTimerServiceAndWait(TimeServiceProvider 
timers) throws Exception {
+       private static void shutdownTimerServiceAndWait(ProcessingTimeService 
timers) throws Exception {
                timers.shutdownService();
 
                while (!timers.isTerminated()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java
index d0c5050..a7a71cf 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java
@@ -19,11 +19,11 @@
 package org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.streaming.runtime.operators.Triggerable;
-import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 
 import java.util.concurrent.ScheduledFuture;
 
-class NoOpTimerService extends TimeServiceProvider {
+class NoOpTimerService extends ProcessingTimeService {
 
        private volatile boolean terminated;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index 2b0b915..38f0778 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -63,7 +63,7 @@ import 
org.apache.flink.streaming.api.windowing.windows.Window;
 import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
 import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.TestHarnessUtil;
@@ -850,8 +850,6 @@ public class WindowOperatorTest extends TestLogger {
 
                TypeInformation<Tuple2<String, Integer>> inputType = 
TypeInfoParser.parse("Tuple2<String, Integer>");
 
-               TestTimeServiceProvider timer = new TestTimeServiceProvider();
-
                ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc = 
new ReducingStateDescriptor<>("window-contents",
                                new SumReducer(),
                                inputType.createSerializer(new 
ExecutionConfig()));
@@ -869,7 +867,7 @@ public class WindowOperatorTest extends TestLogger {
 
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
                                new KeyedOneInputStreamOperatorTestHarness<>(
-                                               operator, new 
ExecutionConfig(), timer,
+                                               operator, new ExecutionConfig(),
                                                new TupleKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO);
                
                testHarness.open();
@@ -898,7 +896,7 @@ public class WindowOperatorTest extends TestLogger {
 
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> otherTestHarness =
                                new KeyedOneInputStreamOperatorTestHarness<>(
-                                               otherOperator, new 
ExecutionConfig(), timer,
+                                               otherOperator, new 
ExecutionConfig(),
                                                new TupleKeySelector(), 
BasicTypeInfo.STRING_TYPE_INFO);
 
                otherTestHarness.setup();
@@ -928,7 +926,7 @@ public class WindowOperatorTest extends TestLogger {
                                new InternalSingleValueWindowFunction<>(new 
PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
                                ProcessingTimeTrigger.create(), 0);
 
-               TestTimeServiceProvider testTimeProvider = new 
TestTimeServiceProvider();
+               TestProcessingTimeService testTimeProvider = new 
TestProcessingTimeService();
 
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
                                new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new ExecutionConfig(), 
testTimeProvider, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -987,7 +985,7 @@ public class WindowOperatorTest extends TestLogger {
                                new InternalSingleValueWindowFunction<>(new 
PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
                                ProcessingTimeTrigger.create(), 0);
 
-               TestTimeServiceProvider testTimeProvider = new 
TestTimeServiceProvider();
+               TestProcessingTimeService testTimeProvider = new 
TestProcessingTimeService();
 
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
                                new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new ExecutionConfig(), 
testTimeProvider, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -1059,7 +1057,7 @@ public class WindowOperatorTest extends TestLogger {
                                new InternalSingleValueWindowFunction<>(new 
PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
                                ProcessingTimeTrigger.create(), 0);
 
-               TestTimeServiceProvider testTimeProvider = new 
TestTimeServiceProvider();
+               TestProcessingTimeService testTimeProvider = new 
TestProcessingTimeService();
 
                OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, 
Tuple2<String, Integer>> testHarness =
                                new 
KeyedOneInputStreamOperatorTestHarness<>(operator, new ExecutionConfig(), 
testTimeProvider, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProviderTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProviderTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProviderTest.java
deleted file mode 100644
index 29e13ed..0000000
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProviderTest.java
+++ /dev/null
@@ -1,313 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.tasks;
-
-import org.apache.flink.core.testutils.OneShotLatch;
-import 
org.apache.flink.streaming.runtime.operators.TestTimeProviderTest.ReferenceSettingExceptionHandler;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
-
-import org.junit.Test;
-
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantLock;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-public class DefaultTimeServiceProviderTest {
-
-       @Test
-       public void testTriggerHoldsLock() throws Exception {
-
-               final Object lock = new Object();
-               final AtomicReference<Throwable> errorRef = new 
AtomicReference<>();
-
-               final DefaultTimeServiceProvider timer = new 
DefaultTimeServiceProvider(
-                               new ReferenceSettingExceptionHandler(errorRef), 
lock);
-
-               try {
-                       assertEquals(0, timer.getNumTasksScheduled());
-
-                       // schedule something
-                       ScheduledFuture<?> future = 
timer.registerTimer(System.currentTimeMillis(), new Triggerable() {
-                               @Override
-                               public void trigger(long timestamp) {
-                                       assertTrue(Thread.holdsLock(lock));
-                               }
-                       });
-
-                       // wait until the execution is over
-                       future.get();
-                       assertEquals(0, timer.getNumTasksScheduled());
-
-                       // check that no asynchronous error was reported
-                       if (errorRef.get() != null) {
-                               throw new Exception(errorRef.get());
-                       }
-               }
-               finally {
-                       timer.shutdownService();
-               }
-       }
-
-       @Test
-       public void testImmediateShutdown() throws Exception {
-
-               final Object lock = new Object();
-               final AtomicReference<Throwable> errorRef = new 
AtomicReference<>();
-
-               final DefaultTimeServiceProvider timer = new 
DefaultTimeServiceProvider(
-                               new ReferenceSettingExceptionHandler(errorRef), 
lock);
-
-               try {
-                       assertFalse(timer.isTerminated());
-
-                       final OneShotLatch latch = new OneShotLatch();
-
-                       // the task should trigger immediately and should block 
until terminated with interruption
-                       timer.registerTimer(System.currentTimeMillis(), new 
Triggerable() {
-                               @Override
-                               public void trigger(long timestamp) throws 
Exception {
-                                       latch.trigger();
-                                       Thread.sleep(100000000);
-                               }
-                       });
-
-                       latch.await();
-                       timer.shutdownService();
-
-                       // can only enter this scope after the triggerable is 
interrupted
-                       //noinspection 
SynchronizationOnLocalVariableOrMethodParameter
-                       synchronized (lock) {
-                               assertTrue(timer.isTerminated());
-                       }
-
-                       try {
-                               timer.registerTimer(System.currentTimeMillis() + 1000, new Triggerable() {
-                                       @Override
-                                       public void trigger(long timestamp) {}
-                               });
-
-                               fail("should result in an exception");
-                       }
-                       catch (IllegalStateException e) {
-                               // expected
-                       }
-
-                       // obviously, we have an asynchronous interrupted 
exception
-                       assertNotNull(errorRef.get());
-                       assertTrue(errorRef.get().getCause() instanceof 
InterruptedException);
-
-                       assertEquals(0, timer.getNumTasksScheduled());
-               }
-               finally {
-                       timer.shutdownService();
-               }
-       }
-
-       @Test
-       public void testQuiescing() throws Exception {
-
-               final Object lock = new Object();
-               final AtomicReference<Throwable> errorRef = new 
AtomicReference<>();
-
-               final DefaultTimeServiceProvider timer = new 
DefaultTimeServiceProvider(
-                               new ReferenceSettingExceptionHandler(errorRef), 
lock);
-
-               try {
-                       final OneShotLatch latch = new OneShotLatch();
-
-                       final ReentrantLock scopeLock = new ReentrantLock();
-
-                       timer.registerTimer(System.currentTimeMillis() + 20, 
new Triggerable() {
-                               @Override
-                               public void trigger(long timestamp) throws 
Exception {
-                                       scopeLock.lock();
-                                       try {
-                                               latch.trigger();
-                                               // delay a bit before leaving 
the method
-                                               Thread.sleep(5);
-                                       } finally {
-                                               scopeLock.unlock();
-                                       }
-                               }
-                       });
-
-                       // after the task triggered, shut the timer down 
cleanly, waiting for the task to finish
-                       latch.await();
-                       timer.quiesceAndAwaitPending();
-
-                       // should be able to immediately acquire the lock, 
since the task must have exited by now 
-                       assertTrue(scopeLock.tryLock());
-
-                       // should be able to schedule more tasks (that never 
get executed)
-                       ScheduledFuture<?> future = 
timer.registerTimer(System.currentTimeMillis() - 5, new Triggerable() {
-                               @Override
-                               public void trigger(long timestamp) throws 
Exception {
-                                       throw new Exception("test");
-                               }
-                       });
-                       assertNotNull(future);
-
-                       // nothing should be scheduled right now
-                       assertEquals(0, timer.getNumTasksScheduled());
-
-                       // check that no asynchronous error was reported - that 
ensures that the newly scheduled 
-                       // triggerable did, in fact, not trigger
-                       if (errorRef.get() != null) {
-                               throw new Exception(errorRef.get());
-                       }
-               }
-               finally {
-                       timer.shutdownService();
-               }
-       }
-
-       @Test
-       public void testFutureCancellation() throws Exception {
-
-               final Object lock = new Object();
-               final AtomicReference<Throwable> errorRef = new 
AtomicReference<>();
-
-               final DefaultTimeServiceProvider timer = new 
DefaultTimeServiceProvider(
-                               new ReferenceSettingExceptionHandler(errorRef), 
lock);
-
-               try {
-                       assertEquals(0, timer.getNumTasksScheduled());
-
-                       // schedule something
-                       ScheduledFuture<?> future = 
timer.registerTimer(System.currentTimeMillis() + 100000000, new Triggerable() {
-                               @Override
-                               public void trigger(long timestamp) {}
-                       });
-                       assertEquals(1, timer.getNumTasksScheduled());
-
-                       future.cancel(false);
-
-                       assertEquals(0, timer.getNumTasksScheduled());
-
-                       // check that no asynchronous error was reported
-                       if (errorRef.get() != null) {
-                               throw new Exception(errorRef.get());
-                       }
-               }
-               finally {
-                       timer.shutdownService();
-               }
-       }
-
-       @Test
-       public void testExceptionReporting() throws InterruptedException {
-               final AtomicBoolean exceptionWasThrown = new 
AtomicBoolean(false);
-               final OneShotLatch latch = new OneShotLatch();
-               final Object lock = new Object();
-
-               TimeServiceProvider timeServiceProvider = new 
DefaultTimeServiceProvider(
-                               new AsyncExceptionHandler() {
-                                       @Override
-                                       public void handleAsyncException(String 
message, Throwable exception) {
-                                               exceptionWasThrown.set(true);
-                                               latch.trigger();
-                                       }
-                               }, lock);
-               
-               timeServiceProvider.registerTimer(System.currentTimeMillis(), 
new Triggerable() {
-                       @Override
-                       public void trigger(long timestamp) throws Exception {
-                               throw new Exception("Exception in Timer");
-                       }
-               });
-
-               latch.await();
-               assertTrue(exceptionWasThrown.get());
-       }
-
-       @Test
-       public void testTimerSorting() throws Exception {
-               final Object lock = new Object();
-               final AtomicReference<Throwable> errorRef = new 
AtomicReference<>();
-
-               final DefaultTimeServiceProvider timer = new 
DefaultTimeServiceProvider(
-                               new ReferenceSettingExceptionHandler(errorRef), 
lock);
-
-               try {
-                       final OneShotLatch sync = new OneShotLatch();
-
-                       // we block the timer execution to make sure we have 
all the time
-                       // to register some additional timers out of order
-                       timer.registerTimer(System.currentTimeMillis(), new 
Triggerable() {
-                               @Override
-                               public void trigger(long timestamp) throws 
Exception {
-                                       sync.await();
-                               }
-                       });
-                       
-                       // schedule two timers out of order something
-                       final long now = System.currentTimeMillis();
-                       final long time1 = now + 6;
-                       final long time2 = now + 5;
-                       final long time3 = now + 8;
-                       final long time4 = now - 2;
-
-                       final ArrayBlockingQueue<Long> timestamps = new 
ArrayBlockingQueue<>(4);
-                       Triggerable trigger = new Triggerable() {
-                               @Override
-                               public void trigger(long timestamp) {
-                                       timestamps.add(timestamp);
-                               }
-                       };
-
-                       // schedule
-                       ScheduledFuture<?> future1 = timer.registerTimer(time1, 
trigger);
-                       ScheduledFuture<?> future2 = timer.registerTimer(time2, 
trigger);
-                       ScheduledFuture<?> future3 = timer.registerTimer(time3, 
trigger);
-                       ScheduledFuture<?> future4 = timer.registerTimer(time4, 
trigger);
-
-                       // now that everything is scheduled, unblock the timer 
service
-                       sync.trigger();
-
-                       // wait until both are complete
-                       future1.get();
-                       future2.get();
-                       future3.get();
-                       future4.get();
-
-                       // verify that the order is 4 - 2 - 1 - 3
-                       assertEquals(4, timestamps.size());
-                       assertEquals(time4, timestamps.take().longValue());
-                       assertEquals(time2, timestamps.take().longValue());
-                       assertEquals(time1, timestamps.take().longValue());
-                       assertEquals(time3, timestamps.take().longValue());
-
-                       // check that no asynchronous error was reported
-                       if (errorRef.get() != null) {
-                               throw new Exception(errorRef.get());
-                       }
-               }
-               finally {
-                       timer.shutdownService();
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index ce62624..ab7bf69 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -113,11 +113,11 @@ public class StreamTaskTestHarness<OUT> {
                outputStreamRecordSerializer = new 
StreamElementSerializer<OUT>(outputSerializer);
        }
 
-       public TimeServiceProvider getTimerService() {
+       public ProcessingTimeService getProcessingTimeService() {
                if (!(task instanceof StreamTask)) {
-                       throw new UnsupportedOperationException("getTimerService() only supported on StreamTasks.");
+                       throw new UnsupportedOperationException("getProcessingTimeService() only supported on StreamTasks.");
                }
-               return ((StreamTask) task).getTimerService();
+               return ((StreamTask) task).getProcessingTimeService();
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
new file mode 100644
index 0000000..e7944df
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.streaming.runtime.operators.TestProcessingTimeServiceTest.ReferenceSettingExceptionHandler;
+import org.apache.flink.streaming.runtime.operators.Triggerable;
+
+import org.junit.Test;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class SystemProcessingTimeServiceTest {
+
+       @Test
+       public void testTriggerHoldsLock() throws Exception {
+
+               final Object lock = new Object();
+               final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+
+               final SystemProcessingTimeService timer = new SystemProcessingTimeService(
+                               new ReferenceSettingExceptionHandler(errorRef), lock);
+
+               try {
+                       assertEquals(0, timer.getNumTasksScheduled());
+
+                       // schedule something
+                       ScheduledFuture<?> future = timer.registerTimer(System.currentTimeMillis(), new Triggerable() {
+                               @Override
+                               public void trigger(long timestamp) {
+                                       assertTrue(Thread.holdsLock(lock));
+                               }
+                       });
+
+                       // wait until the execution is over
+                       future.get();
+                       assertEquals(0, timer.getNumTasksScheduled());
+
+                       // check that no asynchronous error was reported
+                       if (errorRef.get() != null) {
+                               throw new Exception(errorRef.get());
+                       }
+               }
+               finally {
+                       timer.shutdownService();
+               }
+       }
+
+       @Test
+       public void testImmediateShutdown() throws Exception {
+
+               final Object lock = new Object();
+               final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+
+               final SystemProcessingTimeService timer = new SystemProcessingTimeService(
+                               new ReferenceSettingExceptionHandler(errorRef), lock);
+
+               try {
+                       assertFalse(timer.isTerminated());
+
+                       final OneShotLatch latch = new OneShotLatch();
+
+                       // the task should trigger immediately and should block until terminated with interruption
+                       timer.registerTimer(System.currentTimeMillis(), new Triggerable() {
+                               @Override
+                               public void trigger(long timestamp) throws Exception {
+                                       latch.trigger();
+                                       Thread.sleep(100000000);
+                               }
+                       });
+
+                       latch.await();
+                       timer.shutdownService();
+
+                       // can only enter this scope after the triggerable is interrupted
+                       //noinspection SynchronizationOnLocalVariableOrMethodParameter
+                       synchronized (lock) {
+                               assertTrue(timer.isTerminated());
+                       }
+
+                       try {
+                               timer.registerTimer(System.currentTimeMillis() + 1000, new Triggerable() {
+                                       @Override
+                                       public void trigger(long timestamp) {}
+                               });
+
+                               fail("should result in an exception");
+                       }
+                       catch (IllegalStateException e) {
+                               // expected
+                       }
+
+                       // obviously, we have an asynchronous interrupted exception
+                       assertNotNull(errorRef.get());
+                       assertTrue(errorRef.get().getCause() instanceof InterruptedException);
+
+                       assertEquals(0, timer.getNumTasksScheduled());
+               }
+               finally {
+                       timer.shutdownService();
+               }
+       }
+
+       @Test
+       public void testQuiescing() throws Exception {
+
+               final Object lock = new Object();
+               final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+
+               final SystemProcessingTimeService timer = new SystemProcessingTimeService(
+                               new ReferenceSettingExceptionHandler(errorRef), lock);
+
+               try {
+                       final OneShotLatch latch = new OneShotLatch();
+
+                       final ReentrantLock scopeLock = new ReentrantLock();
+
+                       timer.registerTimer(System.currentTimeMillis() + 20, 
new Triggerable() {
+                               @Override
+                               public void trigger(long timestamp) throws 
Exception {
+                                       scopeLock.lock();
+                                       try {
+                                               latch.trigger();
+                                               // delay a bit before leaving 
the method
+                                               Thread.sleep(5);
+                                       } finally {
+                                               scopeLock.unlock();
+                                       }
+                               }
+                       });
+
+                       // after the task triggered, shut the timer down 
cleanly, waiting for the task to finish
+                       latch.await();
+                       timer.quiesceAndAwaitPending();
+
+                       // should be able to immediately acquire the lock, 
since the task must have exited by now 
+                       assertTrue(scopeLock.tryLock());
+
+                       // should be able to schedule more tasks (that never 
get executed)
+                       ScheduledFuture<?> future = 
timer.registerTimer(System.currentTimeMillis() - 5, new Triggerable() {
+                               @Override
+                               public void trigger(long timestamp) throws 
Exception {
+                                       throw new Exception("test");
+                               }
+                       });
+                       assertNotNull(future);
+
+                       // nothing should be scheduled right now
+                       assertEquals(0, timer.getNumTasksScheduled());
+
+                       // check that no asynchronous error was reported - that 
ensures that the newly scheduled 
+                       // triggerable did, in fact, not trigger
+                       if (errorRef.get() != null) {
+                               throw new Exception(errorRef.get());
+                       }
+               }
+               finally {
+                       timer.shutdownService();
+               }
+       }
+
+       @Test
+       public void testFutureCancellation() throws Exception {
+
+               final Object lock = new Object();
+               final AtomicReference<Throwable> errorRef = new 
AtomicReference<>();
+
+               final SystemProcessingTimeService timer = new 
SystemProcessingTimeService(
+                               new ReferenceSettingExceptionHandler(errorRef), 
lock);
+
+               try {
+                       assertEquals(0, timer.getNumTasksScheduled());
+
+                       // schedule something
+                       ScheduledFuture<?> future = 
timer.registerTimer(System.currentTimeMillis() + 100000000, new Triggerable() {
+                               @Override
+                               public void trigger(long timestamp) {}
+                       });
+                       assertEquals(1, timer.getNumTasksScheduled());
+
+                       future.cancel(false);
+
+                       assertEquals(0, timer.getNumTasksScheduled());
+
+                       // check that no asynchronous error was reported
+                       if (errorRef.get() != null) {
+                               throw new Exception(errorRef.get());
+                       }
+               }
+               finally {
+                       timer.shutdownService();
+               }
+       }
+
+       @Test
+       public void testExceptionReporting() throws InterruptedException {
+               final AtomicBoolean exceptionWasThrown = new 
AtomicBoolean(false);
+               final OneShotLatch latch = new OneShotLatch();
+               final Object lock = new Object();
+
+               ProcessingTimeService timeServiceProvider = new 
SystemProcessingTimeService(
+                               new AsyncExceptionHandler() {
+                                       @Override
+                                       public void handleAsyncException(String 
message, Throwable exception) {
+                                               exceptionWasThrown.set(true);
+                                               latch.trigger();
+                                       }
+                               }, lock);
+               
+               timeServiceProvider.registerTimer(System.currentTimeMillis(), 
new Triggerable() {
+                       @Override
+                       public void trigger(long timestamp) throws Exception {
+                               throw new Exception("Exception in Timer");
+                       }
+               });
+
+               latch.await();
+               assertTrue(exceptionWasThrown.get());
+       }
+
+       @Test
+       public void testTimerSorting() throws Exception {
+               final Object lock = new Object();
+               final AtomicReference<Throwable> errorRef = new 
AtomicReference<>();
+
+               final SystemProcessingTimeService timer = new 
SystemProcessingTimeService(
+                               new ReferenceSettingExceptionHandler(errorRef), 
lock);
+
+               try {
+                       final OneShotLatch sync = new OneShotLatch();
+
+                       // we block the timer execution to make sure we have 
all the time
+                       // to register some additional timers out of order
+                       timer.registerTimer(System.currentTimeMillis(), new 
Triggerable() {
+                               @Override
+                               public void trigger(long timestamp) throws 
Exception {
+                                       sync.await();
+                               }
+                       });
+                       
+                       // schedule four timers out of order
+                       final long now = System.currentTimeMillis();
+                       final long time1 = now + 6;
+                       final long time2 = now + 5;
+                       final long time3 = now + 8;
+                       final long time4 = now - 2;
+
+                       final ArrayBlockingQueue<Long> timestamps = new 
ArrayBlockingQueue<>(4);
+                       Triggerable trigger = new Triggerable() {
+                               @Override
+                               public void trigger(long timestamp) {
+                                       timestamps.add(timestamp);
+                               }
+                       };
+
+                       // schedule
+                       ScheduledFuture<?> future1 = timer.registerTimer(time1, 
trigger);
+                       ScheduledFuture<?> future2 = timer.registerTimer(time2, 
trigger);
+                       ScheduledFuture<?> future3 = timer.registerTimer(time3, 
trigger);
+                       ScheduledFuture<?> future4 = timer.registerTimer(time4, 
trigger);
+
+                       // now that everything is scheduled, unblock the timer 
service
+                       sync.trigger();
+
+                       // wait until all four are complete
+                       future1.get();
+                       future2.get();
+                       future3.get();
+                       future4.get();
+
+                       // verify that the order is 4 - 2 - 1 - 3
+                       assertEquals(4, timestamps.size());
+                       assertEquals(time4, timestamps.take().longValue());
+                       assertEquals(time2, timestamps.take().longValue());
+                       assertEquals(time1, timestamps.take().longValue());
+                       assertEquals(time3, timestamps.take().longValue());
+
+                       // check that no asynchronous error was reported
+                       if (errorRef.get() != null) {
+                               throw new Exception(errorRef.get());
+                       }
+               }
+               finally {
+                       timer.shutdownService();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
index 41968e6..6ad684b 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
@@ -32,9 +32,9 @@ import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
 import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
-import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -50,7 +50,7 @@ import static org.mockito.Mockito.doAnswer;
 
 /**
  * Extension of {@link OneInputStreamOperatorTestHarness} that allows the 
operator to get
- * a {@link AbstractKeyedStateBackend}.
+ * a {@link KeyedStateBackend}.
  *
  */
 public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
@@ -94,7 +94,7 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, 
OUT>
 
        public 
KeyedOneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator,
                        ExecutionConfig executionConfig,
-                       TestTimeServiceProvider testTimeProvider,
+                       ProcessingTimeService testTimeProvider,
                        KeySelector<IN, K> keySelector,
                        TypeInformation<K> keyType) {
                super(operator, executionConfig, testTimeProvider);
@@ -187,7 +187,7 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, 
OUT>
        }
 
        /**
-        *
+        * 
         */
        @Override
        public void restore(StreamStateHandle snapshot) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockContext.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockContext.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockContext.java
index 2dd2163..c2763d8 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockContext.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockContext.java
@@ -19,24 +19,14 @@ package org.apache.flink.streaming.util;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.runtime.operators.testutils.MockEnvironment;
-import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 public class MockContext<IN, OUT> {
        
@@ -95,17 +85,4 @@ public class MockContext<IN, OUT> {
 
                return result;
        }
-
-       private static StreamTask<?, ?> createMockTaskWithTimer(
-               final TimeServiceProvider timerService, final Object lock)
-       {
-               StreamTask<?, ?> task = mock(StreamTask.class);
-               when(task.getAccumulatorMap()).thenReturn(new HashMap<String, 
Accumulator<?, ?>>());
-               when(task.getName()).thenReturn("Test task name");
-               when(task.getExecutionConfig()).thenReturn(new 
ExecutionConfig());
-               when(task.getEnvironment()).thenReturn(new 
MockEnvironment("MockTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 
1024));
-               when(task.getCheckpointLock()).thenReturn(lock);
-               when(task.getTimerService()).thenReturn(timerService);
-               return task;
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index 9f8d223..4104049 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -43,11 +43,11 @@ import 
org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
-import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.util.Preconditions;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -79,9 +79,7 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
 
        final ExecutionConfig executionConfig;
 
-       final Object checkpointLock;
-
-       final TimeServiceProvider timeServiceProvider;
+       final ProcessingTimeService processingTimeService;
 
        StreamTask<?, ?> mockTask;
 
@@ -105,36 +103,36 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
        public OneInputStreamOperatorTestHarness(
                        OneInputStreamOperator<IN, OUT> operator,
                        ExecutionConfig executionConfig) {
-               this(operator, executionConfig, null);
+               this(operator, executionConfig, new 
TestProcessingTimeService());
        }
 
        public OneInputStreamOperatorTestHarness(
                        OneInputStreamOperator<IN, OUT> operator,
                        ExecutionConfig executionConfig,
-                       TestTimeServiceProvider testTimeProvider) {
-               this(operator, executionConfig, new Object(), testTimeProvider);
+                       ProcessingTimeService processingTimeService) {
+               this(operator, executionConfig, new Object(), 
processingTimeService);
        }
 
        public OneInputStreamOperatorTestHarness(
                        OneInputStreamOperator<IN, OUT> operator,
                        ExecutionConfig executionConfig,
                        Object checkpointLock,
-                       TimeServiceProvider testTimeProvider) {
+                       ProcessingTimeService processingTimeService) {
 
+               this.processingTimeService = 
Preconditions.checkNotNull(processingTimeService);
                this.operator = operator;
-               this.outputList = new ConcurrentLinkedQueue<Object>();
+               this.outputList = new ConcurrentLinkedQueue<>();
                Configuration underlyingConfig = new Configuration();
                this.config = new StreamConfig(underlyingConfig);
                this.config.setCheckpointingEnabled(true);
                this.executionConfig = executionConfig;
-               this.checkpointLock = checkpointLock;
                this.closableRegistry = new ClosableRegistry();
 
                final Environment env = new MockEnvironment("MockTwoInputTask", 
3 * 1024 * 1024, new MockInputSplitProvider(), 1024, underlyingConfig, 
executionConfig, MAX_PARALLELISM, 1, 0);
                mockTask = mock(StreamTask.class);
 
                when(mockTask.getName()).thenReturn("Mock Task");
-               
when(mockTask.getCheckpointLock()).thenReturn(this.checkpointLock);
+               when(mockTask.getCheckpointLock()).thenReturn(checkpointLock);
                when(mockTask.getConfiguration()).thenReturn(config);
                
when(mockTask.getTaskConfiguration()).thenReturn(underlyingConfig);
                when(mockTask.getEnvironment()).thenReturn(env);
@@ -183,15 +181,12 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
                        throw new RuntimeException(e.getMessage(), e);
                }
 
-               timeServiceProvider = testTimeProvider != null ? 
testTimeProvider :
-                       new DefaultTimeServiceProvider(mockTask, 
this.checkpointLock);
-
-               doAnswer(new Answer<TimeServiceProvider>() {
+               doAnswer(new Answer<ProcessingTimeService>() {
                        @Override
-                       public TimeServiceProvider answer(InvocationOnMock 
invocation) throws Throwable {
-                               return timeServiceProvider;
+                       public ProcessingTimeService answer(InvocationOnMock 
invocation) throws Throwable {
+                               return 
OneInputStreamOperatorTestHarness.this.processingTimeService;
                        }
-               }).when(mockTask).getTimerService();
+               }).when(mockTask).getProcessingTimeService();
        }
 
        public void setTimeCharacteristic(TimeCharacteristic 
timeCharacteristic) {
@@ -219,9 +214,7 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
        }
 
        /**
-        * Get all the output from the task. This contains StreamRecords and 
Events interleaved. Use
-        * {@link 
org.apache.flink.streaming.util.TestHarnessUtil#getStreamRecordsFromOutput(java.util.List)}
-        * to extract only the StreamRecords.
+        * Get all the output from the task. This contains StreamRecords and 
Events interleaved.
         */
        public ConcurrentLinkedQueue<Object> getOutput() {
                return outputList;
@@ -316,8 +309,8 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
        public void close() throws Exception {
                operator.close();
                operator.dispose();
-               if (timeServiceProvider != null) {
-                       timeServiceProvider.shutdownService();
+               if (processingTimeService != null) {
+                       processingTimeService.shutdownService();
                }
                setupCalled = false;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
index a4e26f0..ed9a7cd 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
@@ -31,7 +31,7 @@ import 
org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
 import 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.Preconditions;
 
@@ -50,7 +50,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
  */
 public class WindowingTestHarness<K, IN, W extends Window> {
 
-       private final TestTimeServiceProvider timeServiceProvider;
+       private final TestProcessingTimeService timeServiceProvider;
 
        private final OneInputStreamOperatorTestHarness<IN, IN> testHarness;
 
@@ -80,7 +80,7 @@ public class WindowingTestHarness<K, IN, W extends Window> {
                                trigger,
                                allowedLateness);
 
-               timeServiceProvider = new TestTimeServiceProvider();
+               timeServiceProvider = new TestProcessingTimeService();
                testHarness = new 
KeyedOneInputStreamOperatorTestHarness<>(operator, executionConfig, 
timeServiceProvider, keySelector, keyType);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java
index 707ce0f..e7f62fd 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java
@@ -192,7 +192,7 @@ public class StreamTaskTimerITCase extends 
StreamingMultipleProgramsTestBase {
                        }
 
                        if (first) {
-                               
getTimerService().registerTimer(System.currentTimeMillis() + 100, this);
+                               
getProcessingTimeService().registerTimer(System.currentTimeMillis() + 100, 
this);
                                first = false;
                        }
                        numElements++;
@@ -209,7 +209,7 @@ public class StreamTaskTimerITCase extends 
StreamingMultipleProgramsTestBase {
                        try {
                                numTimers++;
                                throwIfDone();
-                               
getTimerService().registerTimer(System.currentTimeMillis() + 1, this);
+                               
getProcessingTimeService().registerTimer(System.currentTimeMillis() + 1, this);
                        } finally {
                                semaphore.release();
                        }
@@ -251,7 +251,7 @@ public class StreamTaskTimerITCase extends 
StreamingMultipleProgramsTestBase {
                        }
 
                        if (first) {
-                               
getTimerService().registerTimer(System.currentTimeMillis() + 100, this);
+                               
getProcessingTimeService().registerTimer(System.currentTimeMillis() + 100, 
this);
                                first = false;
                        }
                        numElements++;
@@ -266,7 +266,7 @@ public class StreamTaskTimerITCase extends 
StreamingMultipleProgramsTestBase {
                        }
 
                        if (first) {
-                               
getTimerService().registerTimer(System.currentTimeMillis() + 100, this);
+                               
getProcessingTimeService().registerTimer(System.currentTimeMillis() + 100, 
this);
                                first = false;
                        }
                        numElements++;
@@ -284,7 +284,7 @@ public class StreamTaskTimerITCase extends 
StreamingMultipleProgramsTestBase {
                        try {
                                numTimers++;
                                throwIfDone();
-                               
getTimerService().registerTimer(System.currentTimeMillis() + 1, this);
+                               
getProcessingTimeService().registerTimer(System.currentTimeMillis() + 1, this);
                        } finally {
                                semaphore.release();
                        }

Reply via email to