[
https://issues.apache.org/jira/browse/FLINK-10954?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17119353#comment-17119353
]
Yun Tang commented on FLINK-10954:
----------------------------------
This ticket has been inactive for more than a year, because I simply worked
around it and local recovery is not enabled by default in Flink.
However, as we plan to enable local recovery by default in FLINK-15507, this
problem should be fixed.
Looking back at this problem, the root cause is that the current
{{LocalRecoveryDirectoryProviderImpl}} lacks any check on the provided
{{allocationBaseDirs}}.
Compare this with how the RocksDB state backend treats the
{{localRocksDbDirectories}} taken from the
{{state.backend.rocksdb.localdir}} configuration or from the IOManager's
default spilling directories: RocksDB checks whether those
{{localRocksDbDirectories}} are valid. However, the RocksDB state backend
does not perform any check on the {{allocationBaseDirs}} and gives no
feedback to {{LocalRecoveryDirectoryProviderImpl}} that would let invalid
{{allocationBaseDirs}} be ignored.
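One way such a check could look is sketched below. This is only an
illustration, not Flink code: {{HardLinkProbe}} and {{linkableIndexes}} are
hypothetical names, and the sketch assumes that "valid" means a file in the
state backend's working directory can be hard-linked into the candidate
directory.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not part of Flink) that probes directories for hard-link support. */
final class HardLinkProbe {

    /**
     * Returns the indexes of those candidate directories into which a file
     * located in workingDir can be hard-linked.
     */
    static List<Integer> linkableIndexes(Path workingDir, List<Path> candidates)
            throws IOException {
        // Temporary file that serves as the link source.
        Path probe = Files.createTempFile(workingDir, "probe", ".tmp");
        List<Integer> usable = new ArrayList<>();
        try {
            for (int i = 0; i < candidates.size(); i++) {
                Path link = candidates.get(i).resolve(probe.getFileName().toString() + ".link");
                try {
                    Files.createLink(link, probe); // fails across devices
                    Files.delete(link);
                    usable.add(i);
                } catch (IOException | UnsupportedOperationException | SecurityException e) {
                    // Missing, read-only, or cross-device directory: leave it out.
                }
            }
        } finally {
            Files.deleteIfExists(probe);
        }
        return usable;
    }
}
```

The returned indexes are positions in the candidate list, so they could be
passed on as-is to whoever selects the directory.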
The state backend is the consumer of {{LocalRecoveryDirectoryProvider}}, and
{{subtaskSpecificCheckpointDirectory}} is the interface that state backends
actually use. I suggest adding a method such as
'{{void confirmAllocationBaseDirsAvailable(Collection<Integer> indexes)}}' to
tell {{LocalRecoveryDirectoryProvider}} which {{allocationBaseDirs}} are valid.
{{subtaskSpecificCheckpointDirectory}} would then only choose from the valid
directories.
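A minimal sketch of this proposal, under stated assumptions:
{{ConfirmedDirSelector}} and {{selectBaseDirectory}} are hypothetical
stand-ins for the real provider and its directory lookup, and all directories
count as valid until the first confirmation arrives.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.TreeSet;

/** Simplified stand-in for a directory provider; not Flink's actual class. */
final class ConfirmedDirSelector {

    private final File[] allocationBaseDirs;
    private List<Integer> validIndexes = new ArrayList<>();

    ConfirmedDirSelector(File[] allocationBaseDirs) {
        this.allocationBaseDirs = allocationBaseDirs.clone();
        // Assumption: before any confirmation, every directory is treated as valid.
        for (int i = 0; i < this.allocationBaseDirs.length; i++) {
            validIndexes.add(i);
        }
    }

    /** Restricts later selections to the given indexes of allocationBaseDirs. */
    void confirmAllocationBaseDirsAvailable(Collection<Integer> indexes) {
        TreeSet<Integer> sorted = new TreeSet<>(indexes); // de-duplicate, stable order
        if (sorted.isEmpty()) {
            throw new IllegalArgumentException("At least one directory must be valid");
        }
        if (sorted.first() < 0 || sorted.last() >= allocationBaseDirs.length) {
            throw new IllegalArgumentException("Index out of range: " + sorted);
        }
        validIndexes = new ArrayList<>(sorted);
    }

    /** Picks a base directory for the checkpoint, rotating over valid directories only. */
    File selectBaseDirectory(long checkpointId) {
        int slot = (int) Math.floorMod(checkpointId, (long) validIndexes.size());
        return allocationBaseDirs[validIndexes.get(slot)];
    }
}
```

With three base directories and indexes 0 and 2 confirmed, checkpoints
alternate between the first and the third directory.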
If we want {{confirmAllocationBaseDirsAvailable}} to be able to give a
different answer each time a subtask is submitted,
{{LocalRecoveryDirectoryProviderImpl}} can no longer be stateless: it has to
remember where the sub-directory of each previous checkpointId lives. The
current {{LocalRecoveryDirectoryProviderImpl}} is stateless and simply
returns one of the {{allocationBaseDirs}} based on {{checkpointId %
allocationBaseDirsCount}}. I think this is possible by introducing a map
of {{<checkpointId -> base dir index>}}. The same checkpoint id should only
be requested twice, once to create and once to discard, so we could remove
the record on the second call.
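The map-based idea could be sketched as follows. The class and method names
({{StickyDirIndexTracker}}, {{resolve}}) are hypothetical, and the sketch
relies on the assumption stated above that each checkpoint id is requested
exactly twice.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical tracker that remembers which base dir index a checkpoint was given. */
final class StickyDirIndexTracker {

    private final Map<Long, Integer> indexByCheckpoint = new HashMap<>();

    /**
     * First call for a checkpoint id (create): picks an index from the currently
     * valid ones and remembers it. Second call (discard): returns the remembered
     * index and removes the record, even if the valid set has changed meanwhile.
     */
    synchronized int resolve(long checkpointId, List<Integer> currentlyValidIndexes) {
        Integer remembered = indexByCheckpoint.remove(checkpointId);
        if (remembered != null) {
            return remembered;
        }
        if (currentlyValidIndexes.isEmpty()) {
            throw new IllegalStateException("No valid base directory available");
        }
        int slot = (int) Math.floorMod(checkpointId, (long) currentlyValidIndexes.size());
        int chosen = currentlyValidIndexes.get(slot);
        indexByCheckpoint.put(checkpointId, chosen);
        return chosen;
    }

    /** Number of checkpoints that were created but not yet discarded. */
    synchronized int trackedCheckpoints() {
        return indexByCheckpoint.size();
    }
}
```

If a checkpoint is requested a third time, this sketch treats it as a new
creation, so it only works while the "exactly twice" assumption holds.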
Alternatively, we could keep
{{LocalRecoveryDirectoryProviderImpl}} stateless and allow
{{confirmAllocationBaseDirsAvailable}} to take effect only once, so that tasks
submitted later have to follow the result of the first initialization.
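A rough sketch of this initialize-once variant, assuming concurrent task
submissions: {{OneShotDirConfirmation}} is a hypothetical name, and returning
the effective result from the method is an addition for illustration only
(the method suggested above returns {{void}}).

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical holder in which only the first confirmation takes effect. */
final class OneShotDirConfirmation {

    private final AtomicReference<List<Integer>> confirmed = new AtomicReference<>();

    /**
     * Stores the given indexes if nothing was confirmed before and returns the
     * indexes that are in effect, i.e. always the first caller's result.
     */
    List<Integer> confirmAllocationBaseDirsAvailable(Collection<Integer> indexes) {
        List<Integer> proposal =
                Collections.unmodifiableList(new ArrayList<>(new TreeSet<>(indexes)));
        // Only succeeds for the first caller; later proposals are dropped.
        confirmed.compareAndSet(null, proposal);
        return confirmed.get();
    }
}
```

Tasks submitted later still call the method, but they get back the first
result and would have to use it.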
What do you think? [~sewen], [~liyu]
> Hardlink from files of previous local stored state might cross devices
> ----------------------------------------------------------------------
>
> Key: FLINK-10954
> URL: https://issues.apache.org/jira/browse/FLINK-10954
> Project: Flink
> Issue Type: Bug
> Components: Runtime / State Backends
> Affects Versions: 1.6.2
> Reporter: Yun Tang
> Assignee: Yun Tang
> Priority: Major
> Fix For: 1.11.0
>
>
> Currently, local recovery's base directories are initialized from
> '{{io.tmp.dirs}}' if the parameter '{{taskmanager.state.local.root-dirs}}' is
> not set. In a Yarn environment, the tmp dirs are replaced by Yarn's
> '{{LOCAL_DIRS}}', which might consist of directories on different devices,
> such as /dump/1/nm-local-dir and /dump/2/nm-local-dir. The local directory
> for RocksDB is initialized from the IOManager's spillingDirectories, which
> might be located on a different device than local recovery's folder. However,
> hard links between different devices are not allowed, so the exception below
> is thrown:
> {code:java}
> java.nio.file.FileSystemException: target -> source: Invalid cross-device link
> {code}