[ https://issues.apache.org/jira/browse/FLINK-7873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16249823#comment-16249823 ]

Stefan Richter commented on FLINK-7873:
---------------------------------------

[~sihuazhou] We are already planning to introduce this feature of local 
recovery in Flink 1.5 (~January 2018), and I already have some code for an 
implementation. There is no JIRA for it yet because we only recently decided 
to first go for a simpler approach in Flink 1.5, before implementing a more 
involved version of local recovery in a later release, after some general 
refactoring of the checkpointing architecture that will make such features 
easier to integrate. I can share roughly what our current plan for the simple 
approach looks like:

1) Make the recovery checkpoint id part of the TDD (task deployment descriptor).
This first change is crucial so that we can identify the correct local state 
from which the job should recover. Currently, the TDD contains only the state 
handles from which the backend recovers, but for local recovery this is 
insufficient because we cannot determine the local state from remote state 
handles alone. We also need to check that the key groups did not change, but 
they are already part of the TDD. This is already part of an open PR, see the 
next point.
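
A rough sketch of what this could look like (the class and field names below 
are purely illustrative, not the actual TaskDeploymentDescriptor):

{code:java}
import java.io.Serializable;
import java.util.List;

// Illustrative only; not the real TaskDeploymentDescriptor.
public class TaskDeploymentInfo implements Serializable {

    private static final long serialVersionUID = 1L;

    // Remote state handles the backend restores from (as today).
    private final List<Serializable> taskStateHandles;

    // Newly added: id of the checkpoint to restore from, so the TaskManager
    // can look up matching local state instead of reading everything from DFS.
    private final long restoreCheckpointId;

    public TaskDeploymentInfo(List<Serializable> taskStateHandles, long restoreCheckpointId) {
        this.taskStateHandles = taskStateHandles;
        this.restoreCheckpointId = restoreCheckpointId;
    }

    public List<Serializable> getTaskStateHandles() {
        return taskStateHandles;
    }

    public long getRestoreCheckpointId() {
        return restoreCheckpointId;
    }
}
{code}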

2) Refactoring that makes all state backend creation explicit. (Strictly 
speaking, this point is optional for the simple approach, but still good to 
have.)
Currently, the creation of state-related objects like the keyed state backend, 
operator state backend, timer service, etc. is very inconsistent and 
unorganized. This makes future enhancements like local recovery harder. We 
should introduce a clearly defined factory step in which all state-related 
objects for each operator are created. This also removes legacy limitations, 
such as only head operators being able to have a keyed state backend. There is 
an open PR (#4745) that still needs review and a rebase.
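
A minimal sketch of what such an explicit factory step could look like (the 
interface and placeholder types below are hypothetical, not the API of that PR):

{code:java}
// Hypothetical sketch of one clearly defined creation step for all
// state-related objects of an operator; not the API of PR #4745.
public interface StateObjectFactory {

    // Placeholder types standing in for the actual state classes.
    interface KeyedBackend {}
    interface OperatorBackend {}
    interface TimerService {}

    // Every operator obtains its state objects from this single factory
    // step, so enhancements like local recovery can hook in at one place.
    // This also lifts the restriction that only head operators can have a
    // keyed state backend.
    KeyedBackend createKeyedStateBackend() throws Exception;

    OperatorBackend createOperatorStateBackend() throws Exception;

    TimerService createTimerService() throws Exception;
}
{code}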

3) Local Recovery MVP

This initial version of local recovery will work for RocksDB incremental and 
full checkpoints, and for heap full checkpoints. One main advantage of the 
suggested approach is that the added features are almost exclusively 
implemented in the backends and transparent to other components like the 
checkpoint coordinator. We can easily replace it later with a more advanced 
approach without hurting backwards compatibility. On the downside, the 
approach lacks deeper integration with the scheduling and simply assumes that, 
in case of recovery, task assignment is mostly identical to the previous 
scheduling.

* For full checkpoints, the main idea is to have a duplicating output stream 
that wraps the existing DFS output stream and a (newly introduced) local disk 
output stream, and duplicates writes to both streams (see the sketch after 
this list). The state handle from the DFS stream is reported as before. The 
state that was written to the local output stream is managed inside the 
backend and associated with a checkpoint id.

* For RocksDB incremental checkpoints, we just need to change the algorithm so 
that the local snapshot directory is not deleted after uploading the files to 
DFS. We need to remember the directory and associate it with the checkpoint id. 
All management is again transparent and happens only inside the backend.

* Recovery can mostly use existing code, potentially with some minor 
modifications (e.g. no need to copy individual files from DFS when we recover 
incremental checkpoints).

* Cleanup of all local checkpoints is controlled by notifyCheckpointComplete 
messages: whenever a new checkpoint is known to have completed, all previous 
local checkpoints are deleted (also illustrated in the sketch below). We also 
need a signal from the JM to the TMs to differentiate between cancellation 
caused by a task failure and cancellation triggered by a user command; in the 
latter case, the local checkpoints should be removed. It could also make sense 
for new backends to check for orphaned directories in the local checkpoint 
directory.
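
To make the duplicating stream and the cleanup via notifyCheckpointComplete a 
bit more concrete, here is a minimal, self-contained sketch (all class and 
method names are hypothetical, not the eventual implementation); it assumes 
the backend simply keeps a map from checkpoint id to the local state directory:

{code:java}
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical sketch only; names do not refer to actual Flink classes.
public class LocalRecoverySketch {

    /** Duplicates every write to the DFS stream and a local disk stream. */
    public static class DuplicatingOutputStream extends OutputStream {

        private final OutputStream dfsStream;   // existing remote stream
        private final OutputStream localStream; // newly introduced local stream

        public DuplicatingOutputStream(OutputStream dfsStream, OutputStream localStream) {
            this.dfsStream = dfsStream;
            this.localStream = localStream;
        }

        @Override
        public void write(int b) throws IOException {
            dfsStream.write(b);
            localStream.write(b);
        }

        @Override
        public void write(byte[] b, int off, int len) throws IOException {
            dfsStream.write(b, off, len);
            localStream.write(b, off, len);
        }

        @Override
        public void flush() throws IOException {
            dfsStream.flush();
            localStream.flush();
        }

        @Override
        public void close() throws IOException {
            // The handle of the DFS stream is reported to the checkpoint
            // coordinator as before; the local file is only registered in
            // the backend-local map below.
            dfsStream.close();
            localStream.close();
        }
    }

    /** Backend-local bookkeeping: checkpoint id -> local state directory. */
    private final TreeMap<Long, Path> localStateByCheckpointId = new TreeMap<>();

    public void registerLocalState(long checkpointId, Path localDirectory) {
        localStateByCheckpointId.put(checkpointId, localDirectory);
    }

    /** On a completed checkpoint, drop the local state of all older checkpoints. */
    public void notifyCheckpointComplete(long completedCheckpointId) throws IOException {
        Iterator<Map.Entry<Long, Path>> it =
                localStateByCheckpointId.headMap(completedCheckpointId, false).entrySet().iterator();
        while (it.hasNext()) {
            Path dir = it.next().getValue();
            it.remove();
            deleteRecursively(dir);
        }
    }

    private static void deleteRecursively(Path path) throws IOException {
        if (Files.isDirectory(path)) {
            try (Stream<Path> children = Files.list(path)) {
                for (Path child : children.collect(Collectors.toList())) {
                    deleteRecursively(child);
                }
            }
        }
        Files.deleteIfExists(path);
    }
}
{code}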


> Introduce CheckpointCacheManager for reading checkpoint data locally when 
> performing failover
> ---------------------------------------------------------------------------------------------
>
>                 Key: FLINK-7873
>                 URL: https://issues.apache.org/jira/browse/FLINK-7873
>             Project: Flink
>          Issue Type: New Feature
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.3.2
>            Reporter: Sihua Zhou
>            Assignee: Sihua Zhou
>
> Why I introduce this:
>     The current recovery strategy always reads checkpoint data from a remote 
> FileStream (HDFS). This costs a lot of bandwidth when the state is very big 
> (e.g. 1 TB). What's worse, if the job recovers again and again, it can eat 
> up all the network bandwidth and hurt the cluster badly. So I propose that 
> we cache the checkpoint data locally and read it from the local cache 
> whenever we can, falling back to the remote data only if the local read 
> fails. The advantage is that if an execution is assigned to the same 
> TaskManager as before, it can save a lot of bandwidth and recover faster.
> Solution:
>     The TaskManager does the caching and manages the cached data itself. It 
> simply uses a TTL-like method to manage the disposal of cache entries: we 
> dispose of an entry if it has not been touched for some time X, and whenever 
> we touch an entry we reset its TTL. In this way, all the work is done by the 
> TaskManager and is transparent to the JobManager. The only problem is that 
> we may dispose of an entry that is still useful; in that case we have to 
> fall back to reading the remote data, but users can avoid this by setting a 
> proper TTL value according to the checkpoint interval and other factors.
> Can someone give me some advice? I would appreciate it very much~
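
For reference, a minimal sketch of the TTL-style eviction described in the 
quoted issue (all names below are purely hypothetical, not a proposed API):

{code:java}
import java.nio.file.Path;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the TTL-based cache management described above.
public class CheckpointCacheSketch {

    private static class Entry {
        final Path localData;
        volatile long lastTouched;

        Entry(Path localData, long now) {
            this.localData = localData;
            this.lastTouched = now;
        }
    }

    private final Map<Long, Entry> entriesByCheckpointId = new ConcurrentHashMap<>();
    private final long ttlMillis;

    public CheckpointCacheSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    public void put(long checkpointId, Path localData) {
        entriesByCheckpointId.put(checkpointId, new Entry(localData, System.currentTimeMillis()));
    }

    /** Returns the locally cached data and resets its TTL, or null on a cache miss. */
    public Path touch(long checkpointId) {
        Entry entry = entriesByCheckpointId.get(checkpointId);
        if (entry == null) {
            return null; // caller falls back to reading from remote DFS
        }
        entry.lastTouched = System.currentTimeMillis();
        return entry.localData;
    }

    /** Periodically invoked by the TaskManager to drop entries not touched within the TTL. */
    public void expire() {
        long now = System.currentTimeMillis();
        Iterator<Map.Entry<Long, Entry>> it = entriesByCheckpointId.entrySet().iterator();
        while (it.hasNext()) {
            if (now - it.next().getValue().lastTouched > ttlMillis) {
                it.remove();
            }
        }
    }
}
{code}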



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
