[ https://issues.apache.org/jira/browse/FLINK-30238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17686272#comment-17686272 ]
Yun Gao commented on FLINK-30238:
---------------------------------

Hi, let me first add some background before continuing the discussion.

Currently the sink topology can be simplified as writer -> committer -> post-committer topology, and the rough process of the two-phase commit is:
# The Writer writes to a temporary transaction or intermediate file.
# The Writer emits Committables, which are the handles of the transactions or intermediate files, to the Committer on prepareSnapshotPreBarrier.
# The Committer records these Committables on snapshotting.
# The Committer commits these Committables on notifyCheckpointComplete.
# The Committer emits the committed Committables to the post-committer topology.

If the job is bounded, then for the last piece of data the process is slightly different in the current implementation:
# The Writer receives the END_OF_DATA message and emits its current pending Committables. Logically it should not emit new records from now on, and it starts waiting for the final checkpoint.
# The Committer then also receives END_OF_DATA and waits for the final checkpoint.
# The Committer records the Committables when snapshotting the final checkpoint.
# The Committer commits these Committables on the final checkpoint's notifyCheckpointComplete. Since it has already received END_OF_DATA, it can no longer emit these Committables to the post-committer topology at this point.

Thus it looks to me that the root issue is how we deal with the last batch of Committables. The original thought for this issue was something like:
# The Committer emits the uncommitted Committables to the post-committer topology, with special tags.
# The post-commit topology may use a connector-specific method to check whether these Committables are committed, then wait until all the Committables are committed and do the cleanup in the final checkpoint's notifyCheckpointComplete.

What do you think about that?
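
To make the proposal a bit more concrete, here is a minimal toy model of the committer side. It is only a sketch and not Flink code: `ToyCommitter`, its method names, and the `UNCOMMITTED:` / `COMMITTED:` tags are hypothetical names invented for illustration, and Committables are plain strings. It assumes the committer forwards everything still uncommitted, with a tag, when it sees END_OF_DATA, and clears its state once the final checkpoint completes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical toy model of the committer; none of these names are Flink APIs.
class ToyCommitter {
    // Committables received from the writer but not yet recorded in a snapshot.
    private final List<String> received = new ArrayList<>();
    // Committables recorded in state, keyed by the checkpoint that recorded them.
    private final TreeMap<Long, List<String>> state = new TreeMap<>();
    private boolean endOfData = false;

    // What the post-commit topology has seen so far.
    final List<String> emittedDownstream = new ArrayList<>();
    // What has been committed to the external system.
    final List<String> committed = new ArrayList<>();

    void receive(String committable) {
        received.add(committable);
    }

    // Assumed behaviour from the proposal: on END_OF_DATA, forward everything
    // that is still uncommitted with a tag, because nothing can be emitted later.
    void endOfData() {
        for (List<String> batch : state.values()) {
            for (String c : batch) {
                emittedDownstream.add("UNCOMMITTED:" + c);
            }
        }
        for (String c : received) {
            emittedDownstream.add("UNCOMMITTED:" + c);
        }
        endOfData = true;
    }

    // Record the pending Committables under the snapshotting checkpoint.
    void snapshotState(long checkpointId) {
        if (!received.isEmpty()) {
            state.put(checkpointId, new ArrayList<>(received));
            received.clear();
        }
    }

    // Commit everything recorded up to and including this checkpoint,
    // then drop it from state so nothing is left behind after the final one.
    void notifyCheckpointComplete(long checkpointId) {
        Map<Long, List<String>> done = state.headMap(checkpointId, true);
        for (List<String> batch : done.values()) {
            for (String c : batch) {
                committed.add(c);
                if (!endOfData) {
                    emittedDownstream.add("COMMITTED:" + c);
                }
            }
        }
        done.clear(); // the view is backed by `state`, so this removes the entries
    }

    int stateSize() {
        return state.size();
    }
}
```

In this model, a Committable that arrives before a regular checkpoint reaches the post-commit topology as committed, while one that arrives just before END_OF_DATA reaches it tagged as uncommitted and is committed only on the final notifyCheckpointComplete. The post-commit side would then still need its own connector-specific check to find out when the tagged Committables have actually been committed.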

> Unified Sink committer does not clean up state on final savepoint
> -----------------------------------------------------------------
>
>                 Key: FLINK-30238
>                 URL: https://issues.apache.org/jira/browse/FLINK-30238
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Common
>    Affects Versions: 1.17.0, 1.15.3, 1.16.1
>            Reporter: Fabian Paul
>            Priority: Critical
>
> During stop-with-savepoint the committer only commits the pending
> committables on notifyCheckpointComplete.
> This has several downsides.
> * Last committableSummary has checkpoint id LONG.MAX and is never cleared
> from the state leading to that stop-with-savepoint does not work when the
> pipeline recovers from a savepoint
> * While the committables are committed during stop-with-savepoint they are
> not forwarded to post-commit topology, potentially losing data and preventing
> to close open transactions.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)