[
https://issues.apache.org/jira/browse/FLINK-7873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16249823#comment-16249823
]
Stefan Richter commented on FLINK-7873:
---------------------------------------
[~sihuazhou] We are already planning to introduce this feature of local
recovery in Flink 1.5 (~January 2018) and I already have some code for an
implementation. There is currently no JIRA (yet) because we only recently
decided to first go for a simpler approach in Flink 1.5 before implementing a
more involved version of local recovery in a later release, after some general
refactoring in the checkpointing architecture that will make such features
easier to integrate. I can share what our current plan for the simple approach
roughly looks like:
1) Make the recovery checkpoint id part of the TDD (task deployment
descriptor).
This first change is crucial so that we can identify the correct local state
from which the job should recover. Currently, the TDD contains only the state
handles from which the backend recovers, but for local recovery this is
insufficient because we cannot figure out the local state from some remote
state handles. We also need to check that key groups did not change, but they
are already part of the TDD. This is already part of an open PR, see next
point.
2) Refactoring that makes all state backend creation explicit. (Strictly
speaking, this point is optional for the simple approach, but still good to
have.)
Currently, the creation of state-related objects like the keyed state backend,
operator state backend, timer service, etc. is very inconsistent and
unorganized. This makes future enhancements like local recovery harder. We
should introduce a clearly defined factory step in which all state-related
objects for each operator are created. This also removes legacy limitations,
such as the restriction that only head operators can have a keyed state
backend. There is an open PR (#4745) that still needs review and some
rebasing.
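To make the factory step a bit more concrete, here is a rough, hypothetical
sketch (this is not the interface from PR #4745, just an illustration of the
idea of one explicit creation point for all state objects of an operator):
{code:java}
import java.io.Closeable;
import java.io.IOException;

/**
 * Hypothetical sketch: a single, explicit factory step that creates all
 * state-related objects for one operator instance, optionally restoring
 * from a given checkpoint id.
 */
public interface OperatorStateObjectsFactory {

    /** Container for everything state-related that an operator needs. */
    final class StateObjects implements Closeable {
        public final Closeable keyedStateBackend;    // placeholder types for the sketch
        public final Closeable operatorStateBackend;
        public final Closeable timerService;

        public StateObjects(Closeable keyed, Closeable operator, Closeable timers) {
            this.keyedStateBackend = keyed;
            this.operatorStateBackend = operator;
            this.timerService = timers;
        }

        @Override
        public void close() throws IOException {
            keyedStateBackend.close();
            operatorStateBackend.close();
            timerService.close();
        }
    }

    /**
     * Creates keyed state backend, operator state backend, and timer service
     * for the given operator; restores from recoveryCheckpointId if >= 0.
     */
    StateObjects createStateObjects(String operatorId, long recoveryCheckpointId) throws Exception;
}
{code}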
3) Local Recovery MVP
This initial version of local recovery will work for RocksDB incremental and
full checkpoints, and heap full checkpoints. One main advantage of the
suggested approach is that the added features are almost exclusively
implemented in the backends and transparent to other components like the
checkpoint coordinator. We can easily replace this later with a more advanced
approach, without hurting backwards compatibility. On the downside, the
approach also lacks deeper integration with the scheduling, and simply assumes
that task assignment in case of recovery is mostly identical to the previous
scheduling.
* For full checkpoints, the main idea is to have a duplicating output stream
that wraps the existing DFS output stream and a (newly introduced) local disk
output stream and duplicates writes to both streams (see the first sketch
after this list). The state handle from the DFS stream is reported as before.
The state that was written to the local output stream is managed inside the
backend and associated with a checkpoint id.
* For RocksDB incremental checkpoints, we just need to change the algorithm so
that the local snapshot directory is not deleted after uploading the files to
DFS. We need to remember the directory and associate it with the checkpoint id
(see the second sketch after this list). All management is again transparent
and happens only inside the backend.
* Recovery can mostly use existing code, potentially with some minor
modifications (e.g. no need to copy individual files from DFS when we recover
incremental checkpoints).
* Cleanup for all checkpoints is controlled by notifyCheckpointComplete
messages. Whenever a new checkpoint is known to be completed, all previous
local checkpoints are deleted. We also need a signal from the JM to the TMs to
differentiate between cancellation caused by a task failure and cancellation
by user command; in the second case, local checkpoints should be removed. It
could also make sense to check for orphaned directories in the local
checkpoint directory when new backends are created.
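Here is a minimal sketch of the duplicating stream idea from the first bullet
above; the names are made up, and the real implementation would of course plug
into the backend's existing checkpoint stream creation:
{code:java}
import java.io.IOException;
import java.io.OutputStream;

/**
 * Hypothetical sketch: every write goes to the primary (DFS) stream and to a
 * secondary (local disk) stream. The result of the primary stream is reported
 * to the checkpoint coordinator as before; the secondary copy stays on the
 * TaskManager and is only used for local recovery.
 */
public class DuplicatingOutputStream extends OutputStream {

    private final OutputStream primary;   // e.g. the existing DFS stream
    private final OutputStream secondary; // e.g. a local disk stream

    public DuplicatingOutputStream(OutputStream primary, OutputStream secondary) {
        this.primary = primary;
        this.secondary = secondary;
    }

    @Override
    public void write(int b) throws IOException {
        primary.write(b);
        secondary.write(b);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        primary.write(b, off, len);
        secondary.write(b, off, len);
    }

    @Override
    public void flush() throws IOException {
        primary.flush();
        secondary.flush();
    }

    @Override
    public void close() throws IOException {
        try {
            primary.close();
        } finally {
            secondary.close();
        }
    }
}
{code}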
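And a second sketch for the backend-internal bookkeeping of retained local
snapshot directories (covering the RocksDB incremental case and the cleanup on
notifyCheckpointComplete); again, all names are hypothetical:
{code:java}
import java.io.File;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical sketch: local snapshot directories are registered per
 * checkpoint id, everything older than the latest completed checkpoint is
 * discarded, and all local state is dropped on cancellation by user command.
 */
public class LocalSnapshotRegistry {

    /** checkpoint id -> retained local snapshot directory. */
    private final TreeMap<Long, File> localSnapshots = new TreeMap<>();

    /** Called when a snapshot completed and its local directory is kept. */
    public synchronized void register(long checkpointId, File localDirectory) {
        localSnapshots.put(checkpointId, localDirectory);
    }

    /** Called for notifyCheckpointComplete: drop all older local snapshots. */
    public synchronized void notifyCheckpointComplete(long completedCheckpointId) {
        Iterator<Map.Entry<Long, File>> it =
                localSnapshots.headMap(completedCheckpointId, false).entrySet().iterator();
        while (it.hasNext()) {
            deleteRecursively(it.next().getValue());
            it.remove();
        }
    }

    /** Called on cancellation by user command: remove all local snapshots. */
    public synchronized void discardAll() {
        for (File dir : localSnapshots.values()) {
            deleteRecursively(dir);
        }
        localSnapshots.clear();
    }

    /** Returns the retained directory for the given checkpoint, or null. */
    public synchronized File get(long checkpointId) {
        return localSnapshots.get(checkpointId);
    }

    private static void deleteRecursively(File file) {
        File[] children = file.listFiles();
        if (children != null) {
            for (File child : children) {
                deleteRecursively(child);
            }
        }
        file.delete();
    }
}
{code}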
> Introduce CheckpointCacheManager for reading checkpoint data locally when
> performing failover
> ---------------------------------------------------------------------------------------------
>
> Key: FLINK-7873
> URL: https://issues.apache.org/jira/browse/FLINK-7873
> Project: Flink
> Issue Type: New Feature
> Components: State Backends, Checkpointing
> Affects Versions: 1.3.2
> Reporter: Sihua Zhou
> Assignee: Sihua Zhou
>
> Why I introduce this:
> The current recovery strategy always reads checkpoint data from a remote
> FileStream (HDFS). This costs a lot of bandwidth when the state is very big
> (e.g. 1 TB). What's worse, if a job recovers again and again, it can eat up
> all network bandwidth and badly hurt the cluster. So I propose that we cache
> the checkpoint data locally and read it from the local cache whenever we can,
> falling back to the remote data only if the local read fails. The advantage
> is that if an execution is assigned to the same TaskManager as before, it
> saves a lot of bandwidth and recovers faster.
> Solution:
> The TaskManager does the caching and manages the cached data itself. It
> simply uses a TTL-like method to manage the disposal of cache entries: we
> dispose of an entry if it was not touched for some time X, and whenever we
> touch an entry we reset its TTL. In this way, all the work is done by the
> TaskManager and it is transparent to the JobManager. The only problem is that
> we may dispose of an entry that is still useful; in that case we have to fall
> back to reading the remote data, but users can avoid this by setting a proper
> TTL value based on the checkpoint interval and other factors.
> Can someone give me some advice? I would appreciate it very much~
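For illustration, a minimal sketch of the TTL-style cache management described
in the issue above could look like the following; all names are hypothetical:
{code:java}
import java.io.File;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical sketch: every access refreshes an entry's timestamp, and a
 * periodic sweep disposes of entries that were not touched within the TTL.
 */
public class CheckpointDataCache {

    private static final class Entry {
        final File localCopy;
        volatile long lastAccessMillis;

        Entry(File localCopy) {
            this.localCopy = localCopy;
            this.lastAccessMillis = System.currentTimeMillis();
        }
    }

    private final ConcurrentHashMap<String, Entry> entries = new ConcurrentHashMap<>();
    private final long ttlMillis;

    public CheckpointDataCache(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Returns the cached file and resets its TTL, or null on a cache miss. */
    public File touch(String stateHandleId) {
        Entry entry = entries.get(stateHandleId);
        if (entry == null) {
            return null; // caller falls back to reading from DFS
        }
        entry.lastAccessMillis = System.currentTimeMillis();
        return entry.localCopy;
    }

    /** Registers a local copy of checkpoint data under the given id. */
    public void put(String stateHandleId, File localCopy) {
        entries.put(stateHandleId, new Entry(localCopy));
    }

    /** Periodically invoked sweep that disposes of entries older than the TTL. */
    public void expireStaleEntries() {
        long now = System.currentTimeMillis();
        Iterator<Map.Entry<String, Entry>> it = entries.entrySet().iterator();
        while (it.hasNext()) {
            Entry entry = it.next().getValue();
            if (now - entry.lastAccessMillis > ttlMillis) {
                entry.localCopy.delete();
                it.remove();
            }
        }
    }
}
{code}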
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)