[ https://issues.apache.org/jira/browse/FLINK-20396?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Stephan Ewen updated FLINK-20396:
---------------------------------
Summary: Add "OperatorCoordinator.resetSubtask()" to fix order problems of
"subtaskFailed()" (was: Replace "OperatorCoordinator.subtaskFailed()" with
"subtaskRestored()")
> Add "OperatorCoordinator.resetSubtask()" to fix order problems of
> "subtaskFailed()"
> -----------------------------------------------------------------------------------
>
> Key: FLINK-20396
> URL: https://issues.apache.org/jira/browse/FLINK-20396
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Affects Versions: 1.11.2
> Reporter: Stephan Ewen
> Assignee: Stephan Ewen
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 1.12.0, 1.11.3
>
>
> There are no strong order guarantees between
> {{OperatorCoordinator.subtaskFailed()}} and
> {{OperatorCoordinator.notifyCheckpointComplete()}}.
> It can happen that a checkpoint completes after the notification for task
> failure is sent:
> - {{OperatorCoordinator.checkpoint()}}
> - {{OperatorCoordinator.subtaskFailed()}}
> - {{OperatorCoordinator.notifyCheckpointComplete()}}
> When handling the subtask failure, the coordinator does not know whether the
> previous checkpoint completed or not, so it cannot decide what state the
> subtask will be in after recovery.
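To make the ambiguity concrete, here is a minimal Java simulation of a hypothetical split-assigning coordinator that only receives a {{subtaskFailed()}} call. The class, its fields, and the split names are assumptions for illustration, not Flink's {{OperatorCoordinator}} API. The coordinator guesses that everything assigned since the last completed checkpoint is lost; when the completion of checkpoint 1 arrives after the failure, that guess is wrong and {{split-A}} ends up both in the restored subtask state and back in the pool.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class NaiveCoordinatorDemo {

    /** Hypothetical coordinator that only learns "subtask failed", never the restore checkpoint. */
    static final class NaiveCoordinator {
        final Deque<String> unassigned = new ArrayDeque<>();
        final List<String> assignedSinceLastCompleted = new ArrayList<>();
        final Map<Long, List<String>> pendingCheckpoints = new HashMap<>();

        void assign(String split) {
            assignedSinceLastCompleted.add(split);
        }

        void checkpoint(long checkpointId) {
            // Remember which assignments this (not yet completed) checkpoint contains.
            pendingCheckpoints.put(checkpointId, new ArrayList<>(assignedSinceLastCompleted));
        }

        void notifyCheckpointComplete(long checkpointId) {
            List<String> covered = pendingCheckpoints.remove(checkpointId);
            if (covered != null) {
                assignedSinceLastCompleted.removeAll(covered);
            }
        }

        void subtaskFailed() {
            // Best guess: the subtask loses everything not covered by an already completed checkpoint.
            unassigned.addAll(assignedSinceLastCompleted);
            assignedSinceLastCompleted.clear();
        }
    }

    public static void main(String[] args) {
        NaiveCoordinator coordinator = new NaiveCoordinator();
        coordinator.assign("split-A");
        coordinator.checkpoint(1);
        coordinator.subtaskFailed();              // failure notification arrives first
        coordinator.notifyCheckpointComplete(1);  // checkpoint 1 (containing split-A) did complete
        // The subtask restores split-A from checkpoint 1, yet split-A is also back in the pool.
        System.out.println(coordinator.unassigned); // [split-A]
    }
}
```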
> There is currently no easy way to strictly guarantee the order of these method
> calls, so instead we need to provide the information necessary to reason
> about the status of the tasks.
> We should replace {{OperatorCoordinator.subtaskFailed(int subtask)}} with
> {{OperatorCoordinator.subtaskRestored(int subtask, long checkpoint)}}. That
> way, implementations get the explicit checkpoint ID for the subtask recovery
> and can align it with the IDs of the checkpoints that were taken.
> It is still possible (in rare cases) that for a specific checkpoint C,
> {{OperatorCoordinator.subtaskRestored(subtaskIndex, C)}} comes before
> {{OperatorCoordinator.notifyCheckpointComplete(C)}}.
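A minimal sketch of how an implementation might use the checkpoint ID passed to the proposed {{subtaskRestored(int subtask, long checkpoint)}}, assuming a coordinator that hands out splits. The class names, {{NO_CHECKPOINT}}, and the tagging scheme are hypothetical, not Flink code. Each assignment is tagged with the last checkpoint triggered before it; on a restore to checkpoint C, every assignment not contained in C is returned to the pool. Because that decision depends only on C, it stays correct in the rare case where {{subtaskRestored(subtaskIndex, C)}} arrives before {{notifyCheckpointComplete(C)}}.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

public class RestoreAwareCoordinatorDemo {

    /** Hypothetical marker for "restored without any checkpoint". */
    static final long NO_CHECKPOINT = -1L;

    /** A split assignment tagged with the last checkpoint triggered before it was made. */
    static final class Assignment {
        final int subtask;
        final String split;
        final long lastCheckpointBefore;

        Assignment(int subtask, String split, long lastCheckpointBefore) {
            this.subtask = subtask;
            this.split = split;
            this.lastCheckpointBefore = lastCheckpointBefore;
        }
    }

    static final class RestoreAwareCoordinator {
        final Deque<String> unassigned = new ArrayDeque<>();
        final List<Assignment> unconfirmed = new ArrayList<>();
        long lastTriggeredCheckpoint = NO_CHECKPOINT;

        void assign(int subtask, String split) {
            unconfirmed.add(new Assignment(subtask, split, lastTriggeredCheckpoint));
        }

        void checkpoint(long checkpointId) {
            // Assumption: every assignment made before this call is contained in this checkpoint.
            lastTriggeredCheckpoint = checkpointId;
        }

        void notifyCheckpointComplete(long checkpointId) {
            // Assumption: no subtask is later restored from a checkpoint older than this one,
            // so assignments contained in it never need to be handed out again.
            unconfirmed.removeIf(a -> a.lastCheckpointBefore < checkpointId);
        }

        void subtaskRestored(int subtask, long checkpointId) {
            // An assignment is part of checkpoint C iff it was made before C was triggered,
            // i.e. lastCheckpointBefore < C. Everything else goes back to the pool.
            // No knowledge of whether notifyCheckpointComplete(C) has run is needed.
            Iterator<Assignment> it = unconfirmed.iterator();
            while (it.hasNext()) {
                Assignment a = it.next();
                boolean inRestoredState =
                        checkpointId != NO_CHECKPOINT && a.lastCheckpointBefore < checkpointId;
                if (a.subtask == subtask && !inRestoredState) {
                    unassigned.add(a.split);
                    it.remove();
                }
            }
        }
    }

    public static void main(String[] args) {
        RestoreAwareCoordinator coordinator = new RestoreAwareCoordinator();
        coordinator.assign(0, "split-A");           // contained in checkpoint 1
        coordinator.checkpoint(1);
        coordinator.assign(0, "split-B");           // not contained in checkpoint 1
        coordinator.subtaskRestored(0, 1);          // rare case: before notifyCheckpointComplete(1)
        coordinator.notifyCheckpointComplete(1);
        System.out.println(coordinator.unassigned); // [split-B]
    }
}
```

With only the single-argument {{subtaskFailed()}}, the same coordinator would have to guess which checkpoint the subtask resumes from, which is the ambiguity this issue describes.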
> h3. Background
> The checkpointing procedure is partially asynchronous on the {{JobManager}} /
> {{CheckpointCoordinator}}: after all subtasks have acknowledged the checkpoint,
> the finalization (writing out the metadata and registering the checkpoint in
> ZooKeeper) happens in an I/O thread, and the checkpoint completes only after that.
> This sequence of events can happen:
> - task acks checkpoint
> - checkpoint fully acknowledged, finalization starts
> - task fails
> - task failure notification is dispatched
> - checkpoint completes.
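The sequence above can be replayed with a small, self-contained Java simulation. A single-thread executor stands in for the {{JobManager}} I/O thread, and a latch forces the unlucky interleaving deterministically. None of this is Flink code; the class and method names are hypothetical, and the sketch only shows that nothing orders the failure notification relative to the asynchronous completion.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FinalizationRaceDemo {

    /** Replays the event sequence and returns the events in the order they were recorded. */
    public static List<String> run() {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        // Stand-in for the I/O thread that finalizes checkpoints.
        ExecutorService ioExecutor = Executors.newSingleThreadExecutor();
        // Used only to force the problematic interleaving for this demo.
        CountDownLatch failureDispatched = new CountDownLatch(1);
        try {
            events.add("task acks checkpoint 1");
            events.add("checkpoint 1 fully acknowledged, finalization starts");
            Future<?> finalization = ioExecutor.submit(() -> {
                try {
                    failureDispatched.await(); // pretend the metadata write is slow
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                events.add("checkpoint 1 completes");
            });
            // Meanwhile, on the main thread:
            events.add("task fails");
            events.add("task failure notification dispatched");
            failureDispatched.countDown();
            finalization.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            ioExecutor.shutdown();
        }
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```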
> For task failures and checkpoint completion, no order is defined.
> However, for task restore and checkpoint completion, the order is well
> defined: when a task is restored, all pending checkpoints are either canceled or
> completed. None can still be in finalization. This is currently guaranteed by
> a lock in the {{CheckpointCoordinator}}.
> (One implication is that restores can be blocking operations in the
> scheduler. That is not ideal from the perspective of making the scheduler
> asynchronous/non-blocking, but it is currently essential for correctness.)
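The restore guarantee can be sketched the same way, assuming a coordinator in which finalization and restore share one monitor. This is a hypothetical simplification of the lock mentioned above, not the actual {{CheckpointCoordinator}} code, and all names are illustrative. The restore call blocks until the in-flight finalization releases the lock, which shows both the ordering guarantee and the blocking cost noted in the parenthetical remark.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class RestoreLockDemo {

    private final Object lock = new Object();
    private final List<String> events = Collections.synchronizedList(new ArrayList<>());

    /** Finalization (metadata write, registration) runs entirely while holding the lock. */
    void finalizeCheckpoint(long checkpointId, Runnable slowMetadataWrite) {
        synchronized (lock) {
            slowMetadataWrite.run();
            events.add("checkpoint " + checkpointId + " completed");
        }
    }

    /** Restore takes the same lock, so it never observes a checkpoint mid-finalization. */
    void restoreTask() {
        synchronized (lock) {
            events.add("task restored");
        }
    }

    public static List<String> run() {
        RestoreLockDemo coordinator = new RestoreLockDemo();
        ExecutorService ioExecutor = Executors.newSingleThreadExecutor();
        CountDownLatch finalizationStarted = new CountDownLatch(1);
        CountDownLatch restoreRequested = new CountDownLatch(1);
        try {
            Future<?> finalization = ioExecutor.submit(() ->
                    coordinator.finalizeCheckpoint(1, () -> {
                        finalizationStarted.countDown();
                        awaitQuietly(restoreRequested); // still holding the lock here
                    }));
            finalizationStarted.await();
            coordinator.events.add("restore requested");
            restoreRequested.countDown();
            coordinator.restoreTask(); // blocks until finalization releases the lock
            finalization.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            ioExecutor.shutdown();
        }
        return coordinator.events;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```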