[https://issues.apache.org/jira/browse/FLINK-30238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17761051#comment-17761051]

Tzu-Li (Gordon) Tai commented on FLINK-30238:
---------------------------------------------

Hi all, I'd like to move this ticket forward and decide whether things are 
working as expected or whether there's an actual bug.

First, let's narrow the scope of this ticket to address only the following:
{{stop-with-savepoint}} persists the last CommittableSummary, which carries the 
LONG_MAX checkpoint ID, in the savepoint. This prevents any subsequent 
{{stop-with-savepoint}} from succeeding after restore, because 
CommittableSummaries must always have distinct checkpoint IDs.
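To make the failure mode concrete, here is a minimal Java model of that invariant. It is a hypothetical sketch, not Flink's committer code: `SummaryRegistry` and its methods are made-up names, and the only assumptions are that summaries are keyed by checkpoint ID and that a second summary for an ID already in state is rejected. Restoring from a savepoint that already holds the LONG_MAX summary and then emitting another one reproduces the clash.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of the committer's per-checkpoint summary state; not Flink's actual class.
class SummaryRegistry {
    // Marker checkpoint ID used for the final summary emitted after endOfInput.
    static final long END_OF_INPUT = Long.MAX_VALUE;

    // Checkpoint ID -> number of committables announced for that checkpoint.
    private final Map<Long, Integer> summaries = new TreeMap<>();

    // Adds a summary; rejects a second summary for an ID that is already present.
    void add(long checkpointId, int numberOfCommittables) {
        if (summaries.containsKey(checkpointId)) {
            throw new IllegalStateException(
                    "Duplicate CommittableSummary for checkpoint " + checkpointId);
        }
        summaries.put(checkpointId, numberOfCommittables);
    }

    // Copy of the state, as it would be written into a savepoint.
    Map<Long, Integer> snapshot() {
        return new TreeMap<>(summaries);
    }

    // Rebuilds the registry from a previously taken snapshot.
    static SummaryRegistry restore(Map<Long, Integer> state) {
        SummaryRegistry registry = new SummaryRegistry();
        registry.summaries.putAll(state);
        return registry;
    }
}

public class DuplicateSummaryDemo {
    public static void main(String[] args) {
        // First run: a draining stop emits the final summary, which lands in the savepoint.
        SummaryRegistry firstRun = new SummaryRegistry();
        firstRun.add(SummaryRegistry.END_OF_INPUT, 3);
        Map<Long, Integer> savepoint = firstRun.snapshot();

        // Second run: restore from that savepoint and stop with draining again.
        SummaryRegistry secondRun = SummaryRegistry.restore(savepoint);
        try {
            secondRun.add(SummaryRegistry.END_OF_INPUT, 1);
            System.out.println("second stop succeeded");
        } catch (IllegalStateException e) {
            System.out.println("second stop failed: " + e.getMessage());
        }
    }
}
```

Running `main` prints `second stop failed: Duplicate CommittableSummary for checkpoint 9223372036854775807`, because both summaries carry `Long.MAX_VALUE` as their checkpoint ID.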

The issue of post-commit topologies not receiving committed committables can 
be addressed in a separate ticket.

Summarizing the facts we have so far:
 * If the special LONG_MAX checkpoint ID marker is propagated, {{endOfInput}} 
was called, and it is expected behavior that a job restored from the resulting 
savepoint will not function properly. {{endOfInput}} should only ever be called 
when 1) a bounded input reaches its end, or 2) stop-with-savepoint is run with 
the --drain option.
 * {{endOfInput}} is called in the Kafka ITCase referenced by [~fpaul] because 
of the setting {{advanceTimestamp = true}}, which is equivalent to 
stop-with-savepoint with --drain.


With the above, can we conclude that:
 * {{stop-with-savepoint}} without draining works as expected: {{endOfInput}} 
is not called, so the LONG_MAX CommittableSummary is neither sent nor stored 
as committables state.
 * {{stop-with-savepoint}} _with draining_ also works as expected: jobs 
restored from these savepoints are not expected to function properly in the 
first place.
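The two conclusions can be restated as a small decision model. This Java sketch is hypothetical (`StopModeModel`, `StopMode` and the helper methods are illustrative names, not Flink APIs). It encodes only what is stated above, plus one labeled assumption: without draining, the last summary carries the savepoint's own checkpoint ID rather than LONG_MAX.

```java
public class StopModeModel {
    // Marker checkpoint ID carried by the summary emitted from endOfInput.
    static final long END_OF_INPUT = Long.MAX_VALUE;

    // The two stop-with-savepoint variants discussed in this ticket.
    enum StopMode { DRAIN, NO_DRAIN }

    // Per the facts above: endOfInput runs on a draining stop, not on a plain stop.
    // Bounded end of input also calls endOfInput; it is not modeled here.
    static boolean callsEndOfInput(StopMode mode) {
        return mode == StopMode.DRAIN;
    }

    // Checkpoint ID of the last summary that reaches the committer.
    // Assumption: without endOfInput, the last summary is tagged with the savepoint's own ID.
    static long lastSummaryCheckpointId(StopMode mode, long savepointId) {
        return callsEndOfInput(mode) ? END_OF_INPUT : savepointId;
    }

    // True if the savepoint can contain the LONG_MAX summary that breaks a later stop.
    static boolean savepointMayHoldMarker(StopMode mode, long savepointId) {
        return lastSummaryCheckpointId(mode, savepointId) == END_OF_INPUT;
    }

    public static void main(String[] args) {
        long savepointId = 42L; // arbitrary example checkpoint ID
        for (StopMode mode : StopMode.values()) {
            System.out.println(mode
                    + ": endOfInput=" + callsEndOfInput(mode)
                    + ", LONG_MAX summary in savepoint="
                    + savepointMayHoldMarker(mode, savepointId));
        }
    }
}
```

With the example savepoint ID of 42, `main` prints `DRAIN: endOfInput=true, LONG_MAX summary in savepoint=true` followed by `NO_DRAIN: endOfInput=false, LONG_MAX summary in savepoint=false`. The NO_DRAIN row is the part that still needs to be confirmed against a real job.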

 

I'm looking to verify that {{stop-with-savepoint}} without draining indeed 
doesn't call the {{endOfInput}} method. If that's the case, I think we can 
close this ticket. In the meantime, I wanted to jot these thoughts down to see 
if I'm missing anything obvious. cc [~fpaul] [~gaoyunhaii]

> Unified Sink committer does not clean up state on final savepoint
> -----------------------------------------------------------------
>
>                 Key: FLINK-30238
>                 URL: https://issues.apache.org/jira/browse/FLINK-30238
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Common
>    Affects Versions: 1.17.0, 1.15.3, 1.16.1
>            Reporter: Fabian Paul
>            Priority: Critical
>         Attachments: Screenshot 2023-03-09 at 1.47.11 PM.png, image (8).png
>
>
> During stop-with-savepoint, the committer only commits the pending 
> committables on notifyCheckpointComplete.
> This has several downsides.
>  * The last committableSummary has checkpoint ID LONG_MAX and is never 
> cleared from the state, so stop-with-savepoint does not work when the 
> pipeline recovers from a savepoint.
>  * While the committables are committed during stop-with-savepoint, they are 
> not forwarded to the post-commit topology, potentially losing data and 
> preventing open transactions from being closed.
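The ordering problem in the description can be sketched with a toy committer. This is a hypothetical Java model, not Flink's actual committer operator: `ModelCommitter` and its methods are illustrative names, and it assumes that once end of input has been seen, a checkpoint-complete notification commits every pending entry. The point it shows is that the savepoint's state snapshot is taken before `notifyCheckpointComplete` runs, so the savepoint still holds the LONG_MAX entry even though the live state is cleared afterwards.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical committer model for illustration; it is not Flink's committer operator.
class ModelCommitter {
    // Pending committables per checkpoint ID, kept in operator state until committed.
    private final TreeMap<Long, Integer> pending = new TreeMap<>();
    private boolean endOfInput = false;
    private int committedCount = 0;

    // A summary arriving from the writer for the given checkpoint ID.
    void receiveSummary(long checkpointId, int committables) {
        pending.merge(checkpointId, committables, Integer::sum);
    }

    // Marks that the input has ended (e.g. a draining stop-with-savepoint).
    void endInput() {
        endOfInput = true;
    }

    // Called while the savepoint is taken: copies the current state into the savepoint.
    Map<Long, Integer> snapshotState() {
        return new TreeMap<>(pending);
    }

    // Commits only on checkpoint completion, then removes committed entries from live state.
    void notifyCheckpointComplete(long checkpointId) {
        // Assumption: after end of input, every pending entry (including LONG_MAX) is eligible.
        long upTo = endOfInput ? Long.MAX_VALUE : checkpointId;
        Map<Long, Integer> ready = pending.headMap(upTo, true);
        for (int n : ready.values()) {
            committedCount += n;
        }
        ready.clear(); // clearing the head-map view also removes the entries from 'pending'
    }

    int committedCount() {
        return committedCount;
    }

    int pendingCheckpoints() {
        return pending.size();
    }
}

public class SavepointOrderingDemo {
    public static void main(String[] args) {
        ModelCommitter committer = new ModelCommitter();
        // Final summary emitted via endOfInput during a draining stop-with-savepoint.
        committer.receiveSummary(Long.MAX_VALUE, 2);
        committer.endInput();

        // 1) The savepoint snapshot is taken first ...
        Map<Long, Integer> savepoint = committer.snapshotState();
        // 2) ... and only then is the savepoint (example ID 7) reported as complete.
        committer.notifyCheckpointComplete(7L);

        System.out.println("committed in this run: " + committer.committedCount());
        System.out.println("live state after commit: " + committer.pendingCheckpoints());
        System.out.println("entries left in savepoint: " + savepoint);
    }
}
```

Running `main` prints `committed in this run: 2`, `live state after commit: 0` and `entries left in savepoint: {9223372036854775807=2}`: the commit happens, but the persisted savepoint never sees it.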



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
