[ 
https://issues.apache.org/jira/browse/FLINK-30238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17686272#comment-17686272
 ] 

Yun Gao commented on FLINK-30238:
---------------------------------

Hi, let me first add some background before continuing the discussion:

Currently the sink topology can be simplified as writer -> committer -> 
post-committer topology, and the rough process of the two-phase commit is:
 # Writer writes to a temporary transaction or an intermediate file. 
 # Writer emits Committables, which are the handles of the transactions or 
intermediate files, to the Committer on prepareSnapshotPreBarrier. 
 # Committer records these Committables on snapshotting. 
 # Committer commits these Committables on notifyCheckpointComplete. 
 # Committer emits the committed Committables to the post-committer topology.
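
As a rough illustration of the five steps above, here is a minimal Java sketch. It is a toy model and not Flink code: the Writer and Committer classes, the string-based committables, and runOneCheckpoint are all hypothetical names chosen for this example. Only the hook names prepareSnapshotPreBarrier and notifyCheckpointComplete come from the description above.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the two-phase commit flow. None of these classes are Flink APIs.
class TwoPhaseCommitSketch {

    /** Hypothetical writer: buffers records in an open transaction. */
    static class Writer {
        private final List<String> openTransaction = new ArrayList<>();
        private int nextTxnId = 0;

        // Step 1: write to the temporary transaction.
        void write(String record) {
            openTransaction.add(record);
        }

        // Step 2: emit the handle of the transaction (the committable).
        String prepareSnapshotPreBarrier() {
            String committable = "txn-" + nextTxnId + openTransaction;
            nextTxnId++;
            openTransaction.clear();
            return committable;
        }
    }

    /** Hypothetical committer: records committables, then commits and emits them. */
    static class Committer {
        private final List<String> received = new ArrayList<>();
        private final List<String> snapshotted = new ArrayList<>();

        void receive(String committable) {
            received.add(committable);
        }

        // Step 3: record the committables as part of the checkpoint state.
        List<String> snapshotState() {
            snapshotted.addAll(received);
            received.clear();
            return new ArrayList<>(snapshotted);
        }

        // Steps 4 and 5: commit, then emit the committed committables downstream.
        // The actual commit against an external system is omitted in this sketch.
        List<String> notifyCheckpointComplete() {
            List<String> committed = new ArrayList<>(snapshotted);
            snapshotted.clear();
            return committed;
        }
    }

    /** Drives one checkpoint and returns what the post-committer topology receives. */
    static List<String> runOneCheckpoint(List<String> records) {
        Writer writer = new Writer();
        Committer committer = new Committer();
        records.forEach(writer::write);
        committer.receive(writer.prepareSnapshotPreBarrier());
        committer.snapshotState();
        return committer.notifyCheckpointComplete();
    }

    public static void main(String[] args) {
        // prints [txn-0[a, b]]
        System.out.println(runOneCheckpoint(List.of("a", "b")));
    }
}
```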

If the job is bounded, then for the last piece of data the process is 
slightly different in the current implementation:
 # Writer receives the END_OF_DATA message and emits its currently pending 
Committables. Logically it should not emit new records from this point on, and 
it starts waiting for the final checkpoint. 
 # Committer then also receives END_OF_DATA and waits for the final 
checkpoint. 
 # Committer records the Committables on final checkpoint snapshotting. 
 # Committer commits these Committables on the final checkpoint's 
notifyCheckpointComplete. Since it has already received END_OF_DATA, it can 
no longer emit these Committables to the post-committer topology at that point. 
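
The gap in the bounded case can be sketched with a small Java model. This is only an illustration under simplifying assumptions, not the actual Flink committer: the Committer class, its fields, and onEndOfData are hypothetical names, and "committing" is reduced to appending to a list.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the bounded case. None of these classes are Flink APIs.
class FinalCheckpointSketch {

    static class Committer {
        final List<String> pending = new ArrayList<>();
        // What has been committed to the (imaginary) external system.
        final List<String> committed = new ArrayList<>();
        // What the post-committer topology has received.
        final List<String> emittedDownstream = new ArrayList<>();
        boolean endOfDataReceived = false;

        void receive(String committable) {
            pending.add(committable);
        }

        void onEndOfData() {
            endOfDataReceived = true;
        }

        void notifyCheckpointComplete() {
            committed.addAll(pending);
            // After END_OF_DATA the operator can no longer emit records, so the
            // last batch is committed but never reaches the post-committer topology.
            if (!endOfDataReceived) {
                emittedDownstream.addAll(pending);
            }
            pending.clear();
        }
    }

    static Committer run() {
        Committer committer = new Committer();
        committer.receive("txn-0");
        committer.notifyCheckpointComplete(); // regular checkpoint

        committer.receive("txn-1");           // last batch, flushed by the writer
        committer.onEndOfData();
        committer.notifyCheckpointComplete(); // final checkpoint
        return committer;
    }

    public static void main(String[] args) {
        Committer committer = run();
        System.out.println("committed:  " + committer.committed);
        System.out.println("downstream: " + committer.emittedDownstream);
    }
}
```

In this model txn-1 ends up in committed but not in emittedDownstream, which corresponds to the last step of the list above.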

Thus it looks to me that the root issue is how we deal with the last batch of 
Committables. The original idea for this issue was something like:
 # The Committer emits the not-yet-committed Committables to the post-committer 
topology, with special tags. 
 # The post-commit topology may use a connector-specific method to check whether 
these Committables are committed, then wait until all of them are committed 
and do the cleanup in the final checkpoint's notifyCheckpointComplete. 
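
One possible shape of this idea, as a hedged Java sketch: the TaggedCommittable and PostCommitOperator classes, the isCommitted predicate (standing in for the connector-specific check), and notifyFinalCheckpointComplete are all hypothetical and do not exist in Flink. The sketch also assumes the post-commit operator simply retries later if some Committables are not yet committed.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

// Toy model of the proposal. None of these classes are Flink APIs.
class TaggedCommittableSketch {

    /** A committable plus the "special tag" marking it as not yet committed. */
    static class TaggedCommittable {
        final String handle;
        final boolean uncommitted;

        TaggedCommittable(String handle, boolean uncommitted) {
            this.handle = handle;
            this.uncommitted = uncommitted;
        }
    }

    static class PostCommitOperator {
        final List<String> awaitingCommit = new ArrayList<>();
        final List<String> cleanedUp = new ArrayList<>();
        // Stand-in for the connector-specific "is this committed?" check.
        private final Predicate<String> isCommitted;

        PostCommitOperator(Predicate<String> isCommitted) {
            this.isCommitted = isCommitted;
        }

        void receive(TaggedCommittable committable) {
            if (committable.uncommitted) {
                awaitingCommit.add(committable.handle); // defer until final checkpoint
            } else {
                cleanedUp.add(committable.handle);      // already committed upstream
            }
        }

        /** Returns true once every tagged committable is committed and cleaned up. */
        boolean notifyFinalCheckpointComplete() {
            if (!awaitingCommit.stream().allMatch(isCommitted)) {
                return false; // assumption: the caller waits and checks again
            }
            cleanedUp.addAll(awaitingCommit);
            awaitingCommit.clear();
            return true;
        }
    }

    static PostCommitOperator run() {
        Set<String> externalSystem = new HashSet<>();
        PostCommitOperator postCommit = new PostCommitOperator(externalSystem::contains);

        // On END_OF_DATA the committer forwards the last batch before committing it.
        postCommit.receive(new TaggedCommittable("txn-1", true));

        // Final checkpoint completes: the committer commits, then post-commit checks.
        externalSystem.add("txn-1");
        postCommit.notifyFinalCheckpointComplete();
        return postCommit;
    }

    public static void main(String[] args) {
        System.out.println("cleaned up: " + run().cleanedUp);
    }
}
```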

What do you think about that?

 

> Unified Sink committer does not clean up state on final savepoint
> -----------------------------------------------------------------
>
>                 Key: FLINK-30238
>                 URL: https://issues.apache.org/jira/browse/FLINK-30238
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Common
>    Affects Versions: 1.17.0, 1.15.3, 1.16.1
>            Reporter: Fabian Paul
>            Priority: Critical
>
> During stop-with-savepoint the committer only commits the pending 
> committables on notifyCheckpointComplete.
> This has several downsides.
>  * The last committableSummary has checkpoint id LONG.MAX and is never cleared 
> from the state, so stop-with-savepoint does not work when the 
> pipeline recovers from a savepoint 
>  * While the committables are committed during stop-with-savepoint, they are 
> not forwarded to the post-commit topology, potentially losing data and 
> preventing open transactions from being closed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
