xintongsong commented on a change in pull request #13644:
URL: https://github.com/apache/flink/pull/13644#discussion_r505312805



##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesWatcher.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.resources;
+
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.client.KubernetesClientException;
+import io.fabric8.kubernetes.client.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Watcher for resources in Kubernetes.
+ */
+public abstract class KubernetesWatcher<T extends HasMetadata, K extends KubernetesResource<T>> implements Watcher<T> {

Review comment:
       Let's rename this class to `AbstractKubernetesWatcher`.

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java
##########
@@ -258,6 +258,13 @@
                        .withDescription("If configured, Flink will add \"resources.limits.<config-key>\" and \"resources.requests.<config-key>\" " +
                                "to the main container of TaskExecutor and set the value to the value of " + ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT.key() + ".");
 
+       public static final ConfigOption<Integer> KUBERNETES_MAX_RETRY_ATTEMPTS =
+               key("kubernetes.client.max-retry-attempts")
+                       .intType()
+                       .defaultValue(5)
+                       .withDescription("Defines the number of Kubernetes resources update operation retries before the client " +
+                               "gives up. For example, updating the ConfigMap.");
+

Review comment:
       I think "Kubernetes resources update operation" is a bit too general. E.g., one could argue that creating a new pod is also a "Kubernetes resources update operation".
   
   I would suggest `kubernetes.transactional-operation.max-retries` as the configuration key, and explaining in the description what a transactional operation is (a group of operations that is guaranteed to be atomic).

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesConfigMapWatcher.java
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.resources;
+
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+
+import io.fabric8.kubernetes.api.model.ConfigMap;
+
+import java.util.Collections;
+
+/**
+ * Watcher for ConfigMaps in Kubernetes.
+ */
+public class KubernetesConfigMapWatcher extends KubernetesWatcher<ConfigMap, KubernetesConfigMap> {
+
+       public KubernetesConfigMapWatcher(FlinkKubeClient.WatchCallbackHandler<KubernetesConfigMap> callbackHandler) {
+               super(callbackHandler);
+       }
+
+       @Override
+       public void eventReceived(Action action, ConfigMap configMap) {
+               logger.debug("Received {} event for configMap {}, details: {}",
+                       action, configMap.getMetadata().getName(), configMap.getData());
+               switch (action) {
+                       case ADDED:
+                               callbackHandler.onAdded(Collections.singletonList(new KubernetesConfigMap(configMap)));
+                               break;
+                       case MODIFIED:
+                               callbackHandler.onModified(Collections.singletonList(new KubernetesConfigMap(configMap)));
+                               break;
+                       case ERROR:
+                               callbackHandler.onError(Collections.singletonList(new KubernetesConfigMap(configMap)));
+                               break;
+                       case DELETED:
+                               callbackHandler.onDeleted(Collections.singletonList(new KubernetesConfigMap(configMap)));
+                               break;

Review comment:
       Minor: we can deduplicate the code by creating the singleton list once, before the `switch`.
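A minimal sketch of the suggested deduplication. `Action`, `CallbackHandler` and `DedupedWatcher` are hypothetical stand-ins for fabric8's `Watcher.Action`, `FlinkKubeClient.WatchCallbackHandler` and `KubernetesConfigMapWatcher`, so the example compiles without either library. In the real watcher, the list element would be `new KubernetesConfigMap(configMap)`.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for fabric8's Watcher.Action.
enum Action { ADDED, MODIFIED, DELETED, ERROR }

// Hypothetical stand-in for FlinkKubeClient.WatchCallbackHandler.
interface CallbackHandler<T> {
    void onAdded(List<T> resources);

    void onModified(List<T> resources);

    void onDeleted(List<T> resources);

    void onError(List<T> resources);
}

// Hypothetical stand-in for KubernetesConfigMapWatcher.
class DedupedWatcher<T> {
    private final CallbackHandler<T> callbackHandler;

    DedupedWatcher(CallbackHandler<T> callbackHandler) {
        this.callbackHandler = callbackHandler;
    }

    void eventReceived(Action action, T resource) {
        // Build the singleton list once instead of repeating it in every case.
        final List<T> resources = Collections.singletonList(resource);
        switch (action) {
            case ADDED:
                callbackHandler.onAdded(resources);
                break;
            case MODIFIED:
                callbackHandler.onModified(resources);
                break;
            case DELETED:
                callbackHandler.onDeleted(resources);
                break;
            case ERROR:
                callbackHandler.onError(resources);
                break;
            default:
                throw new IllegalArgumentException("Unknown action: " + action);
        }
    }
}
```

Each `case` now differs only in which callback it invokes, which keeps the per-action differences easy to spot.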

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesConfigMapWatcher.java
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.resources;
+
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+
+import io.fabric8.kubernetes.api.model.ConfigMap;
+
+import java.util.Collections;
+
+/**
+ * Watcher for ConfigMaps in Kubernetes.

Review comment:
       ```suggestion
    * Watcher for {@link ConfigMap}s in Kubernetes.
   ```

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesConfigMap.java
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.resources;
+
+import io.fabric8.kubernetes.api.model.ConfigMap;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Represent KubernetesConfigMap resource in kubernetes.

Review comment:
       ```suggestion
    * Represent {@link ConfigMap} resource in kubernetes.
   ```

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkKubeClient.java
##########
@@ -104,6 +107,67 @@ KubernetesWatch watchPodsAndDoCallback(
                Map<String, String> labels,
                WatchCallbackHandler<KubernetesPod> podCallbackHandler);
 
+       /**
+        * Create the ConfigMap with specified content. If the ConfigMap already exists, nothing will happen.
+        *
+        * @param configMap ConfigMap.
+        *
+        * @return Return the ConfigMap create future.
+        */
+       CompletableFuture<Void> createConfigMap(KubernetesConfigMap configMap);
+
+       /**
+        * Get the ConfigMap with specified name.
+        *
+        * @param name ConfigMap name.
+        *
+        * @return Return empty if the ConfigMap does not exist.

Review comment:
       ```suggestion
         * @return Return the ConfigMap, or empty if the ConfigMap does not exist.
   ```

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkKubeClient.java
##########
@@ -104,6 +107,67 @@ KubernetesWatch watchPodsAndDoCallback(
                Map<String, String> labels,
                WatchCallbackHandler<KubernetesPod> podCallbackHandler);
 
+       /**
+        * Create the ConfigMap with specified content. If the ConfigMap already exists, nothing will happen.
+        *
+        * @param configMap ConfigMap.
+        *
+        * @return Return the ConfigMap create future.
+        */
+       CompletableFuture<Void> createConfigMap(KubernetesConfigMap configMap);

Review comment:
       I would suggest naming this method `createConfigMapIfAbsent` to make it explicit that nothing will happen if the config map already exists.
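To illustrate the contract the new name would make explicit, here is a sketch against a hypothetical in-memory store. `InMemoryConfigMapClient` is not part of Flink, and a ConfigMap is reduced to a name plus a data map. `ConcurrentHashMap#putIfAbsent` provides the same "create only if absent, otherwise leave it untouched" semantics atomically.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory client: only the naming and the "if absent" contract
// are illustrated here, not the fabric8 client.
class InMemoryConfigMapClient {
    private final Map<String, Map<String, String>> configMaps = new ConcurrentHashMap<>();

    /**
     * Creates the ConfigMap with the given name and data if no ConfigMap with that name
     * exists yet. An existing ConfigMap is left untouched.
     */
    CompletableFuture<Void> createConfigMapIfAbsent(String name, Map<String, String> data) {
        // putIfAbsent is atomic on ConcurrentHashMap, so a concurrent caller cannot overwrite an existing entry.
        configMaps.putIfAbsent(name, data);
        return CompletableFuture.completedFuture(null);
    }

    Map<String, String> get(String name) {
        return configMaps.get(name);
    }
}
```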

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
##########
@@ -219,6 +230,68 @@ public KubernetesWatch watchPodsAndDoCallback(
                                .watch(new KubernetesPodsWatcher(podCallbackHandler)));
        }
 
+       @Override
+       public CompletableFuture<Void> createConfigMap(KubernetesConfigMap configMap) {
+               return CompletableFuture.runAsync(
+                       () -> {
+                               if (!getConfigMap(configMap.getName()).isPresent()) {
+                                       this.internalClient.configMaps().create(configMap.getInternalResource());
+                               }
+                       },
+                       kubeClientExecutorService);

Review comment:
       Not related to this PR, but I think we can already replace 
`kubeClientExecutorService` with `AbstractResourceManagerDriver#ioExecutor`.

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkKubeClient.java
##########
@@ -104,6 +107,67 @@ KubernetesWatch watchPodsAndDoCallback(
                Map<String, String> labels,
                WatchCallbackHandler<KubernetesPod> podCallbackHandler);
 
+       /**
+        * Create the ConfigMap with specified content. If the ConfigMap already exists, nothing will happen.
+        *
+        * @param configMap ConfigMap.
+        *
+        * @return Return the ConfigMap create future.
+        */
+       CompletableFuture<Void> createConfigMap(KubernetesConfigMap configMap);
+
+       /**
+        * Get the ConfigMap with specified name.
+        *
+        * @param name ConfigMap name.
+        *
+        * @return Return empty if the ConfigMap does not exist.
+        */
+       Optional<KubernetesConfigMap> getConfigMap(String name);
+
+       /**
+        * Update an existing ConfigMap with the data.
+        *
+        * @param configMapName ConfigMap to be replaced with. Benefit from <a href=https://kubernetes.io/docs/reference/using-api/api-concepts/#resource-versions>
+        *                      resource version</a> and combined with {@link #getConfigMap(String)}, we could perform a get-check-and-update
+        *                      transactional operation. Since concurrent modification could happen on a same ConfigMap,
+        *                      the update operation may fail. We need to retry internally. The max retry attempts could be
+        *                      configured via {@link org.apache.flink.kubernetes.configuration.KubernetesConfigOptions#KUBERNETES_MAX_RETRY_ATTEMPTS}.
+        * @param checker       Only the checker return true, the ConfigMap will be updated.
+        * @param function      The obtained ConfigMap will be applied to this function and get a new one to replace.
+        *
+        * @return Return the ConfigMap update future.
+        */
+       CompletableFuture<Boolean> checkAndUpdateConfigMap(
+               String configMapName,
+               Predicate<KubernetesConfigMap> checker,
+               FunctionWithException<KubernetesConfigMap, KubernetesConfigMap, ?> function);

Review comment:
       I think we can get rid of the argument `checker` and make `function` return an `Optional<KubernetesConfigMap>` to indicate whether and how to update the config map. This should simplify the interface and make its contract easier to understand.
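Below is a sketch of how the simplified signature could look. It assumes a hypothetical `SimpleConfigMap` value type and uses `java.util.function.Function` in place of Flink's `FunctionWithException`. An empty `Optional` takes over the role of `checker` returning false. `UpdateFunctions.incrementCounterIfLeader` is a hypothetical caller-side function: the `"leader"` data key is an assumption (the PR keeps the leader in an annotation), and `"counter"` mirrors `CHECKPOINT_COUNTER_KEY`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical simplified ConfigMap: a name plus string data.
final class SimpleConfigMap {
    final String name;
    final Map<String, String> data;

    SimpleConfigMap(String name, Map<String, String> data) {
        this.name = name;
        this.data = new HashMap<>(data);
    }
}

interface ConfigMapClient {
    /**
     * Applies updateFn to the current ConfigMap. An empty result means "do not update";
     * a present result replaces the ConfigMap. The future completes with whether an update happened.
     */
    CompletableFuture<Boolean> checkAndUpdateConfigMap(
            String configMapName,
            Function<SimpleConfigMap, Optional<SimpleConfigMap>> updateFn);
}

final class UpdateFunctions {
    // Hypothetical example: only the current leader may bump the counter.
    static Function<SimpleConfigMap, Optional<SimpleConfigMap>> incrementCounterIfLeader(String lockIdentity) {
        return current -> {
            if (!lockIdentity.equals(current.data.get("leader"))) {
                return Optional.empty(); // check failed: leave the ConfigMap untouched
            }
            Map<String, String> newData = new HashMap<>(current.data);
            long counter = Long.parseLong(newData.getOrDefault("counter", "0"));
            newData.put("counter", Long.toString(counter + 1));
            return Optional.of(new SimpleConfigMap(current.name, newData));
        };
    }
}
```

The check and the transformation now live in one place, so they always see the same snapshot of the ConfigMap.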

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkKubeClient.java
##########
@@ -104,6 +107,67 @@ KubernetesWatch watchPodsAndDoCallback(
                Map<String, String> labels,
                WatchCallbackHandler<KubernetesPod> podCallbackHandler);
 
+       /**
+        * Create the ConfigMap with specified content. If the ConfigMap already exists, nothing will happen.
+        *
+        * @param configMap ConfigMap.
+        *
+        * @return Return the ConfigMap create future.
+        */
+       CompletableFuture<Void> createConfigMap(KubernetesConfigMap configMap);
+
+       /**
+        * Get the ConfigMap with specified name.
+        *
+        * @param name ConfigMap name.
+        *
+        * @return Return empty if the ConfigMap does not exist.
+        */
+       Optional<KubernetesConfigMap> getConfigMap(String name);
+
+       /**
+        * Update an existing ConfigMap with the data.
+        *
+        * @param configMapName ConfigMap to be replaced with. Benefit from <a href=https://kubernetes.io/docs/reference/using-api/api-concepts/#resource-versions>
+        *                      resource version</a> and combined with {@link #getConfigMap(String)}, we could perform a get-check-and-update
+        *                      transactional operation. Since concurrent modification could happen on a same ConfigMap,
+        *                      the update operation may fail. We need to retry internally. The max retry attempts could be
+        *                      configured via {@link org.apache.flink.kubernetes.configuration.KubernetesConfigOptions#KUBERNETES_MAX_RETRY_ATTEMPTS}.
+        * @param checker       Only the checker return true, the ConfigMap will be updated.
+        * @param function      The obtained ConfigMap will be applied to this function and get a new one to replace.
+        *
+        * @return Return the ConfigMap update future.
+        */

Review comment:
       Minor: it might be better to explain the contract details before the parameters and keep the parameter docs brief.

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
##########
@@ -219,6 +230,68 @@ public KubernetesWatch watchPodsAndDoCallback(
                                .watch(new KubernetesPodsWatcher(podCallbackHandler)));
        }
 
+       @Override
+       public CompletableFuture<Void> createConfigMap(KubernetesConfigMap configMap) {
+               return CompletableFuture.runAsync(
+                       () -> {
+                               if (!getConfigMap(configMap.getName()).isPresent()) {
+                                       this.internalClient.configMaps().create(configMap.getInternalResource());
+                               }
+                       },
+                       kubeClientExecutorService);
+       }
+
+       @Override
+       public Optional<KubernetesConfigMap> getConfigMap(String name) {
+               final ConfigMap configMap = this.internalClient.configMaps().inNamespace(namespace).withName(name).get();
+               return configMap == null ? Optional.empty() : Optional.of(new KubernetesConfigMap(configMap));
+       }
+
+       @Override
+       public CompletableFuture<Boolean> checkAndUpdateConfigMap(
+                       String configMapName,
+                       Predicate<KubernetesConfigMap> checker,
+                       FunctionWithException<KubernetesConfigMap, KubernetesConfigMap, ?> function) {
+               return FutureUtils.retry(
+                       () -> CompletableFuture.supplyAsync(
+                               () -> getConfigMap(configMapName)
+                                       .map(FunctionUtils.uncheckedFunction(configMap -> {
+                                               final boolean shouldUpdate = checker.test(configMap);
+                                               if (!shouldUpdate) {
+                                                       LOG.warn("Trying to update ConfigMap {} to {} without checking pass, ignoring.",
+                                                               configMap.getName(), configMap.getData());
+                                               } else {
+                                                       this.internalClient.configMaps()
+                                                               .inNamespace(namespace)
+                                                               .createOrReplace(function.apply(configMap).getInternalResource());
+                                               }
+                                               return shouldUpdate;
+                                       }))
+                                       .orElseThrow(
+                                               () -> new FlinkRuntimeException("ConfigMap " + configMapName + " not exists.")),
+                               kubeClientExecutorService),
+                       maxRetryAttempts,
+                       kubeClientExecutorService);
+       }

Review comment:
       Combining `checker` and `function` into a function that returns 
`Optional<KubernetesConfigMap>` should help simplify this implementation.
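Below is a sketch of the implementation once the check and the update are one function. It uses a hypothetical in-memory `VersionedStore` whose `replace` is rejected when the caller's version is stale, which mimics the optimistic concurrency that Kubernetes resource versions provide. Instead of calling `FutureUtils.retry`, it loops directly, and `maxRetries` stands in for the proposed max-retries option. `ConfigMapUpdater` and the other names here are illustrative only.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;

// Hypothetical versioned ConfigMap, mimicking metadata.resourceVersion.
final class VersionedConfigMap {
    final Map<String, String> data;
    final long resourceVersion;

    VersionedConfigMap(Map<String, String> data, long resourceVersion) {
        this.data = new HashMap<>(data);
        this.resourceVersion = resourceVersion;
    }
}

// Hypothetical store: replace() fails when the expected version is stale.
final class VersionedStore {
    private final Map<String, VersionedConfigMap> maps = new HashMap<>();

    synchronized void put(String name, Map<String, String> data) {
        VersionedConfigMap old = maps.get(name);
        maps.put(name, new VersionedConfigMap(data, old == null ? 0 : old.resourceVersion + 1));
    }

    synchronized Optional<VersionedConfigMap> get(String name) {
        return Optional.ofNullable(maps.get(name));
    }

    synchronized boolean replace(String name, long expectedVersion, Map<String, String> newData) {
        VersionedConfigMap current = maps.get(name);
        if (current == null || current.resourceVersion != expectedVersion) {
            return false; // conflict: modified by someone else since we read it
        }
        maps.put(name, new VersionedConfigMap(newData, expectedVersion + 1));
        return true;
    }
}

final class ConfigMapUpdater {
    private final VersionedStore store;
    private final int maxRetries;
    private final Executor executor;

    ConfigMapUpdater(VersionedStore store, int maxRetries, Executor executor) {
        this.store = store;
        this.maxRetries = maxRetries;
        this.executor = executor;
    }

    CompletableFuture<Boolean> checkAndUpdateConfigMap(
            String name, Function<Map<String, String>, Optional<Map<String, String>>> updateFn) {
        return CompletableFuture.supplyAsync(() -> {
            // One initial attempt plus up to maxRetries retries on version conflicts.
            for (int attempt = 0; attempt <= maxRetries; attempt++) {
                VersionedConfigMap current = store.get(name)
                        .orElseThrow(() -> new IllegalStateException("ConfigMap " + name + " does not exist."));
                Optional<Map<String, String>> newData = updateFn.apply(new HashMap<>(current.data));
                if (!newData.isPresent()) {
                    return false; // the function decided not to update
                }
                if (store.replace(name, current.resourceVersion, newData.get())) {
                    return true;
                }
                // Conflict: re-read and re-apply the function to the fresh state.
            }
            throw new IllegalStateException("Gave up updating " + name + " after " + maxRetries + " retries.");
        }, executor);
    }
}
```

Because the function is re-applied to freshly read data on each attempt, a retry never writes a result computed from a stale snapshot.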

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/Constants.java
##########
@@ -50,6 +52,8 @@
        public static final String LABEL_COMPONENT_KEY = "component";
        public static final String LABEL_COMPONENT_JOB_MANAGER = "jobmanager";
        public static final String LABEL_COMPONENT_TASK_MANAGER = "taskmanager";
+       public static final String LABEL_CONFIGMAP_TYPE_KEY = "configmap-type";
+       public static final String LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY = "high-availability";

Review comment:
       This change does not belong to this commit.

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/Constants.java
##########
@@ -80,4 +84,14 @@
        public static final String RESTART_POLICY_OF_NEVER = "Never";
 
        public static final String NATIVE_KUBERNETES_COMMAND = "native-k8s";
+
+       // Constants for Kubernetes high availability
+       public static final String LEADER_ADDRESS_KEY = "address";
+       public static final String LEADER_SESSION_ID_KEY = "sessionId";
+       public static final String CHECKPOINT_COUNTER_KEY = "counter";
+       public static final String RUNNING_JOBS_REGISTRY_KEY_PREFIX = "runningJobsRegistry";
+       public static final String JOB_GRAPH_STORE_KEY_PREFIX = "jobGraph";
+
+       public static final String LOCK_IDENTITY = UUID.randomUUID().toString();
+       public static final String LEADER_ANNOTATION_KEY = "control-plane.alpha.kubernetes.io/leader";

Review comment:
       These changes do not belong to this commit.

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/FlinkKubeClient.java
##########
@@ -104,6 +107,67 @@ KubernetesWatch watchPodsAndDoCallback(
                Map<String, String> labels,
                WatchCallbackHandler<KubernetesPod> podCallbackHandler);
 
+       /**
+        * Create the ConfigMap with specified content. If the ConfigMap already exists, nothing will happen.
+        *
+        * @param configMap ConfigMap.
+        *
+        * @return Return the ConfigMap create future.
+        */
+       CompletableFuture<Void> createConfigMap(KubernetesConfigMap configMap);
+
+       /**
+        * Get the ConfigMap with specified name.
+        *
+        * @param name ConfigMap name.
+        *
+        * @return Return empty if the ConfigMap does not exist.
+        */
+       Optional<KubernetesConfigMap> getConfigMap(String name);
+
+       /**
+        * Update an existing ConfigMap with the data.
+        *
+        * @param configMapName ConfigMap to be replaced with. Benefit from <a href=https://kubernetes.io/docs/reference/using-api/api-concepts/#resource-versions>
+        *                      resource version</a> and combined with {@link #getConfigMap(String)}, we could perform a get-check-and-update
+        *                      transactional operation. Since concurrent modification could happen on a same ConfigMap,
+        *                      the update operation may fail. We need to retry internally. The max retry attempts could be
+        *                      configured via {@link org.apache.flink.kubernetes.configuration.KubernetesConfigOptions#KUBERNETES_MAX_RETRY_ATTEMPTS}.
+        * @param checker       Only the checker return true, the ConfigMap will be updated.
+        * @param function      The obtained ConfigMap will be applied to this function and get a new one to replace.
+        *
+        * @return Return the ConfigMap update future.
+        */
+       CompletableFuture<Boolean> checkAndUpdateConfigMap(
+               String configMapName,
+               Predicate<KubernetesConfigMap> checker,
+               FunctionWithException<KubernetesConfigMap, KubernetesConfigMap, ?> function);
+
+       /**
+        * Watch the ConfigMaps with specified name and do the {@link WatchCallbackHandler}.
+        *
+        * @param name name to filter the ConfigMaps to watch
+        * @param callbackHandler callbackHandler which reacts to ConfigMap events
+        * @return Return a watch for ConfigMaps. It needs to be closed after use.
+        */
+       KubernetesWatch watchConfigMapsAndDoCallback(

Review comment:
       ```suggestion
        KubernetesWatch watchConfigMaps(
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/AbstractLeaderElectionService.java
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import java.util.UUID;
+
+/**
+ * Abstract class for leader election service based on distributed coordination system(e.g. Zookeeper, Kubernetes, etc.).
+ */
+public abstract class AbstractLeaderElectionService implements LeaderElectionService {
+
+       protected final Logger logger = LoggerFactory.getLogger(getClass());
+
+       protected final Object lock = new Object();
+
+       /** The leader contender which applies for leadership. */
+       protected volatile LeaderContender leaderContender;
+
+       private volatile UUID issuedLeaderSessionID;
+
+       protected volatile UUID confirmedLeaderSessionID;
+
+       protected volatile String confirmedLeaderAddress;
+
+       protected volatile boolean running;
+
+       protected AbstractLeaderElectionService() {
+               leaderContender = null;
+
+               issuedLeaderSessionID = null;
+               confirmedLeaderSessionID = null;
+               confirmedLeaderAddress = null;
+
+               running = false;
+       }
+
+       @Override
+       public final void start(LeaderContender contender) throws Exception {
+               Preconditions.checkNotNull(contender, "Contender must not be null.");
+               Preconditions.checkState(leaderContender == null, "Contender was already set.");
+
+               logger.info("Starting LeaderElectionService {}.", this);
+
+               synchronized (lock) {
+                       leaderContender = contender;
+                       running = true;
+                       internalStart(contender);
+               }
+       }
+
+       @Override
+       public final void stop() throws Exception {
+               synchronized (lock) {
+                       if (!running) {
+                               return;
+                       }
+                       running = false;
+                       clearConfirmedLeaderInformation();
+               }
+
+               logger.info("Stopping LeaderElectionService {}.", this);
+
+               internalStop();
+       }

Review comment:
       I noticed that `internalStart` is called from inside the `synchronized` block, while `internalStop` is called from outside it. I think this is a bit implicit and might become hard to maintain: implementations extending this abstract class can easily overlook the difference.
   
   I wonder how much it would hurt to move `internalStop` inside the `synchronized` block. The lock might be held for longer, blocking other threads from entering code protected by this lock, but that should be fine given that the service is being stopped anyway. In return, we gain better maintainability.
   
   WDYT?
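Here is a reduced sketch of the proposal. The hypothetical `LockedLifecycleService` keeps only the start/stop skeleton of `AbstractLeaderElectionService`: the contender is typed as `Object`, and `clearConfirmedLeaderInformation()` is omitted. Both hooks now run under `lock`, so a subclass has only one rule to remember.

```java
import java.util.Objects;

// Hypothetical reduced version of AbstractLeaderElectionService.
abstract class LockedLifecycleService {
    protected final Object lock = new Object();
    private Object contender; // stands in for LeaderContender; guarded by lock
    private boolean running;  // guarded by lock

    public final void start(Object contender) throws Exception {
        Objects.requireNonNull(contender, "Contender must not be null.");
        synchronized (lock) {
            if (this.contender != null) {
                throw new IllegalStateException("Contender was already set.");
            }
            this.contender = contender;
            running = true;
            internalStart(contender);
        }
    }

    public final void stop() throws Exception {
        synchronized (lock) {
            if (!running) {
                return;
            }
            running = false;
            // Called under the lock, symmetric with internalStart.
            internalStop();
        }
    }

    /** Called while holding {@link #lock}. */
    protected abstract void internalStart(Object contender) throws Exception;

    /** Called while holding {@link #lock}. */
    protected abstract void internalStop() throws Exception;
}
```

With this layout, the documented invariant "both hooks run while holding `lock`" can be checked directly with `Thread.holdsLock(lock)`.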

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java
##########
@@ -196,6 +224,16 @@ public static String getCommonStartCommand(
                ).collect(Collectors.toList());
        }
 
+       public static Predicate<KubernetesConfigMap> getLeaderChecker() {
+               return configMap -> {
+                       if (configMap.getAnnotations() != null) {
+                               final String leader = configMap.getAnnotations().get(LEADER_ANNOTATION_KEY);
+                               return leader != null && leader.contains(LOCK_IDENTITY);
+                       }
+                       return false;
+               };
+       }

Review comment:
       This change is not a common ConfigMap operation; it is closely related to leader election. Thus, it does not belong to this commit.

##########
File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/TestingFlinkKubeClient.java
##########
@@ -103,6 +121,52 @@ public KubernetesWatch watchPodsAndDoCallback(Map<String, String> labels, WatchC
                return watchPodsAndDoCallbackFunction.apply(labels, 
podCallbackHandler);
        }
 
+       @Override
+       public CompletableFuture<Void> createConfigMap(KubernetesConfigMap configMap) {
+               configMapStore.putIfAbsent(configMap.getName(), configMap);
+               return CompletableFuture.completedFuture(null);
+       }
+
+       @Override
+       public Optional<KubernetesConfigMap> getConfigMap(String name) {
+               final KubernetesConfigMap configMap = configMapStore.get(name);
+               if (configMap == null) {
+                       return Optional.empty();
+               }
+               return Optional.of(new MockKubernetesConfigMap(configMap.getName(), new HashMap<>(configMap.getData())));
+       }
+
+       @Override
+       public CompletableFuture<Boolean> checkAndUpdateConfigMap(
+                       String configMapName,
+                       Predicate<KubernetesConfigMap> checker,
+                       FunctionWithException<KubernetesConfigMap, KubernetesConfigMap, ?> function) {
+               return getConfigMap(configMapName).map(FunctionUtils.uncheckedFunction(
+                       configMap -> {
+                               final boolean shouldUpdate = checker.test(configMap);
+                               if (shouldUpdate) {
+                                       configMapStore.put(configMap.getName(), function.apply(configMap));
+                               }
+                               return CompletableFuture.completedFuture(shouldUpdate);
+                       }))
+                       .orElseThrow(() -> new FlinkRuntimeException("ConfigMap " + configMapName + " not exists."));
+       }

Review comment:
       I'm not sure about having a `configMapStore` and these implementations in the testing class. I would suggest having `createConfigMapFunction`, `getConfigMapFunction` and `checkAndUpdateConfigMapFunction` instead.
   
   The current approach is less flexible. E.g., we cannot simulate situations where config maps are deleted externally.
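Here is a sketch of the function-based testing client. The names are hypothetical, and a `String` stands in for `KubernetesConfigMap`. Each method delegates to a replaceable function, so a test can script behavior that the store-based version cannot express, such as a ConfigMap disappearing between two calls. Only create and get are shown; `checkAndUpdateConfigMapFunction` would follow the same pattern.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical testing client: every method delegates to a function the test can replace.
class TestingConfigMapClient {
    private Function<String, CompletableFuture<Void>> createConfigMapFunction =
            name -> CompletableFuture.completedFuture(null);
    private Function<String, Optional<String>> getConfigMapFunction =
            name -> Optional.empty();

    void setCreateConfigMapFunction(Function<String, CompletableFuture<Void>> function) {
        this.createConfigMapFunction = function;
    }

    void setGetConfigMapFunction(Function<String, Optional<String>> function) {
        this.getConfigMapFunction = function;
    }

    CompletableFuture<Void> createConfigMap(String name) {
        return createConfigMapFunction.apply(name);
    }

    Optional<String> getConfigMap(String name) {
        return getConfigMapFunction.apply(name);
    }
}
```

For example, a test can flip a flag to make `getConfigMap` return empty and thereby simulate an external deletion without touching any shared store.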

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/AbstractLeaderElectionService.java
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import java.util.UUID;
+
+/**
+ * Abstract class for leader election service based on distributed coordination system(e.g. Zookeeper, Kubernetes, etc.).
+ */
+public abstract class AbstractLeaderElectionService implements LeaderElectionService {
+
+       protected final Logger logger = LoggerFactory.getLogger(getClass());
+
+       protected final Object lock = new Object();
+
+       /** The leader contender which applies for leadership. */
+       protected volatile LeaderContender leaderContender;
+
+       private volatile UUID issuedLeaderSessionID;
+
+       protected volatile UUID confirmedLeaderSessionID;
+
+       protected volatile String confirmedLeaderAddress;
+
+       protected volatile boolean running;
+
+       protected AbstractLeaderElectionService() {
+               leaderContender = null;
+
+               issuedLeaderSessionID = null;
+               confirmedLeaderSessionID = null;
+               confirmedLeaderAddress = null;
+
+               running = false;
+       }
+
+       @Override
+       public final void start(LeaderContender contender) throws Exception {
+               Preconditions.checkNotNull(contender, "Contender must not be null.");
+               Preconditions.checkState(leaderContender == null, "Contender was already set.");
+
+               logger.info("Starting LeaderElectionService {}.", this);
+
+               synchronized (lock) {
+                       leaderContender = contender;
+                       running = true;
+                       internalStart(contender);
+               }
+       }
+
+       @Override
+       public final void stop() throws Exception {
+               synchronized (lock) {
+                       if (!running) {
+                               return;
+                       }
+                       running = false;
+                       clearConfirmedLeaderInformation();
+               }
+
+               logger.info("Stopping LeaderElectionService {}.", this);
+
+               internalStop();
+       }
+
+       @Override
+       public void confirmLeadership(UUID leaderSessionID, String leaderAddress) {
+               if (logger.isDebugEnabled()) {
+                       logger.debug(
+                               "Confirm leader session ID {} for leader {}.",
+                               leaderSessionID,
+                               leaderAddress);
+               }
+
+               Preconditions.checkNotNull(leaderSessionID);
+
+               if (checkLeaderLatch()) {

Review comment:
       The concept of a 'leader latch' seems to come from ZooKeeper. Maybe we should abstract it into a common interface whose meaning does not depend on the backend?
   I noticed that the only difference between `checkLeaderLatch` and `hasLeadership` is `leaderSessionId.equals(issuedLeaderSessionID)`. If we adjust the order of the `if`s, would it be possible to get rid of `checkLeaderLatch` and replace it with an abstract `hasLeadership`?
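   A rough sketch of the reordering, using a hypothetical, trimmed-down `SketchLeaderElectionService` (no logging, no `start`/`stop`) rather than the actual class in this PR. Checking `running` first lets a single abstract `hasLeadership` cover both the latch check and the session ID comparison:

```java
import java.util.Objects;
import java.util.UUID;

/** Hypothetical, trimmed-down AbstractLeaderElectionService without checkLeaderLatch(). */
abstract class SketchLeaderElectionService {
    protected final Object lock = new Object();
    protected volatile boolean running;
    protected volatile UUID confirmedLeaderSessionID;
    protected volatile String confirmedLeaderAddress;

    /** Backend-specific: true iff we hold the lock and leaderSessionId is the issued one. */
    public abstract boolean hasLeadership(UUID leaderSessionId);

    /** Backend-specific: publishes the confirmed leader information. */
    protected abstract void writeLeaderInformation();

    public void confirmLeadership(UUID leaderSessionID, String leaderAddress) {
        Objects.requireNonNull(leaderSessionID);
        synchronized (lock) {
            // Checking 'running' first means a stopped service ignores late confirmations.
            if (!running) {
                return;
            }
            // One check now covers both "not elected" and "stale session ID".
            if (hasLeadership(leaderSessionID)) {
                confirmedLeaderSessionID = leaderSessionID;
                confirmedLeaderAddress = leaderAddress;
                writeLeaderInformation();
            }
        }
    }
}

/** Toy backend for illustration: leadership is held for exactly one issued session ID. */
final class FixedLeaderElectionService extends SketchLeaderElectionService {
    private final UUID issuedLeaderSessionID;
    int writes;

    FixedLeaderElectionService(UUID issuedLeaderSessionID) {
        this.issuedLeaderSessionID = issuedLeaderSessionID;
        this.running = true;
    }

    @Override
    public boolean hasLeadership(UUID leaderSessionId) {
        return issuedLeaderSessionID.equals(leaderSessionId);
    }

    @Override
    protected void writeLeaderInformation() {
        writes++;
    }
}
```

   One trade-off: the current code warns only when the latch check fails and silently drops stale session IDs. With a single `hasLeadership`, the two cases can no longer be told apart unless the implementation logs them itself.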

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesHighAvailabilityOptions.java
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.docs.Documentation;
+import org.apache.flink.configuration.ConfigOption;
+
+import java.time.Duration;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * The set of configuration options relating to Kubernetes high-availability settings.
+ * All the HA information relevant for a specific component will be stored in a single ConfigMap.
+ * For example, the Dispatcher's ConfigMap would then contain the current leader, the running jobs
+ * and the pointers to the persisted JobGraphs.
+ * The JobManager's ConfigMap would then contain the current leader, the pointers to the checkpoints
+ * and the checkpoint ID counter.
+ *
+ * <p>The ConfigMap name will be created with the following pattern.
+ * e.g. k8s-ha-app1-restserver-leader, k8s-ha-app1-00000000000000000000000000000000-jobmanager-leader
+@PublicEvolving
+public class KubernetesHighAvailabilityOptions {
+
+       @Documentation.Section(Documentation.Sections.EXPERT_KUBERNETES_HIGH_AVAILABILITY)
+       public static final ConfigOption<String> HA_KUBERNETES_LEADER_SUFFIX =
+                       key("high-availability.kubernetes.leader.suffix")
+                       .stringType()
+                       .defaultValue("leader")
+                       .withDescription("The ConfigMap suffix of the leader which contains the URL to the leader and the " +
+                               "current leader session ID. Leader elector will use the same ConfigMap for contending the lock.");
+
+       @Documentation.Section(Documentation.Sections.EXPERT_KUBERNETES_HIGH_AVAILABILITY)
+       public static final ConfigOption<Duration> KUBERNETES_LEASE_DURATION =
+                       key("high-availability.kubernetes.client.lease-duration")

Review comment:
       ```suggestion
                        key("high-availability.kubernetes.leader-election.lease-duration")
   ```

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesLeaderElector.java
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.resources;
+
+import org.apache.flink.kubernetes.kubeclient.KubernetesLeaderElectionConfiguration;
+
+import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
+import io.fabric8.kubernetes.client.extended.leaderelection.LeaderCallbacks;
+import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElectionConfigBuilder;
+import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector;
+import io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.ConfigMapLock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.flink.kubernetes.utils.Constants.LOCK_IDENTITY;
+
+/**
+ * Represent Leader Elector in kubernetes.
+ */
+public class KubernetesLeaderElector extends LeaderElector<NamespacedKubernetesClient> {

Review comment:
       IIUC, each `run` of the elector ends when the leadership is revoked, and to join another round of election we need to trigger `run` again. It would be better to explain this in the JavaDocs.
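   To make the lifecycle concrete, a hypothetical wrapper could look like the sketch below. It assumes, as described above, that one round (one `run` of the elector) returns once leadership is lost; `RepeatingElection` and `electionRound` are illustrative names, not part of fabric8 or this PR:

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Hypothetical wrapper that re-joins the election after each round ends.
 * One call of electionRound stands for one LeaderElector#run().
 */
final class RepeatingElection {
    private final Runnable electionRound;
    private final Executor executor;
    private final AtomicBoolean running = new AtomicBoolean(false);

    RepeatingElection(Runnable electionRound, Executor executor) {
        this.electionRound = electionRound;
        this.executor = executor;
    }

    void start() {
        if (running.compareAndSet(false, true)) {
            executor.execute(this::contendUntilStopped);
        }
    }

    /** Prevents further rounds; it does not interrupt a round that is still in progress. */
    void stop() {
        running.set(false);
    }

    private void contendUntilStopped() {
        // Each round returns once leadership is revoked (or the attempt ends);
        // as long as we have not been stopped, start another round.
        while (running.get()) {
            electionRound.run();
        }
    }
}
```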

##########
File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHighAvailabilityTestBase.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.highavailability;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.KubernetesLeaderElectionConfiguration;
+import org.apache.flink.kubernetes.kubeclient.TestingFlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
+import org.apache.flink.runtime.leaderelection.TestingContender;
+import org.apache.flink.runtime.leaderelection.TestingListener;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.RunnableWithException;
+
+import org.junit.After;
+import org.junit.Before;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the {@link KubernetesLeaderElectionService}.
+ */
+public class KubernetesHighAvailabilityTestBase extends TestLogger {
+
+       private final ExecutorService executorService =
+               Executors.newFixedThreadPool(4, new ExecutorThreadFactory("IO-Executor"));
+       private final Configuration configuration = new Configuration();
+
+       protected static final String CLUSTER_ID = "leader-test-cluster";
+       protected static final String LEADER_URL = "akka.tcp://[email protected]:6123/user/rpc/resourcemanager";
+       protected static final long TIMEOUT = 30L * 1000L;
+       protected static final String LEADER_CONFIGMAP_NAME = "k8s-ha-app1-resourcemanager";
+       protected final Map<String, KubernetesConfigMap> configMapStore = new HashMap<>();
+       protected final CompletableFuture<FlinkKubeClient.WatchCallbackHandler<KubernetesConfigMap>> configMapsAndDoCallbackFuture =
+               new CompletableFuture<>();
+       protected final CompletableFuture<FlinkKubeClient.WatchCallbackHandler<KubernetesConfigMap>> leaderRetrievalConfigMapCallback =
+               new CompletableFuture<>();

Review comment:
       It seems `configMapStore` and the two futures are shared across test cases, which could make the tests unstable.

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesHighAvailabilityOptions.java
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.docs.Documentation;
+import org.apache.flink.configuration.ConfigOption;
+
+import java.time.Duration;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * The set of configuration options relating to Kubernetes high-availability settings.
+ * All the HA information relevant for a specific component will be stored in a single ConfigMap.
+ * For example, the Dispatcher's ConfigMap would then contain the current leader, the running jobs
+ * and the pointers to the persisted JobGraphs.
+ * The JobManager's ConfigMap would then contain the current leader, the pointers to the checkpoints
+ * and the checkpoint ID counter.
+ *
+ * <p>The ConfigMap name will be created with the following pattern.
+ * e.g. k8s-ha-app1-restserver-leader, k8s-ha-app1-00000000000000000000000000000000-jobmanager-leader
+ */
+@PublicEvolving
+public class KubernetesHighAvailabilityOptions {
+
+       @Documentation.Section(Documentation.Sections.EXPERT_KUBERNETES_HIGH_AVAILABILITY)
+       public static final ConfigOption<String> HA_KUBERNETES_LEADER_SUFFIX =
+                       key("high-availability.kubernetes.leader.suffix")
+                       .stringType()
+                       .defaultValue("leader")
+                       .withDescription("The ConfigMap suffix of the leader which contains the URL to the leader and the " +
+                               "current leader session ID. Leader elector will use the same ConfigMap for contending the lock.");
+
+       @Documentation.Section(Documentation.Sections.EXPERT_KUBERNETES_HIGH_AVAILABILITY)
+       public static final ConfigOption<Duration> KUBERNETES_LEASE_DURATION =
+                       key("high-availability.kubernetes.client.lease-duration")
+                       .durationType()
+                       .defaultValue(Duration.ofSeconds(30))
+                       .withDescription("Define the lease duration for the Kubernetes leader election in ms. The leader will " +
+                               "continuously renew its lease time to indicate its existence. And the followers will do a lease " +
+                               "checking against the current time. \"renewTime + leaseDuration > now\" means the leader is alive.");
+
+       @Documentation.Section(Documentation.Sections.EXPERT_KUBERNETES_HIGH_AVAILABILITY)
+       public static final ConfigOption<Duration> KUBERNETES_RENEW_DEADLINE =
+                       key("high-availability.kubernetes.client.renew-deadline")
+                       .durationType()
+                       .defaultValue(Duration.ofSeconds(15))
+                       .withDescription("Defines the deadline when the leader tries to renew the lease in ms. If it could not " +
+                               "succeed in the given time, the renew operation will be aborted.");
+
+       @Documentation.Section(Documentation.Sections.EXPERT_KUBERNETES_HIGH_AVAILABILITY)
+       public static final ConfigOption<Duration> KUBERNETES_RETRY_PERIOD =
+                       key("high-availability.kubernetes.client.retry-period")
+                       .durationType()
+                       .defaultValue(Duration.ofSeconds(3))

Review comment:
       Would checking the leadership every 3s be too frequent, given that the default lease duration is 30s?
   
   Ideally, once a contender has checked the leadership and learned the remaining lease duration, it does not make sense to check again before the lease can expire. Maybe it makes sense to decide when to perform the next check dynamically, based on the remaining lease duration.
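   As a rough sketch of the dynamic approach, a follower could derive the delay until its next check from the observed `renewTime`, using the retry period only as a lower bound. `nextCheckDelay` is a hypothetical helper, and the sketch ignores clock skew between nodes:

```java
import java.time.Duration;
import java.time.Instant;

final class LeaseCheckScheduler {
    private LeaseCheckScheduler() {}

    /**
     * Hypothetical helper: delay until a follower's next leadership check.
     * The lease cannot expire before renewTime + leaseDuration, so checking
     * earlier is wasted work; minDelay (e.g. the retry period) bounds it from below.
     */
    static Duration nextCheckDelay(
            Instant renewTime, Duration leaseDuration, Instant now, Duration minDelay) {
        Duration remaining = Duration.between(now, renewTime.plus(leaseDuration));
        // Expired or nearly expired leases fall back to the minimum delay.
        return remaining.compareTo(minDelay) > 0 ? remaining : minDelay;
    }
}
```

   With the defaults in this PR (30s lease, 3s retry period), a follower that sees a lease renewed 10s ago would wait 20s before its next check instead of checking every 3s.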

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/AbstractLeaderElectionService.java
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import java.util.UUID;
+
+/**
+ * Abstract class for leader election service based on distributed coordination system(e.g. Zookeeper, Kubernetes, etc.).
+ */
+public abstract class AbstractLeaderElectionService implements LeaderElectionService {
+
+       protected final Logger logger = LoggerFactory.getLogger(getClass());
+
+       protected final Object lock = new Object();
+
+       /** The leader contender which applies for leadership. */
+       protected volatile LeaderContender leaderContender;
+
+       private volatile UUID issuedLeaderSessionID;
+
+       protected volatile UUID confirmedLeaderSessionID;
+
+       protected volatile String confirmedLeaderAddress;
+
+       protected volatile boolean running;
+
+       protected AbstractLeaderElectionService() {
+               leaderContender = null;
+
+               issuedLeaderSessionID = null;
+               confirmedLeaderSessionID = null;
+               confirmedLeaderAddress = null;
+
+               running = false;
+       }
+
+       @Override
+       public final void start(LeaderContender contender) throws Exception {
+               Preconditions.checkNotNull(contender, "Contender must not be null.");
+               Preconditions.checkState(leaderContender == null, "Contender was already set.");
+
+               logger.info("Starting LeaderElectionService {}.", this);
+
+               synchronized (lock) {
+                       leaderContender = contender;
+                       running = true;
+                       internalStart(contender);
+               }
+       }
+
+       @Override
+       public final void stop() throws Exception {
+               synchronized (lock) {
+                       if (!running) {
+                               return;
+                       }
+                       running = false;
+                       clearConfirmedLeaderInformation();
+               }
+
+               logger.info("Stopping LeaderElectionService {}.", this);
+
+               internalStop();
+       }
+
+       @Override
+       public void confirmLeadership(UUID leaderSessionID, String leaderAddress) {
+               if (logger.isDebugEnabled()) {
+                       logger.debug(
+                               "Confirm leader session ID {} for leader {}.",
+                               leaderSessionID,
+                               leaderAddress);
+               }
+
+               Preconditions.checkNotNull(leaderSessionID);
+
+               if (checkLeaderLatch()) {
+                       // check if this is an old confirmation call
+                       synchronized (lock) {
+                               if (running) {
+                                       if (leaderSessionID.equals(this.issuedLeaderSessionID)) {
+                                               confirmLeaderInformation(leaderSessionID, leaderAddress);
+                                               writeLeaderInformation();
+                                       }
+                               } else {
+                                       logger.debug("Ignoring the leader session Id {} confirmation, since the " +
+                                               "LeaderElectionService has already been stopped.", leaderSessionID);
+                               }
+                       }
+               } else {
+                       logger.warn("The leader session ID {} was confirmed even though the " +
+                               "corresponding JobManager was not elected as the leader.", leaderSessionID);
+               }
+       }
+
+       @Override
+       public boolean hasLeadership(@Nonnull UUID leaderSessionId) {
+               return checkLeaderLatch() && leaderSessionId.equals(issuedLeaderSessionID);
+       }
+
+       /**
+        * Returns the current leader session ID or null, if the contender is not the leader.
+        *
+        * @return The last leader session ID or null, if the contender is not the leader
+        */
+       public UUID getLeaderSessionID() {
+               return confirmedLeaderSessionID;
+       }
+
+       protected abstract void internalStart(LeaderContender contender) throws Exception;
+
+       protected abstract void internalStop() throws Exception;
+
+       protected abstract void writeLeaderInformation();
+
+       protected abstract boolean checkLeaderLatch();

Review comment:
       It would be nice to add JavaDocs for these abstract methods, since they are meant to be implemented by the various leader election services.
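   For illustration only, the JavaDocs could spell out the contract that `start`, `stop`, and `confirmLeadership` above already rely on. The wording below is inferred from how the methods are called in this class, not taken from the PR, and `LeaderContender` is a placeholder interface so that the snippet compiles on its own:

```java
/** Placeholder for org.apache.flink.runtime.leaderelection.LeaderContender. */
interface LeaderContender {}

abstract class LeaderElectionServiceContract {

    /**
     * Starts the backend-specific election, e.g. a ZooKeeper latch or a Kubernetes
     * ConfigMap lock. Called once from {@code start} while holding the lock, after
     * the contender has been set and the service has been marked as running.
     *
     * @param contender the contender that applies for leadership
     * @throws Exception if the backend-specific election could not be started
     */
    protected abstract void internalStart(LeaderContender contender) throws Exception;

    /**
     * Stops the backend-specific election and releases its resources. Called from
     * {@code stop} outside the lock, after the confirmed leader information has
     * been cleared.
     *
     * @throws Exception if the backend-specific resources could not be released
     */
    protected abstract void internalStop() throws Exception;

    /**
     * Writes the confirmed leader session ID and address to the backend so that
     * leader retrieval services can read them. Called from {@code confirmLeadership}
     * while holding the lock.
     */
    protected abstract void writeLeaderInformation();

    /**
     * Returns whether this contender currently holds the leadership in the backend,
     * independent of any particular leader session ID.
     */
    protected abstract boolean checkLeaderLatch();
}
```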

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesHaServices.java
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.highavailability;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.configuration.KubernetesHighAvailabilityOptions;
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.KubernetesLeaderElectionConfiguration;
+import org.apache.flink.kubernetes.utils.KubernetesUtils;
+import org.apache.flink.runtime.blob.BlobStore;
+import org.apache.flink.runtime.blob.BlobStoreService;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
+import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneRunningJobsRegistry;
+import org.apache.flink.runtime.jobmanager.JobGraphStore;
+import org.apache.flink.runtime.jobmanager.StandaloneJobGraphStore;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY;
+import static org.apache.flink.kubernetes.utils.Constants.NAME_SEPARATOR;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * High availability service for Kubernetes.
+ */
+public class KubernetesHaServices implements HighAvailabilityServices {
+
+       private static final Logger LOG = LoggerFactory.getLogger(KubernetesHaServices.class);
+
+       private static final String RESOURCE_MANAGER_NAME = "resourcemanager";
+
+       private static final String DISPATCHER_NAME = "dispatcher";
+
+       private static final String JOB_MANAGER_NAME = "jobmanager";
+
+       private static final String REST_SERVER_NAME = "restserver";
+
+       private final String leaderSuffix;
+
+       private final String clusterId;
+
+       /** Kubernetes client. */
+       private final FlinkKubeClient kubeClient;
+
+       /** The executor to run Kubernetes operations on. */
+       private final Executor executor;
+
+       /** The runtime configuration. */
+       private final Configuration configuration;
+
+       /** Store for arbitrary blobs. */
+       private final BlobStoreService blobStoreService;
+
+       /** The Kubernetes based running jobs registry. */
+       private final RunningJobsRegistry runningJobsRegistry;

Review comment:
       These fields are shared with `ZooKeeperHaServices`. Would it make sense to move them into an `AbstractHaService`?

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
##########
@@ -219,6 +230,68 @@ public KubernetesWatch watchPodsAndDoCallback(
                                .watch(new KubernetesPodsWatcher(podCallbackHandler)));
        }
 
+       @Override
+       public CompletableFuture<Void> createConfigMap(KubernetesConfigMap configMap) {
+               return CompletableFuture.runAsync(
+                       () -> {
+                               if (!getConfigMap(configMap.getName()).isPresent()) {
+                                       this.internalClient.configMaps().create(configMap.getInternalResource());
+                               }

Review comment:
       The existence check and the creation are not guaranteed to be atomic. What happens if another client creates the config map in between? Does the creation fail, or does it overwrite the existing one? If it fails, where is the exception handled?
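   One way to avoid the race is to drop the existence check, let the create itself decide, and surface a conflict through the returned future. The sketch below models this with a hypothetical in-memory `InMemoryConfigMapApi` whose create is atomic, standing in for the API server. For a plain create, the Kubernetes API server rejects an existing name with `409 Conflict` (`AlreadyExists`) instead of overwriting it; how the fabric8 call reports that would still need to be checked:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;

/** Hypothetical conflict error, standing in for an HTTP 409 AlreadyExists response. */
final class ConfigMapAlreadyExistsException extends RuntimeException {
    ConfigMapAlreadyExistsException(String name) {
        super("ConfigMap " + name + " already exists.");
    }
}

/** Hypothetical in-memory API server: create either succeeds or fails, atomically. */
final class InMemoryConfigMapApi {
    private final ConcurrentMap<String, String> store = new ConcurrentHashMap<>();

    void create(String name, String data) {
        // putIfAbsent is atomic, so there is no window between "check" and "create".
        if (store.putIfAbsent(name, data) != null) {
            throw new ConfigMapAlreadyExistsException(name);
        }
    }

    String get(String name) {
        return store.get(name);
    }
}

final class ConfigMapCreator {
    private ConfigMapCreator() {}

    /** Creates without a prior existence check; a conflict completes the future exceptionally. */
    static CompletableFuture<Void> createConfigMap(
            InMemoryConfigMapApi api, String name, String data, Executor executor) {
        return CompletableFuture.runAsync(() -> api.create(name, data), executor);
    }
}
```

   The caller can then decide explicitly whether a conflict means that another client won the race or that something went wrong.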

##########
File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java
##########
@@ -249,4 +265,58 @@ public void testStopAndCleanupCluster() throws Exception {
                this.flinkKubeClient.stopAndCleanupCluster(CLUSTER_ID);
                assertTrue(this.kubeClient.apps().deployments().inNamespace(NAMESPACE).list().getItems().isEmpty());
        }
+
+       @Test
+       public void testCreateAndDeleteConfigMap() {
+               this.flinkKubeClient.createConfigMap(buildHAConfigMap());
+               assertThat(this.flinkKubeClient.getConfigMap(LEADER_CONFIG_MAP_NAME).isPresent(), is(true));
+               this.flinkKubeClient.deleteConfigMapsByLabels(haLabels);
+               assertThat(this.flinkKubeClient.getConfigMap(LEADER_CONFIG_MAP_NAME).isPresent(), is(false));
+       }
+
+       @Test
+       public void testCheckAndUpdateConfigMap() throws Exception {
+               this.flinkKubeClient.createConfigMap(buildHAConfigMap());
+
+               final Supplier<Exception> configMapNotExistException = () -> new Exception("ConfigMap not exist");
+               FunctionWithException<KubernetesConfigMap, KubernetesConfigMap, ?> function = c -> {
+                       c.getData().put(LEADER_ADDRESS_KEY, LEADER_ADDRESS_NEW);
+                       return c;
+               };
+               this.flinkKubeClient.getConfigMap(LEADER_CONFIG_MAP_NAME).map(
+                       configMap -> {
+                               assertThat(configMap.getData().get(LEADER_ADDRESS_KEY), is(LEADER_ADDRESS));
+                               return configMap;
+                       }
+               ).orElseThrow(configMapNotExistException);
+
+               // Checker not pass
+               this.flinkKubeClient.checkAndUpdateConfigMap(LEADER_CONFIG_MAP_NAME, c -> false, function).get();
+               this.flinkKubeClient.getConfigMap(LEADER_CONFIG_MAP_NAME).map(
+                       configMap -> {
+                               assertThat(configMap.getData().get(LEADER_ADDRESS_KEY), is(LEADER_ADDRESS));
+                               return configMap;
+                       }
+               ).orElseThrow(configMapNotExistException);
+
+               // Checker pass
+               this.flinkKubeClient.checkAndUpdateConfigMap(LEADER_CONFIG_MAP_NAME, c -> true, function).get();
+               this.flinkKubeClient.getConfigMap(LEADER_CONFIG_MAP_NAME).map(
+                       configMap -> {
+                               assertThat(configMap.getData().get(LEADER_ADDRESS_KEY), is(LEADER_ADDRESS_NEW));
+                               return configMap;
+                       }
+               ).orElseThrow(configMapNotExistException);
+       }
+
+       private KubernetesConfigMap buildHAConfigMap() {
+               final Map<String, String> data = new HashMap<>();
+               data.put(LEADER_ADDRESS_KEY, LEADER_ADDRESS);
+               return new KubernetesConfigMap(new ConfigMapBuilder()
+                       .withNewMetadata()
+                       .withName(LEADER_CONFIG_MAP_NAME)
+                       .withLabels(haLabels)
+                       .endMetadata()
+                       .withData(data).build());
+       }

Review comment:
       I would suggest using an arbitrary testing config map for testing the client. The config map interfaces should work correctly regardless of whether the config map is used for HA.

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesHighAvailabilityOptions.java
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.docs.Documentation;
+import org.apache.flink.configuration.ConfigOption;
+
+import java.time.Duration;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * The set of configuration options relating to Kubernetes high-availability settings.
+ * All the HA information relevant for a specific component will be stored in a single ConfigMap.
+ * For example, the Dispatcher's ConfigMap would then contain the current leader, the running jobs
+ * and the pointers to the persisted JobGraphs.
+ * The JobManager's ConfigMap would then contain the current leader, the pointers to the checkpoints
+ * and the checkpoint ID counter.
+ *
+ * <p>The ConfigMap name will be created with the following pattern.
+ * e.g. k8s-ha-app1-restserver-leader, k8s-ha-app1-00000000000000000000000000000000-jobmanager-leader
+ */
+@PublicEvolving
+public class KubernetesHighAvailabilityOptions {
+
+       @Documentation.Section(Documentation.Sections.EXPERT_KUBERNETES_HIGH_AVAILABILITY)
+       public static final ConfigOption<String> HA_KUBERNETES_LEADER_SUFFIX =
+                       key("high-availability.kubernetes.leader.suffix")
+                       .stringType()
+                       .defaultValue("leader")
+                       .withDescription("The ConfigMap suffix of the leader which contains the URL to the leader and the " +
+                               "current leader session ID. Leader elector will use the same ConfigMap for contending the lock.");
+
+       @Documentation.Section(Documentation.Sections.EXPERT_KUBERNETES_HIGH_AVAILABILITY)
+       public static final ConfigOption<Duration> KUBERNETES_LEASE_DURATION =
+                       key("high-availability.kubernetes.client.lease-duration")
+                       .durationType()
+                       .defaultValue(Duration.ofSeconds(30))
+                       .withDescription("Define the lease duration for the Kubernetes leader election in ms. The leader will " +
+                               "continuously renew its lease time to indicate its existence. And the followers will do a lease " +
+                               "checking against the current time. \"renewTime + leaseDuration > now\" means the leader is alive.");
+
+       @Documentation.Section(Documentation.Sections.EXPERT_KUBERNETES_HIGH_AVAILABILITY)
+       public static final ConfigOption<Duration> KUBERNETES_RENEW_DEADLINE =
+                       key("high-availability.kubernetes.client.renew-deadline")
+                       .durationType()
+                       .defaultValue(Duration.ofSeconds(15))

Review comment:
       I think the default 15s renew deadline does not match the default 30s lease duration. If the leader cannot renew within 15s, it gives up the leadership, but the other contenders still have to wait until the 30s lease expires, i.e. another 15s, before one of them can become leader. During these 15s, there's practically no leader.
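To make the gap concrete, here is a minimal, hypothetical Java sketch (not Flink or fabric8 code) of the two timing rules. The follower rule comes from the `lease-duration` description above ("renewTime + leaseDuration > now" means the leader is alive); the leader rule, stepping down once the renew deadline has passed since the last successful renewal, is an assumption based on the renew-deadline option. `LeaseTiming` and its methods are made-up names, and retry intervals and clock skew are ignored.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical sketch of the lease timing rules; not Flink or fabric8 code. */
final class LeaseTiming {

    private LeaseTiming() {}

    /** Follower rule from the option description: "renewTime + leaseDuration > now" means alive. */
    static boolean isLeaderAlive(Instant renewTime, Duration leaseDuration, Instant now) {
        return renewTime.plus(leaseDuration).isAfter(now);
    }

    /** Assumed leader rule: step down once renewDeadline has passed since the last successful renewal. */
    static boolean shouldGiveUp(Instant lastSuccessfulRenew, Duration renewDeadline, Instant now) {
        return !lastSuccessfulRenew.plus(renewDeadline).isAfter(now);
    }

    /** Time during which the old leader has stepped down but followers still see a valid lease. */
    static Duration leaderlessWindow(Duration leaseDuration, Duration renewDeadline) {
        Duration gap = leaseDuration.minus(renewDeadline);
        return gap.isNegative() ? Duration.ZERO : gap;
    }

    public static void main(String[] args) {
        Duration leaseDuration = Duration.ofSeconds(30);
        Duration renewDeadline = Duration.ofSeconds(15);
        Instant lastRenew = Instant.parse("2020-10-15T00:00:00Z");
        Instant now = lastRenew.plusSeconds(20);

        // 20s after the last renewal: the leader has stepped down, but followers still see a live lease.
        System.out.println("leader stepped down: " + shouldGiveUp(lastRenew, renewDeadline, now));
        System.out.println("followers see a live lease: " + isLeaderAlive(lastRenew, leaseDuration, now));
        System.out.println("leaderless window: " + leaderlessWindow(leaseDuration, renewDeadline));
    }
}
```

With the defaults the window is 30s - 15s = 15s. Moving the renew deadline closer to the lease duration shrinks it, but under these rules the renew deadline should stay below the lease duration so that the old leader steps down before followers can take over.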

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesHaServicesFactory.java
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.highavailability;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.kubeclient.KubeClientFactory;
+import org.apache.flink.runtime.blob.BlobUtils;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesFactory;
+
+import java.util.concurrent.Executor;
+
+/**
+ * Factory for creating Kubernetes high availability services.
+ */
+public class KubernetesHaServicesFactory implements HighAvailabilityServicesFactory {
+
+       @Override
+       public HighAvailabilityServices createHAServices(Configuration configuration, Executor executor) throws Exception {
+               return new KubernetesHaServices(
+                       KubeClientFactory.fromConfiguration(configuration),

Review comment:
       IIUC, this means we create two `KubeClient`s in the JobManager process. Would that be a problem?

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesHighAvailabilityOptions.java
##########
@@ -0,0 +1,78 @@
+       @Documentation.Section(Documentation.Sections.EXPERT_KUBERNETES_HIGH_AVAILABILITY)
+       public static final ConfigOption<String> HA_KUBERNETES_LEADER_SUFFIX =
+                       key("high-availability.kubernetes.leader.suffix")
+                       .stringType()
+                       .defaultValue("leader")
+                       .withDescription("The ConfigMap suffix of the leader which contains the URL to the leader and the " +
+                               "current leader session ID. Leader elector will use the same ConfigMap for contending the lock.");

Review comment:
       Any reason to make this suffix configurable?
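If the suffix does not need to be configurable, a constant would do. A minimal, hypothetical sketch that builds names following the two examples in the class Javadoc (`k8s-ha-app1-restserver-leader` and `k8s-ha-app1-00000000000000000000000000000000-jobmanager-leader`); the `k8s-ha` prefix and the `<prefix>-<cluster id>-<component>-leader` layout are inferred only from those examples, and `HaConfigMapNames` is a made-up name.

```java
/** Hypothetical helper; the name layout is inferred from the two Javadoc examples only. */
final class HaConfigMapNames {

    /** Fixed suffix instead of the configurable high-availability.kubernetes.leader.suffix. */
    static final String LEADER_SUFFIX = "leader";

    private static final String PREFIX = "k8s-ha";

    private HaConfigMapNames() {}

    /** E.g. ("app1", "restserver") -> "k8s-ha-app1-restserver-leader". */
    static String leaderConfigMapName(String clusterId, String component) {
        return String.join("-", PREFIX, clusterId, component, LEADER_SUFFIX);
    }

    public static void main(String[] args) {
        System.out.println(leaderConfigMapName("app1", "restserver"));
        System.out.println(leaderConfigMapName("app1", "00000000000000000000000000000000-jobmanager"));
    }
}
```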

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java
##########
@@ -106,13 +110,37 @@ public static String getDeploymentName(String clusterId) {
         * @return Task manager labels.
         */
        public static Map<String, String> getTaskManagerLabels(String clusterId) {
-               final Map<String, String> labels = new HashMap<>();
-               labels.put(Constants.LABEL_TYPE_KEY, Constants.LABEL_TYPE_NATIVE_TYPE);
-               labels.put(Constants.LABEL_APP_KEY, clusterId);
+               final Map<String, String> labels = new HashMap<>(getCommonLabels(clusterId));
                labels.put(Constants.LABEL_COMPONENT_KEY, Constants.LABEL_COMPONENT_TASK_MANAGER);
                return Collections.unmodifiableMap(labels);
        }
 
+       /**
+        * Get the common labels for Flink native clusters. All the Kubernetes resources will be set with these labels.
+        *
+        * @param clusterId cluster id
+        * @return Return common labels map
+        */
+       public static Map<String, String> getCommonLabels(String clusterId) {
+               Map<String, String> commonLabels = new HashMap<>();
+               commonLabels.put(Constants.LABEL_TYPE_KEY, Constants.LABEL_TYPE_NATIVE_TYPE);
+               commonLabels.put(Constants.LABEL_APP_KEY, clusterId);
+
+               return Collections.unmodifiableMap(commonLabels);

Review comment:
       Why return an unmodifiable map? The returned map is a new instance created in this method, so there is no other reference to it besides the one returned from this method.
   
   Returning a modifiable map would also save us from converting it back to a modifiable map in `getTaskManagerLabels` and `getConfigMapLabels`.
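A minimal sketch of the suggestion, with `getCommonLabels` returning a fresh mutable map and the caller wrapping the result only once. The label keys and values below are placeholders assumed for illustration (the real ones are defined in Flink's `Constants` class), and `LabelsSketch` is a hypothetical name.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: common labels are returned mutable, callers wrap their final result once. */
final class LabelsSketch {

    // Placeholder keys/values for illustration; the real ones live in Flink's Constants class.
    static final String LABEL_TYPE_KEY = "type";
    static final String LABEL_TYPE_NATIVE_TYPE = "flink-native-kubernetes";
    static final String LABEL_APP_KEY = "app";
    static final String LABEL_COMPONENT_KEY = "component";
    static final String LABEL_COMPONENT_TASK_MANAGER = "taskmanager";

    private LabelsSketch() {}

    /** Returns a new, modifiable map on every call; nobody else holds a reference to it. */
    static Map<String, String> getCommonLabels(String clusterId) {
        final Map<String, String> commonLabels = new HashMap<>();
        commonLabels.put(LABEL_TYPE_KEY, LABEL_TYPE_NATIVE_TYPE);
        commonLabels.put(LABEL_APP_KEY, clusterId);
        return commonLabels;
    }

    /** No defensive copy needed before adding the component label. */
    static Map<String, String> getTaskManagerLabels(String clusterId) {
        final Map<String, String> labels = getCommonLabels(clusterId);
        labels.put(LABEL_COMPONENT_KEY, LABEL_COMPONENT_TASK_MANAGER);
        return Collections.unmodifiableMap(labels);
    }
}
```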

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/Constants.java
##########
@@ -80,4 +84,14 @@
        public static final String RESTART_POLICY_OF_NEVER = "Never";
 
        public static final String NATIVE_KUBERNETES_COMMAND = "native-k8s";
+
+       // Constants for Kubernetes high availability
+       public static final String LEADER_ADDRESS_KEY = "address";
+       public static final String LEADER_SESSION_ID_KEY = "sessionId";
+       public static final String CHECKPOINT_COUNTER_KEY = "counter";
+       public static final String RUNNING_JOBS_REGISTRY_KEY_PREFIX = "runningJobsRegistry";
+       public static final String JOB_GRAPH_STORE_KEY_PREFIX = "jobGraph";
+
+       public static final String LOCK_IDENTITY = UUID.randomUUID().toString();

Review comment:
       This is not a typical "constant": its value is expected to be different for each process. We'd better move it somewhere else (e.g., `KubernetesLeaderElectionService`) and initialize it as a non-static field.
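A minimal sketch of the suggested change, assuming the identity only needs to be generated once per service instance. `LeaderElectionServiceSketch` is a hypothetical stand-in for `KubernetesLeaderElectionService`, trimmed down to the identity field.

```java
import java.util.UUID;

/** Hypothetical, trimmed-down stand-in for KubernetesLeaderElectionService. */
class LeaderElectionServiceSketch {

    // Generated per instance instead of living as a static "constant" in Constants.
    private final String lockIdentity;

    LeaderElectionServiceSketch() {
        this(UUID.randomUUID().toString());
    }

    // Lets tests inject a fixed identity.
    LeaderElectionServiceSketch(String lockIdentity) {
        this.lockIdentity = lockIdentity;
    }

    String getLockIdentity() {
        return lockIdentity;
    }
}
```

Besides keeping `Constants` free of runtime-generated values, the constructor argument lets tests inject a fixed identity, which a static field does not allow.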

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java
##########
@@ -115,11 +115,7 @@ public String getImage() {
 
        @Override
        public Map<String, String> getCommonLabels() {
-               Map<String, String> commonLabels = new HashMap<>();
-               commonLabels.put(Constants.LABEL_TYPE_KEY, Constants.LABEL_TYPE_NATIVE_TYPE);
-               commonLabels.put(Constants.LABEL_APP_KEY, getClusterId());
-
-               return Collections.unmodifiableMap(commonLabels);
+               return KubernetesUtils.getCommonLabels(getClusterId());

Review comment:
       It seems this change does not belong in this commit?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/AbstractLeaderElectionService.java
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import java.util.UUID;
+
+/**
+ * Abstract class for leader election service based on distributed coordination system(e.g. Zookeeper, Kubernetes, etc.).
+ */
+public abstract class AbstractLeaderElectionService implements LeaderElectionService {
+
+       protected final Logger logger = LoggerFactory.getLogger(getClass());
+
+       protected final Object lock = new Object();

Review comment:
       This was not introduced by this PR, but it might be better to document which internal states are protected by this lock.
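One way to do that is to state the guarding lock next to each field, as sketched below with plain comments (JSR-305's `@GuardedBy` annotation would be an alternative if that dependency is available). Apart from `lock` and `leaderContender`, which appear in the code under review, the field and method names are hypothetical.

```java
import java.util.UUID;

/** Hypothetical sketch: every mutable field names the lock that guards it. */
class GuardedStateSketch {

    /** Guards leaderContender, issuedLeaderSessionId and running. */
    protected final Object lock = new Object();

    // Guarded by lock. Null until start() is called.
    protected Object leaderContender;

    // Guarded by lock. Session ID issued on the last leadership grant (hypothetical).
    private UUID issuedLeaderSessionId;

    // Guarded by lock. True between start() and stop() (hypothetical).
    private boolean running;

    void start(Object contender) {
        synchronized (lock) {
            leaderContender = contender;
            running = true;
        }
    }

    void stop() {
        synchronized (lock) {
            running = false;
            leaderContender = null;
            issuedLeaderSessionId = null;
        }
    }

    void grantLeadership() {
        synchronized (lock) {
            if (running) {
                issuedLeaderSessionId = UUID.randomUUID();
            }
        }
    }

    UUID getIssuedLeaderSessionId() {
        synchronized (lock) {
            return issuedLeaderSessionId;
        }
    }
}
```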

##########
File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java
##########
@@ -249,4 +265,58 @@ public void testStopAndCleanupCluster() throws Exception {
                this.flinkKubeClient.stopAndCleanupCluster(CLUSTER_ID);
                assertTrue(this.kubeClient.apps().deployments().inNamespace(NAMESPACE).list().getItems().isEmpty());
        }
+
+       @Test
+       public void testCreateAndDeleteConfigMap() {
+               this.flinkKubeClient.createConfigMap(buildHAConfigMap());
+               assertThat(this.flinkKubeClient.getConfigMap(LEADER_CONFIG_MAP_NAME).isPresent(), is(true));
+               this.flinkKubeClient.deleteConfigMapsByLabels(haLabels);
+               assertThat(this.flinkKubeClient.getConfigMap(LEADER_CONFIG_MAP_NAME).isPresent(), is(false));
+       }

Review comment:
       It would be better to split this into two test cases, each with a single purpose.

##########
File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java
##########
@@ -249,4 +265,58 @@ public void testStopAndCleanupCluster() throws Exception {
+       @Test
+       public void testCheckAndUpdateConfigMap() throws Exception {
+               this.flinkKubeClient.createConfigMap(buildHAConfigMap());
+
+               final Supplier<Exception> configMapNotExistException = () -> new Exception("ConfigMap not exist");
+               FunctionWithException<KubernetesConfigMap, KubernetesConfigMap, ?> function = c -> {
+                       c.getData().put(LEADER_ADDRESS_KEY, LEADER_ADDRESS_NEW);
+                       return c;
+               };
+               this.flinkKubeClient.getConfigMap(LEADER_CONFIG_MAP_NAME).map(
+                       configMap -> {
+                               assertThat(configMap.getData().get(LEADER_ADDRESS_KEY), is(LEADER_ADDRESS));
+                               return configMap;
+                       }
+               ).orElseThrow(configMapNotExistException);
+
+               // Checker not pass
+               this.flinkKubeClient.checkAndUpdateConfigMap(LEADER_CONFIG_MAP_NAME, c -> false, function).get();
+               this.flinkKubeClient.getConfigMap(LEADER_CONFIG_MAP_NAME).map(
+                       configMap -> {
+                               assertThat(configMap.getData().get(LEADER_ADDRESS_KEY), is(LEADER_ADDRESS));
+                               return configMap;
+                       }
+               ).orElseThrow(configMapNotExistException);
+
+               // Checker pass
+               this.flinkKubeClient.checkAndUpdateConfigMap(LEADER_CONFIG_MAP_NAME, c -> true, function).get();
+               this.flinkKubeClient.getConfigMap(LEADER_CONFIG_MAP_NAME).map(
+                       configMap -> {
+                               assertThat(configMap.getData().get(LEADER_ADDRESS_KEY), is(LEADER_ADDRESS_NEW));
+                               return configMap;
+                       }
+               ).orElseThrow(configMapNotExistException);
+       }

Review comment:
       1. Let's separate this into two test cases.
   2. Instead of checking existence with `orElseThrow`, I think asserting `Optional#isPresent` would be more readable, for all three occurrences:
   ```
   final Optional<KubernetesConfigMap> configMapOpt = flinkKubeClient.getConfigMap(LEADER_CONFIG_MAP_NAME);
   assertThat(configMapOpt.isPresent(), is(true));
   assertThat(configMapOpt.get().getData().get(LEADER_ADDRESS_KEY), is(xxx));
   ```
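A self-contained sketch of the split, under assumptions: `InMemoryConfigMapStore` is a hypothetical stand-in for the ConfigMap operations of `FlinkKubeClient`, ConfigMaps are plain `Map`s, and plain checks replace JUnit/Hamcrest so the snippet runs on its own. In the real test class the two methods would be `@Test` cases using `assertThat(..., is(...))`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;

/** Hypothetical in-memory stand-in for the ConfigMap operations of FlinkKubeClient. */
final class InMemoryConfigMapStore {
    private final Map<String, Map<String, String>> configMaps = new HashMap<>();

    void create(String name, Map<String, String> data) {
        configMaps.put(name, new HashMap<>(data));
    }

    Optional<Map<String, String>> get(String name) {
        return Optional.ofNullable(configMaps.get(name));
    }

    /** Applies the update only if the ConfigMap exists and the checker passes. */
    void checkAndUpdate(
            String name,
            Predicate<Map<String, String>> checker,
            Function<Map<String, String>, Map<String, String>> update) {
        get(name).filter(checker).map(update).ifPresent(updated -> configMaps.put(name, updated));
    }
}

/** The original test split into two cases, each with a single purpose. */
final class CheckAndUpdateConfigMapCases {
    static final String NAME = "k8s-ha-app1-restserver-leader";
    static final String LEADER_ADDRESS_KEY = "address";
    static final String LEADER_ADDRESS = "address-old";      // placeholder value
    static final String LEADER_ADDRESS_NEW = "address-new";  // placeholder value

    private CheckAndUpdateConfigMapCases() {}

    private static InMemoryConfigMapStore storeWithLeader() {
        final InMemoryConfigMapStore store = new InMemoryConfigMapStore();
        final Map<String, String> data = new HashMap<>();
        data.put(LEADER_ADDRESS_KEY, LEADER_ADDRESS);
        store.create(NAME, data);
        return store;
    }

    // Returns an updated copy instead of mutating the stored map.
    private static Map<String, String> withNewAddress(Map<String, String> data) {
        final Map<String, String> updated = new HashMap<>(data);
        updated.put(LEADER_ADDRESS_KEY, LEADER_ADDRESS_NEW);
        return updated;
    }

    static void testCheckAndUpdateConfigMapWithFailingChecker() {
        final InMemoryConfigMapStore store = storeWithLeader();
        store.checkAndUpdate(NAME, c -> false, CheckAndUpdateConfigMapCases::withNewAddress);

        final Optional<Map<String, String>> configMapOpt = store.get(NAME);
        check(configMapOpt.isPresent(), "ConfigMap should exist");
        check(LEADER_ADDRESS.equals(configMapOpt.get().get(LEADER_ADDRESS_KEY)), "address should be unchanged");
    }

    static void testCheckAndUpdateConfigMapWithPassingChecker() {
        final InMemoryConfigMapStore store = storeWithLeader();
        store.checkAndUpdate(NAME, c -> true, CheckAndUpdateConfigMapCases::withNewAddress);

        final Optional<Map<String, String>> configMapOpt = store.get(NAME);
        check(configMapOpt.isPresent(), "ConfigMap should exist");
        check(LEADER_ADDRESS_NEW.equals(configMapOpt.get().get(LEADER_ADDRESS_KEY)), "address should be updated");
    }

    private static void check(boolean condition, String message) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }

    public static void main(String[] args) {
        testCheckAndUpdateConfigMapWithFailingChecker();
        testCheckAndUpdateConfigMapWithPassingChecker();
        System.out.println("both cases passed");
    }
}
```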

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesHighAvailabilityOptions.java
##########
@@ -0,0 +1,78 @@
+       @Documentation.Section(Documentation.Sections.EXPERT_KUBERNETES_HIGH_AVAILABILITY)
+       public static final ConfigOption<Duration> KUBERNETES_RENEW_DEADLINE =
+                       key("high-availability.kubernetes.client.renew-deadline")

Review comment:
       ```suggestion
                        key("high-availability.kubernetes.leader-election.renew-deadline")
   ```

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesHighAvailabilityOptions.java
##########
@@ -0,0 +1,78 @@
+       @Documentation.Section(Documentation.Sections.EXPERT_KUBERNETES_HIGH_AVAILABILITY)
+       public static final ConfigOption<Duration> KUBERNETES_RENEW_DEADLINE =
+                       key("high-availability.kubernetes.client.renew-deadline")
+                       .durationType()
+                       .defaultValue(Duration.ofSeconds(15))
+                       .withDescription("Defines the deadline when the leader tries to renew the lease in ms. If it could not " +
+                               "succeed in the given time, the renew operation will be aborted.");

Review comment:
       It's a bit unclear what "the renew operation will be aborted" means. I think we should explain that the leader will give up its leadership if it cannot successfully renew the lease within this time.

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionService.java
##########
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.highavailability;
+
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.KubernetesLeaderElectionConfiguration;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch;
+import org.apache.flink.kubernetes.utils.KubernetesUtils;
+import org.apache.flink.runtime.leaderelection.AbstractLeaderElectionService;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY;
+import static org.apache.flink.kubernetes.utils.Constants.LEADER_ADDRESS_KEY;
+import static org.apache.flink.kubernetes.utils.Constants.LEADER_SESSION_ID_KEY;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Leader election service for multiple JobManagers. The active JobManager is elected using Kubernetes.
+ * The current leader's address as well as its leader session ID is published via Kubernetes ConfigMap.
+ * Note that the contending lock and leader storage are using the same ConfigMap. And every component(e.g.
+ * ResourceManager, Dispatcher, RestEndpoint, JobManager for each job) will have a separate ConfigMap.
+ */
+public class KubernetesLeaderElectionService extends AbstractLeaderElectionService {
+
+       private final FlinkKubeClient kubeClient;
+
+       private final Executor executor;
+
+       private final String configMapName;
+
+       private final KubernetesLeaderElector leaderElector;
+
+       private KubernetesWatch kubernetesWatch;
+
+       // Labels will be used to clean up the ha related ConfigMaps.
+       private Map<String, String> configMapLabels;
+
+       KubernetesLeaderElectionService(
+                       FlinkKubeClient kubeClient,
+                       Executor executor,
+                       KubernetesLeaderElectionConfiguration leaderConfig) {
+
+               this.kubeClient = checkNotNull(kubeClient, "Kubernetes client should not be null.");
+               this.executor = checkNotNull(executor, "Executor should not be null.");
+               this.configMapName = leaderConfig.getConfigMapName();
+               this.leaderElector = kubeClient.createLeaderElector(leaderConfig, new LeaderCallbackHandlerImpl());
+               this.leaderContender = null;
+               this.configMapLabels = KubernetesUtils.getConfigMapLabels(
+                       leaderConfig.getClusterId(), LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY);
+       }
+
+       @Override
+       public void internalStart(LeaderContender contender) {
+               CompletableFuture.runAsync(leaderElector::run, executor);
+               kubernetesWatch = kubeClient.watchConfigMapsAndDoCallback(configMapName, new ConfigMapCallbackHandlerImpl());
+       }
+
+       @Override
+       public void internalStop() {
+               if (kubernetesWatch != null) {
+                       kubernetesWatch.close();
+               }
+       }
+
+       @Override
+       protected void writeLeaderInformation() {
+               updateConfigMap(configMapName);
+       }
+
+       @Override
+       protected boolean checkLeaderLatch() {
+               return kubeClient.getConfigMap(configMapName)
+                       .map(configMap -> KubernetesUtils.getLeaderChecker().test(configMap))
+                       .orElse(false);
+       }
+
+       @Override
+       public String toString() {
+               return "KubernetesLeaderElectionService{configMapName='" + configMapName + "'}";
+       }
+
+       private void updateConfigMap(String configMapName) {

Review comment:
       I think the name `updateConfigMap` is not very descriptive. What this method really does is write its own leader information into the leader ConfigMap.

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderElectionService.java
##########
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.highavailability;
+
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import 
org.apache.flink.kubernetes.kubeclient.KubernetesLeaderElectionConfiguration;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
+import 
org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch;
+import org.apache.flink.kubernetes.utils.KubernetesUtils;
+import org.apache.flink.runtime.leaderelection.AbstractLeaderElectionService;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import static 
org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY;
+import static org.apache.flink.kubernetes.utils.Constants.LEADER_ADDRESS_KEY;
+import static 
org.apache.flink.kubernetes.utils.Constants.LEADER_SESSION_ID_KEY;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Leader election service for multiple JobManagers. The active JobManager is 
elected using Kubernetes.
+ * The current leader's address as well as its leader session ID is published 
via Kubernetes ConfigMap.
+ * Note that the contending lock and leader storage are using the same 
ConfigMap. And every component(e.g.
+ * ResourceManager, Dispatcher, RestEndpoint, JobManager for each job) will 
have a separate ConfigMap.
+ */
+public class KubernetesLeaderElectionService extends 
AbstractLeaderElectionService {
+
+       private final FlinkKubeClient kubeClient;
+
+       private final Executor executor;
+
+       private final String configMapName;
+
+       private final KubernetesLeaderElector leaderElector;
+
+       private KubernetesWatch kubernetesWatch;
+
+       // Labels will be used to clean up the ha related ConfigMaps.
+       private Map<String, String> configMapLabels;
+
+       KubernetesLeaderElectionService(
+                       FlinkKubeClient kubeClient,
+                       Executor executor,
+                       KubernetesLeaderElectionConfiguration leaderConfig) {
+
+               this.kubeClient = checkNotNull(kubeClient, "Kubernetes client should not be null.");
+               this.executor = checkNotNull(executor, "Executor should not be null.");
+               this.configMapName = leaderConfig.getConfigMapName();
+               this.leaderElector = kubeClient.createLeaderElector(leaderConfig, new LeaderCallbackHandlerImpl());
+               this.leaderContender = null;
+               this.configMapLabels = KubernetesUtils.getConfigMapLabels(
+                       leaderConfig.getClusterId(), LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY);
+       }
+
+       @Override
+       public void internalStart(LeaderContender contender) {
+               CompletableFuture.runAsync(leaderElector::run, executor);
+               kubernetesWatch = kubeClient.watchConfigMapsAndDoCallback(configMapName, new ConfigMapCallbackHandlerImpl());
+       }
+
+       @Override
+       public void internalStop() {
+               if (kubernetesWatch != null) {
+                       kubernetesWatch.close();
+               }
+       }
+
+       @Override
+       protected void writeLeaderInformation() {
+               updateConfigMap(configMapName);
+       }
+
+       @Override
+       protected boolean checkLeaderLatch() {
+               return kubeClient.getConfigMap(configMapName)
+                       .map(configMap -> KubernetesUtils.getLeaderChecker().test(configMap))
+                       .orElse(false);
+       }
+
+       @Override
+       public String toString() {
+               return "KubernetesLeaderElectionService{configMapName='" + configMapName + "'}";
+       }
+
+       private void updateConfigMap(String configMapName) {
+               try {
+                       kubeClient.checkAndUpdateConfigMap(
+                               configMapName,
+                               KubernetesUtils.getLeaderChecker(),
+                               configMap -> {
+                                       // Get the updated ConfigMap with new leader information
+                                       if (confirmedLeaderAddress != null && confirmedLeaderSessionID != null) {
+                                               configMap.getData().put(LEADER_ADDRESS_KEY, confirmedLeaderAddress);
+                                               configMap.getData().put(LEADER_SESSION_ID_KEY, confirmedLeaderSessionID.toString());
+                                       }
+                                       configMap.getLabels().putAll(configMapLabels);
+                                       return configMap;
+                               }).get();
+               } catch (Exception e) {
+                       leaderContender.handleError(new Exception("Could not update ConfigMap " + configMapName, e));
+               }
+       }
+
+       private class LeaderCallbackHandlerImpl extends KubernetesLeaderElector.LeaderCallbackHandler {
+
+               @Override
+               public void isLeader() {
+                       onGrantLeadership();
+               }
+
+               @Override
+               public void notLeader() {
+                       // Clear the leader information in ConfigMap
+                       try {
+                               kubeClient.checkAndUpdateConfigMap(
+                                       configMapName,
+                                       KubernetesUtils.getLeaderChecker(),
+                                       configMap -> {
+                                               configMap.getData().remove(LEADER_ADDRESS_KEY);
+                                               configMap.getData().remove(LEADER_SESSION_ID_KEY);
+                                               return configMap;
+                                       }
+                               ).get();
+                       } catch (Exception e) {
+                               leaderContender.handleError(
+                                       new Exception("Could not remove leader information from ConfigMap " + configMapName, e));
+                       }
+                       onRevokeLeadership();
+                       // Continue to contend the leader
+                       CompletableFuture.runAsync(leaderElector::run, executor);
+               }
+       }
+
+       private class ConfigMapCallbackHandlerImpl implements FlinkKubeClient.WatchCallbackHandler<KubernetesConfigMap> {
+
+               @Override
+               public void onAdded(List<KubernetesConfigMap> configMaps) {
+                       // noop
+               }
+
+               @Override
+               public void onModified(List<KubernetesConfigMap> configMaps) {
+                       if (checkLeaderLatch()) {
+                               configMaps.forEach(configMap -> {
+                                       if (isLeaderChanged(configMap)) {
+                                               // the data field does not correspond to the expected leader information
+                                               if (logger.isDebugEnabled()) {
+                                                       logger.debug("Correcting leader information in {} by {}.",
+                                                               configMapName, leaderContender.getDescription());
+                                               }
+                                               updateConfigMap(configMap.getName());
+                                       }
+                               });
+                       }
+               }

Review comment:
       Would it be possible that this method is called before `confirmLeadership`, so that the leader election service writes the leader information before the contender confirms the leadership?
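One way to rule out the race asked about above is to gate every write of leader information on leadership having been confirmed, under the same lock that the confirmation uses. The PR's `updateConfigMap` already skips the data update while `confirmedLeaderAddress`/`confirmedLeaderSessionID` are null; whether that fully covers this case depends on when `AbstractLeaderElectionService` sets and resets those fields, which this diff does not show. The sketch below is a hypothetical, simplified model, not the PR's code: `GuardedLeaderInfoWriter`, the plain `Map` standing in for the ConfigMap data, and the keys `address`/`sessionId` are all assumptions (the real keys are the `LEADER_ADDRESS_KEY` and `LEADER_SESSION_ID_KEY` constants).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/**
 * Hypothetical, simplified model of a leader election service (not the PR's class).
 * Leader information is written only after the contender confirmed leadership, so a
 * "modified" event that arrives before confirmLeadership() cannot publish anything.
 */
class GuardedLeaderInfoWriter {

    /** Guards issuedSessionId, confirmedSessionId, confirmedAddress and configMapData. */
    private final Object lock = new Object();

    private UUID issuedSessionId;    // set when the elector grants leadership
    private UUID confirmedSessionId; // set only by confirmLeadership()
    private String confirmedAddress; // set only by confirmLeadership()

    /** Stand-in for the ConfigMap data; the keys "address"/"sessionId" are assumptions. */
    private final Map<String, String> configMapData = new HashMap<>();

    void grantLeadership(UUID sessionId) {
        synchronized (lock) {
            issuedSessionId = sessionId;
            confirmedSessionId = null;
            confirmedAddress = null;
        }
    }

    void confirmLeadership(UUID sessionId, String address) {
        synchronized (lock) {
            // Ignore confirmations for a session that is no longer the issued one.
            if (sessionId.equals(issuedSessionId)) {
                confirmedSessionId = sessionId;
                confirmedAddress = address;
                writeLeaderInformation();
            }
        }
    }

    /** Watch callback; returns true if it (re)wrote the leader information. */
    boolean onModified(Map<String, String> observed) {
        synchronized (lock) {
            if (confirmedSessionId == null) {
                return false; // not confirmed yet: nothing may be published
            }
            boolean stale = !confirmedAddress.equals(observed.get("address"))
                    || !confirmedSessionId.toString().equals(observed.get("sessionId"));
            if (stale) {
                writeLeaderInformation();
            }
            return stale;
        }
    }

    Map<String, String> snapshot() {
        synchronized (lock) {
            return new HashMap<>(configMapData);
        }
    }

    private void writeLeaderInformation() {
        configMapData.put("address", confirmedAddress);
        configMapData.put("sessionId", confirmedSessionId.toString());
    }
}
```

Because grant, confirm, and correction all take the same lock, a watch event can only ever see either "not confirmed" (and do nothing) or a fully confirmed address/session pair.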

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesLeaderRetrievalService.java
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.highavailability;
+
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.UUID;
+
+import static org.apache.flink.kubernetes.utils.Constants.LEADER_ADDRESS_KEY;
+import static org.apache.flink.kubernetes.utils.Constants.LEADER_SESSION_ID_KEY;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The counterpart to the {@link org.apache.flink.kubernetes.highavailability.KubernetesLeaderElectionService}.
+ * This implementation of the {@link LeaderRetrievalService} retrieves the current leader which has
+ * been elected by the {@link org.apache.flink.kubernetes.highavailability.KubernetesLeaderElectionService}.
+ * The leader address as well as the current leader session ID is retrieved from Kubernetes ConfigMap.
+ */
+class KubernetesLeaderRetrievalService implements LeaderRetrievalService {
+
+       private static final Logger LOG = LoggerFactory.getLogger(KubernetesLeaderRetrievalService.class);
+
+       private final Object lock = new Object();

Review comment:
       It would be better to document which states/variables are guarded by this lock.
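For illustration only, a hypothetical sketch of what such documentation could look like: the Javadoc on the lock names every field it guards, and each guarded field points back to the lock. The class `LeaderRetrievalState`, its fields, and the `BiConsumer` listener are assumptions standing in for the PR's `LeaderRetrievalListener`-based state, which is not shown in this hunk.

```java
import java.util.Objects;
import java.util.UUID;
import java.util.function.BiConsumer;

/** Hypothetical sketch of a retrieval service whose lock documents what it guards. */
class LeaderRetrievalState {

    /**
     * Guards {@link #running}, {@link #listener}, {@link #lastLeaderAddress} and
     * {@link #lastLeaderSessionId}. start()/stop() and the ConfigMap watch callbacks can
     * run on different threads, so every access to these fields holds this lock.
     */
    private final Object lock = new Object();

    /** Guarded by {@link #lock}. */
    private boolean running;

    /** Guarded by {@link #lock}; stand-in for a LeaderRetrievalListener. */
    private BiConsumer<String, UUID> listener;

    /** Guarded by {@link #lock}; last leader address handed to the listener. */
    private String lastLeaderAddress;

    /** Guarded by {@link #lock}; last leader session ID handed to the listener. */
    private UUID lastLeaderSessionId;

    void start(BiConsumer<String, UUID> newListener) {
        synchronized (lock) {
            listener = Objects.requireNonNull(newListener);
            running = true;
        }
    }

    void stop() {
        synchronized (lock) {
            running = false;
        }
    }

    /** Watch callback; notifies the listener only if the leader information changed. */
    boolean onLeaderInformation(String address, UUID sessionId) {
        synchronized (lock) {
            if (!running) {
                return false;
            }
            if (Objects.equals(address, lastLeaderAddress)
                    && Objects.equals(sessionId, lastLeaderSessionId)) {
                return false;
            }
            lastLeaderAddress = address;
            lastLeaderSessionId = sessionId;
            listener.accept(address, sessionId);
            return true;
        }
    }
}
```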

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesHaServices.java
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.highavailability;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.configuration.KubernetesHighAvailabilityOptions;
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.KubernetesLeaderElectionConfiguration;
+import org.apache.flink.kubernetes.utils.KubernetesUtils;
+import org.apache.flink.runtime.blob.BlobStore;
+import org.apache.flink.runtime.blob.BlobStoreService;
+import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
+import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneRunningJobsRegistry;
+import org.apache.flink.runtime.jobmanager.JobGraphStore;
+import org.apache.flink.runtime.jobmanager.StandaloneJobGraphStore;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.kubernetes.utils.Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY;
+import static org.apache.flink.kubernetes.utils.Constants.NAME_SEPARATOR;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * High availability service for Kubernetes.
+ */
+public class KubernetesHaServices implements HighAvailabilityServices {
+
+       private static final Logger LOG = LoggerFactory.getLogger(KubernetesHaServices.class);
+
+       private static final String RESOURCE_MANAGER_NAME = "resourcemanager";
+
+       private static final String DISPATCHER_NAME = "dispatcher";
+
+       private static final String JOB_MANAGER_NAME = "jobmanager";
+
+       private static final String REST_SERVER_NAME = "restserver";
+
+       private final String leaderSuffix;
+
+       private final String clusterId;
+
+       /** Kubernetes client. */
+       private final FlinkKubeClient kubeClient;
+
+       /** The executor to run Kubernetes operations on. */
+       private final Executor executor;
+
+       /** The runtime configuration. */
+       private final Configuration configuration;
+
+       /** Store for arbitrary blobs. */
+       private final BlobStoreService blobStoreService;
+
+       /** The Kubernetes based running jobs registry. */
+       private final RunningJobsRegistry runningJobsRegistry;
+
+       KubernetesHaServices(
+                       FlinkKubeClient kubeClient,
+                       Executor executor,
+                       Configuration config,
+                       BlobStoreService blobStoreService) {
+
+               this.kubeClient = checkNotNull(kubeClient);
+               this.executor = checkNotNull(executor);
+               this.configuration = checkNotNull(config);
+               this.clusterId = checkNotNull(config.get(KubernetesConfigOptions.CLUSTER_ID));
+               this.blobStoreService = blobStoreService;
+
+               this.leaderSuffix = config.getString(KubernetesHighAvailabilityOptions.HA_KUBERNETES_LEADER_SUFFIX);
+
+               this.runningJobsRegistry = new StandaloneRunningJobsRegistry();
+       }
+
+       @Override
+       public LeaderRetrievalService getResourceManagerLeaderRetriever() {
+               return createLeaderRetrievalService(RESOURCE_MANAGER_NAME);
+       }
+
+       @Override
+       public LeaderRetrievalService getDispatcherLeaderRetriever() {
+               return createLeaderRetrievalService(DISPATCHER_NAME);
+       }
+
+       @Override
+       public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) {
+               return createLeaderRetrievalService(getLeaderNameForJobManager(jobID));
+       }
+
+       @Override
+       public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String defaultJobManagerAddress) {
+               return getJobManagerLeaderRetriever(jobID);
+       }
+
+       @Override
+       public LeaderRetrievalService getClusterRestEndpointLeaderRetriever() {
+               return createLeaderRetrievalService(REST_SERVER_NAME);
+       }
+
+       @Override
+       public LeaderElectionService getResourceManagerLeaderElectionService() {
+               return createLeaderElectionService(RESOURCE_MANAGER_NAME);
+       }
+
+       @Override
+       public LeaderElectionService getDispatcherLeaderElectionService() {
+               return createLeaderElectionService(DISPATCHER_NAME);
+       }
+
+       @Override
+       public LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) {
+               return createLeaderElectionService(getLeaderNameForJobManager(jobID));
+       }
+
+       @Override
+       public LeaderElectionService getClusterRestEndpointLeaderElectionService() {
+               return createLeaderElectionService(REST_SERVER_NAME);
+       }
+
+       @Override
+       public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
+               return new StandaloneCheckpointRecoveryFactory();
+       }
+
+       @Override
+       public JobGraphStore getJobGraphStore() {
+               return new StandaloneJobGraphStore();
+       }
+
+       @Override
+       public RunningJobsRegistry getRunningJobsRegistry() {
+               return runningJobsRegistry;
+       }
+
+       @Override
+       public BlobStore createBlobStore() {
+               return blobStoreService;
+       }
+
+       @Override
+       public void close() throws Exception {
+               Throwable exception = null;
+
+               try {
+                       blobStoreService.close();
+               } catch (Throwable t) {
+                       exception = t;
+               }
+
+               kubeClient.close();
+
+               if (exception != null) {
+                       ExceptionUtils.rethrowException(exception, "Could not properly close the KubernetesHaServices.");
+               }
+       }
+
+       @Override
+       public void closeAndCleanupAllData() throws Exception {
+               LOG.info("Close and clean up all data for KubernetesHaServices.");
+
+               Throwable exception = null;
+
+               try {
+                       blobStoreService.closeAndCleanupAllData();
+               } catch (Throwable t) {
+                       exception = t;
+               }
+
+               try {
+                       kubeClient.deleteConfigMapsByLabels(
+                               KubernetesUtils.getConfigMapLabels(clusterId, LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY));
+               } catch (Throwable t) {
+                       exception = ExceptionUtils.firstOrSuppressed(t, exception);
+               }
+
+               kubeClient.close();
+
+               if (exception != null) {
+                       ExceptionUtils.rethrowException(
+                               exception, "Could not properly close and clean up all data of KubernetesHaServices.");
+               }
+               LOG.info("Finished cleaning up the high availability data.");
+       }
+
+       private KubernetesLeaderElectionService createLeaderElectionService(String leaderName) {
+               return new KubernetesLeaderElectionService(
+                       kubeClient,
+                       executor,
+                       KubernetesLeaderElectionConfiguration.fromConfiguration(getLeaderConfigMapName(leaderName), configuration));
+       }
+
+       private KubernetesLeaderRetrievalService createLeaderRetrievalService(String leaderName) {
+               return new KubernetesLeaderRetrievalService(kubeClient, getLeaderConfigMapName(leaderName));
+       }

Review comment:
       I think we can make these two methods common interfaces in `AbstractHaService`, and provide different implementations for `Kubernetes/ZooKeeperHaService`. We can also move the component names to the common base class and convert them to lock paths in `ZooKeeperHaService#createLeaderElection/RetrievalService`. In this way, all the `getXXXLeaderElectionService/Retriever` methods can be moved to the base class and reused.
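A minimal sketch of the suggested template-method split, assuming simplified stand-ins for the Flink interfaces: `ElectionService`, `RetrievalService`, `AbstractHaServicesSketch`, both subclasses, and the ZooKeeper path layout are hypothetical and not Flink's API. The base class owns the component names and every public getter; each backend only maps a component name to its lock (a ConfigMap name or a ZooKeeper path). The Kubernetes name format follows the `k8s-ha-<cluster-id>-<name>-leader` examples in this PR, with the suffix hardcoded to its default `leader`.

```java
/** Hypothetical stand-in for LeaderElectionService; exposes the lock it would use. */
interface ElectionService {
    String lockName();
}

/** Hypothetical stand-in for LeaderRetrievalService. */
interface RetrievalService {
    String lockName();
}

/** Base class: component names and all public getters live here. */
abstract class AbstractHaServicesSketch {
    protected static final String RESOURCE_MANAGER_NAME = "resourcemanager";
    protected static final String JOB_MANAGER_NAME = "jobmanager";

    public ElectionService getResourceManagerLeaderElectionService() {
        return createLeaderElectionService(RESOURCE_MANAGER_NAME);
    }

    public RetrievalService getResourceManagerLeaderRetriever() {
        return createLeaderRetrievalService(RESOURCE_MANAGER_NAME);
    }

    public ElectionService getJobManagerLeaderElectionService(String jobId) {
        return createLeaderElectionService(jobId + "-" + JOB_MANAGER_NAME);
    }

    // The dispatcher and REST endpoint getters would follow the same shape.

    /** Each backend decides how a component name becomes a lock. */
    protected abstract ElectionService createLeaderElectionService(String leaderName);

    protected abstract RetrievalService createLeaderRetrievalService(String leaderName);
}

class KubernetesHaServicesSketch extends AbstractHaServicesSketch {
    private final String clusterId;

    KubernetesHaServicesSketch(String clusterId) {
        this.clusterId = clusterId;
    }

    private String configMapName(String leaderName) {
        return "k8s-ha-" + clusterId + "-" + leaderName + "-leader";
    }

    @Override
    protected ElectionService createLeaderElectionService(String leaderName) {
        String name = configMapName(leaderName);
        return () -> name;
    }

    @Override
    protected RetrievalService createLeaderRetrievalService(String leaderName) {
        String name = configMapName(leaderName);
        return () -> name;
    }
}

class ZooKeeperHaServicesSketch extends AbstractHaServicesSketch {
    // Hypothetical path layout, for illustration only.
    @Override
    protected ElectionService createLeaderElectionService(String leaderName) {
        String latchPath = "/leaderlatch/" + leaderName;
        return () -> latchPath;
    }

    @Override
    protected RetrievalService createLeaderRetrievalService(String leaderName) {
        String leaderPath = "/leader/" + leaderName;
        return () -> leaderPath;
    }
}
```

With this split, adding a component only touches the base class, and each backend keeps its naming scheme in a single place.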

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/resources/KubernetesWatcher.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.kubeclient.resources;
+
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.client.KubernetesClientException;
+import io.fabric8.kubernetes.client.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Watcher for resources in Kubernetes.
+ */
+public abstract class KubernetesWatcher<T extends HasMetadata, K extends KubernetesResource<T>> implements Watcher<T> {

Review comment:
       It would be better to place the changes in this class and `KubernetesPodsWatcher` in a separate commit. It seems to me these changes are a re-abstraction of the watchers, not the watcher callbacks described in the commit message.

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesHighAvailabilityOptions.java
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.docs.Documentation;
+import org.apache.flink.configuration.ConfigOption;
+
+import java.time.Duration;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * The set of configuration options relating to Kubernetes high-availability settings.
+ * All the HA information relevant for a specific component will be stored in a single ConfigMap.
+ * For example, the Dispatcher's ConfigMap would then contain the current leader, the running jobs
+ * and the pointers to the persisted JobGraphs.
+ * The JobManager's ConfigMap would then contain the current leader, the pointers to the checkpoints
+ * and the checkpoint ID counter.
+ *
+ * <p>The ConfigMap name will be created with the following pattern.
+ * e.g. k8s-ha-app1-restserver-leader, k8s-ha-app1-00000000000000000000000000000000-jobmanager-leader
+ */
+@PublicEvolving
+public class KubernetesHighAvailabilityOptions {
+
+       @Documentation.Section(Documentation.Sections.EXPERT_KUBERNETES_HIGH_AVAILABILITY)
+       public static final ConfigOption<String> HA_KUBERNETES_LEADER_SUFFIX =
+                       key("high-availability.kubernetes.leader.suffix")
+                       .stringType()
+                       .defaultValue("leader")
+                       .withDescription("The ConfigMap suffix of the leader which contains the URL to the leader and the " +
+                               "current leader session ID. Leader elector will use the same ConfigMap for contending the lock.");
+
+       @Documentation.Section(Documentation.Sections.EXPERT_KUBERNETES_HIGH_AVAILABILITY)
+       public static final ConfigOption<Duration> KUBERNETES_LEASE_DURATION =
+                       key("high-availability.kubernetes.client.lease-duration")
+                       .durationType()
+                       .defaultValue(Duration.ofSeconds(30))
+                       .withDescription("Define the lease duration for the Kubernetes leader election in ms. The leader will " +
+                               "continuously renew its lease time to indicate its existence. And the followers will do a lease " +
+                               "checking against the current time. \"renewTime + leaseDuration > now\" means the leader is alive.");
+
+       @Documentation.Section(Documentation.Sections.EXPERT_KUBERNETES_HIGH_AVAILABILITY)
+       public static final ConfigOption<Duration> KUBERNETES_RENEW_DEADLINE =
+                       key("high-availability.kubernetes.client.renew-deadline")
+                       .durationType()
+                       .defaultValue(Duration.ofSeconds(15))
+                       .withDescription("Defines the deadline when the leader tries to renew the lease in ms. If it could not " +
+                               "succeed in the given time, the renew operation will be aborted.");
+
+       @Documentation.Section(Documentation.Sections.EXPERT_KUBERNETES_HIGH_AVAILABILITY)
+       public static final ConfigOption<Duration> KUBERNETES_RETRY_PERIOD =
+                       key("high-availability.kubernetes.client.retry-period")

Review comment:
       ```suggestion
                        key("high-availability.kubernetes.leader-election.retry-period")
   ```
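The lease options in this hunk describe two timing rules: followers treat the leader as alive while `renewTime + leaseDuration > now`, and the leader gives up once it cannot renew within the renew deadline. A hypothetical sketch of those rules with `java.time` follows; `LeaseRules` and its methods are illustrative names, not fabric8's leader elector. The constructor check `renewDeadline < leaseDuration` is an assumption borrowed from client-go's leader election, which rejects configurations where the lease duration is not greater than the renew deadline; the defaults here (30 s and 15 s) satisfy it.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical sketch of the timing rules described by the lease options. */
final class LeaseRules {
    private final Duration leaseDuration;
    private final Duration renewDeadline;

    LeaseRules(Duration leaseDuration, Duration renewDeadline) {
        // Assumption borrowed from client-go: if the deadline were not shorter than the
        // lease, followers could take over while the old leader still keeps renewing.
        if (renewDeadline.compareTo(leaseDuration) >= 0) {
            throw new IllegalArgumentException("renewDeadline must be shorter than leaseDuration");
        }
        this.leaseDuration = leaseDuration;
        this.renewDeadline = renewDeadline;
    }

    /** Follower side: "renewTime + leaseDuration > now" means the leader is alive. */
    boolean isLeaderAlive(Instant renewTime, Instant now) {
        return renewTime.plus(leaseDuration).isAfter(now);
    }

    /** Leader side: true once the deadline has passed without a successful renewal. */
    boolean renewDeadlineExceeded(Instant lastSuccessfulRenew, Instant now) {
        return !lastSuccessfulRenew.plus(renewDeadline).isAfter(now);
    }
}
```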

##########
File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHighAvailabilityTestBase.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.highavailability;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.KubernetesLeaderElectionConfiguration;
+import org.apache.flink.kubernetes.kubeclient.TestingFlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
+import org.apache.flink.runtime.leaderelection.TestingContender;
+import org.apache.flink.runtime.leaderelection.TestingListener;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.RunnableWithException;
+
+import org.junit.After;
+import org.junit.Before;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the {@link KubernetesLeaderElectionService}.
+ */
+public class KubernetesHighAvailabilityTestBase extends TestLogger {
+
+       private final ExecutorService executorService =
+               Executors.newFixedThreadPool(4, new ExecutorThreadFactory("IO-Executor"));
+       private final Configuration configuration = new Configuration();
+
+       protected static final String CLUSTER_ID = "leader-test-cluster";
+       protected static final String LEADER_URL = "akka.tcp://[email protected]:6123/user/rpc/resourcemanager";
+       protected static final long TIMEOUT = 30L * 1000L;
+       protected static final String LEADER_CONFIGMAP_NAME = "k8s-ha-app1-resourcemanager";
+       protected final Map<String, KubernetesConfigMap> configMapStore = new HashMap<>();
+       protected final CompletableFuture<FlinkKubeClient.WatchCallbackHandler<KubernetesConfigMap>> configMapsAndDoCallbackFuture =
+               new CompletableFuture<>();
+       protected final CompletableFuture<FlinkKubeClient.WatchCallbackHandler<KubernetesConfigMap>> leaderRetrievalConfigMapCallback =
+               new CompletableFuture<>();
+
+       @Before
+       public void setup() {
+               configuration.setString(KubernetesConfigOptions.CLUSTER_ID, CLUSTER_ID);
+       }
+
+       @After
+       public void teardown() {
+               executorService.shutdownNow();
+       }
+
+       protected KubernetesLeaderElectionService createLeaderElectionService(AtomicBoolean leaderController) {
+               final TestingFlinkKubeClient flinkKubeClient = TestingFlinkKubeClient.builder()
+                       .setConfigMapStore(configMapStore)
+                       .setWatchConfigMapsAndDoCallbackFunction((ignore, handler) -> {
+                               configMapsAndDoCallbackFuture.complete(handler);
+                               return new TestingFlinkKubeClient.MockKubernetesWatch();
+                       })
+                       .setLeaderController(leaderController).build();
+               return new KubernetesLeaderElectionService(
+                       flinkKubeClient,
+                       executorService,
+                       KubernetesLeaderElectionConfiguration.fromConfiguration(LEADER_CONFIGMAP_NAME, configuration));
+       }
+
+       protected KubernetesLeaderRetrievalService createLeaderRetrievalService() {
+               final TestingFlinkKubeClient flinkKubeClient = TestingFlinkKubeClient.builder()
+                       .setConfigMapStore(configMapStore)
+                       .setWatchConfigMapsAndDoCallbackFunction((ignore, handler) -> {
+                               leaderRetrievalConfigMapCallback.complete(handler);
+                               return new TestingFlinkKubeClient.MockKubernetesWatch();
+                       }).build();
+               return new KubernetesLeaderRetrievalService(flinkKubeClient, LEADER_CONFIGMAP_NAME);
+       }
+
+       /**
+        * Context to leader election and retrieval tests.
+        */
+       protected class Context {
+               final AtomicBoolean leaderController = new AtomicBoolean(false);
+               final KubernetesLeaderElectionService leaderElectionService = createLeaderElectionService(leaderController);
+               final TestingContender contender = new TestingContender(LEADER_URL, leaderElectionService);
+
+               final KubernetesLeaderRetrievalService leaderRetrievalService = createLeaderRetrievalService();
+               final TestingListener listener = new TestingListener();
+
+               protected final void runTest(RunnableWithException testMethod) throws Exception {
+                       leaderElectionService.start(contender);
+                       leaderController.set(true);
+                       contender.waitForLeader(TIMEOUT);
+                       assertThat(contender.isLeader(), is(true));
+                       leaderRetrievalService.start(listener);
+                       testMethod.run();
+                       leaderElectionService.stop();
+                       leaderRetrievalService.stop();
+               }

Review comment:
       I would suggest separating the action that grants leadership to the contender from this method, for better readability.
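A hypothetical, self-contained sketch of the suggested split: `runTest` only wraps the test body with start and stop, and leadership is granted by an explicit `grantLeadership()` step that a test calls when it needs it. `LeaderTestContextSketch`, its event list, and the immediately completed future are assumptions standing in for `TestingContender`, `leaderController`, and the elector thread; `ThrowingRunnable` mirrors Flink's `RunnableWithException` so the sketch compiles on its own. The `finally` block is an extra design choice so the services are stopped even when the test body fails.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for the test Context; not the PR's code. */
class LeaderTestContextSketch {

    /** Mirrors Flink's RunnableWithException so the sketch is self-contained. */
    interface ThrowingRunnable {
        void run() throws Exception;
    }

    final AtomicBoolean leaderController = new AtomicBoolean(false);
    final CompletableFuture<Void> leadershipConfirmed = new CompletableFuture<>();
    final List<String> events = new ArrayList<>();

    /** Starts and stops the (fake) services around the body; grants no leadership. */
    void runTest(ThrowingRunnable testMethod) throws Exception {
        events.add("start");
        try {
            testMethod.run();
        } finally {
            events.add("stop"); // stop even if the body throws
        }
    }

    /** Explicit step a test calls when this contender should become leader. */
    void grantLeadership() throws Exception {
        leaderController.set(true);
        // Stand-in for the elector noticing the flag and the contender confirming.
        leadershipConfirmed.complete(null);
        leadershipConfirmed.get(30, TimeUnit.SECONDS);
        events.add("leader");
    }
}
```

A test then reads as `context.runTest(() -> { context.grantLeadership(); /* assertions */ });`, which makes the leadership precondition visible at the call site.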

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesHighAvailabilityOptions.java
##########
@@ -0,0 +1,78 @@
+
+package org.apache.flink.kubernetes.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.docs.Documentation;
+import org.apache.flink.configuration.ConfigOption;
+
+import java.time.Duration;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * The set of configuration options relating to Kubernetes high-availability settings.
+ * All the HA information relevant for a specific component will be stored in a single ConfigMap.
+ * For example, the Dispatcher's ConfigMap would then contain the current leader, the running jobs
+ * and the pointers to the persisted JobGraphs.
+ * The JobManager's ConfigMap would then contain the current leader, the pointers to the checkpoints
+ * and the checkpoint ID counter.
+ *
+ * <p>The ConfigMap name will be created with the following pattern.
+ * e.g. k8s-ha-app1-restserver-leader, k8s-ha-app1-00000000000000000000000000000000-jobmanager-leader

Review comment:
       I think stating the pattern (e.g. as a template or regex) before the examples would make it easier to understand. IIUC, it is `k8s-ha-<cluster-id>-<component-name>-leader`?
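       A small sketch of how the pattern could be stated alongside the examples. `LeaderConfigMapNames` and its members are hypothetical; the suffix is assumed to be the default value `leader` of `high-availability.kubernetes.leader.suffix`, and for the JobManager the component part appears to be `<job-id>-jobmanager`.

```java
import java.util.regex.Pattern;

// Hypothetical helper illustrating the naming scheme
// k8s-ha-<cluster-id>-<component-name>-<leader-suffix>.
class LeaderConfigMapNames {
    // Regex form of the scheme, assuming the default suffix "leader".
    // Cluster IDs and component names may both contain dashes, so this regex can
    // validate a name but cannot unambiguously split it back into its parts.
    static final Pattern LEADER_CONFIGMAP_PATTERN = Pattern.compile("k8s-ha-[a-z0-9-]+-leader");

    static String leaderConfigMapName(String clusterId, String componentName, String leaderSuffix) {
        return String.join("-", "k8s-ha", clusterId, componentName, leaderSuffix);
    }
}
```

       Since the parts cannot be recovered from the name alone, a template such as `k8s-ha-<cluster-id>-<component-name>-leader` in the Javadoc is probably clearer for readers than a regex.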

##########
File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHighAvailabilityTestBase.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.highavailability;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.KubernetesLeaderElectionConfiguration;
+import org.apache.flink.kubernetes.kubeclient.TestingFlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
+import org.apache.flink.runtime.leaderelection.TestingContender;
+import org.apache.flink.runtime.leaderelection.TestingListener;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.RunnableWithException;
+
+import org.junit.After;
+import org.junit.Before;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the {@link KubernetesLeaderElectionService}.
+ */
+public class KubernetesHighAvailabilityTestBase extends TestLogger {
+
+       private final ExecutorService executorService =
+               Executors.newFixedThreadPool(4, new ExecutorThreadFactory("IO-Executor"));
+       private final Configuration configuration = new Configuration();

Review comment:
       It seems `executorService` and `configuration` are reused across test cases, which could cause test instability.
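       One way to make the per-test lifecycle explicit is a small fixture that owns a fresh executor and configuration and always shuts the executor down; in JUnit 4 the same effect comes from creating both in the `@Before` method and shutting down in `@After`. The sketch below is hypothetical: `PerTestFixture` is an illustrative name, a `Map` stands in for Flink's `Configuration`, and `kubernetes.cluster-id` is assumed to be the key behind `KubernetesConfigOptions.CLUSTER_ID`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical per-test fixture: each test gets its own executor and configuration.
class PerTestFixture implements AutoCloseable {
    final ExecutorService executorService = Executors.newFixedThreadPool(4);
    // Stand-in for org.apache.flink.configuration.Configuration.
    final Map<String, String> configuration = new HashMap<>();

    PerTestFixture(String clusterId) {
        configuration.put("kubernetes.cluster-id", clusterId);
    }

    /** Shuts the executor down and waits briefly so no task leaks into the next test. */
    @Override
    public void close() {
        executorService.shutdownNow();
        try {
            executorService.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```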

##########
File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/highavailability/KubernetesHighAvailabilityTestBase.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.highavailability;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.KubernetesLeaderElectionConfiguration;
+import org.apache.flink.kubernetes.kubeclient.TestingFlinkKubeClient;
+import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
+import org.apache.flink.runtime.leaderelection.TestingContender;
+import org.apache.flink.runtime.leaderelection.TestingListener;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.RunnableWithException;
+
+import org.junit.After;
+import org.junit.Before;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the {@link KubernetesLeaderElectionService}.
+ */
+public class KubernetesHighAvailabilityTestBase extends TestLogger {
+
+       private final ExecutorService executorService =
+               Executors.newFixedThreadPool(4, new ExecutorThreadFactory("IO-Executor"));
+       private final Configuration configuration = new Configuration();
+
+       protected static final String CLUSTER_ID = "leader-test-cluster";
+       protected static final String LEADER_URL = "akka.tcp://[email protected]:6123/user/rpc/resourcemanager";
+       protected static final long TIMEOUT = 30L * 1000L;
+       protected static final String LEADER_CONFIGMAP_NAME = "k8s-ha-app1-resourcemanager";
+       protected final Map<String, KubernetesConfigMap> configMapStore = new HashMap<>();
+       protected final CompletableFuture<FlinkKubeClient.WatchCallbackHandler<KubernetesConfigMap>> configMapsAndDoCallbackFuture =
+               new CompletableFuture<>();
+       protected final CompletableFuture<FlinkKubeClient.WatchCallbackHandler<KubernetesConfigMap>> leaderRetrievalConfigMapCallback =
+               new CompletableFuture<>();
+
+       @Before
+       public void setup() {
+               configuration.setString(KubernetesConfigOptions.CLUSTER_ID, CLUSTER_ID);
+       }
+
+       @After
+       public void teardown() {
+               executorService.shutdownNow();
+       }
+
+       protected KubernetesLeaderElectionService createLeaderElectionService(AtomicBoolean leaderController) {
+               final TestingFlinkKubeClient flinkKubeClient = TestingFlinkKubeClient.builder()
+                       .setConfigMapStore(configMapStore)
+                       .setWatchConfigMapsAndDoCallbackFunction((ignore, handler) -> {
+                               configMapsAndDoCallbackFuture.complete(handler);
+                               return new TestingFlinkKubeClient.MockKubernetesWatch();
+                       })
+                       .setLeaderController(leaderController).build();
+               return new KubernetesLeaderElectionService(
+                       flinkKubeClient,
+                       executorService,
+                       KubernetesLeaderElectionConfiguration.fromConfiguration(LEADER_CONFIGMAP_NAME, configuration));
+       }
+
+       protected KubernetesLeaderRetrievalService createLeaderRetrievalService() {
+               final TestingFlinkKubeClient flinkKubeClient = TestingFlinkKubeClient.builder()
+                       .setConfigMapStore(configMapStore)
+                       .setWatchConfigMapsAndDoCallbackFunction((ignore, handler) -> {
+                               leaderRetrievalConfigMapCallback.complete(handler);
+                               return new TestingFlinkKubeClient.MockKubernetesWatch();
+                       }).build();
+               return new KubernetesLeaderRetrievalService(flinkKubeClient, LEADER_CONFIGMAP_NAME);
+       }
+
+       /**
+        * Context to leader election and retrieval tests.
+        */
+       protected class Context {
+               final AtomicBoolean leaderController = new AtomicBoolean(false);
+               final KubernetesLeaderElectionService leaderElectionService = createLeaderElectionService(leaderController);
+               final TestingContender contender = new TestingContender(LEADER_URL, leaderElectionService);
+
+               final KubernetesLeaderRetrievalService leaderRetrievalService = createLeaderRetrievalService();
+               final TestingListener listener = new TestingListener();
+
+               protected final void runTest(RunnableWithException testMethod) throws Exception {
+                       leaderElectionService.start(contender);
+                       leaderController.set(true);
+                       contender.waitForLeader(TIMEOUT);
+                       assertThat(contender.isLeader(), is(true));
+                       leaderRetrievalService.start(listener);
+                       testMethod.run();
+                       leaderElectionService.stop();
+                       leaderRetrievalService.stop();
+               }

Review comment:
       Or maybe rename the method to `runTestAndGrantLeadershipToContender`, if granting leadership is needed in almost every test case.

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesHighAvailabilityOptions.java
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.configuration;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.docs.Documentation;
+import org.apache.flink.configuration.ConfigOption;
+
+import java.time.Duration;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * The set of configuration options relating to Kubernetes high-availability settings.
+ * All the HA information relevant for a specific component will be stored in a single ConfigMap.
+ * For example, the Dispatcher's ConfigMap would then contain the current leader, the running jobs
+ * and the pointers to the persisted JobGraphs.
+ * The JobManager's ConfigMap would then contain the current leader, the pointers to the checkpoints
+ * and the checkpoint ID counter.
+ *
+ * <p>The ConfigMap name will be created with the following pattern.
+ * e.g. k8s-ha-app1-restserver-leader, k8s-ha-app1-00000000000000000000000000000000-jobmanager-leader
+ */
+@PublicEvolving
+public class KubernetesHighAvailabilityOptions {
+
+       @Documentation.Section(Documentation.Sections.EXPERT_KUBERNETES_HIGH_AVAILABILITY)
+       public static final ConfigOption<String> HA_KUBERNETES_LEADER_SUFFIX =
+                       key("high-availability.kubernetes.leader.suffix")
+                       .stringType()
+                       .defaultValue("leader")
+                       .withDescription("The ConfigMap suffix of the leader which contains the URL to the leader and the " +
+                               "current leader session ID. Leader elector will use the same ConfigMap for contending the lock.");
+
+       @Documentation.Section(Documentation.Sections.EXPERT_KUBERNETES_HIGH_AVAILABILITY)
+       public static final ConfigOption<Duration> KUBERNETES_LEASE_DURATION =
+                       key("high-availability.kubernetes.client.lease-duration")
+                       .durationType()
+                       .defaultValue(Duration.ofSeconds(30))
+                       .withDescription("Define the lease duration for the Kubernetes leader election in ms. The leader will " +
+                               "continuously renew its lease time to indicate its existence. And the followers will do a lease " +
+                               "checking against the current time. \"renewTime + leaseDuration > now\" means the leader is alive.");
+
+       @Documentation.Section(Documentation.Sections.EXPERT_KUBERNETES_HIGH_AVAILABILITY)
+       public static final ConfigOption<Duration> KUBERNETES_RENEW_DEADLINE =
+                       key("high-availability.kubernetes.client.renew-deadline")
+                       .durationType()
+                       .defaultValue(Duration.ofSeconds(15))
+                       .withDescription("Defines the deadline when the leader tries to renew the lease in ms. If it could not " +
+                               "succeed in the given time, the renew operation will be aborted.");
+
+       @Documentation.Section(Documentation.Sections.EXPERT_KUBERNETES_HIGH_AVAILABILITY)
+       public static final ConfigOption<Duration> KUBERNETES_RETRY_PERIOD =
+                       key("high-availability.kubernetes.client.retry-period")
+                       .durationType()
+                       .defaultValue(Duration.ofSeconds(3))
+                       .withDescription("Defines the pause between consecutive retries in ms. Both the leader and followers use " +
+                               "this value for the retry.");

Review comment:
       It's not clear what a retry means here. I think we should explain that all contenders periodically try to acquire or renew the leadership, if possible, at this interval.
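       To make the relationship between the three timing options concrete, here is a hypothetical sketch (`LeaseTimingSketch` is an illustrative name, not the fabric8 `LeaderElector`). It only encodes the rules stated in the option descriptions and in this comment; the starting point of the renew deadline is an assumption. A possible wording for the retry period along these lines: "All contenders, including the current leader, periodically try to acquire or renew the leadership at this interval."

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical illustration of how lease-duration, renew-deadline and retry-period relate.
class LeaseTimingSketch {
    // lease-duration: followers consider the leader alive while renewTime + leaseDuration > now.
    static boolean isLeaderAlive(Instant renewTime, Duration leaseDuration, Instant now) {
        return renewTime.plus(leaseDuration).isAfter(now);
    }

    // renew-deadline: the leader aborts renewing once this much time has passed since it
    // started trying to renew (assumed starting point of the deadline).
    static boolean renewDeadlineExceeded(Instant renewStart, Duration renewDeadline, Instant now) {
        return !renewStart.plus(renewDeadline).isAfter(now);
    }

    // retry-period: every contender, leader or follower, attempts to acquire (follower)
    // or renew (leader) the lease once per retry period.
    static Instant nextAttempt(Instant lastAttempt, Duration retryPeriod) {
        return lastAttempt.plus(retryPeriod);
    }
}
```

       With the defaults (30 s lease, 15 s renew deadline, 3 s retry period), a leader that stops renewing is still considered alive by followers for up to 30 s after its last successful renewal.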

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/AbstractLeaderElectionService.java
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import java.util.UUID;
+
+/**
+ * Abstract class for leader election service based on distributed coordination system(e.g. Zookeeper, Kubernetes, etc.).
+ */
+public abstract class AbstractLeaderElectionService implements LeaderElectionService {
+
+       protected final Logger logger = LoggerFactory.getLogger(getClass());
+
+       protected final Object lock = new Object();

Review comment:
       IIUC, all reads and writes of the `volatile` fields should be performed in a `synchronized` block guarded by this lock? Then why is `internalStart` also guarded by this lock?
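       For reference, if every read and write of a field happens inside `synchronized (lock)`, the monitor already guarantees visibility, so `volatile` is redundant for that field. A hypothetical sketch of the arrangement the question implies (`LockGuardedElectionState` is illustrative, not the PR's class): the lock only guards the mutable fields, and the backend-specific start runs outside it. Whether that is safe depends on whether a concurrent `stop()` must be excluded while starting; if it must, keeping `internalStart` under the lock is the simpler choice.

```java
import java.util.UUID;

// Hypothetical sketch: all mutable state is guarded by 'lock', so plain fields suffice.
class LockGuardedElectionState {
    private final Object lock = new Object();

    // Guarded by lock; no volatile needed because every access is synchronized on it.
    private boolean running;
    private UUID confirmedLeaderSessionId;

    void start(Runnable internalStart) {
        synchronized (lock) {
            running = true;
        }
        // Assumption: the backend-specific start need not be atomic with the field update.
        internalStart.run();
    }

    void confirmLeadership(UUID sessionId) {
        synchronized (lock) {
            // Ignore confirmations that arrive before start().
            if (running) {
                confirmedLeaderSessionId = sessionId;
            }
        }
    }

    UUID getConfirmedLeaderSessionId() {
        synchronized (lock) {
            return confirmedLeaderSessionId;
        }
    }

    boolean isRunning() {
        synchronized (lock) {
            return running;
        }
    }
}
```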

##########
File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClientTest.java
##########
@@ -249,4 +265,58 @@ public void testStopAndCleanupCluster() throws Exception {
                this.flinkKubeClient.stopAndCleanupCluster(CLUSTER_ID);
                assertTrue(this.kubeClient.apps().deployments().inNamespace(NAMESPACE).list().getItems().isEmpty());
        }
+
+       @Test
+       public void testCreateAndDeleteConfigMap() {
+               this.flinkKubeClient.createConfigMap(buildHAConfigMap());
+               assertThat(this.flinkKubeClient.getConfigMap(LEADER_CONFIG_MAP_NAME).isPresent(), is(true));
+               this.flinkKubeClient.deleteConfigMapsByLabels(haLabels);
+               assertThat(this.flinkKubeClient.getConfigMap(LEADER_CONFIG_MAP_NAME).isPresent(), is(false));
+       }

Review comment:
       I think we should also verify that `createConfigMap` does not overwrite 
an existing config map.
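       The check could look like this: create a ConfigMap, create it again with different data, then assert that the stored data is still the first version. Against a real API server, creating an object whose name already exists is rejected with 409 Conflict. How the PR's `createConfigMap` surfaces that failure is not shown here, so this hypothetical in-memory stand-in (`InMemoryConfigMapStore`) simply returns a boolean.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical in-memory stand-in for the ConfigMap store; the real test would go
// through flinkKubeClient against the fabric8 mock server.
class InMemoryConfigMapStore {
    private final Map<String, Map<String, String>> configMaps = new HashMap<>();

    /** Returns false, and leaves the existing data untouched, if the name is already taken. */
    boolean createConfigMap(String name, Map<String, String> data) {
        return configMaps.putIfAbsent(name, new HashMap<>(data)) == null;
    }

    Optional<Map<String, String>> getConfigMap(String name) {
        return Optional.ofNullable(configMaps.get(name));
    }
}
```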




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

