[ https://issues.apache.org/jira/browse/FLINK-29856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17638247#comment-17638247 ]

Maximilian Michels commented on FLINK-29856:
--------------------------------------------
I think this might be caused by FLINK-25191. Skipping side effects like
notifyCheckpointComplete() for intermediate savepoints was a conscious design
choice, as also explained in
[FLIP-193|https://cwiki.apache.org/confluence/display/FLINK/FLIP-193%3A+Snapshots+ownership#FLIP193:Snapshotsownership-SkippingSavepointsforRecovery].

The reason is that savepoints can be drawn at any point in time by the user,
who also controls the lifespan of the savepoint data. Previously, savepoints
were used during recovery just like checkpoints, which could lead to issues
when the savepoint had already been deleted or an option was used to skip
savepoints during recovery. In that case, recovery would fall back to an older
checkpoint and potentially cause side effects more than once, i.e. call
notifyCheckpointComplete() again for already processed records/state.

The new behavior is to treat "intermediate" savepoints independently of
checkpoints, which means they won't be used by normal recovery and hence should
not be allowed to cause side effects. Intermediate savepoints can still be used
to restore a job, but notifyCheckpointComplete() will only be called upon
creating the first checkpoint. Note that "final" savepoints, which lead to
termination of the job, are still allowed to cause side effects via
notifyCheckpointComplete().

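To make the rule concrete, here is a minimal sketch in plain Java. It is a toy model and not Flink code: NotifySketch, SnapshotKind, notifiesOperators and notifiedIds are hypothetical names made up for illustration, and the single id counter shared by checkpoints and savepoints is an assumption based on the log line in the issue ("Triggering checkpoint 3 (type=SavepointType...)"). It only models who gets notified; it says nothing about how the snapshot itself is taken.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: none of these types exist in Flink.
final class NotifySketch {

    enum SnapshotKind { CHECKPOINT, INTERMEDIATE_SAVEPOINT, FINAL_SAVEPOINT }

    // Rule as described above: intermediate savepoints are not used by normal
    // recovery, so they must not cause side effects. Checkpoints and final
    // (job-terminating) savepoints still notify operators.
    static boolean notifiesOperators(SnapshotKind kind) {
        return kind != SnapshotKind.INTERMEDIATE_SAVEPOINT;
    }

    // Returns the ids of the snapshots for which an operator would see
    // notifyCheckpointComplete() in this model. Ids start at 1 and are shared
    // by checkpoints and savepoints (assumption, see lead-in).
    static List<Long> notifiedIds(List<SnapshotKind> kinds) {
        List<Long> ids = new ArrayList<>();
        long id = 0;
        for (SnapshotKind kind : kinds) {
            id++;
            if (notifiesOperators(kind)) {
                ids.add(id);
            }
        }
        return ids;
    }

    public static void main(String[] args) {
        List<SnapshotKind> run = List.of(
                SnapshotKind.CHECKPOINT,
                SnapshotKind.CHECKPOINT,
                SnapshotKind.INTERMEDIATE_SAVEPOINT, // id 3, skipped
                SnapshotKind.CHECKPOINT,
                SnapshotKind.FINAL_SAVEPOINT);
        System.out.println(notifiedIds(run)); // prints [1, 2, 4, 5]
    }
}
```

In this model the savepoint with id 3 completes without any notification, which would match what the issue reports for notifyCheckpointComplete() on a manually triggered savepoint.
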
> Triggering savepoint does not trigger source operator checkpoint
> -----------------------------------------------------------------
>
> Key: FLINK-29856
> URL: https://issues.apache.org/jira/browse/FLINK-29856
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Checkpointing
> Affects Versions: 1.16.0
> Reporter: Mason Chen
> Priority: Major
>
> When I trigger a savepoint with the Flink K8s operator, I verified that two
> sources (KafkaSource and MultiClusterKafkaSource) do not invoke snapshotState
> or notifyCheckpointComplete. This is easily reproducible in a simple pipeline
> (e.g. KafkaSource -> print). In this case, the savepoint is complete and
> successful, which is verified by the Flink Checkpoint UI tab and the
> jobmanager logs, e.g.
> `Triggering checkpoint 3 (type=SavepointType{name='Savepoint',
> postCheckpointAction=NONE, formatType=CANONICAL})`
>
> However, when the checkpoint occurs via the interval, I do see the sources
> checkpointing properly and expected logs in the output.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)