[ https://issues.apache.org/jira/browse/FLINK-30238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17761051#comment-17761051 ]
Tzu-Li (Gordon) Tai commented on FLINK-30238:
---------------------------------------------
Hi all, I'd like to move this ticket forward and decide whether things are
working as expected or there's an actual bug.
First of all, let's narrow the scope of this ticket to address only the
following:
{{stop-with-savepoint}} stores the last CommittableSummary, with a LONG_MAX
checkpoint ID, in the savepoint. This prevents any later {{stop-with-savepoint}}
from succeeding after restore, because CommittableSummaries must always have
different checkpoint IDs.
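To make this failure mode concrete, here is a minimal, hypothetical model. {{SummaryState}}, {{Demo}} and {{END_OF_INPUT}} are illustrative names, not Flink's actual committer classes. It assumes the committer keeps pending summaries keyed by checkpoint ID, rejects a second summary for an ID it already holds, and that the LONG_MAX summary from a drained stop survives into the savepoint:

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model of committer state; NOT Flink's actual classes.
final class SummaryState {
    // Assumed marker for summaries emitted by endOfInput(), mirroring LONG_MAX in this ticket.
    static final long END_OF_INPUT = Long.MAX_VALUE;

    // Pending summaries keyed by checkpoint ID; each ID may appear at most once.
    private final Map<Long, Integer> pending = new TreeMap<>();

    void addSummary(long checkpointId, int numberOfCommittables) {
        if (pending.containsKey(checkpointId)) {
            throw new IllegalStateException("Duplicate summary for checkpoint " + checkpointId);
        }
        pending.put(checkpointId, numberOfCommittables);
    }

    // Copy of the pending summaries, standing in for what is written to the savepoint.
    Map<Long, Integer> snapshot() {
        return new TreeMap<>(pending);
    }

    // Rebuild state from a savepoint copy; uncleared summaries come back as-is.
    static SummaryState restore(Map<Long, Integer> saved) {
        SummaryState state = new SummaryState();
        state.pending.putAll(saved);
        return state;
    }
}

class Demo {
    // Returns true if the next drained stop fails after restoring from the first savepoint.
    static boolean secondDrainFails() {
        SummaryState first = new SummaryState();
        // First drained stop-with-savepoint: the LONG_MAX summary is never cleared.
        first.addSummary(SummaryState.END_OF_INPUT, 3);
        SummaryState restored = SummaryState.restore(first.snapshot());
        try {
            // Next drained stop emits another LONG_MAX summary with the same ID.
            restored.addSummary(SummaryState.END_OF_INPUT, 1);
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(secondDrainFails());
    }
}
```

Under this model the first savepoint is taken without error; only the next stop that emits another LONG_MAX summary fails.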
The issue of post-commit topologies not receiving committed committables can
be addressed separately.
Summarizing the facts we have so far:
* If a LONG_MAX checkpoint ID special marker is propagated, it means
{{endOfInput}} was called and it is expected behavior that the job will not
function properly if restored from the generated savepoint. {{endOfInput}}
should only ever be called when 1) reaching bounded end of input, or 2)
stop-with-savepoint with the --drain option.
* The Kafka ITCase referenced by [~fpaul] calls {{endOfInput}} because of the
setup {{advanceTimestamp = true}}, which is equivalent to stop-with-savepoint
with --drain.
With the above, can we conclude that:
* {{stop-with-savepoint}} without draining works as expected: {{endOfInput}} is
not called, and the LONG_MAX CommittableSummary is neither sent nor stored as
committables state.
* {{stop-with-savepoint}} _with draining_ is also working as expected - jobs
restored from these savepoints are not expected to function properly in the
first place.
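The two conclusions reduce to a small decision table. The sketch below encodes it; {{StopMode}} and {{StopModes}} are hypothetical names for illustration, not a Flink API, and the table only restates the assumption being verified here (that only bounded end of input and --drain call {{endOfInput}}):

```java
// Hypothetical decision table for the cases discussed in this ticket; not a Flink API.
enum StopMode { BOUNDED_END_OF_INPUT, STOP_WITH_SAVEPOINT_DRAIN, STOP_WITH_SAVEPOINT_NO_DRAIN }

class StopModes {
    // Assumption under verification: endOfInput() runs only at bounded end of input
    // or with --drain, and only then is a LONG_MAX CommittableSummary emitted and stored.
    static boolean emitsLongMaxSummary(StopMode mode) {
        switch (mode) {
            case BOUNDED_END_OF_INPUT:
            case STOP_WITH_SAVEPOINT_DRAIN:
                return true;
            default:
                return false;
        }
    }

    public static void main(String[] args) {
        for (StopMode mode : StopMode.values()) {
            System.out.println(mode + " -> LONG_MAX summary: " + emitsLongMaxSummary(mode));
        }
    }
}
```

If the no-drain row turns out to be wrong in practice, i.e. {{endOfInput}} is called without --drain, that would be the actual bug.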
I'm looking to verify that {{stop-with-savepoint}} without draining indeed
doesn't call the {{endOfInput}} method; if that's the case, then I think we can
close this ticket? In parallel, I just want to jot these thoughts down to see if
I'm missing anything obvious. cc [~fpaul] [~gaoyunhaii]
> Unified Sink committer does not clean up state on final savepoint
> -----------------------------------------------------------------
>
> Key: FLINK-30238
> URL: https://issues.apache.org/jira/browse/FLINK-30238
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Common
> Affects Versions: 1.17.0, 1.15.3, 1.16.1
> Reporter: Fabian Paul
> Priority: Critical
> Attachments: Screenshot 2023-03-09 at 1.47.11 PM.png, image (8).png
>
>
> During stop-with-savepoint the committer only commits the pending
> committables on notifyCheckpointComplete.
> This has several downsides.
> * The last committableSummary has checkpoint ID LONG.MAX and is never cleared
> from the state, so stop-with-savepoint does not work when the
> pipeline recovers from a savepoint
> * While the committables are committed during stop-with-savepoint, they are
> not forwarded to the post-commit topology, potentially losing data and preventing
> open transactions from being closed.