http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java index 247edd6..5275a39 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java @@ -33,7 +33,7 @@ import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; -import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider; +import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -92,7 +92,7 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> public KeyedOneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator, ExecutionConfig executionConfig, - TimeServiceProvider testTimeProvider, + TestTimeServiceProvider testTimeProvider, KeySelector<IN, K> keySelector, TypeInformation<K> keyType) { super(operator, executionConfig, testTimeProvider);
http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java index d6f46fd..d8a0ee2 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java @@ -31,15 +31,16 @@ import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator; +import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.AsynchronousException; import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider; import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider; import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -87,6 +88,8 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> { */ private boolean setupCalled = false; + private volatile boolean wasFailedExternally = false; + public OneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator) { this(operator, new ExecutionConfig()); } @@ -100,7 +103,7 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> { public OneInputStreamOperatorTestHarness( OneInputStreamOperator<IN, OUT> operator, ExecutionConfig executionConfig, - TimeServiceProvider testTimeProvider) { + TestTimeServiceProvider testTimeProvider) { this(operator, executionConfig, new Object(), testTimeProvider); } @@ -132,10 +135,10 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> { doAnswer(new Answer<Void>() { @Override public Void answer(InvocationOnMock invocation) throws Throwable { - // do nothing + wasFailedExternally = true; return null; } - }).when(mockTask).registerAsyncException(any(AsynchronousException.class)); + }).when(mockTask).handleAsyncException(any(String.class), any(Throwable.class)); try { doAnswer(new Answer<CheckpointStreamFactory>() { @@ -161,6 +164,18 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> { }).when(mockTask).getTimerService(); } + public void setTimeCharacteristic(TimeCharacteristic timeCharacteristic) { + this.config.setTimeCharacteristic(timeCharacteristic); + } + + public TimeCharacteristic getTimeCharacteristic() { + return this.config.getTimeCharacteristic(); + } + + public boolean wasFailedExternally() { + return wasFailedExternally; + } + public void setStateBackend(AbstractStateBackend stateBackend) { this.stateBackend = stateBackend; }
