[ https://issues.apache.org/jira/browse/FLINK-26029?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Huang Xingbo updated FLINK-26029:
---------------------------------
Fix Version/s: 1.17.0
(was: 1.16.0)
> Generalize the checkpoint protocol of OperatorCoordinator.
> ----------------------------------------------------------
>
> Key: FLINK-26029
> URL: https://issues.apache.org/jira/browse/FLINK-26029
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Checkpointing
> Affects Versions: 1.14.3
> Reporter: Jiangjie Qin
> Assignee: Yunfeng Zhou
> Priority: Major
> Labels: extensibility, pull-request-available, stale-assigned
> Fix For: 1.17.0
>
>
> **Problem statement**
> Currently, the JobManager (JM) uses OperatorEventValve to control the
> communication from the OperatorCoordinator (OC) to its subtasks, so that the
> state of the OC and its subtasks stays consistent after recovering from a
> checkpoint. The valve is closed when a checkpoint starts and reopened after
> the checkpoint barriers have been sent to the source subtasks.
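To make the current behavior concrete, here is a minimal, hypothetical model of the valve described above. `SimpleEventValve`, `sendEvent`, `shutValve` and `openValve` are illustrative names, not the actual `OperatorEventValve` API. The sketch only assumes that OC-to-subtask events are buffered while the valve is shut and flushed in order when it reopens.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical, simplified model of today's OC-to-subtask valve (not Flink's API). */
class SimpleEventValve {
    private final Consumer<String> deliverToSubtasks; // actually sends an event to the subtasks
    private final List<String> blockedEvents = new ArrayList<>();
    private boolean shut = false;

    SimpleEventValve(Consumer<String> deliverToSubtasks) {
        this.deliverToSubtasks = deliverToSubtasks;
    }

    /** OC -> subtask events go through the valve; they are buffered while it is shut. */
    void sendEvent(String event) {
        if (shut) {
            blockedEvents.add(event);
        } else {
            deliverToSubtasks.accept(event);
        }
    }

    /** Called when a checkpoint starts. */
    void shutValve() {
        shut = true;
    }

    /** Called after the checkpoint barriers have been sent to the source subtasks. */
    void openValve() {
        shut = false;
        // Flush buffered events in the order they were sent.
        for (String event : blockedEvents) {
            deliverToSubtasks.accept(event);
        }
        blockedEvents.clear();
    }
}
```

In this model, subtask-to-OC events never pass through the valve, and the valve reopens as soon as the barriers have been sent to the sources rather than when each subtask receives its barrier.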
>
> While this mechanism works for source operators, it unnecessarily limits the
> general usage of OCs because of the following limitations:
> - It does not handle (i.e. block) control flow messages sent from subtasks
> to the OC.
> - It does not handle control flow messages sent from the OC to subtasks
> after the checkpoint barriers have been sent to the sources but before the
> subtasks have received those barriers.
>
> If these cases are not handled, consistency issues might occur. For example,
> if a subtask sends messages to its coordinator after the checkpoint barriers
> have been sent to the sources but before the subtask receives its barrier,
> these messages would be recorded in the subtask's snapshot but not in the
> coordinator's. When the Flink job recovers from this checkpoint, the subtask
> would have a record of sending the messages while the coordinator would have
> no record of receiving them.
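The race in this example can be replayed step by step in a small, hypothetical model (`CheckpointRace` is an illustrative name, not Flink code). It assumes the coordinator snapshots its state when the checkpoint starts, the subtask snapshots when its barrier arrives, and nothing blocks subtask-to-OC messages in between.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical two-party model of the race described above; not Flink code. */
class CheckpointRace {
    final List<String> subtaskSent = new ArrayList<>();         // subtask-side state
    final List<String> coordinatorReceived = new ArrayList<>(); // coordinator-side state
    List<String> coordinatorSnapshot;
    List<String> subtaskSnapshot;

    /** Subtask -> OC message: nothing blocks this direction today. */
    void subtaskSends(String msg) {
        subtaskSent.add(msg);
        coordinatorReceived.add(msg);
    }

    void run() {
        // 1. The coordinator snapshots its state when the checkpoint starts.
        coordinatorSnapshot = new ArrayList<>(coordinatorReceived);
        // 2. Barriers are sent to the sources (not modeled further).
        // 3. Before its barrier arrives, the subtask sends a message to the OC.
        subtaskSends("M");
        // 4. The barrier reaches the subtask, which snapshots its own state.
        subtaskSnapshot = new ArrayList<>(subtaskSent);
    }

    /** After restoring both snapshots, do both sides agree on the messages exchanged? */
    boolean consistentAfterRestore() {
        return subtaskSnapshot.equals(coordinatorSnapshot);
    }
}
```

After `run()`, the subtask snapshot contains `M` while the coordinator snapshot is empty, which is exactly the mismatch described above.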
>
> **Proposed solution**
> We plan to address this problem by extending the blocking period.
> Communication should be blocked before the OC starts the checkpoint and
> reopened for each individual subtask after that subtask finishes its
> checkpoint, so that the OC and its subtasks see a consistent set of messages
> after recovering from a checkpoint. Communication in both directions should
> be blocked during this period, and messages blocked on the subtask side
> should be stored in the checkpoint snapshot so that they can be restored on
> recovery.
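A minimal sketch of the proposed protocol for a single coordinator/subtask pair might look like the following. `SubtaskChannel` and its methods are hypothetical names chosen for illustration, not Flink APIs. The coordinator is assumed to hold one channel per subtask so that each one can be reopened independently. The sketch also assumes that events blocked on the OC side are simply delivered after reopening, while events blocked on the subtask side are included in the subtask snapshot and re-sent on recovery.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical sketch of the proposed protocol for one OC/subtask pair.
 * Names are illustrative, not Flink APIs.
 */
class SubtaskChannel {
    private final Consumer<String> deliverToSubtask;     // OC -> subtask transport
    private final Consumer<String> deliverToCoordinator; // subtask -> OC transport
    private final List<String> blockedToSubtask = new ArrayList<>();
    private final List<String> blockedToCoordinator = new ArrayList<>();
    private boolean blocked = false;

    SubtaskChannel(Consumer<String> deliverToSubtask, Consumer<String> deliverToCoordinator) {
        this.deliverToSubtask = deliverToSubtask;
        this.deliverToCoordinator = deliverToCoordinator;
    }

    /** Called before the OC starts its checkpoint: block both directions. */
    void closeForCheckpoint() {
        blocked = true;
    }

    void sendToSubtask(String event) {
        if (blocked) {
            blockedToSubtask.add(event);
        } else {
            deliverToSubtask.accept(event);
        }
    }

    void sendToCoordinator(String event) {
        if (blocked) {
            blockedToCoordinator.add(event);
        } else {
            deliverToCoordinator.accept(event);
        }
    }

    /** Taken with the subtask's snapshot: subtask-side blocked events become checkpointed state. */
    List<String> snapshotSubtaskSide() {
        return new ArrayList<>(blockedToCoordinator);
    }

    /** Called once this subtask has finished its checkpoint: reopen and flush both directions. */
    void reopenAfterSubtaskCheckpoint() {
        blocked = false;
        blockedToSubtask.forEach(deliverToSubtask);
        blockedToSubtask.clear();
        blockedToCoordinator.forEach(deliverToCoordinator);
        blockedToCoordinator.clear();
    }

    /** On recovery: re-send the subtask-side events restored from the snapshot. */
    void restoreSubtaskSide(List<String> snapshottedEvents) {
        snapshottedEvents.forEach(this::sendToCoordinator);
    }
}
```

Under these assumptions, OC-to-subtask events blocked during the window were produced after the OC snapshot and are delivered only after the subtask snapshot, so after a restore neither side remembers them. Subtask-to-OC events, by contrast, were produced before the subtask snapshot, which is why they have to be replayed from it.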
--
This message was sent by Atlassian Jira
(v8.20.10#820010)