[
https://issues.apache.org/jira/browse/FLINK-38307?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Naci Simsek updated FLINK-38307:
--------------------------------
Description:
Start a Flink deployment with the following settings:
{code:java}
execution.savepoint-restore-mode, CLAIM
execution.checkpointing.interval, 20s
execution.checkpointing.externalized-checkpoint-retention, RETAIN_ON_CANCELLATION
execution.checkpointing.max-concurrent-checkpoints, 1 {code}
Wait for a couple of checkpoints to complete, then cancel the app.
Then start a new Flink app, resuming from either an existing checkpoint or a savepoint (it does NOT matter which), with the CHANGELOG state backend enabled as follows:
{code:java}
execution.savepoint-restore-mode, CLAIM
state.backend.changelog.enabled, true
execution.checkpointing.interval, 20s
dstl.dfs.base-path, file:///flink/flink_binary_releases/flink-1.18.1/dstl
state.backend.changelog.storage, filesystem
execution.checkpointing.externalized-checkpoint-retention, RETAIN_ON_CANCELLATION
execution.checkpointing.max-concurrent-checkpoints, 1{code}
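To make the difference between the two runs explicit, here is a small stdlib-only sketch that parses both settings lists above (in the "key, value" notation used in this report, with the wrapped retention line joined) and reports which keys were added or changed. It is an illustration only: the class and method names are hypothetical and it does not touch any Flink API. It shows that the second run only adds the three changelog keys and keeps {{execution.savepoint-restore-mode}} at CLAIM.

{code:java}
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical helper: compares the two settings lists from this report.
public class ChangelogRestoreConfigDiff {

    // Parses "key, value" lines into an ordered map.
    // Splits on the first comma, so a value may itself contain commas.
    static Map<String, String> parse(String... lines) {
        Map<String, String> conf = new LinkedHashMap<>();
        for (String line : lines) {
            int comma = line.indexOf(',');
            if (comma < 0) {
                throw new IllegalArgumentException("Expected 'key, value': " + line);
            }
            conf.put(line.substring(0, comma).trim(), line.substring(comma + 1).trim());
        }
        return conf;
    }

    // Keys present in 'after' but not in 'before', in 'after' order.
    static Set<String> addedKeys(Map<String, String> before, Map<String, String> after) {
        Set<String> added = new LinkedHashSet<>(after.keySet());
        added.removeAll(before.keySet());
        return added;
    }

    // Keys present in both maps whose values differ.
    static Set<String> changedKeys(Map<String, String> before, Map<String, String> after) {
        Set<String> changed = new LinkedHashSet<>();
        for (Map.Entry<String, String> e : before.entrySet()) {
            String newValue = after.get(e.getKey());
            if (newValue != null && !newValue.equals(e.getValue())) {
                changed.add(e.getKey());
            }
        }
        return changed;
    }

    // Settings of the first run (before cancellation).
    static final Map<String, String> FIRST_RUN = parse(
            "execution.savepoint-restore-mode, CLAIM",
            "execution.checkpointing.interval, 20s",
            "execution.checkpointing.externalized-checkpoint-retention, RETAIN_ON_CANCELLATION",
            "execution.checkpointing.max-concurrent-checkpoints, 1");

    // Settings of the second run (restore with changelog enabled).
    static final Map<String, String> SECOND_RUN = parse(
            "execution.savepoint-restore-mode, CLAIM",
            "state.backend.changelog.enabled, true",
            "execution.checkpointing.interval, 20s",
            "dstl.dfs.base-path, file:///flink/flink_binary_releases/flink-1.18.1/dstl",
            "state.backend.changelog.storage, filesystem",
            "execution.checkpointing.externalized-checkpoint-retention, RETAIN_ON_CANCELLATION",
            "execution.checkpointing.max-concurrent-checkpoints, 1");

    public static void main(String[] args) {
        // Prints: added: [state.backend.changelog.enabled, dstl.dfs.base-path, state.backend.changelog.storage]
        System.out.println("added: " + addedKeys(FIRST_RUN, SECOND_RUN));
        // Prints: changed: []
        System.out.println("changed: " + changedKeys(FIRST_RUN, SECOND_RUN));
        System.out.println("restore mode (2nd run): " + SECOND_RUN.get("execution.savepoint-restore-mode"));
    }
}
{code}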
*Expected result:*
As stated in the Flink documentation, the app should be restored successfully and keep operating normally:
[https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/ops/state/state_backends/#upgrading-existing-jobs:~:text=Enabling%20Changelog-,Resuming%20from%20both%20savepoints%20and%20checkpoints%20is%20supported%3A,-given%20an%20existing]
*Actual Result:*
At first the app seemed to be restored successfully, but only until it performed its first checkpoint.
As soon as a checkpoint is triggered, Flink throws the following exception:
{noformat}
2025-08-30 13:22:56,176 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 16 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1756552976166 for job 356832174120c71afcaabdd5a46a60d9.
2025-08-30 13:22:56,185 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Taking a state snapshot on operator Source: Car data generator source for checkpoint 16
2025-08-30 13:22:56,219 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Car data generator source -> Timestamps/Watermarks (1/1) (202269b62aa7de30d8c9821514af0256_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from RUNNING to FAILED on localhost:54887-436fc1 @ localhost (dataPort=54889).
java.lang.Exception: Error while triggering checkpoint 16 for Source: Car data generator source -> Timestamps/Watermarks (1/1)#0
	at org.apache.flink.runtime.taskmanager.Task.triggerCheckpointBarrier(Task.java:1359) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.taskexecutor.TaskExecutor.triggerCheckpoint(TaskExecutor.java:1023) ~[flink-dist-1.18.1.jar:1.18.1]
	at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
	at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?]
	at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
	at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:309) ~[flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
	at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:307) ~[flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:222) ~[flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168) ~[flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
	at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
	at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
	at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
	at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
	at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
	at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
	at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
	at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
	at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
	at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
	at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
	at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253) [flink-rpc-akkaabfd810a-5e47-49d8-9562-8f5f879b7244.jar:1.18.1]
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290) [?:?]
	at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020) [?:?]
	at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656) [?:?]
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594) [?:?]
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183) [?:?]
Caused by: java.lang.IllegalStateException: Configured state backend (org.apache.flink.state.changelog.ChangelogStateBackend@10ae5aae) does not support enforcing a full snapshot. If you are restoring in NO_CLAIM mode, please consider choosing either CLAIM or LEGACY restore mode.
	at org.apache.flink.streaming.runtime.tasks.StreamTask.checkForcedFullSnapshotSupport(StreamTask.java:1355) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsync(StreamTask.java:1138) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.triggerCheckpointNowAsync(SourceOperatorStreamTask.java:181) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask.triggerCheckpointAsync(SourceOperatorStreamTask.java:136) ~[flink-dist-1.18.1.jar:1.18.1]
	at org.apache.flink.runtime.taskmanager.Task.triggerCheckpointBarrier(Task.java:1335) ~[flink-dist-1.18.1.jar:1.18.1]
	... 31 more{noformat}
This exception makes it impossible to enable the changelog state backend for pipelines where restoring state is essential.
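The check that throws, {{StreamTask.checkForcedFullSnapshotSupport}} (quoted in the issue's Environment field), can be modelled in isolation. The sketch below is a hypothetical, stdlib-only model: the nested {{CheckpointType}}, {{RecoveryClaimMode}} and {{StateBackend}} types are stubs, not Flink's real classes, and only the first (full-snapshot) branch is reproduced. Its message text follows the quoted snippet, which differs slightly from the 1.18.1 log message above. It illustrates the reporter's point that the inputs are only the checkpoint type and the backend's {{supportsNoClaimRestoreMode()}} answer; the configured restore mode is never consulted.

{code:java}
// Hypothetical model of the quoted check; all nested types are stubs.
public class ForcedFullSnapshotCheckModel {

    // Stub for the checkpoint type carried by CheckpointOptions.
    enum CheckpointType { CHECKPOINT, FULL_CHECKPOINT }

    // Stub for the restore/claim mode set via execution.savepoint-restore-mode.
    enum RecoveryClaimMode { CLAIM, NO_CLAIM }

    // Stub exposing the single backend capability the check consults.
    interface StateBackend {
        boolean supportsNoClaimRestoreMode();
    }

    // Mirrors the first branch of the quoted code: the configured claim mode
    // is not a parameter, so it cannot influence the outcome.
    static void checkForcedFullSnapshotSupport(CheckpointType type, StateBackend backend) {
        if (type == CheckpointType.FULL_CHECKPOINT && !backend.supportsNoClaimRestoreMode()) {
            throw new IllegalStateException(
                    String.format(
                            "Configured state backend (%s) does not support enforcing a full"
                                    + " snapshot. If you are restoring in %s mode, please"
                                    + " consider choosing %s mode.",
                            backend, RecoveryClaimMode.NO_CLAIM, RecoveryClaimMode.CLAIM));
        }
    }

    public static void main(String[] args) {
        // Stand-in for a backend that, like the changelog backend in this report,
        // answers false to supportsNoClaimRestoreMode().
        StateBackend changelogLike = () -> false;
        // The configured mode exists only here; it is never passed to the check.
        RecoveryClaimMode configured = RecoveryClaimMode.CLAIM;

        try {
            checkForcedFullSnapshotSupport(CheckpointType.FULL_CHECKPOINT, changelogLike);
        } catch (IllegalStateException e) {
            System.out.println("Thrown although configured mode is " + configured + ": " + e.getMessage());
        }

        // In this model a regular checkpoint type passes the check.
        checkForcedFullSnapshotSupport(CheckpointType.CHECKPOINT, changelogLike);
        System.out.println("Regular checkpoint type passes the check.");
    }
}
{code}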
> Resuming from both savepoints and checkpoints is NOT supported for Changelog State Backend
> ------------------------------------------------------------------------------------------
>
> Key: FLINK-38307
> URL: https://issues.apache.org/jira/browse/FLINK-38307
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing, Runtime / State Backends
> Affects Versions: 1.18.0, 1.19.0, 1.18.1, 1.19.1
> Environment: The piece of code that throws the exception is:
> {code:java}
> private void checkForcedFullSnapshotSupport(CheckpointOptions checkpointOptions) {
>     if (checkpointOptions.getCheckpointType().equals(CheckpointType.FULL_CHECKPOINT)
>             && !stateBackend.supportsNoClaimRestoreMode()) {
>         throw new IllegalStateException(
>                 String.format(
>                         "Configured state backend (%s) does not support enforcing a full"
>                                 + " snapshot. If you are restoring in %s mode, please"
>                                 + " consider choosing %s mode.",
>                         stateBackend, RecoveryClaimMode.NO_CLAIM, RecoveryClaimMode.CLAIM));
>     } else if (checkpointOptions.getCheckpointType().isSavepoint()) {
>         SavepointType savepointType = (SavepointType) checkpointOptions.getCheckpointType();
>         if (!stateBackend.supportsSavepointFormat(savepointType.getFormatType())) {
>             throw new IllegalStateException(
>                     String.format(
>                             "Configured state backend (%s) does not support %s savepoints",
>                             stateBackend, savepointType.getFormatType()));
>         }
>     }
> } {code}
>
> The configured restore mode (*{{CLAIM}}*) is not checked at all before the exception is thrown.
> Reporter: Naci Simsek
> Priority: Major
> Attachments: flink_changelog_restore_logs.zip
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)