[ 
https://issues.apache.org/jira/browse/FLINK-20648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17253302#comment-17253302
 ] 

Xintong Song commented on FLINK-20648:
--------------------------------------

Just to add my two cents.

IIUC, the reason we convert a savepoint into a checkpoint, is to reuse the 
checkpoint recovery logics for restoring jobs from savepoints. For that 
purpose, it is not necessary to write this savepoint-converted checkpoint to 
the external HA service, because if this checkpoint gets lost the job can 
simply recover from the original savepoint. If we don't need to write this 
checkpoint to the external HA service, there's no need to wait for the 
leadership.

Therefore, we might consider to extend {{CompletedCheckpointStore}} with 
{{addCheckpointLocally}} and {{StatehandleStore}} with {{addLocally}} for the 
savepoint-converted checkpoint. We can keep the local checkpoints in memory for 
{{KubernetesStateHandleStore}}, while keep the original implementation for 
{{ZooKeeperStateHandleStore}}. For recovery, we would need to compare the local 
checkpoints (if any) with the remote ones retrieved from external HA service, 
and recover from the most recent one.

Ideally, this should bring no behavior changes to the ZooKeeper HA service, 
while allow restoring jobs from savepoints with K8s HA service. It might 
complicate the HA service interfaces a bit, but should be less invasive/risky 
compared to changing job master start up logics.

I'm proposing this as a potential quick fix for the 1.12 branch. I agree that 
FLINK-11719 is our first choice for resolving the problem on the master branch.

[~trohrmann] [~fly_in_gis] WDYT?

> Unable to restore job from savepoint when using Kubernetes based HA services
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-20648
>                 URL: https://issues.apache.org/jira/browse/FLINK-20648
>             Project: Flink
>          Issue Type: Bug
>          Components: Deployment / Kubernetes
>    Affects Versions: 1.12.0
>            Reporter: David Morávek
>            Assignee: Yang Wang
>            Priority: Blocker
>             Fix For: 1.13.0, 1.12.1
>
>
> When restoring job from savepoint, we always end up with following error:
> {code}
> Caused by: org.apache.flink.runtime.client.JobInitializationException: Could 
> not instantiate JobManager.
>       at 
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:463)
>  ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1764)
>  ~[?:?]
>       ... 3 more
> Caused by: java.util.concurrent.ExecutionException: 
> org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Stopped 
> retrying the operation because the error is not retryable.
>       at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) 
> ~[?:?]
>       at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2063) ~[?:?]
>       at 
> org.apache.flink.kubernetes.highavailability.KubernetesStateHandleStore.addAndLock(KubernetesStateHandleStore.java:150)
>  ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at 
> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore.addCheckpoint(DefaultCompletedCheckpointStore.java:211)
>  ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1479)
>  ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at 
> org.apache.flink.runtime.scheduler.SchedulerBase.tryRestoreExecutionGraphFromSavepoint(SchedulerBase.java:325)
>  ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at 
> org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:266)
>  ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at 
> org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:238)
>  ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:134)
>  ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at 
> org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:108)
>  ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at 
> org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:323)
>  ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at 
> org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:310) 
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at 
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:96)
>  ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at 
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:41)
>  ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at 
> org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:141)
>  ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at 
> org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:80)
>  ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at 
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:450)
>  ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1764)
>  ~[?:?]
>       ... 3 more
> Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: 
> Stopped retrying the operation because the error is not retryable.
>       at 
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperation$1(FutureUtils.java:166)
>  ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
>  ~[?:?]
>       at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
>  ~[?:?]
>       at 
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
>  ~[?:?]
>       ... 3 more
> Caused by: java.util.concurrent.CompletionException: 
> org.apache.flink.kubernetes.kubeclient.resources.KubernetesException: Cannot 
> retry checkAndUpdateConfigMap with configMap 
> pipelines-runner-fulltext-6e99e672-4af29f0768624632839835717898b08d-jobmanager-leader
>  because it does not exist.
>       at 
> org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.lambda$null$6(Fabric8FlinkKubeClient.java:289)
>  ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at java.util.Optional.orElseThrow(Optional.java:401) ~[?:?]
>       at 
> org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.lambda$null$7(Fabric8FlinkKubeClient.java:289)
>  ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1764)
>  ~[?:?]
>       ... 3 more
> Caused by: 
> org.apache.flink.kubernetes.kubeclient.resources.KubernetesException: Cannot 
> retry checkAndUpdateConfigMap with configMap 
> pipelines-runner-fulltext-6e99e672-4af29f0768624632839835717898b08d-jobmanager-leader
>  because it does not exist.
>       at 
> org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.lambda$null$6(Fabric8FlinkKubeClient.java:289)
>  ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at java.util.Optional.orElseThrow(Optional.java:401) ~[?:?]
>       at 
> org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.lambda$null$7(Fabric8FlinkKubeClient.java:289)
>  ~[flink-dist_2.11-1.12.0.jar:1.12.0]
>       at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1764)
>  ~[?:?]
> {code}
> Cause of the issue is following:
> - We construct `jobMasterServices` prior starting `leaderElectionService` (in 
> `JobManagerRunnerImpl`)
> - During `jobMasterServices` initialization 
> `tryRestoreExecutionGraphFromSavepoint` gets called. This calls 
> `KubernetesStateHandleStore.addAndLock` interally.
> - `KubernetesStateHandleStore.addAndLock` expects configmap for JM leadership 
> to be already present, which is wrong, because `leaderElectionService` which 
> is responsible for its creation has not started yet
> Possible fixes:
> - Start `leaderElectionService` before `jobMasterServices`
> - Fix `KubernetesStateHandleStore`, so it can handle the case, when leader 
> hasn't been elected



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to