[ https://issues.apache.org/jira/browse/FLINK-24086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17408138#comment-17408138 ]

ming li commented on FLINK-24086:
---------------------------------

 

Hi [~pnowojski], we implemented a new failover strategy (discarding some data so that only the failed tasks are restarted). Here is an excerpt from the logs of one of our task recoveries:
{code:java}
2021-09-01 19:20:35,533 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - There are 0 checkpoints are on HDFS but not on Zookeeper.
2021-09-01 19:20:42,188 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - After restoring CompletedCheckpointStore, checkpoints [42224, 42225, 42226, 42227, 42228, 42223], savepoints [].{code}
Between these two log lines, the only work performed is the registration of the {{CompletedCheckpoint}}s with the {{SharedStateRegistry}}, which took about 6 seconds here.
{code:java}
LOG.info("There are {} checkpoints are on HDFS but not on Zookeeper.",
        extraCheckpoints.size());

// Now, we re-register all (shared) states from the checkpoint store with the new registry
for (CompletedCheckpoint completedCheckpoint :
        completedCheckpointStore.getAllCheckpoints()) {
    completedCheckpoint.registerSharedStatesAfterRestored(sharedStateRegistry);
}

LOG.info("After restoring CompletedCheckpointStore, checkpoints {}, savepoints {}.");
{code}
This job has 2048 tasks and retains 6 {{Checkpoint}}s. Each task has a state size of about 10 GB and 100 to 200 {{SharedState}} entries, so a recovery needs roughly 6 * 150 * 2048 = 1,843,200 registrations with the {{SharedStateRegistry}}.
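
To make the scale concrete, here is a rough back-of-the-envelope sketch (plain Java, not Flink code; the constants are just the numbers from our job) of how the number of register calls per restore grows:
{code:java}
// Rough cost model only, not Flink code: a restore walks every retained
// checkpoint and re-registers every shared state handle of every task.
public class RegistrationCount {
    public static void main(String[] args) {
        int retainedCheckpoints = 6;    // retained completed checkpoints
        int tasks = 2048;               // parallelism of this job
        int sharedStatesPerTask = 150;  // ~100-200 shared state handles per task

        long registrations = (long) retainedCheckpoints * tasks * sharedStatesPerTask;

        // Prints: Registrations per restore: 1843200
        System.out.println("Registrations per restore: " + registrations);
    }
}
{code}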

I have also read FLINK-7268, and I think the re-registration was introduced to prevent asynchronous deletion from corrupting the reference counts of the {{SharedState}}. But since we no longer restore the {{CompletedCheckpointStore}} again, this problem no longer exists: the reference counts of the {{SharedState}} are not reset, so asynchronous deletion can still update them correctly.
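
For illustration only, a minimal reference-counting sketch (a hypothetical class, not Flink's actual {{SharedStateRegistry}} API) of the invariant I mean: as long as the same registry instance survives the failover, register/unregister keep the counts consistent, and re-registration is only needed when the registry is rebuilt from the checkpoint store:
{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified registry. Flink's real SharedStateRegistry also
// keeps the state handles themselves and schedules the actual discards.
public class RefCountingRegistry {

    private final Map<String, Integer> refCounts = new ConcurrentHashMap<>();

    // Called when a completed checkpoint references a shared state entry.
    public void register(String stateKey) {
        refCounts.merge(stateKey, 1, Integer::sum);
    }

    // Called when a checkpoint is subsumed or discarded; returns true when
    // the underlying file may be deleted (possibly by an async cleanup thread).
    public boolean unregister(String stateKey) {
        Integer remaining = refCounts.computeIfPresent(stateKey, (k, v) -> v - 1);
        if (remaining != null && remaining <= 0) {
            refCounts.remove(stateKey);
            return true;
        }
        return false;
    }
}
{code}
If this registry instance is reused across task restarts (because the {{CompletedCheckpointStore}} is not restored again), nothing ever resets the counts, so the ~1.8 million re-registrations above do not fix anything.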

 

> Do not re-register SharedStateRegistry to reduce the recovery time of the job
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-24086
>                 URL: https://issues.apache.org/jira/browse/FLINK-24086
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Checkpointing
>            Reporter: ming li
>            Priority: Major
>
> At present, we only recover the {{CompletedCheckpointStore}} when the 
> {{JobManager}} starts, so it seems that we do not need to re-register the 
> {{SharedStateRegistry}} when the task restarts.
> The reason for this issue is that in our production environment, we discard 
> part of the data and state to restart only the failed tasks, but we found that 
> registering the {{SharedStateRegistry}} may take several seconds (thousands of 
> tasks and dozens of TB of state). When a large number of tasks fail at the same 
> time, this may take several minutes (number of tasks * several seconds).
> Therefore, if the {{SharedStateRegistry}} can be reused, the time for task 
> recovery can be reduced.



