Repository: flink Updated Branches: refs/heads/master 14518067c -> bcd028d75
[hotfix][tests] Fix minor mocking issues in AbstractStreamOperatorTest Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bcd028d7 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bcd028d7 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bcd028d7 Branch: refs/heads/master Commit: bcd028d75b0e5c5c691e24640a2196b2fdaf85e0 Parents: 7f42259 Author: Stefan Richter <[email protected]> Authored: Mon May 14 15:09:50 2018 +0200 Committer: Stefan Richter <[email protected]> Committed: Mon May 14 17:50:52 2018 +0200 ---------------------------------------------------------------------- .../streaming/api/operators/AbstractStreamOperatorTest.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/bcd028d7/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java index 904ff64..f0195a2 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java @@ -495,7 +495,7 @@ public class AbstractStreamOperatorTest { final CloseableRegistry closeableRegistry = new CloseableRegistry(); - StateSnapshotContextSynchronousImpl context = mock(StateSnapshotContextSynchronousImpl.class); + StateSnapshotContextSynchronousImpl context = spy(new StateSnapshotContextSynchronousImpl(0L, 0L)); whenNew(StateSnapshotContextSynchronousImpl.class).withAnyArguments().thenReturn(context); @@ -573,7 +573,7 @@ public class AbstractStreamOperatorTest { RunnableFuture<SnapshotResult<KeyedStateHandle>> futureKeyedStateHandle = mock(RunnableFuture.class); RunnableFuture<SnapshotResult<OperatorStateHandle>> futureOperatorStateHandle = mock(RunnableFuture.class); - StateSnapshotContextSynchronousImpl context = mock(StateSnapshotContextSynchronousImpl.class); + StateSnapshotContextSynchronousImpl context = spy(new StateSnapshotContextSynchronousImpl(checkpointId, timestamp)); when(context.getKeyedStateStreamFuture()).thenReturn(futureKeyedStateHandle); when(context.getOperatorStateStreamFuture()).thenReturn(futureOperatorStateHandle); @@ -582,7 +582,6 @@ public class AbstractStreamOperatorTest { whenNew(StateSnapshotContextSynchronousImpl.class).withAnyArguments().thenReturn(context); whenNew(OperatorSnapshotFutures.class).withAnyArguments().thenReturn(operatorSnapshotResult); - CheckpointStreamFactory streamFactory = mock(CheckpointStreamFactory.class); StreamTask<Void, AbstractStreamOperator<Void>> containingTask = mock(StreamTask.class); when(containingTask.getCancelables()).thenReturn(closeableRegistry); @@ -600,7 +599,7 @@ public class AbstractStreamOperatorTest { when(operatorStateBackend.snapshot( eq(checkpointId), eq(timestamp), - eq(streamFactory), + any(CheckpointStreamFactory.class), any(CheckpointOptions.class))).thenReturn(futureManagedOperatorStateHandle); AbstractKeyedStateBackend<?> keyedStateBackend = mock(AbstractKeyedStateBackend.class);
