Repository: flink
Updated Branches:
  refs/heads/master 14518067c -> bcd028d75


[hotfix][tests] Fix minor mocking issues in AbstractStreamOperatorTest


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bcd028d7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bcd028d7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bcd028d7

Branch: refs/heads/master
Commit: bcd028d75b0e5c5c691e24640a2196b2fdaf85e0
Parents: 7f42259
Author: Stefan Richter <[email protected]>
Authored: Mon May 14 15:09:50 2018 +0200
Committer: Stefan Richter <[email protected]>
Committed: Mon May 14 17:50:52 2018 +0200

----------------------------------------------------------------------
 .../streaming/api/operators/AbstractStreamOperatorTest.java   | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bcd028d7/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
index 904ff64..f0195a2 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
@@ -495,7 +495,7 @@ public class AbstractStreamOperatorTest {
 
                final CloseableRegistry closeableRegistry = new 
CloseableRegistry();
 
-               StateSnapshotContextSynchronousImpl context = 
mock(StateSnapshotContextSynchronousImpl.class);
+               StateSnapshotContextSynchronousImpl context = spy(new 
StateSnapshotContextSynchronousImpl(0L, 0L));
 
                
whenNew(StateSnapshotContextSynchronousImpl.class).withAnyArguments().thenReturn(context);
 
@@ -573,7 +573,7 @@ public class AbstractStreamOperatorTest {
                RunnableFuture<SnapshotResult<KeyedStateHandle>> 
futureKeyedStateHandle = mock(RunnableFuture.class);
                RunnableFuture<SnapshotResult<OperatorStateHandle>> 
futureOperatorStateHandle = mock(RunnableFuture.class);
 
-               StateSnapshotContextSynchronousImpl context = 
mock(StateSnapshotContextSynchronousImpl.class);
+               StateSnapshotContextSynchronousImpl context = spy(new 
StateSnapshotContextSynchronousImpl(checkpointId, timestamp));
                
when(context.getKeyedStateStreamFuture()).thenReturn(futureKeyedStateHandle);
                
when(context.getOperatorStateStreamFuture()).thenReturn(futureOperatorStateHandle);
 
@@ -582,7 +582,6 @@ public class AbstractStreamOperatorTest {
                
whenNew(StateSnapshotContextSynchronousImpl.class).withAnyArguments().thenReturn(context);
                
whenNew(OperatorSnapshotFutures.class).withAnyArguments().thenReturn(operatorSnapshotResult);
 
-               CheckpointStreamFactory streamFactory = 
mock(CheckpointStreamFactory.class);
                StreamTask<Void, AbstractStreamOperator<Void>> containingTask = 
mock(StreamTask.class);
                
when(containingTask.getCancelables()).thenReturn(closeableRegistry);
 
@@ -600,7 +599,7 @@ public class AbstractStreamOperatorTest {
                when(operatorStateBackend.snapshot(
                        eq(checkpointId),
                        eq(timestamp),
-                       eq(streamFactory),
+                       any(CheckpointStreamFactory.class),
                        
any(CheckpointOptions.class))).thenReturn(futureManagedOperatorStateHandle);
 
                AbstractKeyedStateBackend<?> keyedStateBackend = 
mock(AbstractKeyedStateBackend.class);

Reply via email to