[ 
https://issues.apache.org/jira/browse/FLINK-26029?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Huang Xingbo updated FLINK-26029:
---------------------------------
    Fix Version/s: 1.17.0
                       (was: 1.16.0)

> Generalize the checkpoint protocol of OperatorCoordinator.
> ----------------------------------------------------------
>
>                 Key: FLINK-26029
>                 URL: https://issues.apache.org/jira/browse/FLINK-26029
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.14.3
>            Reporter: Jiangjie Qin
>            Assignee: Yunfeng Zhou
>            Priority: Major
>              Labels: extensibility, pull-request-available, stale-assigned
>             Fix For: 1.17.0
>
>
> **Problem statement**
> Currently, the JM uses OperatorEventValve to control the communication from 
> OperatorCoordinator (OC) to the subtasks in order to achieve state 
> consistency across OC and subtasks after recovering from a checkpoint. The 
> valve is closed when a checkpoint starts and reopened after the checkpoint 
> barriers are sent to the source subtasks.
> While this mechanism works for the source operators, it unnecessarily restricts 
> general use of the OC because of the following gaps:
> - It does not handle (i.e., block) the control flow messages from subtasks to 
> OC.
> - It does not handle the control flow messages from OC to subtasks which are 
> sent after checkpoint barriers have been sent to sources but before subtasks 
> have received those barriers.
> If these cases are not handled, consistency issues 
> might occur. For example, if a subtask sends messages to its coordinator 
> after the checkpoint barriers are sent to the sources and before the subtasks 
> receive the barriers, these messages would be recorded in the subtask's 
> snapshot but not the coordinator's. When the Flink job recovers from this 
> snapshot, the subtask would have a record of sending out the message while 
> the coordinator has no record of receiving it.
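The timeline in the example above can be modeled with a few lines of Java. This is a hypothetical simulation, not Flink code: `ValveGapDemo`, `Coordinator`, `Subtask`, and the event name `split-request-1` are illustrative assumptions, and each "snapshot" is simply a copy of a list. It shows that an event sent by a subtask after the coordinator's snapshot but before the subtask's barrier arrives ends up only in the subtask's snapshot.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical simulation of the inconsistency described in FLINK-26029.
 * None of these classes exist in Flink; they only model the timeline.
 */
public class ValveGapDemo {

    /** Minimal stand-in for an operator coordinator's state. */
    static final class Coordinator {
        final List<String> received = new ArrayList<>();

        List<String> snapshot() {
            return new ArrayList<>(received);
        }
    }

    /** Minimal stand-in for a subtask's state. */
    static final class Subtask {
        final List<String> sent = new ArrayList<>();

        void sendToCoordinator(String event, Coordinator oc) {
            sent.add(event);
            // Subtask-to-OC traffic is not blocked by the current valve.
            oc.received.add(event);
        }

        List<String> snapshot() {
            return new ArrayList<>(sent);
        }
    }

    public static void main(String[] args) {
        Coordinator oc = new Coordinator();
        Subtask subtask = new Subtask();

        // 1. Checkpoint starts: the coordinator is snapshotted first.
        List<String> ocSnapshot = oc.snapshot();

        // 2. Barriers are sent to the sources (the valve reopens here).
        // 3. Before the barrier reaches the subtask, the subtask sends an event.
        subtask.sendToCoordinator("split-request-1", oc);

        // 4. The barrier arrives and the subtask is snapshotted.
        List<String> subtaskSnapshot = subtask.snapshot();

        // What each side would remember after restoring from this checkpoint.
        System.out.println("subtask remembers sending: " + subtaskSnapshot);
        System.out.println("coordinator remembers receiving: " + ocSnapshot);
    }
}
```

Running `main` prints a non-empty list for the subtask and an empty list for the coordinator, which is exactly the mismatch the issue describes.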
> **Proposed solution**
> We plan to address this problem by extending the blocking period. 
> Communication should be blocked before the OC starts the checkpoint and 
> reopened for each individual subtask after that subtask finishes its 
> checkpoint, so that both the OC and the subtasks see a consistent set of 
> messages after recovering from a checkpoint. Communication in both 
> directions should be blocked during this period, and messages blocked on 
> the subtask side should be stored in the checkpoint snapshot for recovery.
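A minimal sketch of the proposed protocol, assuming one gateway object per subtask that buffers events in both directions while closed. `PerSubtaskValveSketch`, `SubtaskGateway`, and its methods are hypothetical names, not Flink APIs. For brevity the sketch keeps both directions in one object, whereas in a real job the coordinator side and the subtask side live in different processes. It also reads "messages blocked on the subtask side" as events the subtask tried to send to the coordinator, and returns them from `snapshotBlockedAndReopen()` as the data that would go into that subtask's snapshot.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/**
 * Hypothetical sketch of the per-subtask, bidirectional blocking proposed in
 * FLINK-26029. Class and method names are illustrative, not Flink APIs.
 */
public class PerSubtaskValveSketch {

    /** One gateway per subtask; blocks traffic in both directions while closed. */
    static final class SubtaskGateway {
        private boolean closed = false;
        private final Deque<String> blockedToSubtask = new ArrayDeque<>();
        private final Deque<String> blockedToCoordinator = new ArrayDeque<>();
        final List<String> deliveredToSubtask = new ArrayList<>();
        final List<String> deliveredToCoordinator = new ArrayList<>();

        /** Called for every subtask BEFORE the coordinator takes its snapshot. */
        void close() {
            closed = true;
        }

        void sendToSubtask(String event) {
            if (closed) blockedToSubtask.add(event);
            else deliveredToSubtask.add(event);
        }

        void sendToCoordinator(String event) {
            if (closed) blockedToCoordinator.add(event);
            else deliveredToCoordinator.add(event);
        }

        /**
         * Called when THIS subtask finishes its checkpoint. Returns the blocked
         * subtask-to-OC events (to be stored in the subtask's snapshot), then
         * reopens the gateway and flushes both buffers in order.
         */
        List<String> snapshotBlockedAndReopen() {
            List<String> toPersist = new ArrayList<>(blockedToCoordinator);
            closed = false;
            while (!blockedToSubtask.isEmpty()) deliveredToSubtask.add(blockedToSubtask.poll());
            while (!blockedToCoordinator.isEmpty()) deliveredToCoordinator.add(blockedToCoordinator.poll());
            return toPersist;
        }
    }

    public static void main(String[] args) {
        SubtaskGateway g0 = new SubtaskGateway();
        SubtaskGateway g1 = new SubtaskGateway();

        // Checkpoint starts: close every gateway before the coordinator snapshot.
        g0.close();
        g1.close();

        g0.sendToCoordinator("request-0"); // blocked
        g1.sendToSubtask("assignment-1");  // blocked

        // Subtask 0 finishes its checkpoint; subtask 1 has not yet.
        List<String> persisted0 = g0.snapshotBlockedAndReopen();

        System.out.println("stored in subtask 0 snapshot: " + persisted0);
        System.out.println("delivered to coordinator from subtask 0: " + g0.deliveredToCoordinator);
        System.out.println("delivered to subtask 1 so far: " + g1.deliveredToSubtask);
    }
}
```

Unlike a single global valve that reopens once barriers reach the sources, each gateway here reopens only when its own subtask completes the checkpoint, so `g1` keeps blocking after `g0` has reopened. After recovery, the subtask would re-send the persisted events; because the coordinator's snapshot was taken before they were delivered, neither side's restored state reflects them as processed.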



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
