Naci Simsek created FLINK-38307:
-----------------------------------

Summary: Resuming from both savepoints and checkpoints is NOT supported for Changelog State Backend
Key: FLINK-38307
URL: https://issues.apache.org/jira/browse/FLINK-38307
Project: Flink
Issue Type: Bug
Components: Runtime / Checkpointing, Runtime / State Backends
Affects Versions: 1.19.1, 1.18.1, 1.19.0, 1.18.0
Environment: The piece of code that triggers the exception is:
{code:java}
private void checkForcedFullSnapshotSupport(CheckpointOptions checkpointOptions) {
    if (checkpointOptions.getCheckpointType().equals(CheckpointType.FULL_CHECKPOINT)
            && !stateBackend.supportsNoClaimRestoreMode()) {
        throw new IllegalStateException(
                String.format(
                        "Configured state backend (%s) does not support enforcing a full"
                                + " snapshot. If you are restoring in %s mode, please"
                                + " consider choosing %s mode.",
                        stateBackend,
                        RecoveryClaimMode.NO_CLAIM,
                        RecoveryClaimMode.CLAIM));
    } else if (checkpointOptions.getCheckpointType().isSavepoint()) {
        SavepointType savepointType = (SavepointType) checkpointOptions.getCheckpointType();
        if (!stateBackend.supportsSavepointFormat(savepointType.getFormatType())) {
            throw new IllegalStateException(
                    String.format(
                            "Configured state backend (%s) does not support %s savepoints",
                            stateBackend, savepointType.getFormatType()));
        }
    }
}
{code}
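To make the branch structure of the quoted method easier to see, here is a minimal sketch in plain Java. The types and names ({{FullSnapshotCheckModel}}, {{Kind}}, {{check}}) are hypothetical stand-ins, not Flink classes. The sketch only mirrors the quoted logic: the outcome depends on the checkpoint type and on the two backend capability flags ({{supportsNoClaimRestoreMode()}} and {{supportsSavepointFormat(...)}}); the configured restore mode is not a parameter anywhere. It does not model how Flink decides which checkpoint type to trigger.

```java
import java.util.Optional;

// Simplified, hypothetical model of the branch structure of the quoted
// checkForcedFullSnapshotSupport method. These are NOT Flink types.
public final class FullSnapshotCheckModel {

    // Stand-in for the checkpoint types the quoted method distinguishes.
    enum Kind { CHECKPOINT, FULL_CHECKPOINT, SAVEPOINT }

    // Returns the reason the quoted method would throw, or empty if it would pass.
    // Note what is missing from the parameter list: there is no restore mode
    // (CLAIM / NO_CLAIM). Only the checkpoint kind and two backend capability
    // flags decide the outcome, mirroring the quoted code.
    static Optional<String> check(
            Kind kind, boolean backendSupportsNoClaim, boolean backendSupportsSavepointFormat) {
        if (kind == Kind.FULL_CHECKPOINT && !backendSupportsNoClaim) {
            return Optional.of("does not support enforcing a full snapshot");
        } else if (kind == Kind.SAVEPOINT && !backendSupportsSavepointFormat) {
            return Optional.of("does not support this savepoint format");
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // A full checkpoint on a backend without NO_CLAIM support is rejected ...
        System.out.println(check(Kind.FULL_CHECKPOINT, false, true));
        // ... while a regular checkpoint on the same backend passes.
        System.out.println(check(Kind.CHECKPOINT, false, true));
    }
}
```

Because the restore mode never enters the decision, configuring CLAIM cannot by itself avoid the first branch once a full checkpoint is requested.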
The configured *{{CLAIM}}* restore mode is not even checked before the exception is thrown.
Reporter: Naci Simsek
Attachments: flink_changelog_restore_logs.zip

Start a Flink deployment with the following settings:
{code:java}
execution.savepoint-restore-mode, CLAIM
execution.checkpointing.interval, 20s
execution.checkpointing.externalized-checkpoint-retention, RETAIN_ON_CANCELLATION
execution.checkpointing.max-concurrent-checkpoints, 1
{code}
Wait for a couple of checkpoints, then cancel the app. Next, start a new Flink app that resumes from either an existing checkpoint or a savepoint (which one does NOT matter), with the changelog state backend enabled as follows:
{code:java}
execution.savepoint-restore-mode, CLAIM
state.backend.changelog.enabled, true
execution.checkpointing.interval, 20s
dstl.dfs.base-path, file:///flink/flink_binary_releases/flink-1.18.1/dstl
state.backend.changelog.storage, filesystem
execution.checkpointing.externalized-checkpoint-retention, RETAIN_ON_CANCELLATION
execution.checkpointing.max-concurrent-checkpoints, 1
{code}
*Expected result:* As stated in the Flink documentation, the app should be restored and operate successfully:
[https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/ops/state/state_backends/#upgrading-existing-jobs:~:text=Enabling%20Changelog-,Resuming%20from%20both%20savepoints%20and%20checkpoints%20is%20supported%3A,-given%20an%20existing]

*Actual Result:* At first, the app appeared to be restored successfully, but only until it performed its first checkpoint.
As soon as a checkpoint is triggered, Flink throws the following exception:
{code:java}
2025-08-30 13:22:56,176 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 16 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1756552976166 for job 356832174120c71afcaabdd5a46a60d9.
2025-08-30 13:22:56,185 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Taking a state snapshot on operator Source: Car data generator source for checkpoint 16
2025-08-30 13:22:56,219 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Car data generator source -> Timestamps/Watermarks (1/1) (202269b62aa7de30d8c9821514af0256_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from RUNNING to FAILED on localhost:54887-436fc1 @ localhost (dataPort=54889).
java.lang.Exception: Error while triggering checkpoint 16 for Source: Car data generator source -> Timestamps/Watermarks (1/1)#0
	at org.apache.flink.runtime.taskmanager.Task.triggerCheckpointBarrier(Task.java:1359) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.taskexecutor.TaskExecutor.triggerCheckpoint(TaskExecutor.java:1023) ~[flink-dist-1.18.1.jar:1.18.1]
	at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
	at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?]
	at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
	at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:309) ~[flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
	at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:307) ~[flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:222) ~[flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168) ~[flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
	at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
	at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
	at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
	at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
	at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
	at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
	at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
	at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
	at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
	at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
	at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
	at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290) [?:?]
	at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020) [?:?]
	at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656) [?:?]
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594) [?:?]
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183) [?:?]
Caused by: java.lang.IllegalStateException: Configured state backend (org.apache.flink.state.changelog.ChangelogStateBackend@10ae5aae) does not support enforcing a full snapshot. If you are restoring in NO_CLAIM mode, please consider choosing either CLAIM or LEGACY restore mode.
	at org.apache.flink.streaming.runtime.tasks.StreamTask.checkForcedFullSnapshotSupport(StreamTask.java:1355) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsync(StreamTask.java:1138) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.triggerCheckpointNowAsync(SourceOperatorStreamTask.java:181) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.triggerCheckpointAsync(SourceOperatorStreamTask.java:136) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.taskmanager.Task.triggerCheckpointBarrier(Task.java:1335) ~[flink-dist-1.18.1.jar:1.18.1]
	... 31 more
{code}
This exception makes it impossible to enable the changelog state backend for pipelines where restoring state is essential.
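For reference, the two settings lists from the reproduction steps can be compared mechanically. The sketch below is a hypothetical helper ({{ReproConfigDiff}} and its methods are illustrative names, not Flink APIs) that encodes both lists with values copied verbatim and reports what the second run adds or changes. It shows that the second run keeps every setting of the first one, including {{execution.savepoint-restore-mode}} = {{CLAIM}}, and only adds the three changelog-related keys.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: encodes the two settings lists from the reproduction
// steps (values copied verbatim) and reports what the second run adds or changes.
public final class ReproConfigDiff {

    // Settings of the first deployment, which produced the checkpoint/savepoint.
    static Map<String, String> firstRun() {
        Map<String, String> m = new LinkedHashMap<>();
        m.put("execution.savepoint-restore-mode", "CLAIM");
        m.put("execution.checkpointing.interval", "20s");
        m.put("execution.checkpointing.externalized-checkpoint-retention", "RETAIN_ON_CANCELLATION");
        m.put("execution.checkpointing.max-concurrent-checkpoints", "1");
        return m;
    }

    // Settings of the second deployment, which restores with changelog enabled.
    static Map<String, String> secondRun() {
        Map<String, String> m = new LinkedHashMap<>();
        m.put("execution.savepoint-restore-mode", "CLAIM");
        m.put("state.backend.changelog.enabled", "true");
        m.put("execution.checkpointing.interval", "20s");
        m.put("dstl.dfs.base-path", "file:///flink/flink_binary_releases/flink-1.18.1/dstl");
        m.put("state.backend.changelog.storage", "filesystem");
        m.put("execution.checkpointing.externalized-checkpoint-retention", "RETAIN_ON_CANCELLATION");
        m.put("execution.checkpointing.max-concurrent-checkpoints", "1");
        return m;
    }

    // Entries of `after` whose key is missing from `before` or whose value differs.
    static Map<String, String> addedOrChanged(Map<String, String> before, Map<String, String> after) {
        Map<String, String> diff = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : after.entrySet()) {
            if (!e.getValue().equals(before.get(e.getKey()))) {
                diff.put(e.getKey(), e.getValue());
            }
        }
        return diff;
    }

    public static void main(String[] args) {
        addedOrChanged(firstRun(), secondRun())
                .forEach((key, value) -> System.out.println("added/changed: " + key + " = " + value));
        System.out.println("restore mode in both runs: "
                + firstRun().get("execution.savepoint-restore-mode") + " / "
                + secondRun().get("execution.savepoint-restore-mode"));
    }
}
```

Since the restore mode is identical in both runs, the only variable introduced by the second deployment is the changelog state backend itself.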