tillrohrmann commented on a change in pull request #13644:
URL: https://github.com/apache/flink/pull/13644#discussion_r510988071



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
##########
@@ -137,8 +137,11 @@
                        operationFuture.whenCompleteAsync(
                                (t, throwable) -> {
                                        if (throwable != null) {
+                                               final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
                                                if (throwable instanceof CancellationException) {
                                                        resultFuture.completeExceptionally(new RetryException("Operation future was cancelled.", throwable));
+                                               } else if (strippedThrowable instanceof NotRetryException) {

Review comment:
       If we do this for `retryOperation`, then we should also do it for
`retryOperationWithDelay` and all other methods that retry when an exception
occurs.
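A minimal sketch of the reviewer's point, assuming the goal is a single shared decision: every retry variant (with or without delay) consults the same predicate, so a non-retryable failure stops all of them consistently. `RetrySketch`, `shouldRetry`, `strip` and this local `NotRetryException` are hypothetical names for illustration; only the unwrapping idea mirrors `ExceptionUtils.stripCompletionException` from the diff.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;

/** Hypothetical marker exception: the operation failed and must not be retried. */
class NotRetryException extends Exception {
    NotRetryException(String message) {
        super(message);
    }
}

final class RetrySketch {

    private RetrySketch() {}

    /** Unwraps CompletionException/ExecutionException layers to reach the root failure. */
    static Throwable strip(Throwable throwable) {
        Throwable current = throwable;
        while ((current instanceof CompletionException || current instanceof ExecutionException)
                && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    /** Shared decision point that retryOperation-, retryOperationWithDelay-style variants could all call. */
    static boolean shouldRetry(Throwable failure, int retriesLeft) {
        return retriesLeft > 0 && !(strip(failure) instanceof NotRetryException);
    }

    /** Minimal retry without delay; a delayed variant would reuse shouldRetry unchanged. */
    static <T> CompletableFuture<T> retry(Supplier<CompletableFuture<T>> operation, int retries) {
        final CompletableFuture<T> result = new CompletableFuture<>();
        attempt(operation, retries, result);
        return result;
    }

    private static <T> void attempt(
            Supplier<CompletableFuture<T>> operation, int retriesLeft, CompletableFuture<T> result) {
        operation.get().whenComplete(
                (value, failure) -> {
                    if (failure == null) {
                        result.complete(value);
                    } else if (shouldRetry(failure, retriesLeft)) {
                        attempt(operation, retriesLeft - 1, result);
                    } else {
                        // Either out of retries or explicitly told not to retry.
                        result.completeExceptionally(strip(failure));
                    }
                });
    }
}
```

Keeping the check in one place means a new non-retryable case only has to be added to `shouldRetry`, rather than to each retry method separately.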

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java
##########
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.BiConsumerWithException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.UUID;
+
+/**
+ * Default implementation for leader election service. Composed with different {@link LeaderElectionDriver}, we could
+ * perform a leader election for the contender, and then persist the leader information to various storage.
+ */
+public class DefaultLeaderElectionService implements LeaderElectionService {
+
+       private static final Logger LOG = LoggerFactory.getLogger(DefaultLeaderElectionService.class);
+
+       private final Object lock = new Object();
+
+       /** The leader contender which applies for leadership. */
+       @GuardedBy("lock")
+       private volatile LeaderContender leaderContender;
+
+       @GuardedBy("lock")
+       private volatile UUID issuedLeaderSessionID;
+
+       @GuardedBy("lock")
+       private volatile UUID confirmedLeaderSessionID;
+
+       @GuardedBy("lock")
+       private volatile String confirmedLeaderAddress;
+
+       @GuardedBy("lock")
+       private volatile boolean running;
+
+       private final LeaderElectionDriver leaderElectionDriver;
+
+       private final StateHandler leaderElectionStateHandle;
+
+       public DefaultLeaderElectionService(LeaderElectionDriver leaderElectionDriver) {
+               this.leaderElectionDriver = leaderElectionDriver;
+
+               leaderContender = null;
+
+               issuedLeaderSessionID = null;
+               confirmedLeaderSessionID = null;
+               confirmedLeaderAddress = null;
+
+               leaderElectionStateHandle = new StateHandler();
+               running = false;
+       }
+
+       @Override
+       public final void start(LeaderContender contender) throws Exception {
+               Preconditions.checkNotNull(contender, "Contender must not be null.");
+               Preconditions.checkState(leaderContender == null, "Contender was already set.");
+
+               LOG.info("Starting LeaderElectionService {}.", this);
+
+               synchronized (lock) {
+                       leaderContender = contender;
+                       running = true;
+                       leaderElectionDriver.start(leaderElectionStateHandle);
+               }
+       }
+
+       @Override
+       public final void stop() throws Exception {
+               LOG.info("Stopping LeaderElectionService {}.", this);
+
+               synchronized (lock) {
+                       if (!running) {
+                               return;
+                       }
+                       running = false;
+                       clearConfirmedLeaderInformation();
+                       leaderElectionDriver.stop();
+               }
+       }
+
+       @Override
+       public void confirmLeadership(UUID leaderSessionID, String leaderAddress) {
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug(
+                               "Confirm leader session ID {} for leader {}.",
+                               leaderSessionID,
+                               leaderAddress);
+               }
+
+               Preconditions.checkNotNull(leaderSessionID);
+
+               synchronized (lock) {
+                       if (hasLeadership(leaderSessionID)) {
+                               if (running) {
+                                       confirmLeaderInformation(leaderSessionID, leaderAddress);
+                                       leaderElectionDriver.writeLeaderInformation(confirmedLeaderSessionID, confirmedLeaderAddress);

Review comment:
       This call could be moved into `confirmLeaderInformation`.
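A minimal sketch of the suggested refactoring, assuming no caller needs the fields updated without also persisting them: `confirmLeaderInformation` both records and publishes the confirmed information, so `confirmLeadership` no longer repeats the driver call. `LeaderInformationWriter` and `ConfirmSketch` are hypothetical stand-ins for `LeaderElectionDriver` and `DefaultLeaderElectionService`, and the leadership/running checks from the diff are left out for brevity.

```java
import java.util.UUID;

/** Hypothetical stand-in for LeaderElectionDriver#writeLeaderInformation. */
interface LeaderInformationWriter {
    void writeLeaderInformation(UUID leaderSessionID, String leaderAddress);
}

final class ConfirmSketch {
    private final Object lock = new Object();
    private final LeaderInformationWriter leaderElectionDriver;

    // Guarded by lock.
    private UUID confirmedLeaderSessionID;
    private String confirmedLeaderAddress;

    ConfirmSketch(LeaderInformationWriter leaderElectionDriver) {
        this.leaderElectionDriver = leaderElectionDriver;
    }

    void confirmLeadership(UUID leaderSessionID, String leaderAddress) {
        synchronized (lock) {
            // hasLeadership(...) and running checks from the diff omitted here.
            confirmLeaderInformation(leaderSessionID, leaderAddress);
        }
    }

    UUID getConfirmedLeaderSessionID() {
        synchronized (lock) {
            return confirmedLeaderSessionID;
        }
    }

    /** Must be called while holding lock: records and publishes the confirmed information in one step. */
    private void confirmLeaderInformation(UUID leaderSessionID, String leaderAddress) {
        confirmedLeaderSessionID = leaderSessionID;
        confirmedLeaderAddress = leaderAddress;
        leaderElectionDriver.writeLeaderInformation(confirmedLeaderSessionID, confirmedLeaderAddress);
    }
}
```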

##########
File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
##########
@@ -219,6 +231,73 @@ public KubernetesWatch watchPodsAndDoCallback(
                                .watch(new KubernetesPodsWatcher(podCallbackHandler)));
        }
 
+       @Override
+       public CompletableFuture<Void> createConfigMap(KubernetesConfigMap configMap) {
+               final String configMapName = configMap.getName();
+               return CompletableFuture.runAsync(
+                       () -> this.internalClient.configMaps().inNamespace(namespace).create(configMap.getInternalResource()),
+                       kubeClientExecutorService)
+                       .exceptionally(throwable -> {
+                               if (throwable != null) {
+                                       throw new CompletionException(
+                                               new KubernetesException("Failed to create ConfigMap " + configMapName, throwable));
+                               }
+                               return null;

Review comment:
       No need for the `return` statement if you get rid of the `if` statement.
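A sketch of the simplified handler, assuming the rest of `createConfigMap` stays as in the diff. `CompletableFuture#exceptionally` only invokes its function when the stage completed exceptionally, so `throwable` is never null; without the `if`, the lambda always throws and needs no `return null`. `CreateConfigMapSketch` and the local `KubernetesException` are stand-ins so the example compiles on its own, and the Fabric8 client call is replaced by a plain `Runnable`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;

/** Stand-in for Flink's KubernetesException so the sketch compiles on its own. */
class KubernetesException extends Exception {
    KubernetesException(String message, Throwable cause) {
        super(message, cause);
    }
}

final class CreateConfigMapSketch {

    private CreateConfigMapSketch() {}

    /** createCall stands in for internalClient.configMaps().inNamespace(namespace).create(...). */
    static CompletableFuture<Void> createConfigMap(
            String configMapName, Runnable createCall, Executor executor) {
        return CompletableFuture.runAsync(createCall, executor)
                .exceptionally(
                        throwable -> {
                            // Only reached on failure, so throwable is non-null: no if, no return null.
                            throw new CompletionException(
                                    new KubernetesException(
                                            "Failed to create ConfigMap " + configMapName, throwable));
                        });
    }
}
```

A lambda whose body can only throw is compatible with a value-returning functional interface such as `Function<Throwable, ? extends Void>`, which is why the trailing `return null` can be dropped entirely.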

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
##########
@@ -159,6 +159,38 @@ public void testRetryCancellation() throws Exception {
                }
        }
 
+       /**
+        * Test that {@link FutureUtils#retry} should stop at {@link FutureUtils.NotRetryException}.
+        */
+       @Test
+       public void testStopAtNotRetryException() {
+               final int retries = 10;
+               final int notRetry = 3;
+               final AtomicInteger atomicInteger = new AtomicInteger(0);
+               final String notRetryExceptionMsg = "Do not need to retry.";
+               CompletableFuture<Boolean> retryFuture = FutureUtils.retry(
+                       () ->
+                               CompletableFuture.supplyAsync(
+                                       () -> {
+                                               if (atomicInteger.incrementAndGet() == notRetry) {
+                                                       throw new CompletionException(new FutureUtils.NotRetryException(notRetryExceptionMsg));

Review comment:
       Maybe we could provide a utility `<T> T FutureUtils.stopRetry(String
message)` that does exactly what you are doing here.
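One possible shape for the proposed helper, assuming it should behave exactly like the test code in this hunk: it always throws a `CompletionException` wrapping a `NotRetryException`, and its generic return type `<T> T` lets it stand in for a value inside a `supplyAsync` lambda. `StopRetrySketch`, its nested `NotRetryException`, and the `attempt` usage method are hypothetical stand-ins for `FutureUtils`, `FutureUtils.NotRetryException`, and the test body.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class StopRetrySketch {

    private StopRetrySketch() {}

    /** Hypothetical marker, mirroring the PR's FutureUtils.NotRetryException. */
    static final class NotRetryException extends Exception {
        NotRetryException(String message) {
            super(message);
        }
    }

    /**
     * Hypothetical helper: never returns normally. The generic return type lets callers write
     * "return stopRetry(...)" wherever a value of any type is expected.
     */
    static <T> T stopRetry(String message) {
        throw new CompletionException(new NotRetryException(message));
    }

    /** Usage mirroring the test: fail with a non-retryable error when attemptNumber equals notRetry. */
    static CompletableFuture<Boolean> attempt(int attemptNumber, int notRetry) {
        return CompletableFuture.supplyAsync(
                () -> {
                    if (attemptNumber == notRetry) {
                        return StopRetrySketch.<Boolean>stopRetry("Do not need to retry.");
                    }
                    return Boolean.TRUE;
                });
    }
}
```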

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java
##########
@@ -0,0 +1,252 @@
+       @Override
+       public void confirmLeadership(UUID leaderSessionID, String leaderAddress) {
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug(
+                               "Confirm leader session ID {} for leader {}.",
+                               leaderSessionID,
+                               leaderAddress);
+               }
+
+               Preconditions.checkNotNull(leaderSessionID);
+
+               synchronized (lock) {
+                       if (hasLeadership(leaderSessionID)) {
+                               if (running) {
+                                       confirmLeaderInformation(leaderSessionID, leaderAddress);
+                                       leaderElectionDriver.writeLeaderInformation(confirmedLeaderSessionID, confirmedLeaderAddress);
+                               } else {
+                                       LOG.debug("Ignoring the leader session Id {} confirmation, since the " +
+                                               "LeaderElectionService has already been stopped.", leaderSessionID);
+                               }
+                       } else {
+                               // Received an old confirmation call
+                               if (!leaderSessionID.equals(this.issuedLeaderSessionID)) {
+                                       LOG.debug("Receive an old confirmation call of leader session ID {}, " +
+                                               "current issued session ID is {}", leaderSessionID, issuedLeaderSessionID);
+                               } else {
+                                       LOG.warn("The leader session ID {} was confirmed even though the " +
+                                               "corresponding JobManager was not elected as the leader.", leaderSessionID);
+                               }
+                       }
+               }
+       }
+
+       @Override
+       public boolean hasLeadership(@Nonnull UUID leaderSessionId) {
+               synchronized (lock) {
+                       return leaderElectionDriver.hasLeadership() && leaderSessionId.equals(issuedLeaderSessionID);
+               }
+       }
+
+       /**
+        * Returns the current leader session ID or null, if the contender is not the leader.
+        *
+        * @return The last leader session ID or null, if the contender is not the leader
+        */
+       @VisibleForTesting
+       public UUID getLeaderSessionID() {
+               return confirmedLeaderSessionID;
+       }
+
+       @GuardedBy("lock")
+       private void confirmLeaderInformation(UUID leaderSessionID, String leaderAddress) {
+               confirmedLeaderSessionID = leaderSessionID;
+               confirmedLeaderAddress = leaderAddress;
+       }
+
+       @GuardedBy("lock")
+       private void clearConfirmedLeaderInformation() {
+               confirmedLeaderSessionID = null;
+               confirmedLeaderAddress = null;
+       }
+
+       /**
+        * Helper class for the specific {@link LeaderElectionDriver} to operate the internal state.
+        */
+       public class StateHandler {
+
+               /**
+                * Called by specific {@link LeaderElectionDriver} when the leadership is granted.
+                */
+               @GuardedBy("lock")
+               public void onGrantLeadership() {
+                       synchronized (lock) {
+                               if (running) {
+                                       issuedLeaderSessionID = UUID.randomUUID();
+                                       clearConfirmedLeaderInformation();
+
+                                       if (LOG.isDebugEnabled()) {
+                                               LOG.debug(
+                                                       "Grant leadership to contender {} with session ID {}.",
+                                                       leaderContender.getDescription(),
+                                                       issuedLeaderSessionID);
+                                       }
+
+                                       leaderContender.grantLeadership(issuedLeaderSessionID);
+                               } else {
+                                       LOG.debug("Ignoring the grant leadership notification since the service has " +
+                                               "already been stopped.");
+                               }
+                       }
+               }
+
+               /**
+                * Called by specific {@link LeaderElectionDriver} when the leadership is revoked.
+                */
+               @GuardedBy("lock")
+               public void onRevokeLeadership() {
+                       synchronized (lock) {
+                               if (running) {
+                                       LOG.debug(
+                                               "Revoke leadership of {} ({}@{}).",
+                                               leaderContender.getDescription(),
+                                               confirmedLeaderSessionID,
+                                               confirmedLeaderAddress);
+
+                                       issuedLeaderSessionID = null;
+                                       clearConfirmedLeaderInformation();
+
+                                       leaderContender.revokeLeadership();
+                               } else {
+                                       LOG.debug("Ignoring the revoke leadership notification since the service " +
+                                               "has already been stopped.");
+                               }
+                       }
+               }
+
+               /**
+                * Called by specific {@link LeaderElectionDriver} when it wants to use leader information to do some
+                * operations. For example, correct the external storage when the leader information is updated exceptionally.
+                *
+                * @param consumer to specify the operation. The exception will be handled by leader contender.
+                */
+               @GuardedBy("lock")
+               public void runWithLock(BiConsumerWithException<UUID, String, Exception> consumer) {
+                       try {
+                               synchronized (lock) {
+                                       consumer.accept(confirmedLeaderSessionID, confirmedLeaderAddress);
+                               }
+                       } catch (Exception e) {
+                               handleError(e);
+                       }
+               }

Review comment:
       Instead of providing a general purpose method to run code, maybe we 
could add a callback `onLeaderInformationChange(LeaderInformation)` which is 
called by the driver. Then the election service can take the appropriate action 
(e.g. write the leader information again if necessary).
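A sketch of the proposed callback, assuming the driver reports whatever leader information it currently observes in external storage (for example after a watch fires). The election service, which owns the confirmed state, decides whether to rewrite it, so the driver never needs a general-purpose `runWithLock`. The `LeaderInformation` type named in the comment is modelled here as a small immutable value class whose shape is an assumption; `LeaderInformationListener`, `LeaderInformationStore` and `ElectionServiceSketch` are hypothetical names, not existing Flink types.

```java
import java.util.Objects;
import java.util.UUID;

/** Assumed shape of the LeaderInformation value from the review comment; fields may be null. */
final class LeaderInformation {
    final UUID sessionId;
    final String address;

    LeaderInformation(UUID sessionId, String address) {
        this.sessionId = sessionId;
        this.address = address;
    }

    static LeaderInformation empty() {
        return new LeaderInformation(null, null);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof LeaderInformation)) {
            return false;
        }
        LeaderInformation that = (LeaderInformation) o;
        return Objects.equals(sessionId, that.sessionId) && Objects.equals(address, that.address);
    }

    @Override
    public int hashCode() {
        return Objects.hash(sessionId, address);
    }
}

/** Hypothetical callback the driver invokes when the externally stored information changes. */
interface LeaderInformationListener {
    void onLeaderInformationChange(LeaderInformation observed);
}

/** Hypothetical stand-in for the driver's write path. */
interface LeaderInformationStore {
    void write(LeaderInformation leaderInformation);
}

final class ElectionServiceSketch implements LeaderInformationListener {
    private final Object lock = new Object();
    private final LeaderInformationStore store;
    private LeaderInformation confirmed = LeaderInformation.empty(); // guarded by lock

    ElectionServiceSketch(LeaderInformationStore store) {
        this.store = store;
    }

    void confirm(UUID sessionId, String address) {
        synchronized (lock) {
            confirmed = new LeaderInformation(sessionId, address);
            store.write(confirmed);
        }
    }

    @Override
    public void onLeaderInformationChange(LeaderInformation observed) {
        synchronized (lock) {
            // Only a confirmed leader republishes, and only if storage diverged from what it confirmed.
            if (confirmed.sessionId != null && !confirmed.equals(observed)) {
                store.write(confirmed);
            }
        }
    }
}
```

The driver then only reports events; all decisions that depend on the service's guarded state stay inside the service's lock.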

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java
##########
@@ -0,0 +1,252 @@
+       /**
+        * Helper class for the specific {@link LeaderElectionDriver} to operate the internal state.
+        */
+       public class StateHandler {

Review comment:
       Maybe we could call it `LeaderEventHandler` or so.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java
##########
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.BiConsumerWithException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.UUID;
+
+/**
+ * Default implementation for leader election service. Composed with different 
{@link LeaderElectionDriver}, we could
+ * perform a leader election for the contender, and then persist the leader 
information to various storage.
+ */
+public class DefaultLeaderElectionService implements LeaderElectionService {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(DefaultLeaderElectionService.class);
+
+       private final Object lock = new Object();
+
+       /** The leader contender which applies for leadership. */
+       @GuardedBy("lock")
+       private volatile LeaderContender leaderContender;
+
+       @GuardedBy("lock")
+       private volatile UUID issuedLeaderSessionID;
+
+       @GuardedBy("lock")
+       private volatile UUID confirmedLeaderSessionID;
+
+       @GuardedBy("lock")
+       private volatile String confirmedLeaderAddress;
+
+       @GuardedBy("lock")
+       private volatile boolean running;
+
+       private final LeaderElectionDriver leaderElectionDriver;
+
+       private final StateHandler leaderElectionStateHandle;
+
+       public DefaultLeaderElectionService(LeaderElectionDriver 
leaderElectionDriver) {
+               this.leaderElectionDriver = leaderElectionDriver;
+
+               leaderContender = null;
+
+               issuedLeaderSessionID = null;
+               confirmedLeaderSessionID = null;
+               confirmedLeaderAddress = null;
+
+               leaderElectionStateHandle = new StateHandler();
+               running = false;
+       }
+
+       @Override
+       public final void start(LeaderContender contender) throws Exception {
+               Preconditions.checkNotNull(contender, "Contender must not be 
null.");
+               Preconditions.checkState(leaderContender == null, "Contender 
was already set.");
+
+               LOG.info("Starting LeaderElectionService {}.", this);
+
+               synchronized (lock) {
+                       leaderContender = contender;
+                       running = true;
+                       leaderElectionDriver.start(leaderElectionStateHandle);
+               }
+       }
+
+       @Override
+       public final void stop() throws Exception {
+               LOG.info("Stopping LeaderElectionService {}.", this);
+
+               synchronized (lock) {
+                       if (!running) {
+                               return;
+                       }
+                       running = false;
+                       clearConfirmedLeaderInformation();
+                       leaderElectionDriver.stop();
+               }
+       }
+
+       @Override
+       public void confirmLeadership(UUID leaderSessionID, String 
leaderAddress) {
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug(
+                               "Confirm leader session ID {} for leader {}.",
+                               leaderSessionID,
+                               leaderAddress);
+               }
+
+               Preconditions.checkNotNull(leaderSessionID);
+
+               synchronized (lock) {
+                       if (hasLeadership(leaderSessionID)) {
+                               if (running) {
+                                       
confirmLeaderInformation(leaderSessionID, leaderAddress);
+                                       
leaderElectionDriver.writeLeaderInformation(confirmedLeaderSessionID, 
confirmedLeaderAddress);
+                               } else {
+                                       LOG.debug("Ignoring the leader session 
Id {} confirmation, since the " +
+                                               "LeaderElectionService has 
already been stopped.", leaderSessionID);
+                               }
+                       } else {
+                               // Received an old confirmation call
+                               if (!leaderSessionID.equals(this.issuedLeaderSessionID)) {
+                                       LOG.debug("Receive an old confirmation call of leader session ID {}, " +
+                                               "current issued session ID is {}", leaderSessionID, issuedLeaderSessionID);
+                               } else {
+                                       LOG.warn("The leader session ID {} was confirmed even though the " +
+                                               "corresponding JobManager was not elected as the leader.", leaderSessionID);
+                               }
+                       }
+               }
+       }
+
+       @Override
+       public boolean hasLeadership(@Nonnull UUID leaderSessionId) {
+               synchronized (lock) {
+                       return leaderElectionDriver.hasLeadership() && leaderSessionId.equals(issuedLeaderSessionID);
+               }
+       }
+
+       /**
+        * Returns the current leader session ID or null, if the contender is not the leader.
+        *
+        * @return The last leader session ID or null, if the contender is not the leader
+        */
+       @VisibleForTesting
+       public UUID getLeaderSessionID() {

Review comment:
       `@Nullable` is missing

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java
##########
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.BiConsumerWithException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.UUID;
+
+/**
+ * Default implementation for leader election service. Composed with different {@link LeaderElectionDriver}, we could
+ * perform a leader election for the contender, and then persist the leader information to various storage.
+ */
+public class DefaultLeaderElectionService implements LeaderElectionService {
+
+       private static final Logger LOG = LoggerFactory.getLogger(DefaultLeaderElectionService.class);
+
+       private final Object lock = new Object();
+
+       /** The leader contender which applies for leadership. */
+       @GuardedBy("lock")
+       private volatile LeaderContender leaderContender;
+
+       @GuardedBy("lock")
+       private volatile UUID issuedLeaderSessionID;
+
+       @GuardedBy("lock")
+       private volatile UUID confirmedLeaderSessionID;
+
+       @GuardedBy("lock")
+       private volatile String confirmedLeaderAddress;
+
+       @GuardedBy("lock")
+       private volatile boolean running;
+
+       private final LeaderElectionDriver leaderElectionDriver;
+
+       private final StateHandler leaderElectionStateHandle;
+
+       public DefaultLeaderElectionService(LeaderElectionDriver leaderElectionDriver) {
+               this.leaderElectionDriver = leaderElectionDriver;
+
+               leaderContender = null;
+
+               issuedLeaderSessionID = null;
+               confirmedLeaderSessionID = null;
+               confirmedLeaderAddress = null;
+
+               leaderElectionStateHandle = new StateHandler();
+               running = false;
+       }
+
+       @Override
+       public final void start(LeaderContender contender) throws Exception {
+               Preconditions.checkNotNull(contender, "Contender must not be null.");
+               Preconditions.checkState(leaderContender == null, "Contender was already set.");
+
+               LOG.info("Starting LeaderElectionService {}.", this);
+
+               synchronized (lock) {
+                       leaderContender = contender;
+                       running = true;
+                       leaderElectionDriver.start(leaderElectionStateHandle);

Review comment:
       We could make the `LeaderElectionDriver` a bit simpler if we instantiated it here. Then we could get rid of its `start` method. Of course, we would need to add a `LeaderElectionDriverFactory` for this.
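A minimal sketch of what this suggestion could look like. It uses simplified, hypothetical stand-ins (`LeaderElectionEventHandler`, a `LeaderElectionDriver` that extends `AutoCloseable`, and a one-method `LeaderElectionDriverFactory`) rather than the actual Flink types: the service creates the driver in its own `start()` and passes itself as the event handler, so the driver is fully wired on construction and needs no separate `start` method.

```java
import java.util.UUID;

// Hypothetical, simplified stand-ins for the types discussed in this review.
interface LeaderElectionEventHandler {
    void onGrantLeadership();

    void onRevokeLeadership();
}

interface LeaderElectionDriver extends AutoCloseable {
    boolean hasLeadership();
}

// Factory that wires the driver to its event handler at creation time.
interface LeaderElectionDriverFactory {
    LeaderElectionDriver createLeaderElectionDriver(LeaderElectionEventHandler eventHandler) throws Exception;
}

class SketchLeaderElectionService implements LeaderElectionEventHandler {
    private final Object lock = new Object();
    private final LeaderElectionDriverFactory driverFactory;

    // Mutable state, guarded by lock.
    private boolean running;
    private LeaderElectionDriver driver;
    private UUID issuedLeaderSessionId;

    SketchLeaderElectionService(LeaderElectionDriverFactory driverFactory) {
        this.driverFactory = driverFactory;
    }

    void start() throws Exception {
        synchronized (lock) {
            if (running) {
                throw new IllegalStateException("Service was already started.");
            }
            // Mark as running first so a grant issued while the driver is being created is not dropped.
            running = true;
            try {
                // The driver is instantiated here, with this service as its event handler.
                driver = driverFactory.createLeaderElectionDriver(this);
            } catch (Exception e) {
                running = false;
                throw e;
            }
        }
    }

    void stop() throws Exception {
        synchronized (lock) {
            if (!running) {
                return;
            }
            running = false;
            issuedLeaderSessionId = null;
            driver.close();
            driver = null;
        }
    }

    @Override
    public void onGrantLeadership() {
        synchronized (lock) {
            if (running) {
                issuedLeaderSessionId = UUID.randomUUID();
            }
        }
    }

    @Override
    public void onRevokeLeadership() {
        synchronized (lock) {
            issuedLeaderSessionId = null;
        }
    }

    UUID getIssuedLeaderSessionId() {
        synchronized (lock) {
            return issuedLeaderSessionId;
        }
    }
}
```

Setting `running` before creating the driver is a choice made for this sketch, not something the review prescribes; it keeps a driver that reports leadership from its constructor from being ignored.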

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java
##########
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.BiConsumerWithException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.UUID;
+
+/**
+ * Default implementation for leader election service. Composed with different {@link LeaderElectionDriver}, we could
+ * perform a leader election for the contender, and then persist the leader information to various storage.
+ */
+public class DefaultLeaderElectionService implements LeaderElectionService {
+
+       private static final Logger LOG = LoggerFactory.getLogger(DefaultLeaderElectionService.class);
+
+       private final Object lock = new Object();
+
+       /** The leader contender which applies for leadership. */
+       @GuardedBy("lock")
+       private volatile LeaderContender leaderContender;
+
+       @GuardedBy("lock")
+       private volatile UUID issuedLeaderSessionID;
+
+       @GuardedBy("lock")
+       private volatile UUID confirmedLeaderSessionID;
+
+       @GuardedBy("lock")
+       private volatile String confirmedLeaderAddress;
+
+       @GuardedBy("lock")
+       private volatile boolean running;
+
+       private final LeaderElectionDriver leaderElectionDriver;
+
+       private final StateHandler leaderElectionStateHandle;
+
+       public DefaultLeaderElectionService(LeaderElectionDriver leaderElectionDriver) {
+               this.leaderElectionDriver = leaderElectionDriver;
+
+               leaderContender = null;
+
+               issuedLeaderSessionID = null;
+               confirmedLeaderSessionID = null;
+               confirmedLeaderAddress = null;
+
+               leaderElectionStateHandle = new StateHandler();
+               running = false;
+       }
+
+       @Override
+       public final void start(LeaderContender contender) throws Exception {
+               Preconditions.checkNotNull(contender, "Contender must not be null.");
+               Preconditions.checkState(leaderContender == null, "Contender was already set.");
+
+               LOG.info("Starting LeaderElectionService {}.", this);
+
+               synchronized (lock) {
+                       leaderContender = contender;
+                       running = true;
+                       leaderElectionDriver.start(leaderElectionStateHandle);
+               }
+       }
+
+       @Override
+       public final void stop() throws Exception {
+               LOG.info("Stopping LeaderElectionService {}.", this);
+
+               synchronized (lock) {
+                       if (!running) {
+                               return;
+                       }
+                       running = false;
+                       clearConfirmedLeaderInformation();
+                       leaderElectionDriver.stop();
+               }
+       }
+
+       @Override
+       public void confirmLeadership(UUID leaderSessionID, String leaderAddress) {
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug(
+                               "Confirm leader session ID {} for leader {}.",
+                               leaderSessionID,
+                               leaderAddress);
+               }
+
+               Preconditions.checkNotNull(leaderSessionID);
+
+               synchronized (lock) {
+                       if (hasLeadership(leaderSessionID)) {
+                               if (running) {
+                                       confirmLeaderInformation(leaderSessionID, leaderAddress);
+                                       leaderElectionDriver.writeLeaderInformation(confirmedLeaderSessionID, confirmedLeaderAddress);
+                               } else {
+                                       LOG.debug("Ignoring the leader session Id {} confirmation, since the " +
+                                               "LeaderElectionService has already been stopped.", leaderSessionID);
+                               }
+                       } else {
+                               // Received an old confirmation call
+                               if (!leaderSessionID.equals(this.issuedLeaderSessionID)) {
+                                       LOG.debug("Receive an old confirmation call of leader session ID {}, " +
+                                               "current issued session ID is {}", leaderSessionID, issuedLeaderSessionID);
+                               } else {
+                                       LOG.warn("The leader session ID {} was confirmed even though the " +
+                                               "corresponding JobManager was not elected as the leader.", leaderSessionID);
+                               }
+                       }
+               }
+       }
+
+       @Override
+       public boolean hasLeadership(@Nonnull UUID leaderSessionId) {
+               synchronized (lock) {
+                       return leaderElectionDriver.hasLeadership() && leaderSessionId.equals(issuedLeaderSessionID);
+               }
+       }
+
+       /**
+        * Returns the current leader session ID or null, if the contender is not the leader.
+        *
+        * @return The last leader session ID or null, if the contender is not the leader
+        */
+       @VisibleForTesting
+       public UUID getLeaderSessionID() {
+               return confirmedLeaderSessionID;
+       }
+
+       @GuardedBy("lock")
+       private void confirmLeaderInformation(UUID leaderSessionID, String leaderAddress) {
+               confirmedLeaderSessionID = leaderSessionID;
+               confirmedLeaderAddress = leaderAddress;
+       }
+
+       @GuardedBy("lock")
+       private void clearConfirmedLeaderInformation() {
+               confirmedLeaderSessionID = null;
+               confirmedLeaderAddress = null;
+       }
+
+       /**
+        * Helper class for the specific {@link LeaderElectionDriver} to operate the internal state.
+        */
+       public class StateHandler {
+
+               /**
+                * Called by specific {@link LeaderElectionDriver} when the leadership is granted.
+                */
+               @GuardedBy("lock")
+               public void onGrantLeadership() {
+                       synchronized (lock) {
+                               if (running) {
+                                       issuedLeaderSessionID = UUID.randomUUID();
+                                       clearConfirmedLeaderInformation();
+
+                                       if (LOG.isDebugEnabled()) {
+                                               LOG.debug(
+                                                       "Grant leadership to contender {} with session ID {}.",
+                                                       leaderContender.getDescription(),
+                                                       issuedLeaderSessionID);
+                                       }
+
+                                       leaderContender.grantLeadership(issuedLeaderSessionID);
+                               } else {
+                                       LOG.debug("Ignoring the grant leadership notification since the service has " +
+                                               "already been stopped.");
+                               }
+                       }
+               }
+
+               /**
+                * Called by specific {@link LeaderElectionDriver} when the leadership is revoked.
+                */
+               @GuardedBy("lock")
+               public void onRevokeLeadership() {
+                       synchronized (lock) {
+                               if (running) {
+                                       LOG.debug(
+                                               "Revoke leadership of {} ({}@{}).",
+                                               leaderContender.getDescription(),
+                                               confirmedLeaderSessionID,
+                                               confirmedLeaderAddress);
+
+                                       issuedLeaderSessionID = null;
+                                       clearConfirmedLeaderInformation();
+
+                                       leaderContender.revokeLeadership();
+                               } else {
+                                       LOG.debug("Ignoring the revoke leadership notification since the service " +
+                                               "has already been stopped.");
+                               }
+                       }
+               }
+
+               /**
+                * Called by specific {@link LeaderElectionDriver} when it wants to use leader information to do some
+                * operations. For example, correct the external storage when the leader information is updated exceptionally.
+                *
+                * @param consumer to specify the operation. The exception will be handled by leader contender.
+                */
+               @GuardedBy("lock")
+               public void runWithLock(BiConsumerWithException<UUID, String, Exception> consumer) {
+                       try {
+                               synchronized (lock) {
+                                       consumer.accept(confirmedLeaderSessionID, confirmedLeaderAddress);
+                               }
+                       } catch (Exception e) {
+                               handleError(e);
+                       }
+               }
+
+               /**
+                * Handle error by specific {@link LeaderElectionDriver}.
+                * @param ex exception to be handled.
+                */
+               public void handleError(Exception ex) {
+                       leaderContender.handleError(ex);
+               }
+
+               /**
+                * @return the contender description.
+                */
+               public String getLeaderContenderDescription() {
+                       return leaderContender.getDescription();
+               }

Review comment:
       Do we really need this here?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionDriver.java
##########
@@ -38,24 +37,22 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nonnull;
-
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.util.UUID;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
- * Leader election service for multiple JobManager. The leading JobManager is elected using
+ * {@link LeaderElectionDriver} implemented by Zookeeper. The leading JobManager is elected using

Review comment:
       `{@link LeaderElectionDriver} implementation for Zookeeper.`

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java
##########
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.BiConsumerWithException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.UUID;
+
+/**
+ * Default implementation for leader election service. Composed with different {@link LeaderElectionDriver}, we could
+ * perform a leader election for the contender, and then persist the leader information to various storage.
+ */
+public class DefaultLeaderElectionService implements LeaderElectionService {
+
+       private static final Logger LOG = LoggerFactory.getLogger(DefaultLeaderElectionService.class);
+
+       private final Object lock = new Object();
+
+       /** The leader contender which applies for leadership. */
+       @GuardedBy("lock")
+       private volatile LeaderContender leaderContender;
+
+       @GuardedBy("lock")
+       private volatile UUID issuedLeaderSessionID;
+
+       @GuardedBy("lock")
+       private volatile UUID confirmedLeaderSessionID;
+
+       @GuardedBy("lock")
+       private volatile String confirmedLeaderAddress;
+
+       @GuardedBy("lock")
+       private volatile boolean running;
+
+       private final LeaderElectionDriver leaderElectionDriver;
+
+       private final StateHandler leaderElectionStateHandle;
+
+       public DefaultLeaderElectionService(LeaderElectionDriver leaderElectionDriver) {
+               this.leaderElectionDriver = leaderElectionDriver;
+
+               leaderContender = null;
+
+               issuedLeaderSessionID = null;
+               confirmedLeaderSessionID = null;
+               confirmedLeaderAddress = null;
+
+               leaderElectionStateHandle = new StateHandler();
+               running = false;
+       }
+
+       @Override
+       public final void start(LeaderContender contender) throws Exception {
+               Preconditions.checkNotNull(contender, "Contender must not be null.");
+               Preconditions.checkState(leaderContender == null, "Contender was already set.");
+
+               LOG.info("Starting LeaderElectionService {}.", this);
+
+               synchronized (lock) {
+                       leaderContender = contender;
+                       running = true;
+                       leaderElectionDriver.start(leaderElectionStateHandle);
+               }
+       }
+
+       @Override
+       public final void stop() throws Exception {
+               LOG.info("Stopping LeaderElectionService {}.", this);
+
+               synchronized (lock) {
+                       if (!running) {
+                               return;
+                       }
+                       running = false;
+                       clearConfirmedLeaderInformation();
+                       leaderElectionDriver.stop();
+               }
+       }
+
+       @Override
+       public void confirmLeadership(UUID leaderSessionID, String leaderAddress) {
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug(
+                               "Confirm leader session ID {} for leader {}.",
+                               leaderSessionID,
+                               leaderAddress);
+               }
+
+               Preconditions.checkNotNull(leaderSessionID);
+
+               synchronized (lock) {
+                       if (hasLeadership(leaderSessionID)) {
+                               if (running) {
+                                       confirmLeaderInformation(leaderSessionID, leaderAddress);
+                                       leaderElectionDriver.writeLeaderInformation(confirmedLeaderSessionID, confirmedLeaderAddress);
+                               } else {
+                                       LOG.debug("Ignoring the leader session Id {} confirmation, since the " +
+                                               "LeaderElectionService has already been stopped.", leaderSessionID);
+                               }
+                       } else {
+                               // Received an old confirmation call
+                               if (!leaderSessionID.equals(this.issuedLeaderSessionID)) {
+                                       LOG.debug("Receive an old confirmation call of leader session ID {}, " +
+                                               "current issued session ID is {}", leaderSessionID, issuedLeaderSessionID);
+                               } else {
+                                       LOG.warn("The leader session ID {} was confirmed even though the " +
+                                               "corresponding JobManager was not elected as the leader.", leaderSessionID);
+                               }
+                       }
+               }
+       }
+
+       @Override
+       public boolean hasLeadership(@Nonnull UUID leaderSessionId) {
+               synchronized (lock) {
+                       return leaderElectionDriver.hasLeadership() && leaderSessionId.equals(issuedLeaderSessionID);
+               }
+       }
+
+       /**
+        * Returns the current leader session ID or null, if the contender is not the leader.
+        *
+        * @return The last leader session ID or null, if the contender is not the leader
+        */
+       @VisibleForTesting
+       public UUID getLeaderSessionID() {
+               return confirmedLeaderSessionID;
+       }
+
+       @GuardedBy("lock")
+       private void confirmLeaderInformation(UUID leaderSessionID, String leaderAddress) {
+               confirmedLeaderSessionID = leaderSessionID;
+               confirmedLeaderAddress = leaderAddress;
+       }
+
+       @GuardedBy("lock")
+       private void clearConfirmedLeaderInformation() {
+               confirmedLeaderSessionID = null;
+               confirmedLeaderAddress = null;
+       }
+
+       /**
+        * Helper class for the specific {@link LeaderElectionDriver} to operate the internal state.
+        */
+       public class StateHandler {

Review comment:
       For easier testability of the driver implementations, I would suggest making this an interface so that one can provide a testing implementation.
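A sketch of how an extracted interface could let a driver be tested in isolation. `LeaderElectionStateHandler`, `TestingLeaderElectionStateHandler` and `ToyLeaderElectionDriver` are hypothetical names, and the interface mirrors only a subset of the methods of the `StateHandler` class in the diff above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical interface extracted from the inner StateHandler class; the name and
// the chosen methods are assumptions made for this sketch.
interface LeaderElectionStateHandler {
    void onGrantLeadership();

    void onRevokeLeadership();

    void handleError(Exception ex);
}

// Testing implementation that records every call a driver makes.
class TestingLeaderElectionStateHandler implements LeaderElectionStateHandler {
    final List<String> events = new ArrayList<>();

    @Override
    public void onGrantLeadership() {
        events.add("grant");
    }

    @Override
    public void onRevokeLeadership() {
        events.add("revoke");
    }

    @Override
    public void handleError(Exception ex) {
        events.add("error: " + ex.getMessage());
    }
}

// Toy driver: it depends only on the interface, so it can be exercised without
// constructing a DefaultLeaderElectionService.
class ToyLeaderElectionDriver {
    private final LeaderElectionStateHandler stateHandler;
    private boolean leader;

    ToyLeaderElectionDriver(LeaderElectionStateHandler stateHandler) {
        this.stateHandler = stateHandler;
    }

    // Simulates the external system (e.g. ZooKeeper) reporting a leadership status;
    // the handler is only notified when the status actually changes.
    void simulateLeadershipChange(boolean isLeader) {
        if (isLeader == leader) {
            return;
        }
        leader = isLeader;
        if (isLeader) {
            stateHandler.onGrantLeadership();
        } else {
            stateHandler.onRevokeLeadership();
        }
    }
}
```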

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/LeaderRetrievalDriver.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderretrieval;
+
+/**
+ * A {@link LeaderRetrievalDriver} is responsible for retrieves the current leader which has been elected by the
+ * {@link LeaderRetrievalDriver}.
+ */
+public interface LeaderRetrievalDriver {
+
+       /**
+        * Start the necessary services for specific {@link LeaderRetrievalDriver} implementations. For example, NodeCache
+        * in Zookeeper, ConfigMap watcher in Kubernetes. They could get the leader information update events and need to
+        * notify the leader listener by {@link DefaultLeaderRetrievalService.StateHandler}.
+        *
+        * @param leaderRetrievalStateHandler used for notify the leader changes.
+        * @throws Exception Throw exception when start the services.
+        */
+       void start(DefaultLeaderRetrievalService.StateHandler leaderRetrievalStateHandler) throws Exception;
+
+       /**
+        * Stop the services used for leader retrieval.
+        */
+       void stop() throws Exception;

Review comment:
       I'd suggest calling this method `close` instead of `stop`. We could then let this interface also extend the `AutoCloseable` interface.
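A sketch of the `AutoCloseable` variant. The `start` parameter is simplified to a `Runnable`, and `ToyLeaderRetrievalDriver` and `RetrievalDriverUsage` are hypothetical; the point is that callers can rely on try-with-resources to release the driver.

```java
// Hypothetical variant of the interface: stop() becomes close() by extending AutoCloseable.
// The start(...) parameter is simplified to a Runnable for this sketch.
interface LeaderRetrievalDriver extends AutoCloseable {
    void start(Runnable onLeaderChange) throws Exception;
}

// Toy implementation that notifies once on start and tracks whether it was closed.
class ToyLeaderRetrievalDriver implements LeaderRetrievalDriver {
    boolean closed;

    @Override
    public void start(Runnable onLeaderChange) {
        onLeaderChange.run();
    }

    @Override
    public void close() {
        // Idempotent: closing twice has no further effect.
        closed = true;
    }
}

class RetrievalDriverUsage {
    // Returns whether the listener was notified; the driver is closed in every case.
    static boolean runAndClose(LeaderRetrievalDriver driver) throws Exception {
        final boolean[] notified = {false};
        try (LeaderRetrievalDriver d = driver) {
            d.start(() -> notified[0] = true);
        } // close() runs here, even if start() throws
        return notified[0];
    }
}
```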

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/DefaultLeaderRetrievalService.java
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderretrieval;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.SupplierWithException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.Objects;
+import java.util.UUID;
+
+/**
+ * The counterpart to the {@link org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService}.
+ * Composed with different {@link LeaderRetrievalDriver}, we could retrieve the leader information from
+ * different storage. The leader address as well as the current leader session ID will be retrieved from
+ * {@link LeaderRetrievalDriver}.
+ */
+public class DefaultLeaderRetrievalService implements LeaderRetrievalService {
+       private static final Logger LOG = LoggerFactory.getLogger(DefaultLeaderRetrievalService.class);
+
+       private final Object lock = new Object();
+
+       private String lastLeaderAddress;
+
+       private UUID lastLeaderSessionID;
+
+       private volatile boolean running;
+
+       /** Listener which will be notified about leader changes. */
+       private volatile LeaderRetrievalListener leaderListener;
+
+       private final LeaderRetrievalDriver leaderRetrievalDriver;
+
+       private final StateHandler leaderRetrievalStateHandler;

Review comment:
       I'd suggest keeping the final fields grouped together. That way it is easier to see what the mutable state of the class is.
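A layout sketch of what the grouping could look like for the fields above. The listener interface is a simplified stand-in, and a plain `AutoCloseable` replaces `LeaderRetrievalDriver`, purely so the snippet compiles on its own.

```java
import java.util.UUID;

// Hypothetical stand-in for LeaderRetrievalListener so the sketch compiles on its own.
interface LeaderRetrievalListener {
    void notifyLeaderAddress(String leaderAddress, UUID leaderSessionId);
}

class GroupedFieldsRetrievalService {
    // Final fields: fixed at construction time.
    private final Object lock = new Object();
    private final AutoCloseable leaderRetrievalDriver; // stand-in for LeaderRetrievalDriver

    // Mutable state: everything below is guarded by lock.
    private String lastLeaderAddress;
    private UUID lastLeaderSessionId;
    private boolean running;
    private LeaderRetrievalListener leaderListener;

    GroupedFieldsRetrievalService(AutoCloseable leaderRetrievalDriver) {
        this.leaderRetrievalDriver = leaderRetrievalDriver;
    }

    void start(LeaderRetrievalListener listener) {
        synchronized (lock) {
            leaderListener = listener;
            running = true;
        }
    }

    void stop() throws Exception {
        synchronized (lock) {
            if (!running) {
                return;
            }
            running = false;
            leaderRetrievalDriver.close();
        }
    }

    boolean isRunning() {
        synchronized (lock) {
            return running;
        }
    }
}
```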

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/DefaultLeaderRetrievalService.java
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderretrieval;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.SupplierWithException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.Objects;
+import java.util.UUID;
+
+/**
+ * The counterpart to the {@link org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService}.
+ * Composed with different {@link LeaderRetrievalDriver}, we could retrieve the leader information from
+ * different storage. The leader address as well as the current leader session ID will be retrieved from
+ * {@link LeaderRetrievalDriver}.
+ */
+public class DefaultLeaderRetrievalService implements LeaderRetrievalService {
+       private static final Logger LOG = LoggerFactory.getLogger(DefaultLeaderRetrievalService.class);
+
+       private final Object lock = new Object();
+
+       private String lastLeaderAddress;
+
+       private UUID lastLeaderSessionID;
+
+       private volatile boolean running;
+
+       /** Listener which will be notified about leader changes. */
+       private volatile LeaderRetrievalListener leaderListener;
+
+       private final LeaderRetrievalDriver leaderRetrievalDriver;
+
+       private final StateHandler leaderRetrievalStateHandler;
+
+       /**
+        * Creates a default leader retrieval service with specified {@link LeaderRetrievalDriver}.
+        *
+        * @param leaderRetrievalDriver {@link LeaderRetrievalDriver} implemented by Zookeeper, Kubernetes, etc.
+        */
+       public DefaultLeaderRetrievalService(LeaderRetrievalDriver leaderRetrievalDriver) {
+               this.leaderRetrievalDriver = leaderRetrievalDriver;
+
+               this.lastLeaderAddress = null;
+               this.lastLeaderSessionID = null;
+
+               running = false;
+
+               this.leaderRetrievalStateHandler = new StateHandler();
+       }
+
+       @Override
+       public void start(LeaderRetrievalListener listener) throws Exception {
+               Preconditions.checkNotNull(listener, "Listener must not be null.");
+               Preconditions.checkState(leaderListener == null, "DefaultLeaderRetrievalService can " +
+                       "only be started once.");
+
+               LOG.info("Starting DefaultLeaderRetrievalService with {}.", leaderRetrievalDriver);
+
+               synchronized (lock) {
+                       leaderListener = listener;
+                       leaderRetrievalDriver.start(leaderRetrievalStateHandler);
+
+                       running = true;
+               }
+       }
+
+       @Override
+       public void stop() throws Exception {
+               synchronized (lock) {
+                       if (!running) {
+                               return;
+                       }
+                       running = false;
+
+                       LOG.info("Stopping DefaultLeaderRetrievalService {} with {}.", leaderRetrievalDriver);
+
+                       leaderRetrievalDriver.stop();
+               }
+       }
+
+       /**
+        * Helper class for the specific {@link LeaderRetrievalDriver} to notify the leader changes.
+        */
+       public class StateHandler {
+
+               /**
+                * Called by specific {@link LeaderRetrievalDriver} to notify new leader address.
+                * @param supplier supplier to provide a {@link Tuple2} of leader information. f0 is UUID, and f1 is leader
+                * address. The exception will be handled by leader listener.
+                */
+               @GuardedBy("lock")
+               public void notifyIfNewLeaderAddress(SupplierWithException<Tuple2<UUID, String>, Exception> supplier) {

Review comment:
       Let's call this method `notifyLeaderAddress` and let the `DefaultLeaderRetrievalService` be responsible for deciding whether to call the listener or not.
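
A minimal sketch of this suggestion, assuming the driver has already extracted the leader information and simply reports every change it observes. `DedupingRetrievalService` is a hypothetical name, and the `LeaderRetrievalListener` below is a simplified stand-in that keeps only the `notifyLeaderAddress(String, UUID)` callback of Flink's interface (no supplier, no error handling). The "did anything change?" check moves out of the driver-facing handler and into the service.

```java
import java.util.Objects;
import java.util.UUID;

// Simplified stand-in for Flink's LeaderRetrievalListener (only the address callback).
interface LeaderRetrievalListener {
    void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID);
}

// Hypothetical service: the driver reports whatever it sees, the service decides
// whether the listener has to be called.
class DedupingRetrievalService {
    private final Object lock = new Object();
    private final LeaderRetrievalListener listener;

    // Last values forwarded to the listener; both null until a leader is known.
    private String lastLeaderAddress;
    private UUID lastLeaderSessionID;

    DedupingRetrievalService(LeaderRetrievalListener listener) {
        this.listener = Objects.requireNonNull(listener, "listener");
    }

    /** Called by a driver on every change it observes; (null, null) signals leader loss. */
    void notifyLeaderAddress(UUID newLeaderSessionID, String newLeaderAddress) {
        synchronized (lock) {
            boolean unchanged = Objects.equals(newLeaderAddress, lastLeaderAddress)
                    && Objects.equals(newLeaderSessionID, lastLeaderSessionID);
            if (unchanged) {
                return; // repeated notification: the listener is not called again
            }
            lastLeaderAddress = newLeaderAddress;
            lastLeaderSessionID = newLeaderSessionID;
            listener.notifyLeaderAddress(newLeaderAddress, newLeaderSessionID);
        }
    }
}
```

With this split, a driver does not need to remember what it reported last; sending the same information twice is harmless.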

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/DefaultLeaderRetrievalService.java
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderretrieval;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.SupplierWithException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.Objects;
+import java.util.UUID;
+
+/**
+ * The counterpart to the {@link org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService}.
+ * Composed with different {@link LeaderRetrievalDriver}, we could retrieve the leader information from
+ * different storage. The leader address as well as the current leader session ID will be retrieved from
+ * {@link LeaderRetrievalDriver}.
+ */
+public class DefaultLeaderRetrievalService implements LeaderRetrievalService {
+       private static final Logger LOG = LoggerFactory.getLogger(DefaultLeaderRetrievalService.class);
+
+       private final Object lock = new Object();
+
+       private String lastLeaderAddress;
+
+       private UUID lastLeaderSessionID;
+
+       private volatile boolean running;
+
+       /** Listener which will be notified about leader changes. */
+       private volatile LeaderRetrievalListener leaderListener;
+
+       private final LeaderRetrievalDriver leaderRetrievalDriver;
+
+       private final StateHandler leaderRetrievalStateHandler;
+
+       /**
+        * Creates a default leader retrieval service with specified {@link LeaderRetrievalDriver}.
+        *
+        * @param leaderRetrievalDriver {@link LeaderRetrievalDriver} implemented by Zookeeper, Kubernetes, etc.
+        */
+       public DefaultLeaderRetrievalService(LeaderRetrievalDriver leaderRetrievalDriver) {
+               this.leaderRetrievalDriver = leaderRetrievalDriver;
+
+               this.lastLeaderAddress = null;
+               this.lastLeaderSessionID = null;
+
+               running = false;
+
+               this.leaderRetrievalStateHandler = new StateHandler();
+       }
+
+       @Override
+       public void start(LeaderRetrievalListener listener) throws Exception {
+               Preconditions.checkNotNull(listener, "Listener must not be null.");
+               Preconditions.checkState(leaderListener == null, "DefaultLeaderRetrievalService can " +
+                       "only be started once.");
+
+               LOG.info("Starting DefaultLeaderRetrievalService with {}.", leaderRetrievalDriver);
+
+               synchronized (lock) {
+                       leaderListener = listener;
+                       leaderRetrievalDriver.start(leaderRetrievalStateHandler);

Review comment:
       Same here, maybe we can pass a `LeaderRetrievalDriverFactory` to this class and then pass in the `leaderRetrievalStateHandler` when we create the actual `leaderRetrievalDriver`.
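
One way to read the factory suggestion, sketched under assumptions: `LeaderRetrievalDriverFactory` is the reviewer's proposed name, while `LeaderRetrievalEventHandler` and `FactoryBasedRetrievalService` are hypothetical, and the driver is modeled as `AutoCloseable`. Because the handler is handed over when the driver is created, the driver no longer needs a `start(StateHandler)` method and can keep the handler in a final field.

```java
import java.util.Objects;
import java.util.UUID;

// Hypothetical callback interface that would replace the public StateHandler inner class.
interface LeaderRetrievalEventHandler {
    void notifyLeaderAddress(UUID leaderSessionID, String leaderAddress);
}

// A driver is fully wired when it is created and is only ever closed afterwards.
interface LeaderRetrievalDriver extends AutoCloseable {}

interface LeaderRetrievalDriverFactory {
    LeaderRetrievalDriver createLeaderRetrievalDriver(LeaderRetrievalEventHandler handler) throws Exception;
}

class FactoryBasedRetrievalService implements LeaderRetrievalEventHandler {
    private final Object lock = new Object();
    private final LeaderRetrievalDriverFactory driverFactory;

    private LeaderRetrievalDriver driver; // guarded by lock; null until start()
    private boolean running;              // guarded by lock
    private String lastLeaderAddress;     // guarded by lock

    FactoryBasedRetrievalService(LeaderRetrievalDriverFactory driverFactory) {
        this.driverFactory = Objects.requireNonNull(driverFactory, "driverFactory");
    }

    void start() throws Exception {
        synchronized (lock) {
            if (driver != null) {
                throw new IllegalStateException("The service can only be started once.");
            }
            running = true;
            try {
                // The handler (this service) is passed in while the driver is created.
                driver = driverFactory.createLeaderRetrievalDriver(this);
            } catch (Exception e) {
                running = false;
                throw e;
            }
        }
    }

    void stop() throws Exception {
        synchronized (lock) {
            if (!running) {
                return;
            }
            running = false;
            driver.close();
        }
    }

    @Override
    public void notifyLeaderAddress(UUID leaderSessionID, String leaderAddress) {
        synchronized (lock) {
            if (running) {
                lastLeaderAddress = leaderAddress;
            }
        }
    }

    String getLastLeaderAddress() {
        synchronized (lock) {
            return lastLeaderAddress;
        }
    }
}
```

A side effect of this design is that a driver can never be used before it has a handler, so the `checkNotNull(stateHandler)` in the driver's `start` becomes unnecessary.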

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java
##########
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.BiConsumerWithException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.UUID;
+
+/**
+ * Default implementation for leader election service. Composed with different {@link LeaderElectionDriver}, we could
+ * perform a leader election for the contender, and then persist the leader information to various storage.
+ */
+public class DefaultLeaderElectionService implements LeaderElectionService {
+
+       private static final Logger LOG = LoggerFactory.getLogger(DefaultLeaderElectionService.class);
+
+       private final Object lock = new Object();
+
+       /** The leader contender which applies for leadership. */
+       @GuardedBy("lock")
+       private volatile LeaderContender leaderContender;
+
+       @GuardedBy("lock")
+       private volatile UUID issuedLeaderSessionID;
+
+       @GuardedBy("lock")
+       private volatile UUID confirmedLeaderSessionID;
+
+       @GuardedBy("lock")
+       private volatile String confirmedLeaderAddress;
+
+       @GuardedBy("lock")
+       private volatile boolean running;
+
+       private final LeaderElectionDriver leaderElectionDriver;
+
+       private final StateHandler leaderElectionStateHandle;
+
+       public DefaultLeaderElectionService(LeaderElectionDriver leaderElectionDriver) {
+               this.leaderElectionDriver = leaderElectionDriver;
+
+               leaderContender = null;
+
+               issuedLeaderSessionID = null;
+               confirmedLeaderSessionID = null;
+               confirmedLeaderAddress = null;
+
+               leaderElectionStateHandle = new StateHandler();
+               running = false;
+       }
+
+       @Override
+       public final void start(LeaderContender contender) throws Exception {
+               Preconditions.checkNotNull(contender, "Contender must not be null.");
+               Preconditions.checkState(leaderContender == null, "Contender was already set.");
+
+               LOG.info("Starting LeaderElectionService {}.", this);
+
+               synchronized (lock) {
+                       leaderContender = contender;
+                       running = true;
+                       leaderElectionDriver.start(leaderElectionStateHandle);
+               }
+       }
+
+       @Override
+       public final void stop() throws Exception {
+               LOG.info("Stopping LeaderElectionService {}.", this);
+
+               synchronized (lock) {
+                       if (!running) {
+                               return;
+                       }
+                       running = false;
+                       clearConfirmedLeaderInformation();
+                       leaderElectionDriver.stop();
+               }
+       }
+
+       @Override
+       public void confirmLeadership(UUID leaderSessionID, String leaderAddress) {
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug(
+                               "Confirm leader session ID {} for leader {}.",
+                               leaderSessionID,
+                               leaderAddress);
+               }
+
+               Preconditions.checkNotNull(leaderSessionID);
+
+               synchronized (lock) {
+                       if (hasLeadership(leaderSessionID)) {
+                               if (running) {
+                                       confirmLeaderInformation(leaderSessionID, leaderAddress);
+                                       leaderElectionDriver.writeLeaderInformation(confirmedLeaderSessionID, confirmedLeaderAddress);
+                               } else {
+                                       LOG.debug("Ignoring the leader session Id {} confirmation, since the " +
+                                               "LeaderElectionService has already been stopped.", leaderSessionID);
+                               }
+                       } else {
+                               // Received an old confirmation call
+                               if (!leaderSessionID.equals(this.issuedLeaderSessionID)) {
+                                       LOG.debug("Receive an old confirmation call of leader session ID {}, " +
+                                               "current issued session ID is {}", leaderSessionID, issuedLeaderSessionID);
+                               } else {
+                                       LOG.warn("The leader session ID {} was confirmed even though the " +
+                                               "corresponding JobManager was not elected as the leader.", leaderSessionID);
+                               }
+                       }
+               }
+       }
+
+       @Override
+       public boolean hasLeadership(@Nonnull UUID leaderSessionId) {
+               synchronized (lock) {
+                       return leaderElectionDriver.hasLeadership() && leaderSessionId.equals(issuedLeaderSessionID);
+               }
+       }
+
+       /**
+        * Returns the current leader session ID or null, if the contender is not the leader.
+        *
+        * @return The last leader session ID or null, if the contender is not the leader
+        */
+       @VisibleForTesting
+       public UUID getLeaderSessionID() {
+               return confirmedLeaderSessionID;
+       }
+
+       @GuardedBy("lock")
+       private void confirmLeaderInformation(UUID leaderSessionID, String leaderAddress) {
+               confirmedLeaderSessionID = leaderSessionID;
+               confirmedLeaderAddress = leaderAddress;
+       }
+
+       @GuardedBy("lock")
+       private void clearConfirmedLeaderInformation() {
+               confirmedLeaderSessionID = null;
+               confirmedLeaderAddress = null;
+       }
+
+       /**
+        * Helper class for the specific {@link LeaderElectionDriver} to operate the internal state.
+        */
+       public class StateHandler {
+
+               /**
+                * Called by specific {@link LeaderElectionDriver} when the leadership is granted.
+                */
+               @GuardedBy("lock")
+               public void onGrantLeadership() {
+                       synchronized (lock) {
+                               if (running) {
+                                       issuedLeaderSessionID = UUID.randomUUID();
+                                       clearConfirmedLeaderInformation();
+
+                                       if (LOG.isDebugEnabled()) {
+                                               LOG.debug(
+                                                       "Grant leadership to contender {} with session ID {}.",
+                                                       leaderContender.getDescription(),
+                                                       issuedLeaderSessionID);
+                                       }
+
+                                       leaderContender.grantLeadership(issuedLeaderSessionID);
+                               } else {
+                                       LOG.debug("Ignoring the grant leadership notification since the service has " +
+                                               "already been stopped.");
+                               }
+                       }
+               }
+
+               /**
+                * Called by specific {@link LeaderElectionDriver} when the leadership is revoked.
+                */
+               @GuardedBy("lock")
+               public void onRevokeLeadership() {
+                       synchronized (lock) {
+                               if (running) {
+                                       LOG.debug(
+                                               "Revoke leadership of {} ({}@{}).",
+                                               leaderContender.getDescription(),
+                                               confirmedLeaderSessionID,
+                                               confirmedLeaderAddress);
+
+                                       issuedLeaderSessionID = null;
+                                       clearConfirmedLeaderInformation();
+
+                                       leaderContender.revokeLeadership();
+                               } else {
+                                       LOG.debug("Ignoring the revoke leadership notification since the service " +
+                                               "has already been stopped.");
+                               }
+                       }
+               }
+
+               /**
+                * Called by specific {@link LeaderElectionDriver} when it wants to use leader information to do some
+                * operations. For example, correct the external storage when the leader information is updated exceptionally.
+                *
+                * @param consumer to specify the operation. The exception will be handled by leader contender.
+                */
+               @GuardedBy("lock")
+               public void runWithLock(BiConsumerWithException<UUID, String, Exception> consumer) {
+                       try {
+                               synchronized (lock) {
+                                       consumer.accept(confirmedLeaderSessionID, confirmedLeaderAddress);
+                               }
+                       } catch (Exception e) {
+                               handleError(e);
+                       }
+               }
+
+               /**
+                * Handle error by specific {@link LeaderElectionDriver}.
+                * @param ex exception to be handled.
+                */
+               public void handleError(Exception ex) {
+                       leaderContender.handleError(ex);
+               }
+
+               /**
+                * @return the contender description.
+                */
+               public String getLeaderContenderDescription() {
+                       return leaderContender.getDescription();
+               }

Review comment:
       Instead of exposing this via the `StateHandler`, we could pass this also to the Driver's constructor in the `DefaultLeaderElectionService.start(LeaderContender)` method.
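
A minimal sketch of passing the contender description at construction time, assuming a driver factory analogous to the one suggested for the retrieval side. `LeaderElectionDriverFactory`, `LeaderElectionEventHandler`, `DescribedDriver` and `ElectionServiceSketch` are hypothetical names, and the `LeaderContender` stand-in keeps only `getDescription()`. Since `start(LeaderContender)` is the first point at which the description is known, the service can hand it to the driver there, and the driver keeps it in a final field instead of asking the `StateHandler` for it.

```java
import java.util.Objects;

// Simplified stand-in for Flink's LeaderContender (only the description is needed here).
interface LeaderContender {
    String getDescription();
}

// Hypothetical callbacks a driver uses to report election results to the service.
interface LeaderElectionEventHandler {
    void onGrantLeadership();

    void onRevokeLeadership();
}

interface LeaderElectionDriver extends AutoCloseable {
    boolean hasLeadership();
}

interface LeaderElectionDriverFactory {
    LeaderElectionDriver createLeaderElectionDriver(
            LeaderElectionEventHandler handler, String leaderContenderDescription) throws Exception;
}

// Example driver: the description arrives once, through the constructor.
class DescribedDriver implements LeaderElectionDriver {
    private final LeaderElectionEventHandler handler; // would receive grant/revoke callbacks
    private final String leaderContenderDescription;

    DescribedDriver(LeaderElectionEventHandler handler, String leaderContenderDescription) {
        this.handler = Objects.requireNonNull(handler);
        this.leaderContenderDescription = Objects.requireNonNull(leaderContenderDescription);
    }

    @Override
    public boolean hasLeadership() {
        return false; // a real driver would ask its election backend
    }

    @Override
    public void close() {}

    @Override
    public String toString() {
        // Usable in log statements instead of stateHandler.getLeaderContenderDescription().
        return "DescribedDriver{contender='" + leaderContenderDescription + "'}";
    }
}

class ElectionServiceSketch implements LeaderElectionEventHandler {
    private final LeaderElectionDriverFactory driverFactory;
    private LeaderElectionDriver driver;

    ElectionServiceSketch(LeaderElectionDriverFactory driverFactory) {
        this.driverFactory = Objects.requireNonNull(driverFactory);
    }

    void start(LeaderContender contender) throws Exception {
        Objects.requireNonNull(contender, "Contender must not be null.");
        // The description is known here, so it is passed to the driver's constructor.
        driver = driverFactory.createLeaderElectionDriver(this, contender.getDescription());
    }

    LeaderElectionDriver getDriver() {
        return driver;
    }

    @Override
    public void onGrantLeadership() {}

    @Override
    public void onRevokeLeadership() {}
}
```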

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
##########
@@ -169,60 +171,61 @@ public static String getZooKeeperEnsemble(Configuration flinkConf)
        }
 
        /**
-        * Creates a {@link ZooKeeperLeaderRetrievalService} instance.
+        * Creates a {@link DefaultLeaderRetrievalService} instance with{@link ZookeeperLeaderRetrievalDriver}.

Review comment:
       Whitespace is missing after `with`.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZookeeperLeaderRetrievalDriver.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderretrieval;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver;
+import org.apache.flink.util.FlinkException;
+
+import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.shaded.curator4.org.apache.curator.framework.api.UnhandledErrorListener;
+import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.NodeCache;
+import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.NodeCacheListener;
+import org.apache.flink.shaded.curator4.org.apache.curator.framework.state.ConnectionState;
+import org.apache.flink.shaded.curator4.org.apache.curator.framework.state.ConnectionStateListener;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The counterpart to the {@link ZooKeeperLeaderElectionDriver}.
+ * {@link LeaderRetrievalService} implemented by Zookeeper. It retrieves the current leader which has

Review comment:
       `{@link LeaderRetrievalService} implementation for Zookeeper.`

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/DefaultLeaderRetrievalService.java
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderretrieval;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.SupplierWithException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.Objects;
+import java.util.UUID;
+
+/**
+ * The counterpart to the {@link org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService}.
+ * Composed with different {@link LeaderRetrievalDriver}, we could retrieve the leader information from
+ * different storage. The leader address as well as the current leader session ID will be retrieved from
+ * {@link LeaderRetrievalDriver}.
+ */
+public class DefaultLeaderRetrievalService implements LeaderRetrievalService {
+       private static final Logger LOG = LoggerFactory.getLogger(DefaultLeaderRetrievalService.class);
+
+       private final Object lock = new Object();
+
+       private String lastLeaderAddress;
+
+       private UUID lastLeaderSessionID;
+
+       private volatile boolean running;
+
+       /** Listener which will be notified about leader changes. */
+       private volatile LeaderRetrievalListener leaderListener;
+
+       private final LeaderRetrievalDriver leaderRetrievalDriver;
+
+       private final StateHandler leaderRetrievalStateHandler;
+
+       /**
+        * Creates a default leader retrieval service with specified {@link LeaderRetrievalDriver}.
+        *
+        * @param leaderRetrievalDriver {@link LeaderRetrievalDriver} implemented by Zookeeper, Kubernetes, etc.
+        */
+       public DefaultLeaderRetrievalService(LeaderRetrievalDriver leaderRetrievalDriver) {
+               this.leaderRetrievalDriver = leaderRetrievalDriver;
+
+               this.lastLeaderAddress = null;
+               this.lastLeaderSessionID = null;
+
+               running = false;
+
+               this.leaderRetrievalStateHandler = new StateHandler();
+       }
+
+       @Override
+       public void start(LeaderRetrievalListener listener) throws Exception {
+               Preconditions.checkNotNull(listener, "Listener must not be null.");
+               Preconditions.checkState(leaderListener == null, "DefaultLeaderRetrievalService can " +
+                       "only be started once.");
+
+               LOG.info("Starting DefaultLeaderRetrievalService with {}.", leaderRetrievalDriver);
+
+               synchronized (lock) {
+                       leaderListener = listener;
+                       leaderRetrievalDriver.start(leaderRetrievalStateHandler);
+
+                       running = true;
+               }
+       }
+
+       @Override
+       public void stop() throws Exception {
+               synchronized (lock) {
+                       if (!running) {
+                               return;
+                       }
+                       running = false;
+
+                       LOG.info("Stopping DefaultLeaderRetrievalService {} with {}.", leaderRetrievalDriver);
+
+                       leaderRetrievalDriver.stop();
+               }
+       }
+
+       /**
+        * Helper class for the specific {@link LeaderRetrievalDriver} to notify the leader changes.
+        */
+       public class StateHandler {
+
+               /**
+                * Called by specific {@link LeaderRetrievalDriver} to notify new leader address.
+                * @param supplier supplier to provide a {@link Tuple2} of leader information. f0 is UUID, and f1 is leader
+                * address. The exception will be handled by leader listener.
+                */
+               @GuardedBy("lock")
+               public void notifyIfNewLeaderAddress(SupplierWithException<Tuple2<UUID, String>, Exception> supplier) {
+                       try {
+                               final Tuple2<UUID, String> leaderInformation = supplier.get();
+                               final UUID newLeaderSessionID = leaderInformation.f0;
+                               final String newLeaderAddress = leaderInformation.f1;
+                               if (!(Objects.equals(newLeaderAddress, lastLeaderAddress) &&
+                                       Objects.equals(newLeaderSessionID, lastLeaderSessionID))) {
+                                       if (newLeaderAddress == null && newLeaderSessionID == null) {
+                                               LOG.debug("Leader information was lost: The listener will be notified accordingly.");
+                                       } else {
+                                               LOG.debug(
+                                                       "New leader information: Leader={}, session ID={}.",
+                                                       newLeaderAddress,
+                                                       newLeaderSessionID);
+                                       }
+
+                                       lastLeaderAddress = newLeaderAddress;
+                                       lastLeaderSessionID = newLeaderSessionID;
+                                       leaderListener.notifyLeaderAddress(newLeaderAddress, newLeaderSessionID);
+                               }
+                       } catch (Exception e) {
+                               leaderListener.handleError(e);
+                       }
+               }
+
+               /**
+                * Called by specific {@link LeaderRetrievalDriver} to notify leader loss.
+                */
+               @GuardedBy("lock")
+               public void notifyLeaderLoss() {
+                       notifyIfNewLeaderAddress(() -> new Tuple2<>(null, null));
+               }

Review comment:
       Isn't this call redundant? Couldn't we use a single `notifyLeaderAddress(LeaderAddressInformation)` and signal leader loss by calling `notifyLeaderAddress(LeaderAddressInformation.empty())` or so?
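
A minimal sketch of the single entry point, assuming a small immutable value type. `LeaderAddressInformation` follows the reviewer's tentative name and is hypothetical here, as is `SingleEntryPointHandler`; the listener stand-in keeps only the `notifyLeaderAddress(String, UUID)` callback. Leader loss becomes `notifyLeaderAddress(LeaderAddressInformation.empty())`, and the change check reduces to `equals`.

```java
import java.util.Objects;
import java.util.UUID;

// Hypothetical immutable holder for a leader's session ID and address.
final class LeaderAddressInformation {
    private static final LeaderAddressInformation EMPTY = new LeaderAddressInformation(null, null);

    private final UUID leaderSessionID;
    private final String leaderAddress;

    private LeaderAddressInformation(UUID leaderSessionID, String leaderAddress) {
        this.leaderSessionID = leaderSessionID;
        this.leaderAddress = leaderAddress;
    }

    static LeaderAddressInformation known(UUID leaderSessionID, String leaderAddress) {
        return new LeaderAddressInformation(
                Objects.requireNonNull(leaderSessionID), Objects.requireNonNull(leaderAddress));
    }

    /** "No leader known"; replaces the Tuple2(null, null) built in notifyLeaderLoss(). */
    static LeaderAddressInformation empty() {
        return EMPTY;
    }

    UUID getLeaderSessionID() {
        return leaderSessionID;
    }

    String getLeaderAddress() {
        return leaderAddress;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof LeaderAddressInformation)) {
            return false;
        }
        LeaderAddressInformation that = (LeaderAddressInformation) o;
        return Objects.equals(leaderSessionID, that.leaderSessionID)
                && Objects.equals(leaderAddress, that.leaderAddress);
    }

    @Override
    public int hashCode() {
        return Objects.hash(leaderSessionID, leaderAddress);
    }
}

// Simplified stand-in for Flink's LeaderRetrievalListener.
interface LeaderRetrievalListener {
    void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID);
}

// One method covers both "new leader" and "leader lost".
class SingleEntryPointHandler {
    private final LeaderRetrievalListener listener;
    private LeaderAddressInformation last = LeaderAddressInformation.empty();

    SingleEntryPointHandler(LeaderRetrievalListener listener) {
        this.listener = Objects.requireNonNull(listener);
    }

    synchronized void notifyLeaderAddress(LeaderAddressInformation info) {
        if (info.equals(last)) {
            return; // nothing changed
        }
        last = info;
        listener.notifyLeaderAddress(info.getLeaderAddress(), info.getLeaderSessionID());
    }
}
```

As with the current `notifyIfNewLeaderAddress`, starting from the empty state means that a leader-loss notification arriving before any leader was known is not forwarded.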

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionDriver.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import java.util.UUID;
+
+/**
+ * A {@link LeaderElectionDriver} is responsible for performing the leader election and storing the leader information.
+ * All the leader internal state is guarded by lock in {@link DefaultLeaderElectionService}. Different driver
+ * implementations do not need to care about the lock. And it should use {@link DefaultLeaderElectionService.StateHandler}
+ * if it want to access the internal leader state.
+ */
+public interface LeaderElectionDriver {
+
+       /**
+        * Start the necessary services for specific {@link LeaderElectionService} implementations. For example, LeaderLatch
+        * and NodeCache in Zookeeper, KubernetesLeaderElector and ConfigMap watcher in Kubernetes. When the leader election
+        * finished, the leaderElectionStateHandler will be used to grant/revoke leadership.
+        *
+        * @param leaderElectionStateHandler used for operating the internal leader state.
+        * @throws Exception Throw exception when start the services.
+        */
+       void start(DefaultLeaderElectionService.StateHandler leaderElectionStateHandler) throws Exception;
+
+       /**
+        * Stop the services used for leader election.
+        */
+       void stop() throws Exception;

Review comment:
       Maybe let's call it `close`. Otherwise one might think that one can start-stop-start the driver.
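
A minimal sketch of the `close` naming, assuming the driver acquires its resources when it is created (for example through a factory) so that `close()` is its only lifecycle method. `OneShotElectionDriver` and its `onLeadershipAcquired()` hook are hypothetical. Extending `AutoCloseable` signals a one-way lifecycle: closing is idempotent and there is no way back to a started state.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Driver lifecycle in this sketch: construction starts it, close() ends it for good.
interface LeaderElectionDriver extends AutoCloseable {
    boolean hasLeadership();

    @Override
    void close(); // narrowed: no checked exception in this sketch
}

class OneShotElectionDriver implements LeaderElectionDriver {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final AtomicInteger releasedResources = new AtomicInteger();
    private volatile boolean leader;

    OneShotElectionDriver() {
        // A real driver would start its election backend here (e.g. a ZooKeeper latch).
    }

    // Hypothetical hook the election backend would call once leadership is won.
    void onLeadershipAcquired() {
        if (!closed.get()) {
            leader = true;
        }
    }

    @Override
    public boolean hasLeadership() {
        return !closed.get() && leader;
    }

    @Override
    public void close() {
        // Only the first call releases resources; later calls are no-ops.
        if (closed.compareAndSet(false, true)) {
            leader = false;
            releasedResources.incrementAndGet();
        }
    }

    int getReleasedResources() {
        return releasedResources.get();
    }
}
```

Because there is no `start()` to call again, the "start-stop-start" misuse the comment warns about cannot even be expressed against this interface.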

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZookeeperLeaderRetrievalDriver.java
##########
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderretrieval;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver;
+import org.apache.flink.util.FlinkException;
+
+import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.shaded.curator4.org.apache.curator.framework.api.UnhandledErrorListener;
+import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.NodeCache;
+import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.NodeCacheListener;
+import org.apache.flink.shaded.curator4.org.apache.curator.framework.state.ConnectionState;
+import org.apache.flink.shaded.curator4.org.apache.curator.framework.state.ConnectionStateListener;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The counterpart to the {@link ZooKeeperLeaderElectionDriver}.
+ * {@link LeaderRetrievalService} implemented by Zookeeper. It retrieves the current leader which has
+ * been elected by the {@link ZooKeeperLeaderElectionDriver}.
+ * The leader address as well as the current leader session ID is retrieved from ZooKeeper.
+ */
+public class ZookeeperLeaderRetrievalDriver implements LeaderRetrievalDriver, NodeCacheListener, UnhandledErrorListener {
+       private static final Logger LOG = LoggerFactory.getLogger(ZookeeperLeaderRetrievalDriver.class);
+
+       /** Connection to the used ZooKeeper quorum. */
+       private final CuratorFramework client;
+
+       /** Curator recipe to watch changes of a specific ZooKeeper node. */
+       private final NodeCache cache;
+
+       private final String retrievalPath;
+
+       private volatile boolean running;
+
+       private final ConnectionStateListener connectionStateListener = (client, newState) -> handleStateChange(newState);
+
+       private DefaultLeaderRetrievalService.StateHandler leaderRetrievalStateHandler;
+
+       /**
+        * Creates a leader retrieval service which uses ZooKeeper to retrieve the leader information.
+        *
+        * @param client Client which constitutes the connection to the ZooKeeper quorum
+        * @param retrievalPath Path of the ZooKeeper node which contains the leader information
+        */
+       public ZookeeperLeaderRetrievalDriver(CuratorFramework client, String retrievalPath) {
+               this.client = checkNotNull(client, "CuratorFramework client");
+               this.cache = new NodeCache(client, retrievalPath);
+               this.retrievalPath = checkNotNull(retrievalPath);
+
+               running = false;
+       }
+
+       @Override
+       public void start(DefaultLeaderRetrievalService.StateHandler stateHandler) throws Exception {
+               LOG.info("Starting {}.", this);
+
+               this.leaderRetrievalStateHandler = checkNotNull(stateHandler);
+
+               client.getUnhandledErrorListenable().addListener(this);
+               cache.getListenable().addListener(this);
+               cache.start();
+
+               client.getConnectionStateListenable().addListener(connectionStateListener);
+
+               running = true;
+       }
+
+       @Override
+       public void stop() throws Exception {
+               if (!running) {
+                       return;
+               }
+
+               running = false;
+
+               LOG.info("Stopping {}.", this);
+
+               client.getUnhandledErrorListenable().removeListener(this);
+               client.getConnectionStateListenable().removeListener(connectionStateListener);
+
+               try {
+                       cache.close();
+               } catch (IOException e) {
+                       throw new Exception("Could not properly stop the ZooKeeperLeaderRetrievalService.", e);
+               }
+       }
+
+       @Override
+       public void nodeChanged() {
+               if (running) {
+                       leaderRetrievalStateHandler.notifyIfNewLeaderAddress(() -> {
+                               LOG.debug("Leader node has changed.");
+
+                               final ChildData childData = cache.getCurrentData();
+
+                               final String leaderAddress;
+                               final UUID leaderSessionID;
+
+                               if (childData == null) {
+                                       leaderAddress = null;
+                                       leaderSessionID = null;
+                               } else {
+                                       byte[] data = childData.getData();
+
+                                       if (data == null || data.length == 0) {
+                                               leaderAddress = null;
+                                               leaderSessionID = null;
+                                       } else {
+                                               ByteArrayInputStream bais = new ByteArrayInputStream(data);
+                                               ObjectInputStream ois = new ObjectInputStream(bais);
+
+                                               leaderAddress = ois.readUTF();
+                                               leaderSessionID = (UUID) ois.readObject();
+                                       }
+                               }
+                               return new Tuple2<>(leaderSessionID, leaderAddress);

Review comment:
       I think the whole address extraction does not have to happen under the lock. I'd suggest doing it first and then simply calling `LeaderRetrievalEventHandler.notifyLeaderAddress`.
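A minimal sketch of that ordering, assuming the driver is handed the raw node bytes (in the diff they come from `cache.getCurrentData()`) and that `LeaderRetrievalEventHandler` is an interface with a single `notifyLeaderAddress(UUID, String)` method. `LeaderNodeCodec` and `NodeChangedSketch` are hypothetical names used only for illustration. Decoding follows the same `readUTF()`/`readObject()` order as the diff, and only the final hand-off touches shared state.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.UUID;

/** Hypothetical callback interface, named after the suggestion in this review. */
interface LeaderRetrievalEventHandler {
    void notifyLeaderAddress(UUID leaderSessionId, String leaderAddress);
}

/** Hypothetical helper that writes leader information in the order the driver reads it back. */
final class LeaderNodeCodec {
    private LeaderNodeCodec() {}

    static byte[] encode(String leaderAddress, UUID leaderSessionId) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
            oos.writeUTF(leaderAddress);
            oos.writeObject(leaderSessionId);
        }
        return baos.toByteArray();
    }
}

/** Hypothetical driver fragment: decode outside any lock, then notify exactly once. */
final class NodeChangedSketch {
    private final LeaderRetrievalEventHandler handler;

    NodeChangedSketch(LeaderRetrievalEventHandler handler) {
        this.handler = handler;
    }

    /** {@code data} stands in for the node payload; null or empty means "no leader known". */
    void nodeChanged(byte[] data) throws IOException, ClassNotFoundException {
        String leaderAddress = null;
        UUID leaderSessionId = null;

        // Deserialization reads no shared state, so it does not need the service lock.
        if (data != null && data.length > 0) {
            try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data))) {
                leaderAddress = ois.readUTF();
                leaderSessionId = (UUID) ois.readObject();
            }
        }

        // Only this call reaches into the service; the handler can synchronize internally.
        handler.notifyLeaderAddress(leaderSessionId, leaderAddress);
    }
}
```

With this split, the "has the leader changed?" comparison and the listener call would live behind the handler, which keeps the locked section short.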

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/DefaultLeaderRetrievalService.java
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderretrieval;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.SupplierWithException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.Objects;
+import java.util.UUID;
+
+/**
+ * The counterpart to the {@link org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService}.
+ * Composed with different {@link LeaderRetrievalDriver}, we could retrieve the leader information from
+ * different storage. The leader address as well as the current leader session ID will be retrieved from
+ * {@link LeaderRetrievalDriver}.
+ */
+public class DefaultLeaderRetrievalService implements LeaderRetrievalService {
+       private static final Logger LOG = LoggerFactory.getLogger(DefaultLeaderRetrievalService.class);
+
+       private final Object lock = new Object();
+
+       private String lastLeaderAddress;
+
+       private UUID lastLeaderSessionID;
+
+       private volatile boolean running;
+
+       /** Listener which will be notified about leader changes. */
+       private volatile LeaderRetrievalListener leaderListener;
+
+       private final LeaderRetrievalDriver leaderRetrievalDriver;
+
+       private final StateHandler leaderRetrievalStateHandler;
+
+       /**
+        * Creates a default leader retrieval service with specified {@link LeaderRetrievalDriver}.
+        *
+        * @param leaderRetrievalDriver {@link LeaderRetrievalDriver} implemented by Zookeeper, Kubernetes, etc.
+        */
+       public DefaultLeaderRetrievalService(LeaderRetrievalDriver leaderRetrievalDriver) {
+               this.leaderRetrievalDriver = leaderRetrievalDriver;
+
+               this.lastLeaderAddress = null;
+               this.lastLeaderSessionID = null;
+
+               running = false;
+
+               this.leaderRetrievalStateHandler = new StateHandler();
+       }
+
+       @Override
+       public void start(LeaderRetrievalListener listener) throws Exception {
+               Preconditions.checkNotNull(listener, "Listener must not be null.");
+               Preconditions.checkState(leaderListener == null, "DefaultLeaderRetrievalService can " +
+                       "only be started once.");
+
+               LOG.info("Starting DefaultLeaderRetrievalService with {}.", leaderRetrievalDriver);
+
+               synchronized (lock) {
+                       leaderListener = listener;
+                       leaderRetrievalDriver.start(leaderRetrievalStateHandler);
+
+                       running = true;
+               }
+       }
+
+       @Override
+       public void stop() throws Exception {
+               synchronized (lock) {
+                       if (!running) {
+                               return;
+                       }
+                       running = false;
+
+                       LOG.info("Stopping DefaultLeaderRetrievalService with {}.", leaderRetrievalDriver);
+
+                       leaderRetrievalDriver.stop();
+               }
+       }
+
+       /**
+        * Helper class for the specific {@link LeaderRetrievalDriver} to notify the leader changes.
+        */
+       public class StateHandler {

Review comment:
       I would suggest calling it `LeaderRetrievalEventHandler` and making it an interface for easier testability.
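A possible shape for that interface, together with a recording test double that a driver test could pass to the driver's `start(...)` instead of a full service. The single `notifyLeaderAddress(UUID, String)` method and the `RecordingLeaderRetrievalEventHandler` class are assumptions for illustration, not what the PR currently defines.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;

/** Hypothetical interface that would replace the inner StateHandler class. */
interface LeaderRetrievalEventHandler {
    void notifyLeaderAddress(UUID leaderSessionId, String leaderAddress);
}

/** Test double: records every notification so that a test can assert on it. */
final class RecordingLeaderRetrievalEventHandler implements LeaderRetrievalEventHandler {

    /** One recorded notification; both fields may be null when no leader is known. */
    static final class Notification {
        final UUID leaderSessionId;
        final String leaderAddress;

        Notification(UUID leaderSessionId, String leaderAddress) {
            this.leaderSessionId = leaderSessionId;
            this.leaderAddress = leaderAddress;
        }
    }

    private final List<Notification> notifications = new ArrayList<>();

    @Override
    public synchronized void notifyLeaderAddress(UUID leaderSessionId, String leaderAddress) {
        notifications.add(new Notification(leaderSessionId, leaderAddress));
    }

    /** Returns a read-only snapshot, so callers can inspect it while the driver keeps notifying. */
    synchronized List<Notification> getNotifications() {
        return Collections.unmodifiableList(new ArrayList<>(notifications));
    }
}
```

Because the service would only depend on the interface, tests for the service and for each driver could be written independently of one another.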

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
##########
@@ -70,7 +68,8 @@
 import static org.mockito.Mockito.when;
 
 /**
- * Tests for the {@link ZooKeeperLeaderElectionService} and the {@link ZooKeeperLeaderRetrievalService}.
+ * Tests for the {@link DefaultLeaderElectionService} with {@link ZooKeeperLeaderElectionDriver}
+ * and the {@link DefaultLeaderRetrievalService} with {@link org.apache.flink.runtime.leaderretrieval.ZookeeperLeaderRetrievalDriver}.
  */
 public class ZooKeeperLeaderElectionTest extends TestLogger {

Review comment:
       It might be possible to update some of the tests to work directly against the `ZooKeeperDrivers` instead of having to instantiate the services.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionDriver.java
##########
@@ -176,168 +135,92 @@ public void stop() throws Exception{
                }
 
                if (exception != null) {
-                       throw new Exception("Could not properly stop the ZooKeeperLeaderElectionService.", exception);
-               }
-       }
-
-       @Override
-       public void confirmLeadership(UUID leaderSessionID, String leaderAddress) {
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug(
-                               "Confirm leader session ID {} for leader {}.",
-                               leaderSessionID,
-                               leaderAddress);
-               }
-
-               Preconditions.checkNotNull(leaderSessionID);
-
-               if (leaderLatch.hasLeadership()) {
-                       // check if this is an old confirmation call
-                       synchronized (lock) {
-                               if (running) {
-                                       if (leaderSessionID.equals(this.issuedLeaderSessionID)) {
-                                               confirmLeaderInformation(leaderSessionID, leaderAddress);
-                                               writeLeaderInformation();
-                                       }
-                               } else {
-                                       LOG.debug("Ignoring the leader session Id {} confirmation, since the " +
-                                               "ZooKeeperLeaderElectionService has already been stopped.", leaderSessionID);
-                               }
-                       }
-               } else {
-                       LOG.warn("The leader session ID {} was confirmed even though the " +
-                                       "corresponding JobManager was not elected as the leader.", leaderSessionID);
+                       throw new Exception("Could not properly stop the ZooKeeperLeaderElectionDriver.", exception);
                }
        }
 
-       private void confirmLeaderInformation(UUID leaderSessionID, String leaderAddress) {
-               confirmedLeaderSessionID = leaderSessionID;
-               confirmedLeaderAddress = leaderAddress;
-       }
-
        @Override
-       public boolean hasLeadership(@Nonnull UUID leaderSessionId) {
-               return leaderLatch.hasLeadership() && leaderSessionId.equals(issuedLeaderSessionID);
+       public boolean hasLeadership() {
+               return leaderLatch.hasLeadership();
        }
 
        @Override
        public void isLeader() {
-               synchronized (lock) {
-                       if (running) {
-                               issuedLeaderSessionID = UUID.randomUUID();
-                               clearConfirmedLeaderInformation();
-
-                               if (LOG.isDebugEnabled()) {
-                                       LOG.debug(
-                                               "Grant leadership to contender {} with session ID {}.",
-                                               leaderContender.getDescription(),
-                                               issuedLeaderSessionID);
-                               }
-
-                               leaderContender.grantLeadership(issuedLeaderSessionID);
-                       } else {
-                               LOG.debug("Ignoring the grant leadership notification since the service has " +
-                                       "already been stopped.");
-                       }
-               }
-       }
-
-       private void clearConfirmedLeaderInformation() {
-               confirmedLeaderSessionID = null;
-               confirmedLeaderAddress = null;
+               leaderElectionStateHandler.onGrantLeadership();
        }
 
        @Override
        public void notLeader() {
-               synchronized (lock) {
-                       if (running) {
-                               LOG.debug(
-                                       "Revoke leadership of {} ({}@{}).",
-                                       leaderContender.getDescription(),
-                                       confirmedLeaderSessionID,
-                                       confirmedLeaderAddress);
-
-                               issuedLeaderSessionID = null;
-                               clearConfirmedLeaderInformation();
-
-                               leaderContender.revokeLeadership();
-                       } else {
-                               LOG.debug("Ignoring the revoke leadership notification since the service " +
-                                       "has already been stopped.");
-                       }
-               }
+               leaderElectionStateHandler.onRevokeLeadership();
        }
 
        @Override
-       public void nodeChanged() throws Exception {
-               try {
+       public void nodeChanged() {
+               final String leaderContenderDesc = leaderElectionStateHandler.getLeaderContenderDescription();
+               leaderElectionStateHandler.runWithLock((confirmedLeaderSessionID, confirmedLeaderAddress) -> {
                        // leaderSessionID is null if the leader contender has not yet confirmed the session ID
                        if (leaderLatch.hasLeadership()) {
-                               synchronized (lock) {
-                                       if (running) {
-                                               if (LOG.isDebugEnabled()) {
-                                                       LOG.debug(
-                                                               "Leader node changed while {} is the leader with session ID {}.",
-                                                               leaderContender.getDescription(),
-                                                               confirmedLeaderSessionID);
-                                               }
+                               if (running) {
+                                       if (LOG.isDebugEnabled()) {
+                                               LOG.debug(
+                                                       "Leader node changed while {} is the leader with session ID {}.",
+                                                       leaderContenderDesc,
+                                                       confirmedLeaderSessionID);
+                                       }
+
+                                       if (confirmedLeaderSessionID != null) {
+                                               ChildData childData = cache.getCurrentData();
 
-                                               if (confirmedLeaderSessionID != null) {
-                                                       ChildData childData = cache.getCurrentData();
+                                               if (childData == null) {
+                                                       if (LOG.isDebugEnabled()) {
+                                                               LOG.debug(
+                                                                       "Writing leader information into empty node by {}.",
+                                                                       leaderContenderDesc);
+                                                       }
+                                                       writeLeaderInformation(confirmedLeaderSessionID, confirmedLeaderAddress);
+                                               } else {
+                                                       byte[] data = childData.getData();
 
-                                                       if (childData == null) {
+                                                       if (data == null || data.length == 0) {
+                                                               // the data field seems to be empty, rewrite information
                                                                if (LOG.isDebugEnabled()) {
                                                                        LOG.debug(
-                                                                               "Writing leader information into empty node by {}.",
-                                                                               leaderContender.getDescription());
+                                                                               "Writing leader information into node with empty data field by {}.",
+                                                                               leaderContenderDesc);
                                                                }
-                                                               writeLeaderInformation();
+                                                               writeLeaderInformation(confirmedLeaderSessionID, confirmedLeaderAddress);
                                                        } else {
-                                                               byte[] data = childData.getData();
+                                                               ByteArrayInputStream bais = new ByteArrayInputStream(data);
+                                                               ObjectInputStream ois = new ObjectInputStream(bais);
 
-                                                               if (data == null || data.length == 0) {
-                                                                       // the data field seems to be empty, rewrite information
+                                                               String leaderAddress = ois.readUTF();
+                                                               UUID leaderSessionID = (UUID) ois.readObject();
+
+                                                               if (!leaderAddress.equals(confirmedLeaderAddress) ||
+                                                                       (leaderSessionID == null || !leaderSessionID.equals(confirmedLeaderSessionID))) {
+                                                                       // the data field does not correspond to the expected leader information
                                                                        if (LOG.isDebugEnabled()) {
                                                                                LOG.debug(
-                                                                                       "Writing leader information into node with empty data field by {}.",
-                                                                                       leaderContender.getDescription());
-                                                                       }
-                                                                       writeLeaderInformation();
-                                                               } else {
-                                                                       ByteArrayInputStream bais = new ByteArrayInputStream(data);
-                                                                       ObjectInputStream ois = new ObjectInputStream(bais);
-
-                                                                       String leaderAddress = ois.readUTF();
-                                                                       UUID leaderSessionID = (UUID) ois.readObject();
-
-                                                                       if (!leaderAddress.equals(confirmedLeaderAddress) ||
-                                                                               (leaderSessionID == null || !leaderSessionID.equals(confirmedLeaderSessionID))) {
-                                                                               // the data field does not correspond to the expected leader information
-                                                                               if (LOG.isDebugEnabled()) {
-                                                                                       LOG.debug(
-                                                                                               "Correcting leader information by {}.",
-                                                                                               leaderContender.getDescription());
-                                                                               }
-                                                                               writeLeaderInformation();
+                                                                                       "Correcting leader information by {}.",
+                                                                                       leaderContenderDesc);

Review comment:
       I think it would be good to let the driver extract the leader information and then let the service decide how to handle it (e.g. whether it needs to rewrite the leader information or not).
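One way to split this, sketched under assumptions: the driver decodes the node into a hypothetical `LeaderInformation` value, and the service applies the rewrite rule that is currently inlined in `nodeChanged`. It rewrites when the node is missing or empty, or when the stored address or session ID differs from the confirmed one. The `hasLeadership()`/`running` checks would stay wherever the PR ends up placing them; only the comparison is shown, and `LeaderInformationPolicy` is an illustrative name.

```java
import java.util.Objects;
import java.util.UUID;

/** Hypothetical value object the driver could return after decoding the leader node. */
final class LeaderInformation {
    static final LeaderInformation EMPTY = new LeaderInformation(null, null);

    final UUID leaderSessionId;
    final String leaderAddress;

    LeaderInformation(UUID leaderSessionId, String leaderAddress) {
        this.leaderSessionId = leaderSessionId;
        this.leaderAddress = leaderAddress;
    }

    boolean isEmpty() {
        return leaderSessionId == null && leaderAddress == null;
    }
}

/** Hypothetical service-side decision, mirroring the checks inlined in nodeChanged(). */
final class LeaderInformationPolicy {
    private LeaderInformationPolicy() {}

    static boolean needsRewrite(LeaderInformation confirmed, LeaderInformation stored) {
        if (confirmed.leaderSessionId == null) {
            // The contender has not confirmed the session ID yet, so there is nothing to write.
            return false;
        }
        if (stored.isEmpty()) {
            // Missing node or empty data field.
            return true;
        }
        // Stored data does not correspond to the confirmed leader information.
        return !Objects.equals(confirmed.leaderAddress, stored.leaderAddress)
                || !confirmed.leaderSessionId.equals(stored.leaderSessionId);
    }
}
```

The driver would then only need to know how to read and write the node, which would also make the ZooKeeper and Kubernetes drivers easier to keep consistent.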

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
##########
@@ -219,6 +231,73 @@ public KubernetesWatch watchPodsAndDoCallback(
                                .watch(new KubernetesPodsWatcher(podCallbackHandler)));
        }
 
+       @Override
+       public CompletableFuture<Void> createConfigMap(KubernetesConfigMap configMap) {
+               final String configMapName = configMap.getName();
+               return CompletableFuture.runAsync(
+                       () -> this.internalClient.configMaps().inNamespace(namespace).create(configMap.getInternalResource()),
+                       kubeClientExecutorService)
+                       .exceptionally(throwable -> {
+                               if (throwable != null) {
+                                       throw new CompletionException(
+                                               new KubernetesException("Failed to create ConfigMap " + configMapName, throwable));
+                               }
+                               return null;
+                       });
+       }
+
+       @Override
+       public Optional<KubernetesConfigMap> getConfigMap(String name) {
+               final ConfigMap configMap = this.internalClient.configMaps().inNamespace(namespace).withName(name).get();
+               return configMap == null ? Optional.empty() : Optional.of(new KubernetesConfigMap(configMap));
+       }
+
+       @Override
+       public CompletableFuture<Boolean> checkAndUpdateConfigMap(
+                       String configMapName,
+                       FunctionWithException<KubernetesConfigMap, Optional<KubernetesConfigMap>, ?> function) {

Review comment:
       I am not yet fully convinced that `FunctionWithException` is the right choice. For the sake of completeness, I repeat what I wrote in an earlier discussion: I think we should only call `discardState` or something similar after we are sure that we have updated the `configMap` properly (e.g. removed the entry).
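A sketch of the ordering being asked for. It uses a hypothetical in-memory stand-in for the ConfigMap rather than the Fabric8 client or the PR's `checkAndUpdateConfigMap`. The update function is a side-effect-free `java.util.function.Function` that only edits a copy of the data, and `discardState` runs in a `thenAccept` stage only when the update reports that the entry was actually removed. If the update fails, the dependent stage is skipped and nothing is discarded. `ConfigMapRemovalSketch`, `checkAndUpdate` and `removeAndDiscard` are illustrative names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** Hypothetical in-memory stand-in for a ConfigMap; not the Fabric8 or Flink API. */
final class ConfigMapRemovalSketch {
    private final Map<String, String> data = new HashMap<>();

    synchronized void put(String key, String value) {
        data.put(key, value);
    }

    synchronized boolean containsKey(String key) {
        return data.containsKey(key);
    }

    /**
     * Applies a side-effect-free update to a copy of the data. An empty result means
     * "nothing to change". Completes with true only if new data was stored.
     */
    CompletableFuture<Boolean> checkAndUpdate(
            Function<Map<String, String>, Optional<Map<String, String>>> updateFunction) {
        return CompletableFuture.supplyAsync(() -> {
            synchronized (this) {
                Optional<Map<String, String>> updated = updateFunction.apply(new HashMap<>(data));
                updated.ifPresent(newData -> {
                    data.clear();
                    data.putAll(newData);
                });
                return updated.isPresent();
            }
        });
    }

    /** Removes the entry first; discardState only runs after the removal has been stored. */
    CompletableFuture<Void> removeAndDiscard(String key, Runnable discardState) {
        return checkAndUpdate(current -> current.remove(key) != null
                        ? Optional.of(current)
                        : Optional.<Map<String, String>>empty())
                .thenAccept(removed -> {
                    if (removed) {
                        discardState.run();
                    }
                });
    }
}
```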

##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
##########
@@ -219,6 +231,73 @@ public KubernetesWatch watchPodsAndDoCallback(
                                .watch(new KubernetesPodsWatcher(podCallbackHandler)));
        }
 
+       @Override
+       public CompletableFuture<Void> createConfigMap(KubernetesConfigMap configMap) {
+               final String configMapName = configMap.getName();
+               return CompletableFuture.runAsync(
+                       () -> this.internalClient.configMaps().inNamespace(namespace).create(configMap.getInternalResource()),
+                       kubeClientExecutorService)
+                       .exceptionally(throwable -> {
+                               if (throwable != null) {

Review comment:
       `throwable` will always be non-null here, so the check is not needed.
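For reference, `CompletableFuture#exceptionally` only invokes its function when the stage completed exceptionally, so its argument cannot be null. For a failed `runAsync` stage it is typically a `CompletionException` wrapping the actual error. Below is a trimmed sketch of the same pattern without the null check. It uses a hypothetical `KubernetesException` stand-in and a plain `Runnable` in place of the Fabric8 call.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;

/** Hypothetical stand-in for the PR's KubernetesException. */
class KubernetesException extends Exception {
    KubernetesException(String message, Throwable cause) {
        super(message, cause);
    }
}

final class CreateConfigMapSketch {
    private CreateConfigMapSketch() {}

    /** {@code create} stands in for the Fabric8 create call. */
    static CompletableFuture<Void> createConfigMap(String configMapName, Runnable create, Executor executor) {
        return CompletableFuture.runAsync(create, executor)
                // exceptionally() is only invoked on failure, so throwable is never null here.
                .exceptionally(throwable -> {
                    throw new CompletionException(
                            new KubernetesException("Failed to create ConfigMap " + configMapName, throwable));
                });
    }
}
```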

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
##########
@@ -412,6 +415,26 @@ public RetryException(Throwable cause) {
                }
        }
 
+       /**
+        * The {@link #retry(Supplier, int, Executor)} attempts will stop at this exception.
+        */
+       public static class NotRetryException extends Exception {

Review comment:
       Maybe `StopRetryException` could be a slightly better name.
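A minimal sketch of how a retry loop could honour such an exception. It assumes the `StopRetryException` name and omits the `Executor` parameter of the real `retry` for brevity. Like the diff, it unwraps `CompletionException` before the `instanceof` check. The diff uses Flink's `ExceptionUtils.stripCompletionException` for that; the loop below is a simplified stand-in. `RetrySketch` is an illustrative name.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Supplier;

/** Name suggested in the review: signals that further attempts are pointless. */
class StopRetryException extends Exception {
    StopRetryException(String message) {
        super(message);
    }
}

final class RetrySketch {
    private RetrySketch() {}

    /** Runs the operation and retries up to {@code retries} more times, unless it fails with StopRetryException. */
    static <T> CompletableFuture<T> retry(Supplier<CompletableFuture<T>> operation, int retries) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(operation, retries, result);
        return result;
    }

    private static <T> void attempt(
            Supplier<CompletableFuture<T>> operation, int retriesLeft, CompletableFuture<T> result) {
        operation.get().whenComplete((value, throwable) -> {
            if (throwable == null) {
                result.complete(value);
                return;
            }
            // Simplified stand-in for stripping CompletionException wrappers.
            Throwable cause = throwable;
            while (cause instanceof CompletionException && cause.getCause() != null) {
                cause = cause.getCause();
            }
            if (cause instanceof StopRetryException || retriesLeft <= 0) {
                result.completeExceptionally(cause);
            } else {
                attempt(operation, retriesLeft - 1, result);
            }
        });
    }
}
```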



