[ 
https://issues.apache.org/jira/browse/FLINK-19816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17408801#comment-17408801
 ] 

Till Rohrmann commented on FLINK-19816:
---------------------------------------

Hi [~Paul Lin], I don't think you are running into the specific issue that this 
ticket fixes. Instead, I believe you are running into FLINK-11813, which will be 
fixed in the next release. 

I think the following is happening: The job reaches a globally terminal state 
(FAILED). It then notifies the {{Dispatcher}}, which triggers the cleanup of the 
HA information. After the cleanup has happened, the {{Dispatcher}} process loses 
leadership and is restarted. Since you seem to use application mode/per-job 
mode, Flink is started again with the same job but without any checkpoint 
information, because it has already been cleaned up. This ultimately results in 
the job being restarted.

Part of the problem has been solved by FLINK-21979, but the last remaining 
piece is FLINK-11813, which will most likely introduce a {{JobResultStore}} that 
can outlive Flink. Only by persisting job status information that survives a 
cluster failure can we properly resolve the situation. As a consequence, there 
will be some bookkeeping information that the user/operator of Flink needs to 
take care of.
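
To make the idea more concrete, here is a minimal Java sketch of the sequence 
described above. It is only an illustration under my own assumptions: none of 
the types or method names ({{DurableJobResults}}, {{HaData}}, 
{{onDispatcherStart}}) are Flink classes, and the eventual design in 
FLINK-11813 may look quite different. It only shows why a record of the 
terminal job status that lives outside the cluster lets a restarted 
{{Dispatcher}} tell "already finished" apart from "never had a checkpoint".

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: none of these types are Flink classes.
final class JobResultStoreSketch {

    enum Action { SKIP_FINISHED_JOB, RESTORE_FROM_CHECKPOINT, START_WITHOUT_CHECKPOINT }

    /** Stand-in for a store that lives outside the cluster and survives its failure. */
    static final class DurableJobResults {
        private final Set<String> terminalJobs = new HashSet<>();

        void recordTerminal(String jobId) { terminalJobs.add(jobId); }

        boolean isTerminal(String jobId) { return terminalJobs.contains(jobId); }
    }

    /** Stand-in for HA data (checkpoint pointers) that is cleaned up on a terminal state. */
    static final class HaData {
        private final Set<String> jobsWithCheckpoints = new HashSet<>();

        void addCheckpoint(String jobId) { jobsWithCheckpoints.add(jobId); }

        void cleanUp(String jobId) { jobsWithCheckpoints.remove(jobId); }

        boolean hasCheckpoint(String jobId) { return jobsWithCheckpoints.contains(jobId); }
    }

    /**
     * Decides what a freshly started dispatcher does with the job it is handed.
     * Passing null for results models the situation without a durable result store.
     */
    static Action onDispatcherStart(String jobId, HaData ha, DurableJobResults results) {
        if (results != null && results.isTerminal(jobId)) {
            return Action.SKIP_FINISHED_JOB;
        }
        return ha.hasCheckpoint(jobId)
                ? Action.RESTORE_FROM_CHECKPOINT
                : Action.START_WITHOUT_CHECKPOINT;
    }

    public static void main(String[] args) {
        String jobId = "job-1"; // hypothetical job id
        HaData ha = new HaData();
        ha.addCheckpoint(jobId);
        DurableJobResults results = new DurableJobResults();

        // The job reaches FAILED: the result is recorded, then the HA data is cleaned up.
        results.recordTerminal(jobId);
        ha.cleanUp(jobId);

        // The dispatcher loses leadership and is restarted with the same job.
        System.out.println("without durable results: " + onDispatcherStart(jobId, ha, null));
        System.out.println("with durable results:    " + onDispatcherStart(jobId, ha, results));
    }
}
```

In the sketch, the entries in {{DurableJobResults}} are never removed by the 
cluster itself, which corresponds to the bookkeeping information mentioned 
above that the user/operator would have to take care of.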

> Flink restored from a wrong checkpoint (a very old one and not the last 
> completed one)
> --------------------------------------------------------------------------------------
>
>                 Key: FLINK-19816
>                 URL: https://issues.apache.org/jira/browse/FLINK-19816
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.11.0, 1.12.0
>            Reporter: Steven Zhen Wu
>            Assignee: Till Rohrmann
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.11.3, 1.12.0
>
>         Attachments: jm.log
>
>
> h2. Summary
> Upon failure, it seems that Flink didn't restore from the last completed 
> checkpoint. Instead, it restored from a very old checkpoint. As a result, the 
> Kafka offsets were invalid, which caused the job to replay from the beginning 
> because the Kafka consumer's "auto.offset.reset" was set to "EARLIEST".
> This is an embarrassingly parallel stateless job. Parallelism is over 1,000. 
> I have the full log file from jobmanager at INFO level available upon request.
> h2. Sequence of events from the logs
> Just before the failure, checkpoint *210768* completed.
> {code}
> 2020-10-25 02:35:05,970 INFO 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator 
> [jobmanager-future-thread-5] - Completed checkpoint 210768 for job 
> 233b4938179c06974e4535ac8a868675 (4623776 bytes in 120402 ms).
> {code}
> During restart, somehow it decided to restore from a very old checkpoint 
> *203531*.
> {code:java}
> 2020-10-25 02:36:03,301 INFO  
> org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess 
> [cluster-io-thread-3]  - Start SessionDispatcherLeaderProcess.
> 2020-10-25 02:36:03,302 INFO  
> org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess 
> [cluster-io-thread-5]  - Recover all persisted job graphs.
> 2020-10-25 02:36:03,304 INFO  com.netflix.bdp.s3fs.BdpS3FileSystem            
>              [cluster-io-thread-25]  - Deleting path: 
> s3://<bucket>/checkpoints/XM3B/clapp_avro-clapp_avro_nontvui/1593/233b4938179c06974e4535ac8a868675/chk-210758/c31aec1e-07a7-4193-aa00-3fbe83f9e2e6
> 2020-10-25 02:36:03,307 INFO  
> org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess 
> [cluster-io-thread-5]  - Trying to recover job with job id 
> 233b4938179c06974e4535ac8a868675.
> 2020-10-25 02:36:03,381 INFO  com.netflix.bdp.s3fs.BdpS3FileSystem            
>              [cluster-io-thread-25]  - Deleting path: 
> s3://<bucket>/checkpoints/Hh86/clapp_avro-clapp_avro_nontvui/1593/233b4938179c06974e4535ac8a868675/chk-210758/4ab92f70-dfcd-4212-9b7f-bdbecb9257fd
> ...
> 2020-10-25 02:36:03,427 INFO  
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore 
> [flink-akka.actor.default-dispatcher-82003]  - Recovering checkpoints from 
> ZooKeeper.
> 2020-10-25 02:36:03,432 INFO  
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore 
> [flink-akka.actor.default-dispatcher-82003]  - Found 0 checkpoints in 
> ZooKeeper.
> 2020-10-25 02:36:03,432 INFO  
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore 
> [flink-akka.actor.default-dispatcher-82003]  - Trying to fetch 0 checkpoints 
> from storage.
> 2020-10-25 02:36:03,432 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator    
> [flink-akka.actor.default-dispatcher-82003]  - Starting job 
> 233b4938179c06974e4535ac8a868675 from savepoint 
> s3://<bucket>/checkpoints/metadata/clapp_avro-clapp_avro_nontvui/1113/47e2a25a8d0b696c7d0d423722bb6f54/chk-203531/_metadata
>  ()
> {code}


