[ 
https://issues.apache.org/jira/browse/FLINK-28639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17938239#comment-17938239
 ] 

Arvid Heise commented on FLINK-28639:
-------------------------------------

Hi [~yunfengzhou] ,

AWS folks are hitting this limitation in 
https://issues.apache.org/jira/browse/FLINK-37416 . Their source connector 
emits lots of SplitFinished events, which may get lost during failover, 
leading to unread splits.

I would like to take over this ticket, but with a different approach that 
still piggy-backs on your previous work (OC->subtask). I suggest the change 
because I have 2.5 concerns with the current proposal:
 # There is still a chance to lose an event if the CloseGatewayEvent is sent 
roughly at the same time as the SplitFinished.
 # If we buffer events on the subtask side, there is no guarantee that on 
rescale the event is sent from the same subtask that would have sent it 
regularly. For example, consider a keyed operator with coordinator where an 
event for key k is stored in subtask 2 while k is now assigned to subtask 1. 
There may be a race condition between subtask 2 sending the recovered event and 
subtask 1 sending a new event.
 # We add 2 new RPC messages per checkpoint and subtask with coordinator. This 
may introduce bottlenecks in the future and limit our ability to have very low 
checkpointing intervals.

I'd like to propose a different approach:
 # We buffer the events on the coordinator instead of the reader.
 # Then, we don't need new RPC messages - the coordinator already knows if the 
event should be buffered or not. In particular, it needs to buffer iff a 
subtask gateway is closed.
 # We eliminate the first race condition because we avoid split brain scenarios.
 # On recovery, the coordinator processes the buffered events before any 
subtask sends new ones, eliminating the second race condition.
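
To make the idea concrete, here is a minimal Java sketch of the 
coordinator-side buffering. All class and method names are hypothetical and 
are not existing Flink APIs, and events are plain strings for brevity. What 
happens to the live buffer once the checkpoint completes is not spelled out 
above, so the sketch leaves it out.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: class and method names are illustrative, not Flink APIs.
public class BufferingCoordinatorSketch {
    // Subtasks whose gateway is currently closed.
    private final Set<Integer> closedGateways = new HashSet<>();
    // Events received while the sender's gateway was closed.
    private final List<String> buffered = new ArrayList<>();
    // Events the coordinator has applied, in order.
    private final List<String> processed = new ArrayList<>();

    public void closeGateway(int subtask) {
        closedGateways.add(subtask);
    }

    public void reopenGateway(int subtask) {
        closedGateways.remove(subtask);
    }

    // Buffer iff the sending subtask's gateway is closed. No extra RPC is
    // needed because the coordinator already tracks the gateway state.
    public void handleEventFromSubtask(int subtask, String event) {
        if (closedGateways.contains(subtask)) {
            buffered.add(event);
        } else {
            processed.add(event);
        }
    }

    // The buffered events become part of the checkpointed coordinator state.
    public List<String> snapshotBufferedEvents() {
        return new ArrayList<>(buffered);
    }

    // On recovery, apply the buffered events before any subtask event.
    public static BufferingCoordinatorSketch restore(List<String> bufferedEvents) {
        BufferingCoordinatorSketch restored = new BufferingCoordinatorSketch();
        restored.processed.addAll(bufferedEvents);
        return restored;
    }

    public List<String> processedEvents() {
        return new ArrayList<>(processed);
    }

    public static void main(String[] args) {
        BufferingCoordinatorSketch coordinator = new BufferingCoordinatorSketch();
        coordinator.closeGateway(2);
        coordinator.handleEventFromSubtask(2, "SplitFinished(a)");
        BufferingCoordinatorSketch restored = restore(coordinator.snapshotBufferedEvents());
        restored.handleEventFromSubtask(1, "SplitFinished(b)");
        System.out.println(restored.processedEvents());
    }
}
```

Because the buffer lives in one place, the restored coordinator sees the 
recovered event ahead of any new subtask event, regardless of how keys or 
splits are redistributed on rescale.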

However, we need to add a bit of complexity to the coordinator state. The 
state will now be retrieved in two phases:
 # As before, checkpoint the coordinator state and use futures to send a 
barrier only once that state has been taken.
 # Additionally, use a second future that waits for AcknowledgeCheckpointEvent 
and returns the serialized content of all newly buffered events 
(reader->coordinator).
 # Add this new state to the OperatorState as the last step of building the 
OperatorState.
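
A rough Java sketch of the two phases, using plain CompletableFutures. The 
class, fields, and method are hypothetical stand-ins rather than Flink APIs, 
a byte[][] stands in for the OperatorState, and a single subtask is assumed 
so that one acknowledgement completes the second phase.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: all names are illustrative stand-ins, not Flink APIs.
public class TwoPhaseStateSketch {
    // Phase 1: the coordinator's own snapshot.
    public final CompletableFuture<byte[]> coordinatorState = new CompletableFuture<>();
    // Phase 2: serialized events buffered since phase 1. The caller completes
    // it when the acknowledgement (AcknowledgeCheckpointEvent above) arrives.
    public final CompletableFuture<byte[]> bufferedEvents = new CompletableFuture<>();
    // Stand-in for "the barrier has been sent to the sources".
    public volatile boolean barrierSent = false;

    public CompletableFuture<byte[][]> startCheckpoint() {
        // As before: send the barrier only once the coordinator state is taken.
        coordinatorState.thenRun(() -> barrierSent = true);
        // Last step: the combined state is available only after both phases.
        return coordinatorState.thenCombine(
                bufferedEvents, (state, events) -> new byte[][] {state, events});
    }

    public static void main(String[] args) {
        TwoPhaseStateSketch checkpoint = new TwoPhaseStateSketch();
        CompletableFuture<byte[][]> operatorState = checkpoint.startCheckpoint();

        checkpoint.coordinatorState.complete("coordinator".getBytes(StandardCharsets.UTF_8));
        System.out.println("barrier sent: " + checkpoint.barrierSent
                + ", state complete: " + operatorState.isDone());

        checkpoint.bufferedEvents.complete("SplitFinished".getBytes(StandardCharsets.UTF_8));
        System.out.println("state complete: " + operatorState.isDone());
    }
}
```

The barrier depends only on the first future, so the second phase should not 
delay barrier injection; it only delays completion of the combined state.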

WDYT? I'd like to fix this soonish. 

CC [~lindong]

 

 

> Preserve distributed consistency of OperatorEvents from subtasks to 
> OperatorCoordinator
> ---------------------------------------------------------------------------------------
>
>                 Key: FLINK-28639
>                 URL: https://issues.apache.org/jira/browse/FLINK-28639
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.14.3
>            Reporter: Yunfeng Zhou
>            Assignee: Yunfeng Zhou
>            Priority: Minor
>              Labels: pull-request-available, stale-assigned
>
> This is the second step to solving the consistency issue of OC 
> communications. In this step, we would also guarantee the consistency of 
> operator events sent from subtasks to OCs. Combined with the other subtask to 
> preserve the consistency of communications in the reverse direction, all 
> communications between OC and subtasks would be consistent across checkpoints 
> and global failovers.
> To achieve the goal of this step, we need to add closing/reopening functions 
> to the subtasks' gateways and make the subtasks aware of a checkpoint before 
> they receive the checkpoint barriers. The general process would be as follows.
> 1. When the OC starts a checkpoint, it notifies all subtasks about this 
> information.
> 2. After being notified about the ongoing checkpoint in OC, a subtask sends a 
> special operator event to its OC, which is the last operator event the OC 
> could receive from the subtask before the subtask completes the checkpoint. 
> Then the subtask closes its gateway.
> 3. After receiving this special event from all subtasks, the OC finishes its 
> checkpoint and closes its gateway. Then the checkpoint coordinator sends 
> checkpoint barriers to the sources.
> 4. If the subtask or the OC generates any event to send to the other, it 
> buffers the event locally.
> 5. When a subtask starts checkpointing, it also stores the buffered events in 
> the checkpoint.
> 6. After the subtask completes the checkpoint, communications in both 
> directions are recovered and the buffered events are sent out.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
