[FLINK-4877] Rename TimeServiceProvider to ProcessingTimeService

The name is clashing with the soon-to-be-added
TimerService/InternalTimerService which is meant as an interface for
dealing with both processing time and event time.

TimeServiceProvider is renamed to ProcessingTimeService to reflect the
fact that it is a low-level utility that only deals with "physical"
processing-time trigger tasks.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e112a632
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e112a632
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e112a632

Branch: refs/heads/master
Commit: e112a63208006b4e348d75f3df84d2fd4b091797
Parents: 71d2e3e
Author: Aljoscha Krettek <[email protected]>
Authored: Sun Sep 25 20:58:16 2016 +0200
Committer: Aljoscha Krettek <[email protected]>
Committed: Fri Oct 21 19:03:04 2016 +0200

----------------------------------------------------------------------
 .../hdfstests/ContinuousFileMonitoringTest.java |   4 +-
 .../connectors/fs/bucketing/BucketingSink.java  |   6 +-
 .../kafka/internals/AbstractFetcher.java        |   8 +-
 .../AbstractFetcherTimestampsTest.java          |   8 +-
 .../kafka/testutils/MockRuntimeContext.java     |   8 +-
 .../source/ContinuousFileReaderOperator.java    |   2 +-
 .../api/operators/AbstractStreamOperator.java   |  17 +-
 .../streaming/api/operators/StreamSource.java   |   2 +-
 .../api/operators/StreamSourceContexts.java     |  14 +-
 .../api/operators/StreamingRuntimeContext.java  |   6 +-
 .../operators/ExtractTimestampsOperator.java    |   8 +-
 ...TimestampsAndPeriodicWatermarksOperator.java |   8 +-
 ...ractAlignedProcessingTimeWindowOperator.java |   6 +-
 .../windowing/EvictingWindowOperator.java       |   2 +-
 .../operators/windowing/WindowOperator.java     |  10 +-
 .../tasks/DefaultTimeServiceProvider.java       | 262 ----------------
 .../runtime/tasks/ProcessingTimeService.java    |  83 +++++
 .../streaming/runtime/tasks/StreamTask.java     |  16 +-
 .../tasks/SystemProcessingTimeService.java      | 262 ++++++++++++++++
 .../tasks/TestProcessingTimeService.java        | 172 ++++++++++
 .../runtime/tasks/TestTimeServiceProvider.java  | 172 ----------
 .../runtime/tasks/TimeServiceProvider.java      |  83 -----
 .../operators/StreamSourceOperatorTest.java     |  47 +--
 .../runtime/operators/StreamTaskTimerTest.java  |   6 +-
 .../TestProcessingTimeServiceTest.java          | 113 +++++++
 .../runtime/operators/TestTimeProviderTest.java | 113 -------
 ...stampsAndPeriodicWatermarksOperatorTest.java |  13 +-
 ...AlignedProcessingTimeWindowOperatorTest.java |  49 ++-
 ...AlignedProcessingTimeWindowOperatorTest.java |  56 ++--
 .../operators/windowing/NoOpTimerService.java   |   4 +-
 .../operators/windowing/WindowOperatorTest.java |  14 +-
 .../tasks/DefaultTimeServiceProviderTest.java   | 313 -------------------
 .../runtime/tasks/StreamTaskTestHarness.java    |   6 +-
 .../tasks/SystemProcessingTimeServiceTest.java  | 313 +++++++++++++++++++
 .../KeyedOneInputStreamOperatorTestHarness.java |   8 +-
 .../flink/streaming/util/MockContext.java       |  23 --
 .../util/OneInputStreamOperatorTestHarness.java |  43 ++-
 .../streaming/util/WindowingTestHarness.java    |   6 +-
 .../runtime/StreamTaskTimerITCase.java          |  10 +-
 39 files changed, 1141 insertions(+), 1155 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
----------------------------------------------------------------------
diff --git 
a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
 
b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
index 36b5c5e..971d5f8 100644
--- 
a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
+++ 
b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
@@ -34,7 +34,7 @@ import 
org.apache.flink.streaming.api.functions.source.FileProcessingMode;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileUtil;
@@ -127,7 +127,7 @@ public class ContinuousFileMonitoringTest {
                ContinuousFileReaderOperator<String, ?> reader = new 
ContinuousFileReaderOperator<>(format);
                reader.setOutputType(typeInfo, executionConfig);
 
-               final TestTimeServiceProvider timeServiceProvider = new 
TestTimeServiceProvider();
+               final TestProcessingTimeService timeServiceProvider = new 
TestProcessingTimeService();
                final OneInputStreamOperatorTestHarness<FileInputSplit, String> 
tester =
                        new OneInputStreamOperatorTestHarness<>(reader, 
executionConfig, timeServiceProvider);
                tester.setTimeCharacteristic(TimeCharacteristic.IngestionTime);

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 
b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
index 5a5cade..6f8a739 100644
--- 
a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
+++ 
b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
@@ -32,7 +32,7 @@ import 
org.apache.flink.streaming.connectors.fs.SequenceFileWriter;
 import org.apache.flink.streaming.connectors.fs.StringWriter;
 import org.apache.flink.streaming.connectors.fs.Writer;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
-import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
@@ -285,7 +285,7 @@ public class BucketingSink<T>
 
        private transient Clock clock;
 
-       private transient TimeServiceProvider processingTimeService;
+       private transient ProcessingTimeService processingTimeService;
 
        /**
         * Creates a new {@code BucketingSink} that writes files to the given 
base directory.
@@ -324,7 +324,7 @@ public class BucketingSink<T>
                refTruncate = reflectTruncate(fs);
 
                processingTimeService =
-                               ((StreamingRuntimeContext) 
getRuntimeContext()).getTimeServiceProvider();
+                               ((StreamingRuntimeContext) 
getRuntimeContext()).getProcessingTimeService();
 
                long currentProcessingTime = 
processingTimeService.getCurrentProcessingTime();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index eb01b78..065b54f 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -26,7 +26,7 @@ import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceCont
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
-import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.util.SerializedValue;
 
 import java.io.IOException;
@@ -118,7 +118,7 @@ public abstract class AbstractFetcher<T, KPH> {
                                        
(KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[]) allPartitions;
                        
                        PeriodicWatermarkEmitter periodicEmitter = 
-                                       new PeriodicWatermarkEmitter(parts, 
sourceContext, runtimeContext.getTimeServiceProvider(), 
runtimeContext.getExecutionConfig().getAutoWatermarkInterval());
+                                       new PeriodicWatermarkEmitter(parts, 
sourceContext, runtimeContext.getProcessingTimeService(), 
runtimeContext.getExecutionConfig().getAutoWatermarkInterval());
                        periodicEmitter.start();
                }
        }
@@ -466,7 +466,7 @@ public abstract class AbstractFetcher<T, KPH> {
                
                private final SourceContext<?> emitter;
                
-               private final TimeServiceProvider timerService;
+               private final ProcessingTimeService timerService;
 
                private final long interval;
                
@@ -477,7 +477,7 @@ public abstract class AbstractFetcher<T, KPH> {
                PeriodicWatermarkEmitter(
                                
KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] allPartitions,
                                SourceContext<?> emitter,
-                               TimeServiceProvider timerService,
+                               ProcessingTimeService timerService,
                                long autoWatermarkInterval)
                {
                        this.allPartitions = checkNotNull(allPartitions);

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
index 7db6ba4..0782cb9 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
@@ -25,10 +25,10 @@ import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceCont
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import 
org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext;
-import 
org.apache.flink.streaming.runtime.operators.TestTimeProviderTest.ReferenceSettingExceptionHandler;
+import 
org.apache.flink.streaming.runtime.operators.TestProcessingTimeServiceTest.ReferenceSettingExceptionHandler;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
-import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.util.SerializedValue;
 
 import org.junit.Test;
@@ -128,7 +128,7 @@ public class AbstractFetcherTimestampsTest {
                TestSourceContext<Long> sourceContext = new 
TestSourceContext<>();
 
                final AtomicReference<Throwable> errorRef = new 
AtomicReference<>();
-               final TimeServiceProvider timerService = new 
DefaultTimeServiceProvider(
+               final ProcessingTimeService timerService = new 
SystemProcessingTimeService(
                                new ReferenceSettingExceptionHandler(errorRef), 
sourceContext.getCheckpointLock());
 
                try {

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
index e1ec4cb..f16eacd 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
@@ -38,7 +38,7 @@ import 
org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import 
org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 
 import java.io.Serializable;
 import java.util.Collections;
@@ -53,7 +53,7 @@ public class MockRuntimeContext extends 
StreamingRuntimeContext {
 
        private final ExecutionConfig execConfig;
 
-       private final TimeServiceProvider timeServiceProvider;
+       private final ProcessingTimeService timeServiceProvider;
        
        public MockRuntimeContext(int numberOfParallelSubtasks, int 
indexOfThisSubtask) {
                this(numberOfParallelSubtasks, indexOfThisSubtask, new 
ExecutionConfig());
@@ -70,7 +70,7 @@ public class MockRuntimeContext extends 
StreamingRuntimeContext {
                        int numberOfParallelSubtasks,
                        int indexOfThisSubtask,
                        ExecutionConfig execConfig,
-                       TimeServiceProvider timeServiceProvider) {
+                       ProcessingTimeService timeServiceProvider) {
                
                super(new MockStreamOperator(),
                        new MockEnvironment("no", 4 * 
MemoryManager.DEFAULT_PAGE_SIZE, null, 16),
@@ -188,7 +188,7 @@ public class MockRuntimeContext extends 
StreamingRuntimeContext {
        }
 
        @Override
-       public TimeServiceProvider getTimeServiceProvider() {
+       public ProcessingTimeService getProcessingTimeService() {
                if (timeServiceProvider == null) {
                        throw new UnsupportedOperationException();
                } else {

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index 769cb6f..be22677 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -107,7 +107,7 @@ public class ContinuousFileReaderOperator<OUT, S extends 
Serializable> extends A
                final TimeCharacteristic timeCharacteristic = 
getOperatorConfig().getTimeCharacteristic();
                final long watermarkInterval = 
getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();
                this.readerContext = StreamSourceContexts.getSourceContext(
-                       timeCharacteristic, getTimerService(), checkpointLock, 
output, watermarkInterval);
+                       timeCharacteristic, getProcessingTimeService(), 
checkpointLock, output, watermarkInterval);
 
                // and initialize the split reading thread
                this.reader = new SplitReader<>(format, serializer, 
readerContext, checkpointLock, readerState);

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 9184e93..b789c95 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -51,11 +51,12 @@ import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -118,7 +119,7 @@ public abstract class AbstractStreamOperator<OUT>
 
        /** Keyed state store view on the keyed backend */
        private transient DefaultKeyedStateStore keyedStateStore;
-       
+
        /** Operator state backend / store */
        private transient OperatorStateBackend operatorStateBackend;
 
@@ -246,7 +247,7 @@ public abstract class AbstractStreamOperator<OUT>
                                                keySerializer,
                                                
container.getConfiguration().getNumberOfKeyGroups(getUserCodeClassloader()),
                                                subTaskKeyGroupRange);
-                               
+
                                this.keyedStateStore = new 
DefaultKeyedStateStore(keyedStateBackend, getExecutionConfig());
                        }
 
@@ -396,11 +397,11 @@ public abstract class AbstractStreamOperator<OUT>
        }
 
        /**
-        * Returns the {@link TimeServiceProvider} responsible for getting  the 
current
+        * Returns the {@link ProcessingTimeService} responsible for getting  
the current
         * processing time and registering timers.
         */
-       protected TimeServiceProvider getTimerService() {
-               return container.getTimerService();
+       protected ProcessingTimeService getProcessingTimeService() {
+               return container.getProcessingTimeService();
        }
 
        /**
@@ -421,9 +422,9 @@ public abstract class AbstractStreamOperator<OUT>
         */
        @SuppressWarnings("unchecked")
        protected <S extends State, N> S getPartitionedState(
-                       N namespace, TypeSerializer<N> namespaceSerializer, 
+                       N namespace, TypeSerializer<N> namespaceSerializer,
                        StateDescriptor<S, ?> stateDescriptor) throws Exception 
{
-               
+
                if (keyedStateStore != null) {
                        return keyedStateBackend.getPartitionedState(namespace, 
namespaceSerializer, stateDescriptor);
                } else {

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
index a07e6b7..5a16db0 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
@@ -69,7 +69,7 @@ public class StreamSource<OUT, SRC extends 
SourceFunction<OUT>>
                final long watermarkInterval = 
getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();
 
                this.ctx = StreamSourceContexts.getSourceContext(
-                       timeCharacteristic, getTimerService(), lockingObject, 
collector, watermarkInterval);
+                       timeCharacteristic, getProcessingTimeService(), 
lockingObject, collector, watermarkInterval);
 
                try {
                        userFunction.run(ctx);

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
index d0c4e15..01ae55c 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
@@ -22,7 +22,7 @@ import 
org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.util.Preconditions;
 
 import java.util.concurrent.ScheduledFuture;
@@ -42,7 +42,7 @@ public class StreamSourceContexts {
         * </ul>
         * */
        public static <OUT> SourceFunction.SourceContext<OUT> getSourceContext(
-                       TimeCharacteristic timeCharacteristic, 
TimeServiceProvider timeService,
+                       TimeCharacteristic timeCharacteristic, 
ProcessingTimeService processingTimeService,
                        Object checkpointLock, Output<StreamRecord<OUT>> 
output, long watermarkInterval) {
 
                final SourceFunction.SourceContext<OUT> ctx;
@@ -51,7 +51,7 @@ public class StreamSourceContexts {
                                ctx = new 
ManualWatermarkContext<>(checkpointLock, output);
                                break;
                        case IngestionTime:
-                               ctx = new 
AutomaticWatermarkContext<>(timeService, checkpointLock, output, 
watermarkInterval);
+                               ctx = new 
AutomaticWatermarkContext<>(processingTimeService, checkpointLock, output, 
watermarkInterval);
                                break;
                        case ProcessingTime:
                                ctx = new NonTimestampContext<>(checkpointLock, 
output);
@@ -111,7 +111,7 @@ public class StreamSourceContexts {
         */
        private static class AutomaticWatermarkContext<T> implements 
SourceFunction.SourceContext<T> {
 
-               private final TimeServiceProvider timeService;
+               private final ProcessingTimeService timeService;
                private final Object lock;
                private final Output<StreamRecord<T>> output;
                private final StreamRecord<T> reuse;
@@ -122,7 +122,7 @@ public class StreamSourceContexts {
                private volatile long nextWatermarkTime;
 
                private AutomaticWatermarkContext(
-                       final TimeServiceProvider timeService,
+                       final ProcessingTimeService timeService,
                        final Object checkpointLock,
                        final Output<StreamRecord<T>> output,
                        final long watermarkInterval) {
@@ -201,12 +201,12 @@ public class StreamSourceContexts {
 
                private class WatermarkEmittingTask implements Triggerable {
 
-                       private final TimeServiceProvider timeService;
+                       private final ProcessingTimeService timeService;
                        private final Object lock;
                        private final Output<StreamRecord<T>> output;
 
                        private WatermarkEmittingTask(
-                                       TimeServiceProvider timeService,
+                                       ProcessingTimeService timeService,
                                        Object checkpointLock,
                                        Output<StreamRecord<T>> output) {
                                this.timeService = timeService;

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
index cd0489f..fc9e39e 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
@@ -32,7 +32,7 @@ import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 
 import java.util.List;
 import java.util.Map;
@@ -77,8 +77,8 @@ public class StreamingRuntimeContext extends 
AbstractRuntimeUDFContext {
                return taskEnvironment.getInputSplitProvider();
        }
 
-       public TimeServiceProvider getTimeServiceProvider() {
-               return operator.getTimerService();
+       public ProcessingTimeService getProcessingTimeService() {
+               return operator.getProcessingTimeService();
        }
 
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
index c92ff34..0798ed4 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
@@ -54,8 +54,8 @@ public class ExtractTimestampsOperator<T>
                super.open();
                watermarkInterval = 
getExecutionConfig().getAutoWatermarkInterval();
                if (watermarkInterval > 0) {
-                       long now = getTimerService().getCurrentProcessingTime();
-                       getTimerService().registerTimer(now + 
watermarkInterval, this);
+                       long now = 
getProcessingTimeService().getCurrentProcessingTime();
+                       getProcessingTimeService().registerTimer(now + 
watermarkInterval, this);
                }
                currentWatermark = Long.MIN_VALUE;
        }
@@ -81,8 +81,8 @@ public class ExtractTimestampsOperator<T>
                        output.emitWatermark(new Watermark(currentWatermark));
                }
 
-               long now = getTimerService().getCurrentProcessingTime();
-               getTimerService().registerTimer(now + watermarkInterval, this);
+               long now = 
getProcessingTimeService().getCurrentProcessingTime();
+               getProcessingTimeService().registerTimer(now + 
watermarkInterval, this);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java
index f791723..b1402ed 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java
@@ -54,8 +54,8 @@ public class TimestampsAndPeriodicWatermarksOperator<T>
                watermarkInterval = 
getExecutionConfig().getAutoWatermarkInterval();
                
                if (watermarkInterval > 0) {
-                       long now = getTimerService().getCurrentProcessingTime();
-                       getTimerService().registerTimer(now + 
watermarkInterval, this);
+                       long now = 
getProcessingTimeService().getCurrentProcessingTime();
+                       getProcessingTimeService().registerTimer(now + 
watermarkInterval, this);
                }
        }
 
@@ -77,8 +77,8 @@ public class TimestampsAndPeriodicWatermarksOperator<T>
                        output.emitWatermark(newWatermark);
                }
 
-               long now = getTimerService().getCurrentProcessingTime();
-               getTimerService().registerTimer(now + watermarkInterval, this);
+               long now = 
getProcessingTimeService().getCurrentProcessingTime();
+               getProcessingTimeService().registerTimer(now + 
watermarkInterval, this);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
index b39b760..d331d4d 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
@@ -125,7 +125,7 @@ public abstract class 
AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT,
                
                // decide when to first compute the window and when to slide it
                // the values should align with the start of time (that is, the 
UNIX epoch, not the big bang)
-               final long now = getTimerService().getCurrentProcessingTime();
+               final long now = 
getProcessingTimeService().getCurrentProcessingTime();
                nextEvaluationTime = now + windowSlide - (now % windowSlide);
                nextSlideTime = now + paneSize - (now % paneSize);
 
@@ -166,7 +166,7 @@ public abstract class 
AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT,
                }
 
                // make sure the first window happens
-               getTimerService().registerTimer(firstTriggerTime, this);
+               getProcessingTimeService().registerTimer(firstTriggerTime, 
this);
        }
 
        @Override
@@ -230,7 +230,7 @@ public abstract class 
AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT,
                }
 
                long nextTriggerTime = Math.min(nextEvaluationTime, 
nextSlideTime);
-               getTimerService().registerTimer(nextTriggerTime, this);
+               getProcessingTimeService().registerTimer(nextTriggerTime, this);
        }
        
        private void computeWindow(long timestamp) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index 6609e4d..141b5b8 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -307,7 +307,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends 
Window> extends Window
                }
 
                if (timer != null) {
-                       nextTimer = 
getTimerService().registerTimer(timer.timestamp, this);
+                       nextTimer = 
getProcessingTimeService().registerTimer(timer.timestamp, this);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 4d8f655..459c679 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -223,7 +223,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
                windowAssignerContext = new 
WindowAssigner.WindowAssignerContext() {
                        @Override
                        public long getCurrentProcessingTime() {
-                               return 
WindowOperator.this.getTimerService().getCurrentProcessingTime();
+                               return 
WindowOperator.this.getProcessingTimeService().getCurrentProcessingTime();
                        }
                };
 
@@ -233,7 +233,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
 
                // re-register the restored timers (if any)
                if (processingTimeTimersQueue.size() > 0) {
-                       nextTimer = 
getTimerService().registerTimer(processingTimeTimersQueue.peek().timestamp, 
this);
+                       nextTimer = 
getProcessingTimeService().registerTimer(processingTimeTimersQueue.peek().timestamp,
 this);
                }
        }
 
@@ -495,7 +495,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
                }
 
                if (timer != null) {
-                       nextTimer = 
getTimerService().registerTimer(timer.timestamp, this);
+                       nextTimer = 
getProcessingTimeService().registerTimer(timer.timestamp, this);
                }
        }
 
@@ -697,7 +697,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
 
                @Override
                public long getCurrentProcessingTime() {
-                       return 
WindowOperator.this.getTimerService().getCurrentProcessingTime();
+                       return 
WindowOperator.this.getProcessingTimeService().getCurrentProcessingTime();
                }
 
                @Override
@@ -717,7 +717,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
                                        if (nextTimer != null) {
                                                nextTimer.cancel(false);
                                        }
-                                       nextTimer = 
getTimerService().registerTimer(time, WindowOperator.this);
+                                       nextTimer = 
getProcessingTimeService().registerTimer(time, WindowOperator.this);
                                }
                        }
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
deleted file mode 100644
index d2c743f..0000000
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
+++ /dev/null
@@ -1,262 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.tasks;
-
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
-
-import javax.annotation.Nonnull;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.Delayed;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A {@link TimeServiceProvider} which assigns as current processing time the 
result of calling
- * {@link System#currentTimeMillis()} and registers timers using a {@link 
ScheduledThreadPoolExecutor}.
- */
-public class DefaultTimeServiceProvider extends TimeServiceProvider {
-
-       private static final int STATUS_ALIVE = 0;
-       private static final int STATUS_QUIESCED = 1;
-       private static final int STATUS_SHUTDOWN = 2;
-
-       // 
------------------------------------------------------------------------
-
-       /** The containing task that owns this time service provider. */
-       private final AsyncExceptionHandler task;
-
-       /** The lock that timers acquire upon triggering */
-       private final Object checkpointLock;
-
-       /** The executor service that schedules and calls the triggers of this 
task*/
-       private final ScheduledThreadPoolExecutor timerService;
-
-       private final AtomicInteger status;
-
-
-       public DefaultTimeServiceProvider(AsyncExceptionHandler failureHandler, 
Object checkpointLock) {
-               this(failureHandler, checkpointLock, null);
-       }
-
-       public DefaultTimeServiceProvider(
-                       AsyncExceptionHandler task,
-                       Object checkpointLock,
-                       ThreadFactory threadFactory) {
-
-               this.task = checkNotNull(task);
-               this.checkpointLock = checkNotNull(checkpointLock);
-
-               this.status = new AtomicInteger(STATUS_ALIVE);
-
-               if (threadFactory == null) {
-                       this.timerService = new ScheduledThreadPoolExecutor(1);
-               } else {
-                       this.timerService = new ScheduledThreadPoolExecutor(1, 
threadFactory);
-               }
-
-               // tasks should be removed if the future is canceled
-               this.timerService.setRemoveOnCancelPolicy(true);
-
-               // make sure shutdown removes all pending tasks
-               
this.timerService.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
-               
this.timerService.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
-       }
-
-       @Override
-       public long getCurrentProcessingTime() {
-               return System.currentTimeMillis();
-       }
-
-       @Override
-       public ScheduledFuture<?> registerTimer(long timestamp, Triggerable 
target) {
-               long delay = Math.max(timestamp - getCurrentProcessingTime(), 
0);
-
-               // we directly try to register the timer and only react to the 
status on exception
-               // that way we save unnecessary volatile accesses for each timer
-               try {
-                       return timerService.schedule(
-                                       new TriggerTask(task, checkpointLock, 
target, timestamp), delay, TimeUnit.MILLISECONDS);
-               }
-               catch (RejectedExecutionException e) {
-                       final int status = this.status.get();
-                       if (status == STATUS_QUIESCED) {
-                               return new NeverCompleteFuture(delay);
-                       }
-                       else if (status == STATUS_SHUTDOWN) {
-                               throw new IllegalStateException("Timer service 
is shut down");
-                       }
-                       else {
-                               // something else happened, so propagate the 
exception
-                               throw e;
-                       }
-               }
-       }
-
-       @Override
-       public boolean isTerminated() {
-               return status.get() == STATUS_SHUTDOWN;
-       }
-
-       @Override
-       public void quiesceAndAwaitPending() throws InterruptedException {
-               if (status.compareAndSet(STATUS_ALIVE, STATUS_QUIESCED)) {
-                       timerService.shutdown();
-
-                       // await forever (almost)
-                       timerService.awaitTermination(365, TimeUnit.DAYS);
-               }
-       }
-
-       @Override
-       public void shutdownService() {
-               if (status.compareAndSet(STATUS_ALIVE, STATUS_SHUTDOWN) || 
-                               status.compareAndSet(STATUS_QUIESCED, 
STATUS_SHUTDOWN))
-               {
-                       timerService.shutdownNow();
-               }
-       }
-
-       // safety net to destroy the thread pool
-       @Override
-       protected void finalize() throws Throwable {
-               super.finalize();
-               timerService.shutdownNow();
-       }
-
-       @VisibleForTesting
-       int getNumTasksScheduled() {
-               BlockingQueue<?> queue = timerService.getQueue();
-               if (queue == null) {
-                       return 0;
-               } else {
-                       return queue.size();
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Internal task that is invoked by the timer service and triggers the 
target.
-        */
-       private static final class TriggerTask implements Runnable {
-
-               private final Object lock;
-               private final Triggerable target;
-               private final long timestamp;
-               private final AsyncExceptionHandler exceptionHandler;
-
-               TriggerTask(AsyncExceptionHandler exceptionHandler, final 
Object lock, Triggerable target, long timestamp) {
-                       this.exceptionHandler = exceptionHandler;
-                       this.lock = lock;
-                       this.target = target;
-                       this.timestamp = timestamp;
-               }
-
-               @Override
-               public void run() {
-                       synchronized (lock) {
-                               try {
-                                       target.trigger(timestamp);
-                               } catch (Throwable t) {
-                                       TimerException asyncException = new 
TimerException(t);
-                                       
exceptionHandler.handleAsyncException("Caught exception while processing 
timer.", asyncException);
-                               }
-                       }
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-
-       private static final class NeverCompleteFuture implements 
ScheduledFuture<Object> {
-
-               private final Object lock = new Object();
-
-               private final long delayMillis;
-
-               private volatile boolean canceled;
-
-
-               private NeverCompleteFuture(long delayMillis) {
-                       this.delayMillis = delayMillis;
-               }
-
-               @Override
-               public long getDelay(@Nonnull TimeUnit unit) {
-                       return unit.convert(delayMillis, TimeUnit.MILLISECONDS);
-               }
-
-               @Override
-               public int compareTo(@Nonnull Delayed o) {
-                       long otherMillis = o.getDelay(TimeUnit.MILLISECONDS);
-                       return Long.compare(this.delayMillis, otherMillis);
-               }
-
-               @Override
-               public boolean cancel(boolean mayInterruptIfRunning) {
-                       synchronized (lock) {
-                               canceled = true;
-                               lock.notifyAll();
-                       }
-                       return true;
-               }
-
-               @Override
-               public boolean isCancelled() {
-                       return canceled;
-               }
-
-               @Override
-               public boolean isDone() {
-                       return false;
-               }
-
-               @Override
-               public Object get() throws InterruptedException {
-                       synchronized (lock) {
-                               while (!canceled) {
-                                       lock.wait();
-                               }
-                       }
-                       throw new CancellationException();
-               }
-
-               @Override
-               public Object get(long timeout, @Nonnull TimeUnit unit) throws 
InterruptedException, TimeoutException {
-                       synchronized (lock) {
-                               while (!canceled) {
-                                       unit.timedWait(lock, timeout);
-                               }
-
-                               if (canceled) {
-                                       throw new CancellationException();
-                               } else {
-                                       throw new TimeoutException();
-                               }
-                       }
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
new file mode 100644
index 0000000..15c3ebb
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.streaming.runtime.operators.Triggerable;
+
+import java.util.concurrent.ScheduledFuture;
+
+/**
+ * Defines the current processing time and handles all related actions,
+ * such as register timers for tasks to be executed in the future.
+ * 
+ * <p>The access to the time via {@link #getCurrentProcessingTime()} is always 
available, regardless of
+ * whether the timer service has been shut down.
+ * 
+ * <p>The registration of timers follows a life cycle of three phases:
+ * <ol>
+ *     <li>In the initial state, it accepts timer registrations and triggers 
when the time is reached.</li>
+ *     <li>After calling {@link #quiesceAndAwaitPending()}, further calls to
+ *         {@link #registerTimer(long, Triggerable)} will not register any 
further timers, and will
+ *         return a "dummy" future as a result. This is used for clean 
shutdown, where currently firing
+ *         timers are waited for and no future timers can be scheduled, 
without causing hard exceptions.</li>
+ *     <li>After a call to {@link #shutdownService()}, all calls to {@link 
#registerTimer(long, Triggerable)}
+ *         will result in a hard exception.</li>
+ * </ol>
+ */
+public abstract class ProcessingTimeService {
+
+       /**
+        * Returns the current processing time.
+        */
+       public abstract long getCurrentProcessingTime();
+
+       /**
+        * Registers a task to be executed when (processing) time is {@code 
timestamp}.
+        * 
+        * @param timestamp   Time when the task is to be executed (in 
processing time)
+        * @param target      The task to be executed
+        * 
+        * @return The future that represents the scheduled task. This always 
returns some future,
+        *         even if the timer was shut down
+        */
+       public abstract ScheduledFuture<?> registerTimer(long timestamp, 
Triggerable target);
+
+       /**
+        * Returns <tt>true</tt> if the service has been shut down, 
<tt>false</tt> otherwise.
+        */
+       public abstract boolean isTerminated();
+
+       /**
+        * This method puts the service into a state where it does not register 
new timers, but
+        * returns for each call to {@link #registerTimer(long, Triggerable)} 
only a "mock" future.
+        * Furthermore, the method clears all not yet started timers, and 
awaits the completion
+        * of currently executing timers.
+        * 
+        * <p>This method can be used to cleanly shut down the timer service. 
The using components
+        * will not notice that the service is shut down (as for example via 
exceptions when registering
+        * a new timer), but the service will simply not fire any timer any 
more.
+        */
+       public abstract void quiesceAndAwaitPending() throws 
InterruptedException;
+
+       /**
+        * Shuts down and clean up the timer service provider hard and 
immediately. This does not wait
+        * for any timer to complete. Any further call to {@link 
#registerTimer(long, Triggerable)}
+        * will result in a hard exception.
+        */
+       public abstract void shutdownService();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 77efc7b..905782b 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -146,11 +146,11 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
        private AbstractKeyedStateBackend<?> keyedStateBackend;
 
        /**
-        * The internal {@link TimeServiceProvider} used to define the current
+        * The internal {@link ProcessingTimeService} used to define the current
         * processing time (default = {@code System.currentTimeMillis()}) and
         * register timers for tasks to be executed in the future.
         */
-       private TimeServiceProvider timerService;
+       private ProcessingTimeService timerService;
 
        /** The map of user-defined accumulators of this task */
        private Map<String, Accumulator<?, ?>> accumulatorMap;
@@ -190,13 +190,13 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
        // 
------------------------------------------------------------------------
 
        /**
-        * Allows the user to specify his own {@link TimeServiceProvider 
TimerServiceProvider}.
-        * By default a {@link DefaultTimeServiceProvider DefaultTimerService} 
is going to be provided.
+        * Allows the user to specify his own {@link ProcessingTimeService 
TimerServiceProvider}.
+        * By default a {@link SystemProcessingTimeService DefaultTimerService} 
is going to be provided.
         * Changing it can be useful for testing processing time functionality, 
such as
         * {@link 
org.apache.flink.streaming.api.windowing.assigners.WindowAssigner 
WindowAssigners}
         * and {@link org.apache.flink.streaming.api.windowing.triggers.Trigger 
Triggers}.
         * */
-       public void setTimeService(TimeServiceProvider timeProvider) {
+       public void setProcessingTimeService(ProcessingTimeService 
timeProvider) {
                if (timeProvider == null) {
                        throw new RuntimeException("The timeProvider cannot be 
set to null.");
                }
@@ -224,7 +224,7 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                                ThreadFactory timerThreadFactory =
                                        new 
DispatcherThreadFactory(TRIGGER_THREAD_GROUP, "Time Trigger for " + getName());
 
-                               timerService = new 
DefaultTimeServiceProvider(this, getCheckpointLock(), timerThreadFactory);
+                               timerService = new 
SystemProcessingTimeService(this, getCheckpointLock(), timerThreadFactory);
                        }
 
                        operatorChain = new OperatorChain<>(this, 
getEnvironment().getAccumulatorRegistry().getReadWriteReporter());
@@ -765,10 +765,10 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
        }
 
        /**
-        * Returns the {@link TimeServiceProvider} responsible for telling the 
current
+        * Returns the {@link ProcessingTimeService} responsible for telling 
the current
         * processing time and registering timers.
         */
-       public TimeServiceProvider getTimerService() {
+       public ProcessingTimeService getProcessingTimeService() {
                if (timerService == null) {
                        throw new IllegalStateException("The timer service has 
not been initialized.");
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
new file mode 100644
index 0000000..3fd4202
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.streaming.runtime.operators.Triggerable;
+
+import javax.annotation.Nonnull;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link ProcessingTimeService} which assigns as current processing time 
the result of calling
+ * {@link System#currentTimeMillis()} and registers timers using a {@link 
ScheduledThreadPoolExecutor}.
+ */
+public class SystemProcessingTimeService extends ProcessingTimeService {
+
+       private static final int STATUS_ALIVE = 0;
+       private static final int STATUS_QUIESCED = 1;
+       private static final int STATUS_SHUTDOWN = 2;
+
+       // 
------------------------------------------------------------------------
+
+       /** The containing task that owns this time service provider. */
+       private final AsyncExceptionHandler task;
+
+       /** The lock that timers acquire upon triggering */
+       private final Object checkpointLock;
+
+       /** The executor service that schedules and calls the triggers of this 
task*/
+       private final ScheduledThreadPoolExecutor timerService;
+
+       private final AtomicInteger status;
+
+
+       public SystemProcessingTimeService(AsyncExceptionHandler 
failureHandler, Object checkpointLock) {
+               this(failureHandler, checkpointLock, null);
+       }
+
+       public SystemProcessingTimeService(
+                       AsyncExceptionHandler task,
+                       Object checkpointLock,
+                       ThreadFactory threadFactory) {
+
+               this.task = checkNotNull(task);
+               this.checkpointLock = checkNotNull(checkpointLock);
+
+               this.status = new AtomicInteger(STATUS_ALIVE);
+
+               if (threadFactory == null) {
+                       this.timerService = new ScheduledThreadPoolExecutor(1);
+               } else {
+                       this.timerService = new ScheduledThreadPoolExecutor(1, 
threadFactory);
+               }
+
+               // tasks should be removed if the future is canceled
+               this.timerService.setRemoveOnCancelPolicy(true);
+
+               // make sure shutdown removes all pending tasks
+               
this.timerService.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+               
this.timerService.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+       }
+
+       @Override
+       public long getCurrentProcessingTime() {
+               return System.currentTimeMillis();
+       }
+
+       @Override
+       public ScheduledFuture<?> registerTimer(long timestamp, Triggerable 
target) {
+               long delay = Math.max(timestamp - getCurrentProcessingTime(), 
0);
+
+               // we directly try to register the timer and only react to the 
status on exception
+               // that way we save unnecessary volatile accesses for each timer
+               try {
+                       return timerService.schedule(
+                                       new TriggerTask(task, checkpointLock, 
target, timestamp), delay, TimeUnit.MILLISECONDS);
+               }
+               catch (RejectedExecutionException e) {
+                       final int status = this.status.get();
+                       if (status == STATUS_QUIESCED) {
+                               return new NeverCompleteFuture(delay);
+                       }
+                       else if (status == STATUS_SHUTDOWN) {
+                               throw new IllegalStateException("Timer service 
is shut down");
+                       }
+                       else {
+                               // something else happened, so propagate the 
exception
+                               throw e;
+                       }
+               }
+       }
+
+       @Override
+       public boolean isTerminated() {
+               return status.get() == STATUS_SHUTDOWN;
+       }
+
+       @Override
+       public void quiesceAndAwaitPending() throws InterruptedException {
+               if (status.compareAndSet(STATUS_ALIVE, STATUS_QUIESCED)) {
+                       timerService.shutdown();
+
+                       // await forever (almost)
+                       timerService.awaitTermination(365, TimeUnit.DAYS);
+               }
+       }
+
+       @Override
+       public void shutdownService() {
+               if (status.compareAndSet(STATUS_ALIVE, STATUS_SHUTDOWN) || 
+                               status.compareAndSet(STATUS_QUIESCED, 
STATUS_SHUTDOWN))
+               {
+                       timerService.shutdownNow();
+               }
+       }
+
+       // safety net to destroy the thread pool
+       @Override
+       protected void finalize() throws Throwable {
+               super.finalize();
+               timerService.shutdownNow();
+       }
+
+       @VisibleForTesting
+       int getNumTasksScheduled() {
+               BlockingQueue<?> queue = timerService.getQueue();
+               if (queue == null) {
+                       return 0;
+               } else {
+                       return queue.size();
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Internal task that is invoked by the timer service and triggers the 
target.
+        */
+       private static final class TriggerTask implements Runnable {
+
+               private final Object lock;
+               private final Triggerable target;
+               private final long timestamp;
+               private final AsyncExceptionHandler exceptionHandler;
+
+               TriggerTask(AsyncExceptionHandler exceptionHandler, final 
Object lock, Triggerable target, long timestamp) {
+                       this.exceptionHandler = exceptionHandler;
+                       this.lock = lock;
+                       this.target = target;
+                       this.timestamp = timestamp;
+               }
+
+               @Override
+               public void run() {
+                       synchronized (lock) {
+                               try {
+                                       target.trigger(timestamp);
+                               } catch (Throwable t) {
+                                       TimerException asyncException = new 
TimerException(t);
+                                       
exceptionHandler.handleAsyncException("Caught exception while processing 
timer.", asyncException);
+                               }
+                       }
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+
+       private static final class NeverCompleteFuture implements 
ScheduledFuture<Object> {
+
+               private final Object lock = new Object();
+
+               private final long delayMillis;
+
+               private volatile boolean canceled;
+
+
+               private NeverCompleteFuture(long delayMillis) {
+                       this.delayMillis = delayMillis;
+               }
+
+               @Override
+               public long getDelay(@Nonnull TimeUnit unit) {
+                       return unit.convert(delayMillis, TimeUnit.MILLISECONDS);
+               }
+
+               @Override
+               public int compareTo(@Nonnull Delayed o) {
+                       long otherMillis = o.getDelay(TimeUnit.MILLISECONDS);
+                       return Long.compare(this.delayMillis, otherMillis);
+               }
+
+               @Override
+               public boolean cancel(boolean mayInterruptIfRunning) {
+                       synchronized (lock) {
+                               canceled = true;
+                               lock.notifyAll();
+                       }
+                       return true;
+               }
+
+               @Override
+               public boolean isCancelled() {
+                       return canceled;
+               }
+
+               @Override
+               public boolean isDone() {
+                       return false;
+               }
+
+               @Override
+               public Object get() throws InterruptedException {
+                       synchronized (lock) {
+                               while (!canceled) {
+                                       lock.wait();
+                               }
+                       }
+                       throw new CancellationException();
+               }
+
+               @Override
+               public Object get(long timeout, @Nonnull TimeUnit unit) throws 
InterruptedException, TimeoutException {
+                       synchronized (lock) {
+                               while (!canceled) {
+                                       unit.timedWait(lock, timeout);
+                               }
+
+                               if (canceled) {
+                                       throw new CancellationException();
+                               } else {
+                                       throw new TimeoutException();
+                               }
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
new file mode 100644
index 0000000..d2bf133
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.streaming.runtime.operators.Triggerable;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * This is a {@link ProcessingTimeService} used <b>strictly for testing</b> the
+ * processing time functionality.
+ * */
+public class TestProcessingTimeService extends ProcessingTimeService {
+
+       private volatile long currentTime = 0;
+
+       private volatile boolean isTerminated;
+       private volatile boolean isQuiesced;
+
+       // sorts the timers by timestamp so that they are processed in the 
correct order.
+       private final Map<Long, List<Triggerable>> registeredTasks = new 
TreeMap<>();
+
+       
+       public void setCurrentTime(long timestamp) throws Exception {
+               this.currentTime = timestamp;
+
+               if (!isQuiesced) {
+                       // decide which timers to fire and put them in a list
+                       // we do not fire them here to be able to accommodate 
timers
+                       // that register other timers.
+       
+                       Iterator<Map.Entry<Long, List<Triggerable>>> it = 
registeredTasks.entrySet().iterator();
+                       List<Map.Entry<Long, List<Triggerable>>> toRun = new 
ArrayList<>();
+                       while (it.hasNext()) {
+                               Map.Entry<Long, List<Triggerable>> t = 
it.next();
+                               if (t.getKey() <= this.currentTime) {
+                                       toRun.add(t);
+                                       it.remove();
+                               }
+                       }
+       
+                       // now do the actual firing.
+                       for (Map.Entry<Long, List<Triggerable>> tasks: toRun) {
+                               long now = tasks.getKey();
+                               for (Triggerable task: tasks.getValue()) {
+                                       task.trigger(now);
+                               }
+                       }
+               }
+       }
+
+       @Override
+       public long getCurrentProcessingTime() {
+               return currentTime;
+       }
+
+       @Override
+       public ScheduledFuture<?> registerTimer(long timestamp, Triggerable 
target) {
+               if (isTerminated) {
+                       throw new IllegalStateException("terminated");
+               }
+               if (isQuiesced) {
+                       return new DummyFuture();
+               }
+
+               if (timestamp <= currentTime) {
+                       try {
+                               target.trigger(timestamp);
+                       } catch (Exception e) {
+                               throw new RuntimeException(e);
+                       }
+               }
+               List<Triggerable> tasks = registeredTasks.get(timestamp);
+               if (tasks == null) {
+                       tasks = new ArrayList<>();
+                       registeredTasks.put(timestamp, tasks);
+               }
+               tasks.add(target);
+
+               return new DummyFuture();
+       }
+
+       @Override
+       public boolean isTerminated() {
+               return isTerminated;
+       }
+
+       @Override
+       public void quiesceAndAwaitPending() {
+               if (!isTerminated) {
+                       isQuiesced = true;
+                       registeredTasks.clear();
+               }
+       }
+
+       @Override
+       public void shutdownService() {
+               this.isTerminated = true;
+       }
+
+       public int getNumRegisteredTimers() {
+               int count = 0;
+               for (List<Triggerable> tasks: registeredTasks.values()) {
+                       count += tasks.size();
+               }
+               return count;
+       }
+
+       // 
------------------------------------------------------------------------
+
+       private static class DummyFuture implements ScheduledFuture<Object> {
+
+               @Override
+               public long getDelay(TimeUnit unit) {
+                       throw new UnsupportedOperationException();
+               }
+
+               @Override
+               public int compareTo(Delayed o) {
+                       throw new UnsupportedOperationException();
+               }
+
+               @Override
+               public boolean cancel(boolean mayInterruptIfRunning) {
+                       return true;
+               }
+
+               @Override
+               public boolean isCancelled() {
+                       throw new UnsupportedOperationException();
+               }
+
+               @Override
+               public boolean isDone() {
+                       throw new UnsupportedOperationException();
+               }
+
+               @Override
+               public Object get() throws InterruptedException, 
ExecutionException {
+                       throw new UnsupportedOperationException();
+               }
+
+               @Override
+               public Object get(long timeout, TimeUnit unit) throws 
InterruptedException, ExecutionException, TimeoutException {
+                       throw new UnsupportedOperationException();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java
deleted file mode 100644
index 9eb6cd1..0000000
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.tasks;
-
-import org.apache.flink.streaming.runtime.operators.Triggerable;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.concurrent.Delayed;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-/**
- * This is a {@link TimeServiceProvider} used <b>strictly for testing</b> the
- * processing time functionality.
- * */
-public class TestTimeServiceProvider extends TimeServiceProvider {
-
-       private volatile long currentTime = 0;
-
-       private volatile boolean isTerminated;
-       private volatile boolean isQuiesced;
-
-       // sorts the timers by timestamp so that they are processed in the 
correct order.
-       private final Map<Long, List<Triggerable>> registeredTasks = new 
TreeMap<>();
-
-       
-       public void setCurrentTime(long timestamp) throws Exception {
-               this.currentTime = timestamp;
-
-               if (!isQuiesced) {
-                       // decide which timers to fire and put them in a list
-                       // we do not fire them here to be able to accommodate 
timers
-                       // that register other timers.
-       
-                       Iterator<Map.Entry<Long, List<Triggerable>>> it = 
registeredTasks.entrySet().iterator();
-                       List<Map.Entry<Long, List<Triggerable>>> toRun = new 
ArrayList<>();
-                       while (it.hasNext()) {
-                               Map.Entry<Long, List<Triggerable>> t = 
it.next();
-                               if (t.getKey() <= this.currentTime) {
-                                       toRun.add(t);
-                                       it.remove();
-                               }
-                       }
-       
-                       // now do the actual firing.
-                       for (Map.Entry<Long, List<Triggerable>> tasks: toRun) {
-                               long now = tasks.getKey();
-                               for (Triggerable task: tasks.getValue()) {
-                                       task.trigger(now);
-                               }
-                       }
-               }
-       }
-
-       @Override
-       public long getCurrentProcessingTime() {
-               return currentTime;
-       }
-
-       @Override
-       public ScheduledFuture<?> registerTimer(long timestamp, Triggerable 
target) {
-               if (isTerminated) {
-                       throw new IllegalStateException("terminated");
-               }
-               if (isQuiesced) {
-                       return new DummyFuture();
-               }
-
-               if (timestamp <= currentTime) {
-                       try {
-                               target.trigger(timestamp);
-                       } catch (Exception e) {
-                               throw new RuntimeException(e);
-                       }
-               }
-               List<Triggerable> tasks = registeredTasks.get(timestamp);
-               if (tasks == null) {
-                       tasks = new ArrayList<>();
-                       registeredTasks.put(timestamp, tasks);
-               }
-               tasks.add(target);
-
-               return new DummyFuture();
-       }
-
-       @Override
-       public boolean isTerminated() {
-               return isTerminated;
-       }
-
-       @Override
-       public void quiesceAndAwaitPending() {
-               if (!isTerminated) {
-                       isQuiesced = true;
-                       registeredTasks.clear();
-               }
-       }
-
-       @Override
-       public void shutdownService() {
-               this.isTerminated = true;
-       }
-
-       public int getNumRegisteredTimers() {
-               int count = 0;
-               for (List<Triggerable> tasks: registeredTasks.values()) {
-                       count += tasks.size();
-               }
-               return count;
-       }
-
-       // 
------------------------------------------------------------------------
-
-       private static class DummyFuture implements ScheduledFuture<Object> {
-
-               @Override
-               public long getDelay(TimeUnit unit) {
-                       throw new UnsupportedOperationException();
-               }
-
-               @Override
-               public int compareTo(Delayed o) {
-                       throw new UnsupportedOperationException();
-               }
-
-               @Override
-               public boolean cancel(boolean mayInterruptIfRunning) {
-                       return true;
-               }
-
-               @Override
-               public boolean isCancelled() {
-                       throw new UnsupportedOperationException();
-               }
-
-               @Override
-               public boolean isDone() {
-                       throw new UnsupportedOperationException();
-               }
-
-               @Override
-               public Object get() throws InterruptedException, 
ExecutionException {
-                       throw new UnsupportedOperationException();
-               }
-
-               @Override
-               public Object get(long timeout, TimeUnit unit) throws 
InterruptedException, ExecutionException, TimeoutException {
-                       throw new UnsupportedOperationException();
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimeServiceProvider.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimeServiceProvider.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimeServiceProvider.java
deleted file mode 100644
index afa6f35..0000000
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimeServiceProvider.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.tasks;
-
-import org.apache.flink.streaming.runtime.operators.Triggerable;
-
-import java.util.concurrent.ScheduledFuture;
-
-/**
- * Defines the current processing time and handles all related actions,
- * such as register timers for tasks to be executed in the future.
- * 
- * <p>The access to the time via {@link #getCurrentProcessingTime()} is always 
available, regardless of
- * whether the timer service has been shut down.
- * 
- * <p>The registration of timers follows a life cycle of three phases:
- * <ol>
- *     <li>In the initial state, it accepts timer registrations and triggers 
when the time is reached.</li>
- *     <li>After calling {@link #quiesceAndAwaitPending()}, further calls to
- *         {@link #registerTimer(long, Triggerable)} will not register any 
further timers, and will
- *         return a "dummy" future as a result. This is used for clean 
shutdown, where currently firing
- *         timers are waited for and no future timers can be scheduled, 
without causing hard exceptions.</li>
- *     <li>After a call to {@link #shutdownService()}, all calls to {@link 
#registerTimer(long, Triggerable)}
- *         will result in a hard exception.</li>
- * </ol>
- */
-public abstract class TimeServiceProvider {
-
-       /**
-        * Returns the current processing time.
-        */
-       public abstract long getCurrentProcessingTime();
-
-       /**
-        * Registers a task to be executed when (processing) time is {@code 
timestamp}.
-        * 
-        * @param timestamp   Time when the task is to be executed (in 
processing time)
-        * @param target      The task to be executed
-        * 
-        * @return The future that represents the scheduled task. This always 
returns some future,
-        *         even if the timer was shut down
-        */
-       public abstract ScheduledFuture<?> registerTimer(long timestamp, 
Triggerable target);
-
-       /**
-        * Returns <tt>true</tt> if the service has been shut down, 
<tt>false</tt> otherwise.
-        */
-       public abstract boolean isTerminated();
-
-       /**
-        * This method puts the service into a state where it does not register 
new timers, but
-        * returns for each call to {@link #registerTimer(long, Triggerable)} 
only a "mock" future.
-        * Furthermore, the method clears all not yet started timers, and 
awaits the completion
-        * of currently executing timers.
-        * 
-        * <p>This method can be used to cleanly shut down the timer service. 
The using components
-        * will not notice that the service is shut down (as for example via 
exceptions when registering
-        * a new timer), but the service will simply not fire any timer any 
more.
-        */
-       public abstract void quiesceAndAwaitPending() throws 
InterruptedException;
-
-       /**
-        * Shuts down and clean up the timer service provider hard and 
immediately. This does not wait
-        * for any timer to complete. Any further call to {@link 
#registerTimer(long, Triggerable)}
-        * will result in a hard exception.
-        */
-       public abstract void shutdownService();
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
index 42087b4..f87b5ef 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
@@ -38,8 +38,8 @@ import 
org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 
-import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
-import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
@@ -67,7 +67,7 @@ public class StreamSourceOperatorTest {
                
                final List<StreamElement> output = new ArrayList<>();
                
-               setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 
0, null);
+               setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 
0);
                operator.run(new Object(), new CollectorOutput<String>(output));
                
                assertEquals(1, output.size());
@@ -84,7 +84,7 @@ public class StreamSourceOperatorTest {
                                new StreamSource<>(new 
InfiniteSource<String>());
 
 
-               setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 
0, null);
+               setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 
0);
                operator.cancel();
 
                // run and exit
@@ -104,7 +104,7 @@ public class StreamSourceOperatorTest {
                                new StreamSource<>(new 
InfiniteSource<String>());
 
                
-               setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 
0, null);
+               setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 
0);
                
                // trigger an async cancel in a bit
                new Thread("canceler") {
@@ -137,7 +137,7 @@ public class StreamSourceOperatorTest {
                                new StoppableStreamSource<>(new 
InfiniteSource<String>());
 
 
-               setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 
0, null);
+               setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 
0);
                operator.stop();
 
                // run and stop
@@ -156,7 +156,7 @@ public class StreamSourceOperatorTest {
                                new StoppableStreamSource<>(new 
InfiniteSource<String>());
 
 
-               setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 
0, null);
+               setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 
0);
 
                // trigger an async cancel in a bit
                new Thread("canceler") {
@@ -189,7 +189,7 @@ public class StreamSourceOperatorTest {
                                new StoppableStreamSource<>(new 
InfiniteSource<String>());
 
                // emit latency marks every 10 milliseconds.
-               setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 
10, null);
+               setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 
10);
 
                // trigger an async cancel in a bit
                new Thread("canceler") {
@@ -225,15 +225,15 @@ public class StreamSourceOperatorTest {
                        new StoppableStreamSource<>(new 
InfiniteSource<String>());
 
                long watermarkInterval = 10;
-               TestTimeServiceProvider timeProvider = new 
TestTimeServiceProvider();
-               timeProvider.setCurrentTime(0);
+               TestProcessingTimeService processingTimeService = new 
TestProcessingTimeService();
+               processingTimeService.setCurrentTime(0);
 
-               setupSourceOperator(operator, TimeCharacteristic.IngestionTime, 
watermarkInterval, 0, timeProvider);
+               setupSourceOperator(operator, TimeCharacteristic.IngestionTime, 
watermarkInterval, 0, processingTimeService);
 
                final List<StreamElement> output = new ArrayList<>();
 
                
StreamSourceContexts.getSourceContext(TimeCharacteristic.IngestionTime,
-                       operator.getContainingTask().getTimerService(),
+                       operator.getContainingTask().getProcessingTimeService(),
                        operator.getContainingTask().getCheckpointLock(),
                        new CollectorOutput<String>(output),
                        
operator.getExecutionConfig().getAutoWatermarkInterval());
@@ -243,7 +243,7 @@ public class StreamSourceOperatorTest {
                // going to be aligned with the watermark interval.
 
                for (long i = 1; i < 100; i += watermarkInterval)  {
-                       timeProvider.setCurrentTime(i);
+                       processingTimeService.setCurrentTime(i);
                }
 
                assertTrue(output.size() == 9);
@@ -257,13 +257,21 @@ public class StreamSourceOperatorTest {
        }
 
        // 
------------------------------------------------------------------------
-       
+
+       @SuppressWarnings("unchecked")
+       private static <T> void setupSourceOperator(StreamSource<T, ?> operator,
+                       TimeCharacteristic timeChar,
+                       long watermarkInterval,
+                       long latencyMarkInterval) {
+               setupSourceOperator(operator, timeChar, watermarkInterval, 
latencyMarkInterval, new TestProcessingTimeService());
+       }
+
        @SuppressWarnings("unchecked")
        private static <T> void setupSourceOperator(StreamSource<T, ?> operator,
                                                                                
                TimeCharacteristic timeChar,
                                                                                
                long watermarkInterval,
                                                                                
                long latencyMarkInterval,
-                                                                               
                final TimeServiceProvider timeProvider) {
+                                                                               
                final ProcessingTimeService timeProvider) {
 
                ExecutionConfig executionConfig = new ExecutionConfig();
                executionConfig.setAutoWatermarkInterval(watermarkInterval);
@@ -284,12 +292,15 @@ public class StreamSourceOperatorTest {
                when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
                
when(mockTask.getAccumulatorMap()).thenReturn(Collections.<String, 
Accumulator<?, ?>>emptyMap());
 
-               doAnswer(new Answer<TimeServiceProvider>() {
+               doAnswer(new Answer<ProcessingTimeService>() {
                        @Override
-                       public TimeServiceProvider answer(InvocationOnMock 
invocation) throws Throwable {
+                       public ProcessingTimeService answer(InvocationOnMock 
invocation) throws Throwable {
+                               if (timeProvider == null) {
+                                       throw new RuntimeException("The time 
provider is null.");
+                               }
                                return timeProvider;
                        }
-               }).when(mockTask).getTimerService();
+               }).when(mockTask).getProcessingTimeService();
 
                operator.setup(mockTask, cfg, (Output<StreamRecord<T>>) 
mock(Output.class));
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
index 98058e8..fb1fab5 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
@@ -26,7 +26,7 @@ import org.apache.flink.streaming.api.operators.StreamMap;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -64,7 +64,7 @@ public class StreamTaskTimerTest {
                testHarness.waitForTaskRunning();
 
                // first one spawns thread
-               
mapTask.getTimerService().registerTimer(System.currentTimeMillis(), new 
Triggerable() {
+               
mapTask.getProcessingTimeService().registerTimer(System.currentTimeMillis(), 
new Triggerable() {
                        @Override
                        public void trigger(long timestamp) {
                        }
@@ -106,7 +106,7 @@ public class StreamTaskTimerTest {
                        final long t3 = System.currentTimeMillis() + 100;
                        final long t4 = System.currentTimeMillis() + 200;
 
-                       TimeServiceProvider timeService = 
mapTask.getTimerService();
+                       ProcessingTimeService timeService = 
mapTask.getProcessingTimeService();
                        timeService.registerTimer(t1, new 
ValidatingTriggerable(errorRef, t1, 0));
                        timeService.registerTimer(t2, new 
ValidatingTriggerable(errorRef, t2, 1));
                        timeService.registerTimer(t3, new 
ValidatingTriggerable(errorRef, t3, 2));

http://git-wip-us.apache.org/repos/asf/flink/blob/e112a632/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
new file mode 100644
index 0000000..9c2cee3
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.StreamMap;
+import org.apache.flink.streaming.runtime.tasks.AsyncExceptionHandler;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
+
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertEquals;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ResultPartitionWriter.class})
+@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
+public class TestProcessingTimeServiceTest {
+
+       @Test
+       public void testCustomTimeServiceProvider() throws Throwable {
+               TestProcessingTimeService tp = new TestProcessingTimeService();
+
+               final OneInputStreamTask<String, String> mapTask = new 
OneInputStreamTask<>();
+               mapTask.setProcessingTimeService(tp);
+
+               final OneInputStreamTaskTestHarness<String, String> testHarness 
= new OneInputStreamTaskTestHarness<>(
+                       mapTask, BasicTypeInfo.STRING_TYPE_INFO, 
BasicTypeInfo.STRING_TYPE_INFO);
+
+               StreamConfig streamConfig = testHarness.getStreamConfig();
+
+               StreamMap<String, String> mapOperator = new StreamMap<>(new 
StreamTaskTimerTest.DummyMapFunction<String>());
+               streamConfig.setStreamOperator(mapOperator);
+
+               testHarness.invoke();
+
+               
assertEquals(testHarness.getProcessingTimeService().getCurrentProcessingTime(), 
0);
+
+               tp.setCurrentTime(11);
+               
assertEquals(testHarness.getProcessingTimeService().getCurrentProcessingTime(), 
11);
+
+               tp.setCurrentTime(15);
+               tp.setCurrentTime(16);
+               
assertEquals(testHarness.getProcessingTimeService().getCurrentProcessingTime(), 
16);
+
+               // register 2 tasks
+               mapTask.getProcessingTimeService().registerTimer(30, new 
Triggerable() {
+                       @Override
+                       public void trigger(long timestamp) {
+
+                       }
+               });
+
+               mapTask.getProcessingTimeService().registerTimer(40, new 
Triggerable() {
+                       @Override
+                       public void trigger(long timestamp) {
+
+                       }
+               });
+
+               assertEquals(2, tp.getNumRegisteredTimers());
+
+               tp.setCurrentTime(35);
+               assertEquals(1, tp.getNumRegisteredTimers());
+
+               tp.setCurrentTime(40);
+               assertEquals(0, tp.getNumRegisteredTimers());
+
+               tp.shutdownService();
+       }
+
+       // 
------------------------------------------------------------------------
+
+       public static class ReferenceSettingExceptionHandler implements 
AsyncExceptionHandler {
+
+               private final AtomicReference<Throwable> errorReference;
+
+               public 
ReferenceSettingExceptionHandler(AtomicReference<Throwable> errorReference) {
+                       this.errorReference = errorReference;
+               }
+
+               @Override
+               public void handleAsyncException(String message, Throwable 
exception) {
+                       errorReference.compareAndSet(null, exception);
+               }
+       }
+}

Reply via email to