[
https://issues.apache.org/jira/browse/FLINK-28639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17938239#comment-17938239
]
Arvid Heise commented on FLINK-28639:
-------------------------------------
Hi [~yunfengzhou] ,
AWS folks are hitting this current limitation with
https://issues.apache.org/jira/browse/FLINK-37416 . They have lots of
SplitFinished events in their source connector, which may get lost during
failover leading to unread splits.
I would like to take over this ticket, but I propose a different approach that
also piggybacks on your previous work (OC->subtask). I suggest this
because I have 2.5 concerns with the current proposal:
# There is still a chance of losing an event if the CloseGatewayEvent is sent
at roughly the same time as the SplitFinished.
# If we buffer events on the subtask side, there is no guarantee that on
rescale the event is sent from the same subtask that would have sent it
regularly. For example, consider a keyed operator with coordinator where an
event for key k is stored in subtask 2 while k is now assigned to subtask 1.
There may be a race condition between subtask 2 sending the recovered event and
subtask 1 sending a new event.
# We add 2 new RPC messages per checkpoint and subtask with coordinator. This
may introduce bottlenecks in the future and limit our ability to have very low
checkpointing intervals.
I'd like to propose a different approach:
# We buffer the events on the coordinator instead of the reader.
# Then, we don't need new RPC messages - the coordinator already knows if the
event should be buffered or not. In particular, it needs to buffer iff a
subtask gateway is closed.
# We eliminate the first race condition because we avoid split brain scenarios.
# On recovery, the coordinator processes the buffered events before any
subtask can send new events, eliminating the second race condition.
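A minimal sketch of the coordinator-side buffering, under stated assumptions: the class and method names below are hypothetical and not part of Flink's API, events are modelled as plain strings, and the sketch assumes that buffered events are processed only once the gateway reopens (the proposal above does not spell this out).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch, not Flink API: buffers subtask events on the coordinator. */
final class BufferingCoordinatorSketch {
    private final List<String> processed = new ArrayList<>();
    private final List<String> buffered = new ArrayList<>();
    private boolean gatewayClosed;

    /** Called when the subtask gateway is closed for a checkpoint. */
    void closeGateway() {
        gatewayClosed = true;
    }

    /** Reopens the gateway and processes buffered events in arrival order (assumption). */
    void openGateway() {
        gatewayClosed = false;
        drainBuffer();
    }

    /** Buffers the event iff the gateway is closed; otherwise processes it directly. */
    void handleEventFromSubtask(String event) {
        if (gatewayClosed) {
            buffered.add(event);
        } else {
            processed.add(event);
        }
    }

    /** Copy of the buffered events, to be stored with the checkpoint. */
    List<String> snapshotBufferedEvents() {
        return new ArrayList<>(buffered);
    }

    /** On recovery, replays restored events before any new subtask event is handled. */
    void restore(List<String> restoredEvents) {
        buffered.addAll(restoredEvents);
        drainBuffer();
    }

    List<String> processedEvents() {
        return new ArrayList<>(processed);
    }

    private void drainBuffer() {
        processed.addAll(buffered);
        buffered.clear();
    }
}
```

In this sketch, a SplitFinished that arrives while the gateway is closed ends up in the snapshot, and a restored coordinator replays it before handling anything a subtask sends after recovery.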
However, we need to add a bit of complexity to the coordinator state. The state
will now be retrieved in two phases:
# As before, checkpoint coordinator state and use futures to send a barrier
only once that state has been taken.
# Additionally, use a second future to wait for AcknowledgeCheckpointEvent and
return the serialized content of all newly buffered events
(reader->coordinator).
# Add this new state to the OperatorState as the last step of building the
OperatorState.
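The two-phase retrieval could be sketched with plain CompletableFutures as below. This is only an illustration under assumptions: the class, the `CoordinatorSnapshot` container, and the `sendBarrier` callback are hypothetical names, and both pieces of state are treated as opaque byte arrays.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch, not Flink API: combines the two phases of coordinator state. */
final class TwoPhaseCoordinatorSnapshotSketch {

    /** Hypothetical container for both parts of the coordinator's checkpointed state. */
    static final class CoordinatorSnapshot {
        final byte[] coordinatorState;
        final byte[] bufferedEvents;

        CoordinatorSnapshot(byte[] coordinatorState, byte[] bufferedEvents) {
            this.coordinatorState = coordinatorState;
            this.bufferedEvents = bufferedEvents;
        }
    }

    static CompletableFuture<CoordinatorSnapshot> snapshot(
            CompletableFuture<byte[]> coordinatorState,
            CompletableFuture<byte[]> bufferedEventsAfterAck,
            Runnable sendBarrier) {
        // Phase 1: send the barrier only once the coordinator state has been taken.
        CompletableFuture<byte[]> phaseOne =
                coordinatorState.thenApply(
                        state -> {
                            sendBarrier.run();
                            return state;
                        });
        // Phase 2: wait for the serialized buffered events (available after the
        // acknowledgement) and attach them as the last step of building the state.
        return phaseOne.thenCombine(bufferedEventsAfterAck, CoordinatorSnapshot::new);
    }
}
```

The combined future completes only after both phases are done, so the buffered events are always added after the regular coordinator state has been captured.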
WDYT? I'd like to fix this soonish.
CC [~lindong]
> Preserve distributed consistency of OperatorEvents from subtasks to
> OperatorCoordinator
> ---------------------------------------------------------------------------------------
>
> Key: FLINK-28639
> URL: https://issues.apache.org/jira/browse/FLINK-28639
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Checkpointing
> Affects Versions: 1.14.3
> Reporter: Yunfeng Zhou
> Assignee: Yunfeng Zhou
> Priority: Minor
> Labels: pull-request-available, stale-assigned
>
> This is the second step to solving the consistency issue of OC
> communications. In this step, we would also guarantee the consistency of
> operator events sent from subtasks to OCs. Combined with the other sub-task,
> which preserves the consistency of communications in the reverse direction, all
> communications between OC and subtasks would be consistent across checkpoints
> and global failovers.
> To achieve the goal of this step, we need to add closing/reopening functions
> to the subtasks' gateways and make the subtasks aware of a checkpoint before
> they receive the checkpoint barriers. The general process would be as follows.
> 1. When the OC starts a checkpoint, it notifies all subtasks about it.
> 2. After being notified about the ongoing checkpoint in OC, a subtask sends a
> special operator event to its OC, which is the last operator event the OC
> could receive from the subtask before the subtask completes the checkpoint.
> Then the subtask closes its gateway.
> 3. After receiving this special event from all subtasks, the OC finishes its
> checkpoint and closes its gateway. Then the checkpoint coordinator sends
> checkpoint barriers to the sources.
> 4. If the subtask or the OC generates any event to send to the other, it
> buffers the event locally.
> 5. When a subtask starts checkpointing, it also stores the buffered events in
> the checkpoint.
> 6. After the subtask completes the checkpoint, communications in both
> directions are recovered and the buffered events are sent out.