[
https://issues.apache.org/jira/browse/FLINK-19596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17216581#comment-17216581
]
Roman Khachatryan commented on FLINK-19596:
-------------------------------------------
Hi,
I was thinking about the solution for a related issue FLINK-19401.
What I came up with is skipping the loading of checkpoints from the FS *after
listing them in ZK* inside ZooKeeperCompletedCheckpointStore.recover() - if
they are already loaded.
It seems [~wind_ljy] that your proposal is similar, but it also skips the
listing of checkpoints in ZK. Is that correct?
If so, wouldn't it allow restoring from a checkpoint that is not the latest
one (e.g. in case of losing and regaining leadership)?
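To illustrate, a minimal sketch of the dedup idea in plain Java (the class and method names below are hypothetical, not Flink's actual API): after listing the checkpoint handles in ZK, the expensive FS read is skipped for any checkpoint that is already materialized in memory.

```java
import java.util.*;

// Hypothetical sketch: skip the FS load for checkpoints already in memory,
// while still trusting the ZK listing as the source of truth for which
// checkpoints exist. Not Flink's real ZooKeeperCompletedCheckpointStore.
class CheckpointStoreSketch {
    // In-memory cache of already-loaded checkpoints, keyed by ZK path.
    private final Map<String, String> loaded = new HashMap<>();
    private int fsLoads = 0; // counts simulated expensive FS reads

    // Stand-in for reading checkpoint metadata from HDFS/FS.
    private String loadFromFs(String path) {
        fsLoads++;
        return "checkpoint@" + path;
    }

    // recover(): the ZK listing is always consulted (here passed in),
    // but only checkpoints missing from the cache are loaded from the FS.
    List<String> recover(List<String> zkPaths) {
        List<String> result = new ArrayList<>();
        for (String path : zkPaths) {
            result.add(loaded.computeIfAbsent(path, this::loadFromFs));
        }
        // Evict cache entries no longer present in ZK (subsumed/discarded).
        loaded.keySet().retainAll(new HashSet<>(zkPaths));
        return result;
    }

    int fsLoadCount() { return fsLoads; }
}
```

Because the listing still happens on every recover(), a newly elected leader would still observe the latest checkpoints in ZK; only the redundant FS reads are avoided.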
> Do not recover CompletedCheckpointStore on each failover
> --------------------------------------------------------
>
> Key: FLINK-19596
> URL: https://issues.apache.org/jira/browse/FLINK-19596
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Checkpointing
> Affects Versions: 1.11.2
> Reporter: Jiayi Liao
> Priority: Critical
> Fix For: 1.12.0
>
>
> {{completedCheckpointStore.recover()}} in
> {{restoreLatestCheckpointedStateInternal}} can be a bottleneck on failover
> because the {{CompletedCheckpointStore}} needs to load HDFS files to
> instantiate the {{CompletedCheckpoint}} instances.
> The impact is significant in our case below:
> * Jobs with high parallelism (no shuffle) which transfer data from Kafka to
> other filesystems.
> * If a machine goes down, several containers and tens of tasks are affected,
> which means {{completedCheckpointStore.recover()}} would be called tens of
> times, since the tasks are not in the same failover region.
> And I notice there is a "TODO" in the source code:
> {code:java}
> // Recover the checkpoints, TODO this could be done only when there is a new
> leader, not on each recovery
> completedCheckpointStore.recover();
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)