http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
index 247edd6..5275a39 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
@@ -33,7 +33,7 @@ import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -92,7 +92,7 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
 
        public KeyedOneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator,
                        ExecutionConfig executionConfig,
-                       TimeServiceProvider testTimeProvider,
+                       TestTimeServiceProvider testTimeProvider,
                        KeySelector<IN, K> keySelector,
                        TypeInformation<K> keyType) {
                super(operator, executionConfig, testTimeProvider);

http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index d6f46fd..d8a0ee2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -31,15 +31,16 @@ import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
+import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.AsynchronousException;
 import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
 import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -87,6 +88,8 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
         */
        private boolean setupCalled = false;
 
+       private volatile boolean wasFailedExternally = false;
+
        public OneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator) {
                this(operator, new ExecutionConfig());
        }
@@ -100,7 +103,7 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
        public OneInputStreamOperatorTestHarness(
                        OneInputStreamOperator<IN, OUT> operator,
                        ExecutionConfig executionConfig,
-                       TimeServiceProvider testTimeProvider) {
+                       TestTimeServiceProvider testTimeProvider) {
                this(operator, executionConfig, new Object(), testTimeProvider);
        }
 
@@ -132,10 +135,10 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
                doAnswer(new Answer<Void>() {
                        @Override
                        public Void answer(InvocationOnMock invocation) throws Throwable {
-                               // do nothing
+                               wasFailedExternally = true;
                                return null;
                        }
-               }).when(mockTask).registerAsyncException(any(AsynchronousException.class));
+               }).when(mockTask).handleAsyncException(any(String.class), any(Throwable.class));
 
                try {
                        doAnswer(new Answer<CheckpointStreamFactory>() {
@@ -161,6 +164,18 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
                }).when(mockTask).getTimerService();
        }
 
+       public void setTimeCharacteristic(TimeCharacteristic timeCharacteristic) {
+               this.config.setTimeCharacteristic(timeCharacteristic);
+       }
+
+       public TimeCharacteristic getTimeCharacteristic() {
+               return this.config.getTimeCharacteristic();
+       }
+
+       public boolean wasFailedExternally() {
+               return wasFailedExternally;
+       }
+
        public void setStateBackend(AbstractStateBackend stateBackend) {
                this.stateBackend = stateBackend;
        }

Reply via email to