[ 
https://issues.apache.org/jira/browse/FLINK-7873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou updated FLINK-7873:
------------------------------
    Summary: Introduce CheckpointCacheManager for reading checkpoint data 
locally when performing failover  (was: Introduce HybridStreamHandle to 
optimize the recovery mechanism and try to read the checkpoint data locally)

> Introduce CheckpointCacheManager for reading checkpoint data locally when 
> performing failover
> ---------------------------------------------------------------------------------------------
>
>                 Key: FLINK-7873
>                 URL: https://issues.apache.org/jira/browse/FLINK-7873
>             Project: Flink
>          Issue Type: New Feature
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.3.2
>            Reporter: Sihua Zhou
>            Assignee: Sihua Zhou
>
> The current recovery strategy always reads checkpoint data from a remote 
> FileStream (HDFS). This costs a lot of bandwidth when the state is very large 
> (e.g. 1 TB). What's worse, if a job recovers again and again, it can eat up 
> all the network bandwidth and seriously hurt the whole cluster. So I propose 
> that we cache the checkpoint data locally and read it from the local cache 
> whenever we can, falling back to the remote copy only when the local read 
> fails. The advantage is that if an execution is assigned to the same 
> TaskManager as before, it saves a lot of bandwidth and recovers faster.
> Key problems:
> 1. Cache the checkpoint data on the local disk and manage its creation and 
> deletion (see the sketch after the prototype below).
> 2. Introduce a HybridStreamHandle that tries to open a local input stream 
> first and, if that fails, opens a remote input stream instead. Its prototype 
> looks like this:
> {code:java}
> class HybridStreamHandle implements StreamStateHandle {
>    private StreamStateHandle localHandle;
>    private StreamStateHandle remoteHandle;
>    ......
>    public FSDataInputStream openInputStream() throws IOException {
>        // try the cached copy on the local disk first ...
>        try {
>            return localHandle.openInputStream();
>        } catch (IOException e) {
>            // ... fall back to the remote copy (HDFS) if the local read fails
>            return remoteHandle.openInputStream();
>        }
>    }
>    .....
> }
> {code}
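> For key problem 1, one possible way to create the local copy is to let the 
> backend write the checkpoint data through a stream that duplicates every byte 
> into a local file next to the regular remote checkpoint stream. This is only 
> a minimal sketch; DuplicatingOutputStream and its wiring are hypothetical 
> names, not existing Flink classes:
> {code:java}
> import java.io.FileOutputStream;
> import java.io.IOException;
> import java.io.OutputStream;
>
> // Writes every byte to the remote checkpoint stream and to a local cache file,
> // so that after the snapshot both a remote and a local copy of the data exist.
> class DuplicatingOutputStream extends OutputStream {
>     private final OutputStream remote;        // regular checkpoint stream (HDFS)
>     private final FileOutputStream localCopy; // cached copy on the local disk
>
>     DuplicatingOutputStream(OutputStream remote, FileOutputStream localCopy) {
>         this.remote = remote;
>         this.localCopy = localCopy;
>     }
>
>     @Override
>     public void write(int b) throws IOException {
>         remote.write(b);
>         localCopy.write(b);
>     }
>
>     @Override
>     public void write(byte[] b, int off, int len) throws IOException {
>         remote.write(b, off, len);
>         localCopy.write(b, off, len);
>     }
>
>     @Override
>     public void close() throws IOException {
>         // once both copies are complete, the caller can build a
>         // HybridStreamHandle from the remote handle and the local file
>         remote.close();
>         localCopy.close();
>     }
> }
> {code}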
> Solution:
>       There are two kinds of solutions I can think of.
> Solution 1:
>       The backend does the caching, and the HybridStreamHandle points to both 
> the local and the remote data. The HybridStreamHandle is managed by the 
> CheckpointCoordinator like any other StreamStateHandle, so the 
> CheckpointCoordinator performs the dispose on it. When the HybridStreamHandle 
> is disposed, it calls localHandle.dispose() and remoteHandle.dispose(). With 
> this approach we have to record the TaskManager's info (like its location) in 
> the localHandle and add an entry point in the TaskManager to handle the 
> dispose message for the localHandle; we also have to consider the HA 
> situation.
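> A rough sketch of the cascading disposal in solution 1, extending the 
> prototype above (assuming the hybrid handle implements StreamStateHandle and 
> uses StateObject#discardState(), which is what "dispose" corresponds to in 
> Flink; the comments only describe the intended flow):
> {code:java}
> class HybridStreamHandle implements StreamStateHandle {
>     private StreamStateHandle localHandle;
>     private StreamStateHandle remoteHandle;
>     ......
>     @Override
>     public void discardState() throws Exception {
>         // the CheckpointCoordinator disposes the hybrid handle like any other
>         // state handle; the hybrid handle must clean up both copies, which is
>         // why it needs to know which TaskManager holds the local one
>         localHandle.discardState();
>         remoteHandle.discardState();
>     }
>     ......
> }
> {code}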
> Solution 2:
>       The TaskManager does the caching and manages the cached data itself. It 
> simply uses a TTL-like strategy to manage handle disposal: we dispose a 
> handle if it has not been touched for some time X. We touch the handles when 
> we recover from a checkpoint or when we perform a checkpoint, and every touch 
> resets the handle's TTL. This way, all the work is done by the backend and it 
> is transparent to the JobManager. The only problem is that we may dispose a 
> handle that is still useful, but even in that case we can still read the data 
> from the remote copy, and users can avoid it by setting a proper TTL value 
> according to the checkpoint interval and other factors.
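> A minimal sketch of the TTL bookkeeping for solution 2. The 
> CheckpointCacheManager name matches the new title of this issue; the 
> touch/evict methods and fields are only assumptions to illustrate the idea:
> {code:java}
> import java.io.File;
> import java.util.Iterator;
> import java.util.Map;
> import java.util.concurrent.ConcurrentHashMap;
>
> class CheckpointCacheManager {
>     private final long ttlMillis;
>     // handle id -> (local cache file, timestamp of the last touch)
>     private final Map<String, CacheEntry> entries = new ConcurrentHashMap<>();
>
>     CheckpointCacheManager(long ttlMillis) {
>         this.ttlMillis = ttlMillis;
>     }
>
>     /** Called whenever a checkpoint completes or a task restores from the cache. */
>     void touch(String handleId, File localFile) {
>         entries.put(handleId, new CacheEntry(localFile, System.currentTimeMillis()));
>     }
>
>     /** Called periodically: drops entries not touched for longer than the TTL. */
>     void evictExpired() {
>         long now = System.currentTimeMillis();
>         Iterator<Map.Entry<String, CacheEntry>> it = entries.entrySet().iterator();
>         while (it.hasNext()) {
>             CacheEntry entry = it.next().getValue();
>             if (now - entry.lastTouch > ttlMillis) {
>                 // losing an entry is safe: recovery falls back to the remote copy
>                 entry.file.delete();
>                 it.remove();
>             }
>         }
>     }
>
>     private static final class CacheEntry {
>         final File file;
>         final long lastTouch;
>
>         CacheEntry(File file, long lastTouch) {
>             this.file = file;
>             this.lastTouch = lastTouch;
>         }
>     }
> }
> {code}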
> To avoid over-complicating the problem, I prefer solution 2. Can someone give 
> me some advice? I would appreciate it very much~



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
