[ https://issues.apache.org/jira/browse/FLINK-7873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sihua Zhou updated FLINK-7873:
------------------------------
Summary: Introduce CheckpointCacheManager for reading checkpoint data
locally when performing failover (was: Introduce HybridStreamHandle to
optimize the recovery mechanism and try to read the checkpoint data locally)
> Introduce CheckpointCacheManager for reading checkpoint data locally when
> performing failover
> ---------------------------------------------------------------------------------------------
>
> Key: FLINK-7873
> URL: https://issues.apache.org/jira/browse/FLINK-7873
> Project: Flink
> Issue Type: New Feature
> Components: State Backends, Checkpointing
> Affects Versions: 1.3.2
> Reporter: Sihua Zhou
> Assignee: Sihua Zhou
>
> The current recovery strategy always reads checkpoint data from the remote
> FileStream (HDFS). This consumes a lot of bandwidth when the state is very
> large (e.g. 1 TB). Worse, if the job has to recover again and again, it can
> eat up all of the network bandwidth and seriously hurt the cluster. I
> therefore propose caching the checkpoint data locally and reading it from the
> local cache whenever possible, falling back to the remote copy only if the
> local read fails. The advantage is that if an execution is assigned to the
> same TaskManager as before, recovery saves a lot of bandwidth and is faster.
> Key problems:
> 1. Cache the checkpoint data on local disk and manage its creation and deletion.
> 2. Introduce a HybridStreamHandle that tries to open a local input stream
> first and, if that fails, opens a remote input stream instead. Its prototype
> looks like this:
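> {code:java}
> import java.io.IOException;
> import org.apache.flink.core.fs.FSDataInputStream;
> import org.apache.flink.runtime.state.StreamStateHandle;
> class HybridStreamHandle {
>     // Copy cached on the TaskManager's local disk; may be missing.
>     private StreamStateHandle localHandle;
>     // Authoritative copy on remote storage (e.g. HDFS).
>     private StreamStateHandle remoteHandle;
>     // ...
>     public FSDataInputStream openInputStream() throws IOException {
>         // Prefer the local copy; fall back to remote if it is absent or cannot be opened.
>         if (localHandle != null) {
>             try {
>                 FSDataInputStream inputStream = localHandle.openInputStream();
>                 if (inputStream != null) {
>                     return inputStream;
>                 }
>             } catch (IOException e) {
>                 // Local read failed (e.g. the cached file was deleted); use the remote copy.
>             }
>         }
>         return remoteHandle.openInputStream();
>     }
>     // ...
> }
> {code}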
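Key problem 1 leaves open how the local copy gets created in the first place. One possibility, sketched here under the assumption that the backend writes each checkpoint stream once and can mirror it, is a write-through stream that copies every byte to a local cache file while writing the remote one. `WriteThroughCheckpointStream` and `isLocalCopyUsable` are hypothetical names, not Flink API; plain `java.io` streams stand in for Flink's checkpoint output streams.

```java
import java.io.IOException;
import java.io.OutputStream;

/**
 * Hypothetical write-through stream: every byte written to the remote
 * (authoritative) stream is also copied to a best-effort local cache stream.
 * A local failure only disables caching; it never fails the checkpoint.
 */
class WriteThroughCheckpointStream extends OutputStream {
    private interface LocalOp {
        void run() throws IOException;
    }

    private final OutputStream remote; // e.g. a stream to HDFS
    private final OutputStream local;  // e.g. a file on the TaskManager's disk
    private boolean localFailed = false;

    WriteThroughCheckpointStream(OutputStream remote, OutputStream local) {
        this.remote = remote;
        this.local = local;
    }

    @Override
    public void write(int b) throws IOException {
        remote.write(b);
        mirrorToLocal(() -> local.write(b));
    }

    @Override
    public void write(byte[] buf, int off, int len) throws IOException {
        remote.write(buf, off, len);
        mirrorToLocal(() -> local.write(buf, off, len));
    }

    @Override
    public void flush() throws IOException {
        remote.flush();
        mirrorToLocal(local::flush);
    }

    /** True if the local copy is complete and may be registered in the cache. */
    boolean isLocalCopyUsable() {
        return !localFailed;
    }

    // Runs a local-side operation; on error, abandons the local copy for good.
    private void mirrorToLocal(LocalOp op) {
        if (localFailed) {
            return;
        }
        try {
            op.run();
        } catch (IOException e) {
            localFailed = true;
            try {
                local.close();
            } catch (IOException ignored) {
                // The cached copy is optional, so close errors are ignored.
            }
        }
    }

    @Override
    public void close() throws IOException {
        try {
            remote.close();
        } finally {
            if (!localFailed) {
                try {
                    local.close();
                } catch (IOException e) {
                    localFailed = true; // incomplete local copy must not be used
                }
            }
        }
    }
}
```

The remote write stays authoritative: a local error only marks the cached copy as unusable, in which case recovery simply reads from remote through the fallback in `openInputStream()`.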
> Solution:
> I can think of two kinds of solutions.
> solution1:
> The backend does the caching, and the HybridStreamHandle points to both the
> local and the remote data. Like any other StreamHandle, the HybridStreamHandle
> is managed by the CheckpointCoordinator, so the CheckpointCoordinator disposes
> of it. When a HybridStreamHandle is disposed, it calls localHandle.dispose()
> and remoteHandle.dispose(). With this approach, we have to record the
> TaskManager's information (such as its location) in localHandle and add an
> entry point in the TaskManager to handle the localHandle dispose message. We
> also have to consider the HA case.
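Solution 1's cascading dispose could look roughly like the following sketch. All types here (`DisposableHandle`, `TaskManagerGateway`, `LocalHandle`, `HybridHandle`) are hypothetical stand-ins, not Flink classes. It illustrates why the local handle has to carry the TaskManager location: the dispose runs on the JobManager side and must be turned into a message to the TaskManager that owns the cached file.

```java
/** Hypothetical minimal handle abstraction standing in for a state handle. */
interface DisposableHandle {
    void dispose() throws Exception;
}

/** Hypothetical channel for asking a specific TaskManager to delete a cached file. */
interface TaskManagerGateway {
    void deleteLocalCheckpointFile(String taskManagerLocation, String path) throws Exception;
}

/** Local handle that remembers which TaskManager holds the cached file. */
class LocalHandle implements DisposableHandle {
    final String taskManagerLocation;
    final String path;
    private final TaskManagerGateway gateway;

    LocalHandle(String taskManagerLocation, String path, TaskManagerGateway gateway) {
        this.taskManagerLocation = taskManagerLocation;
        this.path = path;
        this.gateway = gateway;
    }

    @Override
    public void dispose() throws Exception {
        // Executed by the coordinator, so deletion becomes a remote request.
        gateway.deleteLocalCheckpointFile(taskManagerLocation, path);
    }
}

/** Hybrid handle that disposes both copies when the coordinator discards it. */
class HybridHandle implements DisposableHandle {
    private final DisposableHandle local;
    private final DisposableHandle remote;

    HybridHandle(DisposableHandle local, DisposableHandle remote) {
        this.local = local;
        this.remote = remote;
    }

    @Override
    public void dispose() throws Exception {
        try {
            local.dispose();
        } catch (Exception e) {
            // Best effort: an unreachable TaskManager must not block removal of
            // the remote data, although the local file is then left behind.
        }
        remote.dispose();
    }
}
```

Treating the local dispose as best effort keeps a lost TaskManager from leaking the remote data, but the orphaned local file then needs some other cleanup path; this is the kind of extra bookkeeping (together with HA) that solution 1 has to handle.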
> solution2:
> The TaskManager does the caching and manages the cached data itself. It
> simply uses a TTL-like mechanism to dispose of handles: a handle is disposed
> if it has not been touched for X time. We touch a handle whenever we recover
> from a checkpoint or perform a checkpoint, and touching a handle resets its
> TTL. In this way, all the work is done by the backend and is transparent to
> the JobManager. The only problem is that we may dispose of a handle that is
> still useful, but even then we can fall back to reading the remote data, and
> users can avoid this by setting a proper TTL value based on the checkpoint
> interval and other factors.
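The TTL bookkeeping in solution2 can be sketched as a small index of last-touched timestamps kept on the TaskManager. `TtlCheckpointCache` and its methods are hypothetical; the clock is injected as a `LongSupplier` so expiry is deterministic in tests, and deleting the cached files for the returned ids is left to the caller.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.LongSupplier;

/**
 * Hypothetical TaskManager-side cache index: each cached handle records when it
 * was last touched, and handles idle for longer than the TTL are evicted.
 */
class TtlCheckpointCache {
    private final long ttlMillis;
    private final LongSupplier clock; // injectable for deterministic tests
    private final Map<String, Long> lastTouched = new HashMap<>();

    TtlCheckpointCache(long ttlMillis, LongSupplier clock) {
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    /** Registers a freshly written local copy. */
    synchronized void register(String handleId) {
        lastTouched.put(handleId, clock.getAsLong());
    }

    /**
     * Called when a checkpoint or recovery uses the handle; resets its TTL.
     * Returns false if the copy is no longer cached (read remote instead).
     */
    synchronized boolean touch(String handleId) {
        if (!lastTouched.containsKey(handleId)) {
            return false;
        }
        lastTouched.put(handleId, clock.getAsLong());
        return true;
    }

    /** Removes and returns every handle idle longer than the TTL. */
    synchronized List<String> expire() {
        long now = clock.getAsLong();
        List<String> expired = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastTouched.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (now - entry.getValue() > ttlMillis) {
                expired.add(entry.getKey());
                it.remove();
            }
        }
        return expired;
    }

    synchronized boolean contains(String handleId) {
        return lastTouched.containsKey(handleId);
    }
}
```

A TaskManager would call `register` after writing a local copy, `touch` whenever a checkpoint or recovery uses it (a `false` result means the copy is gone and the remote data must be read), and `expire` periodically. Choosing the TTL relative to the checkpoint interval, as suggested above, keeps copies that are still in use from being evicted between checkpoints.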
> To avoid complicating the problem, I prefer solution2. Can someone give me
> some advice? I would appreciate it very much.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)