[ 
https://issues.apache.org/jira/browse/FLINK-28639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17938605#comment-17938605
 ] 

Yunfeng Zhou commented on FLINK-28639:
--------------------------------------

Hi [~arvid],

Thanks for proposing a better approach to fixing this problem. I have some
questions about the proposal's description.


{quote}There is still a chance to lose an event if the CloseGatewayEvent is 
sent roughly at the same time as the SplitFinished.
{quote}
This message loss sounds strange to me. Could you explain in more detail how
the message gets lost? Besides, if such a loss does exist, it sounds more like a
bug in Flink's RPC system, which would need a separate fix apart from the
OperatorEvent consistency we are discussing here.
{quote}consider a keyed operator with coordinator where an event for key k is 
stored in subtask 2 while k is now assigned to subtask 1
{quote}
If subtask 1 has restored the state for key k that was snapshotted by subtask 2,
and has learned from that state that an operator event for key k was already
sent, why would it send the event again? Only one operator event for key k
should be sent to the coordinator.
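To make this reasoning concrete, here is a minimal sketch, assuming the "event already sent" flag for key k lives in keyed state, so that it moves from subtask 2 to subtask 1 together with k on restore. `KeyedEventDedup`, `maybeSendEvent` and `snapshot` are hypothetical names, not Flink APIs; a `HashMap` stands in for keyed state and a `List` for the coordinator gateway.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: a per-key "event already sent" flag kept in keyed state.
// Because the flag is keyed, it moves with key k when k is reassigned to another
// subtask on restore. A HashMap stands in for keyed state and a List for the
// coordinator gateway; neither is a Flink API.
final class KeyedEventDedup {
    private final Map<String, Boolean> sentFlags;
    private final List<String> coordinatorInbox;

    KeyedEventDedup(Map<String, Boolean> restoredState, List<String> coordinatorInbox) {
        this.sentFlags = new HashMap<>(restoredState);
        this.coordinatorInbox = coordinatorInbox;
    }

    /** Sends the event for {@code key} unless the (restored) state says it was already sent. */
    boolean maybeSendEvent(String key) {
        if (sentFlags.getOrDefault(key, false)) {
            return false; // already sent before the restore: do not send a duplicate
        }
        coordinatorInbox.add("event-for-" + key);
        sentFlags.put(key, true);
        return true;
    }

    /** Snapshot of the keyed flags; on restore they are redistributed together with their keys. */
    Map<String, Boolean> snapshot() {
        return new HashMap<>(sentFlags);
    }
}
```

In this model, subtask 2 sends the event for k and snapshots `{k=true}`; subtask 1, restored from that snapshot, gets `false` from `maybeSendEvent("k")`, so the coordinator sees exactly one event.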
{quote}We add 2 new RPC messages per checkpoint and subtask with coordinator. 
This may introduce bottlenecks in the future
{quote}
The approach proposed below this description does not seem to resolve the
bottleneck problem. Instead of the subtasks checkpointing the events, the
coordinator or JobManager would need to checkpoint them itself, which increases
the chance that the JM becomes the bottleneck.

Besides, a second state for AcknowledgeCheckpointEvent would clearly increase
how long events are blocked during a checkpoint: the events would need to wait
not only for the upstream operators, but also for the downstream operators.
This could cause a performance regression, especially for source operators.
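A toy timing model of this concern, assuming made-up millisecond timestamps measured from the moment a subtask closes its gateway. `EventBlockingModel` and its methods are hypothetical; the only point is that releasing events on downstream acknowledgements turns the blocking time into a maximum over all downstream operators, which hurts sources most because their own checkpoint involves no upstream barrier alignment.

```java
// Hypothetical timing model; all values are made-up milliseconds measured from the
// moment a subtask closes its gateway.
final class EventBlockingModel {
    /**
     * Single-state variant: buffered events are released as soon as the subtask finishes
     * its own checkpoint (which, for non-source operators, first waits for upstream barriers).
     */
    static long blockedUntilOwnCheckpoint(long ownCheckpointDoneMs) {
        return ownCheckpointDoneMs;
    }

    /**
     * Variant with a second acknowledgement state: events are released only after the
     * subtask's own checkpoint AND every downstream operator's acknowledgement.
     */
    static long blockedUntilDownstreamAck(long ownCheckpointDoneMs, long[] downstreamAckMs) {
        long releaseAt = ownCheckpointDoneMs;
        for (long ack : downstreamAckMs) {
            releaseAt = Math.max(releaseAt, ack); // wait for the slowest downstream operator
        }
        return releaseAt;
    }
}
```

For example, a source that finishes its own checkpoint after 5 ms, with downstream acknowledgements at 40 ms and 120 ms, would keep its events blocked for 120 ms instead of 5 ms under the second variant.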

> Preserve distributed consistency of OperatorEvents from subtasks to 
> OperatorCoordinator
> ---------------------------------------------------------------------------------------
>
>                 Key: FLINK-28639
>                 URL: https://issues.apache.org/jira/browse/FLINK-28639
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.14.3
>            Reporter: Yunfeng Zhou
>            Assignee: Yunfeng Zhou
>            Priority: Minor
>              Labels: pull-request-available, stale-assigned
>
> This is the second step toward solving the consistency issue of OC
> communications. In this step, we would also guarantee the consistency of
> operator events sent from subtasks to OCs. Combined with the other Jira
> sub-task, which preserves the consistency of communications in the reverse
> direction, all communications between OCs and subtasks would be consistent
> across checkpoints and global failovers.
> To achieve this goal, we need to add close/reopen functions to the subtasks'
> gateways and make the subtasks aware of a checkpoint before they receive the
> checkpoint barriers. The general process would be as follows:
> 1. When the OC starts a checkpoint, it notifies all subtasks.
> 2. After being notified of the ongoing checkpoint in the OC, a subtask sends a
> special operator event to its OC, which is the last operator event the OC
> can receive from that subtask before the subtask completes the checkpoint.
> Then the subtask closes its gateway.
> 3. After receiving this special event from all subtasks, the OC finishes its
> checkpoint and closes its gateway. Then the checkpoint coordinator sends
> checkpoint barriers to the sources.
> 4. If a subtask or the OC generates any event to send to the other side, it
> buffers the event locally.
> 5. When a subtask starts checkpointing, it also stores its buffered events in
> the checkpoint.
> 6. After the subtask completes the checkpoint, communication in both
> directions resumes and the buffered events are sent out.
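
The six steps in the issue description can be sketched as a single-threaded simulation of the subtask-to-OC direction only. `Coordinator`, `Subtask`, the marker string and all method names are hypothetical stand-ins, not Flink's `OperatorCoordinator` API; RPCs are modeled as direct, synchronous method calls, so the sketch does not capture any concurrency or message-loss behavior.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;

// Hypothetical simulation of steps 1-6 (subtask -> OC direction only).
// RPCs are modeled as direct, synchronous method calls.
final class Coordinator {
    static final String MARKER = "LAST_EVENT_BEFORE_CHECKPOINT";

    final List<String> received = new ArrayList<>();
    private final Set<Integer> pendingMarkers = new HashSet<>();
    boolean checkpointFinished = false;

    // Step 1: the OC starts a checkpoint and notifies every subtask.
    void startCheckpoint(List<Subtask> subtasks) {
        checkpointFinished = false;
        for (Subtask s : subtasks) {
            pendingMarkers.add(s.index); // register all before any marker can arrive
        }
        for (Subtask s : subtasks) {
            s.onCoordinatorCheckpointStarted();
        }
    }

    void handleEvent(int subtaskIndex, String event) {
        if (MARKER.equals(event)) {
            pendingMarkers.remove(subtaskIndex);
            // Step 3: once every marker has arrived, the OC can finish its checkpoint;
            // barriers would then be injected at the sources (not modeled here).
            if (pendingMarkers.isEmpty()) {
                checkpointFinished = true;
            }
        } else {
            received.add(event);
        }
    }
}

final class Subtask {
    final int index;
    private final Coordinator coordinator;
    private final Queue<String> buffered = new ArrayDeque<>();
    private boolean gatewayClosed = false;

    Subtask(int index, Coordinator coordinator) {
        this.index = index;
        this.coordinator = coordinator;
    }

    // Step 2: send the marker as the last pre-checkpoint event, then close the gateway.
    void onCoordinatorCheckpointStarted() {
        coordinator.handleEvent(index, Coordinator.MARKER);
        gatewayClosed = true;
    }

    // Step 4: while the gateway is closed, events are buffered locally.
    void sendEvent(String event) {
        if (gatewayClosed) {
            buffered.add(event);
        } else {
            coordinator.handleEvent(index, event);
        }
    }

    // Step 5: when the barrier arrives, the buffered events become part of the snapshot.
    List<String> snapshotState() {
        return new ArrayList<>(buffered);
    }

    // Step 6: after the checkpoint completes, reopen the gateway and flush the buffer.
    void notifyCheckpointComplete() {
        gatewayClosed = false;
        while (!buffered.isEmpty()) {
            coordinator.handleEvent(index, buffered.poll());
        }
    }
}
```

In this model, an event a subtask emits after sending its marker stays in its buffer, appears in `snapshotState()`, and reaches the coordinator only after `notifyCheckpointComplete()`. The OC-to-subtask direction would presumably mirror this, with the OC doing the buffering.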

--
This message was sent by Atlassian Jira
(v8.20.10#820010)