[
https://issues.apache.org/jira/browse/FLINK-25267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17459399#comment-17459399
]
Kerem Ulutaş edited comment on FLINK-25267 at 12/14/21, 6:40 PM:
-----------------------------------------------------------------
[~dwysakowicz] and [~trohrmann] - actually I am running in a minikube
environment. I am testing with only one job running, and I reset the
environment between trials - that is, I delete the Zookeeper nodes (or the
Kubernetes configmaps, if it is a Kubernetes HA test) and the recovery data
(state.checkpoints.dir, high-availability.storageDir) stored on HDFS. So I make
sure that no data, configmap or Zookeeper node from a previous run exists when
I run the job. I also tried deleting the minikube VM, re-creating it and
running the job with the high availability settings; the result was the same.
(HDFS, Zookeeper, etc. are all running on minikube, by the way.)
I also noticed the constant JobID and wondered why it is constant when I enable
high availability. I've deployed my application as described
[here|https://nightlies.apache.org/flink/flink-statefun-docs-release-3.1/docs/modules/embedded/#deployment]
- when I enable high availability, be it the Zookeeper or the Kubernetes way, the
JobID is assigned as {{00000000000000000000000000000000}}, and when it is
disabled a randomized id is assigned to the job. Do you think this points to
something wrong with my setup?
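For what it's worth, my current assumption is that the all-zero id is a fixed JobID
that Flink uses for the job when HA is enabled in application mode, so that a
restarted jobmanager can find the job's checkpoints again - in that case the constant
id by itself would not be the problem. A minimal sketch of that assumption (not taken
from the Flink sources, just illustrating what the value corresponds to):
{code:java}
import org.apache.flink.api.common.JobID;

public class FixedJobIdSketch {
    public static void main(String[] args) {
        // Assumption: with HA enabled, application mode pins the job to a fixed id
        // so recovery can locate it; this is what the all-zero id would correspond to.
        JobID fixedId = new JobID(0L, 0L);
        System.out.println(fixedId); // 00000000000000000000000000000000

        // Without HA, a fresh random id is generated for every submission.
        System.out.println(JobID.generate());
    }
}
{code}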
> Unable to (always) recover using checkpoint in HA setup (both Zookeeper and
> Kubernetes)
> ---------------------------------------------------------------------------------------
>
> Key: FLINK-25267
> URL: https://issues.apache.org/jira/browse/FLINK-25267
> Project: Flink
> Issue Type: Bug
> Components: Deployment / Kubernetes, Runtime / Checkpointing,
> Stateful Functions
> Affects Versions: 1.12.1, statefun-3.0.0, statefun-3.1.0, 1.13.2
> Environment: MacOS 11.6, minikube v1.23.2, tried with both Stateful
> Functions 3.0.0 and Stateful Functions 3.1.0
> Reporter: Kerem Ulutaş
> Priority: Major
>
> My Stateful Functions job is running on Kubernetes (minikube on my local env)
> and has these settings (a rough DataStream-API sketch of them follows the list):
> * Using StateFun v3.1.0
> * Checkpoints are stored on HDFS (state.checkpoint-storage: filesystem)
> * Checkpointing mode is EXACTLY_ONCE
> * State backend is rocksdb and incremental checkpointing is enabled
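> For reference, here is a hedged sketch of roughly the same setup expressed with the
> plain Flink 1.13 DataStream API. This is not how the StateFun job is actually
> configured (I set everything via flink-conf.yaml); the checkpoint interval and the
> class name below are made up for illustration, only the HDFS path matches my setup.
> {code:java}
> import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
> import org.apache.flink.streaming.api.CheckpointingMode;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
> public class CheckpointSetupSketch {
>     public static void main(String[] args) {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>
>         // EXACTLY_ONCE checkpointing (the 60s interval is arbitrary, just for the sketch)
>         env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);
>         // RocksDB state backend with incremental checkpointing enabled
>         env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
>         // checkpoints stored on HDFS (state.checkpoint-storage: filesystem)
>         env.getCheckpointConfig().setCheckpointStorage(
>                 "hdfs://hdfs-namenode:8020/tmp/statefun_checkpoints/myStatefunApp");
>     }
> }
> {code}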
> When I kill the jobmanager (master) pod, minikube starts another pod, and this
> new pod fails when it tries to load the last checkpoint:
> {code:java}
> ...
> 2021-12-11 14:25:26,426 INFO org.apache.flink.runtime.jobmaster.JobMaster
> [] - Initializing job myStatefunApp
> (00000000000000000000000000000000).
> 2021-12-11 14:25:26,443 INFO org.apache.flink.runtime.jobmaster.JobMaster
> [] - Using restart back off time strategy
> FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2147483647,
> backoffTimeMS=1000) for myStatefunApp (00000000000000000000000000000000).
> 2021-12-11 14:25:26,516 INFO org.apache.flink.runtime.util.ZooKeeperUtils
> [] - Initialized DefaultCompletedCheckpointStore in
> 'ZooKeeperStateHandleStore{namespace='statefun_zk_recovery/my-statefun-app/checkpoints/00000000000000000000000000000000'}'
> with /checkpoints/00000000000000000000000000000000.
> 2021-12-11 14:25:26,599 INFO org.apache.flink.runtime.jobmaster.JobMaster
> [] - Running initialization on master for job myStatefunApp
> (00000000000000000000000000000000).
> 2021-12-11 14:25:26,599 INFO org.apache.flink.runtime.jobmaster.JobMaster
> [] - Successfully ran initialization on master in 0 ms.
> 2021-12-11 14:25:26,617 INFO
> org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology [] -
> Built 1 pipelined regions in 1 ms
> 2021-12-11 14:25:26,626 INFO org.apache.flink.runtime.jobmaster.JobMaster
> [] - Using job/cluster config to configure application-defined
> state backend: EmbeddedRocksDBStateBackend{, localRocksDbDirectories=null,
> enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=1,
> writeBatchSize=2097152}
> 2021-12-11 14:25:26,627 INFO
> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] -
> Using predefined options: DEFAULT.
> 2021-12-11 14:25:26,627 INFO
> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] -
> Using application-defined options factory:
> DefaultConfigurableOptionsFactory{configuredOptions={state.backend.rocksdb.thread.num=1}}.
> 2021-12-11 14:25:26,627 INFO org.apache.flink.runtime.jobmaster.JobMaster
> [] - Using application-defined state backend:
> EmbeddedRocksDBStateBackend{, localRocksDbDirectories=null,
> enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=1,
> writeBatchSize=2097152}
> 2021-12-11 14:25:26,631 INFO org.apache.flink.runtime.jobmaster.JobMaster
> [] - Checkpoint storage is set to 'filesystem': (checkpoints
> "hdfs://hdfs-namenode:8020/tmp/statefun_checkpoints/myStatefunApp")
> 2021-12-11 14:25:26,712 INFO
> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] -
> Recovering checkpoints from
> ZooKeeperStateHandleStore{namespace='statefun_zk_recovery/my-statefun-app/checkpoints/00000000000000000000000000000000'}.
> 2021-12-11 14:25:26,724 INFO
> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] -
> Found 1 checkpoints in
> ZooKeeperStateHandleStore{namespace='statefun_zk_recovery/my-statefun-app/checkpoints/00000000000000000000000000000000'}.
> 2021-12-11 14:25:26,725 INFO
> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] -
> Trying to fetch 1 checkpoints from storage.
> 2021-12-11 14:25:26,725 INFO
> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] -
> Trying to retrieve checkpoint 2.
> 2021-12-11 14:25:26,931 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Restoring
> job 00000000000000000000000000000000 from Checkpoint 2 @ 1639232587220 for
> 00000000000000000000000000000000 located at
> hdfs://hdfs-namenode:8020/tmp/statefun_checkpoints/myStatefunApp/00000000000000000000000000000000/chk-2.
> 2021-12-11 14:25:27,012 ERROR
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Fatal error
> occurred in the cluster entrypoint.
> org.apache.flink.util.FlinkException: JobMaster for job
> 00000000000000000000000000000000 failed.
> at
> org.apache.flink.runtime.dispatcher.Dispatcher.jobMasterFailed(Dispatcher.java:873)
> ~[flink-dist_2.12-1.13.2.jar:1.13.2]
> at
> org.apache.flink.runtime.dispatcher.Dispatcher.jobManagerRunnerFailed(Dispatcher.java:459)
> ~[flink-dist_2.12-1.13.2.jar:1.13.2]
> at
> org.apache.flink.runtime.dispatcher.Dispatcher.handleJobManagerRunnerResult(Dispatcher.java:436)
> ~[flink-dist_2.12-1.13.2.jar:1.13.2]
> at
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$runJob$3(Dispatcher.java:415)
> ~[flink-dist_2.12-1.13.2.jar:1.13.2]
> at java.util.concurrent.CompletableFuture.uniHandle(Unknown Source) ~[?:?]
> at java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown
> Source) ~[?:?]
> at java.util.concurrent.CompletableFuture$Completion.run(Unknown Source)
> ~[?:?]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
> ~[flink-dist_2.12-1.13.2.jar:1.13.2]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
> ~[flink-dist_2.12-1.13.2.jar:1.13.2]
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
> ~[flink-dist_2.12-1.13.2.jar:1.13.2]
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
> ~[flink-dist_2.12-1.13.2.jar:1.13.2]
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> [flink-dist_2.12-1.13.2.jar:1.13.2]
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> [flink-dist_2.12-1.13.2.jar:1.13.2]
> at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
> [flink-dist_2.12-1.13.2.jar:1.13.2]
> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
> [flink-dist_2.12-1.13.2.jar:1.13.2]
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> [flink-dist_2.12-1.13.2.jar:1.13.2]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> [flink-dist_2.12-1.13.2.jar:1.13.2]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> [flink-dist_2.12-1.13.2.jar:1.13.2]
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> [flink-dist_2.12-1.13.2.jar:1.13.2]
> at akka.actor.Actor.aroundReceive(Actor.scala:517)
> [flink-dist_2.12-1.13.2.jar:1.13.2]
> at akka.actor.Actor.aroundReceive$(Actor.scala:515)
> [flink-dist_2.12-1.13.2.jar:1.13.2]
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> [flink-dist_2.12-1.13.2.jar:1.13.2]
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> [flink-dist_2.12-1.13.2.jar:1.13.2]
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> [flink-dist_2.12-1.13.2.jar:1.13.2]
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> [flink-dist_2.12-1.13.2.jar:1.13.2]
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> [flink-dist_2.12-1.13.2.jar:1.13.2]
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> [flink-dist_2.12-1.13.2.jar:1.13.2]
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> [flink-dist_2.12-1.13.2.jar:1.13.2]
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> [flink-dist_2.12-1.13.2.jar:1.13.2]
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> [flink-dist_2.12-1.13.2.jar:1.13.2]
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> [flink-dist_2.12-1.13.2.jar:1.13.2]
> Caused by: org.apache.flink.runtime.client.JobInitializationException: Could
> not start the JobMaster.
> at
> org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
> ~[flink-dist_2.12-1.13.2.jar:1.13.2]
> at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
> ~[?:?]
> at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown
> Source) ~[?:?]
> at java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
> ~[?:?]
> at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
> ~[?:?]
> at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
> ~[?:?]
> at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown
> Source) ~[?:?]
> at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
> ~[?:?]
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
> ~[?:?]
> at java.lang.Thread.run(Unknown Source) ~[?:?]
> Caused by: java.util.concurrent.CompletionException:
> java.lang.IllegalStateException: There is no operator for the state
> 18666b435c78ee2416e74bb997b798a7
> at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown
> Source) ~[?:?]
> at java.util.concurrent.CompletableFuture.completeThrowable(Unknown
> Source) ~[?:?]
> at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
> ~[?:?]
> at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
> ~[?:?]
> at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown
> Source) ~[?:?]
> at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
> ~[?:?]
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
> ~[?:?]
> at java.lang.Thread.run(Unknown Source) ~[?:?]
> Caused by: java.lang.IllegalStateException: There is no operator for the
> state 18666b435c78ee2416e74bb997b798a7
> at
> org.apache.flink.runtime.checkpoint.StateAssignmentOperation.checkStateMappingCompleteness(StateAssignmentOperation.java:712)
> ~[flink-dist_2.12-1.13.2.jar:1.13.2]
> at
> org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignStates(StateAssignmentOperation.java:100)
> ~[flink-dist_2.12-1.13.2.jar:1.13.2]
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1562)
> ~[flink-dist_2.12-1.13.2.jar:1.13.2]
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreInitialCheckpointIfPresent(CheckpointCoordinator.java:1476)
> ~[flink-dist_2.12-1.13.2.jar:1.13.2]
> at
> org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:134)
> ~[flink-dist_2.12-1.13.2.jar:1.13.2]
> at
> org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:342)
> ~[flink-dist_2.12-1.13.2.jar:1.13.2]
> at
> org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:190)
> ~[flink-dist_2.12-1.13.2.jar:1.13.2]
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:122)
> ~[flink-dist_2.12-1.13.2.jar:1.13.2]
> at
> org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:132)
> ~[flink-dist_2.12-1.13.2.jar:1.13.2]
> at
> org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:110)
> ~[flink-dist_2.12-1.13.2.jar:1.13.2]
> at
> org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:340)
> ~[flink-dist_2.12-1.13.2.jar:1.13.2]
> at
> org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:317)
> ~[flink-dist_2.12-1.13.2.jar:1.13.2]
> at
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:107)
> ~[flink-dist_2.12-1.13.2.jar:1.13.2]
> at
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95)
> ~[flink-dist_2.12-1.13.2.jar:1.13.2]
> at
> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
> ~[flink-dist_2.12-1.13.2.jar:1.13.2]
> at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
> ~[?:?]
> at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
> ~[?:?]
> at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown
> Source) ~[?:?]
> at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
> ~[?:?]
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
> ~[?:?]
> at java.lang.Thread.run(Unknown Source) ~[?:?]
> 2021-12-11 14:25:27,017 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Shutting
> StatefulFunctionsClusterEntryPoint down with application status UNKNOWN.
> Diagnostics Cluster entrypoint has been closed externally..
> 2021-12-11 14:25:27,021 INFO
> org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shutting
> down rest endpoint.
> 2021-12-11 14:25:27,025 INFO org.apache.flink.runtime.blob.BlobServer
> [] - Stopped BLOB server at 0.0.0.0:6124
> 2021-12-11 14:25:27,034 INFO
> org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Removing
> cache directory
> /tmp/flink-web-6c2dafc9-bb7d-489a-9e2d-cf78e3f19b67/flink-web-ui
> 2021-12-11 14:25:27,035 INFO
> org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] -
> Stopping DefaultLeaderElectionService.
> 2021-12-11 14:25:27,035 INFO
> org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver [] -
> Closing ZooKeeperLeaderElectionDriver{leaderPath='/leader/rest_server_lock'}
> 2021-12-11 14:25:27,036 INFO
> org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shut down
> complete.
> 2021-12-11 14:25:27,036 INFO
> org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent
> [] - Closing components.
> 2021-12-11 14:25:27,037 INFO
> org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] -
> Stopping DefaultLeaderRetrievalService.
> 2021-12-11 14:25:27,037 INFO
> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriver [] -
> Closing
> ZookeeperLeaderRetrievalDriver{retrievalPath='/leader/dispatcher_lock'}.
> 2021-12-11 14:25:27,037 INFO
> org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] -
> Stopping DefaultLeaderRetrievalService.
> 2021-12-11 14:25:27,037 INFO
> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriver [] -
> Closing
> ZookeeperLeaderRetrievalDriver{retrievalPath='/leader/resource_manager_lock'}.
> 2021-12-11 14:25:27,038 INFO
> org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] -
> Stopping DefaultLeaderElectionService.
> 2021-12-11 14:25:27,038 INFO
> org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver [] -
> Closing ZooKeeperLeaderElectionDriver{leaderPath='/leader/dispatcher_lock'}
> 2021-12-11 14:25:27,039 INFO
> org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcess [] -
> Stopping JobDispatcherLeaderProcess.
> 2021-12-11 14:25:27,040 INFO
> org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager
> [] - Closing the slot manager.
> 2021-12-11 14:25:27,040 INFO
> org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager
> [] - Suspending the slot manager.
> 2021-12-11 14:25:27,041 INFO
> org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] -
> Stopping DefaultLeaderElectionService.
> 2021-12-11 14:25:27,041 INFO
> org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver [] -
> Closing
> ZooKeeperLeaderElectionDriver{leaderPath='/leader/resource_manager_lock'}
> {code}
>
> But somehow, across several restarts, the jobmanager sometimes does manage to
> restore the job from the last checkpoint. After changing the Flink log level to
> DEBUG, I managed to capture the difference between an unsuccessful run
> (resulting in the log above) and a successful sequence of events. It seems that
> the operators can be assigned different hashes between restarts; here is the
> relevant log section for the unsuccessful assignment (I renamed my operators
> for clarity):
>
> {code:java}
> 2021-12-11 21:55:14,001 DEBUG
> org.apache.flink.streaming.api.graph.StreamGraphHasherV2 [] - Generated
> hash '32d5ca33c915e65563a5c7f4d62703ad' for node 'router (my-ingress-1-in)-5'
> {id: 5, parallelism: 1, user function: }
> 2021-12-11 21:55:14,001 DEBUG
> org.apache.flink.streaming.api.graph.StreamGraphHasherV2 [] - Generated
> hash '33b86fe798648d648b237ddfc986200d' for node 'router (my-ingress-2-in)-4'
> {id: 4, parallelism: 1, user function: }
> 2021-12-11 21:55:14,001 DEBUG
> org.apache.flink.streaming.api.graph.StreamGraphHasherV2 [] - Generated
> hash 'bd4c3fa1570bbcf606f2dabddd61ed7f' for node 'router (my-ingress-3-in)-6'
> {id: 6, parallelism: 1, user function: } {code}
> ... and here is the same log section for the successful assignment:
> {code:java}
> 2021-12-11 21:55:34,543 DEBUG
> org.apache.flink.streaming.api.graph.StreamGraphHasherV2 [] - Generated
> hash 'a1448ecf31ac98d2215c38bfd119abe0' for node 'router (my-ingress-3-in)-5'
> {id: 5, parallelism: 1, user function: }
> 2021-12-11 21:55:34,543 DEBUG
> org.apache.flink.streaming.api.graph.StreamGraphHasherV2 [] - Generated
> hash '05037ff96baea131d9cf1390846efd98' for node 'router (my-ingress-1-in)-4'
> {id: 4, parallelism: 1, user function: }
> 2021-12-11 21:55:34,543 DEBUG
> org.apache.flink.streaming.api.graph.StreamGraphHasherV2 [] - Generated
> hash '18666b435c78ee2416e74bb997b798a7' for node 'router (my-ingress-2-in)-6'
> {id: 6, parallelism: 1, user function: } {code}
> As you can see, in the successful run the hash "18666b435c78ee2416e74bb997b798a7"
> is generated again, so the jobmanager can match the operator to the state loaded
> from the checkpoint and continue normal operation. Another thing to note is that
> the router operators are assigned different ids between the two runs.
>
> I took a look at the StreamGraphHasherV2 code
> ([link|https://github.com/apache/flink/blob/release-1.13.2/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java#L65]);
> there is an explicit attempt to keep the operator traversal order the same
> between runs, but my Stateful Functions application apparently manages to
> defeat it (a heavily simplified illustration of what I mean follows).
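> If I understand StreamGraphHasherV2 correctly, when no uid is set the hash of a
> node is derived from its position in the graph traversal (among other things),
> not from its name - so if the three routers are visited in a different order on
> a restart, each of them ends up with a different hash. Below is a heavily
> simplified, hypothetical sketch of that idea; it is not the real Flink hashing
> code, and the visit orders are just the ones I observed in the two runs above.
> {code:java}
> import java.nio.charset.StandardCharsets;
> import java.security.MessageDigest;
> import java.util.List;
>
> public class PositionDependentHashSketch {
>
>     // Toy stand-in for the real hashing in StreamGraphHasherV2: the input is only
>     // the node's position in the traversal, so the router's own name is irrelevant.
>     static String hash(int positionInTraversal) throws Exception {
>         MessageDigest md5 = MessageDigest.getInstance("MD5");
>         md5.update(Integer.toString(positionInTraversal).getBytes(StandardCharsets.UTF_8));
>         StringBuilder hex = new StringBuilder();
>         for (byte b : md5.digest()) {
>             hex.append(String.format("%02x", b));
>         }
>         return hex.toString();
>     }
>
>     public static void main(String[] args) throws Exception {
>         // Visit orders observed in the unsuccessful and the successful run.
>         List<List<String>> runs = List.of(
>                 List.of("my-ingress-1", "my-ingress-2", "my-ingress-3"),
>                 List.of("my-ingress-3", "my-ingress-1", "my-ingress-2"));
>
>         for (int run = 0; run < runs.size(); run++) {
>             for (int pos = 0; pos < runs.get(run).size(); pos++) {
>                 // The same router gets a different hash whenever its position changes,
>                 // which (I assume) is why the checkpointed state cannot be matched.
>                 System.out.printf("run%d  %-13s -> %s%n",
>                         run + 1, runs.get(run).get(pos), hash(pos));
>             }
>         }
>     }
> }
> {code}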
>
> Since we can't assign operator ids when using Stateful Functions (in a plain
> DataStream job I would simply pin them with {{uid()}}, as sketched below), is
> there anything I can do to get this working correctly? Is this a bug, or am I
> using a wrong combination of settings?
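> For context, this is the kind of pinning I mean - a hedged sketch in the plain
> DataStream API; the operator, uid and job names are made up, and the StateFun
> embedded SDK builds its graph internally, so I have no way to call {{uid()}} on
> the router operators myself.
> {code:java}
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
> public class UidSketch {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>
>         env.fromElements("a", "b", "c")
>            .filter(s -> !s.isEmpty())
>            .uid("my-ingress-1-router")       // fixed uid => stable operator hash across restarts
>            .name("router (my-ingress-1-in)") // the display name does not affect the hash
>            .print();
>
>         env.execute("uid-sketch");
>     }
> }
> {code}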
>
> As a last note, I also posted the same question earlier on Stack Overflow; here
> is the
> [link|https://stackoverflow.com/questions/70316498/flink-statefun-high-availability-exception-java-lang-illegalstateexception-th]
> to it.
> Thanks