[
https://issues.apache.org/jira/browse/FLINK-28639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17938642#comment-17938642
]
Arvid Heise commented on FLINK-28639:
-------------------------------------
{quote}This message loss sounds weird. Could you please explain more about the
process that this message gets lost? Besides, if this loss exists, it sounds
more like a bug related to Flink's rpc system, which requires another bugfix
apart from the consistency of OperatorEvents we are talking about now.
{quote}
SplitFinished is an event from operator to coordinator. CloseGatewayEvent is an
event from coordinator to operator. They may be sent at the same time. I don't
see how the Flink RPC system can prevent that.
This results in a race condition: the SplitFinished event needs to be processed
before the CloseGatewayEvent, or else it won't be part of the checkpoint.
Specifically, the operator handed off the split to the coordinator, but the
coordinator only sees that split after it has snapshotted its state. Thus, after
recovery neither the coordinator nor the operator owns the split anymore.
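To make the race concrete, here is a minimal, self-contained sketch of the interleaving (all classes are hypothetical stand-ins, not Flink or connector APIs): the reader hands the split back via SplitFinished, but the coordinator snapshots before processing that event, so after recovery the split is in neither checkpointed state.
{code:java}
import java.util.HashSet;
import java.util.Set;

// Minimal sketch of the hand-off race. All classes are hypothetical stand-ins,
// not Flink APIs; the point is the ordering of the four steps in main().
public class SplitHandOffRace {

    static class Reader {
        final Set<String> ownedSplits = new HashSet<>(Set.of("parent-split"));

        // The split leaves the reader's state and travels as a SplitFinished event.
        String finishSplit() {
            ownedSplits.remove("parent-split");
            return "parent-split";
        }

        Set<String> snapshotState() {
            return new HashSet<>(ownedSplits);
        }
    }

    static class Enumerator {
        final Set<String> returnedSplits = new HashSet<>();

        void handleSplitFinished(String split) {
            returnedSplits.add(split);
        }

        Set<String> snapshotState() {
            return new HashSet<>(returnedSplits);
        }
    }

    public static void main(String[] args) {
        Reader reader = new Reader();
        Enumerator enumerator = new Enumerator();

        // 1. The reader finishes the split and emits SplitFinished.
        String inFlightEvent = reader.finishSplit();
        // 2. CloseGatewayEvent raced past it: the enumerator snapshots first.
        Set<String> enumeratorState = enumerator.snapshotState();
        // 3. The reader snapshots its own (now empty) state.
        Set<String> readerState = reader.snapshotState();
        // 4. SplitFinished is processed only after the snapshot was taken.
        enumerator.handleSplitFinished(inFlightEvent);

        // After restoring from (enumeratorState, readerState), nobody owns the split.
        System.out.println("enumerator checkpoint: " + enumeratorState); // []
        System.out.println("reader checkpoint:     " + readerState);     // []
    }
}
{code}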
{quote}If subtask 1 has recovered from the state for key k which was
snapshotted by subtask 2, and learned from the state that an operator event for
key k has been sent before, why does it send the event again? There should be
only one operator event being sent to the coordinator.
{quote}
How would it learn from the state that an operator event has been sent before?
The operator would need to explicitly book-keep all sent events, which is what
the DynamoDB connector does as a workaround, but that is clumsy. Consider, for
example, a coordinator that tracks join skew. Let's assume we send an event
whenever 10k records with a given key appear. First, subtask 2 sends an event
for the first 10k records of key k. That event gets blocked and checkpointed.
Second, we have a rescale with a global restart; now subtask 1 is responsible
for k, but the event remains in the state of subtask 2. Third, subtask 1
recovers much faster than subtask 2 and sends an event for 20k records of k.
Fourth, subtask 2 recovers and resends the first event, which may overwrite the
coordinator's state and reset the 20k back to 10k.
The example is a bit artificial, but operators and coordinators rely on the RPC
messages being in order, which is not guaranteed with your proposal.
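To illustrate the ordering problem in the skew example above, here is a minimal sketch (the SkewCoordinator class and its event handler are hypothetical, not Flink APIs): replaying the stale checkpointed event from subtask 2 after subtask 1's newer event overwrites the coordinator's state.
{code:java}
import java.util.HashMap;
import java.util.Map;

// Hypothetical coordinator for the join-skew example above; not a Flink API.
// It keeps the latest reported record count per key, so the last event "wins".
public class SkewCoordinatorReplay {

    static class SkewCoordinator {
        final Map<String, Long> recordsPerKey = new HashMap<>();

        void handleCountEvent(String key, long recordCount) {
            recordsPerKey.put(key, recordCount);
        }
    }

    public static void main(String[] args) {
        SkewCoordinator coordinator = new SkewCoordinator();

        // Subtask 1 recovered quickly and already reports 20k records for key k.
        coordinator.handleCountEvent("k", 20_000L);

        // Subtask 2 recovers later and replays the event it checkpointed before
        // the rescale: the first 10k records of key k.
        coordinator.handleCountEvent("k", 10_000L);

        // The coordinator's view of key k is reset from 20k back to 10k.
        System.out.println(coordinator.recordsPerKey); // {k=10000}
    }
}
{code}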
{quote}In the proposed approach below this description, it seems the bottleneck
problem is not resolved. Instead of asking subtasks to checkpoint events, the
coordinator or job manager would need to checkpoint the events on its own. It
increases the chance that the JM becomes the bottleneck.
{quote}
My assumption is that checkpointed events are rare. In fact, we only saw
inconsistencies in the sources 4 years after the release of the source
coordinator. I have no insight into how it looks for CEP operators.
In your proposal, the JM needs to additionally serialize and deserialize
2*parallelism events. Unless the events are bigger or there are a ton of events
from operator -> coordinator, my proposal may actually reduce the load on the
JM. It would be nice if you could share insights into the CEP coordination.
{quote}Besides, a second state for AcknowledgeCheckpointEvent could obviously
increase the duration of event blocking in checkpoints. The events would need
to wait not only for the upstream operators, but also for the downstream
operators. This might be a possible performance regression, especially for
source operators.
{quote}
I'm not proposing to touch {{AcknowledgeCheckpointEvent}}, so I don't know where
this is coming from. All the adjustments are in the CheckpointCoordinator, where
we would wait on a second future for things that are already done in the current
code base.
{quote}In fact, this issue was introduced to better support non-source
operators. As for source operators, given that FLIP-27 was proposed far before
this issue and these sources had been working well before that, they should not
have such event consistency problems.
{quote}
But they do have such problems, as is apparent in the linked ticket. We just
have almost no sources where the reader sends consistency-relevant events to the
enumerator. Standard Kafka has no dynamic events at all. In most other
enumerators, splits are only assigned based on knowledge that is generated in
the enumerator itself (e.g. S3 file discovery).
The DynamoDB source is unique in that regard, as the parent split needs to wait
for the child splits to be finished. Within a couple of weeks of its release,
the AWS folks have observed a couple of cases that stem from inconsistency. It's
always a matter of probability.
{quote}We can continue discuss this proposal but I'm not sure if it would be
finished soon. If it is confirmed that this issue is a blocker for aws
connector and needs to be fixed ASAP, my suggestion is to check if the fix
could be added to this connector first, as this approach has been proved in
other existing Flink connectors.
{quote}
Yes, they have already implemented a workaround for that issue:
[https://github.com/apache/flink-connector-aws/pull/193]. It's pretty much
based around making events idempotent and replaying them on recovery.
But it's unfortunate that this is necessary, and we should solve it at the
framework level, similar to how you fixed it at the coordinator->operator level.
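For illustration, a minimal sketch of that workaround pattern (hypothetical classes, not the actual flink-connector-aws code): the reader book-keeps sent events in its checkpointed state and re-sends them on restore, while the coordinator applies them idempotently so duplicates are harmless.
{code:java}
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of the workaround pattern; not the actual connector code.
public class IdempotentEventWorkaround {

    // Coordinator side: handling the same event twice must not change the outcome.
    static class Coordinator {
        final Set<String> finishedSplits = new HashSet<>();

        void handleSplitFinished(String splitId) {
            // Set semantics make the handler idempotent: duplicates are no-ops.
            finishedSplits.add(splitId);
        }
    }

    // Reader side: book-keep sent events so they survive a failover.
    static class Reader {
        final List<String> sentFinishedSplits = new ArrayList<>(); // part of the reader's checkpoint

        void sendSplitFinished(Coordinator coordinator, String splitId) {
            sentFinishedSplits.add(splitId);
            coordinator.handleSplitFinished(splitId);
        }

        void restoreAndReplay(Coordinator coordinator, List<String> checkpointedEvents) {
            sentFinishedSplits.addAll(checkpointedEvents);
            // Re-send everything; the coordinator may or may not have seen it already.
            checkpointedEvents.forEach(coordinator::handleSplitFinished);
        }
    }

    public static void main(String[] args) {
        Coordinator coordinator = new Coordinator();

        new Reader().sendSplitFinished(coordinator, "parent-split");          // original send
        new Reader().restoreAndReplay(coordinator, List.of("parent-split"));  // replay after recovery

        System.out.println(coordinator.finishedSplits); // [parent-split] -- still only once
    }
}
{code}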
> Preserve distributed consistency of OperatorEvents from subtasks to
> OperatorCoordinator
> ---------------------------------------------------------------------------------------
>
> Key: FLINK-28639
> URL: https://issues.apache.org/jira/browse/FLINK-28639
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Checkpointing
> Affects Versions: 1.14.3
> Reporter: Yunfeng Zhou
> Assignee: Yunfeng Zhou
> Priority: Minor
> Labels: pull-request-available, stale-assigned
>
> This is the second step to solving the consistency issue of OC
> communications. In this step, we would also guarantee the consistency of
> operator events sent from subtasks to OCs. Combined with the other subtask to
> preserve the consistency of communications in the reverse direction, all
> communications between OC and subtasks would be consistent across checkpoints
> and global failovers.
> To achieve the goal of this step, we need to add closing/reopening functions
> to the subtasks' gateways and make the subtasks aware of a checkpoint before
> they receive the checkpoint barriers. The general process would be as follows.
> 1. When the OC starts checkpoint, it notifies all subtasks about this
> information.
> 2. After being notified about the ongoing checkpoint in OC, a subtask sends a
> special operator event to its OC, which is the last operator event the OC
> could receive from the subtask before the subtask completes the checkpoint.
> Then the subtask closes its gateway.
> 3. After receiving this special event from all subtasks, the OC finishes its
> checkpoint and closes its gateway. Then the checkpoint coordinator sends
> checkpoint barriers to the sources.
> 4. If the subtask or the OC generates any events to send to the other, it
> buffers the events locally.
> 5. When a subtask starts checkpointing, it also stores the buffered events in
> the checkpoint.
> 6. After the subtask completes the checkpoint, communications in both
> directions are recovered and the buffered events are sent out.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)