[
https://issues.apache.org/jira/browse/FLINK-26029?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Dong Lin updated FLINK-26029:
-----------------------------
Description:
Currently, the JM uses OperatorEventValve to control the communication from
OperatorCoordinator (OC) to the subtasks in order to achieve state consistency
across OC and subtasks in case of job failures. The valve is closed when a
checkpoint starts and reopened after the checkpoint barriers are sent to the
source subtasks.
While this mechanism works for the source operators, it unnecessarily limits
the general usage of OC due to the following limitations:
- It does not handle (e.g. blocks) the control flow messages from subtasks to
OC.
- It does not handle the control flow messages from OC to subtasks which are
sent after checkpoint barriers have been sent to sources but before subtasks
have received those barriers.
If the limitations mentioned above are not satisfied, consistency issues might
occur. For example, if a subtask sends messages to its coordinator after the
checkpoint barriers are sent to the sources and before the subtasks receive the
barriers, these messages would be recorded in the subtask's snapshot but not
the coordinator's. When the Flink job recovers from this snapshot, the subtask
would have a record of sending out the message while the coordinator has no
record of receiving it.
was:
Currently the JM opens all the event valves from the OperatorCoordinator to the
subtasks after the checkpoint barriers are sent to the Source subtasks. While
this works for the Source Operators, it unnecessarily limits general usage of
the OperatorCoordinator for other operators.
To generalize the protocol, we can change the JM to open the event valve of the
subtasks that have finished the local checkpoint. So the protocol would become
following:
# Let the OC finish processing all the incoming OperatorEvents before the
snapshot.
# Wait until all the outgoing OperatorEvents before the snapshot are sent and
acked.
# Shut the event valve so no outgoing events can be sent to the subtasks.
# Send checkpoint barriers to the Source operators.
# Open the corresponding event valve of a subtask when the
AcknowledgeCheckpoint messages from that subtask is received.
> Generalize the checkpoint protocol of OperatorCoordinator.
> ----------------------------------------------------------
>
> Key: FLINK-26029
> URL: https://issues.apache.org/jira/browse/FLINK-26029
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Checkpointing
> Affects Versions: 1.14.3
> Reporter: Jiangjie Qin
> Assignee: Yunfeng Zhou
> Priority: Major
> Labels: extensibility, pull-request-available, stale-assigned
> Fix For: 1.16.0
>
>
> Currently, the JM uses OperatorEventValve to control the communication from
> OperatorCoordinator (OC) to the subtasks in order to achieve state
> consistency across OC and subtasks in case of job failures. The valve is
> closed when a checkpoint starts and reopened after the checkpoint barriers
> are sent to the source subtasks.
> While this mechanism works for the source operators, it unnecessarily limits
> the general usage of OC due to the following limitations:
> - It does not handle (e.g. blocks) the control flow messages from subtasks to
> OC.
> - It does not handle the control flow messages from OC to subtasks which are
> sent after checkpoint barriers have been sent to sources but before subtasks
> have received those barriers.
> If the limitations mentioned above are not satisfied, consistency issues
> might occur. For example, if a subtask sends messages to its coordinator
> after the checkpoint barriers are sent to the sources and before the subtasks
> receive the barriers, these messages would be recorded in the subtask's
> snapshot but not the coordinator's. When the Flink job recovers from this
> snapshot, the subtask would have a record of sending out the message while
> the coordinator has no record of receiving it.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)