[ https://issues.apache.org/jira/browse/FLINK-9633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529627#comment-16529627 ]
ASF GitHub Bot commented on FLINK-9633:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/6194#discussion_r199441371
--- Diff:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
---
@@ -842,6 +852,100 @@ public void testSetsUserCodeClassLoaderForTimerThreadFactory() throws Throwable
}
}
+ @Test
+ public void testTriggerSavepointWhenTheFileSystemIsDifferentWithCheckpoint() throws Exception {
+
+ final long checkpointId = 42L;
+ final long timestamp = 1L;
+
+ Environment mockEnvironment = spy(new MockEnvironmentBuilder().build());
+ StreamTask<?, ?> streamTask = new EmptyStreamTask(mockEnvironment);
+
+ // mock the operators
+ StreamOperator<?> statelessOperator =
+ mock(StreamOperator.class);
+
+ final OperatorID operatorID = new OperatorID();
+ when(statelessOperator.getOperatorID()).thenReturn(operatorID);
+
+ // mock the returned empty snapshot result (all state handles are null)
+ OperatorSnapshotFutures statelessOperatorSnapshotResult = new OperatorSnapshotFutures();
+ when(statelessOperator.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class), any(CheckpointStreamFactory.class)))
+ .thenReturn(statelessOperatorSnapshotResult);
+
+ // set up the task
+ StreamOperator<?>[] streamOperators = {statelessOperator};
+ OperatorChain<Void, AbstractStreamOperator<Void>> operatorChain = mock(OperatorChain.class);
+ when(operatorChain.getAllOperators()).thenReturn(streamOperators);
+
+ FileSystem checkpointFileSystem = mock(FileSystem.class);
+ FileSystem savepointFileSystem = mock(FileSystem.class);
+
+ FileSystem.setFsFactories(new HashMap<String, FileSystemFactory>() {{
+ this.put("file", new FileSystemFactory() {
+
+ @Override
+ public String getScheme() {
+ return "file";
+ }
+
+ @Override
+ public void configure(Configuration config) {
+
+ }
+
+ @Override
+ public FileSystem create(URI fsUri) throws IOException {
+ return savepointFileSystem;
+ }
+ });
+ this.put("hdfs", new FileSystemFactory() {
+ @Override
+ public String getScheme() {
+ return "hdfs";
+ }
+
+ @Override
+ public void configure(Configuration config) {
+
+ }
+
+ @Override
+ public FileSystem create(URI fsUri) throws IOException {
+ return checkpointFileSystem;
+ }
+ });
+ }});
+
+ CheckpointStorage checkpointStorage = spy(new FsCheckpointStorage(new Path("hdfs://test1/"), new Path("file:///test2/"), new JobID(), 1024));
+
+ CheckpointStorageLocationReference locationReference = AbstractFsCheckpointStorage.encodePathAsReference(new Path("file:///test2/"));
+
+ when(checkpointStorage.resolveCheckpointStorageLocation(checkpointId, locationReference)).then(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+ // valid
+ FsCheckpointStorageLocation checkpointStorageLocation = (FsCheckpointStorageLocation) invocationOnMock.callRealMethod();
+ assertEquals(savepointFileSystem, checkpointStorageLocation.getFileSystem());
+ return checkpointStorageLocation;
+ }
+ });
+
+ Whitebox.setInternalState(streamTask, "isRunning", true);
+ Whitebox.setInternalState(streamTask, "lock", new Object());
+ Whitebox.setInternalState(streamTask, "operatorChain", operatorChain);
+ Whitebox.setInternalState(streamTask, "cancelables", new CloseableRegistry());
+ Whitebox.setInternalState(streamTask, "configuration", new StreamConfig(new Configuration()));
+ Whitebox.setInternalState(streamTask, "checkpointStorage", checkpointStorage);
+ Whitebox.setInternalState(streamTask, "asyncOperationsThreadPool", Executors.newCachedThreadPool());
+
+ CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp);
+
+ streamTask.triggerCheckpoint(
+ checkpointMetaData,
+ new CheckpointOptions(CheckpointType.SAVEPOINT, locationReference));
+ }
--- End diff --
This test includes a lot of mocking, spying and Whitebox testing, and tests
like that are usually hard to maintain. I would therefore suggest creating
a unit test for the `FsCheckpointStorage#resolveCheckpointStorageLocation`
method instead.
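A mock-free test of that kind could look roughly like the sketch below. It is plain Java and does not use Flink: `LocationResolver` and its `resolve` method are hypothetical stand-ins for `FsCheckpointStorage#resolveCheckpointStorageLocation`. They are reduced to the single property under test, namely that the resolved location's file system follows the scheme of the referenced path rather than the checkpoint base directory. Treating a `null` reference as "default location" is also an assumption made only for this illustration. The point is the shape of the test: construct the object directly, call the method, and assert on the result, with no spies or Whitebox state.

```java
import java.net.URI;

public class LocationResolverTest {

    /** Hypothetical stand-in for the storage under test; not a Flink API. */
    static final class LocationResolver {
        private final URI checkpointBaseDir;

        LocationResolver(URI checkpointBaseDir) {
            this.checkpointBaseDir = checkpointBaseDir;
        }

        /**
         * Returns the scheme of the file system that serves the location.
         * A null reference (assumption) means "use the default checkpoint location".
         */
        String resolve(URI targetLocation) {
            URI effective = (targetLocation == null) ? checkpointBaseDir : targetLocation;
            return effective.getScheme();
        }
    }

    static void check(boolean condition, String message) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }

    public static void main(String[] args) {
        LocationResolver resolver = new LocationResolver(URI.create("hdfs://test1/"));

        // Savepoint target on a different file system than the checkpoint base directory.
        check("file".equals(resolver.resolve(URI.create("file:///test2/"))),
                "savepoint location must use the savepoint path's file system");

        // The default location falls back to the checkpoint base directory.
        check("hdfs".equals(resolver.resolve(null)),
                "default location must use the checkpoint file system");

        System.out.println("all checks passed");
    }
}
```

Because the method is exercised directly, a change to `StreamTask` internals cannot break this test. It fails only if the resolution logic itself changes.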
> Flink doesn't use the Savepoint path's filesystem to create the OutputStream
> on Task.
> -------------------------------------------------------------------------------------
>
> Key: FLINK-9633
> URL: https://issues.apache.org/jira/browse/FLINK-9633
> Project: Flink
> Issue Type: Bug
> Components: State Backends, Checkpointing
> Affects Versions: 1.5.0
> Reporter: Sihua Zhou
> Assignee: Sihua Zhou
> Priority: Critical
> Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> Currently, Flink uses the savepoint's filesystem to create the metadata output
> stream in the CheckpointCoordinator (JM side), but in the StreamTask (TM side)
> it uses the checkpoint's filesystem to create the checkpoint data output
> stream. When the savepoint and checkpoint directories are on different
> filesystems, the task therefore writes the savepoint's data through the wrong
> filesystem.
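To make the mismatch concrete, here is a small self-contained model. It is plain Java, not Flink code. The method names and the `"LocalFileSystem"`/`"HadoopFileSystem"` labels are illustrative assumptions. The model looks up a file system by URI scheme, the way a scheme-keyed factory registry does, and compares what each side picks when checkpoints live on `hdfs://` but the savepoint targets `file:///`. The "fixed" variant reflects the behavior the issue implies (derive the file system from the savepoint path). It is not the actual patch.

```java
import java.net.URI;

public class SavepointFsMismatch {

    /** Hypothetical scheme-based lookup; returns a label instead of a real file system. */
    static String fileSystemFor(URI path) {
        String scheme = path.getScheme();
        if (scheme == null) {
            throw new IllegalArgumentException("path has no scheme: " + path);
        }
        switch (scheme) {
            case "hdfs":
                return "HadoopFileSystem";
            case "file":
                return "LocalFileSystem";
            default:
                throw new IllegalArgumentException("unsupported scheme: " + scheme);
        }
    }

    /** JM side: savepoint metadata is written via the savepoint path's file system. */
    static String metadataFs(URI savepointDir) {
        return fileSystemFor(savepointDir);
    }

    /** TM side as described in the issue: always the checkpoint directory's file system. */
    static String taskDataFsBuggy(URI checkpointDir, URI savepointDir) {
        return fileSystemFor(checkpointDir);
    }

    /** TM side, intended: same inputs, but the file system follows the savepoint path. */
    static String taskDataFsFixed(URI checkpointDir, URI savepointDir) {
        return fileSystemFor(savepointDir);
    }

    public static void main(String[] args) {
        URI checkpointDir = URI.create("hdfs://test1/");
        URI savepointDir = URI.create("file:///test2/");
        System.out.println("JM metadata:   " + metadataFs(savepointDir));
        System.out.println("TM data (bug): " + taskDataFsBuggy(checkpointDir, savepointDir));
        System.out.println("TM data (fix): " + taskDataFsFixed(checkpointDir, savepointDir));
    }
}
```

With differing schemes, the buggy variant disagrees with the JM side and the fixed variant agrees with it. That agreement is the property a direct test of the resolution method would assert.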
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)