Kerem Ulutaş created FLINK-25267:
------------------------------------
Summary: Unable to (always) recover using checkpoint in HA setup
(both Zookeeper and Kubernetes)
Key: FLINK-25267
URL: https://issues.apache.org/jira/browse/FLINK-25267
Project: Flink
Issue Type: Bug
Components: Deployment / Kubernetes, Stateful Functions
Affects Versions: 1.13.2, statefun-3.0.0, 1.12.1, statefun-3.1.0
Environment: MacOS 11.6, minikube v1.23.2, tried with both Stateful
Functions 3.0.0 and Stateful Functions 3.1.0
Reporter: Kerem Ulutaş
My Stateful Functions job is running on Kubernetes (minikube on my local env)
and has these settings:
* Using StateFun v3.1.0
* Checkpoints are stored on HDFS (state.checkpoint-storage: filesystem)
* Checkpointing mode is EXACTLY_ONCE
* State backend is rocksdb and incremental checkpointing is enabled
When I kill the jobmanager (master) pod, minikube starts another pod, and this
new pod fails when it tries to load the last checkpoint:
{code:java}
...
2021-12-11 14:25:26,426 INFO org.apache.flink.runtime.jobmaster.JobMaster
[] - Initializing job myStatefunApp
(00000000000000000000000000000000).
2021-12-11 14:25:26,443 INFO org.apache.flink.runtime.jobmaster.JobMaster
[] - Using restart back off time strategy
FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2147483647,
backoffTimeMS=1000) for myStatefunApp (00000000000000000000000000000000).
2021-12-11 14:25:26,516 INFO org.apache.flink.runtime.util.ZooKeeperUtils
[] - Initialized DefaultCompletedCheckpointStore in
'ZooKeeperStateHandleStore{namespace='statefun_zk_recovery/my-statefun-app/checkpoints/00000000000000000000000000000000'}'
with /checkpoints/00000000000000000000000000000000.
2021-12-11 14:25:26,599 INFO org.apache.flink.runtime.jobmaster.JobMaster
[] - Running initialization on master for job myStatefunApp
(00000000000000000000000000000000).
2021-12-11 14:25:26,599 INFO org.apache.flink.runtime.jobmaster.JobMaster
[] - Successfully ran initialization on master in 0 ms.
2021-12-11 14:25:26,617 INFO
org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology [] - Built
1 pipelined regions in 1 ms
2021-12-11 14:25:26,626 INFO org.apache.flink.runtime.jobmaster.JobMaster
[] - Using job/cluster config to configure application-defined state
backend: EmbeddedRocksDBStateBackend{, localRocksDbDirectories=null,
enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=1,
writeBatchSize=2097152}
2021-12-11 14:25:26,627 INFO
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - Using
predefined options: DEFAULT.
2021-12-11 14:25:26,627 INFO
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - Using
application-defined options factory:
DefaultConfigurableOptionsFactory{configuredOptions={state.backend.rocksdb.thread.num=1}}.
2021-12-11 14:25:26,627 INFO org.apache.flink.runtime.jobmaster.JobMaster
[] - Using application-defined state backend:
EmbeddedRocksDBStateBackend{, localRocksDbDirectories=null,
enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=1,
writeBatchSize=2097152}
2021-12-11 14:25:26,631 INFO org.apache.flink.runtime.jobmaster.JobMaster
[] - Checkpoint storage is set to 'filesystem': (checkpoints
"hdfs://hdfs-namenode:8020/tmp/statefun_checkpoints/myStatefunApp")
2021-12-11 14:25:26,712 INFO
org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] -
Recovering checkpoints from
ZooKeeperStateHandleStore{namespace='statefun_zk_recovery/my-statefun-app/checkpoints/00000000000000000000000000000000'}.
2021-12-11 14:25:26,724 INFO
org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Found
1 checkpoints in
ZooKeeperStateHandleStore{namespace='statefun_zk_recovery/my-statefun-app/checkpoints/00000000000000000000000000000000'}.
2021-12-11 14:25:26,725 INFO
org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Trying
to fetch 1 checkpoints from storage.
2021-12-11 14:25:26,725 INFO
org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Trying
to retrieve checkpoint 2.
2021-12-11 14:25:26,931 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Restoring job
00000000000000000000000000000000 from Checkpoint 2 @ 1639232587220 for
00000000000000000000000000000000 located at
hdfs://hdfs-namenode:8020/tmp/statefun_checkpoints/myStatefunApp/00000000000000000000000000000000/chk-2.
2021-12-11 14:25:27,012 ERROR
org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Fatal error
occurred in the cluster entrypoint.
org.apache.flink.util.FlinkException: JobMaster for job
00000000000000000000000000000000 failed.
at
org.apache.flink.runtime.dispatcher.Dispatcher.jobMasterFailed(Dispatcher.java:873)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
at
org.apache.flink.runtime.dispatcher.Dispatcher.jobManagerRunnerFailed(Dispatcher.java:459)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
at
org.apache.flink.runtime.dispatcher.Dispatcher.handleJobManagerRunnerResult(Dispatcher.java:436)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
at
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$runJob$3(Dispatcher.java:415)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
at java.util.concurrent.CompletableFuture.uniHandle(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown Source)
~[?:?]
at java.util.concurrent.CompletableFuture$Completion.run(Unknown Source)
~[?:?]
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
[flink-dist_2.12-1.13.2.jar:1.13.2]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
[flink-dist_2.12-1.13.2.jar:1.13.2]
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
[flink-dist_2.12-1.13.2.jar:1.13.2]
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
[flink-dist_2.12-1.13.2.jar:1.13.2]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
[flink-dist_2.12-1.13.2.jar:1.13.2]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
[flink-dist_2.12-1.13.2.jar:1.13.2]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
[flink-dist_2.12-1.13.2.jar:1.13.2]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
[flink-dist_2.12-1.13.2.jar:1.13.2]
at akka.actor.Actor.aroundReceive(Actor.scala:517)
[flink-dist_2.12-1.13.2.jar:1.13.2]
at akka.actor.Actor.aroundReceive$(Actor.scala:515)
[flink-dist_2.12-1.13.2.jar:1.13.2]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
[flink-dist_2.12-1.13.2.jar:1.13.2]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
[flink-dist_2.12-1.13.2.jar:1.13.2]
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
[flink-dist_2.12-1.13.2.jar:1.13.2]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
[flink-dist_2.12-1.13.2.jar:1.13.2]
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
[flink-dist_2.12-1.13.2.jar:1.13.2]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
[flink-dist_2.12-1.13.2.jar:1.13.2]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
[flink-dist_2.12-1.13.2.jar:1.13.2]
at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
[flink-dist_2.12-1.13.2.jar:1.13.2]
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
[flink-dist_2.12-1.13.2.jar:1.13.2]
at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[flink-dist_2.12-1.13.2.jar:1.13.2]
Caused by: org.apache.flink.runtime.client.JobInitializationException: Could
not start the JobMaster.
at
org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
~[?:?]
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown
Source) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
~[?:?]
at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
~[?:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
~[?:?]
at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown
Source) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
at java.lang.Thread.run(Unknown Source) ~[?:?]
Caused by: java.util.concurrent.CompletionException:
java.lang.IllegalStateException: There is no operator for the state
18666b435c78ee2416e74bb997b798a7
at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source)
~[?:?]
at java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source)
~[?:?]
at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
~[?:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
~[?:?]
at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown
Source) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
at java.lang.Thread.run(Unknown Source) ~[?:?]
Caused by: java.lang.IllegalStateException: There is no operator for the state
18666b435c78ee2416e74bb997b798a7
at
org.apache.flink.runtime.checkpoint.StateAssignmentOperation.checkStateMappingCompleteness(StateAssignmentOperation.java:712)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
at
org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignStates(StateAssignmentOperation.java:100)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1562)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreInitialCheckpointIfPresent(CheckpointCoordinator.java:1476)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
at
org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:134)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
at
org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:342)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
at
org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:190)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
at
org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:122)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
at
org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:132)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
at
org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:110)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
at
org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:340)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:317)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
at
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:107)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
at
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
at
org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
~[?:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
~[?:?]
at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown
Source) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
at java.lang.Thread.run(Unknown Source) ~[?:?]
2021-12-11 14:25:27,017 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Shutting
StatefulFunctionsClusterEntryPoint down with application status UNKNOWN.
Diagnostics Cluster entrypoint has been closed externally..
2021-12-11 14:25:27,021 INFO
org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shutting
down rest endpoint.
2021-12-11 14:25:27,025 INFO org.apache.flink.runtime.blob.BlobServer
[] - Stopped BLOB server at 0.0.0.0:6124
2021-12-11 14:25:27,034 INFO
org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Removing
cache directory /tmp/flink-web-6c2dafc9-bb7d-489a-9e2d-cf78e3f19b67/flink-web-ui
2021-12-11 14:25:27,035 INFO
org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] -
Stopping DefaultLeaderElectionService.
2021-12-11 14:25:27,035 INFO
org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver [] -
Closing ZooKeeperLeaderElectionDriver{leaderPath='/leader/rest_server_lock'}
2021-12-11 14:25:27,036 INFO
org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shut down
complete.
2021-12-11 14:25:27,036 INFO
org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent
[] - Closing components.
2021-12-11 14:25:27,037 INFO
org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] -
Stopping DefaultLeaderRetrievalService.
2021-12-11 14:25:27,037 INFO
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriver [] -
Closing ZookeeperLeaderRetrievalDriver{retrievalPath='/leader/dispatcher_lock'}.
2021-12-11 14:25:27,037 INFO
org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] -
Stopping DefaultLeaderRetrievalService.
2021-12-11 14:25:27,037 INFO
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriver [] -
Closing
ZookeeperLeaderRetrievalDriver{retrievalPath='/leader/resource_manager_lock'}.
2021-12-11 14:25:27,038 INFO
org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] -
Stopping DefaultLeaderElectionService.
2021-12-11 14:25:27,038 INFO
org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver [] -
Closing ZooKeeperLeaderElectionDriver{leaderPath='/leader/dispatcher_lock'}
2021-12-11 14:25:27,039 INFO
org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcess [] -
Stopping JobDispatcherLeaderProcess.
2021-12-11 14:25:27,040 INFO
org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager []
- Closing the slot manager.
2021-12-11 14:25:27,040 INFO
org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager []
- Suspending the slot manager.
2021-12-11 14:25:27,041 INFO
org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] -
Stopping DefaultLeaderElectionService.
2021-12-11 14:25:27,041 INFO
org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver [] -
Closing
ZooKeeperLeaderElectionDriver{leaderPath='/leader/resource_manager_lock'} {code}
However, across several restarts, the jobmanager occasionally does manage to
restore the job from the last checkpoint. After changing Flink's log level to
DEBUG, I was able to compare an unsuccessful sequence of events (resulting in
the log above) with a successful one. It seems that operators can get assigned
different hashes between restarts. Here is the relevant log section for the
unsuccessful assignment (I renamed my operators for clarity):
{code:java}
2021-12-11 21:55:14,001 DEBUG
org.apache.flink.streaming.api.graph.StreamGraphHasherV2 [] - Generated
hash '32d5ca33c915e65563a5c7f4d62703ad' for node 'router (my-ingress-1-in)-5'
{id: 5, parallelism: 1, user function: }
2021-12-11 21:55:14,001 DEBUG
org.apache.flink.streaming.api.graph.StreamGraphHasherV2 [] - Generated
hash '33b86fe798648d648b237ddfc986200d' for node 'router (my-ingress-2-in)-4'
{id: 4, parallelism: 1, user function: }
2021-12-11 21:55:14,001 DEBUG
org.apache.flink.streaming.api.graph.StreamGraphHasherV2 [] - Generated
hash 'bd4c3fa1570bbcf606f2dabddd61ed7f' for node 'router (my-ingress-3-in)-6'
{id: 6, parallelism: 1, user function: } {code}
... and here is the same log section for the successful assignment:
{code:java}
2021-12-11 21:55:34,543 DEBUG
org.apache.flink.streaming.api.graph.StreamGraphHasherV2 [] - Generated
hash 'a1448ecf31ac98d2215c38bfd119abe0' for node 'router (my-ingress-3-in)-5'
{id: 5, parallelism: 1, user function: }
2021-12-11 21:55:34,543 DEBUG
org.apache.flink.streaming.api.graph.StreamGraphHasherV2 [] - Generated
hash '05037ff96baea131d9cf1390846efd98' for node 'router (my-ingress-1-in)-4'
{id: 4, parallelism: 1, user function: }
2021-12-11 21:55:34,543 DEBUG
org.apache.flink.streaming.api.graph.StreamGraphHasherV2 [] - Generated
hash '18666b435c78ee2416e74bb997b798a7' for node 'router (my-ingress-2-in)-6'
{id: 6, parallelism: 1, user function: } {code}
As you can see, in the successful run the hash
"18666b435c78ee2416e74bb997b798a7" is generated, so the jobmanager can match an
operator to the state loaded from the checkpoint and continue normal operation.
Also note that the router operators are assigned different ids in the two runs.
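The comparison above can also be automated. Below is a minimal sketch, assuming the DEBUG lines keep the "Generated hash '...' for node '...-<id>'" shape shown in the excerpts; the class HasherLogDiff and its methods are hypothetical names and are not part of Flink or Stateful Functions.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper (not part of Flink): diffs StreamGraphHasherV2 DEBUG output of two runs. */
final class HasherLogDiff {

    // Matches: Generated hash '<32 hex chars>' for node '<name>-<id>'
    // \s+ tolerates log lines that were wrapped by a mail client or terminal.
    private static final Pattern GENERATED_HASH = Pattern.compile(
            "Generated\\s+hash\\s+'([0-9a-f]{32})'\\s+for\\s+node\\s+'(.*?)-(\\d+)'");

    /** Maps node name (without the trailing "-<id>") to its generated hash, in log order. */
    static Map<String, String> hashesByNode(String log) {
        Map<String, String> hashes = new LinkedHashMap<>();
        Matcher m = GENERATED_HASH.matcher(log);
        while (m.find()) {
            hashes.put(m.group(2), m.group(1));
        }
        return hashes;
    }

    /** Returns the names of nodes present in both runs whose hashes differ. */
    static List<String> changedNodes(String firstRunLog, String secondRunLog) {
        Map<String, String> first = hashesByNode(firstRunLog);
        Map<String, String> second = hashesByNode(secondRunLog);
        List<String> changed = new ArrayList<>();
        for (Map.Entry<String, String> e : first.entrySet()) {
            String other = second.get(e.getKey());
            if (other != null && !other.equals(e.getValue())) {
                changed.add(e.getKey());
            }
        }
        return changed;
    }

    public static void main(String[] args) {
        // One line from each excerpt above; real input would be the full jobmanager logs.
        String failedRun = "Generated hash '33b86fe798648d648b237ddfc986200d'"
                + " for node 'router (my-ingress-2-in)-4'";
        String successfulRun = "Generated hash '18666b435c78ee2416e74bb997b798a7'"
                + " for node 'router (my-ingress-2-in)-6'";
        System.out.println(changedNodes(failedRun, successfulRun));
    }
}
```

Any node name reported by changedNodes was hashed differently in the two runs, so state written under one hash cannot be matched to that operator in the other run.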
I took a look at the StreamGraphHasherV2 code
([link|https://github.com/apache/flink/blob/release-1.13.2/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java#L65]).
There is an explicit attempt to keep the operator order the same between
different attempts; however, my Stateful Functions application somehow seems to
get around it.
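To illustrate why a changing operator order would matter, here is a simplified model. It assumes that a node's hash depends on its position in the traversal order and that traversal follows ascending node ids; both are assumptions about the linked code and have not been verified against this job. PositionalHashModel is a hypothetical class, and MD5 stands in for whatever hash function Flink actually uses.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

/** Simplified model (not Flink code): a node's hash is derived only from its visit position. */
final class PositionalHashModel {

    /** Visits nodes in ascending id order and returns node name -> hex hash of the visit position. */
    static Map<String, String> hashByVisitOrder(Map<Integer, String> nodeNamesById) {
        Map<String, String> hashes = new LinkedHashMap<>();
        for (String name : new TreeMap<>(nodeNamesById).values()) {
            int position = hashes.size(); // position in traversal, not the node id itself
            hashes.put(name, md5Hex(Integer.toString(position)));
        }
        return hashes;
    }

    /** Hex-encoded MD5 digest; a stand-in hash function chosen only for this illustration. */
    static String md5Hex(String input) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(input.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Node ids as they appear in the two log excerpts of this issue.
        Map<Integer, String> failedRun = new HashMap<>();
        failedRun.put(5, "router (my-ingress-1-in)");
        failedRun.put(4, "router (my-ingress-2-in)");
        failedRun.put(6, "router (my-ingress-3-in)");

        Map<Integer, String> successfulRun = new HashMap<>();
        successfulRun.put(5, "router (my-ingress-3-in)");
        successfulRun.put(4, "router (my-ingress-1-in)");
        successfulRun.put(6, "router (my-ingress-2-in)");

        System.out.println(hashByVisitOrder(failedRun));
        System.out.println(hashByVisitOrder(successfulRun));
    }
}
```

In this model the same operator name ends up with a different hash as soon as its id, and therefore its visit position, changes between runs, which is the pattern visible in the two log excerpts.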
Since we can't assign operator ids when using Stateful Functions, is there
anything I can do to get recovery working correctly? Is this a bug, or am I
using a wrong combination of settings?
As a last note, I also posted the same question to Stack Overflow earlier; here
is the
[link|https://stackoverflow.com/questions/70316498/flink-statefun-high-availability-exception-java-lang-illegalstateexception-th]
to the question.
Thanks