[ https://issues.apache.org/jira/browse/FLINK-30238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17761051#comment-17761051 ]
Tzu-Li (Gordon) Tai commented on FLINK-30238:
---------------------------------------------

Hi all, I'd like to move this ticket forward and decide whether things are working as expected or there's an actual bug.

First, let's narrow the scope of this ticket to only the following: {{stop-with-savepoint}} persists the last CommittableSummary, which carries the LONG_MAX checkpoint ID, in the savepoint. After a restore, this prevents any later {{stop-with-savepoint}} from succeeding, because CommittableSummaries must always have distinct checkpoint IDs. The other issue, post-commit topologies not receiving committed committables, can be addressed separately.

Summarizing the facts we have so far:
* If the LONG_MAX checkpoint ID marker is propagated, then {{endOfInput}} was called, and it is expected that a job restored from the resulting savepoint will not function properly. {{endOfInput}} should only ever be called when 1) reaching the end of a bounded input, or 2) running stop-with-savepoint with the --drain option.
* The Kafka ITCase referenced by [~fpaul] calls {{endOfInput}} because of the setting {{advanceTimestamp = true}}, which is equivalent to stop-with-savepoint with --drain.

Given the above, can we conclude that:
* {{stop-with-savepoint}} without draining works as expected: {{endOfInput}} is not called, and the LONG_MAX CommittableSummary is neither sent nor stored as committables state.
* {{stop-with-savepoint}} _with draining_ also works as expected: jobs restored from these savepoints are not expected to function properly in the first place.

I'm looking to verify that {{stop-with-savepoint}} without draining indeed doesn't call the {{endOfInput}} method. If that's the case, then I think we can close this case? In parallel, I want to jot these thoughts down to see if I'm missing anything obvious. cc [~fpaul] [~gaoyunhaii]
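To make the duplicate-ID argument above concrete, here is a minimal Java sketch. It is a hypothetical, simplified model, not Flink's committer code: the class {{SummaryStateSketch}}, its methods, and the use of a {{TreeMap}} keyed by checkpoint ID as the "snapshotted state" are illustrative assumptions, and cleanup of already-committed summaries is omitted. It only shows why a persisted LONG_MAX summary collides with the next one emitted after restore, while the non-draining path (which never calls {{endOfInput}}) keeps using real savepoint IDs.

```java
import java.util.TreeMap;

// Hypothetical, simplified model of a committer's summary state.
// NOT Flink's API: all names here are illustrative only.
class SummaryStateSketch {

    // Assumed marker for the final summary emitted from endOfInput.
    static final long EOI = Long.MAX_VALUE;

    // Summaries keyed by checkpoint ID; stands in for what is written to a savepoint.
    private final TreeMap<Long, String> summaries = new TreeMap<>();

    // "Restore": start from a previously snapshotted state.
    SummaryStateSketch(TreeMap<Long, String> restored) {
        summaries.putAll(restored);
    }

    // Every summary must carry a distinct checkpoint ID.
    void addSummary(long checkpointId, String summary) {
        if (summaries.containsKey(checkpointId)) {
            throw new IllegalStateException("Duplicate summary for checkpoint " + checkpointId);
        }
        summaries.put(checkpointId, summary);
    }

    // Only the draining variant calls endOfInput and therefore emits the EOI summary.
    TreeMap<Long, String> stopWithSavepoint(long savepointId, boolean drain) {
        addSummary(drain ? EOI : savepointId, "summary");
        return new TreeMap<>(summaries); // snapshot copy = the savepoint
    }

    public static void main(String[] args) {
        // Without --drain: real savepoint IDs, so stop/restore/stop cycles do not collide.
        TreeMap<Long, String> sp1 = new SummaryStateSketch(new TreeMap<>()).stopWithSavepoint(5, false);
        TreeMap<Long, String> sp2 = new SummaryStateSketch(sp1).stopWithSavepoint(9, false);
        System.out.println("no-drain savepoint keys: " + sp2.keySet());

        // With --drain: the EOI summary is persisted, and a second drain after restore collides.
        TreeMap<Long, String> drained = new SummaryStateSketch(new TreeMap<>()).stopWithSavepoint(5, true);
        try {
            new SummaryStateSketch(drained).stopWithSavepoint(9, true);
        } catch (IllegalStateException e) {
            System.out.println("second drain failed: " + e.getMessage());
        }
    }
}
```

Running {{main}} prints {{no-drain savepoint keys: [5, 9]}} followed by {{second drain failed: Duplicate summary for checkpoint 9223372036854775807}}, i.e. only the drained savepoint carries the LONG_MAX entry that breaks the next stop after restore.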
> Unified Sink committer does not clean up state on final savepoint
> -----------------------------------------------------------------
>
>                 Key: FLINK-30238
>                 URL: https://issues.apache.org/jira/browse/FLINK-30238
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Common
>    Affects Versions: 1.17.0, 1.15.3, 1.16.1
>            Reporter: Fabian Paul
>            Priority: Critical
>         Attachments: Screenshot 2023-03-09 at 1.47.11 PM.png, image (8).png
>
>
> During stop-with-savepoint, the committer only commits the pending committables on notifyCheckpointComplete.
> This has several downsides:
> * The last committableSummary has checkpoint ID LONG.MAX and is never cleared from the state, so stop-with-savepoint does not work when the pipeline recovers from a savepoint.
> * While the committables are committed during stop-with-savepoint, they are not forwarded to the post-commit topology, potentially losing data and preventing open transactions from being closed.
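One way the "never cleared" behavior described above can arise is sketched below in Java. It assumes, hypothetically, that the committer commits and drops state only for checkpoint IDs up to and including the ID passed to {{notifyCheckpointComplete}}; the class {{CommitOnCompleteSketch}} and its methods are illustrative, not Flink code. Because no real checkpoint or savepoint ID ever reaches {{Long.MAX_VALUE}}, the final summary survives every such cleanup.

```java
import java.util.TreeMap;

// Hypothetical model of "commit on notifyCheckpointComplete" cleanup.
// NOT Flink's committer implementation; names are illustrative only.
class CommitOnCompleteSketch {

    // Pending committables/summaries keyed by checkpoint ID.
    private final TreeMap<Long, String> pending = new TreeMap<>();

    void add(long checkpointId, String committable) {
        pending.put(checkpointId, committable);
    }

    // Commit and clear everything up to and including the completed checkpoint ID.
    int notifyCheckpointComplete(long completedId) {
        var upTo = pending.headMap(completedId, true);
        int committed = upTo.size();
        upTo.clear(); // clearing the view also removes the entries from 'pending'
        return committed;
    }

    int remaining() {
        return pending.size();
    }

    public static void main(String[] args) {
        CommitOnCompleteSketch s = new CommitOnCompleteSketch();
        s.add(7, "regular checkpoint");
        s.add(Long.MAX_VALUE, "final summary from endOfInput");
        // The savepoint completes with a real ID, which is always below Long.MAX_VALUE.
        int committed = s.notifyCheckpointComplete(8);
        System.out.println("committed=" + committed + ", left in state=" + s.remaining());
    }
}
```

Running {{main}} prints {{committed=1, left in state=1}}: the regular checkpoint's entry is committed and removed, while the LONG_MAX summary stays in state and would be written into the savepoint.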