[
https://issues.apache.org/jira/browse/FLINK-28639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17938864#comment-17938864
]
Yunfeng Zhou commented on FLINK-28639:
--------------------------------------
Got it. Despite that I still have some doubts about the drawbacks on my
proposal, I agree with it that operator events are rare so your approach is
better at not introducing a fixed 2 * parallelism events overhead. And I did
not see correctness problems in your proposal either. So +1 for pushing forward
your approach.
{quote}I'm not proposing to touch \{AcknowledgeCheckpointEvent}, so I don't
know where this is coming from. The whole adjustments are in the
CheckpointCoordinator, where we wait for a second future onto stuff that is
already done in the current code base.
{quote}
Just one more minor response here. Your approach also mentioned event
buffering, so I thought the processing of events would also be blocked when the
events are buffered, as in my approach. If so, the duration of this blocking
should be shortened as much as possible to avoid having influences on the
communications between operator and coordinator.
> Preserve distributed consistency of OperatorEvents from subtasks to
> OperatorCoordinator
> ---------------------------------------------------------------------------------------
>
> Key: FLINK-28639
> URL: https://issues.apache.org/jira/browse/FLINK-28639
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Checkpointing
> Affects Versions: 1.14.3
> Reporter: Yunfeng Zhou
> Assignee: Yunfeng Zhou
> Priority: Minor
> Labels: pull-request-available, stale-assigned
>
> This is the second step to solving the consistency issue of OC
> communications. In this step, we would also guarantee the consistency of
> operator events sent from subtasks to OCs. Combined with the other subtask to
> preserve the consistency of communications in the reverse direction, all
> communications between OC and subtasks would be consistent across checkpoints
> and global failovers.
> To achieve the goal of this step, we need to add closing/reopening functions
> to the subtasks' gateways and make the subtasks aware of a checkpoint before
> they receive the checkpoint barriers. The general process would be as follows.
> 1. When the OC starts checkpoint, it notifies all subtasks about this
> information.
> 2. After being notified about the ongoing checkpoint in OC, a subtask sends a
> special operator event to its OC, which is the last operator event the OC
> could receive from the subtask before the subtask completes the checkpoint.
> Then the subtask closes its gateway.
> 3. After receiving this special event from all subtasks, the OC finishes its
> checkpoint and closes its gateway. Then the checkpoint coordinator sends
> checkpoint barriers to the sources.
> 4. If the subtask or the OC generate any event to send to each other, they
> buffer the events locally.
> 5. When a subtask starts checkpointing, it also stores the buffered events in
> the checkpoint.
> 6. After the subtask completes the checkpoint, communications in both
> directions are recovered and the buffered events are sent out.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)