[ https://issues.apache.org/jira/browse/FLINK-20648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17251523#comment-17251523 ]

Yang Wang commented on FLINK-20648:
-----------------------------------

[~dmvk] Thanks for creating this issue and debugging the root cause. I think 
you are right. Currently, when recovering from a savepoint, Flink adds a new 
checkpoint to the HA storage, so it needs to update the ConfigMap. However, the 
ConfigMap has not been created yet, because the leader election service has 
not been started.
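
A minimal Java sketch of this failure mode, assuming a heavily simplified model: `FakeKubeClient`, `SketchCheckpointStore`, the `checkpoint-<id>` key format, and the ConfigMap name `my-job-jobmanager-leader` are hypothetical and are not Flink's classes or conventions. It only shows that a checkpoint write against a ConfigMap that does not exist yet fails, and that the same write succeeds once the ConfigMap has been created (which, in Flink, is the leader election service's job).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for the Kubernetes API server: ConfigMap name -> data.
// Not Flink's FlinkKubeClient; it only mimics the failure mode in the trace.
class FakeKubeClient {
    private final Map<String, Map<String, String>> configMaps = new HashMap<>();

    // In Flink, the leader election service is what creates the leader ConfigMap.
    void createConfigMap(String name) {
        configMaps.putIfAbsent(name, new HashMap<>());
    }

    // Updating a ConfigMap that does not exist fails instead of creating it.
    void checkAndUpdateConfigMap(String name, UnaryOperator<Map<String, String>> updater) {
        Map<String, String> current = configMaps.get(name);
        if (current == null) {
            throw new IllegalStateException("Cannot retry checkAndUpdateConfigMap with configMap "
                    + name + " because it does not exist.");
        }
        configMaps.put(name, updater.apply(new HashMap<>(current)));
    }

    Map<String, String> get(String name) {
        return configMaps.get(name);
    }
}

// Hypothetical checkpoint store: every added checkpoint becomes a ConfigMap entry.
class SketchCheckpointStore {
    private final FakeKubeClient client;
    private final String leaderConfigMap;

    SketchCheckpointStore(FakeKubeClient client, String leaderConfigMap) {
        this.client = client;
        this.leaderConfigMap = leaderConfigMap;
    }

    // Restoring from a savepoint goes through this path, so it needs the ConfigMap.
    void addCheckpoint(long checkpointId, String stateHandle) {
        client.checkAndUpdateConfigMap(leaderConfigMap, data -> {
            data.put("checkpoint-" + checkpointId, stateHandle);
            return data;
        });
    }
}

public class SavepointRestoreDemo {
    public static void main(String[] args) {
        FakeKubeClient client = new FakeKubeClient();
        SketchCheckpointStore store = new SketchCheckpointStore(client, "my-job-jobmanager-leader");

        try {
            // Leader election has not run yet, so the ConfigMap is missing.
            store.addCheckpoint(1L, "savepoint-handle-1");
        } catch (IllegalStateException e) {
            System.out.println("Restore failed: " + e.getMessage());
        }

        client.createConfigMap("my-job-jobmanager-leader");
        store.addCheckpoint(1L, "savepoint-handle-1");
        System.out.println("Restore succeeded: " + client.get("my-job-jobmanager-leader"));
    }
}
```

The store deliberately does not create the ConfigMap on demand; creating it is left to whoever owns leadership, which is the constraint discussed in the next paragraph.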

In the Kubernetes HA implementation, we have a very important assumption: only 
the active leader can update the HA ConfigMap. I am not inclined to let 
{{KubernetesStateHandleStore}} support adding checkpoints before leadership 
is granted, because that would cause issues when there are multiple 
JobManagers. But I am also not sure whether we can start the leader election 
service before the JobMaster service. I will dig deeper and hope to find a more 
reasonable solution.
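
The leader-only write rule can be illustrated with a small hypothetical model. `SharedConfigMap`, `GuardedStateHandleStore`, and the `jm-a`/`jm-b` identities are invented for illustration. The real store is assumed to perform a comparable leader check against the ConfigMap, and the `synchronized` block here only approximates the atomic, optimistic update that Kubernetes provides.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the HA ConfigMap: data entries plus the identity of
// the current leader (null while no leader has been elected).
class SharedConfigMap {
    final Map<String, String> data = new HashMap<>();
    String leader;
}

// Hypothetical store that enforces "only the active leader may write".
class GuardedStateHandleStore {
    private final SharedConfigMap configMap;
    private final String lockIdentity; // unique per JobManager

    GuardedStateHandleStore(SharedConfigMap configMap, String lockIdentity) {
        this.configMap = configMap;
        this.lockIdentity = lockIdentity;
    }

    // The leader check and the write happen atomically, standing in for a
    // compare-and-swap update on the real ConfigMap.
    boolean addAndLock(String key, String stateHandle) {
        synchronized (configMap) {
            if (!lockIdentity.equals(configMap.leader)) {
                return false; // not (or no longer) the leader: leave HA data untouched
            }
            configMap.data.put(key, stateHandle);
            return true;
        }
    }
}

public class LeaderGuardDemo {
    public static void main(String[] args) {
        SharedConfigMap configMap = new SharedConfigMap();
        GuardedStateHandleStore jmA = new GuardedStateHandleStore(configMap, "jm-a");
        GuardedStateHandleStore jmB = new GuardedStateHandleStore(configMap, "jm-b");

        // Before any leader is elected, nobody may write, including a savepoint restore.
        System.out.println("jm-a before election: " + jmA.addAndLock("checkpoint-1", "h1"));

        configMap.leader = "jm-a";
        System.out.println("jm-a as leader: " + jmA.addAndLock("checkpoint-1", "h1"));
        System.out.println("jm-b as standby: " + jmB.addAndLock("checkpoint-1", "h2"));
        System.out.println("stored: " + configMap.data);
    }
}
```

In this model a savepoint restore attempted before any leader is elected is simply rejected. Relaxing the check for that case would also let a standby JobManager write, which is the multi-JobManager risk mentioned above.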

> Unable to restore job from savepoint when using Kubernetes based HA services
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-20648
>                 URL: https://issues.apache.org/jira/browse/FLINK-20648
>             Project: Flink
>          Issue Type: Bug
>          Components: Deployment / Kubernetes
>    Affects Versions: 1.12.0
>            Reporter: David Morávek
>            Assignee: Yang Wang
>            Priority: Critical
>             Fix For: 1.12.1
>
>
> When restoring a job from a savepoint, we always end up with the following error:
> {code}
> Caused by: org.apache.flink.runtime.client.JobInitializationException: Could not instantiate JobManager.
>       at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:463) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1764) ~[?:?]
>       ... 3 more
> Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Stopped retrying the operation because the error is not retryable.
>       at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) ~[?:?]
>       at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2063) ~[?:?]
>       at org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStore.addAndLock(KubernetesStateHandleStore.java:150) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore.addCheckpoint(DefaultCompletedCheckpointStore.java:211) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1479) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at org.apache.flink.runtime.scheduler.SchedulerBase.tryRestoreExecutionGraphFromSavepoint(SchedulerBase.java:325) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:266) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:238) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:134) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:108) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:323) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:310) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:96) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:41) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:141) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:80) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:450) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1764) ~[?:?]
>       ... 3 more
> Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Stopped retrying the operation because the error is not retryable.
>       at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperation$1(FutureUtils.java:166) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
>       at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
>       at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) ~[?:?]
>       ... 3 more
> Caused by: java.util.concurrent.CompletionException: org.apache.flink.kubernetes.kubeclient.resources.KubernetesException: Cannot retry checkAndUpdateConfigMap with configMap pipelines-runner-fulltext-6e99e672-4af29f0768624632839835717898b08d-jobmanager-leader because it does not exist.
>       at org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.lambda$null$6(Fabric8FlinkKubeClient.java:289) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at java.util.Optional.orElseThrow(Optional.java:401) ~[?:?]
>       at org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.lambda$null$7(Fabric8FlinkKubeClient.java:289) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1764) ~[?:?]
>       ... 3 more
> Caused by: org.apache.flink.kubernetes.kubeclient.resources.KubernetesException: Cannot retry checkAndUpdateConfigMap with configMap pipelines-runner-fulltext-6e99e672-4af29f0768624632839835717898b08d-jobmanager-leader because it does not exist.
>       at org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.lambda$null$6(Fabric8FlinkKubeClient.java:289) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at java.util.Optional.orElseThrow(Optional.java:401) ~[?:?]
>       at org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.lambda$null$7(Fabric8FlinkKubeClient.java:289) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1764) ~[?:?]
> {code}
> The cause of the issue is the following:
> - We construct `jobMasterServices` prior to starting `leaderElectionService` (in 
> `JobManagerRunnerImpl`).
> - During `jobMasterServices` initialization, 
> `tryRestoreExecutionGraphFromSavepoint` gets called. This calls 
> `KubernetesStateHandleStore.addAndLock` internally.
> - `KubernetesStateHandleStore.addAndLock` expects the ConfigMap for JM leadership 
> to already exist. That is not the case, because `leaderElectionService`, which 
> is responsible for creating it, has not been started yet.
>
> Possible fixes:
> - Start `leaderElectionService` before `jobMasterServices`.
> - Fix `KubernetesStateHandleStore` so it can handle the case when no leader 
> has been elected yet.
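
The first possible fix can be sketched with another hypothetical model; `LeaderElectionSketch`, `RunnerSketch`, and the event strings are invented and do not correspond to Flink classes. The savepoint restore moves into the leadership-granted callback, so the ConfigMap exists before the first write. This assumes a single JobManager that wins the election immediately; whether Flink can safely start leader election before the JobMaster service is exactly the open question raised in the comment above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical leader election service: starting it creates the leader
// ConfigMap and, in this single-JobManager sketch, grants leadership at once.
class LeaderElectionSketch {
    boolean configMapCreated;

    void start(Runnable onLeadershipGranted) {
        configMapCreated = true;
        onLeadershipGranted.run();
    }
}

// Hypothetical runner contrasting the two start-up orders from the report.
class RunnerSketch {
    final LeaderElectionSketch election = new LeaderElectionSketch();
    final List<String> events = new ArrayList<>();

    // Stands in for tryRestoreExecutionGraphFromSavepoint -> addAndLock.
    private void restoreFromSavepoint() {
        if (!election.configMapCreated) {
            throw new IllegalStateException("leader ConfigMap does not exist");
        }
        events.add("checkpoint added");
    }

    // Order described in the report: job master services (and the restore)
    // are constructed before leader election starts.
    void startJobMasterServicesFirst() {
        restoreFromSavepoint();
        election.start(() -> events.add("leadership granted"));
    }

    // Possible fix 1: start leader election first and restore only once
    // leadership has been granted.
    void startLeaderElectionFirst() {
        election.start(() -> {
            events.add("leadership granted");
            restoreFromSavepoint();
        });
    }
}

public class StartupOrderDemo {
    public static void main(String[] args) {
        try {
            new RunnerSketch().startJobMasterServicesFirst();
        } catch (IllegalStateException e) {
            System.out.println("Old order fails: " + e.getMessage());
        }

        RunnerSketch fixed = new RunnerSketch();
        fixed.startLeaderElectionFirst();
        System.out.println("New order: " + fixed.events);
    }
}
```

Deferring the restore to the leadership callback keeps the "only the leader writes" rule intact, at the cost of delaying savepoint restoration until leadership is granted.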



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
