[ https://issues.apache.org/jira/browse/FLINK-28639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17938642#comment-17938642 ]

Arvid Heise commented on FLINK-28639:
-------------------------------------

{quote}This message loss sounds weird. Could you please explain more about the 
process that this message gets lost? Besides, if this loss exists, it sounds 
more like a bug related to Flink's rpc system, which requires another bugfix 
apart from the consistency of OperatorEvents we are talking about now.
{quote}
SplitFinished is an event from operator to coordinator. CloseGatewayEvent is an 
event from coordinator to operator. They may be sent at the same time. I don't 
see how the Flink RPC system can prevent that.

This results in a race condition: the SplitFinished event needs to be processed 
before the CloseGatewayEvent, or else it won't be part of the checkpoint. 
Specifically, the operator has handed the split off to the coordinator, but the 
coordinator only sees that split after it has snapshotted its state. Thus, after 
recovery neither the coordinator nor the operator owns the split anymore.
{quote}If subtask 1 has recovered from the state for key k which was 
snapshotted by subtask 2, and learned from the state that an operator event for 
key k has been sent before, why does it send the event again? There should be 
only one operator event being sent to the coordinator.
{quote}
How would it learn from the state that an operator event has been sent before? 
The operator would need to explicitly book-keep all sent events, which is what the 
DynamoDB connector does as a workaround, but it's clumsy. Consider, for example, a 
coordinator that tracks join skew. Let's assume an event is sent whenever 10k 
records with a given key appear. First, subtask 2 sends an event for the first 10k 
records of key k. That event gets blocked and checkpointed. Second, we have a 
rescale with a global restart. Now subtask 1 is responsible for k, but the event 
remains in the state of subtask 2. Third, subtask 1 recovers much faster than 
subtask 2 and sends an event for 20k records of k. Fourth, subtask 2 recovers and 
resends the first event, which may overwrite the coordinator's state and reset the 
20k back to 10k.

The example is a bit artificial, but operators and coordinators rely on the RPC 
messages being in order, which is not guaranteed with your proposal.
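For illustration, here is a minimal sketch of that hypothetical skew-tracking 
coordinator (all names are made up): with last-writer-wins handling, the replayed 
pre-rescale event makes the coordinator's count go backwards.
{code:java}
import java.util.HashMap;
import java.util.Map;

public class SkewTrackerSketch {

    static class SkewCoordinator {
        final Map<String, Long> recordsPerKey = new HashMap<>();

        void handleCountEvent(String key, long observedRecords) {
            // "last writer wins": relies on events arriving in order
            recordsPerKey.put(key, observedRecords);
        }
    }

    public static void main(String[] args) {
        SkewCoordinator coordinator = new SkewCoordinator();

        // step 3 of the example: subtask 1 (now owning key k) recovers quickly
        // and reports 20k records
        coordinator.handleCountEvent("k", 20_000);

        // step 4: subtask 2 recovers later and replays the blocked, checkpointed
        // event for the first 10k records; the coordinator's state goes backwards
        coordinator.handleCountEvent("k", 10_000);

        System.out.println(coordinator.recordsPerKey.get("k")); // 10000, not 20000
    }
}
{code}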
{quote}In the proposed approach below this description, it seems the bottleneck 
problem is not resolved. Instead of asking subtasks to checkpoint events, the 
coordinator or job manager would need to checkpoint the events on its own. It 
increases the chance that the JM becomes the bottleneck.
{quote}
My assumption is that checkpointed events are rare. In fact, we only saw 
inconsistency in the sources 4 years after the release of the source coordinator. 
I have no insight into how it is for CEP operators.

In your proposal, the JM additionally needs to serialize and deserialize 
2*parallelism events. Unless the events are bigger or there are a ton of events 
from operator -> coordinator, my proposal may actually reduce the load on the JM. 
It would be nice if you could share insights into the CEP coordination.
{quote}Besides, a second state for AcknowledgeCheckpointEvent could obviously 
increase the duration of event blocking in checkpoints. The events would need 
to wait not only for the upstream operators, but also for the downstream 
operators. This might be a possible performance regression, especially for 
source operators.
{quote}
I'm not proposing to touch {{AcknowledgeCheckpointEvent}}, so I don't know where 
this is coming from. The whole adjustment is in the CheckpointCoordinator, where we 
wait on a second future for something that is already done in the current code base.
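To sketch what I mean (this is only the rough shape with made-up names, not the 
actual CheckpointCoordinator code): the barrier injection would wait on the 
conjunction of the existing future and an additional one, and in today's code that 
additional condition is effectively already fulfilled at that point.
{code:java}
import java.util.concurrent.CompletableFuture;

public class SecondFutureSketch {

    public static void main(String[] args) {
        // future that conceptually exists today: coordinator state snapshot completed
        CompletableFuture<Void> coordinatorStateWritten = new CompletableFuture<>();

        // hypothetical additional future: all subtasks confirmed their gateways are drained
        CompletableFuture<Void> subtaskEventsDrained = new CompletableFuture<>();

        CompletableFuture<Void> readyToEmitBarriers =
                CompletableFuture.allOf(coordinatorStateWritten, subtaskEventsDrained);

        readyToEmitBarriers.thenRun(() ->
                System.out.println("both futures done -> send checkpoint barriers"));

        // in the current code base the second condition is already met at this point,
        // so waiting on both does not change the observable ordering
        coordinatorStateWritten.complete(null);
        subtaskEventsDrained.complete(null);
    }
}
{code}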
{quote}In fact, this issue was introduced to better support non-source 
operators. As for source operators, given that FLIP-27 was proposed far before 
this issue and these sources had been working well before that, they should not 
have such event consistency problems.
{quote}
But they do, as is apparent in the linked ticket. We just have almost no sources 
where the reader sends consistency-relevant events to the enumerator. The standard 
Kafka source has no dynamic events at all. In most other enumerators, splits are 
only assigned based on knowledge that is generated in the enumerator itself 
(e.g. S3 file discovery).

The DynamoDB source is unique in that regard, as the parent split needs to wait for 
its child splits to finish. Within a couple of weeks of its release, the AWS folks 
observed a couple of cases that stem from this inconsistency. It's always a matter 
of probability.
{quote}We can continue discuss this proposal but I'm not sure if it would be 
finished soon. If it is confirmed that this issue is a blocker for aws 
connector and needs to be fixed ASAP, my suggestion is to check if the fix 
could be added to this connector first, as this approach has been proved in 
other existing Flink connectors.
{quote}
Yes, they have already implemented a workaround for that issue: 
[https://github.com/apache/flink-connector-aws/pull/193]. It's pretty much based 
around making events idempotent and replaying them on recovery.
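For readers who don't want to dig through the PR, the general shape of such a 
workaround looks roughly like this (a sketch with made-up names, not the code from 
that PR): the operator keeps every consistency-relevant event in its own 
checkpointed state and re-sends the events on restore, while the coordinator treats 
them as idempotent so duplicates after a failover are harmless.
{code:java}
import java.util.ArrayList;
import java.util.List;

public class IdempotentEventSketch {

    interface EventSender {                       // stands in for the operator event gateway
        void send(String event);
    }

    static class BookkeepingOperator {
        final List<String> unacknowledgedEvents = new ArrayList<>();
        final EventSender sender;

        BookkeepingOperator(EventSender sender) {
            this.sender = sender;
        }

        void sendEvent(String event) {
            unacknowledgedEvents.add(event);      // goes into operator state at checkpoint
            sender.send(event);
        }

        void restoreFrom(List<String> checkpointedEvents) {
            unacknowledgedEvents.addAll(checkpointedEvents);
            // replay everything that may have been lost in the failover
            unacknowledgedEvents.forEach(sender::send);
        }
    }

    public static void main(String[] args) {
        EventSender gateway = e -> System.out.println("sent: " + e);

        BookkeepingOperator operator = new BookkeepingOperator(gateway);
        operator.sendEvent("SplitFinished(parent-0)");   // also captured in the checkpoint

        // after a failover, a fresh operator instance restores from the checkpointed list
        BookkeepingOperator restored = new BookkeepingOperator(gateway);
        restored.restoreFrom(List.of("SplitFinished(parent-0)"));  // event is sent again
    }
}
{code}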

But it's unfortunate that this is necessary, and we should solve it at the framework 
level, similar to how you fixed it at the coordinator->operator level.

> Preserve distributed consistency of OperatorEvents from subtasks to 
> OperatorCoordinator
> ---------------------------------------------------------------------------------------
>
>                 Key: FLINK-28639
>                 URL: https://issues.apache.org/jira/browse/FLINK-28639
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.14.3
>            Reporter: Yunfeng Zhou
>            Assignee: Yunfeng Zhou
>            Priority: Minor
>              Labels: pull-request-available, stale-assigned
>
> This is the second step to solving the consistency issue of OC 
> communications. In this step, we would also guarantee the consistency of 
> operator events sent from subtasks to OCs. Combined with the other sub-task, which 
> preserves the consistency of communications in the reverse direction, all 
> communications between OC and subtasks would be consistent across checkpoints 
> and global failovers.
> To achieve the goal of this step, we need to add closing/reopening functions 
> to the subtasks' gateways and make the subtasks aware of a checkpoint before 
> they receive the checkpoint barriers. The general process would be as follows.
> 1. When the OC starts checkpoint, it notifies all subtasks about this 
> information.
> 2. After being notified about the ongoing checkpoint in OC, a subtask sends a 
> special operator event to its OC, which is the last operator event the OC 
> could receive from the subtask before the subtask completes the checkpoint. 
> Then the subtask closes its gateway.
> 3. After receiving this special event from all subtasks, the OC finishes its 
> checkpoint and closes its gateway. Then the checkpoint coordinator sends 
> checkpoint barriers to the sources.
> 4. If the subtask or the OC generates any events to send to each other, they 
> buffer the events locally.
> 5. When a subtask starts checkpointing, it also stores the buffered events in 
> the checkpoint.
> 6. After the subtask completes the checkpoint, communications in both 
> directions are recovered and the buffered events are sent out.
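For reference, an illustrative sketch of steps 2 through 6 of the process described 
above (hypothetical types and method names, not the actual Flink runtime API): a 
subtask-side gateway that is closed once the OC announces a checkpoint, buffers 
outgoing events, snapshots the buffer with the subtask checkpoint, and flushes it 
once the checkpoint completes.
{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class BufferingGatewaySketch {

    static class SubtaskGateway {
        private final Consumer<String> transport;   // stands in for the RPC to the OC
        private final List<String> buffer = new ArrayList<>();
        private boolean closed;

        SubtaskGateway(Consumer<String> transport) {
            this.transport = transport;
        }

        void onCheckpointAnnouncedByCoordinator() {
            transport.accept("SpecialLastEventBeforeCheckpoint");  // step 2
            closed = true;                                         // close the gateway
        }

        void sendEvent(String event) {
            if (closed) {
                buffer.add(event);                                 // step 4: buffer locally
            } else {
                transport.accept(event);
            }
        }

        List<String> snapshotBufferedEvents() {
            return new ArrayList<>(buffer);                        // step 5: into checkpoint
        }

        void onCheckpointComplete() {
            closed = false;                                        // step 6: reopen and flush
            buffer.forEach(transport);
            buffer.clear();
        }
    }

    public static void main(String[] args) {
        SubtaskGateway gateway = new SubtaskGateway(e -> System.out.println("to OC: " + e));
        gateway.onCheckpointAnnouncedByCoordinator();
        gateway.sendEvent("SplitFinished(parent-0)");      // buffered, not sent yet
        System.out.println("checkpointed: " + gateway.snapshotBufferedEvents());
        gateway.onCheckpointComplete();                    // now the event goes out
    }
}
{code}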
