[
https://issues.apache.org/jira/browse/FLINK-7873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16250747#comment-16250747
]
Sihua Zhou commented on FLINK-7873:
-----------------------------------
[~srichter] Thanks for your reply and for introducing your approach to
implementing this. I have also read the
PR [4745|https://github.com/apache/flink/pull/4745] you linked before; the
unification of state creation is pretty cool, and it makes Flink's state much
clearer. Regarding the approach to implementing local recovery, I still have
some questions, so let me share my points:
1. I don't think that the `checkpoint id part of TDD` and the `check for key
groups` are needed for local recovery, because the CheckpointCoordinator already
uses the checkpoint id to choose the right state handles for the task to recover
from, and it has also done the key-group checking. If we can use a state handle
to find its local data on the TM, then local recovery can be
implemented. So I think that if we can identify the state handle by a state
handle id, then everything is done, and this will work not only for keyed state
but for all state (managed state & raw state).
2. I do agree with using notifiedCheckpointComplete-messages to manage the local
data, but I am afraid that the cleanup of local state may not be transparent to
the JM, because the JM would not only need to signal the TM on task failures and
on user commands, but would also need to signal the TM to clear the local
data when a stateful task is deployed on a different TM while recovering from a
failover. There are also other cases to consider, such as the JM HA situation.
So I suggest that the TM manage its local data itself, which makes this truly
transparent to the JM. We can introduce a lease for every LocalCheckpointStore,
with each LocalCheckpointStore associated with a JobID. Once the job fails, the
store loses its lease, and we clear it if it does not get a lease again (if the
task recovers on the same TM, it gets the lease again; the time interval can be
very short). This way the TM can manage its local data itself.
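To make the two points above concrete, here is a minimal sketch of how a TM-side store could look. It is only an illustration under assumptions: the class and method names (LocalStateLeaseSketch, acquireLease, onJobFailed, sweep), the use of plain strings for the JobID, the state handle id and the local path, and the grace period are all hypothetical and are not existing Flink APIs.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical TM-side sketch; none of these names are existing Flink APIs. */
class LocalStateLeaseSketch {

    /** Local copies for one job, indexed by state handle id (point 1). */
    static final class LocalCheckpointStore {
        private final Map<String, String> localPathByHandleId = new ConcurrentHashMap<>();
        // -1 means the lease is currently held; otherwise the time it was lost.
        private volatile long leaseLostAtMillis = -1L;

        void register(String stateHandleId, String localPath) {
            localPathByHandleId.put(stateHandleId, localPath);
        }

        /** Empty result means: fall back to reading the remote state handle. */
        Optional<String> lookup(String stateHandleId) {
            return Optional.ofNullable(localPathByHandleId.get(stateHandleId));
        }
    }

    private final Map<String, LocalCheckpointStore> storesByJobId = new ConcurrentHashMap<>();
    private final long graceMillis; // assumed grace period before cleanup

    LocalStateLeaseSketch(long graceMillis) {
        this.graceMillis = graceMillis;
    }

    /** A task of the job is (re)deployed on this TM: create the store or regain the lease. */
    LocalCheckpointStore acquireLease(String jobId) {
        LocalCheckpointStore store =
                storesByJobId.computeIfAbsent(jobId, id -> new LocalCheckpointStore());
        store.leaseLostAtMillis = -1L;
        return store;
    }

    /** The job failed: the store loses its lease but keeps its data for now (point 2). */
    void onJobFailed(String jobId, long nowMillis) {
        LocalCheckpointStore store = storesByJobId.get(jobId);
        if (store != null) {
            store.leaseLostAtMillis = nowMillis;
        }
    }

    /** Periodic cleanup: drop stores that did not regain the lease within the grace period. */
    void sweep(long nowMillis) {
        storesByJobId.values().removeIf(
                s -> s.leaseLostAtMillis >= 0 && nowMillis - s.leaseLostAtMillis > graceMillis);
    }

    boolean hasStore(String jobId) {
        return storesByJobId.containsKey(jobId);
    }
}
```

In this sketch the JM never has to send a cleanup signal: a task that is redeployed on the same TM calls acquireLease and finds its local data by state handle id, while a job that moved elsewhere simply never renews the lease and its store is removed by the next sweep. Deleting the files behind the local paths is left out here.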
> Introduce CheckpointCacheManager for reading checkpoint data locally when
> performing failover
> ---------------------------------------------------------------------------------------------
>
> Key: FLINK-7873
> URL: https://issues.apache.org/jira/browse/FLINK-7873
> Project: Flink
> Issue Type: New Feature
> Components: State Backends, Checkpointing
> Affects Versions: 1.3.2
> Reporter: Sihua Zhou
> Assignee: Sihua Zhou
>
> Why I introduce this:
> The current recovery strategy always reads checkpoint data from a remote
> FileStream (HDFS). This costs a lot of bandwidth when the state is very big
> (e.g. 1T). What's worse, if the job recovers again and again, it can
> eat up all the network bandwidth and badly hurt the cluster. So I propose
> that we cache the checkpoint data locally and read checkpoint data from the
> local cache whenever we can; we read the data from remote only if the local
> read fails. The advantage is that if an execution is assigned to the same
> TaskManager as before, it can save a lot of bandwidth and recover
> faster.
> Solution:
> The TaskManager does the caching and manages the cached data itself. It simply
> uses a TTL-like method to manage the disposal of cache entries: we dispose of
> an entry if it has not been touched for a time X, and once we touch an entry
> we reset its TTL. In this way, all the work is done by the TaskManager, and it
> is transparent to the JobManager. The only problem is that we may dispose of
> an entry that may still be useful; in that case we have to fall back to
> reading the remote data, but users can avoid this by setting a proper TTL
> value according to the checkpoint interval and other factors.
> Can someone give me some advice? I would appreciate it very much~
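The TTL-like disposal described in the quoted issue could be sketched roughly as follows. This is only an illustration under assumptions: the class name TtlCheckpointCacheSketch, the string keys and local paths, and the explicit time parameters are hypothetical and do not correspond to existing Flink code.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch of a TTL-managed local checkpoint cache on a TaskManager. */
class TtlCheckpointCacheSketch {

    private static final class Entry {
        final String localPath;
        volatile long lastTouchedMillis;

        Entry(String localPath, long nowMillis) {
            this.localPath = localPath;
            this.lastTouchedMillis = nowMillis;
        }
    }

    private final Map<String, Entry> entries = new ConcurrentHashMap<>();
    private final long ttlMillis; // the "X time" from the description

    TtlCheckpointCacheSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    void put(String key, String localPath, long nowMillis) {
        entries.put(key, new Entry(localPath, nowMillis));
    }

    /** Touching a live entry resets its TTL; an empty result means "read from remote". */
    Optional<String> get(String key, long nowMillis) {
        Entry e = entries.get(key);
        if (e == null || nowMillis - e.lastTouchedMillis > ttlMillis) {
            entries.remove(key);
            return Optional.empty();
        }
        e.lastTouchedMillis = nowMillis;
        return Optional.of(e.localPath);
    }

    /** Periodic disposal of entries that were not touched for longer than the TTL. */
    void evictExpired(long nowMillis) {
        entries.values().removeIf(e -> nowMillis - e.lastTouchedMillis > ttlMillis);
    }

    int size() {
        return entries.size();
    }
}
```

A caller would try get first and read the remote checkpoint data only when the result is empty, which is the fallback the description mentions for an entry that was disposed of too early.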
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)