[ 
https://issues.apache.org/jira/browse/FLINK-29459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17623149#comment-17623149
 ] 

Yun Gao commented on FLINK-29459:
---------------------------------

Hi [~fpaul] [~KristoffSC], thanks a lot for fixing the issues, and sorry for 
missing the earlier notifications; I was on holiday at the time. I have some 
further thoughts on the current sink v2 mechanism:

Currently we rely on the CommittableSummary and CommittableWithLineage messages 
to coordinate between Writers and Committers. For each checkpoint, each Writer 
subtask first emits a CommittableSummary to the Committers, which contains 
the number of Committables it will send. The Writer subtask then emits that 
number of CommittableWithLineage messages to the Committers. The Committers 
rely on the number in the summary to detect whether they have received all the 
Committables from each Writer subtask. But this mechanism has some issues:
 # It can only support partitioners with a single target for each source 
between Writer and Committer, such as forward / rescale. If in the long run we 
want to support Committers with arbitrary parallelism, this could cause 
issues when the Writer and Committer have different parallelism. Similarly, it 
also complicates things for authors of connectors that use a pre-commit 
topology (WithPreCommitTopology). 
 # With unaligned checkpoints and rescaling after recovery, if some 
CommittableSummary messages have been processed and stored in the snapshot but 
the corresponding CommittableWithLineage messages have been assigned to other 
tasks, the number of Committables would be incorrect. 
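
To make the counting approach and its second issue concrete, here is a minimal 
sketch in Java. SummaryCountTracker and its methods are hypothetical names made 
up for illustration; this is not Flink's actual CommittableCollector code, only 
a simplified model of "compare received messages against the count announced 
in the summary".

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of count-based completion detection (not Flink's real classes). */
class SummaryCountTracker {
    private final int numWriterSubtasks;
    // Count announced by each Writer subtask in its summary message.
    private final Map<Integer, Integer> expected = new HashMap<>();
    // Number of committable messages actually received per Writer subtask.
    private final Map<Integer, Integer> received = new HashMap<>();

    SummaryCountTracker(int numWriterSubtasks) {
        this.numWriterSubtasks = numWriterSubtasks;
    }

    void onSummary(int writerSubtask, int numberOfCommittables) {
        expected.put(writerSubtask, numberOfCommittables);
    }

    void onCommittable(int writerSubtask) {
        received.merge(writerSubtask, 1, Integer::sum);
    }

    /** True only if every Writer subtask sent a summary and all announced messages arrived. */
    boolean hasReceivedAll() {
        if (expected.size() < numWriterSubtasks) {
            return false;
        }
        for (Map.Entry<Integer, Integer> e : expected.entrySet()) {
            if (received.getOrDefault(e.getKey(), 0) < e.getValue()) {
                return false;
            }
        }
        return true;
    }
}
```

In this model, if a summary announcing 3 Committables is restored on one task 
while two of the three messages are redistributed to another task, 
hasReceivedAll() on the first task never becomes true.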

One possible alternative: instead of relying on counts, we could first emit 
the Committables and then follow them with a broadcast message that confirms 
the end of the checkpoint. A Committer would know that it has received all the 
Committables once it has received the confirmation messages from all upstream 
tasks. The mechanism is a bit like how watermarks work. Then, for the two 
issues above:


 # It would support all partitioners. 
 # For the unaligned checkpoint and rescaling case, on startup we could simply 
commit all the Committables with the restored checkpoint id and ignore all 
confirmation messages for that same checkpoint id. We could then wait for the 
confirmation message of the next checkpoint id to mark all previous 
checkpoints as finished. 
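
A rough sketch of the confirmation-based idea, again in Java. Everything here 
is hypothetical (ConfirmationTracker, its methods, and the use of String as a 
stand-in for a Committable); it also assumes that confirmations for the 
restored checkpoint id or any older id are ignored, and that a fully confirmed 
checkpoint releases all pending Committables up to and including that id.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical model of confirmation-based completion detection (not an existing Flink API). */
class ConfirmationTracker {
    private final int numUpstreamTasks;
    private final long restoredCheckpointId; // use -1 when starting without state
    // Pending committables grouped by checkpoint id, in ascending id order.
    private final TreeMap<Long, List<String>> pending = new TreeMap<>();
    // Upstream tasks that have confirmed each checkpoint id.
    private final Map<Long, Set<Integer>> confirmedBy = new HashMap<>();

    ConfirmationTracker(int numUpstreamTasks, long restoredCheckpointId) {
        this.numUpstreamTasks = numUpstreamTasks;
        this.restoredCheckpointId = restoredCheckpointId;
    }

    void onCommittable(long checkpointId, String committable) {
        pending.computeIfAbsent(checkpointId, k -> new ArrayList<>()).add(committable);
    }

    /** Returns the committables that became safe to commit, or an empty list. */
    List<String> onConfirmation(long checkpointId, int upstreamTask) {
        List<String> ready = new ArrayList<>();
        if (checkpointId <= restoredCheckpointId) {
            return ready; // assumption: stale confirmations after recovery are ignored
        }
        Set<Integer> tasks = confirmedBy.computeIfAbsent(checkpointId, k -> new HashSet<>());
        tasks.add(upstreamTask);
        if (tasks.size() < numUpstreamTasks) {
            return ready; // still waiting for some upstream tasks
        }
        // All upstream tasks confirmed: everything up to this checkpoint id is complete.
        Map<Long, List<String>> done = pending.headMap(checkpointId, true);
        for (List<String> committables : done.values()) {
            ready.addAll(committables);
        }
        done.clear(); // the view is backed by 'pending', so this removes the entries
        confirmedBy.keySet().removeIf(id -> id <= checkpointId);
        return ready;
    }
}
```

Because completion depends only on which upstream tasks have confirmed, not on 
per-sender counts, the sketch does not care how the Committables themselves 
were partitioned.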

What do you think about this? Sorry if I have overlooked something. 

> Sink v2 has bugs in supporting legacy v1 implementations with global committer
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-29459
>                 URL: https://issues.apache.org/jira/browse/FLINK-29459
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.16.0, 1.17.0, 1.15.2
>            Reporter: Yun Gao
>            Assignee: Yun Gao
>            Priority: Major
>             Fix For: 1.17.0, 1.15.3, 1.16.1
>
>
> Currently, when supporting Sink implementations that use the version 1 
> interface, there are issues after restoring from a checkpoint after failover:
>  # In the global committer operator, when restoring SubtaskCommittableManager, 
> the subtask id is replaced with the one of the current operator. This means 
> that the id is originally the id of the sender task (0 ~ N - 1), but after 
> restoring it has to be 0. This causes a duplicate key exception during 
> restoring.
>  # For the Committer operator, the subtaskId of CheckpointCommittableManagerImpl 
> is always restored to 0 after failover for all subtasks. As a result, the 
> summary sent to the Global Committer carries the wrong subtask id.
>  # For the Committer operator, the checkpoint id of SubtaskCommittableManager is 
> always restored to 1 after failover. As a result, subsequent committables sent 
> to the global committer carry the wrong checkpoint id. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
