[
https://issues.apache.org/jira/browse/FLINK-18641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17163267#comment-17163267
]
Jiangjie Qin commented on FLINK-18641:
--------------------------------------
[~pnowojski] [~SleePy] There are actually more problems than the issue
reported by this ticket. In checkpointing there are two orders:
# the checkpoint taking order;
# the checkpoint acknowledge handling order.
This ticket reports the problem of an unexpected checkpoint acknowledge
handling order. While the acknowledge handling order is not super important,
the checkpoint taking order is critical because it may cause correctness
problems. By design, the checkpoint should always take the snapshot of the
master hooks and the OperatorCoordinators first, before taking the checkpoint
on the tasks.
Prior to FLIP-27, this ordering was always guaranteed because the checkpoint
on the tasks was triggered in one of the following ways:
# The CheckpointCoordinator takes a snapshot of the master hooks, waits for it
to finish, and then triggers the snapshot on the tasks. (Strict snapshot order
is guaranteed by the checkpoint coordinator; see the sketch after this list.)
# The CheckpointCoordinator takes a snapshot of the master hooks. The master
hook triggers the checkpoint on the tasks using ExternallyInducedSource. The
CheckpointCoordinator waits until the master hooks finish their snapshots. (In
this case, the master hooks have to guarantee consistency between the master
state and the task states. The tasks may ack the checkpoint before the master
hooks complete, but those acks won't be handled until the master hook acks are
handled.)
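To illustrate the first path, here is a rough sketch (the class and helper
method names are made up for illustration; this is not the actual
CheckpointCoordinator code):
{code}
import java.util.concurrent.CompletableFuture;

/**
 * Sketch only, with hypothetical helper methods: in the first pre-FLIP-27
 * path the task checkpoint is triggered only after the master hook snapshot
 * future has completed.
 */
public class PreFlip27TriggerSketch {

    static CompletableFuture<Void> snapshotMasterHooks(long checkpointId) {
        // In the real coordinator this would invoke
        // MasterTriggerRestoreHook#triggerCheckpoint() for every registered hook.
        return CompletableFuture.runAsync(
                () -> System.out.println("master hook snapshot for checkpoint " + checkpointId));
    }

    static void triggerTaskCheckpoint(long checkpointId) {
        System.out.println("triggering task checkpoint " + checkpointId);
    }

    public static void main(String[] args) {
        long checkpointId = 3L;
        // Strict order: the tasks are only triggered once the master hooks are done.
        snapshotMasterHooks(checkpointId)
                .thenRun(() -> triggerTaskCheckpoint(checkpointId))
                .join();
    }
}
{code}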
After FLIP-27, with the introduction of the OperatorCoordinator, we need to
make sure that the task checkpoint cannot be triggered before the
OperatorCoordinator checkpoint. Given that the task checkpoint can be
triggered by either the CheckpointCoordinator itself or the master hooks, we
should first checkpoint the OperatorCoordinators, then the master hooks, and
lastly trigger the task checkpoint (see the sketch below). The current code
does not do this correctly, which means the master hooks can trigger the
snapshot of the tasks before the OperatorCoordinators are checkpointed.
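A rough sketch of the intended ordering (again with made-up helper names, not
the actual code): because a master hook may itself trigger the task checkpoint
(the ExternallyInducedSource case), the OperatorCoordinator snapshot has to
complete before the hooks are invoked at all.
{code}
import java.util.concurrent.CompletableFuture;

/** Sketch only (hypothetical helpers): coordinators, then hooks, then tasks. */
public class PostFlip27OrderSketch {

    static CompletableFuture<Void> checkpointOperatorCoordinators(long checkpointId) {
        return CompletableFuture.runAsync(
                () -> System.out.println("1. OperatorCoordinator checkpoint " + checkpointId));
    }

    static CompletableFuture<Void> snapshotMasterHooks(long checkpointId) {
        return CompletableFuture.runAsync(
                () -> System.out.println("2. master hook snapshot " + checkpointId));
    }

    static void triggerTaskCheckpoint(long checkpointId) {
        System.out.println("3. task checkpoint trigger " + checkpointId);
    }

    public static void main(String[] args) {
        long checkpointId = 3L;
        checkpointOperatorCoordinators(checkpointId)
                .thenCompose(ignored -> snapshotMasterHooks(checkpointId))
                .thenRun(() -> triggerTaskCheckpoint(checkpointId))
                .join();
    }
}
{code}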
Regarding the fix, I am thinking of the following:
# Change the checkpoint order to checkpoint the OperatorCoordinators first,
i.e. before checkpointing the master hooks.
# Keep the master hooks async, but invoke them after checkpointing the
OperatorCoordinators. This is fine because, according to the Javadoc,
{{MasterTriggerRestoreHook#triggerCheckpoint()}} can choose to execute
synchronously if it wants to; otherwise the CheckpointCoordinator handles it
as an asynchronous call.
# Change the logic to only call
{{CheckpointCoordinator#completePendingCheckpoint()}} after all the checkpoints
are acked, instead of only looking at the task checkpoints (see the sketch
after this list). This is because the acknowledge handling order may vary
given the current async checkpoint pattern.
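For fix #3, a rough sketch of what the completion check could look like (the
fields and methods here are hypothetical bookkeeping, not the actual
PendingCheckpoint class):
{code}
import java.util.HashSet;
import java.util.Set;

/**
 * Sketch only: finalization waits for every kind of acknowledgement, so the
 * order in which the acks arrive and are handled no longer matters.
 */
public class PendingCheckpointSketch {

    private final Set<String> notYetAckedTasks = new HashSet<>();
    private final Set<String> notYetAckedCoordinators = new HashSet<>();
    private final Set<String> notYetAckedMasterHooks = new HashSet<>();

    // ... acknowledge*() methods would remove entries from the sets above ...

    boolean isFullyAcknowledged() {
        return notYetAckedTasks.isEmpty()
                && notYetAckedCoordinators.isEmpty()
                && notYetAckedMasterHooks.isEmpty();
    }

    void maybeFinalize() {
        if (isFullyAcknowledged()) {
            // Only now would CheckpointCoordinator#completePendingCheckpoint() be safe
            // to call; checking only the task acks is what trips the "Pending checkpoint
            // has not been fully acknowledged yet" precondition in the reported stack trace.
            System.out.println("finalizing checkpoint");
        }
    }
}
{code}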
I agree that in the long run, the OperatorCoordinator can actually supersede
the master hooks, so we can probably mark the master hooks as deprecated.
> "Failure to finalize checkpoint" error in MasterTriggerRestoreHook
> ------------------------------------------------------------------
>
> Key: FLINK-18641
> URL: https://issues.apache.org/jira/browse/FLINK-18641
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing
> Affects Versions: 1.11.0
> Reporter: Brian Zhou
> Priority: Major
>
> https://github.com/pravega/flink-connectors is a Pravega connector for Flink.
> The ReaderCheckpointHook[1] class uses the Flink `MasterTriggerRestoreHook`
> interface to trigger the Pravega checkpoint during Flink checkpoints to
> ensure data recovery. The checkpoint recovery tests run fine in Flink 1.10,
> but they hit the issue below in Flink 1.11, causing the tests to time out.
> We suspect it is related to the checkpoint coordinator threading model
> changes in Flink 1.11.
> Error stacktrace:
> {code}
> 2020-07-09 15:39:39,999 30945 [jobmanager-future-thread-5] WARN
> o.a.f.runtime.jobmaster.JobMaster - Error while processing checkpoint
> acknowledgement message
> org.apache.flink.runtime.checkpoint.CheckpointException: Could not finalize
> the pending checkpoint 3. Failure reason: Failure to finalize checkpoint.
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1033)
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:948)
> at
> org.apache.flink.runtime.scheduler.SchedulerBase.lambda$acknowledgeCheckpoint$4(SchedulerBase.java:802)
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.SerializedThrowable: Pending checkpoint has
> not been fully acknowledged yet
> at
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
> at
> org.apache.flink.runtime.checkpoint.PendingCheckpoint.finalizeCheckpoint(PendingCheckpoint.java:298)
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1021)
> ... 9 common frames omitted
> {code}
> More detail in this mailing thread:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Pravega-connector-cannot-recover-from-the-checkpoint-due-to-quot-Failure-to-finalize-checkpoint-quot-td36652.html
> Also in https://github.com/pravega/flink-connectors/issues/387