Naci Simsek created FLINK-38307:
-----------------------------------

             Summary: Resuming from both savepoints and checkpoints is NOT 
supported for Changelog State Backend
                 Key: FLINK-38307
                 URL: https://issues.apache.org/jira/browse/FLINK-38307
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Checkpointing, Runtime / State Backends
    Affects Versions: 1.19.1, 1.18.1, 1.19.0, 1.18.0
         Environment: The piece of code that triggers the exception is:
{code:java}
    private void checkForcedFullSnapshotSupport(CheckpointOptions checkpointOptions) {
        if (checkpointOptions.getCheckpointType().equals(CheckpointType.FULL_CHECKPOINT)
                && !stateBackend.supportsNoClaimRestoreMode()) {
            throw new IllegalStateException(
                    String.format(
                            "Configured state backend (%s) does not support enforcing a full"
                                    + " snapshot. If you are restoring in %s mode, please"
                                    + " consider choosing %s mode.",
                            stateBackend, RecoveryClaimMode.NO_CLAIM, RecoveryClaimMode.CLAIM));
        } else if (checkpointOptions.getCheckpointType().isSavepoint()) {
            SavepointType savepointType = (SavepointType) checkpointOptions.getCheckpointType();
            if (!stateBackend.supportsSavepointFormat(savepointType.getFormatType())) {
                throw new IllegalStateException(
                        String.format(
                                "Configured state backend (%s) does not support %s savepoints",
                                stateBackend, savepointType.getFormatType()));
            }
        }
    }
{code}
 

The configured restore mode (*{{CLAIM}}*) is not even checked before the exception is thrown.
            Reporter: Naci Simsek
         Attachments: flink_changelog_restore_logs.zip

Start a Flink deployment with the settings below:
{code:java}
execution.savepoint-restore-mode, CLAIM
execution.checkpointing.interval, 20s
execution.checkpointing.externalized-checkpoint-retention, RETAIN_ON_CANCELLATION
execution.checkpointing.max-concurrent-checkpoints, 1
{code}
 

Wait for a couple of checkpoints, then cancel the app.

Then start a new Flink app, resuming from either an existing checkpoint or a 
savepoint (it does not matter which), with the changelog state backend enabled 
as follows:
{code:java}
execution.savepoint-restore-mode, CLAIM
state.backend.changelog.enabled, true
execution.checkpointing.interval, 20s
dstl.dfs.base-path, file:///flink/flink_binary_releases/flink-1.18.1/dstl
state.backend.changelog.storage, filesystem
execution.checkpointing.externalized-checkpoint-retention, RETAIN_ON_CANCELLATION
execution.checkpointing.max-concurrent-checkpoints, 1
{code}
*Expected result:*

As stated in the Flink documentation linked below, the app should be restored 
and continue to run successfully.

[https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/ops/state/state_backends/#upgrading-existing-jobs:~:text=Enabling%20Changelog-,Resuming%20from%20both%20savepoints%20and%20checkpoints%20is%20supported%3A,-given%20an%20existing]

 

*Actual Result:*

At first the app seems to be restored successfully, but only until it performs 
its first checkpoint.

As soon as a checkpoint is triggered, Flink throws the exception below:
{code:java}
2025-08-30 13:22:56,176 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 16 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1756552976166 for job 356832174120c71afcaabdd5a46a60d9.
2025-08-30 13:22:56,185 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Taking a state snapshot on operator Source: Car data generator source for checkpoint 16
2025-08-30 13:22:56,219 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: Car data generator source -> Timestamps/Watermarks (1/1) (202269b62aa7de30d8c9821514af0256_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from RUNNING to FAILED on localhost:54887-436fc1 @ localhost (dataPort=54889).
java.lang.Exception: Error while triggering checkpoint 16 for Source: Car data generator source -> Timestamps/Watermarks (1/1)#0
    at org.apache.flink.runtime.taskmanager.Task.triggerCheckpointBarrier(Task.java:1359) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.triggerCheckpoint(TaskExecutor.java:1023) ~[flink-dist-1.18.1.jar:1.18.1]
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?]
    at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
    at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
    at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:309) ~[flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
    at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:307) ~[flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
    at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:222) ~[flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
    at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168) ~[flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
    at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
    at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
    at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
    at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
    at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
    at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
    at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
    at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
    at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
    at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
    at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290) [?:?]
    at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020) [?:?]
    at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656) [?:?]
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594) [?:?]
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183) [?:?]
Caused by: java.lang.IllegalStateException: Configured state backend (org.apache.flink.state.changelog.ChangelogStateBackend@10ae5aae) does not support enforcing a full snapshot. If you are restoring in NO_CLAIM mode, please consider choosing either CLAIM or LEGACY restore mode.
    at org.apache.flink.streaming.runtime.tasks.StreamTask.checkForcedFullSnapshotSupport(StreamTask.java:1355) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsync(StreamTask.java:1138) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.triggerCheckpointNowAsync(SourceOperatorStreamTask.java:181) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.triggerCheckpointAsync(SourceOperatorStreamTask.java:136) ~[flink-dist-1.18.1.jar:1.18.1]
    at org.apache.flink.runtime.taskmanager.Task.triggerCheckpointBarrier(Task.java:1335) ~[flink-dist-1.18.1.jar:1.18.1]
    ... 31 more
{code}
This exception makes it impossible to enable the changelog state backend for 
pipelines where state restore is essential.
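
To make the point about the restore mode concrete, here is a minimal, self-contained sketch of the first branch of the check quoted in the Environment field. It is not Flink code: {{ForcedFullSnapshotCheckSketch}}, {{RestoreMode}}, {{SnapshotKind}} and {{rejects}} are hypothetical stand-ins, and the sketch assumes (as the exception suggests, but which has not been verified here) that the changelog backend reports no support for forced full snapshots.

```java
/**
 * Simplified model of the first branch of the check quoted in the Environment field.
 * Every type and name here is a hypothetical stand-in, not a Flink class.
 */
final class ForcedFullSnapshotCheckSketch {

    /** Stand-in for the restore modes named in the report. */
    enum RestoreMode { CLAIM, NO_CLAIM }

    /** Stand-in for the checkpoint types that matter to the first branch. */
    enum SnapshotKind { CHECKPOINT, FULL_CHECKPOINT }

    /**
     * Throws when a full checkpoint is requested and the backend cannot force one.
     * configuredMode is accepted only to make visible that it is never read.
     */
    static void checkForcedFullSnapshotSupport(
            SnapshotKind kind, boolean backendSupportsNoClaim, RestoreMode configuredMode) {
        if (kind == SnapshotKind.FULL_CHECKPOINT && !backendSupportsNoClaim) {
            throw new IllegalStateException(
                    "Configured state backend does not support enforcing a full snapshot.");
        }
    }

    /** Returns true if the check rejects the given combination. */
    static boolean rejects(SnapshotKind kind, boolean backendSupportsNoClaim, RestoreMode mode) {
        try {
            checkForcedFullSnapshotSupport(kind, backendSupportsNoClaim, mode);
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // Assumption: the changelog backend reports no support for forced full snapshots.
        boolean changelogSupportsNoClaim = false;
        for (RestoreMode mode : RestoreMode.values()) {
            System.out.println(
                    mode + " rejected: "
                            + rejects(SnapshotKind.FULL_CHECKPOINT, changelogSupportsNoClaim, mode));
        }
    }
}
```

In this model a full checkpoint is rejected under both {{CLAIM}} and {{NO_CLAIM}}, because the decision depends only on the checkpoint type and the backend capability. The sketch does not explain why a full checkpoint would be requested after a {{CLAIM}} restore; that part still needs investigation.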
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
