[
https://issues.apache.org/jira/browse/FLINK-25267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Kerem Ulutaş updated FLINK-25267:
---------------------------------
Description:
My Stateful Functions job runs on Kubernetes (minikube in my local environment)
with these settings (a config sketch reproducing them follows the list):
 * StateFun v3.1.0
 * Checkpoints are stored on HDFS (state.checkpoint-storage: filesystem)
 * Checkpointing mode is EXACTLY_ONCE
 * State backend is RocksDB, with incremental checkpointing enabled
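A minimal flink-conf.yaml sketch of this setup (the option names are standard
Flink 1.13 keys; the ZooKeeper hostnames and HA paths are illustrative
placeholders, not my exact values):
{code:yaml}
# State backend with incremental checkpointing
state.backend: rocksdb
state.backend.incremental: true
state.backend.rocksdb.thread.num: 1

# Checkpoint storage on HDFS (same path as in the log below)
state.checkpoint-storage: filesystem
state.checkpoints.dir: hdfs://hdfs-namenode:8020/tmp/statefun_checkpoints/myStatefunApp
execution.checkpointing.mode: EXACTLY_ONCE

# ZooKeeper HA (illustrative values; the log below shows the
# statefun_zk_recovery/my-statefun-app namespace)
high-availability: zookeeper
high-availability.zookeeper.quorum: zookeeper:2181
high-availability.zookeeper.path.root: /statefun_zk_recovery
high-availability.cluster-id: /my-statefun-app
high-availability.storageDir: hdfs://hdfs-namenode:8020/tmp/statefun_ha
{code}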
When I kill the jobmanager (master) pod, minikube starts a replacement pod, and
the new pod fails when it tries to load the last checkpoint:
{code:java}
...
2021-12-11 14:25:26,426 INFO org.apache.flink.runtime.jobmaster.JobMaster
[] - Initializing job myStatefunApp
(00000000000000000000000000000000).
2021-12-11 14:25:26,443 INFO org.apache.flink.runtime.jobmaster.JobMaster
[] - Using restart back off time strategy
FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2147483647,
backoffTimeMS=1000) for myStatefunApp (00000000000000000000000000000000).
2021-12-11 14:25:26,516 INFO org.apache.flink.runtime.util.ZooKeeperUtils
[] - Initialized DefaultCompletedCheckpointStore in
'ZooKeeperStateHandleStore{namespace='statefun_zk_recovery/my-statefun-app/checkpoints/00000000000000000000000000000000'}'
with /checkpoints/00000000000000000000000000000000.
2021-12-11 14:25:26,599 INFO org.apache.flink.runtime.jobmaster.JobMaster
[] - Running initialization on master for job myStatefunApp
(00000000000000000000000000000000).
2021-12-11 14:25:26,599 INFO org.apache.flink.runtime.jobmaster.JobMaster
[] - Successfully ran initialization on master in 0 ms.
2021-12-11 14:25:26,617 INFO
org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology [] - Built
1 pipelined regions in 1 ms
2021-12-11 14:25:26,626 INFO org.apache.flink.runtime.jobmaster.JobMaster
[] - Using job/cluster config to configure application-defined state
backend: EmbeddedRocksDBStateBackend{, localRocksDbDirectories=null,
enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=1,
writeBatchSize=2097152}
2021-12-11 14:25:26,627 INFO
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - Using
predefined options: DEFAULT.
2021-12-11 14:25:26,627 INFO
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - Using
application-defined options factory:
DefaultConfigurableOptionsFactory{configuredOptions={state.backend.rocksdb.thread.num=1}}.
2021-12-11 14:25:26,627 INFO org.apache.flink.runtime.jobmaster.JobMaster
[] - Using application-defined state backend:
EmbeddedRocksDBStateBackend{, localRocksDbDirectories=null,
enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=1,
writeBatchSize=2097152}
2021-12-11 14:25:26,631 INFO org.apache.flink.runtime.jobmaster.JobMaster
[] - Checkpoint storage is set to 'filesystem': (checkpoints
"hdfs://hdfs-namenode:8020/tmp/statefun_checkpoints/myStatefunApp")
2021-12-11 14:25:26,712 INFO
org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] -
Recovering checkpoints from
ZooKeeperStateHandleStore{namespace='statefun_zk_recovery/my-statefun-app/checkpoints/00000000000000000000000000000000'}.
2021-12-11 14:25:26,724 INFO
org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Found
1 checkpoints in
ZooKeeperStateHandleStore{namespace='statefun_zk_recovery/my-statefun-app/checkpoints/00000000000000000000000000000000'}.
2021-12-11 14:25:26,725 INFO
org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Trying
to fetch 1 checkpoints from storage.
2021-12-11 14:25:26,725 INFO
org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Trying
to retrieve checkpoint 2.
2021-12-11 14:25:26,931 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Restoring job
00000000000000000000000000000000 from Checkpoint 2 @ 1639232587220 for
00000000000000000000000000000000 located at
hdfs://hdfs-namenode:8020/tmp/statefun_checkpoints/myStatefunApp/00000000000000000000000000000000/chk-2.
2021-12-11 14:25:27,012 ERROR
org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Fatal error
occurred in the cluster entrypoint.
org.apache.flink.util.FlinkException: JobMaster for job
00000000000000000000000000000000 failed.
at
org.apache.flink.runtime.dispatcher.Dispatcher.jobMasterFailed(Dispatcher.java:873)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
at
org.apache.flink.runtime.dispatcher.Dispatcher.jobManagerRunnerFailed(Dispatcher.java:459)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
at
org.apache.flink.runtime.dispatcher.Dispatcher.handleJobManagerRunnerResult(Dispatcher.java:436)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
at
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$runJob$3(Dispatcher.java:415)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
at java.util.concurrent.CompletableFuture.uniHandle(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown Source)
~[?:?]
at java.util.concurrent.CompletableFuture$Completion.run(Unknown Source)
~[?:?]
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
[flink-dist_2.12-1.13.2.jar:1.13.2]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
[flink-dist_2.12-1.13.2.jar:1.13.2]
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
[flink-dist_2.12-1.13.2.jar:1.13.2]
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
[flink-dist_2.12-1.13.2.jar:1.13.2]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
[flink-dist_2.12-1.13.2.jar:1.13.2]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
[flink-dist_2.12-1.13.2.jar:1.13.2]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
[flink-dist_2.12-1.13.2.jar:1.13.2]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
[flink-dist_2.12-1.13.2.jar:1.13.2]
at akka.actor.Actor.aroundReceive(Actor.scala:517)
[flink-dist_2.12-1.13.2.jar:1.13.2]
at akka.actor.Actor.aroundReceive$(Actor.scala:515)
[flink-dist_2.12-1.13.2.jar:1.13.2]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
[flink-dist_2.12-1.13.2.jar:1.13.2]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
[flink-dist_2.12-1.13.2.jar:1.13.2]
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
[flink-dist_2.12-1.13.2.jar:1.13.2]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
[flink-dist_2.12-1.13.2.jar:1.13.2]
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
[flink-dist_2.12-1.13.2.jar:1.13.2]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
[flink-dist_2.12-1.13.2.jar:1.13.2]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
[flink-dist_2.12-1.13.2.jar:1.13.2]
at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
[flink-dist_2.12-1.13.2.jar:1.13.2]
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
[flink-dist_2.12-1.13.2.jar:1.13.2]
at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[flink-dist_2.12-1.13.2.jar:1.13.2]
Caused by: org.apache.flink.runtime.client.JobInitializationException: Could
not start the JobMaster.
at
org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
~[?:?]
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown
Source) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
~[?:?]
at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
~[?:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
~[?:?]
at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown
Source) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
at java.lang.Thread.run(Unknown Source) ~[?:?]
Caused by: java.util.concurrent.CompletionException:
java.lang.IllegalStateException: There is no operator for the state
18666b435c78ee2416e74bb997b798a7
at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source)
~[?:?]
at java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source)
~[?:?]
at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
~[?:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
~[?:?]
at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown
Source) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
at java.lang.Thread.run(Unknown Source) ~[?:?]
Caused by: java.lang.IllegalStateException: There is no operator for the state
18666b435c78ee2416e74bb997b798a7
at
org.apache.flink.runtime.checkpoint.StateAssignmentOperation.checkStateMappingCompleteness(StateAssignmentOperation.java:712)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
at
org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignStates(StateAssignmentOperation.java:100)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1562)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreInitialCheckpointIfPresent(CheckpointCoordinator.java:1476)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
at
org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:134)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
at
org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:342)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
at
org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:190)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
at
org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:122)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
at
org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:132)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
at
org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:110)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
at
org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:340)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:317)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
at
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:107)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
at
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
at
org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
~[flink-dist_2.12-1.13.2.jar:1.13.2]
at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
~[?:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
~[?:?]
at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown
Source) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
at java.lang.Thread.run(Unknown Source) ~[?:?]
2021-12-11 14:25:27,017 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Shutting
StatefulFunctionsClusterEntryPoint down with application status UNKNOWN.
Diagnostics Cluster entrypoint has been closed externally..
2021-12-11 14:25:27,021 INFO
org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shutting
down rest endpoint.
2021-12-11 14:25:27,025 INFO org.apache.flink.runtime.blob.BlobServer
[] - Stopped BLOB server at 0.0.0.0:6124
2021-12-11 14:25:27,034 INFO
org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Removing
cache directory /tmp/flink-web-6c2dafc9-bb7d-489a-9e2d-cf78e3f19b67/flink-web-ui
2021-12-11 14:25:27,035 INFO
org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] -
Stopping DefaultLeaderElectionService.
2021-12-11 14:25:27,035 INFO
org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver [] -
Closing ZooKeeperLeaderElectionDriver{leaderPath='/leader/rest_server_lock'}
2021-12-11 14:25:27,036 INFO
org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shut down
complete.
2021-12-11 14:25:27,036 INFO
org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent
[] - Closing components.
2021-12-11 14:25:27,037 INFO
org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] -
Stopping DefaultLeaderRetrievalService.
2021-12-11 14:25:27,037 INFO
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriver [] -
Closing ZookeeperLeaderRetrievalDriver{retrievalPath='/leader/dispatcher_lock'}.
2021-12-11 14:25:27,037 INFO
org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] -
Stopping DefaultLeaderRetrievalService.
2021-12-11 14:25:27,037 INFO
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriver [] -
Closing
ZookeeperLeaderRetrievalDriver{retrievalPath='/leader/resource_manager_lock'}.
2021-12-11 14:25:27,038 INFO
org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] -
Stopping DefaultLeaderElectionService.
2021-12-11 14:25:27,038 INFO
org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver [] -
Closing ZooKeeperLeaderElectionDriver{leaderPath='/leader/dispatcher_lock'}
2021-12-11 14:25:27,039 INFO
org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcess [] -
Stopping JobDispatcherLeaderProcess.
2021-12-11 14:25:27,040 INFO
org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager []
- Closing the slot manager.
2021-12-11 14:25:27,040 INFO
org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager []
- Suspending the slot manager.
2021-12-11 14:25:27,041 INFO
org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] -
Stopping DefaultLeaderElectionService.
2021-12-11 14:25:27,041 INFO
org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver [] -
Closing
ZooKeeperLeaderElectionDriver{leaderPath='/leader/resource_manager_lock'} {code}
But somehow, across several restarts, the jobmanager does sometimes restore the
job from the last checkpoint. After raising Flink's log level to DEBUG, I
managed to capture the difference between an unsuccessful run (which produces
the log above) and a successful one. It seems that operators can be assigned
different hashes between restarts. Here is the relevant log section for the
unsuccessful assignment (I renamed my operators for clarity):
{code:java}
2021-12-11 21:55:14,001 DEBUG
org.apache.flink.streaming.api.graph.StreamGraphHasherV2 [] - Generated
hash '32d5ca33c915e65563a5c7f4d62703ad' for node 'router (my-ingress-1-in)-5'
{id: 5, parallelism: 1, user function: }
2021-12-11 21:55:14,001 DEBUG
org.apache.flink.streaming.api.graph.StreamGraphHasherV2 [] - Generated
hash '33b86fe798648d648b237ddfc986200d' for node 'router (my-ingress-2-in)-4'
{id: 4, parallelism: 1, user function: }
2021-12-11 21:55:14,001 DEBUG
org.apache.flink.streaming.api.graph.StreamGraphHasherV2 [] - Generated
hash 'bd4c3fa1570bbcf606f2dabddd61ed7f' for node 'router (my-ingress-3-in)-6'
{id: 6, parallelism: 1, user function: } {code}
... and here is the same log section for the successful assignment:
{code:java}
2021-12-11 21:55:34,543 DEBUG
org.apache.flink.streaming.api.graph.StreamGraphHasherV2 [] - Generated
hash 'a1448ecf31ac98d2215c38bfd119abe0' for node 'router (my-ingress-3-in)-5'
{id: 5, parallelism: 1, user function: }
2021-12-11 21:55:34,543 DEBUG
org.apache.flink.streaming.api.graph.StreamGraphHasherV2 [] - Generated
hash '05037ff96baea131d9cf1390846efd98' for node 'router (my-ingress-1-in)-4'
{id: 4, parallelism: 1, user function: }
2021-12-11 21:55:34,543 DEBUG
org.apache.flink.streaming.api.graph.StreamGraphHasherV2 [] - Generated
hash '18666b435c78ee2416e74bb997b798a7' for node 'router (my-ingress-2-in)-6'
{id: 6, parallelism: 1, user function: } {code}
As you can see, in the successful run the hash "18666b435c78ee2416e74bb997b798a7"
is generated, so the jobmanager can match the operator to the state loaded from
the checkpoint and continue normally. Note also that the router operators are
assigned different ids between the two runs (e.g. 'router (my-ingress-1-in)' is
node 5 in one run and node 4 in the other).
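That points to nondeterministic ordering when the routers are wired into the
stream graph. A minimal sketch of one plausible mechanism (my assumption, not
confirmed from the StateFun sources): if the router bindings are held in a
hash-based collection whose element type does not override hashCode(),
iteration order depends on identity hash codes and can differ between JVM runs:
{code:java}
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for a router binding; it deliberately does NOT
// override hashCode(), so HashSet orders it by Object.hashCode(), which is
// identity-based and varies from one JVM run to the next.
final class RouterBinding {
    final String ingressId;
    RouterBinding(String ingressId) { this.ingressId = ingressId; }
    @Override public String toString() { return "router (" + ingressId + ")"; }
}

public class NondeterministicOrderDemo {
    public static void main(String[] args) {
        Set<RouterBinding> routers = new HashSet<>();
        routers.add(new RouterBinding("my-ingress-1-in"));
        routers.add(new RouterBinding("my-ingress-2-in"));
        routers.add(new RouterBinding("my-ingress-3-in"));

        // The iteration order (and hence the stream graph node ids the
        // routers would receive) can change between runs of this program.
        int nodeId = 4;
        for (RouterBinding r : routers) {
            System.out.println("node " + (nodeId++) + " <- " + r);
        }
    }
}
{code}
Running this a few times shows the three routers coming out in different
orders, which matches the different node ids (4/5/6) in the two log excerpts
above.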
I took a look at the StreamGraphHasherV2 code
([link|https://github.com/apache/flink/blob/release-1.13.2/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java#L65]);
it explicitly tries to visit the operators in the same order on every
submission, yet my Stateful Functions application apparently still ends up with
a different order.
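From my reading of that code (simplified; the real implementation uses murmur3
and also mixes in chaining information and the input nodes' hashes), a node's
generated hash is derived from its position in the traversal rather than from
its name, so a shifted visiting order changes every hash. A toy illustration of
that property:
{code:java}
import java.security.MessageDigest;

// Toy illustration only, NOT the real StreamGraphHasherV2: it shows that a
// hash derived from traversal position changes when the visiting order of
// otherwise identical nodes shifts between runs.
public class PositionHashSketch {
    static String hashAtPosition(int position) throws Exception {
        MessageDigest md = MessageDigest.getInstance("MD5");
        md.update((byte) position); // position is the only hashed input here
        StringBuilder sb = new StringBuilder();
        for (byte b : md.digest()) sb.append(String.format("%02x", b));
        return sb.toString();
    }

    public static void main(String[] args) throws Exception {
        String[] runA = {"my-ingress-1-in", "my-ingress-2-in", "my-ingress-3-in"};
        String[] runB = {"my-ingress-3-in", "my-ingress-1-in", "my-ingress-2-in"};
        for (int i = 0; i < runA.length; i++)
            System.out.println("run A: " + runA[i] + " -> " + hashAtPosition(i));
        for (int i = 0; i < runB.length; i++)
            System.out.println("run B: " + runB[i] + " -> " + hashAtPosition(i));
        // The same ingress name gets a different hash in the two runs.
    }
}
{code}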
Since we can't assign operator ids when using Stateful Functions, is there
anything I can do to make recovery work reliably? Is this a bug, or am I using
a wrong combination of settings?
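For comparison, in a plain DataStream job one would pin stable operator ids
with uid(), which decouples state assignment from the generated hashes
entirely; a minimal sketch using the standard DataStream API, shown only to
illustrate the knob that StateFun does not expose:
{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PinnedUidExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // With an explicit uid, the operator keeps the same id across
        // submissions, so restored checkpoint state is matched by this id
        // instead of an auto-generated StreamGraphHasherV2 hash.
        env.fromElements("a", "b", "c")
                .map(String::toUpperCase)
                .uid("my-stable-operator-id") // stable across restarts
                .print();

        env.execute("pinned-uid-example");
    }
}
{code}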
As a last note, I also posted this question to Stack Overflow earlier; here is
the
[link|https://stackoverflow.com/questions/70316498/flink-statefun-high-availability-exception-java-lang-illegalstateexception-th]
to it.
Thanks
> Unable to (always) recover using checkpoint in HA setup (both Zookeeper and
> Kubernetes)
> ---------------------------------------------------------------------------------------
>
> Key: FLINK-25267
> URL: https://issues.apache.org/jira/browse/FLINK-25267
> Project: Flink
> Issue Type: Bug
> Components: Deployment / Kubernetes, Stateful Functions
> Affects Versions: 1.12.1, statefun-3.0.0, statefun-3.1.0, 1.13.2
> Environment: MacOS 11.6, minikube v1.23.2, tried with both Stateful
> Functions 3.0.0 and Stateful Functions 3.1.0
> Reporter: Kerem Ulutaş
> Priority: Major