[
https://issues.apache.org/jira/browse/FLINK-26029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17653799#comment-17653799
]
Yunfeng Zhou commented on FLINK-26029:
--------------------------------------
Following offline discussions with Dong Lin, one possible solution is to have
OperatorCoordinators generate checkpoint barriers and send them to their
subtasks. Each subtask would align these barriers with the ones it receives
from upstream operators or sources, and trigger the checkpoint only once
barrier alignment is reached.
This solution requires further discussion and the community's agreement on
the behavior and performance of the Flink runtime during checkpoints. Given
that no existing subclass of OperatorCoordinator would currently be affected
by this change, it is of lower priority for now.
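
To make the alignment idea above concrete, here is a minimal sketch of how a subtask might track barriers from both its upstream channels and its coordinator. It is not Flink code: `BarrierAligner`, `onBarrier`, the `COORDINATOR` constant, and the channel names are hypothetical and exist only for illustration. Resetting on a newer checkpoint id is a simplification of whatever the runtime would actually do.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical helper, not part of Flink: aligns barriers for one subtask. */
final class BarrierAligner {
    /** Hypothetical identifier for the coordinator as a barrier source. */
    static final String COORDINATOR = "coordinator";

    private final Set<String> expected = new HashSet<>();
    private final Set<String> arrived = new HashSet<>();
    private long currentCheckpointId = -1L;

    BarrierAligner(Set<String> upstreamChannels) {
        expected.addAll(upstreamChannels);
        // The coordinator is treated as one more barrier source.
        expected.add(COORDINATOR);
    }

    /**
     * Records a barrier from the given source. Returns true only for the
     * barrier that completes alignment, i.e. when the checkpoint may trigger.
     */
    boolean onBarrier(String source, long checkpointId) {
        if (!expected.contains(source)) {
            throw new IllegalArgumentException("Unknown barrier source: " + source);
        }
        if (checkpointId < currentCheckpointId) {
            return false; // stale barrier from an older checkpoint
        }
        if (checkpointId > currentCheckpointId) {
            // Simplification: a newer checkpoint discards partial alignment.
            currentCheckpointId = checkpointId;
            arrived.clear();
        }
        boolean firstArrival = arrived.add(source);
        return firstArrival && arrived.size() == expected.size();
    }

    public static void main(String[] args) {
        BarrierAligner aligner = new BarrierAligner(Set.of("upstream-0", "upstream-1"));
        System.out.println(aligner.onBarrier("upstream-0", 1L)); // false
        System.out.println(aligner.onBarrier(COORDINATOR, 1L));  // false
        System.out.println(aligner.onBarrier("upstream-1", 1L)); // true
    }
}
```

In this sketch the subtask snapshots its state only when `onBarrier` returns true, so anything the coordinator sent before its barrier is reflected in the subtask's snapshot.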
> Generalize the checkpoint protocol of OperatorCoordinator.
> ----------------------------------------------------------
>
> Key: FLINK-26029
> URL: https://issues.apache.org/jira/browse/FLINK-26029
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Checkpointing
> Affects Versions: 1.14.3
> Reporter: Jiangjie Qin
> Assignee: Yunfeng Zhou
> Priority: Minor
> Labels: extensibility, pull-request-available, stale-assigned
>
> **Problem statement**
> Currently, the JM uses OperatorEventValve to control the communication from
> OperatorCoordinator (OC) to the subtasks in order to achieve state
> consistency across OC and subtasks after recovering from a checkpoint. The
> valve is closed when a checkpoint starts and reopened after the checkpoint
> barriers are sent to the source subtasks.
> While this mechanism works for the source operators, it unnecessarily limits
> the general usage of OC due to the following limitations:
> - It does not handle (e.g. block) the control flow messages from subtasks to
> the OC.
> - It does not handle the control flow messages from OC to subtasks which are
> sent after checkpoint barriers have been sent to sources but before subtasks
> have received those barriers.
> If the limitations mentioned above are not addressed, consistency issues
> might occur. For example, if a subtask sends messages to its coordinator
> after the checkpoint barriers are sent to the sources and before the subtasks
> receive the barriers, these messages would be recorded in the subtask's
> snapshot but not the coordinator's. When the Flink job recovers from this
> snapshot, the subtask would have a record of sending out the message while
> the coordinator has no record of receiving it.
> **Proposed solution**
> We plan to address this problem by extending the blocking period. The
> communication should be blocked before the OC starts the checkpoint and
> reopened for each individual subtask after that subtask finishes the
> checkpoint, to make sure that both OCs and subtasks would see the same
> version of the messages after recovering from a checkpoint. Communication in
> both directions should be blocked during this period, and the messages
> blocked on the subtask side should be stored in the checkpoint snapshot for
> recovery.
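
The extended blocking period described in the proposal could be sketched as a per-subtask gate that buffers events while closed and exposes the buffered events for inclusion in the snapshot. This is only an illustration under assumed names: `EventGate`, `close`, `send`, `snapshotBlocked`, and `reopen` are hypothetical and are not the API of OperatorEventValve or of any Flink class. The sketch is single-threaded, and one gate instance would be needed per direction.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

/** Hypothetical gate guarding one direction of OC <-> subtask communication. */
final class EventGate<E> {
    private final Consumer<E> receiver;
    private final Queue<E> blocked = new ArrayDeque<>();
    private boolean closed;

    EventGate(Consumer<E> receiver) {
        this.receiver = receiver;
    }

    /** Closes the gate; intended to be called before the OC starts its checkpoint. */
    void close() {
        closed = true;
    }

    /** Delivers the event if the gate is open, otherwise buffers it. */
    void send(E event) {
        if (closed) {
            blocked.add(event);
        } else {
            receiver.accept(event);
        }
    }

    /** Returns a copy of the buffered events so they can be written to the snapshot. */
    List<E> snapshotBlocked() {
        return new ArrayList<>(blocked);
    }

    /** Reopens the gate for this subtask and delivers buffered events in order. */
    void reopen() {
        closed = false;
        E event;
        while ((event = blocked.poll()) != null) {
            receiver.accept(event);
        }
    }

    public static void main(String[] args) {
        List<String> delivered = new ArrayList<>();
        EventGate<String> gate = new EventGate<>(delivered::add);
        gate.send("before-checkpoint");
        gate.close();
        gate.send("during-checkpoint");
        System.out.println(delivered);              // [before-checkpoint]
        System.out.println(gate.snapshotBlocked()); // [during-checkpoint]
        gate.reopen();
        System.out.println(delivered);              // [before-checkpoint, during-checkpoint]
    }
}
```

Because the gate is reopened per subtask, a slow subtask would not delay communication with subtasks that have already finished the checkpoint.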