[ https://issues.apache.org/jira/browse/FLINK-20396?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stephan Ewen updated FLINK-20396:
---------------------------------
    Summary: Add "OperatorCoordinator.resetSubtask()" to fix order problems of 
"subtaskFailed()"  (was: Replace "OperatorCoordinator.subtaskFailed()" with 
"subtaskRestored()")

> Add "OperatorCoordinator.resetSubtask()" to fix order problems of 
> "subtaskFailed()"
> -----------------------------------------------------------------------------------
>
>                 Key: FLINK-20396
>                 URL: https://issues.apache.org/jira/browse/FLINK-20396
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.11.2
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.12.0, 1.11.3
>
>
> There are no strong order guarantees between 
> {{OperatorCoordinator.subtaskFailed()}} and 
> {{OperatorCoordinator.notifyCheckpointComplete()}}.
> A checkpoint can complete after the task failure notification has been 
> sent:
>   - {{OperatorCoordinator.checkpoint()}}
>   - {{OperatorCoordinator.subtaskFailed()}}
>   - {{OperatorCoordinator.checkpointComplete()}}
> When handling the subtask failure, the coordinator does not know whether the 
> previous checkpoint completed, so it cannot decide what state the subtask 
> will be in after recovery.
> There is no easy fix right now that strictly guarantees the order of the 
> method calls, so instead we need to provide the information necessary to 
> reason about the status of tasks.
> We should replace {{OperatorCoordinator.subtaskFailed(int subtask)}} with 
> {{OperatorCoordinator.subtaskRestored(int subtask, long checkpoint)}}. That 
> way, implementations get the explicit checkpoint ID for the subtask recovery 
> and can align it with the IDs of the checkpoints that were taken.
> It is still possible (in rare cases) that for a specific checkpoint C, 
> {{OperatorCoordinator.subtaskRestored(subtaskIndex, C)}} comes before 
> {{OperatorCoordinator.checkpointComplete(C)}}.
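To make the proposal concrete, here is a minimal Java sketch of a coordinator that hands out work items ("splits") to subtasks and uses the checkpoint ID passed to {{subtaskRestored(int subtask, long checkpoint)}} to work out which assignments a restored subtask has lost. The class name, the split bookkeeping, and the method signatures are hypothetical; they follow the names used in this issue, not a released Flink interface.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/**
 * Hypothetical coordinator (not Flink code). Method names mirror the proposal in
 * FLINK-20396: checkpoint(), checkpointComplete(), subtaskRestored(subtask, checkpointId).
 */
final class SplitTrackingCoordinator {

    // Assignments made since the most recent checkpoint was triggered, per subtask.
    private Map<Integer, List<String>> sinceLastCheckpoint = new HashMap<>();

    // Triggered but not yet complete checkpoints.
    // Key: checkpoint ID; value: subtask -> splits assigned since the previous checkpoint.
    private final NavigableMap<Long, Map<Integer, List<String>>> pending = new TreeMap<>();

    // Splits that must be handed out again after a restore.
    private final List<String> toReassign = new ArrayList<>();

    void assign(int subtask, String split) {
        sinceLastCheckpoint.computeIfAbsent(subtask, k -> new ArrayList<>()).add(split);
    }

    /** Checkpoint triggered: everything assigned so far is captured by this checkpoint. */
    void checkpoint(long checkpointId) {
        pending.put(checkpointId, sinceLastCheckpoint);
        sinceLastCheckpoint = new HashMap<>();
    }

    /** Checkpoint complete: its assignments (and older ones) are durable, stop tracking them. */
    void checkpointComplete(long checkpointId) {
        pending.headMap(checkpointId, true).clear();
    }

    /**
     * Subtask restored to checkpoint {@code checkpointId}. Assignments captured by that
     * checkpoint or earlier are part of the restored state; later ones are lost. Only
     * checkpoints strictly newer than checkpointId are consulted, so this is correct even
     * if checkpointComplete(checkpointId) has not been called yet.
     */
    void subtaskRestored(int subtask, long checkpointId) {
        for (Map<Integer, List<String>> perSubtask : pending.tailMap(checkpointId, false).values()) {
            List<String> lost = perSubtask.remove(subtask);
            if (lost != null) {
                toReassign.addAll(lost);
            }
        }
        List<String> lost = sinceLastCheckpoint.remove(subtask);
        if (lost != null) {
            toReassign.addAll(lost);
        }
    }

    List<String> toReassign() {
        return toReassign;
    }
}
```

With this design, the rare ordering where {{subtaskRestored(subtaskIndex, C)}} arrives before {{checkpointComplete(C)}} needs no special case: the assignments captured by C itself are never treated as lost.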
> h3. Background
> The checkpointing procedure is partially asynchronous on the {{JobManager}} / 
> {{CheckpointCoordinator}}: after all subtasks have acknowledged the checkpoint, 
> the finalization (writing out metadata and registering the checkpoint in 
> ZooKeeper) happens in an I/O thread, and the checkpoint completes after that.
> This sequence of events can happen:
>   - task acks the checkpoint
>   - checkpoint fully acknowledged, finalization starts
>   - task fails
>   - task failure notification is dispatched
>   - checkpoint completes.
> For task failures and checkpoint completion, no order is defined.
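The interleaving in the list above can be reproduced with a small, deterministic Java simulation. It is not Flink code: the "I/O thread" is a hypothetical hand-driven executor, so here the failure notification is always dispatched before finalization runs, whereas on a real {{JobManager}} the order depends on thread scheduling.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/** Hypothetical single-threaded simulation of the event sequence (not Flink code). */
final class FinalizationRaceDemo {

    /** Executor that only queues tasks; runAll() plays the role of the I/O thread. */
    static final class ManualExecutor implements Executor {
        private final Queue<Runnable> tasks = new ArrayDeque<>();

        @Override
        public void execute(Runnable task) {
            tasks.add(task);
        }

        void runAll() {
            Runnable r;
            while ((r = tasks.poll()) != null) {
                r.run();
            }
        }
    }

    static List<String> run() {
        List<String> events = new ArrayList<>();
        ManualExecutor ioThread = new ManualExecutor();

        events.add("task acks checkpoint 7");
        events.add("checkpoint 7 fully acknowledged, finalization starts");
        // Finalization (metadata write, ZooKeeper registration) is handed to the I/O thread.
        CompletableFuture<Void> finalization =
                CompletableFuture.runAsync(() -> events.add("checkpoint 7 completes"), ioThread);

        // Meanwhile, on the main thread, the task fails and the coordinator is notified.
        events.add("task fails");
        events.add("subtaskFailed(0) dispatched");

        ioThread.runAll(); // the I/O thread only gets to run now
        finalization.join();
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

When {{subtaskFailed(0)}} is handled, the coordinator cannot tell whether checkpoint 7 will complete, which is exactly the ambiguity the description points out.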
> However, for task restore and checkpoint completion, the order is well 
> defined: when a task is restored, every pending checkpoint is either canceled 
> or completed; none can still be in finalization. That is currently guaranteed 
> by a lock in the {{CheckpointCoordinator}}.
> (One implication is that restores can be blocking operations in the 
> scheduler. That is not ideal from the perspective of making the scheduler 
> async/non-blocking, but it is currently essential for correctness.)
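A simplified Java sketch of the invariant described in the last two paragraphs: finalization and restore take the same lock, so a restore can never observe a checkpoint mid-finalization, at the cost of blocking while a finalization is in progress. The class and its methods are hypothetical and much simpler than the actual {{CheckpointCoordinator}}.

```java
import java.util.Optional;
import java.util.TreeSet;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical sketch (not the real CheckpointCoordinator): one lock guards finalize and restore. */
final class CheckpointLockSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final TreeSet<Long> pending = new TreeSet<>();
    private final TreeSet<Long> completed = new TreeSet<>();

    void trigger(long checkpointId) {
        lock.lock();
        try {
            pending.add(checkpointId);
        } finally {
            lock.unlock();
        }
    }

    /** Runs on the I/O thread; a checkpoint canceled by a restore is silently skipped. */
    void finalizeCheckpoint(long checkpointId) {
        lock.lock();
        try {
            if (pending.remove(checkpointId)) {
                // ... write metadata, register the checkpoint in ZooKeeper ...
                completed.add(checkpointId);
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Blocks while a finalization holds the lock, then cancels every pending checkpoint
     * and returns the ID of the latest completed checkpoint to restore from.
     */
    Optional<Long> restoreLatest() {
        lock.lock();
        try {
            pending.clear(); // all pending checkpoints are now canceled
            return completed.isEmpty() ? Optional.empty() : Optional.of(completed.last());
        } finally {
            lock.unlock();
        }
    }
}
```

The ID returned by {{restoreLatest()}} is the value a {{subtaskRestored(subtask, checkpoint)}}-style callback would carry to the {{OperatorCoordinator}}.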



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
