[ https://issues.apache.org/jira/browse/FLINK-19596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17216517#comment-17216517 ]

Zhijiang commented on FLINK-19596:
----------------------------------

Thanks for proposing this potential improvement [~wind_ljy]!

+1 for the motivation. 

If I understood correctly, `ZooKeeperCompletedCheckpointStore#completedCheckpoints` 
is not loaded during class construction (lazy loading), and it is then updated in an 
append-only way by subsequent calls to `CheckpointCoordinator#completePendingCheckpoint`. 

Once `CheckpointCoordinator#restoreLatestCheckpointedStateInternal` is called, it 
lazily loads all the completed checkpoints via `CompletedCheckpointStore#recover`. 
So there is some duplicate overhead here: checkpoints that were already completed 
are read again.

Regarding the solution, I am not quite sure whether we should couple this 
decision (when to call `CompletedCheckpointStore#recover`) with global/local/regional 
failover. E.g. a JM leader change causes a global failover today, but if we improve 
that in the future, it might no longer require a job restart thanks to reconciling.

If we can refactor 
`completedCheckpointStore.getLatestCheckpoint(isPreferCheckpointForRecovery)` 
to cover the internal logic of `completedCheckpointStore.recover()`, that might 
make more sense. I mean that callers should only care about getting the latest 
checkpoint via the `#getLatestCheckpoint` interface, and its internal 
implementation can judge whether it still needs to call `#recover` to load the 
previously completed checkpoints or not.
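
To illustrate the idea, here is a minimal sketch (the class name 
`LazyRecoveringCheckpointStore` and the `recovered` flag are made up for 
illustration; this is not the actual Flink implementation): the store remembers 
whether it has already been recovered and triggers `#recover` lazily on the 
first `#getLatestCheckpoint` call, e.g. after gaining leadership.

{code:java}
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Illustrative sketch only: the store recovers itself lazily, so callers just
 * ask for the latest checkpoint and a full reload does not happen on every
 * failover. Names and types are assumptions, not Flink's real classes.
 */
public class LazyRecoveringCheckpointStore {

    private final Object lock = new Object();

    // Becomes true once the completed checkpoints have been loaded; in a real
    // implementation this would only be reset on a JM leader change.
    private boolean recovered = false;

    // Placeholder for the in-memory list of completed checkpoints.
    private final Deque<String> completedCheckpoints = new ArrayDeque<>();

    /** Callers only ask for the latest checkpoint; recovery happens internally if needed. */
    public String getLatestCheckpoint(boolean isPreferCheckpointForRecovery) throws Exception {
        synchronized (lock) {
            if (!recovered) {
                recover();
                recovered = true;
            }
            // A real implementation would honor isPreferCheckpointForRecovery
            // (e.g. prefer checkpoints over savepoints); here we simply return
            // the newest entry.
            return completedCheckpoints.peekLast();
        }
    }

    // Stand-in for the expensive step: reading checkpoint metadata from
    // ZooKeeper/HDFS and materializing the completed checkpoints.
    private void recover() throws Exception {
        // ... load previously completed checkpoints into completedCheckpoints ...
    }
}
{code}

With something like this, `CheckpointCoordinator#restoreLatestCheckpointedStateInternal` 
would no longer need to call `#recover` explicitly on every failover.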

> Do not recover CompletedCheckpointStore on each failover
> --------------------------------------------------------
>
>                 Key: FLINK-19596
>                 URL: https://issues.apache.org/jira/browse/FLINK-19596
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.11.2
>            Reporter: Jiayi Liao
>            Priority: Major
>
> {{completedCheckpointStore.recover()}} in 
> {{restoreLatestCheckpointedStateInternal}} could be a bottleneck on failover 
> because the {{CompletedCheckpointStore}} needs to load HDFS files to 
> instantiate the {{CompletedCheckpoint}} instances.
> The impact is significant in our case below:
> * Jobs with high parallelism (no shuffle) which transfer data from Kafka to 
> other filesystems.
> * If a machine goes down, several containers and tens of tasks are affected, 
> which means {{completedCheckpointStore.recover()}} would be called tens of 
> times, since the tasks are not in the same failover region.
> And I noticed there is a "TODO" in the source code:
> {code:java}
> // Recover the checkpoints, TODO this could be done only when there is a new leader, not on each recovery
> completedCheckpointStore.recover();
> {code}


