[ 
https://issues.apache.org/jira/browse/FLINK-18641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17162410#comment-17162410
 ] 

Jiangjie Qin commented on FLINK-18641:
--------------------------------------

Looked into this a little bit. The problem is as follows:
 # The Pravega connector uses the {{MasterTriggerRestoreHook}} and 
{{ExternallyInducedSource}} for checkpointing. That means the tasks may start 
checkpointing after the {{MasterTriggerRestoreHook.triggerCheckpoint()}} is 
invoked.
 # In Flink 1.10, the checkpoint thread blocks on the future returned by the 
master hook. In Flink 1.11 this became asynchronous: the only guarantee is that 
the JM triggers the task checkpoints after the master state is checkpointed. 
However, this does not cover the case where the task checkpoints are triggered 
by the master hooks rather than by the JM itself.
 # Due to the change in (2), what happens in this case is that the task 
checkpoints are triggered by the master hook and acknowledged before the 
master hook future is completed. From the JM's perspective, once all the task 
checkpoints are acked, it tries to finalize the checkpoint, finds that the 
master checkpoint has not completed yet, and throws an exception.
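
The ordering described in the list above can be illustrated with a small, self-contained toy model. This is only a sketch: it does not use any Flink classes, and every name in it ({{MasterHookRaceSketch}}, {{PendingCheckpointSketch}}, {{finalizeOnTaskAcksOnly}}, {{finalizeWhenBothDone}}) is hypothetical. The second method shows one possible way to avoid the race, namely finalizing only once both the master hook future and the task acks are done; it is an assumption for illustration, not necessarily how the actual fix will look.

```java
import java.util.concurrent.CompletableFuture;

/** Toy model of the race; none of these names are Flink classes. */
final class MasterHookRaceSketch {

    /** Hypothetical stand-in for a pending checkpoint tracked by the JM. */
    static final class PendingCheckpointSketch {
        private int tasksNotYetAcked;
        private boolean masterStateAcked;

        PendingCheckpointSketch(int numTasks) {
            this.tasksNotYetAcked = numTasks;
        }

        synchronized void acknowledgeTask() {
            tasksNotYetAcked--;
        }

        synchronized void acknowledgeMasterState() {
            masterStateAcked = true;
        }

        /** Requires both the task acks and the master state, like the failing check. */
        synchronized String finalizeCheckpoint() {
            if (tasksNotYetAcked != 0 || !masterStateAcked) {
                throw new IllegalStateException(
                        "Pending checkpoint has not been fully acknowledged yet");
            }
            return "COMPLETED";
        }
    }

    /** Failing order: finalize as soon as the last task ack arrives. */
    static String finalizeOnTaskAcksOnly() {
        PendingCheckpointSketch pending = new PendingCheckpointSketch(1);
        // The master hook future is still incomplete at this point.
        CompletableFuture<Void> masterHookFuture = new CompletableFuture<>();
        masterHookFuture.thenRun(pending::acknowledgeMasterState);

        // The hook already induced the task checkpoint, so the task acks first.
        pending.acknowledgeTask();
        try {
            return pending.finalizeCheckpoint();
        } catch (IllegalStateException e) {
            return "FAILED: " + e.getMessage();
        }
    }

    /** Possible remedy (assumption): finalize only when both sides are done. */
    static String finalizeWhenBothDone() {
        PendingCheckpointSketch pending = new PendingCheckpointSketch(1);
        CompletableFuture<Void> masterHookFuture = new CompletableFuture<>();
        CompletableFuture<Void> allTasksAcked = new CompletableFuture<>();

        // Chain the steps so the master state is recorded before finalization runs.
        CompletableFuture<Void> masterAcked =
                masterHookFuture.thenRun(pending::acknowledgeMasterState);
        CompletableFuture<String> result =
                masterAcked.thenCombine(allTasksAcked, (m, t) -> pending.finalizeCheckpoint());

        // Same ordering as before: the task ack arrives first ...
        pending.acknowledgeTask();
        allTasksAcked.complete(null);
        // ... and the master hook future completes afterwards.
        masterHookFuture.complete(null);
        return result.join();
    }

    public static void main(String[] args) {
        System.out.println(finalizeOnTaskAcksOnly());
        System.out.println(finalizeWhenBothDone());
    }
}
```

With the same arrival order in both methods, the first one reports the "not been fully acknowledged" failure while the second one completes, because finalization is deferred until the later of the two events.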

This is a bug in the JM when working with {{MasterTriggerRestoreHook}} and 
{{ExternallyInducedSource}}. I'll submit a fix for this.

> "Failure to finalize checkpoint" error in MasterTriggerRestoreHook
> ------------------------------------------------------------------
>
>                 Key: FLINK-18641
>                 URL: https://issues.apache.org/jira/browse/FLINK-18641
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.11.0
>            Reporter: Brian Zhou
>            Priority: Major
>
> https://github.com/pravega/flink-connectors is a Pravega connector for Flink. 
> The ReaderCheckpointHook[1] class uses the Flink `MasterTriggerRestoreHook` 
> interface to trigger the Pravega checkpoint during Flink checkpoints, so that 
> data can be recovered. The checkpoint recovery tests run fine in Flink 1.10, 
> but in Flink 1.11 they hit the error below, which causes the tests to time out. 
> We suspect this is related to the checkpoint coordinator thread model changes 
> in Flink 1.11.
> Error stacktrace:
> {code}
> 2020-07-09 15:39:39,999 30945 [jobmanager-future-thread-5] WARN  
> o.a.f.runtime.jobmaster.JobMaster - Error while processing checkpoint 
> acknowledgement message
> org.apache.flink.runtime.checkpoint.CheckpointException: Could not finalize 
> the pending checkpoint 3. Failure reason: Failure to finalize checkpoint.
>          at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1033)
>          at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:948)
>          at 
> org.apache.flink.runtime.scheduler.SchedulerBase.lambda$acknowledgeCheckpoint$4(SchedulerBase.java:802)
>          at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>          at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>          at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>          at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>          at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>          at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>          at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.SerializedThrowable: Pending checkpoint has 
> not been fully acknowledged yet
>          at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>          at 
> org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:298)
>          at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1021)
>          ... 9 common frames omitted
> {code}
> More details in this mailing list thread: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Pravega-connector-cannot-recover-from-the-checkpoint-due-to-quot-Failure-to-finalize-checkpoint-quot-td36652.html
> Also in https://github.com/pravega/flink-connectors/issues/387



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
