[ 
https://issues.apache.org/jira/browse/FLINK-18641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163267#comment-17163267
 ] 

Jiangjie Qin commented on FLINK-18641:
--------------------------------------

[~pnowojski] [~SleePy] There are actually more problems than the issues 
reported by this ticket. In checkpoint there are two orders:
 # the checkpoint taking order;
 # the checkpoint acknowledge handling order.

This ticket reports the problem of unexpected checkpoint acknowledge handling 
order. While the acknowledge handling order is not super important, the 
checkpoint taking order is critical because that may cause correctness problem.

By design, the checkpoint should always actually take the snapshot of the 
master hooks and OperatorCoordinator first before taking the checkpoint on the 
tasks.

Prior to FLIP-27, this ordering is always guaranteed because the checkpoint on 
the tasks are triggered in one of the following ways:
 # The CheckpointCoordinator takes snapshot of the master hooks, wait for it to 
finish, and then trigger the snapshot on tasks. (Strict snapshot order is 
guaranteed by the checkpoint coordinator.)
 # The CheckpointCoordinator takes snapshot of the master hooks. The master 
hook triggers the checkpoint on the tasks using ExternallyInducedSource. The 
CheckpointCoordinator waits until the master hooks finishes snapshot. (In this 
case, the master hooks have to guarantee the consistency between the master 
state and the task states. The tasks may ack the checkpoint before the master 
hooks completes, but the acks won't be handled until the master hooks acks are 
handled).

After FLIP-27, with the introduction of OperatorCoordinator, we need to make 
sure that the tasks checkpoint cannot be triggered before the 
OperatorCoordinator checkpoint. Given that the task checkpoint can be triggered 
by both the CheckpointCoordinator itself or the master hooks. We should first 
checkpoint the OperatorCoordinator, then the master hooks, lastly triggering 
the task checkpoint.

The current code does not do this correctly. That means the master hooks can 
trigger the snapshot of the tasks before the OperatorCoordinators are 
checkpointed.

Regarding the fix, I am thinking of the following:
 # Change the checkpoint order to checkpoint OperatorCoordinators first, i.e 
before checkpointing master hooks.
 # Keep the master hooks async, but invoke them after checkpointing the 
OperatorCoordinators. This is fine because according to the java doc of 
{{MasterTriggerRestoreHook#triggerCheckpoint()}} can choose to be synchronous 
if it wants to. Otherwise the CheckpointCoordinator will consider it as 
asynchronous friendly.
 # Change the logic to only call 
{{CheckpointCoordinator#completePendingCheckpoint()}} after all the checkpoints 
are acked, instead of only looking the task checkpoints. This is because the 
acknowledge handling order may vary given the current async checkpoint pattern.

I agree that In long run, the operator coordinator can actually supersede the 
master hooks. So we can probably mark the master hooks as deprecated.

> "Failure to finalize checkpoint" error in MasterTriggerRestoreHook
> ------------------------------------------------------------------
>
>                 Key: FLINK-18641
>                 URL: https://issues.apache.org/jira/browse/FLINK-18641
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.11.0
>            Reporter: Brian Zhou
>            Priority: Major
>
> https://github.com/pravega/flink-connectors is a Pravega connector for Flink. 
> The ReaderCheckpointHook[1] class uses the Flink `MasterTriggerRestoreHook` 
> interface to trigger the Pravega checkpoint during Flink checkpoints to make 
> sure the data recovery. The checkpoint recovery tests are running fine in 
> Flink 1.10, but it has below issues in Flink 1.11 causing the tests time out. 
> Suspect it is related to the checkpoint coordinator thread model changes in 
> Flink 1.11
> Error stacktrace:
> {code}
> 2020-07-09 15:39:39,999 30945 [jobmanager-future-thread-5] WARN  
> o.a.f.runtime.jobmaster.JobMaster - Error while processing checkpoint 
> acknowledgement message
> org.apache.flink.runtime.checkpoint.CheckpointException: Could not finalize 
> the pending checkpoint 3. Failure reason: Failure to finalize checkpoint.
>          at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1033)
>          at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:948)
>          at 
> org.apache.flink.runtime.scheduler.SchedulerBase.lambda$acknowledgeCheckpoint$4(SchedulerBase.java:802)
>          at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>          at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>          at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>          at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>          at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>          at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>          at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.SerializedThrowable: Pending checkpoint has 
> not been fully acknowledged yet
>          at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>          at 
> org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:298)
>          at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1021)
>          ... 9 common frames omitted
> {code}
> More detail in this mailing thread: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Pravega-connector-cannot-recover-from-the-checkpoint-due-to-quot-Failure-to-finalize-checkpoint-quot-td36652.html
> Also in https://github.com/pravega/flink-connectors/issues/387



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to