[ 
https://issues.apache.org/jira/browse/FLINK-29856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17638247#comment-17638247
 ] 

Maximilian Michels commented on FLINK-29856:
--------------------------------------------

I think this might be caused by the change in FLINK-25191. It was a conscious 
design choice, as also explained in 
[FLIP-193|https://cwiki.apache.org/confluence/display/FLINK/FLIP-193%3A+Snapshots+ownership#FLIP193:Snapshotsownership-SkippingSavepointsforRecovery],
 to skip side effects like notifyCheckpointComplete() for intermediate 
savepoints.

The reason is that savepoints can be drawn at any point in time by the user, who 
also controls the lifespan of the savepoint data. Previously, savepoints were 
used during recovery just like checkpoints, which could lead to issues when the 
savepoint had already been deleted or when an option was used to skip savepoints 
during recovery. In that case, the recovery would fall back to an older 
checkpoint and potentially cause side effects more than once, i.e. call 
notifyCheckpointComplete() again for already processed records/state.

The new behavior is to treat "intermediate" savepoints independently of 
checkpoints, which means they won't be used by normal recovery and hence should 
not be allowed to cause side effects. Intermediate savepoints can still be used 
to restore a job, but notifyCheckpointComplete() will only be called upon 
creating the first checkpoint. Note that "final" savepoints, which lead to 
termination of the job, are still allowed to cause side effects via 
notifyCheckpointComplete().
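
To make the rule concrete, here is a minimal sketch in plain Java. It is a toy model of the behavior described above, not Flink's actual implementation: the class SavepointNotificationSketch, the enum SnapshotKind, and both helper methods are hypothetical names made up for illustration, and snapshot ids are simply assumed to count up from 1.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of the notification rule; hypothetical, not Flink code. */
final class SavepointNotificationSketch {

    /** Hypothetical classification of a completed snapshot. */
    enum SnapshotKind { CHECKPOINT, INTERMEDIATE_SAVEPOINT, FINAL_SAVEPOINT }

    /** Returns true if operators may run side effects for this kind of snapshot. */
    static boolean notifiesOperators(SnapshotKind kind) {
        // Intermediate savepoints are not used by normal recovery,
        // so they must not trigger side effects such as commits.
        return kind != SnapshotKind.INTERMEDIATE_SAVEPOINT;
    }

    /** Returns the ids (assumed to start at 1) for which operators would be notified. */
    static List<Long> notifiedIds(List<SnapshotKind> kinds) {
        List<Long> notified = new ArrayList<>();
        for (int i = 0; i < kinds.size(); i++) {
            if (notifiesOperators(kinds.get(i))) {
                notified.add((long) (i + 1));
            }
        }
        return notified;
    }

    public static void main(String[] args) {
        List<SnapshotKind> kinds = Arrays.asList(
                SnapshotKind.CHECKPOINT,
                SnapshotKind.CHECKPOINT,
                SnapshotKind.INTERMEDIATE_SAVEPOINT, // skipped: no notification
                SnapshotKind.CHECKPOINT,
                SnapshotKind.FINAL_SAVEPOINT);       // job terminates: still notified
        System.out.println(notifiedIds(kinds));
    }
}
```

In this model the savepoint with id 3 completes without any notification, which would match the reported symptom for a savepoint logged with postCheckpointAction=NONE, assuming that value marks an intermediate savepoint.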

> Triggering savepoint does not trigger source operator checkpoint 
> -----------------------------------------------------------------
>
>                 Key: FLINK-29856
>                 URL: https://issues.apache.org/jira/browse/FLINK-29856
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.16.0
>            Reporter: Mason Chen
>            Priority: Major
>
> When I trigger a savepoint with the Flink K8s operator, I verified that two 
> sources (KafkaSource and MultiClusterKafkaSource) do not invoke snapshotState 
> or notifyCheckpointComplete. This is easily reproducible in a simple pipeline 
> (e.g. KafkaSource -> print). In this case, the savepoint completes 
> successfully, which is verified by the Flink Checkpoint UI tab and the 
> jobmanager logs, e.g. 
> `Triggering checkpoint 3 (type=SavepointType{name='Savepoint', 
> postCheckpointAction=NONE, formatType=CANONICAL})`
>  
> However, when the checkpoint occurs via the interval, I do see the sources 
> checkpointing properly and the expected logs in the output. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
