[hotfix] Various code cleanups around time service and asynchronous exceptions

  - DefaultTimeServiceProvider now owns scheduled executor
  - Enforce that an asynchronous exception handler is always set


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/954ef08f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/954ef08f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/954ef08f

Branch: refs/heads/master
Commit: 954ef08f374d7e7c1f2b469201b1ea05c6376b33
Parents: 8ff451b
Author: Stephan Ewen <[email protected]>
Authored: Tue Oct 4 16:15:05 2016 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Wed Oct 5 19:36:13 2016 +0200

----------------------------------------------------------------------
 .../AbstractFetcherTimestampsTest.java          | 122 +++++++++++--------
 .../kafka/testutils/MockRuntimeContext.java     |  40 +++---
 .../api/operators/StreamSourceContexts.java     |   6 +-
 .../runtime/io/StreamInputProcessor.java        |   4 +-
 .../runtime/tasks/AsyncExceptionHandler.java    |   1 +
 .../runtime/tasks/AsynchronousException.java    |  11 +-
 .../tasks/DefaultTimeServiceProvider.java       |  57 +++++----
 .../runtime/tasks/OneInputStreamTask.java       |   2 +-
 .../streaming/runtime/tasks/StreamTask.java     |  18 +--
 .../runtime/tasks/TestTimeServiceProvider.java  |   2 +-
 .../runtime/tasks/TwoInputStreamTask.java       |   2 +-
 .../runtime/operators/TimeProviderTest.java     |  45 +++++--
 ...AlignedProcessingTimeWindowOperatorTest.java |  84 ++++++++-----
 ...AlignedProcessingTimeWindowOperatorTest.java | 101 +++++++++------
 .../operators/windowing/NoOpTimerService.java   |  49 ++++++++
 .../util/OneInputStreamOperatorTestHarness.java |   6 +-
 16 files changed, 347 insertions(+), 203 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/954ef08f/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
index 8c68fbe..c3ba7b7 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
@@ -25,7 +25,10 @@ import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceCont
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import 
org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext;
+import 
org.apache.flink.streaming.runtime.operators.TimeProviderTest.ReferenceSettingExceptionHandler;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
 import org.apache.flink.util.SerializedValue;
 
 import org.junit.Test;
@@ -34,6 +37,7 @@ import javax.annotation.Nullable;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.*;
 
@@ -110,6 +114,7 @@ public class AbstractFetcherTimestampsTest {
        
        @Test
        public void testPeriodicWatermarks() throws Exception {
+
                ExecutionConfig config = new ExecutionConfig();
                config.setAutoWatermarkInterval(10);
                
@@ -120,61 +125,70 @@ public class AbstractFetcherTimestampsTest {
 
                TestSourceContext<Long> sourceContext = new 
TestSourceContext<>();
 
-               TestFetcher<Long> fetcher = new TestFetcher<>(
-                               sourceContext, originalPartitions,
-                               new 
SerializedValue<AssignerWithPeriodicWatermarks<Long>>(new 
PeriodicTestExtractor()),
-                               null, new MockRuntimeContext(17, 3, config, 
sourceContext.getCheckpointLock()));
-
-               final KafkaTopicPartitionState<Object> part1 = 
fetcher.subscribedPartitions()[0];
-               final KafkaTopicPartitionState<Object> part2 = 
fetcher.subscribedPartitions()[1];
-               final KafkaTopicPartitionState<Object> part3 = 
fetcher.subscribedPartitions()[2];
-
-               // elements generate a watermark if the timestamp is a multiple 
of three
-
-               // elements for partition 1
-               fetcher.emitRecord(1L, part1, 1L);
-               fetcher.emitRecord(2L, part1, 2L);
-               fetcher.emitRecord(3L, part1, 3L);
-               assertEquals(3L, 
sourceContext.getLatestElement().getValue().longValue());
-               assertEquals(3L, 
sourceContext.getLatestElement().getTimestamp());
-
-               // elements for partition 2
-               fetcher.emitRecord(12L, part2, 1L);
-               assertEquals(12L, 
sourceContext.getLatestElement().getValue().longValue());
-               assertEquals(12L, 
sourceContext.getLatestElement().getTimestamp());
+               final AtomicReference<Throwable> errorRef = new 
AtomicReference<>();
+               final TimeServiceProvider timerService = new 
DefaultTimeServiceProvider(
+                               new ReferenceSettingExceptionHandler(errorRef), 
sourceContext.getCheckpointLock());
 
-               // elements for partition 3
-               fetcher.emitRecord(101L, part3, 1L);
-               fetcher.emitRecord(102L, part3, 2L);
-               assertEquals(102L, 
sourceContext.getLatestElement().getValue().longValue());
-               assertEquals(102L, 
sourceContext.getLatestElement().getTimestamp());
-
-               // now, we should have a watermark (this blocks until the 
periodic thread emitted the watermark)
-               assertEquals(3L, 
sourceContext.getLatestWatermark().getTimestamp());
-
-               // advance partition 3
-               fetcher.emitRecord(1003L, part3, 3L);
-               fetcher.emitRecord(1004L, part3, 4L);
-               fetcher.emitRecord(1005L, part3, 5L);
-               assertEquals(1005L, 
sourceContext.getLatestElement().getValue().longValue());
-               assertEquals(1005L, 
sourceContext.getLatestElement().getTimestamp());
-
-               // advance partition 1 beyond partition 2 - this bumps the 
watermark
-               fetcher.emitRecord(30L, part1, 4L);
-               assertEquals(30L, 
sourceContext.getLatestElement().getValue().longValue());
-               assertEquals(30L, 
sourceContext.getLatestElement().getTimestamp());
-               
-               // this blocks until the periodic thread emitted the watermark
-               assertEquals(12L, 
sourceContext.getLatestWatermark().getTimestamp());
-
-               // advance partition 2 again - this bumps the watermark
-               fetcher.emitRecord(13L, part2, 2L);
-               fetcher.emitRecord(14L, part2, 3L);
-               fetcher.emitRecord(15L, part2, 3L);
-
-               // this blocks until the periodic thread emitted the watermark
-               long watermarkTs = 
sourceContext.getLatestWatermark().getTimestamp();
-               assertTrue(watermarkTs >= 13L && watermarkTs <= 15L);
+               try {
+                       TestFetcher<Long> fetcher = new TestFetcher<>(
+                                       sourceContext, originalPartitions,
+                                       new 
SerializedValue<AssignerWithPeriodicWatermarks<Long>>(new 
PeriodicTestExtractor()),
+                                       null, new MockRuntimeContext(17, 3, 
config, timerService));
+       
+                       final KafkaTopicPartitionState<Object> part1 = 
fetcher.subscribedPartitions()[0];
+                       final KafkaTopicPartitionState<Object> part2 = 
fetcher.subscribedPartitions()[1];
+                       final KafkaTopicPartitionState<Object> part3 = 
fetcher.subscribedPartitions()[2];
+       
+                       // elements generate a watermark if the timestamp is a 
multiple of three
+       
+                       // elements for partition 1
+                       fetcher.emitRecord(1L, part1, 1L);
+                       fetcher.emitRecord(2L, part1, 2L);
+                       fetcher.emitRecord(3L, part1, 3L);
+                       assertEquals(3L, 
sourceContext.getLatestElement().getValue().longValue());
+                       assertEquals(3L, 
sourceContext.getLatestElement().getTimestamp());
+       
+                       // elements for partition 2
+                       fetcher.emitRecord(12L, part2, 1L);
+                       assertEquals(12L, 
sourceContext.getLatestElement().getValue().longValue());
+                       assertEquals(12L, 
sourceContext.getLatestElement().getTimestamp());
+       
+                       // elements for partition 3
+                       fetcher.emitRecord(101L, part3, 1L);
+                       fetcher.emitRecord(102L, part3, 2L);
+                       assertEquals(102L, 
sourceContext.getLatestElement().getValue().longValue());
+                       assertEquals(102L, 
sourceContext.getLatestElement().getTimestamp());
+       
+                       // now, we should have a watermark (this blocks until 
the periodic thread emitted the watermark)
+                       assertEquals(3L, 
sourceContext.getLatestWatermark().getTimestamp());
+       
+                       // advance partition 3
+                       fetcher.emitRecord(1003L, part3, 3L);
+                       fetcher.emitRecord(1004L, part3, 4L);
+                       fetcher.emitRecord(1005L, part3, 5L);
+                       assertEquals(1005L, 
sourceContext.getLatestElement().getValue().longValue());
+                       assertEquals(1005L, 
sourceContext.getLatestElement().getTimestamp());
+       
+                       // advance partition 1 beyond partition 2 - this bumps 
the watermark
+                       fetcher.emitRecord(30L, part1, 4L);
+                       assertEquals(30L, 
sourceContext.getLatestElement().getValue().longValue());
+                       assertEquals(30L, 
sourceContext.getLatestElement().getTimestamp());
+                       
+                       // this blocks until the periodic thread emitted the 
watermark
+                       assertEquals(12L, 
sourceContext.getLatestWatermark().getTimestamp());
+       
+                       // advance partition 2 again - this bumps the watermark
+                       fetcher.emitRecord(13L, part2, 2L);
+                       fetcher.emitRecord(14L, part2, 3L);
+                       fetcher.emitRecord(15L, part2, 3L);
+       
+                       // this blocks until the periodic thread emitted the 
watermark
+                       long watermarkTs = 
sourceContext.getLatestWatermark().getTimestamp();
+                       assertTrue(watermarkTs >= 13L && watermarkTs <= 15L);
+               }
+               finally {
+                       timerService.shutdownService();
+               }
        }
 
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/954ef08f/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
index 5be4195..e1ec4cb 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
@@ -32,45 +32,46 @@ import org.apache.flink.api.common.state.ReducingState;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import 
org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
-import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
 import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
-import org.apache.flink.util.Preconditions;
 
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledFuture;
 
 @SuppressWarnings("deprecation")
 public class MockRuntimeContext extends StreamingRuntimeContext {
 
        private final int numberOfParallelSubtasks;
        private final int indexOfThisSubtask;
-       
-       private final ExecutionConfig execConfig;
 
-       private final TimeServiceProvider timerService;
+       private final ExecutionConfig execConfig;
 
+       private final TimeServiceProvider timeServiceProvider;
+       
        public MockRuntimeContext(int numberOfParallelSubtasks, int 
indexOfThisSubtask) {
-               this(numberOfParallelSubtasks, indexOfThisSubtask, new 
ExecutionConfig(), new Object());
+               this(numberOfParallelSubtasks, indexOfThisSubtask, new 
ExecutionConfig());
        }
 
        public MockRuntimeContext(
-               int numberOfParallelSubtasks,
-               int indexOfThisSubtask,
-               ExecutionConfig execConfig,
-               Object checkpointLock) {
-
+                       int numberOfParallelSubtasks,
+                       int indexOfThisSubtask,
+                       ExecutionConfig execConfig) {
+               this(numberOfParallelSubtasks, indexOfThisSubtask, execConfig, 
null);
+       }
+       
+       public MockRuntimeContext(
+                       int numberOfParallelSubtasks,
+                       int indexOfThisSubtask,
+                       ExecutionConfig execConfig,
+                       TimeServiceProvider timeServiceProvider) {
+               
                super(new MockStreamOperator(),
                        new MockEnvironment("no", 4 * 
MemoryManager.DEFAULT_PAGE_SIZE, null, 16),
                        Collections.<String, Accumulator<?, ?>>emptyMap());
@@ -78,8 +79,7 @@ public class MockRuntimeContext extends 
StreamingRuntimeContext {
                this.numberOfParallelSubtasks = numberOfParallelSubtasks;
                this.indexOfThisSubtask = indexOfThisSubtask;
                this.execConfig = execConfig;
-               this.timerService = DefaultTimeServiceProvider.
-                       
createForTesting(Executors.newSingleThreadScheduledExecutor(), checkpointLock);
+               this.timeServiceProvider = timeServiceProvider;
        }
 
        @Override
@@ -189,7 +189,11 @@ public class MockRuntimeContext extends 
StreamingRuntimeContext {
 
        @Override
        public TimeServiceProvider getTimeServiceProvider() {
-               return timerService;
+               if (timeServiceProvider == null) {
+                       throw new UnsupportedOperationException();
+               } else {
+                       return timeServiceProvider;
+               }
        }
 
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/954ef08f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
index abaf4e7..a290deb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
@@ -35,9 +35,9 @@ public class StreamSourceContexts {
         * Depending on the {@link TimeCharacteristic}, this method will return 
the adequate
         * {@link 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext}. 
That is:
         * <ul>
-        * <li> {@link TimeCharacteristic#IngestionTime} = {@link 
AutomaticWatermarkContext}
-        * <li> {@link TimeCharacteristic#ProcessingTime} = {@link 
NonTimestampContext}
-        * <li> {@link TimeCharacteristic#EventTime} = {@link 
ManualWatermarkContext}
+        *     <li>{@link TimeCharacteristic#IngestionTime} = {@link 
AutomaticWatermarkContext}</li>
+        *     <li>{@link TimeCharacteristic#ProcessingTime} = {@link 
NonTimestampContext}</li>
+        *     <li>{@link TimeCharacteristic#EventTime} = {@link 
ManualWatermarkContext}</li>
         * </ul>
         * */
        public static <OUT> SourceFunction.SourceContext<OUT> getSourceContext(

http://git-wip-us.apache.org/repos/asf/flink/blob/954ef08f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index 85e9297..2dbc6d4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -83,7 +83,9 @@ public class StreamInputProcessor<IN> {
        private Counter numRecordsIn;
 
        @SuppressWarnings("unchecked")
-       public StreamInputProcessor(InputGate[] inputGates, TypeSerializer<IN> 
inputSerializer,
+       public StreamInputProcessor(
+                       InputGate[] inputGates,
+                       TypeSerializer<IN> inputSerializer,
                        StatefulTask checkpointedTask,
                        CheckpointingMode checkpointMode,
                        IOManager ioManager,

http://git-wip-us.apache.org/repos/asf/flink/blob/954ef08f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java
index 4c55055..a8125c3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.runtime.tasks;
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/954ef08f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsynchronousException.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsynchronousException.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsynchronousException.java
index 311e0cd..cda0511 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsynchronousException.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsynchronousException.java
@@ -15,22 +15,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.annotation.Internal;
 
 /**
- * {@code RuntimeException} for wrapping exceptions that are thrown in Threads 
that are not the
- * main compute Thread.
+ * An exception for wrapping exceptions that are thrown by an operator in 
threads other than the
+ * main compute thread of that operator. 
  */
 @Internal
-public class AsynchronousException extends RuntimeException {
+public class AsynchronousException extends Exception {
        private static final long serialVersionUID = 1L;
 
        public AsynchronousException(Throwable cause) {
                super(cause);
        }
 
+       public AsynchronousException(String message, Throwable cause) {
+               super(message, cause);
+       }
+
        @Override
        public String toString() {
                return "AsynchronousException{" + getCause() + "}";

http://git-wip-us.apache.org/repos/asf/flink/blob/954ef08f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
index 9534b3c..5664eac 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
@@ -17,15 +17,15 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
-import org.apache.flink.util.Preconditions;
 
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * A {@link TimeServiceProvider} which assigns as current processing time the 
result of calling
  * {@link System#currentTimeMillis()} and registers timers using a {@link 
ScheduledThreadPoolExecutor}.
@@ -35,24 +35,34 @@ public class DefaultTimeServiceProvider extends 
TimeServiceProvider {
        /** The containing task that owns this time service provider. */
        private final AsyncExceptionHandler task;
 
+       /** The lock that timers acquire upon triggering */
        private final Object checkpointLock;
 
        /** The executor service that schedules and calls the triggers of this 
task*/
-       private final ScheduledExecutorService timerService;
+       private final ScheduledThreadPoolExecutor timerService;
+
 
-       public static DefaultTimeServiceProvider create(
-                       AsyncExceptionHandler exceptionHandler,
-                       ScheduledExecutorService executor,
-                       Object checkpointLock) {
-               return new DefaultTimeServiceProvider(exceptionHandler, 
executor, checkpointLock);
+       public DefaultTimeServiceProvider(AsyncExceptionHandler failureHandler, 
Object checkpointLock) {
+               this(failureHandler, checkpointLock, null);
        }
 
-       private DefaultTimeServiceProvider(AsyncExceptionHandler task,
-                                                                       
ScheduledExecutorService threadPoolExecutor,
-                                                                       Object 
checkpointLock) {
-               this.task = Preconditions.checkNotNull(task);
-               this.timerService = 
Preconditions.checkNotNull(threadPoolExecutor);
-               this.checkpointLock = 
Preconditions.checkNotNull(checkpointLock);
+       public DefaultTimeServiceProvider(
+                       AsyncExceptionHandler task,
+                       Object checkpointLock,
+                       ThreadFactory threadFactory) {
+               
+               this.task = checkNotNull(task);
+               this.checkpointLock = checkNotNull(checkpointLock);
+
+               if (threadFactory == null) {
+                       this.timerService = new ScheduledThreadPoolExecutor(1);
+               } else {
+                       this.timerService = new ScheduledThreadPoolExecutor(1, 
threadFactory);
+               }
+
+               // allow trigger tasks to be removed if all timers for
+               // that timestamp are removed by user
+               this.timerService.setRemoveOnCancelPolicy(true);
        }
 
        @Override
@@ -76,6 +86,13 @@ public class DefaultTimeServiceProvider extends 
TimeServiceProvider {
                timerService.shutdownNow();
        }
 
+       // safety net to destroy the thread pool
+       @Override
+       protected void finalize() throws Throwable {
+               super.finalize();
+               timerService.shutdownNow();
+       }
+
        /**
         * Internal task that is invoked by the timer service and triggers the 
target.
         */
@@ -105,14 +122,4 @@ public class DefaultTimeServiceProvider extends 
TimeServiceProvider {
                        }
                }
        }
-
-       @VisibleForTesting
-       public static DefaultTimeServiceProvider 
createForTesting(ScheduledExecutorService executor, Object checkpointLock) {
-               return new DefaultTimeServiceProvider(new 
AsyncExceptionHandler() {
-                       @Override
-                       public void handleAsyncException(String message, 
Throwable exception) {
-                               exception.printStackTrace();
-                       }
-               }, executor, checkpointLock);
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/954ef08f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index cf8853e..0a6534b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -64,7 +64,7 @@ public class OneInputStreamTask<IN, OUT> extends 
StreamTask<OUT, OneInputStreamO
                final Object lock = getCheckpointLock();
                
                while (running && inputProcessor.processInput(operator, lock)) {
-
+                       // all the work happens in the "processInput" method
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/954ef08f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 33317fa..040ec66 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -67,7 +67,7 @@ import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.RunnableFuture;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
 
 /**
  * Base class for all streaming tasks. A task is the unit of local processing 
that is deployed
@@ -223,15 +223,10 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
 
                        // if the clock is not already set, then assign a 
default TimeServiceProvider
                        if (timerService == null) {
+                               ThreadFactory timerThreadFactory =
+                                       new 
DispatcherThreadFactory(TRIGGER_THREAD_GROUP, "Time Trigger for " + getName());
 
-                               ScheduledThreadPoolExecutor executor = new 
ScheduledThreadPoolExecutor(1,
-                                       new 
DispatcherThreadFactory(TRIGGER_THREAD_GROUP, "Time Trigger for " + getName()));
-
-                               // allow trigger tasks to be removed if all 
timers for
-                               // that timestamp are removed by user
-                               executor.setRemoveOnCancelPolicy(true);
-
-                               timerService = 
DefaultTimeServiceProvider.create(this, executor, getCheckpointLock());
+                               timerService = new 
DefaultTimeServiceProvider(this, getCheckpointLock(), timerThreadFactory);
                        }
 
                        operatorChain = new OperatorChain<>(this, 
getEnvironment().getAccumulatorRegistry().getReadWriteReporter());
@@ -305,10 +300,7 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                        // stop all timers and threads
                        if (timerService != null) {
                                try {
-                                       if (!timerService.isTerminated()) {
-                                               LOG.info("Timer service is 
shutting down.");
-                                               timerService.shutdownService();
-                                       }
+                                       timerService.shutdownService();
                                }
                                catch (Throwable t) {
                                        // catch and log the exception to not 
replace the original exception

http://git-wip-us.apache.org/repos/asf/flink/blob/954ef08f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java
index a21a2e1..81faec9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java
@@ -86,7 +86,7 @@ public class TestTimeServiceProvider extends TimeServiceProvider {
                return isTerminated;
        }
 
-       public int getNoOfRegisteredTimers() {
+       public int getNumRegisteredTimers() {
                int count = 0;
                for (List<Triggerable> tasks: registeredTasks.values()) {
                        count += tasks.size();

http://git-wip-us.apache.org/repos/asf/flink/blob/954ef08f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index 0197c53..fb08959 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -88,7 +88,7 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS
                final Object lock = getCheckpointLock();
                
                while (running && inputProcessor.processInput(operator, lock)) {
-
+                       // all the work happens in the "processInput" method
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/954ef08f/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java
index 0351978..8d3e621 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.runtime.operators;
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
@@ -28,6 +29,7 @@ import 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
 import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
 import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
+
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -37,13 +39,14 @@ import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 
 @RunWith(PowerMockRunner.class)
-@PrepareForTest(ResultPartitionWriter.class)
+@PrepareForTest({ResultPartitionWriter.class})
 @PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
 public class TimeProviderTest {
 
@@ -52,8 +55,10 @@ public class TimeProviderTest {
                final OneShotLatch latch = new OneShotLatch();
 
                final Object lock = new Object();
-               TimeServiceProvider timeServiceProvider = 
DefaultTimeServiceProvider
-                       
.createForTesting(Executors.newSingleThreadScheduledExecutor(), lock);
+               final AtomicReference<Throwable> error = new 
AtomicReference<>();
+               
+               TimeServiceProvider timeServiceProvider = new 
DefaultTimeServiceProvider(
+                               new ReferenceSettingExceptionHandler(error), 
lock);
 
                final List<Long> timestamps = new ArrayList<>();
 
@@ -114,6 +119,8 @@ public class TimeProviderTest {
                        lastTs = timestamp;
                        counter++;
                }
+
+               assertNull(error.get());
        }
 
        @Test
@@ -124,14 +131,14 @@ public class TimeProviderTest {
 
                final Object lock = new Object();
 
-               TimeServiceProvider timeServiceProvider = 
DefaultTimeServiceProvider
-                       .create(new AsyncExceptionHandler() {
+               TimeServiceProvider timeServiceProvider = new 
DefaultTimeServiceProvider(
+                       new AsyncExceptionHandler() {
                                @Override
                                public void handleAsyncException(String message, Throwable exception) {
                                        exceptionWasThrown.compareAndSet(false, true);
                                        latch.trigger();
                                }
-                       }, Executors.newSingleThreadScheduledExecutor(), lock);
+                       }, lock);
 
                long now = System.currentTimeMillis();
                timeServiceProvider.registerTimer(now, new Triggerable() {
@@ -182,7 +189,7 @@ public class TimeProviderTest {
                        }
                });
 
-               Assert.assertEquals(provider.getNoOfRegisteredTimers(), 4);
+               Assert.assertEquals(provider.getNumRegisteredTimers(), 4);
 
                provider.setCurrentTime(100);
                long seen = 0;
@@ -233,14 +240,30 @@ public class TimeProviderTest {
                        }
                });
 
-               assertEquals(2, tp.getNoOfRegisteredTimers());
+               assertEquals(2, tp.getNumRegisteredTimers());
 
                tp.setCurrentTime(35);
-               assertEquals(1, tp.getNoOfRegisteredTimers());
+               assertEquals(1, tp.getNumRegisteredTimers());
 
                tp.setCurrentTime(40);
-               assertEquals(0, tp.getNoOfRegisteredTimers());
+               assertEquals(0, tp.getNumRegisteredTimers());
 
                tp.shutdownService();
        }
+
+       // ------------------------------------------------------------------------
+
+       public static class ReferenceSettingExceptionHandler implements 
AsyncExceptionHandler {
+
+               private final AtomicReference<Throwable> errorReference;
+
+               public 
ReferenceSettingExceptionHandler(AtomicReference<Throwable> errorReference) {
+                       this.errorReference = errorReference;
+               }
+
+               @Override
+               public void handleAsyncException(String message, Throwable 
exception) {
+                       errorReference.compareAndSet(null, exception);
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/954ef08f/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
index 30f38e3..4c6d391 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -39,6 +39,7 @@ import 
org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import 
org.apache.flink.streaming.runtime.operators.TimeProviderTest.ReferenceSettingExceptionHandler;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
@@ -58,7 +59,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -182,14 +183,11 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
 
        @Test
        public void testWindowTriggerTimeAlignment() throws Exception {
-               final Object lock = new Object();
-               TimeServiceProvider timerService = 
DefaultTimeServiceProvider.createForTesting(
-                       Executors.newSingleThreadScheduledExecutor(), lock);
-
                try {
                        @SuppressWarnings("unchecked")
                        final Output<StreamRecord<String>> mockOut = 
mock(Output.class);
-                       StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService, lock);
+                       final TimeServiceProvider timerService = new 
NoOpTimerService();
+                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService, new Object());
 
                        AccumulatingProcessingTimeWindowOperator<String, 
String, String> op;
 
@@ -201,11 +199,6 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                        assertTrue(op.getNextEvaluationTime() % 1000 == 0);
                        op.dispose();
 
-                       timerService.shutdownService();
-                       timerService = 
DefaultTimeServiceProvider.createForTesting(
-                               Executors.newSingleThreadScheduledExecutor(), 
lock);
-                       mockTask = createMockTaskWithTimer(timerService, lock);
-
                        op = new 
AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
                                        StringSerializer.INSTANCE, 
StringSerializer.INSTANCE, 1000, 1000);
                        op.setup(mockTask, new StreamConfig(new 
Configuration()), mockOut);
@@ -214,11 +207,6 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                        assertTrue(op.getNextEvaluationTime() % 1000 == 0);
                        op.dispose();
 
-                       timerService.shutdownService();
-                       timerService = 
DefaultTimeServiceProvider.createForTesting(
-                               Executors.newSingleThreadScheduledExecutor(), 
lock);
-                       mockTask = createMockTaskWithTimer(timerService, lock);
-
                        op = new 
AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
                                        StringSerializer.INSTANCE, 
StringSerializer.INSTANCE, 1500, 1000);
                        op.setup(mockTask, new StreamConfig(new 
Configuration()), mockOut);
@@ -227,11 +215,6 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                        assertTrue(op.getNextEvaluationTime() % 1000 == 0);
                        op.dispose();
 
-                       timerService.shutdownService();
-                       timerService = 
DefaultTimeServiceProvider.createForTesting(
-                               Executors.newSingleThreadScheduledExecutor(), 
lock);
-                       mockTask = createMockTaskWithTimer(timerService, lock);
-
                        op = new 
AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
                                        StringSerializer.INSTANCE, 
StringSerializer.INSTANCE, 1200, 1100);
                        op.setup(mockTask, new StreamConfig(new 
Configuration()), mockOut);
@@ -244,16 +227,15 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                        e.printStackTrace();
                        fail(e.getMessage());
                }
-               finally {
-                       timerService.shutdownService();
-               }
        }
 
        @Test
        public void testTumblingWindow() throws Exception {
                final Object lock = new Object();
-               final TimeServiceProvider timerService = 
DefaultTimeServiceProvider.createForTesting(
-                       Executors.newSingleThreadScheduledExecutor(), lock);
+               final AtomicReference<Throwable> error = new 
AtomicReference<>();
+
+               final TimeServiceProvider timerService = new 
DefaultTimeServiceProvider(
+                               new ReferenceSettingExceptionHandler(error), 
lock);
 
                try {
                        final int windowSize = 50;
@@ -285,6 +267,8 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                        synchronized (lock) {
                                op.close();
                        }
+
+                       shutdownTimerServiceAndWait(timerService);
                        op.dispose();
 
                        List<Integer> result = out.getElements();
@@ -294,6 +278,10 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                        for (int i = 0; i < numElements; i++) {
                                assertEquals(i, result.get(i).intValue());
                        }
+
+                       if (error.get() != null) {
+                               throw new Exception(error.get());
+                       }
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -307,8 +295,10 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
        @Test
        public void testSlidingWindow() throws Exception {
                final Object lock = new Object();
-               final TimeServiceProvider timerService = 
DefaultTimeServiceProvider.createForTesting(
-                       Executors.newSingleThreadScheduledExecutor(), lock);
+               final AtomicReference<Throwable> error = new 
AtomicReference<>();
+
+               final TimeServiceProvider timerService = new 
DefaultTimeServiceProvider(
+                               new ReferenceSettingExceptionHandler(error), 
lock);
 
                try {
                        final CollectingOutput<Integer> out = new 
CollectingOutput<>(50);
@@ -335,6 +325,8 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                        synchronized (lock) {
                                op.close();
                        }
+
+                       shutdownTimerServiceAndWait(timerService);
                        op.dispose();
 
                        // get and verify the result
@@ -361,6 +353,10 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                                        lastCount = 1;
                                }
                        }
+
+                       if (error.get() != null) {
+                               throw new Exception(error.get());
+                       }
                } finally {
                        timerService.shutdownService();
                }
@@ -369,8 +365,10 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
        @Test
        public void testTumblingWindowSingleElements() throws Exception {
                final Object lock = new Object();
-               final TimeServiceProvider timerService = 
DefaultTimeServiceProvider.createForTesting(
-                       Executors.newSingleThreadScheduledExecutor(), lock);
+               final AtomicReference<Throwable> error = new 
AtomicReference<>();
+
+               final TimeServiceProvider timerService = new 
DefaultTimeServiceProvider(
+                               new ReferenceSettingExceptionHandler(error), 
lock);
 
                try {
                        final CollectingOutput<Integer> out = new 
CollectingOutput<>(50);
@@ -412,7 +410,13 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                        synchronized (lock) {
                                op.close();
                        }
+
+                       shutdownTimerServiceAndWait(timerService);
                        op.dispose();
+
+                       if (error.get() != null) {
+                               throw new Exception(error.get());
+                       }
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -426,8 +430,10 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
        @Test
        public void testSlidingWindowSingleElements() throws Exception {
                final Object lock = new Object();
-               final TimeServiceProvider timerService = 
DefaultTimeServiceProvider.createForTesting(
-                       Executors.newSingleThreadScheduledExecutor(), lock);
+               final AtomicReference<Throwable> error = new 
AtomicReference<>();
+               
+               final TimeServiceProvider timerService = new 
DefaultTimeServiceProvider(
+                       new ReferenceSettingExceptionHandler(error), lock);
 
                try {
                        final CollectingOutput<Integer> out = new 
CollectingOutput<>(50);
@@ -460,7 +466,13 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                        synchronized (lock) {
                                op.close();
                        }
+
+                       shutdownTimerServiceAndWait(timerService);
                        op.dispose();
+
+                       if (error.get() != null) {
+                               throw new Exception(error.get());
+                       }
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -798,4 +810,12 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                }
                return result;
        }
+
+       private static void shutdownTimerServiceAndWait(TimeServiceProvider timers) throws Exception {
+               timers.shutdownService();
+
+               while (!timers.isTerminated()) {
+                       Thread.sleep(2);
+               }
+       } 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/954ef08f/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
index 7539c2d..88e28bc 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
@@ -40,14 +40,15 @@ import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.Output;
+import 
org.apache.flink.streaming.runtime.operators.TimeProviderTest.ReferenceSettingExceptionHandler;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
-
 import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
 import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
 import org.junit.After;
 import org.junit.Test;
 
@@ -60,7 +61,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -190,15 +191,12 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
 
        @Test
        public void testWindowTriggerTimeAlignment() throws Exception {
-               final Object lock = new Object();
-               TimeServiceProvider timerService = 
DefaultTimeServiceProvider.createForTesting(
-                       Executors.newSingleThreadScheduledExecutor(), lock);
-
                try {
                        @SuppressWarnings("unchecked")
                        final Output<StreamRecord<String>> mockOut = 
mock(Output.class);
-                       StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService, lock);
-                       
+                       final TimeServiceProvider timerService = new 
NoOpTimerService();
+                       final StreamTask<?, ?> mockTask = 
createMockTaskWithTimer(timerService, new Object());
+
                        AggregatingProcessingTimeWindowOperator<String, String> 
op;
 
                        op = new 
AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
@@ -209,11 +207,6 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                        assertTrue(op.getNextEvaluationTime() % 1000 == 0);
                        op.dispose();
 
-                       timerService.shutdownService();
-                       timerService = 
DefaultTimeServiceProvider.createForTesting(
-                               Executors.newSingleThreadScheduledExecutor(), 
lock);
-                       mockTask = createMockTaskWithTimer(timerService, lock);
-
                        op = new 
AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
                                        StringSerializer.INSTANCE, 
StringSerializer.INSTANCE, 1000, 1000);
                        op.setup(mockTask, new StreamConfig(new 
Configuration()), mockOut);
@@ -222,11 +215,6 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                        assertTrue(op.getNextEvaluationTime() % 1000 == 0);
                        op.dispose();
 
-                       timerService.shutdownService();
-                       timerService = 
DefaultTimeServiceProvider.createForTesting(
-                               Executors.newSingleThreadScheduledExecutor(), 
lock);
-                       mockTask = createMockTaskWithTimer(timerService, lock);
-
                        op = new 
AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
                                        StringSerializer.INSTANCE, 
StringSerializer.INSTANCE, 1500, 1000);
                        op.setup(mockTask, new StreamConfig(new 
Configuration()), mockOut);
@@ -235,11 +223,6 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                        assertTrue(op.getNextEvaluationTime() % 1000 == 0);
                        op.dispose();
 
-                       timerService.shutdownService();
-                       timerService = 
DefaultTimeServiceProvider.createForTesting(
-                               Executors.newSingleThreadScheduledExecutor(), 
lock);
-                       mockTask = createMockTaskWithTimer(timerService, lock);
-
                        op = new 
AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
                                        StringSerializer.INSTANCE, 
StringSerializer.INSTANCE, 1200, 1100);
                        op.setup(mockTask, new StreamConfig(new 
Configuration()), mockOut);
@@ -251,16 +234,16 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                catch (Exception e) {
                        e.printStackTrace();
                        fail(e.getMessage());
-               } finally {
-                       timerService.shutdownService();
                }
        }
 
        @Test
        public void testTumblingWindowUniqueElements() throws Exception {
                final Object lock = new Object();
-               final TimeServiceProvider timerService = 
DefaultTimeServiceProvider.createForTesting(
-                       Executors.newSingleThreadScheduledExecutor(), lock);
+               final AtomicReference<Throwable> error = new 
AtomicReference<>();
+
+               final TimeServiceProvider timerService = new 
DefaultTimeServiceProvider(
+                               new ReferenceSettingExceptionHandler(error), 
lock);
 
                try {
                        final int windowSize = 50;
@@ -297,6 +280,8 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                        synchronized (lock) {
                                op.close();
                        }
+
+                       shutdownTimerServiceAndWait(timerService);
                        op.dispose();
 
 
@@ -305,6 +290,10 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                                assertEquals(i, result.get(i).f0.intValue());
                                assertEquals(i, result.get(i).f1.intValue());
                        }
+
+                       if (error.get() != null) {
+                               throw new Exception(error.get());
+                       }
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -318,8 +307,10 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
        @Test
        public void testTumblingWindowDuplicateElements() throws Exception {
                final Object lock = new Object();
-               final TimeServiceProvider timerService = 
DefaultTimeServiceProvider.createForTesting(
-                       Executors.newSingleThreadScheduledExecutor(), lock);
+               final AtomicReference<Throwable> error = new 
AtomicReference<>();
+
+               final TimeServiceProvider timerService = new 
DefaultTimeServiceProvider(
+                               new ReferenceSettingExceptionHandler(error), 
lock);
                try {
                        final int windowSize = 50;
                        final CollectingOutput<Tuple2<Integer, Integer>> out = 
new CollectingOutput<>(windowSize);
@@ -364,6 +355,8 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                        synchronized (lock) {
                                op.close();
                        }
+
+                       shutdownTimerServiceAndWait(timerService);
                        op.dispose();
 
                        // we have ideally one element per window. we may have more, when we emitted a value into the
@@ -373,6 +366,10 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                        // deduplicate for more accurate checks
                        HashSet<Tuple2<Integer, Integer>> set = new 
HashSet<>(result);
                        assertTrue(set.size() == 10);
+
+                       if (error.get() != null) {
+                               throw new Exception(error.get());
+                       }
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -386,8 +383,10 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
        @Test
        public void testSlidingWindow() throws Exception {
                final Object lock = new Object();
-               final TimeServiceProvider timerService = 
DefaultTimeServiceProvider.createForTesting(
-                       Executors.newSingleThreadScheduledExecutor(), lock);
+               final AtomicReference<Throwable> error = new 
AtomicReference<>();
+
+               final TimeServiceProvider timerService = new 
DefaultTimeServiceProvider(
+                               new ReferenceSettingExceptionHandler(error), 
lock);
 
                try {
                        final CollectingOutput<Tuple2<Integer, Integer>> out = 
new CollectingOutput<>(50);
@@ -418,6 +417,8 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                        synchronized (lock) {
                                op.close();
                        }
+
+                       shutdownTimerServiceAndWait(timerService);
                        op.dispose();
 
                        // get and verify the result
@@ -445,6 +446,10 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                                        lastCount = 1;
                                }
                        }
+
+                       if (error.get() != null) {
+                               throw new Exception(error.get());
+                       }
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -458,8 +463,10 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
        @Test
        public void testSlidingWindowSingleElements() throws Exception {
                final Object lock = new Object();
-               final TimeServiceProvider timerService = 
DefaultTimeServiceProvider.createForTesting(
-                       Executors.newSingleThreadScheduledExecutor(), lock);
+               final AtomicReference<Throwable> error = new 
AtomicReference<>();
+
+               final TimeServiceProvider timerService = new 
DefaultTimeServiceProvider(
+                               new ReferenceSettingExceptionHandler(error), 
lock);
 
                try {
                        final CollectingOutput<Tuple2<Integer, Integer>> out = 
new CollectingOutput<>(50);
@@ -504,7 +511,13 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                        synchronized (lock) {
                                op.close();
                        }
+
+                       shutdownTimerServiceAndWait(timerService);
                        op.dispose();
+
+                       if (error.get() != null) {
+                               throw new Exception(error.get());
+                       }
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -518,8 +531,10 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
        @Test
        public void testPropagateExceptionsFromProcessElement() throws 
Exception {
                final Object lock = new Object();
-               final TimeServiceProvider timerService = 
DefaultTimeServiceProvider.createForTesting(
-                       Executors.newSingleThreadScheduledExecutor(), lock);
+               final AtomicReference<Throwable> error = new 
AtomicReference<>();
+
+               final TimeServiceProvider timerService = new 
DefaultTimeServiceProvider(
+                               new ReferenceSettingExceptionHandler(error), 
lock);
 
                try {
                        final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>();
@@ -556,7 +571,12 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
                                assertTrue(e.getMessage().contains("Artificial Test Exception"));
                        }
 
+                       shutdownTimerServiceAndWait(timerService);
                        op.dispose();
+
+                       if (error.get() != null) {
+                               throw new Exception(error.get());
+                       }
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -971,8 +991,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
        }
 
        private static StreamConfig createTaskConfig(KeySelector<?, ?> partitioner, TypeSerializer<?> keySerializer, int numberOfKeGroups) {
-               StreamConfig cfg = new StreamConfig(new Configuration());
-               return cfg;
+               return new StreamConfig(new Configuration());
        }
 
        @SuppressWarnings({"unchecked", "rawtypes"})
@@ -985,4 +1004,12 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
                }
                return result;
        }
+
+       private static void shutdownTimerServiceAndWait(TimeServiceProvider timers) throws Exception {
+               timers.shutdownService();
+
+               while (!timers.isTerminated()) {
+                       Thread.sleep(2);
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/954ef08f/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java
new file mode 100644
index 0000000..16e658e
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NoOpTimerService.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.streaming.runtime.operators.Triggerable;
+import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
+
+import java.util.concurrent.ScheduledFuture;
+
+class NoOpTimerService extends TimeServiceProvider {
+
+       private volatile boolean terminated;
+       
+       @Override
+       public long getCurrentProcessingTime() {
+               return System.currentTimeMillis();
+       }
+
+       @Override
+       public ScheduledFuture<?> registerTimer(long timestamp, Triggerable target) {
+               return null;
+       }
+
+       @Override
+       public boolean isTerminated() {
+               return terminated;
+       }
+
+       @Override
+       public void shutdownService() throws Exception {
+               terminated = true;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/954ef08f/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index d8a0ee2..9d8e6a5 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -42,12 +42,12 @@ import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
 import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
+
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import java.util.Collection;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Executors;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doAnswer;
@@ -145,7 +145,7 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
                                @Override
                                public CheckpointStreamFactory answer(InvocationOnMock invocationOnMock) throws Throwable {
 
-                                       final StreamOperator operator = (StreamOperator) invocationOnMock.getArguments()[0];
+                                       final StreamOperator<?> operator = (StreamOperator<?>) invocationOnMock.getArguments()[0];
                                        return stateBackend.createStreamFactory(new JobID(), operator.getClass().getSimpleName());
                                }
                        }).when(mockTask).createCheckpointStreamFactory(any(StreamOperator.class));
@@ -154,7 +154,7 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
                }
 
                timeServiceProvider = testTimeProvider != null ? testTimeProvider :
-                       DefaultTimeServiceProvider.create(mockTask, Executors.newSingleThreadScheduledExecutor(), this.checkpointLock);
+                       new DefaultTimeServiceProvider(mockTask, this.checkpointLock);
 
                doAnswer(new Answer<TimeServiceProvider>() {
                        @Override

Reply via email to