[ 
https://issues.apache.org/jira/browse/FLINK-38307?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18038382#comment-18038382
 ] 

Naci Simsek edited comment on FLINK-38307 at 11/14/25 4:09 PM:
---------------------------------------------------------------

To whom face this issue, resuming flink app from an existing checkpoint (or 
savepoint) is possible via passing the parameter {{--restoreMode CLAIM}} before 
the parameter -{{{}fromSavepoint{}}}, like below:

 
{code:java}
./bin/standalone-job.sh start-foreground --job-classname 
org.apache.flink.streaming.examples.windowing.TopSpeedWindowing --restoreMode 
CLAIM -fromSavepoint file:///tmp/checkpoints/xxxxxx/chk-37/_metadata{code}


was (Author: JIRAUSER302646):
To whom face this issue, resuming flink app from an existing checkpoint (not 
savepoint) and passing the parameter {{--restoreMode CLAIM}} before the 
parameter -{{{}fromSavepoint{}}} is a workaround! Like below:

 
{code:java}
./bin/standalone-job.sh start-foreground --job-classname 
org.apache.flink.streaming.examples.windowing.TopSpeedWindowing --restoreMode 
CLAIM -fromSavepoint file:///tmp/checkpoints/xxxxxx/chk-37/_metadata{code}

> Resuming from both savepoints and checkpoints is NOT supported for Changelog 
> State Backend
> ------------------------------------------------------------------------------------------
>
>                 Key: FLINK-38307
>                 URL: https://issues.apache.org/jira/browse/FLINK-38307
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing, Runtime / State Backends
>    Affects Versions: 1.18.0, 1.19.0, 1.18.1, 1.19.1
>         Environment: The piece of code that triggers is:
> {code:java}
>  private void checkForcedFullSnapshotSupport(CheckpointOptions 
> checkpointOptions) {
>         if 
> (checkpointOptions.getCheckpointType().equals(CheckpointType.FULL_CHECKPOINT)
>                 && !stateBackend.supportsNoClaimRestoreMode()) {
>             throw new IllegalStateException(
>                     String.format(
>                             "Configured state backend (%s) does not support 
> enforcing a full"
>                                     + " snapshot. If you are restoring in %s 
> mode, please"
>                                     + " consider choosing %s mode.",
>                             stateBackend, RecoveryClaimMode.NO_CLAIM, 
> RecoveryClaimMode.CLAIM));
>         } else if (checkpointOptions.getCheckpointType().isSavepoint()) {
>             SavepointType savepointType = (SavepointType) 
> checkpointOptions.getCheckpointType();
>             if 
> (!stateBackend.supportsSavepointFormat(savepointType.getFormatType())) {
>                 throw new IllegalStateException(
>                         String.format(
>                                 "Configured state backend (%s) does not 
> support %s savepoints",
>                                 stateBackend, savepointType.getFormatType()));
>             }
>         }
>     } {code}
>  
>  The configuration for *{{CLAIM}}* is not even checked when throwing the 
> exception.
>            Reporter: Naci Simsek
>            Priority: Major
>         Attachments: flink_changelog_restore_logs.zip
>
>
> Start Flink deployment with below settings:
> {code:java}
> execution.savepoint-restore-mode, CLAIM
> execution.checkpointing.interval, 20s
> execution.checkpointing.externalized-checkpoint-retention, 
> RETAIN_ON_CANCELLATION
> execution.checkpointing.max-concurrent-checkpoints, 1 {code}
>  
> Wait for couple checkpoints, and cancel the app.
> Then, start new flink app by resuming from either an existing checkpoint or 
> savepoint, does NOT matter, with state backend CHANGELOG enabled as follows:
> {code:java}
> execution.savepoint-restore-mode, CLAIM
> state.backend.changelog.enabled, true
> execution.checkpointing.interval, 20s
> dstl.dfs.base-path, file:///flink/flink_binary_releases/flink-1.18.1/dstl
> state.backend.changelog.storage, filesystem
> execution.checkpointing.externalized-checkpoint-retention, 
> RETAIN_ON_CANCELLATION
> execution.checkpointing.max-concurrent-checkpoints, 1{code}
> *Expected result:*
> As stated here in the Flink doc, app should successfully be restored and 
> operate successfully.
> [https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/ops/state/state_backends/#upgrading-existing-jobs:~:text=Enabling%20Changelog-,Resuming%20from%20both%20savepoints%20and%20checkpoints%20is%20supported%3A,-given%20an%20existing]
>  
> *Actual Result:*
> App at first, seemed to be restored successfully, till the first checkpoint 
> it performs.
> As soon as a checkpoint is triggered, flink throws below exception:
>  
> {noformat}
> 2025-08-30 13:22:56,176 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering 
> checkpoint 16 (type=CheckpointType{name='Checkpoint', 
> sharingFilesStrategy=FORWARD_BACKWARD}) @ 1756552976166 for job 
> 356832174120c71afcaabdd5a46a60d9.
> 2025-08-30 13:22:56,185 DEBUG 
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Taking a 
> state snapshot on operator Source: Car data generator source for checkpoint 16
> 2025-08-30 13:22:56,219 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: Car 
> data generator source -> Timestamps/Watermarks (1/1) 
> (202269b62aa7de30d8c9821514af0256_cbc357ccb763df2852fee8c4fc7d55f2_0_0) 
> switched from RUNNING to FAILED on localhost:54887-436fc1 @ localhost 
> (dataPort=54889).
> java.lang.Exception: Error while triggering checkpoint 16 for Source: Car 
> data generator source -> Timestamps/Watermarks (1/1)#0    
> at 
> org.apache.flink.runtime.taskmanager.Task.triggerCheckpointBarrier(Task.java:1359)
>  ~[flink-dist-1.18.1.jar:1.18.1]    
> at 
> org.apache.flink.runtime.taskexecutor.TaskExecutor.triggerCheckpoint(TaskExecutor.java:1023)
>  ~[flink-dist-1.18.1.jar:1.18.1]    
> at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
> ~[?:?]    
> at 
> jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  ~[?:?]    
> at 
> jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  ~[?:?]    
> at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]    
> at 
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:309)
>  ~[flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]    
> at 
> org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
>  ~[flink-dist-1.18.1.jar:1.18.1]    
> at 
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:307)
>  ~[flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]    
> at 
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:222)
>  ~[flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]    
> at 
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
>  ~[flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]    
> at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33) 
> [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]    
> at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29) 
> [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]    
> at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) 
> [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]    
> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) 
> [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]    
> at 
> org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
>  [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]    
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) 
> [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]    
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) 
> [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]    
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) 
> [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]    
> at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547) 
> [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]    
> at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545) 
> [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]    
> at 
> org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229) 
> [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]    
> at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590) 
> [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]    
> at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557) 
> [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]    
> at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280) 
> [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]    
> at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241) 
> [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]    
> at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253) 
> [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]    
> at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290) [?:?]    
> at 
> java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
>  [?:?]    
> at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656) [?:?]    
> at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594) [?:?]  
>   
> at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183) 
> [?:?]Caused by: java.lang.IllegalStateException: Configured state backend 
> (org.apache.flink.state.changelog.ChangelogStateBackend@10ae5aae) does not 
> support enforcing a full snapshot. If you are restoring in NO_CLAIM mode, 
> please consider choosing either CLAIM or LEGACY restore mode.    
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkForcedFullSnapshotSupport(StreamTask.java:1355)
>  ~[flink-dist-1.18.1.jar:1.18.1]    
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsync(StreamTask.java:1138)
>  ~[flink-dist-1.18.1.jar:1.18.1]    
> at 
> org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.triggerCheckpointNowAsync(SourceOperatorStreamTask.java:181)
>  ~[flink-dist-1.18.1.jar:1.18.1]    
> at 
> org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.triggerCheckpointAsync(SourceOperatorStreamTask.java:136)
>  ~[flink-dist-1.18.1.jar:1.18.1]    
> at 
> org.apache.flink.runtime.taskmanager.Task.triggerCheckpointBarrier(Task.java:1335)
>  ~[flink-dist-1.18.1.jar:1.18.1]    ... 31 more{noformat}
> This exception makes it impossible to activate changelog state backend for 
> the pipelines where state restore is essential.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to