This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4576e4384ff36623712043564039f654c3b44a30
Author: Matthias Pohl <[email protected]>
AuthorDate: Fri Feb 3 16:49:51 2023 +0100

    [FLINK-31773][runtime] Splits up 
DefaultLeaderElectionService.start(LeaderContender)
    
    This change adds a clear separation of the driver lifecycle and the 
LeaderContender
    registration to DefaultLeaderElectionService.
    
    Signed-off-by: Matthias Pohl <[email protected]>
---
 .../highavailability/AbstractHaServices.java       |  32 +-
 .../highavailability/HighAvailabilityServices.java |   8 +-
 .../DefaultLeaderElectionService.java              | 386 ++++++++++++++-------
 .../leaderelection/LeaderElectionService.java      |   9 +-
 .../leaderelection/LeaderElectionUtils.java        |  45 +++
 .../ResourceManagerServiceImpl.java                |   6 +-
 .../apache/flink/runtime/util/ZooKeeperUtils.java  |  12 +-
 .../JobMasterServiceLeadershipRunnerTest.java      |  13 +-
 .../DefaultLeaderElectionServiceTest.java          | 239 ++++++++++++-
 .../runtime/leaderelection/LeaderElectionTest.java |   2 +-
 .../TestingGenericLeaderContender.java             |   5 +-
 .../runtime/leaderelection/TestingLeaderBase.java  |  22 ++
 .../TestingLeaderElectionDriver.java               |  23 +-
 ...KeeperLeaderElectionConnectionHandlingTest.java |   2 +
 .../ZooKeeperLeaderElectionTest.java               |   5 +
 .../ZooKeeperLeaderRetrievalTest.java              |  30 +-
 16 files changed, 660 insertions(+), 179 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java
index feb8c2f0e87..2fd1e271c53 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/AbstractHaServices.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.highavailability;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.AutoCloseableRegistry;
 import org.apache.flink.runtime.blob.BlobStore;
 import org.apache.flink.runtime.blob.BlobStoreService;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
@@ -68,6 +69,8 @@ public abstract class AbstractHaServices implements 
HighAvailabilityServices {
 
     private final JobResultStore jobResultStore;
 
+    private final AutoCloseableRegistry closeableRegistry = new 
AutoCloseableRegistry();
+
     protected AbstractHaServices(
             Configuration config,
             Executor ioExecutor,
@@ -107,22 +110,22 @@ public abstract class AbstractHaServices implements 
HighAvailabilityServices {
     }
 
     @Override
-    public LeaderElectionService getResourceManagerLeaderElectionService() {
+    public LeaderElectionService getResourceManagerLeaderElectionService() 
throws Exception {
         return createLeaderElectionService(getLeaderPathForResourceManager());
     }
 
     @Override
-    public LeaderElectionService getDispatcherLeaderElectionService() {
+    public LeaderElectionService getDispatcherLeaderElectionService() throws 
Exception {
         return createLeaderElectionService(getLeaderPathForDispatcher());
     }
 
     @Override
-    public LeaderElectionService getJobManagerLeaderElectionService(JobID 
jobID) {
+    public LeaderElectionService getJobManagerLeaderElectionService(JobID 
jobID) throws Exception {
         return createLeaderElectionService(getLeaderPathForJobManager(jobID));
     }
 
     @Override
-    public LeaderElectionService getClusterRestEndpointLeaderElectionService() 
{
+    public LeaderElectionService getClusterRestEndpointLeaderElectionService() 
throws Exception {
         return createLeaderElectionService(getLeaderPathForRestServer());
     }
 
@@ -156,6 +159,12 @@ public abstract class AbstractHaServices implements 
HighAvailabilityServices {
             exception = t;
         }
 
+        try {
+            closeableRegistry.close();
+        } catch (Throwable t) {
+            exception = ExceptionUtils.firstOrSuppressed(t, exception);
+        }
+
         try {
             internalClose();
         } catch (Throwable t) {
@@ -183,6 +192,12 @@ public abstract class AbstractHaServices implements 
HighAvailabilityServices {
             exception = t;
         }
 
+        try {
+            closeableRegistry.close();
+        } catch (Throwable t) {
+            exception = ExceptionUtils.firstOrSuppressed(t, exception);
+        }
+
         try {
             internalClose();
         } catch (Throwable t) {
@@ -225,8 +240,13 @@ public abstract class AbstractHaServices implements 
HighAvailabilityServices {
                 executor);
     }
 
-    private LeaderElectionService createLeaderElectionService(String 
leaderName) {
-        return new 
DefaultLeaderElectionService(createLeaderElectionDriverFactory(leaderName));
+    private LeaderElectionService createLeaderElectionService(String 
leaderName) throws Exception {
+        final DefaultLeaderElectionService leaderElectionService =
+                new 
DefaultLeaderElectionService(createLeaderElectionDriverFactory(leaderName));
+        leaderElectionService.startLeaderElectionBackend();
+
+        closeableRegistry.registerCloseable(leaderElectionService);
+        return leaderElectionService;
     }
 
     /**
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
index 3e83c2b6f42..4d491315a43 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
@@ -125,14 +125,14 @@ public interface HighAvailabilityServices
      *
      * @return Leader election service for the resource manager leader election
      */
-    LeaderElectionService getResourceManagerLeaderElectionService();
+    LeaderElectionService getResourceManagerLeaderElectionService() throws 
Exception;
 
     /**
      * Gets the leader election service for the cluster's dispatcher.
      *
      * @return Leader election service for the dispatcher leader election
      */
-    LeaderElectionService getDispatcherLeaderElectionService();
+    LeaderElectionService getDispatcherLeaderElectionService() throws 
Exception;
 
     /**
      * Gets the leader election service for the given job.
@@ -140,7 +140,7 @@ public interface HighAvailabilityServices
      * @param jobID The identifier of the job running the election.
      * @return Leader election service for the job manager leader election
      */
-    LeaderElectionService getJobManagerLeaderElectionService(JobID jobID);
+    LeaderElectionService getJobManagerLeaderElectionService(JobID jobID) 
throws Exception;
 
     /**
      * Gets the leader election service for the cluster's rest endpoint.
@@ -194,7 +194,7 @@ public interface HighAvailabilityServices
      *
      * @return the leader election service used by the cluster's rest endpoint
      */
-    default LeaderElectionService 
getClusterRestEndpointLeaderElectionService() {
+    default LeaderElectionService 
getClusterRestEndpointLeaderElectionService() throws Exception {
         // for backwards compatibility we delegate to 
getWebMonitorLeaderElectionService
         // all implementations of this interface should override
         // getClusterRestEndpointLeaderElectionService, though
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java
index f75896a6967..b124127022e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.leaderelection;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 import org.apache.flink.util.concurrent.FutureUtils;
@@ -32,11 +31,11 @@ import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
 
+import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -44,9 +43,11 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * Default implementation for leader election service. Composed with different 
{@link
  * LeaderElectionDriver}, we could perform a leader election for the 
contender, and then persist the
  * leader information to various storage.
+ *
+ * <p>{@code DefaultLeaderElectionService} handles a single {@link 
LeaderContender}.
  */
 public class DefaultLeaderElectionService
-        implements LeaderElectionService, LeaderElectionEventHandler {
+        implements LeaderElectionService, LeaderElectionEventHandler, 
AutoCloseable {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(DefaultLeaderElectionService.class);
 
@@ -54,26 +55,49 @@ public class DefaultLeaderElectionService
 
     private final LeaderElectionDriverFactory leaderElectionDriverFactory;
 
-    /** The leader contender which applies for leadership. */
+    /**
+     * {@code leaderContender} being {@code null} indicates that no {@link 
LeaderContender} is
+     * registered that participates in the leader election, yet. See {@link 
#start(LeaderContender)}
+     * and {@link #stop()} for lifecycle management.
+     *
+     * <p>{@code @Nullable} isn't used here to avoid having multiple warnings 
spread over this class
+     * in a supporting IDE.
+     */
     @GuardedBy("lock")
-    // @Nullable is commented-out to avoid having multiple warnings spread 
over this class
-    // this.running=true ensures that leaderContender != null
-    private volatile LeaderContender leaderContender;
+    private LeaderContender leaderContender;
 
+    /**
+     * Saves the session ID which was issued by the {@link 
LeaderElectionDriver} if and only if the
+     * leadership is acquired by this service. {@code issuedLeaderSessionID} 
being {@code null}
+     * indicates that this service isn't the leader right now (i.e. {@link
+     * #onGrantLeadership(UUID)}) wasn't called, yet (independently of what 
{@code
+     * leaderElectionDriver#hasLeadership()} returns).
+     */
     @GuardedBy("lock")
     @Nullable
-    private volatile UUID issuedLeaderSessionID;
+    private UUID issuedLeaderSessionID;
 
+    /**
+     * Saves the leader information for a registered {@link LeaderContender} 
after this contender
+     * confirmed the leadership.
+     */
     @GuardedBy("lock")
-    private volatile LeaderInformation confirmedLeaderInformation;
+    private LeaderInformation confirmedLeaderInformation;
 
+    /**
+     * {@code leaderElectionDriver} being {@code null} indicates that the 
connection to the
+     * LeaderElection backend isn't established, yet. See {@link 
#startLeaderElectionBackend()} and
+     * {@link #close()} for lifecycle management. The lifecycle of the driver 
should have been
+     * established before registering a {@link LeaderContender} and stopped 
after the contender has
+     * been removed.
+     *
+     * <p>{@code @Nullable} isn't used here to avoid having multiple warnings 
spread over this class
+     * in a supporting IDE.
+     */
     @GuardedBy("lock")
-    private volatile boolean running;
-
-    // @Nullable is commented-out to avoid having multiple warnings spread 
over this class
-    // this.running=true ensures that leaderContender != null
     private LeaderElectionDriver leaderElectionDriver;
 
+    @GuardedBy("lock")
     private final ExecutorService leadershipOperationExecutor;
 
     public DefaultLeaderElectionService(LeaderElectionDriverFactory 
leaderElectionDriverFactory) {
@@ -98,23 +122,52 @@ public class DefaultLeaderElectionService
 
         this.confirmedLeaderInformation = LeaderInformation.empty();
 
-        this.running = false;
+        this.leadershipOperationExecutor = 
Preconditions.checkNotNull(leadershipOperationExecutor);
+    }
+
+    /**
+     * Starts the leader election process. This method has to be called before 
registering a {@link
+     * LeaderContender}. This method could be moved into the {@code 
DefaultLeaderElectionService}'s
+     * constructor with FLINK-31837.
+     */
+    public void startLeaderElectionBackend() throws Exception {
+        synchronized (lock) {
+            Preconditions.checkState(
+                    leaderContender == null,
+                    "No LeaderContender should have been registered, yet.");
+
+            leaderElectionDriver =
+                    leaderElectionDriverFactory.createLeaderElectionDriver(
+                            this, new LeaderElectionFatalErrorHandler());
 
-        this.leadershipOperationExecutor = leadershipOperationExecutor;
+            LOG.info("Instantiating DefaultLeaderElectionService with {}.", 
leaderElectionDriver);
+        }
     }
 
     @Override
     public final void start(LeaderContender contender) throws Exception {
         checkNotNull(contender, "Contender must not be null.");
-        Preconditions.checkState(leaderContender == null, "Contender was 
already set.");
 
         synchronized (lock) {
-            running = true;
+            Preconditions.checkState(
+                    leaderContender == null,
+                    "Only one LeaderContender is allowed to be registered to 
this service.");
+            Preconditions.checkState(
+                    leaderElectionDriver != null,
+                    "The DefaultLeaderElectionService should have established 
a connection to the backend before it's started.");
+
             leaderContender = contender;
-            leaderElectionDriver =
-                    leaderElectionDriverFactory.createLeaderElectionDriver(
-                            this, new LeaderElectionFatalErrorHandler());
-            LOG.info("Starting DefaultLeaderElectionService with {}.", 
leaderElectionDriver);
+
+            LOG.info(
+                    "LeaderContender {} has been registered for {}.",
+                    contender.getDescription(),
+                    leaderElectionDriver);
+
+            if (issuedLeaderSessionID != null) {
+                // notifying the LeaderContender shouldn't happen in the 
contender's main thread
+                runInLeaderEventThread(
+                        () -> 
notifyLeaderContenderOfLeadership(issuedLeaderSessionID));
+            }
         }
     }
 
@@ -123,27 +176,64 @@ public class DefaultLeaderElectionService
         LOG.info("Stopping DefaultLeaderElectionService.");
 
         synchronized (lock) {
-            if (!running) {
+            if (leaderContender == null) {
                 LOG.debug(
                         "The stop procedure was called on an already stopped 
DefaultLeaderElectionService instance. No action necessary.");
                 return;
             }
-            running = false;
 
-            if (leaderElectionDriver.hasLeadership()) {
-                handleLeadershipLoss();
-                
leaderElectionDriver.writeLeaderInformation(LeaderInformation.empty());
+            if (issuedLeaderSessionID != null) {
+                notifyLeaderContenderOfLeadershipLoss();
+                LOG.debug(
+                        "DefaultLeaderElectionService is stopping while having 
the leadership acquired. The revoke event is forwarded to the 
LeaderContender.");
+
+                if (leaderElectionDriver.hasLeadership()) {
+                    
leaderElectionDriver.writeLeaderInformation(LeaderInformation.empty());
+                    LOG.debug("Leader information is cleaned up while 
stopping.");
+                }
             } else {
+                Preconditions.checkState(
+                        confirmedLeaderInformation.isEmpty(),
+                        "The confirmed leader information should have been 
cleared.");
+
                 LOG.debug(
                         "DefaultLeaderElectionService is stopping while not 
having the leadership acquired. No cleanup necessary.");
             }
-        }
 
-        leaderElectionDriver.close();
+            leaderContender = null;
+        }
+    }
 
-        // graceful shutdown needs to happen outside the lock to enable any 
outstanding
-        // grant/revoke events to be processed without the lock being acquired 
by the service
-        ExecutorUtils.gracefulShutdown(10L, TimeUnit.SECONDS, 
leadershipOperationExecutor);
+    @Override
+    public void close() throws Exception {
+        synchronized (lock) {
+            Preconditions.checkState(
+                    leaderContender == null,
+                    "The DefaultLeaderElectionService should have been stopped 
before closing the instance.");
+
+            issuedLeaderSessionID = null;
+
+            if (leaderElectionDriver != null) {
+                leaderElectionDriver.close();
+                leaderElectionDriver = null;
+
+                // The shutdown of the thread pool needs to be done forcefully 
because we want its
+                // lifecycle being coupled to the driver (which require it to 
be shut down within
+                // the lock) to allow null checks in runInLeaderEventThread 
method. The outstanding
+                // event handling callbacks are going to be ignored, anyway.
+                final List<Runnable> outstandingEventHandlingCalls =
+                        
Preconditions.checkNotNull(leadershipOperationExecutor).shutdownNow();
+                if (!outstandingEventHandlingCalls.isEmpty()) {
+                    LOG.debug(
+                            "The DefaultLeaderElectionService was closed with 
{} still not being processed. No further action necessary.",
+                            outstandingEventHandlingCalls.size() == 1
+                                    ? "one event"
+                                    : (outstandingEventHandlingCalls.size() + 
" events"));
+                }
+            } else {
+                LOG.debug("The HA backend connection isn't established. No 
actions taken.");
+            }
+        }
     }
 
     @Override
@@ -154,15 +244,14 @@ public class DefaultLeaderElectionService
 
         synchronized (lock) {
             if (hasLeadership(leaderSessionID)) {
-                if (running) {
-                    confirmLeaderInformation(leaderSessionID, leaderAddress);
-                } else {
-                    LOG.debug(
-                            "Ignoring the leader session Id {} confirmation, 
since the LeaderElectionService has already been stopped.",
-                            leaderSessionID);
-                }
+                Preconditions.checkState(
+                        confirmedLeaderInformation.isEmpty(),
+                        "No confirmation should have happened, yet.");
+
+                confirmedLeaderInformation =
+                        LeaderInformation.known(leaderSessionID, 
leaderAddress);
+                
leaderElectionDriver.writeLeaderInformation(confirmedLeaderInformation);
             } else {
-                // Received an old confirmation call
                 if (!leaderSessionID.equals(this.issuedLeaderSessionID)) {
                     LOG.debug(
                             "Receive an old confirmation call of leader 
session ID {}, current issued session ID is {}",
@@ -171,7 +260,7 @@ public class DefaultLeaderElectionService
                 } else {
                     LOG.warn(
                             "The leader session ID {} was confirmed even 
though the "
-                                    + "corresponding JobManager was not 
elected as the leader.",
+                                    + "corresponding service was not elected 
as the leader or has been stopped already.",
                             leaderSessionID);
                 }
             }
@@ -181,31 +270,29 @@ public class DefaultLeaderElectionService
     @Override
     public boolean hasLeadership(@Nonnull UUID leaderSessionId) {
         synchronized (lock) {
-            if (running) {
-                return leaderElectionDriver.hasLeadership()
-                        && leaderSessionId.equals(issuedLeaderSessionID);
+            if (leaderElectionDriver != null) {
+                if (leaderContender != null) {
+                    return leaderElectionDriver.hasLeadership()
+                            && leaderSessionId.equals(issuedLeaderSessionID);
+                } else {
+                    LOG.debug(
+                            "hasLeadership is called after the service is 
stopped, returning false.");
+                    return false;
+                }
             } else {
-                LOG.debug("hasLeadership is called after the service is 
stopped, returning false.");
+                LOG.debug("hasLeadership is called after the service is 
closed, returning false.");
                 return false;
             }
         }
     }
 
-    /**
-     * Returns the current leader session ID or null, if the contender is not 
the leader.
-     *
-     * @return The last leader session ID or null, if the contender is not the 
leader
-     */
+    /** Returns the current leader session ID or {@code null}, if the session 
wasn't confirmed. */
     @VisibleForTesting
     @Nullable
     public UUID getLeaderSessionID() {
-        return confirmedLeaderInformation.getLeaderSessionID();
-    }
-
-    @GuardedBy("lock")
-    private void confirmLeaderInformation(UUID leaderSessionID, String 
leaderAddress) {
-        confirmedLeaderInformation = LeaderInformation.known(leaderSessionID, 
leaderAddress);
-        
leaderElectionDriver.writeLeaderInformation(confirmedLeaderInformation);
+        synchronized (lock) {
+            return confirmedLeaderInformation.getLeaderSessionID();
+        }
     }
 
     @Override
@@ -213,24 +300,45 @@ public class DefaultLeaderElectionService
         runInLeaderEventThread(() -> 
onGrantLeadershipInternal(newLeaderSessionId));
     }
 
+    @GuardedBy("lock")
     private void onGrantLeadershipInternal(UUID newLeaderSessionId) {
-        synchronized (lock) {
-            if (running) {
-                issuedLeaderSessionID = newLeaderSessionId;
-                confirmedLeaderInformation = LeaderInformation.empty();
+        Preconditions.checkNotNull(newLeaderSessionId);
 
-                LOG.debug(
-                        "Grant leadership to contender {} with session ID {}.",
-                        leaderContender.getDescription(),
-                        issuedLeaderSessionID);
+        Preconditions.checkState(
+                issuedLeaderSessionID == null,
+                "The leadership should have been granted while not having the 
leadership acquired.");
 
-                leaderContender.grantLeadership(issuedLeaderSessionID);
-            } else {
-                LOG.debug(
-                        "Ignoring the grant leadership notification since the 
{} has already been closed.",
-                        leaderElectionDriver);
-            }
+        issuedLeaderSessionID = newLeaderSessionId;
+
+        notifyLeaderContenderOfLeadership(issuedLeaderSessionID);
+    }
+
+    @GuardedBy("lock")
+    private void notifyLeaderContenderOfLeadership(UUID sessionID) {
+        if (leaderContender == null) {
+            LOG.debug(
+                    "The grant leadership notification for session ID {} is 
not forwarded because the DefaultLeaderElectionService ({}) has no contender 
registered.",
+                    sessionID,
+                    leaderElectionDriver);
+            return;
+        } else if (!sessionID.equals(issuedLeaderSessionID)) {
+            LOG.debug(
+                    "An out-dated leadership-acquired event with session ID {} 
was triggered. The current leader session ID is {}. The event will be ignored.",
+                    sessionID,
+                    issuedLeaderSessionID);
+            return;
         }
+
+        Preconditions.checkState(
+                confirmedLeaderInformation.isEmpty(),
+                "The leadership should have been granted while not having the 
leadership acquired.");
+
+        LOG.debug(
+                "Granting leadership to contender {} with session ID {}.",
+                leaderContender.getDescription(),
+                issuedLeaderSessionID);
+
+        leaderContender.grantLeadership(issuedLeaderSessionID);
     }
 
     @Override
@@ -238,30 +346,42 @@ public class DefaultLeaderElectionService
         runInLeaderEventThread(this::onRevokeLeadershipInternal);
     }
 
+    @GuardedBy("lock")
     private void onRevokeLeadershipInternal() {
-        synchronized (lock) {
-            if (running) {
-                handleLeadershipLoss();
-            } else {
-                LOG.debug(
-                        "Ignoring the revoke leadership notification since the 
{} "
-                                + "has already been closed.",
-                        leaderElectionDriver);
-            }
+        // TODO: FLINK-31814 covers adding this Precondition
+        // Preconditions.checkState(issuedLeaderSessionID != null,"The 
leadership should have
+        // been revoked while having the leadership acquired.");
+
+        if (leaderContender != null) {
+            notifyLeaderContenderOfLeadershipLoss();
+        } else {
+            LOG.debug(
+                    "The revoke leadership for session {} notification is not 
forwarded because the DefaultLeaderElectionService({}) has no contender 
registered.",
+                    issuedLeaderSessionID,
+                    leaderElectionDriver);
         }
+
+        issuedLeaderSessionID = null;
     }
 
     @GuardedBy("lock")
-    private void handleLeadershipLoss() {
-        LOG.debug(
-                "Revoke leadership of {} ({}@{}).",
-                leaderContender.getDescription(),
-                confirmedLeaderInformation.getLeaderSessionID(),
-                confirmedLeaderInformation.getLeaderAddress());
+    private void notifyLeaderContenderOfLeadershipLoss() {
+        Preconditions.checkState(
+                leaderContender != null,
+                "The LeaderContender should be always set when calling this 
method.");
+
+        if (confirmedLeaderInformation.isEmpty()) {
+            LOG.debug(
+                    "Revoking leadership to contender {} while a previous 
leadership grant wasn't confirmed, yet.",
+                    leaderContender.getDescription());
+        } else {
+            LOG.debug(
+                    "Revoking leadership to contender {} for {}.",
+                    leaderContender.getDescription(),
+                    
LeaderElectionUtils.convertToString(confirmedLeaderInformation));
+        }
 
-        issuedLeaderSessionID = null;
         confirmedLeaderInformation = LeaderInformation.empty();
-
         leaderContender.revokeLeadership();
     }
 
@@ -270,50 +390,67 @@ public class DefaultLeaderElectionService
         runInLeaderEventThread(() -> 
onLeaderInformationChangeInternal(leaderInformation));
     }
 
+    @GuardedBy("lock")
     private void onLeaderInformationChangeInternal(LeaderInformation 
leaderInformation) {
-        synchronized (lock) {
-            if (running) {
-                LOG.trace(
-                        "Leader node changed while {} is the leader with 
session ID {}. New leader information {}.",
-                        leaderContender.getDescription(),
-                        confirmedLeaderInformation.getLeaderSessionID(),
-                        leaderInformation);
-                if (!confirmedLeaderInformation.isEmpty()) {
-                    final LeaderInformation confirmedLeaderInfo = 
this.confirmedLeaderInformation;
-                    if (leaderInformation.isEmpty()) {
-                        LOG.debug(
-                                "Writing leader information by {} since the 
external storage is empty.",
-                                leaderContender.getDescription());
-                        
leaderElectionDriver.writeLeaderInformation(confirmedLeaderInfo);
-                    } else if (!leaderInformation.equals(confirmedLeaderInfo)) 
{
-                        // the data field does not correspond to the expected 
leader information
-                        LOG.debug(
-                                "Correcting leader information by {}.",
-                                leaderContender.getDescription());
-                        
leaderElectionDriver.writeLeaderInformation(confirmedLeaderInfo);
-                    }
+        if (leaderContender != null) {
+            LOG.trace(
+                    "Leader node changed while {} is the leader with {}. New 
leader information {}.",
+                    leaderContender.getDescription(),
+                    
LeaderElectionUtils.convertToString(confirmedLeaderInformation),
+                    LeaderElectionUtils.convertToString(leaderInformation));
+            if (!confirmedLeaderInformation.isEmpty()) {
+                final LeaderInformation confirmedLeaderInfo = 
this.confirmedLeaderInformation;
+                if (leaderInformation.isEmpty()) {
+                    LOG.debug(
+                            "Writing leader information by {} since the 
external storage is empty.",
+                            leaderContender.getDescription());
+                    
leaderElectionDriver.writeLeaderInformation(confirmedLeaderInfo);
+                } else if (!leaderInformation.equals(confirmedLeaderInfo)) {
+                    // the data field does not correspond to the expected 
leader information
+                    LOG.debug(
+                            "Correcting leader information by {}.",
+                            leaderContender.getDescription());
+                    
leaderElectionDriver.writeLeaderInformation(confirmedLeaderInfo);
                 }
-            } else {
-                LOG.debug(
-                        "Ignoring change notification since the {} has already 
been closed.",
-                        leaderElectionDriver);
             }
+        } else {
+            LOG.debug(
+                    "Ignoring change notification since the {} has already 
been stopped.",
+                    leaderElectionDriver);
         }
     }
 
     private void runInLeaderEventThread(Runnable callback) {
-        if (running) {
-            FutureUtils.handleUncaughtException(
-                    CompletableFuture.runAsync(callback, 
leadershipOperationExecutor),
-                    (thread, error) -> forwardErrorToLeaderContender(error));
+        synchronized (lock) {
+            if (!leadershipOperationExecutor.isShutdown()) {
+                FutureUtils.handleUncaughtException(
+                        CompletableFuture.runAsync(
+                                () -> {
+                                    synchronized (lock) {
+                                        callback.run();
+                                    }
+                                },
+                                leadershipOperationExecutor),
+                        (thread, error) -> 
forwardErrorToLeaderContender(error));
+            } else {
+                LOG.debug(
+                        "Leader event handling was triggered after the 
DefaultLeaderElectionService is closed. The event will be ignored.");
+            }
         }
     }
 
     private void forwardErrorToLeaderContender(Throwable t) {
-        if (t instanceof LeaderElectionException) {
-            leaderContender.handleError((LeaderElectionException) t);
-        } else {
-            leaderContender.handleError(new LeaderElectionException(t));
+        synchronized (lock) {
+            if (leaderContender == null) {
+                LOG.debug("Ignoring error notification since there's no 
contender registered.");
+                return;
+            }
+
+            if (t instanceof LeaderElectionException) {
+                leaderContender.handleError((LeaderElectionException) t);
+            } else {
+                leaderContender.handleError(new LeaderElectionException(t));
+            }
         }
     }
 
@@ -321,14 +458,7 @@ public class DefaultLeaderElectionService
 
         @Override
         public void onFatalError(Throwable throwable) {
-            synchronized (lock) {
-                if (!running) {
-                    LOG.debug("Ignoring error notification since the service 
has been stopped.");
-                    return;
-                }
-
-                forwardErrorToLeaderContender(throwable);
-            }
+            forwardErrorToLeaderContender(throwable);
         }
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java
index a0c1c5968ed..97f1ad1966a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java
@@ -37,16 +37,17 @@ import java.util.UUID;
 public interface LeaderElectionService {
 
     /**
-     * Starts the leader election service. This method can only be called once.
+     * Registers the passed {@link LeaderContender} with this {@code 
LeaderElectionService}. This
+     * method can only be called once.
      *
      * @param contender LeaderContender which applies for the leadership
-     * @throws Exception
      */
     void start(LeaderContender contender) throws Exception;
 
     /**
-     * Stops the leader election service. Stopping the {@code 
LeaderElectionService} will trigger
-     * {@link LeaderContender#revokeLeadership()} if the service still holds 
the leadership.
+     * Ends the participation of the registered {@link LeaderContender} in the 
leader election
+     * process. This will trigger {@link LeaderContender#revokeLeadership()} 
if the service still
+     * holds the leadership.
      *
      * @throws Exception if an error occurs while stopping the {@code 
LeaderElectionService}.
      */
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionUtils.java
new file mode 100644
index 00000000000..688a95c4155
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionUtils.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import org.apache.flink.util.Preconditions;
+
+import java.util.UUID;
+
+/** {@code LeaderElectionUtils} collects helper methods to handle 
LeaderElection-related issues. */
+public class LeaderElectionUtils {
+
+    /**
+     * Converts the passed {@link LeaderInformation} into a human-readable 
representation that can
+     * be used in log messages.
+     */
+    public static String convertToString(LeaderInformation leaderInformation) {
+        return leaderInformation.isEmpty()
+                ? "<no leader>"
+                : convertToString(
+                        leaderInformation.getLeaderSessionID(),
+                        leaderInformation.getLeaderAddress());
+    }
+
+    public static String convertToString(UUID sessionId, String address) {
+        return String.format(
+                "%s@%s",
+                Preconditions.checkNotNull(sessionId), 
Preconditions.checkNotNull(address));
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImpl.java
index e50b1b9c669..f3f75bd31b4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImpl.java
@@ -31,7 +31,6 @@ import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.security.token.DelegationTokenManager;
-import org.apache.flink.util.ConfigurationException;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.concurrent.FutureUtils;
 
@@ -82,7 +81,8 @@ public class ResourceManagerServiceImpl implements 
ResourceManagerService, Leade
 
     private ResourceManagerServiceImpl(
             ResourceManagerFactory<?> resourceManagerFactory,
-            ResourceManagerProcessContext rmProcessContext) {
+            ResourceManagerProcessContext rmProcessContext)
+            throws Exception {
         this.resourceManagerFactory = checkNotNull(resourceManagerFactory);
         this.rmProcessContext = checkNotNull(rmProcessContext);
 
@@ -354,7 +354,7 @@ public class ResourceManagerServiceImpl implements 
ResourceManagerService, Leade
             MetricRegistry metricRegistry,
             String hostname,
             Executor ioExecutor)
-            throws ConfigurationException {
+            throws Exception {
 
         return new ResourceManagerServiceImpl(
                 resourceManagerFactory,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
index 26fc1a83384..d1416f8356b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
@@ -403,8 +403,8 @@ public class ZooKeeperUtils {
      * @param client The {@link CuratorFramework} ZooKeeper client to use
      * @return {@link DefaultLeaderElectionService} instance.
      */
-    public static DefaultLeaderElectionService createLeaderElectionService(
-            CuratorFramework client) {
+    public static DefaultLeaderElectionService 
createLeaderElectionService(CuratorFramework client)
+            throws Exception {
 
         return createLeaderElectionService(client, "");
     }
@@ -418,8 +418,12 @@ public class ZooKeeperUtils {
      * @return {@link DefaultLeaderElectionService} instance.
      */
     public static DefaultLeaderElectionService createLeaderElectionService(
-            final CuratorFramework client, final String path) {
-        return new 
DefaultLeaderElectionService(createLeaderElectionDriverFactory(client, path));
+            final CuratorFramework client, final String path) throws Exception 
{
+        final DefaultLeaderElectionService leaderElectionService =
+                new 
DefaultLeaderElectionService(createLeaderElectionDriverFactory(client, path));
+        leaderElectionService.startLeaderElectionBackend();
+
+        return leaderElectionService;
     }
 
     /**
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java
index 6e4d9cb0fa6..6519817877c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java
@@ -680,8 +680,9 @@ class JobMasterServiceLeadershipRunnerTest {
 
         // we need to use DefaultLeaderElectionService here because 
JobMasterServiceLeadershipRunner
         // in connection with the DefaultLeaderElectionService generates the 
nested locking
-        final LeaderElectionService defaultLeaderElectionService =
+        final DefaultLeaderElectionService defaultLeaderElectionService =
                 new 
DefaultLeaderElectionService(testingLeaderElectionDriverFactory);
+        defaultLeaderElectionService.startLeaderElectionBackend();
 
         // latch to detect when we reached the first synchronized section 
having a lock on the
         // JobMasterServiceProcess#stop side
@@ -772,7 +773,15 @@ class JobMasterServiceLeadershipRunnerTest {
             closeAsyncCalledTrigger.await();
 
             final CheckedThread grantLeadershipThread =
-                    createCheckedThread(currentLeaderDriver::isLeader);
+                    createCheckedThread(
+                            () -> {
+                                // DefaultLeaderElectionService enforces a 
proper event handling
+                                // order (i.e. no two grant or revoke events 
should appear after
+                                // each other). This requires the leadership 
to be revoked before
+                                // regaining leadership in this test.
+                                currentLeaderDriver.notLeader();
+                                currentLeaderDriver.isLeader();
+                            });
             grantLeadershipThread.start();
 
             // finalize ClassloaderLease release to trigger 
DefaultLeaderElectionService#stop
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java
index 17bc0787d22..8025063b67a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java
@@ -35,6 +35,7 @@ import java.util.function.Consumer;
 import java.util.function.Function;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for {@link DefaultLeaderElectionService}. */
 class DefaultLeaderElectionServiceTest {
@@ -77,6 +78,170 @@ class DefaultLeaderElectionServiceTest {
         };
     }
 
+    /**
+     * With {@link MultipleComponentLeaderElectionDriverAdapter} and {@link
+     * DefaultMultipleComponentLeaderElectionService} it happens that {@link
+     * LeaderElectionEventHandler#onGrantLeadership(UUID)} happens while 
instantiating the {@link
+     * LeaderElectionDriver} (i.e. {@code 
MultipleComponentLeaderElectionDriverAdapter}). This test
+     * verifies that the grant event is handled properly.
+     */
+    @Test
+    void testGrantCallWhileInstantiatingDriver() throws Exception {
+        final UUID expectedLeaderSessionID = UUID.randomUUID();
+        try (final TestingGenericLeaderElectionDriver driver =
+                        
TestingGenericLeaderElectionDriver.newBuilder().build();
+                final DefaultLeaderElectionService testInstance =
+                        new DefaultLeaderElectionService(
+                                (eventHandler, errorHandler) -> {
+                                    
eventHandler.onGrantLeadership(expectedLeaderSessionID);
+                                    return driver;
+                                },
+                                Executors.newDirectExecutorService())) {
+            testInstance.startLeaderElectionBackend();
+
+            final TestingContender testingContender =
+                    new TestingContender("unused-address", testInstance);
+            testInstance.start(testingContender);
+
+            
assertThat(testingContender.getLeaderSessionID()).isEqualTo(expectedLeaderSessionID);
+
+            testInstance.stop();
+        }
+    }
+
+    @Test
+    void testDelayedGrantCallAfterContenderRegistration() throws Exception {
+        new Context() {
+            {
+                runTestWithManuallyTriggeredEvents(
+                        executorService -> {
+                            // we need to stop to deregister the contender 
that was already
+                            // registered to the service
+                            leaderElectionService.stop();
+
+                            final UUID expectedSessionID = UUID.randomUUID();
+                            
testingLeaderElectionDriver.isLeader(expectedSessionID);
+
+                            leaderElectionService.start(testingContender);
+
+                            assertThat(testingContender.getLeaderSessionID())
+                                    .as("Leadership grant was not forwarded to 
the contender, yet.")
+                                    .isNull();
+
+                            executorService.trigger();
+
+                            assertThat(testingContender.getLeaderSessionID())
+                                    .as("Leadership grant is actually 
forwarded to the service.")
+                                    .isEqualTo(expectedSessionID);
+
+                            testingContender.waitForLeader();
+                        });
+            }
+        };
+    }
+
+    @Test
+    void testDelayedGrantCallAfterContenderBeingDeregisteredAgain() throws 
Exception {
+        new Context() {
+            {
+                runTestWithManuallyTriggeredEvents(
+                        executorService -> {
+                            // we need to stop to deregister the contender 
that was already
+                            // registered to the service
+                            leaderElectionService.stop();
+
+                            final UUID expectedSessionID = UUID.randomUUID();
+                            
testingLeaderElectionDriver.isLeader(expectedSessionID);
+                            executorService.trigger();
+
+                            leaderElectionService.start(testingContender);
+
+                            leaderElectionService.stop();
+
+                            executorService.trigger();
+                        });
+            }
+        };
+    }
+
+    /**
+     * Test to cover the issue described in FLINK-31814. This test could be 
removed after
+     * FLINK-31814 is resolved.
+     */
+    @Test
+    void testOnRevokeCallWhileClosingService() throws Exception {
+        final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory 
driverFactory =
+                new 
TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory(
+                        LeaderElectionEventHandler::onRevokeLeadership);
+
+        try (final DefaultLeaderElectionService testInstance =
+                new DefaultLeaderElectionService(driverFactory)) {
+            testInstance.startLeaderElectionBackend();
+
+            final TestingLeaderElectionDriver driver = 
driverFactory.getCurrentLeaderDriver();
+            assertThat(driver).isNotNull();
+
+            driver.isLeader();
+
+            final TestingContender contender = new 
TestingContender("unused-address", testInstance);
+            testInstance.start(contender);
+
+            contender.waitForLeader();
+
+            testInstance.stop();
+
+            contender.throwErrorIfPresent();
+        }
+    }
+
+    @Test
+    void testStopWhileHavingLeadership() throws Exception {
+        final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory 
driverFactory =
+                new 
TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory();
+
+        try (final DefaultLeaderElectionService testInstance =
+                new DefaultLeaderElectionService(driverFactory)) {
+            testInstance.startLeaderElectionBackend();
+
+            final TestingLeaderElectionDriver driver = 
driverFactory.getCurrentLeaderDriver();
+            assertThat(driver).isNotNull();
+
+            driver.isLeader();
+
+            
testInstance.start(TestingGenericLeaderContender.newBuilder().build());
+
+            testInstance.stop();
+        }
+    }
+
+    @Test
+    void testContenderRegistrationWithoutDriverBeingInstantiatedFails() throws 
Exception {
+        try (final DefaultLeaderElectionService leaderElectionService =
+                new DefaultLeaderElectionService(
+                        new 
TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory())) {
+            assertThatThrownBy(
+                            () ->
+                                    leaderElectionService.start(
+                                            new TestingContender(
+                                                    "unused-address", 
leaderElectionService)))
+                    .isInstanceOf(IllegalStateException.class);
+        }
+    }
+
+    @Test
+    void testDriverShutdownFailsWithContenderStillBeingRegistered() throws 
Exception {
+        new Context() {
+            {
+                runTestWithSynchronousEventHandling(
+                        () ->
+                                
assertThatThrownBy(leaderElectionService::close)
+                                        .as(
+                                                "The LeaderContender needs to 
be deregistered before closing the driver.")
+                                        
.isInstanceOf(IllegalStateException.class));
+            }
+        };
+    }
+
     @Test
     void testProperCleanupOnStopWhenHoldingTheLeadership() throws Exception {
         new Context() {
@@ -325,7 +490,7 @@ class DefaultLeaderElectionServiceTest {
     }
 
     @Test
-    void testOldConfirmLeaderInformation() throws Exception {
+    void testOldConfirmLeaderInformationWhileHavingNewLeadership() throws 
Exception {
         new Context() {
             {
                 runTestWithSynchronousEventHandling(
@@ -344,6 +509,29 @@ class DefaultLeaderElectionServiceTest {
         };
     }
 
+    @Test
+    void testOldConfirmationWhileHavingLeadershipLost() throws Exception {
+        new Context() {
+            {
+                runTestWithSynchronousEventHandling(
+                        () -> {
+                            testingLeaderElectionDriver.isLeader();
+                            final UUID currentLeaderSessionId =
+                                    leaderElectionService.getLeaderSessionID();
+                            assertThat(currentLeaderSessionId).isNotNull();
+
+                            testingLeaderElectionDriver.notLeader();
+
+                            // Old confirm call should be ignored.
+                            leaderElectionService.confirmLeadership(
+                                    currentLeaderSessionId, TEST_URL);
+
+                            
assertThat(leaderElectionService.getLeaderSessionID()).isNull();
+                        });
+            }
+        };
+    }
+
     @Test
     void testErrorForwarding() throws Exception {
         new Context() {
@@ -358,6 +546,8 @@ class DefaultLeaderElectionServiceTest {
                             assertThat(testingContender.getError())
                                     .isNotNull()
                                     .hasCause(testException);
+
+                            testingContender.clearError();
                         });
             }
         };
@@ -394,7 +584,9 @@ class DefaultLeaderElectionServiceTest {
         final TestingContender testingContender =
                 new TestingContender(TEST_URL, leaderElectionService);
 
+        leaderElectionService.startLeaderElectionBackend();
         leaderElectionService.start(testingContender);
+
         final TestingLeaderElectionDriver currentLeaderDriver =
                 Preconditions.checkNotNull(
                         
testingLeaderElectionDriverFactory.getCurrentLeaderDriver());
@@ -402,6 +594,8 @@ class DefaultLeaderElectionServiceTest {
         currentLeaderDriver.isLeader();
 
         leaderElectionService.stop();
+
+        testingContender.throwErrorIfPresent();
     }
 
     @Test
@@ -427,6 +621,7 @@ class DefaultLeaderElectionServiceTest {
         final DefaultLeaderElectionService testInstance =
                 new DefaultLeaderElectionService(
                         (leaderElectionEventHandler, errorHandler) -> driver);
+        testInstance.startLeaderElectionBackend();
 
         final String address = "leader-address";
         testInstance.start(
@@ -450,6 +645,7 @@ class DefaultLeaderElectionServiceTest {
         latch.trigger();
 
         testInstance.stop();
+        testInstance.close();
     }
 
     @Test
@@ -489,6 +685,7 @@ class DefaultLeaderElectionServiceTest {
 
         final DefaultLeaderElectionService testInstance =
                 new DefaultLeaderElectionService(driverFactory);
+        testInstance.startLeaderElectionBackend();
         testInstance.start(contender);
 
         final TestingLeaderElectionDriver driver = 
driverFactory.getCurrentLeaderDriver();
@@ -499,6 +696,7 @@ class DefaultLeaderElectionServiceTest {
         latch.trigger();
 
         testInstance.stop();
+        testInstance.close();
     }
 
     private static class Context {
@@ -523,19 +721,32 @@ class DefaultLeaderElectionServiceTest {
 
         void runTest(RunnableWithException testMethod, ExecutorService 
leaderEventOperationExecutor)
                 throws Exception {
-            final 
TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory driverFactory =
-                    new 
TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory();
-            leaderElectionService =
-                    new DefaultLeaderElectionService(driverFactory, 
leaderEventOperationExecutor);
-            testingContender = new TestingContender(TEST_URL, 
leaderElectionService);
-
-            leaderElectionService.start(testingContender);
-            testingLeaderElectionDriver = 
driverFactory.getCurrentLeaderDriver();
-
-            assertThat(testingLeaderElectionDriver).isNotNull();
-            testMethod.run();
-
-            leaderElectionService.stop();
+            try {
+                final 
TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory driverFactory =
+                        new 
TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory();
+                leaderElectionService =
+                        new DefaultLeaderElectionService(
+                                driverFactory, leaderEventOperationExecutor);
+                leaderElectionService.startLeaderElectionBackend();
+                testingContender = new TestingContender(TEST_URL, 
leaderElectionService);
+
+                leaderElectionService.start(testingContender);
+                testingLeaderElectionDriver = 
driverFactory.getCurrentLeaderDriver();
+
+                assertThat(testingLeaderElectionDriver).isNotNull();
+                testMethod.run();
+
+                leaderElectionService.stop();
+                leaderElectionService.close();
+            } finally {
+                if (testingContender != null) {
+                    testingContender.throwErrorIfPresent();
+                }
+
+                if (testingLeaderElectionDriver != null) {
+                    testingLeaderElectionDriver.close();
+                }
+            }
         }
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java
index c5a09be8bc1..31fd0f6f7da 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java
@@ -192,7 +192,7 @@ public class LeaderElectionTest {
         }
 
         @Override
-        public LeaderElectionService createLeaderElectionService() {
+        public LeaderElectionService createLeaderElectionService() throws 
Exception {
             return ZooKeeperUtils.createLeaderElectionService(
                     curatorFrameworkWrapper.asCuratorFramework());
         }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingGenericLeaderContender.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingGenericLeaderContender.java
index c385cb20c2a..35a7e82dcbd 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingGenericLeaderContender.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingGenericLeaderContender.java
@@ -80,7 +80,10 @@ public class TestingGenericLeaderContender implements 
LeaderContender {
     public static class Builder {
         private Consumer<UUID> grantLeadershipConsumer = ignoredSessionID -> 
{};
         private Runnable revokeLeadershipRunnable = () -> {};
-        private Consumer<Exception> handleErrorConsumer = ignoredError -> {};
+        private Consumer<Exception> handleErrorConsumer =
+                error -> {
+                    throw new AssertionError(error);
+                };
         private Supplier<String> getDescriptionSupplier = () -> "testing 
contender";
 
         private Builder() {}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderBase.java
index f11cd585722..c504e3eda2e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderBase.java
@@ -66,6 +66,10 @@ public class TestingLeaderBase {
         error = errorQueue.take();
     }
 
+    public void clearError() {
+        error = null;
+    }
+
     public void handleError(Throwable ex) {
         errorQueue.offer(ex);
     }
@@ -80,6 +84,24 @@ public class TestingLeaderBase {
         return error == null ? errorQueue.poll() : error;
     }
 
+    /**
+     * Method for exposing errors that were caught during the test execution 
and need to be exposed
+     * within the test.
+     *
+     * @throws AssertionError with the actual unhandled error as the cause if 
such an error was
+     *     caught during the test code execution.
+     */
+    public void throwErrorIfPresent() {
+        final String assertionErrorMessage = "An unhandled error was caught 
during test execution.";
+        if (error != null) {
+            throw new AssertionError(assertionErrorMessage, error);
+        }
+
+        if (!errorQueue.isEmpty()) {
+            throw new AssertionError(assertionErrorMessage, errorQueue.poll());
+        }
+    }
+
     public boolean isLeader() {
         return isLeader;
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionDriver.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionDriver.java
index 9f344b30aa3..f3b4b9fd2c8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionDriver.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionDriver.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.leaderelection;
 
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.util.function.ThrowingConsumer;
 
 import javax.annotation.Nullable;
 
@@ -38,14 +39,18 @@ public class TestingLeaderElectionDriver implements 
LeaderElectionDriver {
     private final LeaderElectionEventHandler leaderElectionEventHandler;
     private final FatalErrorHandler fatalErrorHandler;
 
+    private final ThrowingConsumer<LeaderElectionEventHandler, Exception> 
closeRunnable;
+
     // Leader information on external storage
     private LeaderInformation leaderInformation = LeaderInformation.empty();
 
     private TestingLeaderElectionDriver(
             LeaderElectionEventHandler leaderElectionEventHandler,
-            FatalErrorHandler fatalErrorHandler) {
+            FatalErrorHandler fatalErrorHandler,
+            ThrowingConsumer<LeaderElectionEventHandler, Exception> 
closeRunnable) {
         this.leaderElectionEventHandler = leaderElectionEventHandler;
         this.fatalErrorHandler = fatalErrorHandler;
+        this.closeRunnable = closeRunnable;
     }
 
     @Override
@@ -61,7 +66,7 @@ public class TestingLeaderElectionDriver implements 
LeaderElectionDriver {
     @Override
     public void close() throws Exception {
         synchronized (lock) {
-            // noop
+            closeRunnable.accept(leaderElectionEventHandler);
         }
     }
 
@@ -101,12 +106,24 @@ public class TestingLeaderElectionDriver implements 
LeaderElectionDriver {
 
         private TestingLeaderElectionDriver currentLeaderDriver;
 
+        private final ThrowingConsumer<LeaderElectionEventHandler, Exception> 
closeRunnable;
+
+        public TestingLeaderElectionDriverFactory() {
+            this(ignoredLeaderElectionEventHandler -> {});
+        }
+
+        public TestingLeaderElectionDriverFactory(
+                ThrowingConsumer<LeaderElectionEventHandler, Exception> 
closeRunnable) {
+            this.closeRunnable = closeRunnable;
+        }
+
         @Override
         public LeaderElectionDriver createLeaderElectionDriver(
                 LeaderElectionEventHandler leaderEventHandler,
                 FatalErrorHandler fatalErrorHandler) {
             currentLeaderDriver =
-                    new TestingLeaderElectionDriver(leaderEventHandler, 
fatalErrorHandler);
+                    new TestingLeaderElectionDriver(
+                            leaderEventHandler, fatalErrorHandler, 
closeRunnable);
             return currentLeaderDriver;
         }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionConnectionHandlingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionConnectionHandlingTest.java
index 67a6387c25b..99ba84cf29a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionConnectionHandlingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionConnectionHandlingTest.java
@@ -139,6 +139,7 @@ class ZooKeeperLeaderElectionConnectionHandlingTest {
                 new ZooKeeperLeaderElectionDriverFactory(client, PATH);
         DefaultLeaderElectionService leaderElectionService =
                 new DefaultLeaderElectionService(leaderElectionDriverFactory);
+        leaderElectionService.startLeaderElectionBackend();
 
         try {
             final TestingConnectionStateListener connectionStateListener =
@@ -165,6 +166,7 @@ class ZooKeeperLeaderElectionConnectionHandlingTest {
             validationLogic.accept(connectionStateListener, contender);
         } finally {
             leaderElectionService.stop();
+            leaderElectionService.close();
             curatorFrameworkWrapper.close();
 
             if (problem == Problem.LOST_CONNECTION) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
index 53fc1a1feab..7b9a67797d3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
@@ -220,6 +220,7 @@ class ZooKeeperLeaderElectionTest {
                                 "Stop leader election service of contender 
#{}.",
                                 numberSeenLeaders);
                         leaderElectionService[index].stop();
+                        leaderElectionService[index].close();
                         leaderElectionService[index] = null;
 
                         numberSeenLeaders++;
@@ -241,6 +242,7 @@ class ZooKeeperLeaderElectionTest {
             for (DefaultLeaderElectionService electionService : 
leaderElectionService) {
                 if (electionService != null) {
                     electionService.stop();
+                    electionService.close();
                 }
             }
         }
@@ -302,6 +304,8 @@ class ZooKeeperLeaderElectionTest {
 
                     // stop leader election service = revoke leadership
                     leaderElectionService[index].stop();
+                    leaderElectionService[index].close();
+
                     // create new leader election service which takes part in 
the leader election
                     leaderElectionService[index] =
                             
ZooKeeperUtils.createLeaderElectionService(createZooKeeperClient());
@@ -324,6 +328,7 @@ class ZooKeeperLeaderElectionTest {
             for (DefaultLeaderElectionService electionService : 
leaderElectionService) {
                 if (electionService != null) {
                     electionService.stop();
+                    electionService.close();
                 }
             }
         }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalTest.java
index 533081cfca4..c2f3168efc3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalTest.java
@@ -26,7 +26,10 @@ import 
org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import 
org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperMultipleComponentLeaderElectionHaServices;
 import org.apache.flink.runtime.jobmaster.JobMaster;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderelection.LeaderInformation;
 import org.apache.flink.runtime.leaderelection.TestingContender;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionListener;
+import 
org.apache.flink.runtime.leaderelection.ZooKeeperMultipleComponentLeaderElectionDriver;
 import org.apache.flink.runtime.rpc.AddressResolution;
 import org.apache.flink.runtime.rpc.RpcSystem;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
@@ -49,6 +52,7 @@ import java.net.Socket;
 import java.net.SocketAddress;
 import java.net.UnknownHostException;
 import java.time.Duration;
+import java.util.UUID;
 import java.util.concurrent.ScheduledExecutorService;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -116,7 +120,6 @@ class ZooKeeperLeaderRetrievalTest {
         long sleepingTime = 1000;
 
         LeaderElectionService leaderElectionService = null;
-        LeaderElectionService faultyLeaderElectionService;
 
         Thread thread;
 
@@ -150,13 +153,21 @@ class ZooKeeperLeaderRetrievalTest {
                                 AddressResolution.NO_ADDRESS_RESOLUTION,
                                 config);
 
-                faultyLeaderElectionService =
-                        
highAvailabilityServices.getJobManagerLeaderElectionService(
-                                HighAvailabilityServices.DEFAULT_JOB_ID);
-                TestingContender wrongLeaderAddressContender =
-                        new TestingContender(wrongAddress, 
faultyLeaderElectionService);
-
-                faultyLeaderElectionService.start(wrongLeaderAddressContender);
+                // create driver to simulate a separate Flink process having 
leadership that writes
+                // its leader information to the ZooKeeper backend and gets 
lost afterward
+                final ZooKeeperMultipleComponentLeaderElectionDriver 
externalProcessDriver =
+                        new ZooKeeperMultipleComponentLeaderElectionDriver(
+                                ZooKeeperUtils.useNamespaceAndEnsurePath(
+                                        zooKeeperExtension.getZooKeeperClient(
+                                                
testingFatalErrorHandlerResource
+                                                        
.getTestingFatalErrorHandler()),
+                                        
ZooKeeperUtils.generateLeaderLatchPath("")),
+                                new TestingLeaderElectionListener());
+                externalProcessDriver.isLeader();
+
+                externalProcessDriver.publishLeaderInformation(
+                        HighAvailabilityServices.DEFAULT_JOB_ID.toString(),
+                        LeaderInformation.known(UUID.randomUUID(), 
wrongAddress));
 
                 FindConnectingAddress findConnectingAddress =
                         new FindConnectingAddress(
@@ -177,7 +188,8 @@ class ZooKeeperLeaderRetrievalTest {
 
                 Thread.sleep(sleepingTime);
 
-                faultyLeaderElectionService.stop();
+                externalProcessDriver.notLeader();
+                externalProcessDriver.close();
 
                 leaderElectionService.start(correctLeaderAddressContender);
 

Reply via email to