[ 
https://issues.apache.org/jira/browse/FLINK-29856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17638690#comment-17638690
 ] 

Maximilian Michels commented on FLINK-29856:
--------------------------------------------

I think the reason why Flink doesn't call notifyCheckpointComplete() anymore on 
"intermediate" savepoints is purely for recovery reasons where we want to 
ensure that we only call notifyCheckpointComplete() once. Arguably, this isn't 
really the case because we can fail directly after restoring from a checkpoint 
but at least we will then only commit the already committed data and not any 
older data, as would be the case when a savepoint had been committing the data 
before we fell back to an earlier checkpoint.

The question is, would it be sufficient for your use case if the next 
checkpoint committed the data? Do you need notifyCheckpointComplete() to run 
immediately after you take the "intermediate" savepoint?
{quote}However, the semantics of stop with Savepoint supporting 
notifyCheckpointComplete and intermediate Savepoint not supporting it does not 
fully make sense to me, since the stop with Savepoint still "commits side 
effects".
{quote}
It kind of makes sense because on a stop-with-savepoint, the job will cease to 
exist and we have to run notifyCheckpointComplete() because we might otherwise 
never execute it. However, in the case of "intermediate" savepoints, the next 
checkpoint will eventually run and commit any pending data since the last 
checkpoint (the savepoint data being a subset of this data).

> Triggering savepoint does not trigger operator notifyCheckpointComplete
> -----------------------------------------------------------------------
>
>                 Key: FLINK-29856
>                 URL: https://issues.apache.org/jira/browse/FLINK-29856
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.15.0, 1.16.0
>            Reporter: Mason Chen
>            Priority: Major
>
> When I trigger a savepoint with the Flink K8s operator, I verified for two 
> sources (KafkaSource and MultiClusterKafkaSource) do not invoke 
> notifyCheckpointComplete. This is easily reproducible in a simple pipeline 
> (e.g. KafkaSource -> print). In this case, the savepoint is complete and 
> successful, which is verified by the Flink Checkpoint UI tab and the 
> jobmanager logs. e.g. `
> Triggering checkpoint 3 (type=SavepointType\{name='Savepoint', 
> postCheckpointAction=NONE, formatType=CANONICAL})`
>  
> However, when the checkpoint occurs via the interval, I do see the sources 
> checkpointing properly and expected logs in the output.
> After the ticket was initially filed, I also checked with other stateful UDFs 
> and observed the same behavior.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to