[ https://issues.apache.org/jira/browse/FLINK-38307?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18038382#comment-18038382 ]
Naci Simsek commented on FLINK-38307:
-------------------------------------
For anyone facing this issue: resuming the Flink app from an existing checkpoint
(not a savepoint) and passing the parameter {{--restoreMode CLAIM}} before the
parameter {{-fromSavepoint}} is a workaround, as shown below:
{code:bash}
./bin/standalone-job.sh start-foreground \
  --job-classname org.apache.flink.streaming.examples.windowing.TopSpeedWindowing \
  --restoreMode CLAIM \
  -fromSavepoint file:///tmp/checkpoints/xxxxxx/chk-37/_metadata
{code}
> Resuming from both savepoints and checkpoints is NOT supported for Changelog State Backend
> ------------------------------------------------------------------------------------------
>
> Key: FLINK-38307
> URL: https://issues.apache.org/jira/browse/FLINK-38307
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing, Runtime / State Backends
> Affects Versions: 1.18.0, 1.19.0, 1.18.1, 1.19.1
> Environment: The piece of code that triggers the exception is:
> {code:java}
> private void checkForcedFullSnapshotSupport(CheckpointOptions checkpointOptions) {
>     if (checkpointOptions.getCheckpointType().equals(CheckpointType.FULL_CHECKPOINT)
>             && !stateBackend.supportsNoClaimRestoreMode()) {
>         throw new IllegalStateException(
>                 String.format(
>                         "Configured state backend (%s) does not support enforcing a full"
>                                 + " snapshot. If you are restoring in %s mode, please"
>                                 + " consider choosing %s mode.",
>                         stateBackend, RecoveryClaimMode.NO_CLAIM, RecoveryClaimMode.CLAIM));
>     } else if (checkpointOptions.getCheckpointType().isSavepoint()) {
>         SavepointType savepointType = (SavepointType) checkpointOptions.getCheckpointType();
>         if (!stateBackend.supportsSavepointFormat(savepointType.getFormatType())) {
>             throw new IllegalStateException(
>                     String.format(
>                             "Configured state backend (%s) does not support %s savepoints",
>                             stateBackend, savepointType.getFormatType()));
>         }
>     }
> }
> {code}
>
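> The configured restore mode (*{{CLAIM}}*) is not even checked before the
> exception is thrown: the first branch only looks at whether a
> {{FULL_CHECKPOINT}} is requested and whether the backend supports
> {{NO_CLAIM}} restore.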
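A simplified, self-contained Java model of the first branch of {{checkForcedFullSnapshotSupport}} makes this easy to see. The enums {{CheckpointKind}} and {{ClaimMode}} and the method {{wouldThrow}} are hypothetical stand-ins, not Flink classes. The backend flag is modeled as {{false}} because the exception in the log below comes from this branch for {{ChangelogStateBackend}}.

{code:java}
/**
 * Simplified model of the first branch of StreamTask#checkForcedFullSnapshotSupport.
 * CheckpointKind, ClaimMode and wouldThrow are hypothetical stand-ins, not Flink API.
 */
public class ForcedFullSnapshotCheckModel {

    // Stand-in for the checkpoint type carried in CheckpointOptions.
    enum CheckpointKind { CHECKPOINT, FULL_CHECKPOINT }

    // Stand-in for the restore (claim) mode the job was started with.
    enum ClaimMode { CLAIM, NO_CLAIM }

    /**
     * Mirrors the quoted condition: true when a full checkpoint is requested and the
     * backend does not support NO_CLAIM. claimMode is accepted but, as in the quoted
     * code, never read.
     */
    static boolean wouldThrow(CheckpointKind kind, boolean backendSupportsNoClaim, ClaimMode claimMode) {
        return kind == CheckpointKind.FULL_CHECKPOINT && !backendSupportsNoClaim;
    }

    public static void main(String[] args) {
        // The changelog backend is modeled as not supporting NO_CLAIM restore.
        boolean changelogSupportsNoClaim = false;
        for (ClaimMode mode : ClaimMode.values()) {
            System.out.println(mode + " -> throws: "
                    + wouldThrow(CheckpointKind.FULL_CHECKPOINT, changelogSupportsNoClaim, mode));
        }
    }
}
{code}

With the backend flag at {{false}}, {{main}} prints {{throws: true}} for both {{CLAIM}} and {{NO_CLAIM}}, because the claim mode never enters the condition.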
> Reporter: Naci Simsek
> Priority: Major
> Attachments: flink_changelog_restore_logs.zip
>
>
> Start a Flink deployment with the following settings:
> {code:java}
> execution.savepoint-restore-mode, CLAIM
> execution.checkpointing.interval, 20s
> execution.checkpointing.externalized-checkpoint-retention, RETAIN_ON_CANCELLATION
> execution.checkpointing.max-concurrent-checkpoints, 1
> {code}
>
> Wait for a couple of checkpoints, then cancel the app.
> Then start a new Flink app that resumes from either an existing checkpoint or a
> savepoint (which one does NOT matter), with the changelog state backend enabled
> as follows:
> {code:java}
> execution.savepoint-restore-mode, CLAIM
> state.backend.changelog.enabled, true
> execution.checkpointing.interval, 20s
> dstl.dfs.base-path, file:///flink/flink_binary_releases/flink-1.18.1/dstl
> state.backend.changelog.storage, filesystem
> execution.checkpointing.externalized-checkpoint-retention, RETAIN_ON_CANCELLATION
> execution.checkpointing.max-concurrent-checkpoints, 1
> {code}
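The two setting blocks above differ only in a few keys. A minimal Java sketch that lists the keys which are new or changed in the second run, using the values copied from the steps above; the class {{ReproConfigDiff}} and its methods are purely illustrative and not part of Flink.

{code:java}
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

/**
 * Illustrative helper (not part of Flink): compares the setting blocks of the
 * two runs in the reproduction steps. Values are copied from the issue.
 */
public class ReproConfigDiff {

    /** Settings of the first run, without the changelog state backend. */
    static Map<String, String> firstRun() {
        Map<String, String> m = new LinkedHashMap<>();
        m.put("execution.savepoint-restore-mode", "CLAIM");
        m.put("execution.checkpointing.interval", "20s");
        m.put("execution.checkpointing.externalized-checkpoint-retention", "RETAIN_ON_CANCELLATION");
        m.put("execution.checkpointing.max-concurrent-checkpoints", "1");
        return m;
    }

    /** Settings of the second run, which resumes with the changelog state backend enabled. */
    static Map<String, String> secondRun() {
        Map<String, String> m = new LinkedHashMap<>();
        m.put("execution.savepoint-restore-mode", "CLAIM");
        m.put("state.backend.changelog.enabled", "true");
        m.put("execution.checkpointing.interval", "20s");
        m.put("dstl.dfs.base-path", "file:///flink/flink_binary_releases/flink-1.18.1/dstl");
        m.put("state.backend.changelog.storage", "filesystem");
        m.put("execution.checkpointing.externalized-checkpoint-retention", "RETAIN_ON_CANCELLATION");
        m.put("execution.checkpointing.max-concurrent-checkpoints", "1");
        return m;
    }

    /** Returns the entries of {@code after} that are missing from, or differ in, {@code before}, sorted by key. */
    static Map<String, String> changed(Map<String, String> before, Map<String, String> after) {
        Map<String, String> diff = new TreeMap<>();
        for (Map.Entry<String, String> e : after.entrySet()) {
            // before.get(...) is null for new keys, so equals(...) is false and the key is kept.
            if (!e.getValue().equals(before.get(e.getKey()))) {
                diff.put(e.getKey(), e.getValue());
            }
        }
        return diff;
    }

    public static void main(String[] args) {
        changed(firstRun(), secondRun()).forEach((k, v) -> System.out.println(k + ", " + v));
    }
}
{code}

Only the three changelog-related keys show up ({{dstl.dfs.base-path}}, {{state.backend.changelog.enabled}}, {{state.backend.changelog.storage}}); {{execution.savepoint-restore-mode}} is {{CLAIM}} in both runs, even though the exception message talks about restoring in {{NO_CLAIM}} mode.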
> *Expected result:*
> As stated in the Flink documentation, the app should be restored and keep
> operating successfully:
> [https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/ops/state/state_backends/#upgrading-existing-jobs:~:text=Enabling%20Changelog-,Resuming%20from%20both%20savepoints%20and%20checkpoints%20is%20supported%3A,-given%20an%20existing]
>
> *Actual Result:*
> At first, the app seems to be restored successfully, but only until it performs
> its first checkpoint. As soon as a checkpoint is triggered, Flink throws the
> following exception:
>
> {noformat}
> 2025-08-30 13:22:56,176 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 16 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1756552976166 for job 356832174120c71afcaabdd5a46a60d9.
> 2025-08-30 13:22:56,185 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Taking a state snapshot on operator Source: Car data generator source for checkpoint 16
> 2025-08-30 13:22:56,219 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Car data generator source -> Timestamps/Watermarks (1/1) (202269b62aa7de30d8c9821514af0256_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from RUNNING to FAILED on localhost:54887-436fc1 @ localhost (dataPort=54889).
> java.lang.Exception: Error while triggering checkpoint 16 for Source: Car data generator source -> Timestamps/Watermarks (1/1)#0
>     at org.apache.flink.runtime.taskmanager.Task.triggerCheckpointBarrier(Task.java:1359) ~[flink-dist-1.18.1.jar:1.18.1]
>     at org.apache.flink.runtime.taskexecutor.TaskExecutor.triggerCheckpoint(TaskExecutor.java:1023) ~[flink-dist-1.18.1.jar:1.18.1]
>     at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
>     at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?]
>     at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
>     at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
>     at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:309) ~[flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
>     at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) ~[flink-dist-1.18.1.jar:1.18.1]
>     at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:307) ~[flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
>     at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:222) ~[flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
>     at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168) ~[flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
>     at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
>     at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
>     at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
>     at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
>     at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
>     at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
>     at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
>     at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
>     at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
>     at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
>     at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
>     at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
>     at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
>     at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290) [?:?]
>     at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020) [?:?]
>     at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656) [?:?]
>     at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594) [?:?]
>     at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183) [?:?]
> Caused by: java.lang.IllegalStateException: Configured state backend (org.apache.flink.state.changelog.ChangelogStateBackend@10ae5aae) does not support enforcing a full snapshot. If you are restoring in NO_CLAIM mode, please consider choosing either CLAIM or LEGACY restore mode.
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.checkForcedFullSnapshotSupport(StreamTask.java:1355) ~[flink-dist-1.18.1.jar:1.18.1]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsync(StreamTask.java:1138) ~[flink-dist-1.18.1.jar:1.18.1]
>     at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.triggerCheckpointNowAsync(SourceOperatorStreamTask.java:181) ~[flink-dist-1.18.1.jar:1.18.1]
>     at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.triggerCheckpointAsync(SourceOperatorStreamTask.java:136) ~[flink-dist-1.18.1.jar:1.18.1]
>     at org.apache.flink.runtime.taskmanager.Task.triggerCheckpointBarrier(Task.java:1335) ~[flink-dist-1.18.1.jar:1.18.1]
>     ... 31 more
> {noformat}
> This exception makes it impossible to enable the changelog state backend for
> pipelines where restoring state is essential.
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)