[ https://issues.apache.org/jira/browse/FLINK-24086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17408138#comment-17408138 ]
ming li commented on FLINK-24086:
---------------------------------
Hi [~pnowojski], we implemented a new failover strategy that restarts only the
failed tasks, at the cost of discarding some data. Here is a log excerpt from
one of our task recoveries.
{code:java}
2021-09-01 19:20:35,533 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - There are 0 checkpoints are on HDFS but not on Zookeeper.
2021-09-01 19:20:42,188 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - After restoring CompletedCheckpointStore, checkpoints [42224, 42225, 42226, 42227, 42228, 42223], savepoints [].
{code}
The only work done between these two log lines is registering each
{{CompletedCheckpoint}} with the {{SharedStateRegistry}}, which took more than
6 seconds here.
{code:java}
LOG.info("There are {} checkpoints are on HDFS but not on Zookeeper.", extraCheckpoints.size());
// Now, we re-register all (shared) states from the checkpoint store with the new registry
for (CompletedCheckpoint completedCheckpoint : completedCheckpointStore.getAllCheckpoints()) {
    completedCheckpoint.registerSharedStatesAfterRestored(sharedStateRegistry);
}
LOG.info("After restoring CompletedCheckpointStore, checkpoints {}, savepoints {}.");
{code}
This job has 2048 tasks and retains 6 checkpoints. Each task has about 10 GB of
state and 100 to 200 {{SharedState}} entries. Assuming an average of 150 per
task, recovery performs about 6 * 150 * 2048 = 1,843,200 registrations with the
{{SharedStateRegistry}}.
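
The estimate above can be reproduced with a few lines of Java. This is only a back-of-the-envelope sketch: the class and method names are made up for illustration, 150 is the assumed midpoint of the 100 to 200 range, and the elapsed time is taken from the two log timestamps (19:20:35,533 and 19:20:42,188).

```java
// Back-of-the-envelope estimate; names are illustrative and not part of Flink.
final class RegistrationEstimate {

    /** Registrations performed when every retained checkpoint is re-registered. */
    static long totalRegistrations(int retainedCheckpoints, int sharedStatesPerTask, int tasks) {
        return (long) retainedCheckpoints * sharedStatesPerTask * tasks;
    }

    /** Average cost of one registration in microseconds. */
    static double microsPerRegistration(long elapsedMillis, long registrations) {
        return elapsedMillis * 1000.0 / registrations;
    }

    public static void main(String[] args) {
        // 6 retained checkpoints, ~150 shared states per task (assumed midpoint), 2048 tasks.
        long total = totalRegistrations(6, 150, 2048);
        // 19:20:42,188 minus 19:20:35,533 is 6655 ms between the two log lines.
        long elapsedMillis = 6655L;
        System.out.println("registrations: " + total);
        System.out.println("micros per registration: " + microsPerRegistration(elapsedMillis, total));
    }
}
```

Each registration is cheap by itself (a few microseconds under these assumptions), so the delay comes from the sheer number of them.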
I have also read FLINK-7268. As I understand it, the re-registration was
introduced there to prevent asynchronous deletion from corrupting the reference
counts of {{SharedState}}. Since we no longer restore the
{{CompletedCheckpointStore}} on task restart, that problem cannot occur any
more: a restart does not change the reference counts of {{SharedState}}, and
asynchronous deletion can still update them correctly.
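
To illustrate the argument, here is a deliberately simplified reference-counting sketch. It is not Flink's {{SharedStateRegistry}}: the class, the method names, and the string keys are all hypothetical, and it only shows that counts kept in a reused registry remain consistent across a task restart without re-registering anything.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for a shared state registry (not Flink's API).
final class RefCountSketch {
    private final Map<String, Integer> refCounts = new HashMap<>();

    /** Adds one reference to the shared state identified by key. */
    void register(String key) {
        refCounts.merge(key, 1, Integer::sum);
    }

    /** Drops one reference; returns true if the state is now unreferenced and can be deleted. */
    boolean unregister(String key) {
        Integer count = refCounts.get(key);
        if (count == null) {
            throw new IllegalStateException("unknown shared state: " + key);
        }
        if (count == 1) {
            refCounts.remove(key);
            return true;
        }
        refCounts.put(key, count - 1);
        return false;
    }

    /** Current reference count, 0 if the key is not registered. */
    int count(String key) {
        return refCounts.getOrDefault(key, 0);
    }

    public static void main(String[] args) {
        RefCountSketch registry = new RefCountSketch();
        // Two retained checkpoints reference the same shared file.
        registry.register("sst-1");
        registry.register("sst-1");
        // A task restart happens here. The registry object is reused, so nothing is re-registered.
        // Later, the older checkpoint is subsumed and drops its reference.
        boolean deletable = registry.unregister("sst-1");
        System.out.println("deletable after first unregister: " + deletable);
        System.out.println("remaining references: " + registry.count("sst-1"));
    }
}
```

In this sketch the counts are only touched when a checkpoint is added or discarded, which is the property the reuse proposal relies on.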
> Do not re-register SharedStateRegistry to reduce the recovery time of the job
> -----------------------------------------------------------------------------
>
> Key: FLINK-24086
> URL: https://issues.apache.org/jira/browse/FLINK-24086
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Checkpointing
> Reporter: ming li
> Priority: Major
>
> At present, the {{CompletedCheckpointStore}} is only recovered when the
> {{JobManager}} starts, so it seems unnecessary to re-register the
> {{SharedStateRegistry}} when a task restarts.
> The motivation is that in our production environment we discard part of the
> data and state so that only the failed task is restarted, but we found that
> registering the {{SharedStateRegistry}} can take several seconds (thousands
> of tasks and dozens of TB of state). When a large number of tasks fail at the
> same time, this can add up to several minutes (number of tasks * several
> seconds).
> Therefore, if the {{SharedStateRegistry}} can be reused, task recovery time
> can be reduced.