xintongsong commented on a change in pull request #13644:
URL: https://github.com/apache/flink/pull/13644#discussion_r508425784



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
##########
@@ -102,146 +95,73 @@
        /** The ZooKeeper client to use. */
        private final CuratorFramework client;
 
-       /** The executor to run ZooKeeper callbacks on. */
-       private final Executor executor;
-
-       /** The runtime configuration. */
-       private final Configuration configuration;
-
-       /** The zookeeper based running jobs registry. */
-       private final RunningJobsRegistry runningJobsRegistry;
-
-       /** Store for arbitrary blobs. */
-       private final BlobStoreService blobStoreService;
-
        public ZooKeeperHaServices(
                        CuratorFramework client,
                        Executor executor,
                        Configuration configuration,
                        BlobStoreService blobStoreService) {
+               super(executor, configuration, blobStoreService);
                this.client = checkNotNull(client);
-               this.executor = checkNotNull(executor);
-               this.configuration = checkNotNull(configuration);
-               this.runningJobsRegistry = new 
ZooKeeperRunningJobsRegistry(client, configuration);
-
-               this.blobStoreService = checkNotNull(blobStoreService);
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Services
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public LeaderRetrievalService getResourceManagerLeaderRetriever() {
-               return ZooKeeperUtils.createLeaderRetrievalService(client, 
configuration, RESOURCE_MANAGER_LEADER_PATH);
-       }
-
-       @Override
-       public LeaderRetrievalService getDispatcherLeaderRetriever() {
-               return ZooKeeperUtils.createLeaderRetrievalService(client, 
configuration, DISPATCHER_LEADER_PATH);
        }
 
        @Override
-       public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) 
{
-               return ZooKeeperUtils.createLeaderRetrievalService(client, 
configuration, getPathForJobManager(jobID));
+       public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
+               return new ZooKeeperCheckpointRecoveryFactory(client, 
configuration, executor);
        }
 
        @Override
-       public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, 
String defaultJobManagerAddress) {
-               return getJobManagerLeaderRetriever(jobID);
+       public JobGraphStore getJobGraphStore() throws Exception {
+               return ZooKeeperUtils.createJobGraphs(client, configuration);
        }
 
        @Override
-       public LeaderRetrievalService getClusterRestEndpointLeaderRetriever() {
-               return ZooKeeperUtils.createLeaderRetrievalService(client, 
configuration, REST_SERVER_LEADER_PATH);
+       public RunningJobsRegistry getRunningJobsRegistry() {
+               return new ZooKeeperRunningJobsRegistry(client, configuration);

Review comment:
       I'm not sure about this change.
   - It changes the behavior: a new `ZooKeeperRunningJobsRegistry` is now 
created on every call.
   - It does not belong in this commit, which is described as "abstract 
common fields and methods".
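
To illustrate the behavior difference mentioned in the first point, here is
a minimal sketch. `Registry`, `PerCallServices` and `CachedServices` are
hypothetical stand-ins and not Flink classes; the sketch only contrasts
"create on every call" with "create once and reuse".

```java
import java.util.function.Supplier;

// Hypothetical stand-in for a running-jobs registry; not Flink's interface.
interface Registry {}

// Variant A: a new registry per call, as the getter in the diff does.
class PerCallServices {
    private final Supplier<Registry> factory;

    PerCallServices(Supplier<Registry> factory) {
        this.factory = factory;
    }

    Registry getRunningJobsRegistry() {
        return factory.get();
    }
}

// Variant B: created once in the constructor and reused, as the removed
// field did before the change.
class CachedServices {
    private final Registry registry;

    CachedServices(Supplier<Registry> factory) {
        this.registry = factory.get();
    }

    Registry getRunningJobsRegistry() {
        return registry;
    }
}

class RegistryDemo {
    public static void main(String[] args) {
        Supplier<Registry> factory = () -> new Registry() {};
        PerCallServices perCall = new PerCallServices(factory);
        CachedServices cached = new CachedServices(factory);

        // Two calls return two different objects: prints "false".
        System.out.println(
            perCall.getRunningJobsRegistry() == perCall.getRunningJobsRegistry());
        // Two calls return the same object: prints "true".
        System.out.println(
            cached.getRunningJobsRegistry() == cached.getRunningJobsRegistry());
    }
}
```

Whether the difference matters depends on whether callers rely on getting
the same instance; the sketch only shows that the two variants are not
equivalent.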

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/KubeClientFactory.java
##########
@@ -71,7 +73,12 @@ public static FlinkKubeClient 
fromConfiguration(Configuration flinkConfig) {
 
                final KubernetesClient client = new 
DefaultKubernetesClient(config);
 
-               return new Fabric8FlinkKubeClient(flinkConfig, client, 
KubeClientFactory::createThreadPoolForAsyncIO);
+               if (flinkKubeClient == null) {
+                       flinkKubeClient = new Fabric8FlinkKubeClient(
+                               flinkConfig, client, 
KubeClientFactory::createThreadPoolForAsyncIO);
+               }

Review comment:
       I'm not sure about this change.
   - The argument `flinkConfig` is ignored when `flinkKubeClient` is not 
`null`. The factory should not assume that the configuration is always the 
same when this method is called. 
   - The code before this `if` becomes useless when `flinkKubeClient` is not 
`null`.
   
   I would consider reusing the `flinkKubeClient` an optimization. If it 
cannot be achieved easily, I'm also OK with using separate kube clients for 
different components and making this optimization a follow-up.
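
The first point can be reproduced with a small sketch. All names here
(`SketchClient`, `CachingFactory`, `PlainFactory`, the "namespace" key) are
hypothetical and only mimic the shape of the diff; this is not the fabric8
or Flink API.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical client that only remembers the namespace it was built with.
class SketchClient {
    final String namespace;

    SketchClient(String namespace) {
        this.namespace = namespace;
    }
}

// Mirrors the diff: the first client is cached in a static field.
class CachingFactory {
    private static SketchClient cached;

    static SketchClient fromConfiguration(Map<String, String> config) {
        if (cached == null) {
            cached = new SketchClient(config.get("namespace"));
        }
        // Any later configuration is silently ignored.
        return cached;
    }
}

// Alternative without sharing: every call honours its argument.
class PlainFactory {
    static SketchClient fromConfiguration(Map<String, String> config) {
        return new SketchClient(config.get("namespace"));
    }
}

class FactoryDemo {
    public static void main(String[] args) {
        Map<String, String> a = Collections.singletonMap("namespace", "a");
        Map<String, String> b = Collections.singletonMap("namespace", "b");

        CachingFactory.fromConfiguration(a);
        // Still the client built from "a": prints "a".
        System.out.println(CachingFactory.fromConfiguration(b).namespace);
        // Built from the argument: prints "b".
        System.out.println(PlainFactory.fromConfiguration(b).namespace);
    }
}
```

The non-caching variant corresponds to the "separate kube clients" option
above; sharing a client could then be added later as a follow-up.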

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesHighAvailabilityOptions.java
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.docs.Documentation;
+import org.apache.flink.configuration.ConfigOption;
+
+import java.time.Duration;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * The set of configuration options relating to Kubernetes high-availability 
settings.
+ * All the HA information relevant for a specific component will be stored in 
a single ConfigMap.
+ * For example, the Dispatcher's ConfigMap would then contain the current 
leader, the running jobs
+ * and the pointers to the persisted JobGraphs.
+ * The JobManager's ConfigMap would then contain the current leader, the 
pointers to the checkpoints
+ * and the checkpoint ID counter.
+ *
+ * <p>The ConfigMap name will be created with the pattern 
"{clusterId}-{componentName}-leader". Given that the cluster
+ * id is configured to "k8s-ha-app1", then we could get the following 
ConfigMap names.
+ * e.g. k8s-ha-app1-restserver-leader, 
k8s-ha-app1-00000000000000000000000000000000-jobmanager-leader
+ */
+@PublicEvolving
+public class KubernetesHighAvailabilityOptions {
+
+       
@Documentation.Section(Documentation.Sections.EXPERT_KUBERNETES_HIGH_AVAILABILITY)
+       public static final ConfigOption<Duration> KUBERNETES_LEASE_DURATION =
+                       
key("high-availability.kubernetes.leader-election.lease-duration")
+                       .durationType()
+                       .defaultValue(Duration.ofSeconds(15))
+                       .withDescription("Define the lease duration for the 
Kubernetes leader election in ms. The leader will " +
+                               "continuously renew its lease time to indicate 
its existence. And the followers will do a lease " +
+                               "checking against the current time. \"renewTime 
+ leaseDuration > now\" means the leader is alive.");
+
+       
@Documentation.Section(Documentation.Sections.EXPERT_KUBERNETES_HIGH_AVAILABILITY)
+       public static final ConfigOption<Duration> KUBERNETES_RENEW_DEADLINE =
+                       
key("high-availability.kubernetes.leader-election.renew-deadline")
+                       .durationType()
+                       .defaultValue(Duration.ofSeconds(15))
+                       .withDescription("Defines the deadline when the leader 
tries to renew the lease in ms. The leader will " +

Review comment:
       "in ms" should be removed

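As background for the comment above: the option is typed as `Duration`, so
the value carries its own unit and the description does not need to name
one. The sketch below uses only `java.time.Duration`; the class
`LeaseDurationDemo` and the method `leaderAlive` are hypothetical and simply
restate the "renewTime + leaseDuration > now" rule from the description.

```java
import java.time.Duration;

class LeaseDurationDemo {
    // Same default value as in the diff above.
    static final Duration DEFAULT_LEASE = Duration.ofSeconds(15);

    // The rule quoted in the option description:
    // renewTime + leaseDuration > now means the leader is alive.
    static boolean leaderAlive(long renewTimeMillis, Duration lease, long nowMillis) {
        return renewTimeMillis + lease.toMillis() > nowMillis;
    }

    public static void main(String[] args) {
        // The unit is converted only where needed: prints "15000".
        System.out.println(DEFAULT_LEASE.toMillis());
        // Lease not yet expired: prints "true".
        System.out.println(leaderAlive(0L, DEFAULT_LEASE, 14_999L));
        // Lease expired exactly at renewTime + leaseDuration: prints "false".
        System.out.println(leaderAlive(0L, DEFAULT_LEASE, 15_000L));
    }
}
```
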
##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkKubeClient.java
##########
@@ -104,6 +106,67 @@ KubernetesWatch watchPodsAndDoCallback(
                Map<String, String> labels,
                WatchCallbackHandler<KubernetesPod> podCallbackHandler);
 
+       /**
+        * Create the ConfigMap with specified content. If the ConfigMap 
already exists, a FlinkRuntimeException will be

Review comment:
       It might make sense to introduce some kind of `KubernetesException` 
rather than throwing `FlinkRuntimeException`.
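
A rough sketch of what such an exception could look like. Only the name
`KubernetesException` comes from the comment; making it unchecked, the
constructors, and the `InMemoryConfigMaps` helper are assumptions for
illustration and not an existing Flink API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical exception type dedicated to Kubernetes client failures.
class KubernetesException extends RuntimeException {
    KubernetesException(String message) {
        super(message);
    }

    KubernetesException(String message, Throwable cause) {
        super(message, cause);
    }
}

// Hypothetical in-memory stand-in for "create the ConfigMap, fail if it
// already exists".
class InMemoryConfigMaps {
    private final Map<String, Map<String, String>> store = new HashMap<>();

    void create(String name, Map<String, String> data) {
        // putIfAbsent returns the previous value if the name is taken.
        if (store.putIfAbsent(name, new HashMap<>(data)) != null) {
            throw new KubernetesException("ConfigMap " + name + " already exists.");
        }
    }
}

class KubernetesExceptionDemo {
    public static void main(String[] args) {
        InMemoryConfigMaps configMaps = new InMemoryConfigMaps();
        configMaps.create("k8s-ha-app1-restserver-leader", new HashMap<>());
        try {
            configMaps.create("k8s-ha-app1-restserver-leader", new HashMap<>());
        } catch (KubernetesException e) {
            // Callers can handle Kubernetes failures specifically.
            System.out.println(e.getMessage());
        }
    }
}
```

A dedicated type would let callers tell Kubernetes failures apart from
other runtime errors; whether it should be checked or unchecked is left
open here.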

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionService.java
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.highavailability;
+
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import 
org.apache.flink.kubernetes.kubeclient.KubernetesLeaderElectionConfiguration;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
+import 
org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch;
+import org.apache.flink.kubernetes.utils.KubernetesUtils;
+import org.apache.flink.runtime.leaderelection.AbstractLeaderElectionService;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.util.function.FunctionUtils;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import static 
org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY;
+import static org.apache.flink.kubernetes.utils.Constants.LEADER_ADDRESS_KEY;
+import static 
org.apache.flink.kubernetes.utils.Constants.LEADER_SESSION_ID_KEY;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Leader election service for multiple JobManagers. The active JobManager is 
elected using Kubernetes.
+ * The current leader's address as well as its leader session ID is published 
via Kubernetes ConfigMap.
+ * Note that the contending lock and leader storage are using the same 
ConfigMap. And every component(e.g.
+ * ResourceManager, Dispatcher, RestEndpoint, JobManager for each job) will 
have a separate ConfigMap.
+ */
+public class KubernetesLeaderElectionService extends 
AbstractLeaderElectionService {
+
+       private final FlinkKubeClient kubeClient;
+
+       private final Executor executor;
+
+       private final String configMapName;
+
+       private final KubernetesLeaderElector leaderElector;
+
+       private KubernetesWatch kubernetesWatch;
+
+       // Labels will be used to clean up the ha related ConfigMaps.
+       private Map<String, String> configMapLabels;

Review comment:
       Could be `final`.

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesHighAvailabilityOptions.java
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.docs.Documentation;
+import org.apache.flink.configuration.ConfigOption;
+
+import java.time.Duration;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * The set of configuration options relating to Kubernetes high-availability 
settings.
+ * All the HA information relevant for a specific component will be stored in 
a single ConfigMap.
+ * For example, the Dispatcher's ConfigMap would then contain the current 
leader, the running jobs
+ * and the pointers to the persisted JobGraphs.
+ * The JobManager's ConfigMap would then contain the current leader, the 
pointers to the checkpoints
+ * and the checkpoint ID counter.
+ *
+ * <p>The ConfigMap name will be created with the pattern 
"{clusterId}-{componentName}-leader". Given that the cluster
+ * id is configured to "k8s-ha-app1", then we could get the following 
ConfigMap names.
+ * e.g. k8s-ha-app1-restserver-leader, 
k8s-ha-app1-00000000000000000000000000000000-jobmanager-leader
+ */
+@PublicEvolving
+public class KubernetesHighAvailabilityOptions {
+
+       
@Documentation.Section(Documentation.Sections.EXPERT_KUBERNETES_HIGH_AVAILABILITY)
+       public static final ConfigOption<Duration> KUBERNETES_LEASE_DURATION =
+                       
key("high-availability.kubernetes.leader-election.lease-duration")
+                       .durationType()
+                       .defaultValue(Duration.ofSeconds(15))
+                       .withDescription("Define the lease duration for the 
Kubernetes leader election in ms. The leader will " +
+                               "continuously renew its lease time to indicate 
its existence. And the followers will do a lease " +
+                               "checking against the current time. \"renewTime 
+ leaseDuration > now\" means the leader is alive.");
+
+       
@Documentation.Section(Documentation.Sections.EXPERT_KUBERNETES_HIGH_AVAILABILITY)
+       public static final ConfigOption<Duration> KUBERNETES_RENEW_DEADLINE =
+                       
key("high-availability.kubernetes.leader-election.renew-deadline")
+                       .durationType()
+                       .defaultValue(Duration.ofSeconds(15))
+                       .withDescription("Defines the deadline when the leader 
tries to renew the lease in ms. The leader will " +
+                               "give up its leadership if it cannot 
successfully renew the lease in the given time.");
+
+       
@Documentation.Section(Documentation.Sections.EXPERT_KUBERNETES_HIGH_AVAILABILITY)
+       public static final ConfigOption<Duration> KUBERNETES_RETRY_PERIOD =
+                       
key("high-availability.kubernetes.leader-election.retry-period")
+                       .durationType()
+                       .defaultValue(Duration.ofSeconds(5))
+                       .withDescription("Defines the pause between consecutive 
retries in ms. All the contenders, including the " +

Review comment:
       "in ms" should be removed

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalService.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.highavailability;
+
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.UUID;
+
+import static org.apache.flink.kubernetes.utils.Constants.LEADER_ADDRESS_KEY;
+import static 
org.apache.flink.kubernetes.utils.Constants.LEADER_SESSION_ID_KEY;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The counterpart to the {@link 
org.apache.flink.kubernetes.highavailability.KubernetesLeaderElectionService}.
+ * This implementation of the {@link LeaderRetrievalService} retrieves the 
current leader which has
+ * been elected by the {@link 
org.apache.flink.kubernetes.highavailability.KubernetesLeaderElectionService}.
+ * The leader address as well as the current leader session ID is retrieved 
from Kubernetes ConfigMap.
+ */
+class KubernetesLeaderRetrievalService implements LeaderRetrievalService {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(KubernetesLeaderRetrievalService.class);
+
+       private final Object lock = new Object();
+
+       private final FlinkKubeClient kubeClient;
+
+       private final String configMapName;
+
+       @GuardedBy("lock")
+       private volatile String lastLeaderAddress;
+
+       @GuardedBy("lock")
+       private volatile UUID lastLeaderSessionID;
+
+       @GuardedBy("lock")
+       private volatile LeaderRetrievalListener leaderListener;
+
+       @GuardedBy("lock")
+       private volatile boolean running;
+
+       private KubernetesWatch kubernetesWatch;
+
+       KubernetesLeaderRetrievalService(FlinkKubeClient kubeClient, String 
configMapName) {
+               this.kubeClient = checkNotNull(kubeClient, "Kubernetes client 
should not be null.");
+               this.configMapName = checkNotNull(configMapName, "ConfigMap 
name should not be null.");
+
+               this.leaderListener = null;
+               this.lastLeaderAddress = null;
+               this.lastLeaderSessionID = null;
+
+               running = false;
+       }
+
+       @Override
+       public void start(LeaderRetrievalListener listener) {
+               checkNotNull(listener, "Listener must not be null.");
+               Preconditions.checkState(leaderListener == null, 
"KubernetesLeaderRetrievalService can " +
+                       "only be started once.");
+
+               LOG.info("Starting {}.", this);
+
+               synchronized (lock) {
+                       running = true;
+                       leaderListener = listener;
+                       kubernetesWatch = 
kubeClient.watchConfigMaps(configMapName, new ConfigMapCallbackHandlerImpl());
+               }
+       }
+
+       @Override
+       public void stop() {
+               LOG.info("Stopping {}.", this);
+
+               synchronized (lock) {
+                       if (!running) {
+                               return;
+                       }
+                       running = false;
+                       if (kubernetesWatch != null) {
+                               kubernetesWatch.close();
+                       }
+               }
+       }
+
+       @Override
+       public String toString() {
+               return "KubernetesLeaderRetrievalService{configMapName='" + 
configMapName + "'}";
+       }
+
+       private class ConfigMapCallbackHandlerImpl implements 
FlinkKubeClient.WatchCallbackHandler<KubernetesConfigMap> {
+
+               @Override
+               public void onAdded(List<KubernetesConfigMap> configMaps) {
+                       handleEvent(configMaps);
+               }
+
+               @Override
+               public void onModified(List<KubernetesConfigMap> configMaps) {
+                       handleEvent(configMaps);
+               }
+
+               @Override
+               public void onDeleted(List<KubernetesConfigMap> configMaps) {
+                       // Nothing to do since a new ConfigMap will be created 
if it is deleted externally.
+               }
+
+               @Override
+               public void onError(List<KubernetesConfigMap> configMaps) {
+                       leaderListener.handleError(new Exception("Error while 
watching the ConfigMap " + configMapName));
+               }
+
+               @Override
+               public void handleFatalError(Throwable throwable) {
+                       leaderListener.handleError(
+                               new Exception("Fatal error while watching the 
ConfigMap " + configMapName, throwable));
+               }
+
+               private void handleEvent(List<KubernetesConfigMap> configMaps) {
+                       synchronized (lock) {
+                               if (running) {
+                                       configMaps.forEach(e -> {
+                                               if 
(e.getName().equals(configMapName)) {

Review comment:
       Should we add an assertion on the config map name?
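
One possible shape for such an assertion, as a sketch only. The class
`ConfigMapNameCheck` and the use of `IllegalStateException` are assumptions;
the idea is to fail fast on an unexpected name instead of silently skipping
the entry.

```java
import java.util.Arrays;
import java.util.List;

class ConfigMapNameCheck {
    // Throws if any reported ConfigMap name differs from the watched one.
    static void checkNames(String expected, List<String> names) {
        for (String name : names) {
            if (!expected.equals(name)) {
                throw new IllegalStateException(
                    "Unexpected ConfigMap " + name + ", expected " + expected + ".");
            }
        }
    }

    public static void main(String[] args) {
        // Passes silently: every name matches the watched ConfigMap.
        checkNames("cm-a", Arrays.asList("cm-a", "cm-a"));
        try {
            checkNames("cm-a", Arrays.asList("cm-a", "cm-b"));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```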

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesHighAvailabilityOptions.java
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.docs.Documentation;
+import org.apache.flink.configuration.ConfigOption;
+
+import java.time.Duration;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * The set of configuration options relating to Kubernetes high-availability 
settings.
+ * All the HA information relevant for a specific component will be stored in 
a single ConfigMap.
+ * For example, the Dispatcher's ConfigMap would then contain the current 
leader, the running jobs
+ * and the pointers to the persisted JobGraphs.
+ * The JobManager's ConfigMap would then contain the current leader, the 
pointers to the checkpoints
+ * and the checkpoint ID counter.
+ *
+ * <p>The ConfigMap name will be created with the pattern 
"{clusterId}-{componentName}-leader". Given that the cluster
+ * id is configured to "k8s-ha-app1", then we could get the following 
ConfigMap names.
+ * e.g. k8s-ha-app1-restserver-leader, 
k8s-ha-app1-00000000000000000000000000000000-jobmanager-leader
+ */
+@PublicEvolving
+public class KubernetesHighAvailabilityOptions {
+
+       
@Documentation.Section(Documentation.Sections.EXPERT_KUBERNETES_HIGH_AVAILABILITY)
+       public static final ConfigOption<Duration> KUBERNETES_LEASE_DURATION =
+                       
key("high-availability.kubernetes.leader-election.lease-duration")
+                       .durationType()
+                       .defaultValue(Duration.ofSeconds(15))
+                       .withDescription("Define the lease duration for the 
Kubernetes leader election in ms. The leader will " +

Review comment:
       "in ms" should be removed

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionService.java
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.highavailability;
+
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import 
org.apache.flink.kubernetes.kubeclient.KubernetesLeaderElectionConfiguration;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
+import 
org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch;
+import org.apache.flink.kubernetes.utils.KubernetesUtils;
+import org.apache.flink.runtime.leaderelection.AbstractLeaderElectionService;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.util.function.FunctionUtils;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import static 
org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY;
+import static org.apache.flink.kubernetes.utils.Constants.LEADER_ADDRESS_KEY;
+import static 
org.apache.flink.kubernetes.utils.Constants.LEADER_SESSION_ID_KEY;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Leader election service for multiple JobManagers. The active JobManager is 
elected using Kubernetes.
+ * The current leader's address as well as its leader session ID is published 
via Kubernetes ConfigMap.
+ * Note that the contending lock and leader storage are using the same 
ConfigMap. And every component(e.g.
+ * ResourceManager, Dispatcher, RestEndpoint, JobManager for each job) will 
have a separate ConfigMap.
+ */
+public class KubernetesLeaderElectionService extends 
AbstractLeaderElectionService {
+
+       private final FlinkKubeClient kubeClient;
+
+       private final Executor executor;
+
+       private final String configMapName;
+
+       private final KubernetesLeaderElector leaderElector;
+
+       private KubernetesWatch kubernetesWatch;
+
+       // Labels will be used to clean up the ha related ConfigMaps.
+       private Map<String, String> configMapLabels;
+
+       KubernetesLeaderElectionService(
+                       FlinkKubeClient kubeClient,
+                       Executor executor,
+                       KubernetesLeaderElectionConfiguration leaderConfig) {
+
+               this.kubeClient = checkNotNull(kubeClient, "Kubernetes client 
should not be null.");
+               this.executor = checkNotNull(executor, "Executor should not be 
null.");
+               this.configMapName = leaderConfig.getConfigMapName();
+               this.leaderElector = 
kubeClient.createLeaderElector(leaderConfig, new LeaderCallbackHandlerImpl());
+               this.leaderContender = null;
+               this.configMapLabels = KubernetesUtils.getConfigMapLabels(
+                       leaderConfig.getClusterId(), 
LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY);
+       }
+
+       @Override
+       public void internalStart(LeaderContender contender) {
+               CompletableFuture.runAsync(leaderElector::run, executor);
+               kubernetesWatch = kubeClient.watchConfigMaps(configMapName, new 
ConfigMapCallbackHandlerImpl());
+       }
+
+       @Override
+       public void internalStop() {
+               if (kubernetesWatch != null) {
+                       kubernetesWatch.close();
+               }
+       }
+
+       @Override
+       protected void writeLeaderInformation() {
+               try {
+                       kubeClient.checkAndUpdateConfigMap(
+                               configMapName,
+                               configMap -> {
+                                       if 
(leaderElector.hasLeadership(configMap)) {
+                                               // Get the updated ConfigMap 
with new leader information
+                                               if (confirmedLeaderAddress != 
null && confirmedLeaderSessionID != null) {
+                                                       
configMap.getData().put(LEADER_ADDRESS_KEY, confirmedLeaderAddress);
+                                                       
configMap.getData().put(LEADER_SESSION_ID_KEY, 
confirmedLeaderSessionID.toString());
+                                               }
+                                               
configMap.getLabels().putAll(configMapLabels);
+                                               return Optional.of(configMap);
+                                       }
+                                       return Optional.empty();
+                               }).get();
+               } catch (Exception e) {
+                       leaderContender.handleError(new Exception("Could not 
update ConfigMap " + configMapName, e));
+               }
+       }
+
+       @Override
+       protected boolean hasLeadership() {
+               return kubeClient.getConfigMap(configMapName)
+                       .map(leaderElector::hasLeadership)
+                       .orElse(false);
+       }
+
+       @Override
+       public String toString() {
+               return "KubernetesLeaderElectionService{configMapName='" + 
configMapName + "'}";
+       }
+
+       private class LeaderCallbackHandlerImpl extends 
KubernetesLeaderElector.LeaderCallbackHandler {
+
+               @Override
+               public void isLeader() {
+                       onGrantLeadership();
+               }
+
+               @Override
+               public void notLeader() {
+                       // Clear the leader information in ConfigMap
+                       try {
+                               kubeClient.checkAndUpdateConfigMap(
+                                       configMapName,
+                                       configMap -> {
+                                               // Do not need to check the 
leader here
+                                               
configMap.getData().remove(LEADER_ADDRESS_KEY);
+                                               
configMap.getData().remove(LEADER_SESSION_ID_KEY);
+                                               return Optional.of(configMap);
+                                       }
+                               ).get();
+                       } catch (Exception e) {
+                               leaderContender.handleError(
+                                       new Exception("Could not remove leader 
information from ConfigMap " + configMapName, e));
+                       }
+                       onRevokeLeadership();
+                       // Continue to contend the leader
+                       CompletableFuture.runAsync(leaderElector::run, 
executor);
+               }
+       }
+
+       private class ConfigMapCallbackHandlerImpl implements 
FlinkKubeClient.WatchCallbackHandler<KubernetesConfigMap> {
+
+               @Override
+               public void onAdded(List<KubernetesConfigMap> configMaps) {
+                       // noop
+               }
+
+               @Override
+               public void onModified(List<KubernetesConfigMap> configMaps) {
+                       if (hasLeadership()) {
+                               configMaps.forEach(configMap -> {
+                                       if 
(isLeaderUpdatedExternally(configMap)) {
+                                               if 
(configMap.getName().equals(configMapName)) {
+                                                       // the data field does 
not correspond to the expected leader information
+                                                       if 
(logger.isDebugEnabled()) {
+                                                               
logger.debug("Correcting leader information in {} by {}.",
+                                                                       
configMapName, leaderContender.getDescription());
+                                                       }
+                                                       
writeLeaderInformation();
+                                               } else {
+                                                       logger.warn("Ignoring 
the modified event since it does not belong to {}.", this);
+                                               }
+                                       }
+                               });
+                       }
+               }
+
+               @Override
+               public void onDeleted(List<KubernetesConfigMap> configMaps) {

Review comment:
       Should we also add an assertion on the config map name for the delete 
events?

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionService.java
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.highavailability;
+
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import 
org.apache.flink.kubernetes.kubeclient.KubernetesLeaderElectionConfiguration;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
+import 
org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch;
+import org.apache.flink.kubernetes.utils.KubernetesUtils;
+import org.apache.flink.runtime.leaderelection.AbstractLeaderElectionService;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.util.function.FunctionUtils;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import static 
org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY;
+import static org.apache.flink.kubernetes.utils.Constants.LEADER_ADDRESS_KEY;
+import static 
org.apache.flink.kubernetes.utils.Constants.LEADER_SESSION_ID_KEY;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Leader election service for multiple JobManagers. The active JobManager is 
elected using Kubernetes.
+ * The current leader's address as well as its leader session ID is published 
via Kubernetes ConfigMap.
+ * Note that the contending lock and leader storage are using the same 
ConfigMap. And every component(e.g.
+ * ResourceManager, Dispatcher, RestEndpoint, JobManager for each job) will 
have a separate ConfigMap.
+ */
+public class KubernetesLeaderElectionService extends 
AbstractLeaderElectionService {
+
+       private final FlinkKubeClient kubeClient;
+
+       private final Executor executor;
+
+       private final String configMapName;
+
+       private final KubernetesLeaderElector leaderElector;
+
+       private KubernetesWatch kubernetesWatch;
+
+       // Labels will be used to clean up the ha related ConfigMaps.
+       private Map<String, String> configMapLabels;
+
+       KubernetesLeaderElectionService(
+                       FlinkKubeClient kubeClient,
+                       Executor executor,
+                       KubernetesLeaderElectionConfiguration leaderConfig) {
+
+               this.kubeClient = checkNotNull(kubeClient, "Kubernetes client 
should not be null.");
+               this.executor = checkNotNull(executor, "Executor should not be 
null.");
+               this.configMapName = leaderConfig.getConfigMapName();
+               this.leaderElector = 
kubeClient.createLeaderElector(leaderConfig, new LeaderCallbackHandlerImpl());
+               this.leaderContender = null;
+               this.configMapLabels = KubernetesUtils.getConfigMapLabels(
+                       leaderConfig.getClusterId(), 
LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY);
+       }
+
+       @Override
+       public void internalStart(LeaderContender contender) {
+               CompletableFuture.runAsync(leaderElector::run, executor);
+               kubernetesWatch = kubeClient.watchConfigMaps(configMapName, new 
ConfigMapCallbackHandlerImpl());
+       }
+
+       @Override
+       public void internalStop() {
+               if (kubernetesWatch != null) {
+                       kubernetesWatch.close();
+               }
+       }
+
+       @Override
+       protected void writeLeaderInformation() {
+               try {
+                       kubeClient.checkAndUpdateConfigMap(
+                               configMapName,
+                               configMap -> {
+                                       if 
(leaderElector.hasLeadership(configMap)) {
+                                               // Get the updated ConfigMap 
with new leader information
+                                               if (confirmedLeaderAddress != 
null && confirmedLeaderSessionID != null) {
+                                                       
configMap.getData().put(LEADER_ADDRESS_KEY, confirmedLeaderAddress);
+                                                       
configMap.getData().put(LEADER_SESSION_ID_KEY, 
confirmedLeaderSessionID.toString());
+                                               }
+                                               
configMap.getLabels().putAll(configMapLabels);
+                                               return Optional.of(configMap);
+                                       }
+                                       return Optional.empty();
+                               }).get();
+               } catch (Exception e) {
+                       leaderContender.handleError(new Exception("Could not 
update ConfigMap " + configMapName, e));
+               }
+       }
+
+       @Override
+       protected boolean hasLeadership() {
+               return kubeClient.getConfigMap(configMapName)
+                       .map(leaderElector::hasLeadership)
+                       .orElse(false);
+       }
+
+       @Override
+       public String toString() {
+               return "KubernetesLeaderElectionService{configMapName='" + 
configMapName + "'}";
+       }
+
+       private class LeaderCallbackHandlerImpl extends 
KubernetesLeaderElector.LeaderCallbackHandler {
+
+               @Override
+               public void isLeader() {
+                       onGrantLeadership();
+               }
+
+               @Override
+               public void notLeader() {
+                       // Clear the leader information in ConfigMap
+                       try {
+                               kubeClient.checkAndUpdateConfigMap(
+                                       configMapName,
+                                       configMap -> {
+                                               // Do not need to check the 
leader here
+                                               
configMap.getData().remove(LEADER_ADDRESS_KEY);
+                                               
configMap.getData().remove(LEADER_SESSION_ID_KEY);
+                                               return Optional.of(configMap);
+                                       }
+                               ).get();
+                       } catch (Exception e) {
+                               leaderContender.handleError(
+                                       new Exception("Could not remove leader 
information from ConfigMap " + configMapName, e));
+                       }
+                       onRevokeLeadership();
+                       // Continue to contend the leader
+                       CompletableFuture.runAsync(leaderElector::run, 
executor);
+               }
+       }
+
+       private class ConfigMapCallbackHandlerImpl implements 
FlinkKubeClient.WatchCallbackHandler<KubernetesConfigMap> {
+
+               @Override
+               public void onAdded(List<KubernetesConfigMap> configMaps) {
+                       // noop
+               }
+
+               @Override
+               public void onModified(List<KubernetesConfigMap> configMaps) {
+                       if (hasLeadership()) {
+                               configMaps.forEach(configMap -> {
+                                       if 
(isLeaderUpdatedExternally(configMap)) {
+                                               if 
(configMap.getName().equals(configMapName)) {

Review comment:
       Shouldn't we check the config map name before checking whether it is 
updated externally?
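
   A rough sketch of the ordering I have in mind. `ConfigMapStub` and `handleModified` are hypothetical stand-ins so the snippet compiles on its own; they are not the actual Flink classes, and the real handler would call `isLeaderUpdatedExternally` and `writeLeaderInformation` instead of recording strings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class NameCheckFirstSketch {

    // Hypothetical stand-in for KubernetesConfigMap; not the real Flink class.
    static final class ConfigMapStub {
        final String name;
        final boolean updatedExternally;

        ConfigMapStub(String name, boolean updatedExternally) {
            this.name = name;
            this.updatedExternally = updatedExternally;
        }
    }

    // Checks the ConfigMap name first, and only then whether the leader
    // information was changed externally. Returns the decisions taken so
    // the ordering is easy to inspect.
    static List<String> handleModified(String watchedName, List<ConfigMapStub> configMaps) {
        List<String> actions = new ArrayList<>();
        for (ConfigMapStub configMap : configMaps) {
            if (!watchedName.equals(configMap.name)) {
                // Event does not belong to the watched ConfigMap: skip it
                // before looking at its data at all.
                actions.add("ignore:" + configMap.name);
                continue;
            }
            if (configMap.updatedExternally) {
                // The real code would rewrite the leader information here.
                actions.add("rewrite:" + configMap.name);
            }
        }
        return actions;
    }

    public static void main(String[] args) {
        List<ConfigMapStub> events = Arrays.asList(
            new ConfigMapStub("other-cm", true),
            new ConfigMapStub("leader-cm", true),
            new ConfigMapStub("leader-cm", false));
        System.out.println(handleModified("leader-cm", events));
    }
}
```

   With this order, the data of an unrelated ConfigMap is never compared against our leader information.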

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.resources;
+
+import 
org.apache.flink.kubernetes.kubeclient.KubernetesLeaderElectionConfiguration;
+
+import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
+import io.fabric8.kubernetes.client.extended.leaderelection.LeaderCallbacks;
+import 
io.fabric8.kubernetes.client.extended.leaderelection.LeaderElectionConfigBuilder;
+import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector;
+import 
io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.ConfigMapLock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.UUID;
+
+/**
+ * Represent {@link KubernetesLeaderElector} in kubernetes. {@link 
LeaderElector#run()} is a blocking call. It should be
+ *  run in the IO executor, not the main thread. The lifecycle is bound to 
single leader election. Once the leadership
+ * is revoked, as well as the {@link LeaderCallbackHandler#notLeader()} is 
called, the {@link LeaderElector#run()} will
+ * finish. To start another round of election, we need to trigger again.
+ */
+public class KubernetesLeaderElector extends 
LeaderElector<NamespacedKubernetesClient> {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(KubernetesLeaderElector.class);
+       protected static final String LOCK_IDENTITY = 
UUID.randomUUID().toString();
+       protected static final String LEADER_ANNOTATION_KEY = 
"control-plane.alpha.kubernetes.io/leader";

Review comment:
       Should annotate with `@VisibleForTesting`.
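
   Roughly like the following. The nested `VisibleForTesting` annotation and `ElectorStub` are local stand-ins so the snippet compiles on its own; in the PR the existing annotation from `org.apache.flink.annotation` would be imported instead. Annotating both constants is my assumption about what is needed here.

```java
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Target;
import java.util.UUID;

class VisibleForTestingSketch {

    // Local stand-in for the marker annotation; declared here only to keep
    // the sketch self-contained.
    @Documented
    @Target({ElementType.TYPE, ElementType.METHOD, ElementType.FIELD, ElementType.CONSTRUCTOR})
    @interface VisibleForTesting {}

    // Hypothetical stand-in for KubernetesLeaderElector.
    static class ElectorStub {

        @VisibleForTesting
        protected static final String LOCK_IDENTITY = UUID.randomUUID().toString();

        @VisibleForTesting
        protected static final String LEADER_ANNOTATION_KEY =
            "control-plane.alpha.kubernetes.io/leader";
    }

    public static void main(String[] args) {
        // The annotation only documents why the visibility is relaxed;
        // it does not change how the constants behave.
        System.out.println(ElectorStub.LEADER_ANNOTATION_KEY);
    }
}
```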

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionService.java
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.highavailability;
+
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import 
org.apache.flink.kubernetes.kubeclient.KubernetesLeaderElectionConfiguration;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
+import 
org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch;
+import org.apache.flink.kubernetes.utils.KubernetesUtils;
+import org.apache.flink.runtime.leaderelection.AbstractLeaderElectionService;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.util.function.FunctionUtils;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import static 
org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY;
+import static org.apache.flink.kubernetes.utils.Constants.LEADER_ADDRESS_KEY;
+import static 
org.apache.flink.kubernetes.utils.Constants.LEADER_SESSION_ID_KEY;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Leader election service for multiple JobManagers. The active JobManager is 
elected using Kubernetes.
+ * The current leader's address as well as its leader session ID is published 
via Kubernetes ConfigMap.
+ * Note that the contending lock and leader storage are using the same 
ConfigMap. And every component(e.g.
+ * ResourceManager, Dispatcher, RestEndpoint, JobManager for each job) will 
have a separate ConfigMap.
+ */
+public class KubernetesLeaderElectionService extends 
AbstractLeaderElectionService {
+
+       private final FlinkKubeClient kubeClient;
+
+       private final Executor executor;
+
+       private final String configMapName;
+
+       private final KubernetesLeaderElector leaderElector;
+
+       private KubernetesWatch kubernetesWatch;
+
+       // Labels will be used to clean up the ha related ConfigMaps.
+       private Map<String, String> configMapLabels;
+
+       KubernetesLeaderElectionService(
+                       FlinkKubeClient kubeClient,
+                       Executor executor,
+                       KubernetesLeaderElectionConfiguration leaderConfig) {
+
+               this.kubeClient = checkNotNull(kubeClient, "Kubernetes client 
should not be null.");
+               this.executor = checkNotNull(executor, "Executor should not be 
null.");
+               this.configMapName = leaderConfig.getConfigMapName();
+               this.leaderElector = 
kubeClient.createLeaderElector(leaderConfig, new LeaderCallbackHandlerImpl());
+               this.leaderContender = null;
+               this.configMapLabels = KubernetesUtils.getConfigMapLabels(
+                       leaderConfig.getClusterId(), 
LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY);
+       }
+
+       @Override
+       public void internalStart(LeaderContender contender) {
+               CompletableFuture.runAsync(leaderElector::run, executor);
+               kubernetesWatch = kubeClient.watchConfigMaps(configMapName, new 
ConfigMapCallbackHandlerImpl());
+       }
+
+       @Override
+       public void internalStop() {
+               if (kubernetesWatch != null) {
+                       kubernetesWatch.close();
+               }
+       }
+
+       @Override
+       protected void writeLeaderInformation() {
+               try {
+                       kubeClient.checkAndUpdateConfigMap(
+                               configMapName,
+                               configMap -> {
+                                       if 
(leaderElector.hasLeadership(configMap)) {
+                                               // Get the updated ConfigMap 
with new leader information
+                                               if (confirmedLeaderAddress != 
null && confirmedLeaderSessionID != null) {
+                                                       
configMap.getData().put(LEADER_ADDRESS_KEY, confirmedLeaderAddress);
+                                                       
configMap.getData().put(LEADER_SESSION_ID_KEY, 
confirmedLeaderSessionID.toString());
+                                               }
+                                               
configMap.getLabels().putAll(configMapLabels);
+                                               return Optional.of(configMap);
+                                       }
+                                       return Optional.empty();
+                               }).get();
+               } catch (Exception e) {
+                       leaderContender.handleError(new Exception("Could not 
update ConfigMap " + configMapName, e));
+               }
+       }
+
+       @Override
+       protected boolean hasLeadership() {
+               return kubeClient.getConfigMap(configMapName)
+                       .map(leaderElector::hasLeadership)
+                       .orElse(false);
+       }
+
+       @Override
+       public String toString() {
+               return "KubernetesLeaderElectionService{configMapName='" + 
configMapName + "'}";
+       }
+
+       private class LeaderCallbackHandlerImpl extends 
KubernetesLeaderElector.LeaderCallbackHandler {
+
+               @Override
+               public void isLeader() {
+                       onGrantLeadership();
+               }
+
+               @Override
+               public void notLeader() {
+                       // Clear the leader information in ConfigMap
+                       try {
+                               kubeClient.checkAndUpdateConfigMap(
+                                       configMapName,
+                                       configMap -> {
+                                               // Do not need to check the 
leader here
+                                               
configMap.getData().remove(LEADER_ADDRESS_KEY);
+                                               
configMap.getData().remove(LEADER_SESSION_ID_KEY);
+                                               return Optional.of(configMap);
+                                       }
+                               ).get();
+                       } catch (Exception e) {
+                               leaderContender.handleError(
+                                       new Exception("Could not remove leader 
information from ConfigMap " + configMapName, e));
+                       }
+                       onRevokeLeadership();
+                       // Continue to contend the leader
+                       CompletableFuture.runAsync(leaderElector::run, 
executor);
+               }
+       }
+
+       private class ConfigMapCallbackHandlerImpl implements 
FlinkKubeClient.WatchCallbackHandler<KubernetesConfigMap> {
+
+               @Override
+               public void onAdded(List<KubernetesConfigMap> configMaps) {
+                       // noop
+               }
+
+               @Override
+               public void onModified(List<KubernetesConfigMap> configMaps) {
+                       if (hasLeadership()) {
+                               configMaps.forEach(configMap -> {
+                                       if 
(isLeaderUpdatedExternally(configMap)) {
+                                               if 
(configMap.getName().equals(configMapName)) {

Review comment:
       Why would we receive events for a config map with another name? Shouldn't 
we only receive events for the config map that we watch?
   Maybe we should add an assertion for the config map name here.
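
   A minimal sketch of such an assertion, assuming the watch really does deliver events only for the watched ConfigMap. The local `checkState` is a stand-in for a precondition helper (for example the one in `org.apache.flink.util.Preconditions`), and `countHandled` is a hypothetical name used only for illustration.

```java
import java.util.Arrays;
import java.util.List;

class ConfigMapNameAssertionSketch {

    // Local stand-in for a precondition helper; fails fast with an
    // IllegalStateException when the condition does not hold.
    static void checkState(boolean condition, String message) {
        if (!condition) {
            throw new IllegalStateException(message);
        }
    }

    // Asserts that every event belongs to the watched ConfigMap before
    // handling it, and returns how many events were handled.
    static int countHandled(String watchedName, List<String> eventNames) {
        int handled = 0;
        for (String name : eventNames) {
            checkState(
                watchedName.equals(name),
                "Received an event for unexpected ConfigMap " + name
                    + ", expected " + watchedName);
            handled++;
        }
        return handled;
    }

    public static void main(String[] args) {
        System.out.println(countHandled("leader-cm", Arrays.asList("leader-cm", "leader-cm")));
        try {
            countHandled("leader-cm", Arrays.asList("other-cm"));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

   Compared with logging a warning and ignoring the event, this surfaces a misbehaving watch immediately instead of hiding it.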




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]