[ https://issues.apache.org/jira/browse/FLINK-16931?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17246552#comment-17246552 ]

Roman Khachatryan commented on FLINK-16931:
-------------------------------------------

I think the issue was resolved by FLINK-19401: when the JM already has the same 
checkpoints in memory as in ZK, it no longer downloads them from DFS. That is, 
provided it is not failing over or restoring from a savepoint.
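In other words, on recovery the store now checks which checkpoint IDs it already 
holds in memory and only downloads the missing ones from storage. A minimal 
sketch of that idea (made-up stand-in types, not the actual 
DefaultCompletedCheckpointStore code):
{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for Flink's internal types, just to make the idea concrete.
interface CheckpointHandle {
    long checkpointId();            // id encoded in the ZooKeeper node name
    CompletedCheckpoint download(); // expensive: reads _metadata from DFS
}

class CompletedCheckpoint {
    final long id;
    CompletedCheckpoint(long id) { this.id = id; }
}

class RecoverSketch {
    /** Reuse checkpoints already held in memory; download only the missing ones. */
    static List<CompletedCheckpoint> recover(
            List<CheckpointHandle> handlesInZk,
            List<CompletedCheckpoint> alreadyInMemory) {
        Map<Long, CompletedCheckpoint> cached = new HashMap<>();
        for (CompletedCheckpoint cp : alreadyInMemory) {
            cached.put(cp.id, cp);
        }
        List<CompletedCheckpoint> recovered = new ArrayList<>();
        for (CheckpointHandle handle : handlesInZk) {
            CompletedCheckpoint inMemory = cached.get(handle.checkpointId());
            recovered.add(inMemory != null ? inMemory : handle.download());
        }
        return recovered;
    }
}
{code}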
  
I tried to verify this locally with an artificial failure on checkpoint handle 
retrieval.
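The injected failure simply makes every download attempt throw; in terms of the 
stand-in types above, something like:
{code:java}
// Hypothetical failure injection: any attempt to fetch the checkpoint from
// storage throws, so recovery succeeds only if the store reuses the
// in-memory checkpoint instead of downloading it.
CheckpointHandle failingHandle = new CheckpointHandle() {
    @Override public long checkpointId() { return 4L; }
    @Override public CompletedCheckpoint download() {
        throw new RuntimeException("test"); // produces the stack trace below
    }
};
{code}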

*Without* the fix, I see infinite load attempts failing at:
{code:java}
9642 [flink-akka.actor.default-dispatcher-6] INFO  org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Recovering checkpoints from ZooKeeperStateHandleStore{namespace='flink/default/checkpoints/70e47a445aa53ff9bdcba9c79f6a58fa'}.
9644 [flink-akka.actor.default-dispatcher-6] INFO  org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Found 1 checkpoints in ZooKeeperStateHandleStore{namespace='flink/default/checkpoints/70e47a445aa53ff9bdcba9c79f6a58fa'}.
9644 [flink-akka.actor.default-dispatcher-6] INFO  org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Trying to fetch 1 checkpoints from storage.
9644 [flink-akka.actor.default-dispatcher-6] INFO  org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Trying to retrieve checkpoint 4.
9644 [flink-akka.actor.default-dispatcher-6] WARN  org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Could not retrieve checkpoint, not adding to list of recovered checkpoints.
java.lang.RuntimeException: test
        at org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore.retrieveCompletedCheckpoint(DefaultCompletedCheckpointStore.java:322) ~[classes/:?]
        at org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore.recover(DefaultCompletedCheckpointStore.java:165) ~[classes/:?]
        at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1374) ~[classes/:?]
        at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateToAll(CheckpointCoordinator.java:1321) ~[classes/:?]
        at org.apache.flink.runtime.scheduler.SchedulerBase.restoreState(SchedulerBase.java:380) ~[classes/:?]
        at org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$restartTasks$2(DefaultScheduler.java:291) ~[classes/:?]
{code}
 

*With* the fix, I see:
{code:java}
5859 [flink-akka.actor.default-dispatcher-6] INFO  org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Recovering checkpoints from ZooKeeperStateHandleStore{namespace='flink/default/checkpoints/f6f58c166e273321b03789d1d5211855'}.
5864 [flink-akka.actor.default-dispatcher-6] INFO  org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Found 1 checkpoints in ZooKeeperStateHandleStore{namespace='flink/default/checkpoints/f6f58c166e273321b03789d1d5211855'}.
5864 [flink-akka.actor.default-dispatcher-6] INFO  org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - All 1 checkpoints found are already downloaded.
5864 [flink-akka.actor.default-dispatcher-6] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Restoring job f6f58c166e273321b03789d1d5211855 from Checkpoint 7 @ 1607521794905 for f6f58c166e273321b03789d1d5211855 located at <checkpoint-not-externally-addressable>.
5873 [flink-akka.actor.default-dispatcher-6] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - No master state to restore
{code}
 

[~qqibrow] can you confirm from your side that the fix solves the problem?

(The fix is included in 1.12.0 / 1.10.3 / 1.11.3.)

> Large _metadata file lead to JobManager not responding when restart
> -------------------------------------------------------------------
>
>                 Key: FLINK-16931
>                 URL: https://issues.apache.org/jira/browse/FLINK-16931
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Checkpointing, Runtime / Coordination
>    Affects Versions: 1.9.2, 1.10.0, 1.11.0, 1.12.0
>            Reporter: Lu Niu
>            Assignee: Lu Niu
>            Priority: Critical
>             Fix For: 1.13.0
>
>
> When the _metadata file is big, the JobManager can never recover from a 
> checkpoint. It falls into a loop of fetch checkpoint -> JM timeout -> 
> restart. Here is the related log: 
> {code:java}
>  2020-04-01 17:08:25,689 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Recovering checkpoints from ZooKeeper.
>  2020-04-01 17:08:25,698 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Found 3 checkpoints in ZooKeeper.
>  2020-04-01 17:08:25,698 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to fetch 3 checkpoints from storage.
>  2020-04-01 17:08:25,698 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 50.
>  2020-04-01 17:08:48,589 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 51.
>  2020-04-01 17:09:12,775 INFO org.apache.flink.yarn.YarnResourceManager - The heartbeat of JobManager with id 02500708baf0bb976891c391afd3d7d5 timed out.
> {code}
> Digging into the code, it looks like ExecutionGraph::restart runs in the 
> JobMaster main thread and eventually calls 
> ZooKeeperCompletedCheckpointStore::retrieveCompletedCheckpoint, which 
> downloads the file from DFS. The main thread is essentially blocked for a 
> while because of this. One possible solution is to make the downloading part 
> async. More things might need to be considered, as the original change tried 
> to make it single-threaded: [https://github.com/apache/flink/pull/7568]
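
For illustration, the asynchronous variant suggested in the description could 
look roughly like this (again hypothetical, reusing the stand-in types from 
the first sketch):
{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch of the async idea: run the blocking DFS read on an I/O executor so
// the JobMaster main thread keeps processing messages (e.g. heartbeats),
// then hand the result back to the main thread for the actual restore.
class AsyncRetrieveSketch {
    private final ExecutorService ioExecutor = Executors.newFixedThreadPool(4);

    CompletableFuture<CompletedCheckpoint> retrieveAsync(CheckpointHandle handle) {
        return CompletableFuture.supplyAsync(handle::download, ioExecutor);
    }
}
{code}
Whether this is safe depends on the single-threading concerns mentioned above, 
since the restore itself would still have to run back on the main thread.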


