[ https://issues.apache.org/jira/browse/FLINK-18641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163267#comment-17163267 ]
Jiangjie Qin commented on FLINK-18641:
--------------------------------------

[~pnowojski] [~SleePy] There are actually more problems than the issues reported by this ticket. In a checkpoint there are two orders:
# the checkpoint taking order;
# the checkpoint acknowledge handling order.

This ticket reports the problem of an unexpected checkpoint acknowledge handling order. While the acknowledge handling order is not super important, the checkpoint taking order is critical, because getting it wrong may cause correctness problems. By design, the checkpoint should always take the snapshot of the master hooks and the OperatorCoordinators first, before taking the checkpoint on the tasks. Prior to FLIP-27, this ordering was always guaranteed because the checkpoint on the tasks was triggered in one of the following ways:
# The CheckpointCoordinator takes a snapshot of the master hooks, waits for it to finish, and then triggers the snapshot on the tasks. (A strict snapshot order is guaranteed by the checkpoint coordinator.)
# The CheckpointCoordinator takes a snapshot of the master hooks. The master hook triggers the checkpoint on the tasks via an ExternallyInducedSource. The CheckpointCoordinator waits until the master hooks finish their snapshot. (In this case, the master hooks have to guarantee the consistency between the master state and the task states. The tasks may ack the checkpoint before the master hooks complete, but those acks won't be handled until the master hook acks are handled.)

After FLIP-27, with the introduction of the OperatorCoordinator, we need to make sure that the task checkpoint cannot be triggered before the OperatorCoordinator checkpoint. Given that the task checkpoint can be triggered by either the CheckpointCoordinator itself or the master hooks, we should first checkpoint the OperatorCoordinators, then the master hooks, and lastly trigger the task checkpoint. The current code does not do this correctly.
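The intended ordering can be sketched with plain JDK futures. This is a simplified, hypothetical model (the class and method names are made up for illustration, not Flink's actual CheckpointCoordinator API): each phase is chained strictly after the previous one, and the pending checkpoint is finalized only once every participant's ack is in.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

/**
 * Hypothetical sketch of the proposed checkpoint trigger ordering:
 * OperatorCoordinators first, then master hooks, then task checkpoints,
 * with finalization gated on all acks. Not Flink's real implementation.
 */
public class CheckpointOrderSketch {

    /** Runs the three phases and returns the order in which they executed. */
    public static List<String> checkpointOrder() {
        List<String> order = new CopyOnWriteArrayList<>();

        // Phase 1: snapshot the OperatorCoordinators (must happen first).
        CompletableFuture<Void> coordinators =
                CompletableFuture.runAsync(() -> order.add("operatorCoordinators"));

        // Phase 2: master hooks, chained strictly after the coordinators,
        // so a hook cannot induce a task snapshot too early.
        CompletableFuture<Void> hooks =
                coordinators.thenRun(() -> order.add("masterHooks"));

        // Phase 3: task checkpoints, triggered only after the hooks.
        CompletableFuture<Void> tasks =
                hooks.thenRun(() -> order.add("tasks"));

        // Finalize only once every ack (coordinators, hooks, tasks) has
        // arrived, regardless of the order in which the acks are handled.
        CompletableFuture.allOf(coordinators, hooks, tasks)
                .thenRun(() -> order.add("completePendingCheckpoint"))
                .join();

        return order;
    }

    public static void main(String[] args) {
        System.out.println(checkpointOrder());
    }
}
```

The key point the sketch illustrates is that phase 3 is gated on phase 2, not merely started after it; an externally induced task snapshot would have to be wired into the `hooks` future to preserve the guarantee.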
That means the master hooks can trigger the snapshot of the tasks before the OperatorCoordinators are checkpointed. Regarding the fix, I am thinking of the following:
# Change the checkpoint order to checkpoint the OperatorCoordinators first, i.e. before checkpointing the master hooks.
# Keep the master hooks async, but invoke them after checkpointing the OperatorCoordinators. This is fine because, according to its Javadoc, {{MasterTriggerRestoreHook#triggerCheckpoint()}} can choose to be synchronous if it wants to; otherwise the CheckpointCoordinator will treat it as asynchronous.
# Change the logic to only call {{CheckpointCoordinator#completePendingCheckpoint()}} after all the checkpoints are acked, instead of only looking at the task checkpoints. This is necessary because the acknowledge handling order may vary given the current async checkpoint pattern.

I agree that in the long run the OperatorCoordinator can actually supersede the master hooks, so we can probably mark the master hooks as deprecated.

> "Failure to finalize checkpoint" error in MasterTriggerRestoreHook
> ------------------------------------------------------------------
>
>                 Key: FLINK-18641
>                 URL: https://issues.apache.org/jira/browse/FLINK-18641
>             Project: Flink
>          Issue Type: Bug
>      Components: Runtime / Checkpointing
>    Affects Versions: 1.11.0
>            Reporter: Brian Zhou
>          Priority: Major
>
> https://github.com/pravega/flink-connectors is a Pravega connector for Flink.
> The ReaderCheckpointHook[1] class uses the Flink `MasterTriggerRestoreHook`
> interface to trigger the Pravega checkpoint during Flink checkpoints to make
> sure the data recovers consistently. The checkpoint recovery tests run fine in
> Flink 1.10, but they hit the issues below in Flink 1.11, causing the tests to time out.
> Suspect it is related to the checkpoint coordinator thread model changes in
> Flink 1.11
> Error stacktrace:
> {code}
> 2020-07-09 15:39:39,999 30945 [jobmanager-future-thread-5] WARN  o.a.f.runtime.jobmaster.JobMaster - Error while processing checkpoint acknowledgement message
> org.apache.flink.runtime.checkpoint.CheckpointException: Could not finalize the pending checkpoint 3. Failure reason: Failure to finalize checkpoint.
> 	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1033)
> 	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:948)
> 	at org.apache.flink.runtime.scheduler.SchedulerBase.lambda$acknowledgeCheckpoint$4(SchedulerBase.java:802)
> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> 	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 	at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.SerializedThrowable: Pending checkpoint has not been fully acknowledged yet
> 	at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
> 	at org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:298)
> 	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1021)
> 	... 9 common frames omitted
> {code}
> More detail in this mailing thread:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Pravega-connector-cannot-recover-from-the-checkpoint-due-to-quot-Failure-to-finalize-checkpoint-quot-td36652.html
> Also in https://github.com/pravega/flink-connectors/issues/387

--
This message was sent by Atlassian Jira
(v8.3.4#803005)