[FLINK-4496] Refactor the TimeServiceProvider to take a Trigerable instead of a 
Runnable.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4779c7ec
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4779c7ec
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4779c7ec

Branch: refs/heads/master
Commit: 4779c7eca0f7e91dd5ee38122baa3fe99c8b7bea
Parents: 568845a
Author: kl0u <kklou...@gmail.com>
Authored: Thu Aug 25 17:38:49 2016 +0200
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Fri Sep 23 15:01:06 2016 +0200

----------------------------------------------------------------------
 .../kafka/testutils/MockRuntimeContext.java     |  41 +---
 .../runtime/tasks/AsyncExceptionHandler.java    |  31 +++
 .../tasks/DefaultTimeServiceProvider.java       |  76 ++++++-
 .../streaming/runtime/tasks/StreamTask.java     |  81 +++----
 .../runtime/tasks/TestTimeServiceProvider.java  |  44 ++--
 .../runtime/tasks/TimeServiceProvider.java      |   6 +-
 .../operators/StreamSourceOperatorTest.java     |  14 +-
 .../runtime/operators/StreamTaskTimerTest.java  |  53 -----
 .../runtime/operators/TimeProviderTest.java     | 214 +++++++++++++++++++
 ...AlignedProcessingTimeWindowOperatorTest.java |  12 +-
 ...AlignedProcessingTimeWindowOperatorTest.java |   2 +
 .../runtime/tasks/StreamTaskTestHarness.java    |   2 +-
 .../util/OneInputStreamOperatorTestHarness.java |  85 ++++----
 .../streaming/util/WindowingTestHarness.java    |   2 -
 14 files changed, 436 insertions(+), 227 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4779c7ec/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
index 2d5e2d8..7a50569 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
@@ -58,38 +58,28 @@ public class MockRuntimeContext extends 
StreamingRuntimeContext {
        private final int indexOfThisSubtask;
        
        private final ExecutionConfig execConfig;
-       private final Object checkpointLock;
 
        private final TimeServiceProvider timerService;
 
        public MockRuntimeContext(int numberOfParallelSubtasks, int 
indexOfThisSubtask) {
-               this(numberOfParallelSubtasks, indexOfThisSubtask, new 
ExecutionConfig(), null);
+               this(numberOfParallelSubtasks, indexOfThisSubtask, new 
ExecutionConfig(), new Object());
        }
 
        public MockRuntimeContext(
-               int numberOfParallelSubtasks, int indexOfThisSubtask,
+               int numberOfParallelSubtasks,
+               int indexOfThisSubtask,
                ExecutionConfig execConfig,
                Object checkpointLock) {
 
-               this(numberOfParallelSubtasks, indexOfThisSubtask, execConfig, 
checkpointLock,
-                       
DefaultTimeServiceProvider.create(Executors.newSingleThreadScheduledExecutor()));
-       }
-
-       public MockRuntimeContext(
-                       int numberOfParallelSubtasks, int indexOfThisSubtask,
-                       ExecutionConfig execConfig,
-                       Object checkpointLock,
-                       TimeServiceProvider timerService) {
-
                super(new MockStreamOperator(),
-                               new MockEnvironment("no", 4 * 
MemoryManager.DEFAULT_PAGE_SIZE, null, 16),
-                               Collections.<String, Accumulator<?, 
?>>emptyMap());
-               
+                       new MockEnvironment("no", 4 * 
MemoryManager.DEFAULT_PAGE_SIZE, null, 16),
+                       Collections.<String, Accumulator<?, ?>>emptyMap());
+
                this.numberOfParallelSubtasks = numberOfParallelSubtasks;
                this.indexOfThisSubtask = indexOfThisSubtask;
                this.execConfig = execConfig;
-               this.checkpointLock = checkpointLock;
-               this.timerService = timerService;
+               this.timerService = DefaultTimeServiceProvider.
+                       
createForTesting(Executors.newSingleThreadScheduledExecutor(), checkpointLock);
        }
 
        @Override
@@ -216,20 +206,7 @@ public class MockRuntimeContext extends 
StreamingRuntimeContext {
        @Override
        public ScheduledFuture<?> registerTimer(final long time, final 
Triggerable target) {
                Preconditions.checkNotNull(timerService, "The processing time 
timer has not been initialized.");
-               
-               return timerService.registerTimer(time, new Runnable() {
-                       @Override
-                       public void run() {
-                               synchronized (checkpointLock) {
-                                       try {
-                                               target.trigger(time);
-                                       } catch (Throwable t) {
-                                               System.err.println("!!! Caught 
exception while processing timer. !!!");
-                                               t.printStackTrace();
-                                       }
-                               }
-                       }
-               });
+               return timerService.registerTimer(time, target);
        }
 
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/4779c7ec/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java
new file mode 100644
index 0000000..85a4115
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.tasks;
+
+/**
+ * An interface marking a task as capable to register exceptions thrown by 
different
+ * threads, other than the one executing the taks itself.
+ */
+public interface AsyncExceptionHandler {
+
+       /**
+        * Registers to the main thread an exception that was thrown by another 
thread
+        * (e.g. a TriggerTask), other than the one executing the main task.
+        */
+       void registerAsyncException(String message, AsynchronousException 
exception);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4779c7ec/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
index b803b82..c7339b3 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
@@ -17,6 +17,9 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.streaming.runtime.operators.Triggerable;
+
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -28,15 +31,26 @@ import java.util.concurrent.TimeUnit;
  */
 public class DefaultTimeServiceProvider extends TimeServiceProvider {
 
+       /** The containing task that owns this time service provider. */
+       private final AsyncExceptionHandler task;
+
+       private final Object checkpointLock;
+
        /** The executor service that schedules and calls the triggers of this 
task*/
        private final ScheduledExecutorService timerService;
 
-       public static DefaultTimeServiceProvider create 
(ScheduledExecutorService executor) {
-               return new DefaultTimeServiceProvider(executor);
+       public static DefaultTimeServiceProvider create(AsyncExceptionHandler 
task,
+                                                                               
                        ScheduledExecutorService executor,
+                                                                               
                        Object checkpointLock) {
+               return new DefaultTimeServiceProvider(task, executor, 
checkpointLock);
        }
 
-       private DefaultTimeServiceProvider(ScheduledExecutorService 
threadPoolExecutor) {
+       private DefaultTimeServiceProvider(AsyncExceptionHandler task,
+                                                                       
ScheduledExecutorService threadPoolExecutor,
+                                                                       Object 
checkpointLock) {
+               this.task = task;
                this.timerService = threadPoolExecutor;
+               this.checkpointLock = checkpointLock;
        }
 
        @Override
@@ -45,16 +59,62 @@ public class DefaultTimeServiceProvider extends 
TimeServiceProvider {
        }
 
        @Override
-       public ScheduledFuture<?> registerTimer(long timestamp, Runnable 
target) {
+       public ScheduledFuture<?> registerTimer(long timestamp, Triggerable 
target) {
                long delay = Math.max(timestamp - getCurrentProcessingTime(), 
0);
-               return timerService.schedule(target, delay, 
TimeUnit.MILLISECONDS);
+               return timerService.schedule(new TriggerTask(task, 
checkpointLock, target, timestamp), delay, TimeUnit.MILLISECONDS);
+       }
+
+       @Override
+       public boolean isTerminated() {
+               return timerService.isTerminated();
        }
 
        @Override
        public void shutdownService() throws Exception {
-               if (!timerService.isTerminated()) {
-                       StreamTask.LOG.info("Timer service is shutting down.");
-               }
                timerService.shutdownNow();
        }
+
+       /**
+        * Internal task that is invoked by the timer service and triggers the 
target.
+        */
+       private static final class TriggerTask implements Runnable {
+
+               private final Object lock;
+               private final Triggerable target;
+               private final long timestamp;
+               private final AsyncExceptionHandler task;
+
+               TriggerTask(AsyncExceptionHandler task, final Object lock, 
Triggerable target, long timestamp) {
+                       this.task = task;
+                       this.lock = lock;
+                       this.target = target;
+                       this.timestamp = timestamp;
+               }
+
+               @Override
+               public void run() {
+                       synchronized (lock) {
+                               try {
+                                       target.trigger(timestamp);
+                               } catch (Throwable t) {
+
+                                       if (task != null) {
+                                               // registers the exception with 
the calling task
+                                               // so that it can be logged and 
(later) detected
+                                               TimerException asyncException = 
new TimerException(t);
+                                               
task.registerAsyncException("Caught exception while processing timer.", 
asyncException);
+                                       } else {
+                                               // this is for when we are in 
testing mode and we
+                                               // want to have real processing 
time.
+                                               t.printStackTrace();
+                                       }
+                               }
+                       }
+               }
+       }
+
+       @VisibleForTesting
+       public static DefaultTimeServiceProvider 
createForTesting(ScheduledExecutorService executor, Object checkpointLock) {
+               return new DefaultTimeServiceProvider(null, executor, 
checkpointLock);
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4779c7ec/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 49bbee7..80d51a6 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -110,13 +110,13 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
 @Internal
 public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
                extends AbstractInvokable
-               implements StatefulTask {
+               implements StatefulTask, AsyncExceptionHandler {
 
        /** The thread group that holds all trigger timer threads */
        public static final ThreadGroup TRIGGER_THREAD_GROUP = new 
ThreadGroup("Triggers");
        
        /** The logger used by the StreamTask and its subclasses */
-       protected static final Logger LOG = 
LoggerFactory.getLogger(StreamTask.class);
+       private static final Logger LOG = 
LoggerFactory.getLogger(StreamTask.class);
        
        // 
------------------------------------------------------------------------
        
@@ -207,7 +207,13 @@ public abstract class StreamTask<OUT, Operator extends 
StreamOperator<OUT>>
                timerService = timeProvider;
        }
 
+       /**
+        * Returns the current processing time.
+        */
        public long getCurrentProcessingTime() {
+               if (timerService == null) {
+                       throw new IllegalStateException("The timer service has 
not been initialized.");
+               }
                return timerService.getCurrentProcessingTime();
        }
 
@@ -237,7 +243,7 @@ public abstract class StreamTask<OUT, Operator extends 
StreamOperator<OUT>>
                                // that timestamp are removed by user
                                executor.setRemoveOnCancelPolicy(true);
 
-                               timerService = 
DefaultTimeServiceProvider.create(executor);
+                               timerService = 
DefaultTimeServiceProvider.create(this, executor, getCheckpointLock());
                        }
 
                        headOperator = 
configuration.getStreamOperator(getUserCodeClassLoader());
@@ -319,7 +325,10 @@ public abstract class StreamTask<OUT, Operator extends 
StreamOperator<OUT>>
                        // stop all timers and threads
                        if (timerService != null) {
                                try {
-                                       timerService.shutdownService();
+                                       if (!timerService.isTerminated()) {
+                                               LOG.info("Timer service is 
shutting down.");
+                                               timerService.shutdownService();
+                                       }
                                }
                                catch (Throwable t) {
                                        // catch and log the exception to not 
replace the original exception
@@ -475,7 +484,10 @@ public abstract class StreamTask<OUT, Operator extends 
StreamOperator<OUT>>
        protected void finalize() throws Throwable {
                super.finalize();
                if (timerService != null) {
-                       timerService.shutdownService();
+                       if (!timerService.isTerminated()) {
+                               LOG.info("Timer service is shutting down.");
+                               timerService.shutdownService();
+                       }
                }
 
                closeAllClosables();
@@ -819,7 +831,7 @@ public abstract class StreamTask<OUT, Operator extends 
StreamOperator<OUT>>
                if (timerService == null) {
                        throw new IllegalStateException("The timer service has 
not been initialized.");
                }
-               return timerService.registerTimer(timestamp, new 
TriggerTask(this, lock, target, timestamp));
+               return timerService.registerTimer(timestamp, target);
        }
 
        /**
@@ -836,6 +848,17 @@ public abstract class StreamTask<OUT, Operator extends 
StreamOperator<OUT>>
                }
        }
 
+       @Override
+       public void registerAsyncException(String message, 
AsynchronousException exception) {
+               if (isRunning) {
+                       LOG.error(message, exception);
+               }
+
+               if (this.asyncException == null) {
+                       this.asyncException = exception;
+               }
+       }
+
        // 
------------------------------------------------------------------------
        //  Utilities
        // 
------------------------------------------------------------------------
@@ -863,42 +886,6 @@ public abstract class StreamTask<OUT, Operator extends 
StreamOperator<OUT>>
        }
 
        // 
------------------------------------------------------------------------
-
-       /**
-        * Internal task that is invoked by the timer service and triggers the 
target.
-        */
-       private static final class TriggerTask implements Runnable {
-
-               private final Object lock;
-               private final Triggerable target;
-               private final long timestamp;
-               private final StreamTask<?, ?> task;
-
-               TriggerTask(StreamTask<?, ?> task, final Object lock, 
Triggerable target, long timestamp) {
-                       this.task = task;
-                       this.lock = lock;
-                       this.target = target;
-                       this.timestamp = timestamp;
-               }
-
-               @Override
-               public void run() {
-                       synchronized (lock) {
-                               try {
-                                       target.trigger(timestamp);
-                               } catch (Throwable t) {
-                                       if (task.isRunning) {
-                                               LOG.error("Caught exception 
while processing timer.", t);
-                                       }
-                                       if (task.asyncException == null) {
-                                               task.asyncException = new 
TimerException(t);
-                                       }
-                               }
-                       }
-               }
-       }
-
-       // 
------------------------------------------------------------------------
        
        private static class AsyncCheckpointRunnable implements Runnable, 
Closeable {
 
@@ -961,12 +948,10 @@ public abstract class StreamTask<OUT, Operator extends 
StreamOperator<OUT>>
                                }
                        }
                        catch (Exception e) {
-                               if (owner.isRunning()) {
-                                       LOG.error("Caught exception while 
materializing asynchronous checkpoints.", e);
-                               }
-                               if (owner.asyncException == null) {
-                                       owner.asyncException = new 
AsynchronousException(e);
-                               }
+
+                               // registers the exception and tries to fail 
the whole task
+                               AsynchronousException asyncException = new 
AsynchronousException(e);
+                               owner.registerAsyncException("Caught exception 
while materializing asynchronous checkpoints.", asyncException);
                        }
                        finally {
                                synchronized (cancelables) {

http://git-wip-us.apache.org/repos/asf/flink/blob/4779c7ec/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java
index 2314deb..a21a2e1 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java
@@ -17,11 +17,13 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
+import org.apache.flink.streaming.runtime.operators.Triggerable;
+
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
 import java.util.concurrent.ScheduledFuture;
 
 /**
@@ -32,30 +34,34 @@ public class TestTimeServiceProvider extends 
TimeServiceProvider {
 
        private long currentTime = 0;
 
-       private Map<Long, List<Runnable>> registeredTasks = new HashMap<>();
+       private boolean isTerminated = false;
+
+       // sorts the timers by timestamp so that they are processed in the 
correct order.
+       private Map<Long, List<Triggerable>> registeredTasks = new TreeMap<>();
 
-       public void setCurrentTime(long timestamp) {
+       public void setCurrentTime(long timestamp) throws Exception {
                this.currentTime = timestamp;
 
                // decide which timers to fire and put them in a list
                // we do not fire them here to be able to accommodate timers
-               // that register other timers. The latter would through an 
exception.
+               // that register other timers.
 
-               Iterator<Map.Entry<Long, List<Runnable>>> it = 
registeredTasks.entrySet().iterator();
-               List<Runnable> toRun = new ArrayList<>();
+               Iterator<Map.Entry<Long, List<Triggerable>>> it = 
registeredTasks.entrySet().iterator();
+               List<Map.Entry<Long, List<Triggerable>>> toRun = new 
ArrayList<>();
                while (it.hasNext()) {
-                       Map.Entry<Long, List<Runnable>> t = it.next();
+                       Map.Entry<Long, List<Triggerable>> t = it.next();
                        if (t.getKey() <= this.currentTime) {
-                               for (Runnable r: t.getValue()) {
-                                       toRun.add(r);
-                               }
+                               toRun.add(t);
                                it.remove();
                        }
                }
 
                // now do the actual firing.
-               for (Runnable r: toRun) {
-                       r.run();
+               for (Map.Entry<Long, List<Triggerable>> tasks: toRun) {
+                       long now = tasks.getKey();
+                       for (Triggerable task: tasks.getValue()) {
+                               task.trigger(now);
+                       }
                }
        }
 
@@ -65,8 +71,8 @@ public class TestTimeServiceProvider extends 
TimeServiceProvider {
        }
 
        @Override
-       public ScheduledFuture<?> registerTimer(long timestamp, Runnable 
target) {
-               List<Runnable> tasks = registeredTasks.get(timestamp);
+       public ScheduledFuture<?> registerTimer(long timestamp, Triggerable 
target) {
+               List<Triggerable> tasks = registeredTasks.get(timestamp);
                if (tasks == null) {
                        tasks = new ArrayList<>();
                        registeredTasks.put(timestamp, tasks);
@@ -75,9 +81,14 @@ public class TestTimeServiceProvider extends 
TimeServiceProvider {
                return null;
        }
 
+       @Override
+       public boolean isTerminated() {
+               return isTerminated;
+       }
+
        public int getNoOfRegisteredTimers() {
                int count = 0;
-               for (List<Runnable> tasks: registeredTasks.values()) {
+               for (List<Triggerable> tasks: registeredTasks.values()) {
                        count += tasks.size();
                }
                return count;
@@ -85,7 +96,6 @@ public class TestTimeServiceProvider extends 
TimeServiceProvider {
 
        @Override
        public void shutdownService() throws Exception {
-               this.registeredTasks.clear();
-               this.registeredTasks = null;
+               this.isTerminated = true;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4779c7ec/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimeServiceProvider.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimeServiceProvider.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimeServiceProvider.java
index f3e4f78..42a4fa4 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimeServiceProvider.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimeServiceProvider.java
@@ -16,6 +16,7 @@
  */
 package org.apache.flink.streaming.runtime.tasks;
 
+import org.apache.flink.streaming.runtime.operators.Triggerable;
 import java.util.concurrent.ScheduledFuture;
 
 /**
@@ -34,7 +35,10 @@ public abstract class TimeServiceProvider {
         *                                              the task to be executed
         * @return the result to be returned.
         */
-       public abstract ScheduledFuture<?> registerTimer(final long timestamp, 
final Runnable target);
+       public abstract ScheduledFuture<?> registerTimer(final long timestamp, 
final Triggerable target);
+
+       /** Returns <tt>true</tt> if the service has been shut down, 
<tt>false</tt> otherwise. */
+       public abstract boolean isTerminated();
 
        /** Shuts down and clean up the timer service provider. */
        public abstract void shutdownService() throws Exception;

http://git-wip-us.apache.org/repos/asf/flink/blob/4779c7ec/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
index 9c06b49..d61fec9 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
@@ -184,6 +184,8 @@ public class StreamSourceOperatorTest {
 
                long watermarkInterval = 10;
                TestTimeServiceProvider timeProvider = new 
TestTimeServiceProvider();
+               timeProvider.setCurrentTime(0);
+
                setupSourceOperator(operator, TimeCharacteristic.IngestionTime, 
watermarkInterval, timeProvider);
 
                final List<StreamElement> output = new ArrayList<>();
@@ -249,17 +251,7 @@ public class StreamSourceOperatorTest {
                                        throw new RuntimeException("The time 
provider is null");
                                }
 
-                               timeProvider.registerTimer(execTime, new 
Runnable() {
-
-                                       @Override
-                                       public void run() {
-                                               try {
-                                                       
target.trigger(execTime);
-                                               } catch (Exception e) {
-                                                       e.printStackTrace();
-                                               }
-                                       }
-                               });
+                               timeProvider.registerTimer(execTime, target);
                                return null;
                        }
                }).when(mockTask).registerTimer(anyLong(), 
any(Triggerable.class));

http://git-wip-us.apache.org/repos/asf/flink/blob/4779c7ec/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
index c9f204d..b9435f5 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
@@ -27,7 +27,6 @@ import 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 
-import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 
@@ -49,58 +48,6 @@ import static org.junit.Assert.*;
 public class StreamTaskTimerTest {
 
        @Test
-       public void testCustomTimeServiceProvider() throws Throwable {
-               TestTimeServiceProvider tp = new TestTimeServiceProvider();
-
-               final OneInputStreamTask<String, String> mapTask = new 
OneInputStreamTask<>();
-               mapTask.setTimeService(tp);
-
-               final OneInputStreamTaskTestHarness<String, String> testHarness 
= new OneInputStreamTaskTestHarness<>(
-                       mapTask, BasicTypeInfo.STRING_TYPE_INFO, 
BasicTypeInfo.STRING_TYPE_INFO);
-
-               StreamConfig streamConfig = testHarness.getStreamConfig();
-
-               StreamMap<String, String> mapOperator = new StreamMap<>(new 
DummyMapFunction<String>());
-               streamConfig.setStreamOperator(mapOperator);
-
-               testHarness.invoke();
-
-               assertTrue(testHarness.getCurrentProcessingTime() == 0);
-
-               tp.setCurrentTime(11);
-               assertTrue(testHarness.getCurrentProcessingTime() == 11);
-
-               tp.setCurrentTime(15);
-               tp.setCurrentTime(16);
-               assertTrue(testHarness.getCurrentProcessingTime() == 16);
-               
-               // register 2 tasks
-               mapTask.registerTimer(30, new Triggerable() {
-                       @Override
-                       public void trigger(long timestamp) {
-
-                       }
-               });
-
-               mapTask.registerTimer(40, new Triggerable() {
-                       @Override
-                       public void trigger(long timestamp) {
-
-                       }
-               });
-
-               assertEquals(2, tp.getNoOfRegisteredTimers());
-
-               tp.setCurrentTime(35);
-               assertEquals(1, tp.getNoOfRegisteredTimers());
-
-               tp.setCurrentTime(40);
-               assertEquals(0, tp.getNoOfRegisteredTimers());
-
-               tp.shutdownService();
-       }
-
-       @Test
        public void testOpenCloseAndTimestamps() throws Exception {
                final OneInputStreamTask<String, String> mapTask = new 
OneInputStreamTask<>();
                

http://git-wip-us.apache.org/repos/asf/flink/blob/4779c7ec/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java
new file mode 100644
index 0000000..4d4f07b
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.core.testutils.OneShotLatch;
+import 
org.apache.flink.hadoop.shaded.org.apache.http.impl.conn.SystemDefaultDnsResolver;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.StreamMap;
+import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
+import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(ResultPartitionWriter.class)
+@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
+public class TimeProviderTest {
+
+       @Test
+       public void testDefaultTimeProvider() throws InterruptedException {
+               final OneShotLatch latch = new OneShotLatch();
+
+               final Object lock = new Object();
+               TimeServiceProvider timeServiceProvider = 
DefaultTimeServiceProvider
+                       
.createForTesting(Executors.newSingleThreadScheduledExecutor(), lock);
+
+               final List<Long> timestamps = new ArrayList<>();
+
+               long start = System.currentTimeMillis();
+               long interval = 50L;
+
+               final long noOfTimers = 20;
+
+               // we add 2 timers per iteration minus the first that would 
have a negative timestamp
+               final long expectedNoOfTimers = 2 * noOfTimers - 1;
+
+               for (int i = 0; i < noOfTimers; i++) {
+                       double nextTimer = start + i * interval;
+
+                       timeServiceProvider.registerTimer((long) nextTimer, new 
Triggerable() {
+                               @Override
+                               public void trigger(long timestamp) throws 
Exception {
+                                       timestamps.add(timestamp);
+                                       if (timestamps.size() == 
expectedNoOfTimers) {
+                                               latch.trigger();
+                                       }
+                               }
+                       });
+
+                       // add also out-of-order tasks to verify that eventually
+                       // they will be executed in the correct order.
+
+                       if (i > 0) {
+                               timeServiceProvider.registerTimer((long) 
(nextTimer - 10), new Triggerable() {
+                                       @Override
+                                       public void trigger(long timestamp) 
throws Exception {
+                                               timestamps.add(timestamp);
+                                               if (timestamps.size() == 
expectedNoOfTimers) {
+                                                       latch.trigger();
+                                               }
+                                       }
+                               });
+                       }
+               }
+
+               if (!latch.isTriggered()) {
+                       latch.await();
+               }
+
+               Assert.assertEquals(timestamps.size(), expectedNoOfTimers);
+
+               // verify that the tasks are executed
+               // in ascending timestamp order
+
+               int counter = 0;
+               long lastTs = Long.MIN_VALUE;
+               for (long timestamp: timestamps) {
+                       Assert.assertTrue(timestamp >= lastTs);
+                       lastTs = timestamp;
+
+                       long expectedTs = start + (counter/2) * interval;
+                       Assert.assertEquals(timestamp, (expectedTs + ((counter 
% 2 == 0) ? 0 : 40)));
+                       counter++;
+               }
+       }
+
+       @Test
+       public void testTimerSorting() throws Exception {
+
+               final List<Long> result = new ArrayList<>();
+
+               TestTimeServiceProvider provider = new 
TestTimeServiceProvider();
+
+               provider.registerTimer(45, new Triggerable() {
+                       @Override
+                       public void trigger(long timestamp) {
+                               result.add(timestamp);
+                       }
+               });
+
+               provider.registerTimer(50, new Triggerable() {
+                       @Override
+                       public void trigger(long timestamp) {
+                               result.add(timestamp);
+                       }
+               });
+
+               provider.registerTimer(30, new Triggerable() {
+                       @Override
+                       public void trigger(long timestamp) {
+                               result.add(timestamp);
+                       }
+               });
+
+               provider.registerTimer(50, new Triggerable() {
+                       @Override
+                       public void trigger(long timestamp) {
+                               result.add(timestamp);
+                       }
+               });
+
+               Assert.assertTrue(provider.getNoOfRegisteredTimers() == 4);
+
+               provider.setCurrentTime(100);
+               long seen = 0;
+               for (Long l: result) {
+                       Assert.assertTrue(l >= seen);
+                       seen = l;
+               }
+       }
+
+       @Test
+       public void testCustomTimeServiceProvider() throws Throwable {
+               TestTimeServiceProvider tp = new TestTimeServiceProvider();
+
+               final OneInputStreamTask<String, String> mapTask = new 
OneInputStreamTask<>();
+               mapTask.setTimeService(tp);
+
+               final OneInputStreamTaskTestHarness<String, String> testHarness 
= new OneInputStreamTaskTestHarness<>(
+                       mapTask, BasicTypeInfo.STRING_TYPE_INFO, 
BasicTypeInfo.STRING_TYPE_INFO);
+
+               StreamConfig streamConfig = testHarness.getStreamConfig();
+
+               StreamMap<String, String> mapOperator = new StreamMap<>(new 
StreamTaskTimerTest.DummyMapFunction<String>());
+               streamConfig.setStreamOperator(mapOperator);
+
+               testHarness.invoke();
+
+               assertTrue(testHarness.getCurrentProcessingTime() == 0);
+
+               tp.setCurrentTime(11);
+               assertTrue(testHarness.getCurrentProcessingTime() == 11);
+
+               tp.setCurrentTime(15);
+               tp.setCurrentTime(16);
+               assertTrue(testHarness.getCurrentProcessingTime() == 16);
+
+               // register 2 tasks
+               mapTask.registerTimer(30, new Triggerable() {
+                       @Override
+                       public void trigger(long timestamp) {
+
+                       }
+               });
+
+               mapTask.registerTimer(40, new Triggerable() {
+                       @Override
+                       public void trigger(long timestamp) {
+
+                       }
+               });
+
+               assertEquals(2, tp.getNoOfRegisteredTimers());
+
+               tp.setCurrentTime(35);
+               assertEquals(1, tp.getNoOfRegisteredTimers());
+
+               tp.setCurrentTime(40);
+               assertEquals(0, tp.getNoOfRegisteredTimers());
+
+               tp.shutdownService();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4779c7ec/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
index 40a6c79..9849bd7 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -493,7 +493,7 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                        for (int i = 0; i < 300; i++) {
                                testHarness.processElement(new StreamRecord<>(i 
+ numElementsFirst));
                        }
-                       
+
                        op.dispose();
                        
                        // re-create the operator and restore the state
@@ -502,9 +502,8 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                                                        IntSerializer.INSTANCE, 
IntSerializer.INSTANCE,
                                                        windowSize, windowSize);
 
-                       testHarness =
-                                       new 
OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService);
-
+                       timerService = new TestTimeServiceProvider();
+                       testHarness = new 
OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService);
 
                        testHarness.setup();
                        testHarness.restore(state);
@@ -580,15 +579,16 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                        for (int i = numElementsFirst; i < numElements; i++) {
                                testHarness.processElement(new 
StreamRecord<>(i));
                        }
-                       
+
                        op.dispose();
-                       
+
                        // re-create the operator and restore the state
                        op = new AccumulatingProcessingTimeWindowOperator<>(
                                        validatingIdentityFunction, 
identitySelector,
                                        IntSerializer.INSTANCE, 
IntSerializer.INSTANCE,
                                        windowSize, windowSlide);
 
+                       timerService = new TestTimeServiceProvider();
                        testHarness = new 
OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService);
 
                        testHarness.setup();

http://git-wip-us.apache.org/repos/asf/flink/blob/4779c7ec/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
index 59bfe6f..3dfa395 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
@@ -601,6 +601,7 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                                        IntSerializer.INSTANCE, tupleSerializer,
                                        windowSize, windowSize);
 
+                       timerService = new TestTimeServiceProvider();
                        testHarness = new 
OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService);
 
                        testHarness.setup();
@@ -692,6 +693,7 @@ public class 
AggregatingAlignedProcessingTimeWindowOperatorTest {
                                        IntSerializer.INSTANCE, tupleSerializer,
                                        windowSize, windowSlide);
 
+                       timerService = new TestTimeServiceProvider();
                        testHarness = new 
OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService);
 
                        testHarness.setup();

http://git-wip-us.apache.org/repos/asf/flink/blob/4779c7ec/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index cb10c5c..ce634f0 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -115,7 +115,7 @@ public class StreamTaskTestHarness<OUT> {
 
        public long getCurrentProcessingTime() {
                if (!(task instanceof StreamTask)) {
-                       System.currentTimeMillis();
+                       throw new 
UnsupportedOperationException("getCurrentProcessingTime() only supported on 
StreamTasks.");
                }
                return ((StreamTask) task).getCurrentProcessingTime();
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/4779c7ec/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index 15074a7..6c637bf 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -37,8 +37,10 @@ import 
org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.AsynchronousException;
 import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
 import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -87,7 +89,6 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
         */
        private boolean setupCalled = false;
 
-
        public OneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, 
OUT> operator) {
                this(operator, new ExecutionConfig());
        }
@@ -95,27 +96,35 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
        public OneInputStreamOperatorTestHarness(
                        OneInputStreamOperator<IN, OUT> operator,
                        ExecutionConfig executionConfig) {
-               this(operator, executionConfig, 
DefaultTimeServiceProvider.create(Executors.newSingleThreadScheduledExecutor()));
+               this(operator, executionConfig, null);
+       }
+
+       public OneInputStreamOperatorTestHarness(
+                       OneInputStreamOperator<IN, OUT> operator,
+                       ExecutionConfig executionConfig,
+                       TimeServiceProvider testTimeProvider) {
+               this(operator, executionConfig, new Object(), testTimeProvider);
        }
 
        public OneInputStreamOperatorTestHarness(
                        OneInputStreamOperator<IN, OUT> operator,
                        ExecutionConfig executionConfig,
+                       Object checkpointLock,
                        TimeServiceProvider testTimeProvider) {
+
                this.operator = operator;
                this.outputList = new ConcurrentLinkedQueue<Object>();
                Configuration underlyingConfig = new Configuration();
                this.config = new StreamConfig(underlyingConfig);
                this.config.setCheckpointingEnabled(true);
                this.executionConfig = executionConfig;
-               this.checkpointLock = new Object();
+               this.checkpointLock = checkpointLock;
 
                final Environment env = new MockEnvironment("MockTwoInputTask", 
3 * 1024 * 1024, new MockInputSplitProvider(), 1024, underlyingConfig, 
executionConfig, MAX_PARALLELISM, 1, 0);
                mockTask = mock(StreamTask.class);
-               timeServiceProvider = testTimeProvider;
 
                when(mockTask.getName()).thenReturn("Mock Task");
-               when(mockTask.getCheckpointLock()).thenReturn(checkpointLock);
+               
when(mockTask.getCheckpointLock()).thenReturn(this.checkpointLock);
                when(mockTask.getConfiguration()).thenReturn(config);
                
when(mockTask.getTaskConfiguration()).thenReturn(underlyingConfig);
                when(mockTask.getEnvironment()).thenReturn(env);
@@ -125,21 +134,10 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
                doAnswer(new Answer<Void>() {
                        @Override
                        public Void answer(InvocationOnMock invocation) throws 
Throwable {
-                               final long execTime = (Long) 
invocation.getArguments()[0];
-                               final Triggerable target = (Triggerable) 
invocation.getArguments()[1];
-
-                               timeServiceProvider.registerTimer(
-                                               execTime, new 
TriggerTask(checkpointLock, target, execTime));
+                               // do nothing
                                return null;
                        }
-               }).when(mockTask).registerTimer(anyLong(), 
any(Triggerable.class));
-
-               doAnswer(new Answer<Long>() {
-                       @Override
-                       public Long answer(InvocationOnMock invocation) throws 
Throwable {
-                               return 
timeServiceProvider.getCurrentProcessingTime();
-                       }
-               }).when(mockTask).getCurrentProcessingTime();
+               }).when(mockTask).registerAsyncException(any(String.class), 
any(AsynchronousException.class));
 
                try {
                        doAnswer(new Answer<CheckpointStreamFactory>() {
@@ -154,6 +152,26 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
                        throw new RuntimeException(e.getMessage(), e);
                }
 
+               timeServiceProvider = testTimeProvider != null ? 
testTimeProvider :
+                       DefaultTimeServiceProvider.create(mockTask, 
Executors.newSingleThreadScheduledExecutor(), this.checkpointLock);
+
+               doAnswer(new Answer<Void>() {
+                       @Override
+                       public Void answer(InvocationOnMock invocation) throws 
Throwable {
+                               final long execTime = (Long) 
invocation.getArguments()[0];
+                               final Triggerable target = (Triggerable) 
invocation.getArguments()[1];
+
+                               timeServiceProvider.registerTimer(execTime, 
target);
+                               return null;
+                       }
+               }).when(mockTask).registerTimer(anyLong(), 
any(Triggerable.class));
+
+               doAnswer(new Answer<Long>() {
+                       @Override
+                       public Long answer(InvocationOnMock invocation) throws 
Throwable {
+                               return 
timeServiceProvider.getCurrentProcessingTime();
+                       }
+               }).when(mockTask).getCurrentProcessingTime();
        }
 
        public void setStateBackend(AbstractStateBackend stateBackend) {
@@ -216,7 +234,6 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
                operator.notifyOfCompletedCheckpoint(checkpointId);
        }
 
-
        /**
         * Calls {@link 
org.apache.flink.streaming.api.operators.StreamOperator#restoreState(org.apache.flink.core.fs.FSDataInputStream)}
 ()}
         */
@@ -275,32 +292,4 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
                        // ignore
                }
        }
-
-       private static final class TriggerTask implements Runnable {
-
-               private final Object lock;
-               private final Triggerable target;
-               private final long timestamp;
-
-               TriggerTask(final Object lock, Triggerable target, long 
timestamp) {
-                       this.lock = lock;
-                       this.target = target;
-                       this.timestamp = timestamp;
-               }
-
-               @Override
-               public void run() {
-                       synchronized (lock) {
-                               try {
-                                       target.trigger(timestamp);
-                               } catch (Throwable t) {
-                                       try {
-                                               throw t;
-                                       } catch (Exception e) {
-                                               e.printStackTrace();
-                                       }
-                               }
-                       }
-               }
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4779c7ec/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
index af1f3fa..d47136c 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
@@ -22,8 +22,6 @@ import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;

Reply via email to