[
https://issues.apache.org/jira/browse/FLINK-19596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17216517#comment-17216517
]
Zhijiang commented on FLINK-19596:
----------------------------------
Thanks for proposing this potential improvement [~wind_ljy]!
+1 for the motivation.
If I understood correctly, `ZooKeeperCompletedCheckpointStore#completedCheckpoints`
is not loaded during class construction (lazy loading), and it is then updated in an
append-only way by subsequent calls to `CheckpointCoordinator#completePendingCheckpoint`.
Once `CheckpointCoordinator#restoreLatestCheckpointedStateInternal` is called, it loads
all the completed checkpoints via `CompletedCheckpointStore#recover`. So there is
duplicate overhead here: already completed checkpoints are read twice.
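A rough, purely illustrative sketch of that flow (the placeholder `Checkpoint` class
stands in for `CompletedCheckpoint`, and the method bodies are simplified; this is not
the actual Flink code):
{code:java}
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Hypothetical, simplified sketch of the current behaviour, not the real Flink code.
class Checkpoint {
    final long id;
    Checkpoint(long id) { this.id = id; }
}

class SketchCheckpointStore {
    private final Deque<Checkpoint> completedCheckpoints = new ArrayDeque<>();

    // Appended on each CheckpointCoordinator#completePendingCheckpoint, so the
    // in-memory list already tracks every checkpoint completed under this leader.
    void addCheckpoint(Checkpoint checkpoint) {
        completedCheckpoints.addLast(checkpoint);
    }

    // Called from restoreLatestCheckpointedStateInternal on every failover: the
    // in-memory list is dropped and all checkpoint handles are re-read from
    // ZooKeeper plus their metadata from HDFS, duplicating work already done.
    void recover(List<Checkpoint> reloadedFromZooKeeperAndHdfs) {
        completedCheckpoints.clear();
        completedCheckpoints.addAll(reloadedFromZooKeeperAndHdfs);
    }

    Checkpoint getLatestCheckpoint() {
        return completedCheckpoints.peekLast();
    }
}
{code}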
Regarding the solution, I am not quite sure whether we should couple this decision
(`CompletedCheckpointStore#recover`) with global/local/regional failover. E.g. a JM
leader change causes a global failover now, but if we improve that in the future, it
might not need a job restart thanks to reconciling.
If we can refactor
`completedCheckpointStore.getLatestCheckpoint(isPreferCheckpointForRecovery)`
to cover the internal logic of `completedCheckpointStore.recover()`, that might make
more sense. I mean, callers only care about getting the latest checkpoint via the
`#getLatestCheckpoint` interface, and its internal implementation can then judge
whether it still needs to call `#recover` to load the previously completed
checkpoints or not.
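A rough, hypothetical sketch of that shape (not an existing Flink API; it reuses the
placeholder `Checkpoint` type from the sketch above, and the `recovered` flag is made
up for illustration):
{code:java}
// Hypothetical sketch of the refactoring idea, not an existing Flink API.
class SketchLazyCheckpointStore {
    private final java.util.Deque<Checkpoint> completedCheckpoints = new java.util.ArrayDeque<>();
    private boolean recovered = false;

    // Callers only ask for the latest checkpoint; the store decides internally
    // whether the expensive #recover step is still needed.
    Checkpoint getLatestCheckpoint(boolean isPreferCheckpointForRecovery) throws Exception {
        if (!recovered) {
            recover();        // hit ZooKeeper/HDFS only once per leadership
            recovered = true; // e.g. reset this flag only when a new leader is granted
        }
        // a real implementation would honour isPreferCheckpointForRecovery;
        // for brevity this sketch simply returns the newest entry
        return completedCheckpoints.peekLast();
    }

    private void recover() throws Exception {
        // expensive: read checkpoint handles from ZooKeeper and metadata from HDFS
    }
}
{code}
That way the store itself could keep the "only recover on a new leader" knowledge in
one place instead of every failover path calling `#recover`.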
> Do not recover CompletedCheckpointStore on each failover
> --------------------------------------------------------
>
> Key: FLINK-19596
> URL: https://issues.apache.org/jira/browse/FLINK-19596
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Checkpointing
> Affects Versions: 1.11.2
> Reporter: Jiayi Liao
> Priority: Major
>
> {{completedCheckpointStore.recover()}} in
> {{restoreLatestCheckpointedStateInternal}} could be a bottleneck on failover
> because the {{CompletedCheckpointStore}} needs to load HDFS files to
> instantiate the {{CompletedCheckpoint}} instances.
> The impact is significant in our case below:
> * Jobs with high parallelism (no shuffle) which transfer data from Kafka to
> other filesystems.
> * If a machine goes down, several containers and tens of tasks are affected,
> which means {{completedCheckpointStore.recover()}} would be called tens of
> times, since the tasks do not share a failover region.
> And I notice there is a "TODO" in the source code:
> {code:java}
> // Recover the checkpoints, TODO this could be done only when there is a new leader, not on each recovery
> completedCheckpointStore.recover();
> {code}