Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/6194#discussion_r199441371
--- Diff:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
---
@@ -842,6 +852,100 @@ public void
testSetsUserCodeClassLoaderForTimerThreadFactory() throws Throwable
}
}
+ @Test
+ public void
testTriggerSavepointWhenTheFileSystemIsDifferentWithCheckpoint() throws
Exception {
+
+ final long checkpointId = 42L;
+ final long timestamp = 1L;
+
+ Environment mockEnvironment = spy(new
MockEnvironmentBuilder().build());
+ StreamTask<?, ?> streamTask = new
EmptyStreamTask(mockEnvironment);
+
+ // mock the operators
+ StreamOperator<?> statelessOperator =
+ mock(StreamOperator.class);
+
+ final OperatorID operatorID = new OperatorID();
+ when(statelessOperator.getOperatorID()).thenReturn(operatorID);
+
+ // mock the returned empty snapshot result (all state handles
are null)
+ OperatorSnapshotFutures statelessOperatorSnapshotResult = new
OperatorSnapshotFutures();
+ when(statelessOperator.snapshotState(anyLong(), anyLong(),
any(CheckpointOptions.class), any(CheckpointStreamFactory.class)))
+ .thenReturn(statelessOperatorSnapshotResult);
+
+ // set up the task
+ StreamOperator<?>[] streamOperators = {statelessOperator};
+ OperatorChain<Void, AbstractStreamOperator<Void>> operatorChain
= mock(OperatorChain.class);
+
when(operatorChain.getAllOperators()).thenReturn(streamOperators);
+
+ FileSystem checkpointFileSystem = mock(FileSystem.class);
+ FileSystem savepointFileSystem = mock(FileSystem.class);
+
+ FileSystem.setFsFactories(new HashMap<String,
FileSystemFactory>() {{
+ this.put("file", new FileSystemFactory() {
+
+ @Override
+ public String getScheme() {
+ return "file";
+ }
+
+ @Override
+ public void configure(Configuration config) {
+
+ }
+
+ @Override
+ public FileSystem create(URI fsUri) throws
IOException {
+ return savepointFileSystem;
+ }
+ });
+ this.put("hdfs", new FileSystemFactory() {
+ @Override
+ public String getScheme() {
+ return "hdfs";
+ }
+
+ @Override
+ public void configure(Configuration config) {
+
+ }
+
+ @Override
+ public FileSystem create(URI fsUri) throws
IOException {
+ return checkpointFileSystem;
+ }
+ });
+ }});
+
+ CheckpointStorage checkpointStorage = spy(new
FsCheckpointStorage(new Path("hdfs://test1/"), new Path("file:///test2/"), new
JobID(), 1024));
+
+ CheckpointStorageLocationReference locationReference =
AbstractFsCheckpointStorage.encodePathAsReference(new Path("file:///test2/"));
+
+
when(checkpointStorage.resolveCheckpointStorageLocation(checkpointId,
locationReference)).then(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocationOnMock)
throws Throwable {
+ // valid
+ FsCheckpointStorageLocation
checkpointStorageLocation = (FsCheckpointStorageLocation)
invocationOnMock.callRealMethod();
+ assertEquals(savepointFileSystem,
checkpointStorageLocation.getFileSystem());
+ return checkpointStorageLocation;
+ }
+ });
+
+ Whitebox.setInternalState(streamTask, "isRunning", true);
+ Whitebox.setInternalState(streamTask, "lock", new Object());
+ Whitebox.setInternalState(streamTask, "operatorChain",
operatorChain);
+ Whitebox.setInternalState(streamTask, "cancelables", new
CloseableRegistry());
+ Whitebox.setInternalState(streamTask, "configuration", new
StreamConfig(new Configuration()));
+ Whitebox.setInternalState(streamTask, "checkpointStorage",
checkpointStorage);
+ Whitebox.setInternalState(streamTask,
"asyncOperationsThreadPool", Executors.newCachedThreadPool());
+
+ CheckpointMetaData checkpointMetaData = new
CheckpointMetaData(checkpointId, timestamp);
+
+ streamTask.triggerCheckpoint(
+ checkpointMetaData,
+ new CheckpointOptions(CheckpointType.SAVEPOINT,
locationReference));
+ }
--- End diff --
This tests includes a lot of mocking, spying and whitebox testing. Usually
these things are really hard to maintain. I would, therefore, suggest to create
a unit test for the `FsCheckpointStorage#resolveCheckpointStorageLocation`
method instead.
---