This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b8bba6117b [FLINK-31785][runtime] Moves LeaderElectionService.stop 
into LeaderElectionService.LeaderElection.close
0b8bba6117b is described below

commit 0b8bba6117bf2e8fe767b03d95274bac0b33faa0
Author: Matthias Pohl <[email protected]>
AuthorDate: Wed Jan 25 15:38:50 2023 +0100

    [FLINK-31785][runtime] Moves LeaderElectionService.stop into 
LeaderElectionService.LeaderElection.close
    
    Signed-off-by: Matthias Pohl <[email protected]>
---
 .../dispatcher/runner/DefaultDispatcherRunner.java |   5 +-
 .../nonha/embedded/EmbeddedLeaderService.java      |   3 +-
 .../JobMasterServiceLeadershipRunner.java          |   2 +-
 .../AbstractLeaderElectionService.java             |   3 +
 .../leaderelection/DefaultLeaderElection.java      |   8 ++
 .../DefaultLeaderElectionService.java              |   3 +-
 .../runtime/leaderelection/LeaderElection.java     |   9 +-
 .../leaderelection/LeaderElectionService.java      |   8 --
 .../StandaloneLeaderElectionService.java           |  14 ++-
 .../ResourceManagerServiceImpl.java                |   4 +-
 .../runtime/webmonitor/WebMonitorEndpoint.java     |   4 +-
 .../runner/DefaultDispatcherRunnerTest.java        |  13 +-
 .../nonha/embedded/EmbeddedLeaderServiceTest.java  |   4 +-
 .../DefaultLeaderElectionServiceTest.java          |  86 ++++++++------
 .../leaderelection/DefaultLeaderElectionTest.java  | 132 ++++++++++++++-------
 .../runtime/leaderelection/LeaderElectionTest.java |   6 +-
 .../StandaloneLeaderElectionTest.java              |   4 +-
 .../leaderelection/TestingLeaderElection.java      |   3 +-
 .../TestingLeaderElectionService.java              |   5 -
 ...KeeperLeaderElectionConnectionHandlingTest.java |  13 +-
 .../ZooKeeperLeaderElectionTest.java               |  29 +++--
 .../ZooKeeperLeaderRetrievalTest.java              |  20 ++--
 .../ResourceManagerServiceImplTest.java            |   5 +-
 .../util/DocumentingDispatcherRestEndpoint.java    |   6 +-
 24 files changed, 239 insertions(+), 150 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunner.java
index d70ea9d9212..1998b7cafdd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunner.java
@@ -203,12 +203,11 @@ public final class DefaultDispatcherRunner implements 
DispatcherRunner, LeaderCo
 
     private CompletableFuture<Void> stopLeaderElectionService() {
         try {
-            leaderElectionService.stop();
+            leaderElection.close();
+            return FutureUtils.completedVoidFuture();
         } catch (Exception e) {
             return FutureUtils.completedExceptionally(e);
         }
-
-        return FutureUtils.completedVoidFuture();
     }
 
     private void runActionIfRunning(Runnable runnable) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java
index fe7ede69e4c..080510917a8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java
@@ -453,7 +453,8 @@ public class EmbeddedLeaderService {
         }
 
         @Override
-        public void stop() throws Exception {
+        public void remove(LeaderContender contender) {
+            Preconditions.checkArgument(contender == this.contender);
             removeContender(this);
         }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java
index 58576c903ef..27f68b9a1c1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java
@@ -153,7 +153,7 @@ public class JobMasterServiceLeadershipRunner implements 
JobManagerRunner, Leade
                         processTerminationFuture,
                         () -> {
                             classLoaderLease.release();
-                            leaderElectionService.stop();
+                            leaderElection.close();
                         });
 
         FutureUtils.forward(serviceTerminationFuture, terminationFuture);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/AbstractLeaderElectionService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/AbstractLeaderElectionService.java
index 02ddfa11f7d..0abad419329 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/AbstractLeaderElectionService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/AbstractLeaderElectionService.java
@@ -38,6 +38,9 @@ public abstract class AbstractLeaderElectionService 
implements LeaderElectionSer
      */
     protected abstract void register(LeaderContender contender) throws 
Exception;
 
+    /** Removes the passed {@code LeaderContender} from the {@code 
LeaderElectionService}. */
+    protected abstract void remove(LeaderContender contender);
+
     /** Confirms the leadership with the given session ID and address. */
     protected abstract void confirmLeadership(UUID leaderSessionID, String 
leaderAddress);
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElection.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElection.java
index a9a64a7e5ca..e5cc3dbe550 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElection.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElection.java
@@ -55,4 +55,12 @@ class DefaultLeaderElection implements LeaderElection {
     public boolean hasLeadership(UUID leaderSessionId) {
         return parentService.hasLeadership(leaderSessionId);
     }
+
+    @Override
+    public void close() throws Exception {
+        if (leaderContender != null) {
+            parentService.remove(leaderContender);
+            leaderContender = null;
+        }
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java
index 085565c9be3..5c92725fc48 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java
@@ -171,7 +171,8 @@ public class DefaultLeaderElectionService extends 
AbstractLeaderElectionService
     }
 
     @Override
-    public final void stop() throws Exception {
+    protected final void remove(LeaderContender contender) {
+        Preconditions.checkArgument(contender == this.leaderContender);
         LOG.info("Stopping DefaultLeaderElectionService.");
 
         synchronized (lock) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElection.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElection.java
index 83ab1404a97..c78621d6865 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElection.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElection.java
@@ -24,7 +24,7 @@ import java.util.UUID;
  * {@code LeaderElection} serves as a proxy between {@code 
LeaderElectionService} and {@link
  * LeaderContender}.
  */
-public interface LeaderElection {
+public interface LeaderElection extends AutoCloseable {
 
     /** Registers the passed {@link LeaderContender} with the leader election 
process. */
     void startLeaderElection(LeaderContender contender) throws Exception;
@@ -50,4 +50,11 @@ public interface LeaderElection {
      * @return true if the associated {@link LeaderContender} is the leader, 
otherwise false
      */
     boolean hasLeadership(UUID leaderSessionId);
+
+    /**
+     * Closes the {@code LeaderElection} by deregistering the {@link 
LeaderContender} from the
+     * underlying leader election. {@link LeaderContender#revokeLeadership()} 
will be called if the
+     * service still holds the leadership.
+     */
+    void close() throws Exception;
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java
index 284049f8439..c83625a7db0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java
@@ -40,12 +40,4 @@ public interface LeaderElectionService {
      * LeaderElectionService} instance.
      */
     LeaderElection createLeaderElection();
-
-    /**
-     * Stops the leader election service. Stopping the {@code 
LeaderElectionService} will trigger
-     * {@link LeaderContender#revokeLeadership()} if the service still holds 
the leadership.
-     *
-     * @throws Exception if an error occurs while stopping the {@code 
LeaderElectionService}.
-     */
-    void stop() throws Exception;
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionService.java
index 82453d9d62f..d1ecfd7f9b2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionService.java
@@ -48,10 +48,12 @@ public class StandaloneLeaderElectionService extends 
AbstractLeaderElectionServi
     }
 
     @Override
-    public void stop() {
-        if (contender != null) {
-            contender.revokeLeadership();
-            contender = null;
+    protected void remove(LeaderContender contender) {
+        Preconditions.checkArgument(contender == this.contender);
+
+        if (this.contender != null) {
+            this.contender.revokeLeadership();
+            this.contender = null;
         }
     }
 
@@ -60,7 +62,7 @@ public class StandaloneLeaderElectionService extends 
AbstractLeaderElectionServi
 
     @Override
     protected boolean hasLeadership(UUID leaderSessionId) {
-        return (contender != null
-                && 
HighAvailabilityServices.DEFAULT_LEADER_ID.equals(leaderSessionId));
+        return contender != null
+                && 
HighAvailabilityServices.DEFAULT_LEADER_ID.equals(leaderSessionId);
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImpl.java
index c64eda4823d..0314cc68302 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImpl.java
@@ -329,7 +329,9 @@ public class ResourceManagerServiceImpl implements 
ResourceManagerService, Leade
 
     private void stopLeaderElectionService() {
         try {
-            leaderElectionService.stop();
+            if (leaderElection != null) {
+                leaderElection.close();
+            }
         } catch (Exception e) {
             serviceTerminationFuture.completeExceptionally(
                     new FlinkException("Cannot stop leader election service.", 
e));
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index 2f8a4c800c6..d866fd9435c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -1093,7 +1093,9 @@ public class WebMonitorEndpoint<T extends RestfulGateway> 
extends RestServerEndp
                     }
 
                     try {
-                        leaderElectionService.stop();
+                        if (leaderElection != null) {
+                            leaderElection.close();
+                        }
                     } catch (Exception e) {
                         exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
                     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerTest.java
index ca61c858683..87f982c52bb 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.dispatcher.runner;
 
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.leaderelection.LeaderElection;
 import org.apache.flink.runtime.leaderelection.LeaderInformation;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
@@ -44,13 +45,15 @@ import static org.junit.Assert.fail;
 /** Tests for the {@link DefaultDispatcherRunner}. */
 public class DefaultDispatcherRunnerTest extends TestLogger {
 
-    private TestingLeaderElectionService testingLeaderElectionService;
+    private final TestingLeaderElectionService testingLeaderElectionService =
+            new TestingLeaderElectionService();
+    private LeaderElection leaderElection;
     private TestingFatalErrorHandler testingFatalErrorHandler;
     private TestingDispatcherLeaderProcessFactory 
testingDispatcherLeaderProcessFactory;
 
     @Before
     public void setup() {
-        testingLeaderElectionService = new TestingLeaderElectionService();
+        leaderElection = testingLeaderElectionService.createLeaderElection();
         testingFatalErrorHandler = new TestingFatalErrorHandler();
         testingDispatcherLeaderProcessFactory =
                 TestingDispatcherLeaderProcessFactory.defaultValue();
@@ -58,9 +61,9 @@ public class DefaultDispatcherRunnerTest extends TestLogger {
 
     @After
     public void teardown() throws Exception {
-        if (testingLeaderElectionService != null) {
-            testingLeaderElectionService.stop();
-            testingLeaderElectionService = null;
+        if (leaderElection != null) {
+            leaderElection.close();
+            leaderElection = null;
         }
 
         if (testingFatalErrorHandler != null) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderServiceTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderServiceTest.java
index 01b54141aa8..5409812f3a7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderServiceTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderServiceTest.java
@@ -58,7 +58,7 @@ public class EmbeddedLeaderServiceTest extends TestLogger {
 
             final LeaderElection leaderElection = 
leaderElectionService.createLeaderElection();
             leaderElection.startLeaderElection(contender);
-            leaderElectionService.stop();
+            leaderElection.close();
 
             try {
                 // check that no exception occurred
@@ -97,7 +97,7 @@ public class EmbeddedLeaderServiceTest extends TestLogger {
 
             final CompletableFuture<Void> revokeLeadershipFuture =
                     embeddedLeaderService.revokeLeadership();
-            leaderElectionService.stop();
+            leaderElection.close();
 
             try {
                 // check that no exception occurred
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java
index 45c19379dad..6fddfb70fb9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java
@@ -105,7 +105,7 @@ class DefaultLeaderElectionServiceTest {
 
             
assertThat(testingContender.getLeaderSessionID()).isEqualTo(expectedLeaderSessionID);
 
-            testInstance.stop();
+            leaderElection.close();
         }
     }
 
@@ -117,24 +117,28 @@ class DefaultLeaderElectionServiceTest {
                         executorService -> {
                             // we need to stop to deregister the contender 
that was already
                             // registered to the service
-                            leaderElectionService.stop();
+                            leaderElection.close();
 
                             final UUID expectedSessionID = UUID.randomUUID();
                             
testingLeaderElectionDriver.isLeader(expectedSessionID);
 
-                            leaderElection = 
testingContender.startLeaderElection();
+                            try (LeaderElection anotherLeaderElection =
+                                    testingContender.startLeaderElection()) {
 
-                            assertThat(testingContender.getLeaderSessionID())
-                                    .as("Leadership grant was not forwarded to 
the contender, yet.")
-                                    .isNull();
+                                
assertThat(testingContender.getLeaderSessionID())
+                                        .as(
+                                                "Leadership grant was not 
forwarded to the contender, yet.")
+                                        .isNull();
 
-                            executorService.trigger();
+                                executorService.trigger();
 
-                            assertThat(testingContender.getLeaderSessionID())
-                                    .as("Leadership grant is actually 
forwarded to the service.")
-                                    .isEqualTo(expectedSessionID);
+                                
assertThat(testingContender.getLeaderSessionID())
+                                        .as(
+                                                "Leadership grant is actually 
forwarded to the service.")
+                                        .isEqualTo(expectedSessionID);
 
-                            testingContender.waitForLeader();
+                                testingContender.waitForLeader();
+                            }
                         });
             }
         };
@@ -146,9 +150,9 @@ class DefaultLeaderElectionServiceTest {
             {
                 runTestWithManuallyTriggeredEvents(
                         executorService -> {
-                            // we need to stop to deregister the contender 
that was already
-                            // registered to the service
-                            leaderElectionService.stop();
+                            // we need to close the LeaderElection to 
deregister the contender that
+                            // was already registered to the service
+                            leaderElection.close();
 
                             final UUID expectedSessionID = UUID.randomUUID();
                             
testingLeaderElectionDriver.isLeader(expectedSessionID);
@@ -156,7 +160,7 @@ class DefaultLeaderElectionServiceTest {
 
                             leaderElection = 
testingContender.startLeaderElection();
 
-                            leaderElectionService.stop();
+                            leaderElection.close();
 
                             executorService.trigger();
                         });
@@ -188,7 +192,7 @@ class DefaultLeaderElectionServiceTest {
 
             contender.waitForLeader();
 
-            testInstance.stop();
+            leaderElection.close();
 
             contender.throwErrorIfPresent();
         }
@@ -211,7 +215,7 @@ class DefaultLeaderElectionServiceTest {
             final LeaderElection leaderElection = 
testInstance.createLeaderElection();
             
leaderElection.startLeaderElection(TestingGenericLeaderContender.newBuilder().build());
 
-            testInstance.stop();
+            leaderElection.close();
         }
     }
 
@@ -245,7 +249,7 @@ class DefaultLeaderElectionServiceTest {
     }
 
     @Test
-    void testProperCleanupOnStopWhenHoldingTheLeadership() throws Exception {
+    void testProperCleanupOnLeaderElectionCloseWhenHoldingTheLeadership() 
throws Exception {
         new Context() {
             {
                 runTestWithSynchronousEventHandling(
@@ -258,7 +262,7 @@ class DefaultLeaderElectionServiceTest {
                             
assertThat(testingLeaderElectionDriver.getLeaderInformation().isEmpty())
                                     .isFalse();
 
-                            leaderElectionService.stop();
+                            leaderElection.close();
 
                             assertThat(testingContender.getLeaderSessionID())
                                     .as(
@@ -394,7 +398,7 @@ class DefaultLeaderElectionServiceTest {
     }
 
     @Test
-    void testHasLeadershipAfterStop() throws Exception {
+    void testHasLeadershipAfterLeaderElectionClose() throws Exception {
         new Context() {
             {
                 runTestWithManuallyTriggeredEvents(
@@ -403,7 +407,7 @@ class DefaultLeaderElectionServiceTest {
                             
testingLeaderElectionDriver.isLeader(expectedSessionID);
                             executorService.trigger();
 
-                            leaderElectionService.stop();
+                            leaderElection.close();
 
                             
assertThat(leaderElectionService.hasLeadership(expectedSessionID))
                                     .isFalse();
@@ -430,12 +434,12 @@ class DefaultLeaderElectionServiceTest {
     }
 
     @Test
-    void testOnGrantLeadershipIsIgnoredAfterBeingStop() throws Exception {
+    void testOnGrantLeadershipIsIgnoredAfterLeaderElectionBeingStop() throws 
Exception {
         new Context() {
             {
                 runTestWithSynchronousEventHandling(
                         () -> {
-                            leaderElectionService.stop();
+                            leaderElection.close();
                             testingLeaderElectionDriver.isLeader();
 
                             
assertThat(leaderElectionService.getLeaderSessionID())
@@ -450,14 +454,14 @@ class DefaultLeaderElectionServiceTest {
     }
 
     @Test
-    void testOnLeaderInformationChangeIsIgnoredAfterBeingStop() throws 
Exception {
+    void testOnLeaderInformationChangeIsIgnoredAfterLeaderElectionBeingStop() 
throws Exception {
         new Context() {
             {
                 runTestWithSynchronousEventHandling(
                         () -> {
                             testingLeaderElectionDriver.isLeader();
 
-                            leaderElectionService.stop();
+                            leaderElection.close();
                             
testingLeaderElectionDriver.leaderInformationChanged(
                                     LeaderInformation.empty());
 
@@ -470,7 +474,7 @@ class DefaultLeaderElectionServiceTest {
     }
 
     @Test
-    void testOnRevokeLeadershipIsTriggeredAfterBeingStop() throws Exception {
+    void testOnRevokeLeadershipIsTriggeredAfterLeaderElectionBeingStop() 
throws Exception {
         new Context() {
             {
                 runTestWithSynchronousEventHandling(
@@ -480,7 +484,7 @@ class DefaultLeaderElectionServiceTest {
                             assertThat(testingContender.getLeaderSessionID())
                                     .isEqualTo(oldSessionId);
 
-                            leaderElectionService.stop();
+                            leaderElection.close();
 
                             assertThat(testingContender.getLeaderSessionID())
                                     .as(
@@ -555,14 +559,15 @@ class DefaultLeaderElectionServiceTest {
     }
 
     @Test
-    void testErrorIsIgnoredAfterBeingStop() throws Exception {
+    void testErrorIsIgnoredAfterLeaderElectionBeingClosed() throws Exception {
         new Context() {
             {
                 runTestWithSynchronousEventHandling(
                         () -> {
                             final Exception testException = new 
Exception("test leader exception");
 
-                            leaderElectionService.stop();
+                            leaderElection.close();
+
                             
testingLeaderElectionDriver.onFatalError(testException);
                             assertThat(testingContender.getError()).isNull();
                         });
@@ -581,12 +586,11 @@ class DefaultLeaderElectionServiceTest {
                         new 
TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory();
         final DefaultLeaderElectionService leaderElectionService =
                 new 
DefaultLeaderElectionService(testingLeaderElectionDriverFactory);
+        leaderElectionService.startLeaderElectionBackend();
 
         final TestingContender testingContender =
                 new TestingContender(TEST_URL, leaderElectionService);
-
-        leaderElectionService.startLeaderElectionBackend();
-        testingContender.startLeaderElection();
+        final LeaderElection leaderElection = 
testingContender.startLeaderElection();
 
         final TestingLeaderElectionDriver currentLeaderDriver =
                 Preconditions.checkNotNull(
@@ -594,7 +598,8 @@ class DefaultLeaderElectionServiceTest {
 
         currentLeaderDriver.isLeader();
 
-        leaderElectionService.stop();
+        leaderElection.close();
+        leaderElectionService.close();
 
         testingContender.throwErrorIfPresent();
     }
@@ -646,7 +651,7 @@ class DefaultLeaderElectionServiceTest {
 
         latch.trigger();
 
-        testInstance.stop();
+        leaderElection.close();
         testInstance.close();
     }
 
@@ -698,7 +703,7 @@ class DefaultLeaderElectionServiceTest {
 
         latch.trigger();
 
-        testInstance.stop();
+        leaderElection.close();
         testInstance.close();
     }
 
@@ -740,10 +745,15 @@ class DefaultLeaderElectionServiceTest {
 
                 assertThat(testingLeaderElectionDriver).isNotNull();
                 testMethod.run();
-
-                leaderElectionService.stop();
-                leaderElectionService.close();
             } finally {
+                if (leaderElection != null) {
+                    leaderElection.close();
+                }
+
+                if (leaderElectionService != null) {
+                    leaderElectionService.close();
+                }
+
                 if (testingContender != null) {
                     testingContender.throwErrorIfPresent();
                 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionTest.java
index f15bdd069af..3e1de05856a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionTest.java
@@ -23,8 +23,10 @@ import org.apache.flink.util.function.ThrowingConsumer;
 import org.junit.jupiter.api.Test;
 
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
+import java.util.function.Consumer;
 import java.util.function.Function;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -39,27 +41,27 @@ class DefaultLeaderElectionTest {
                 TestingAbstractLeaderElectionService.newBuilder()
                         .setRegisterConsumer(contenderRef::set)
                         .build();
-        final DefaultLeaderElection testInstance = new 
DefaultLeaderElection(parentService);
+        try (final DefaultLeaderElection testInstance = new 
DefaultLeaderElection(parentService)) {
 
-        final LeaderContender contender = 
TestingGenericLeaderContender.newBuilder().build();
-        testInstance.startLeaderElection(contender);
+            final LeaderContender contender = 
TestingGenericLeaderContender.newBuilder().build();
+            testInstance.startLeaderElection(contender);
 
-        assertThat(contenderRef.get()).isSameAs(contender);
+            assertThat(contenderRef.get()).isSameAs(contender);
+        }
     }
 
     @Test
-    void testContenderRegistrationNull() {
-        assertThatThrownBy(
-                        () ->
-                                new DefaultLeaderElection(
-                                                
TestingAbstractLeaderElectionService.newBuilder()
-                                                        .build())
-                                        .startLeaderElection(null))
-                .isInstanceOf(NullPointerException.class);
+    void testContenderRegistrationNull() throws Exception {
+        try (final DefaultLeaderElection testInstance =
+                new DefaultLeaderElection(
+                        
TestingAbstractLeaderElectionService.newBuilder().build())) {
+            assertThatThrownBy(() -> testInstance.startLeaderElection(null))
+                    .isInstanceOf(NullPointerException.class);
+        }
     }
 
     @Test
-    void testContenderRegistrationFailure() {
+    void testContenderRegistrationFailure() throws Exception {
         final Exception expectedException =
                 new Exception("Expected exception during contender 
registration.");
         final AbstractLeaderElectionService parentService =
@@ -69,17 +71,17 @@ class DefaultLeaderElectionTest {
                                     throw expectedException;
                                 })
                         .build();
-        final DefaultLeaderElection testInstance = new 
DefaultLeaderElection(parentService);
-
-        assertThatThrownBy(
-                        () ->
-                                testInstance.startLeaderElection(
-                                        
TestingGenericLeaderContender.newBuilder().build()))
-                .isEqualTo(expectedException);
+        try (final DefaultLeaderElection testInstance = new 
DefaultLeaderElection(parentService)) {
+            assertThatThrownBy(
+                            () ->
+                                    testInstance.startLeaderElection(
+                                            
TestingGenericLeaderContender.newBuilder().build()))
+                    .isEqualTo(expectedException);
+        }
     }
 
     @Test
-    void testLeaderConfirmation() {
+    void testLeaderConfirmation() throws Exception {
         final AtomicReference<UUID> leaderSessionIDRef = new 
AtomicReference<>();
         final AtomicReference<String> leaderAddressRef = new 
AtomicReference<>();
         final AbstractLeaderElectionService parentService =
@@ -90,27 +92,63 @@ class DefaultLeaderElectionTest {
                                     leaderAddressRef.set(address);
                                 })
                         .build();
+        try (final DefaultLeaderElection testInstance = new 
DefaultLeaderElection(parentService)) {
+
+            final UUID expectedLeaderSessionID = UUID.randomUUID();
+            final String expectedAddress = "random-address";
+            testInstance.confirmLeadership(expectedLeaderSessionID, 
expectedAddress);
+
+            
assertThat(leaderSessionIDRef.get()).isEqualTo(expectedLeaderSessionID);
+            assertThat(leaderAddressRef.get()).isEqualTo(expectedAddress);
+        }
+    }
+
+    @Test
+    void testClose() throws Exception {
+        final CompletableFuture<LeaderContender> actualContender = new 
CompletableFuture<>();
+        final AbstractLeaderElectionService parentService =
+                TestingAbstractLeaderElectionService.newBuilder()
+                        .setRegisterConsumer(ignoredContender -> {})
+                        .setRemoveConsumer(actualContender::complete)
+                        .build();
+
         final DefaultLeaderElection testInstance = new 
DefaultLeaderElection(parentService);
 
-        final UUID expectedLeaderSessionID = UUID.randomUUID();
-        final String expectedAddress = "random-address";
-        testInstance.confirmLeadership(expectedLeaderSessionID, 
expectedAddress);
+        final LeaderContender contender = 
TestingGenericLeaderContender.newBuilder().build();
+        testInstance.startLeaderElection(contender);
+        testInstance.close();
+
+        assertThat(actualContender).isCompletedWithValue(contender);
+    }
+
+    @Test
+    void testCloseWithoutStart() throws Exception {
+        final CompletableFuture<LeaderContender> actualContender = new 
CompletableFuture<>();
+        final AbstractLeaderElectionService parentService =
+                TestingAbstractLeaderElectionService.newBuilder()
+                        .setRemoveConsumer(actualContender::complete)
+                        .build();
+
+        final DefaultLeaderElection testInstance = new 
DefaultLeaderElection(parentService);
+        testInstance.close();
 
-        
assertThat(leaderSessionIDRef.get()).isEqualTo(expectedLeaderSessionID);
-        assertThat(leaderAddressRef.get()).isEqualTo(expectedAddress);
+        assertThat(actualContender)
+                .as(
+                        "No removal should be triggered if there's no 
contender that need to be deregistered.")
+                .isNotDone();
     }
 
     @Test
-    void testHasLeadershipTrue() {
+    void testHasLeadershipTrue() throws Exception {
         testHasLeadership(true);
     }
 
     @Test
-    void testHasLeadershipFalse() {
+    void testHasLeadershipFalse() throws Exception {
         testHasLeadership(false);
     }
 
-    private void testHasLeadership(boolean expectedReturnValue) {
+    private void testHasLeadership(boolean expectedReturnValue) throws 
Exception {
         final AtomicReference<UUID> leaderSessionIDRef = new 
AtomicReference<>();
         final AbstractLeaderElectionService parentService =
                 TestingAbstractLeaderElectionService.newBuilder()
@@ -120,28 +158,32 @@ class DefaultLeaderElectionTest {
                                     return expectedReturnValue;
                                 })
                         .build();
-        final DefaultLeaderElection testInstance = new 
DefaultLeaderElection(parentService);
+        try (final DefaultLeaderElection testInstance = new 
DefaultLeaderElection(parentService)) {
 
-        final UUID expectedLeaderSessionID = UUID.randomUUID();
-        assertThat(testInstance.hasLeadership(expectedLeaderSessionID))
-                .isEqualTo(expectedReturnValue);
-        
assertThat(leaderSessionIDRef.get()).isEqualTo(expectedLeaderSessionID);
+            final UUID expectedLeaderSessionID = UUID.randomUUID();
+            assertThat(testInstance.hasLeadership(expectedLeaderSessionID))
+                    .isEqualTo(expectedReturnValue);
+            
assertThat(leaderSessionIDRef.get()).isEqualTo(expectedLeaderSessionID);
+        }
     }
 
     private static class TestingAbstractLeaderElectionService
             extends AbstractLeaderElectionService {
 
         private final ThrowingConsumer<LeaderContender, Exception> 
registerConsumer;
+        private final Consumer<LeaderContender> removeConsumer;
         private final BiConsumer<UUID, String> confirmLeadershipConsumer;
         private final Function<UUID, Boolean> hasLeadershipFunction;
 
         private TestingAbstractLeaderElectionService(
                 ThrowingConsumer<LeaderContender, Exception> registerConsumer,
+                Consumer<LeaderContender> removeConsumer,
                 BiConsumer<UUID, String> confirmLeadershipConsumer,
                 Function<UUID, Boolean> hasLeadershipFunction) {
             super();
 
             this.registerConsumer = registerConsumer;
+            this.removeConsumer = removeConsumer;
             this.confirmLeadershipConsumer = confirmLeadershipConsumer;
             this.hasLeadershipFunction = hasLeadershipFunction;
         }
@@ -151,6 +193,11 @@ class DefaultLeaderElectionTest {
             registerConsumer.accept(contender);
         }
 
+        @Override
+        protected void remove(LeaderContender contender) {
+            removeConsumer.accept(contender);
+        }
+
         @Override
         protected void confirmLeadership(UUID leaderSessionID, String 
leaderAddress) {
             confirmLeadershipConsumer.accept(leaderSessionID, leaderAddress);
@@ -161,17 +208,13 @@ class DefaultLeaderElectionTest {
             return hasLeadershipFunction.apply(leaderSessionId);
         }
 
-        @Override
-        public void stop() {
-            throw new UnsupportedOperationException("stop is not supported.");
-        }
-
         public static Builder newBuilder() {
             return new Builder()
                     .setRegisterConsumer(
                             contender -> {
                                 throw new 
UnsupportedOperationException("register not supported");
                             })
+                    .setRemoveConsumer(contender -> {})
                     .setConfirmLeadershipConsumer(
                             (leaderSessionID, address) -> {
                                 throw new UnsupportedOperationException(
@@ -188,6 +231,7 @@ class DefaultLeaderElectionTest {
 
             private ThrowingConsumer<LeaderContender, Exception> 
registerConsumer =
                     ignoredContender -> {};
+            private Consumer<LeaderContender> removeConsumer;
             private BiConsumer<UUID, String> confirmLeadershipConsumer =
                     (ignoredSessionID, ignoredAddress) -> {};
             private Function<UUID, Boolean> hasLeadershipFunction = 
ignoredSessiondID -> false;
@@ -200,6 +244,11 @@ class DefaultLeaderElectionTest {
                 return this;
             }
 
+            public Builder setRemoveConsumer(Consumer<LeaderContender> 
removeConsumer) {
+                this.removeConsumer = removeConsumer;
+                return this;
+            }
+
             public Builder setConfirmLeadershipConsumer(
                     BiConsumer<UUID, String> confirmLeadershipConsumer) {
                 this.confirmLeadershipConsumer = confirmLeadershipConsumer;
@@ -213,7 +262,10 @@ class DefaultLeaderElectionTest {
 
             public TestingAbstractLeaderElectionService build() {
                 return new TestingAbstractLeaderElectionService(
-                        registerConsumer, confirmLeadershipConsumer, 
hasLeadershipFunction);
+                        registerConsumer,
+                        removeConsumer,
+                        confirmLeadershipConsumer,
+                        hasLeadershipFunction);
             }
         }
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java
index f5fe44c7ae5..0c1f7e038fd 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java
@@ -98,9 +98,13 @@ public class LeaderElectionTest {
 
             assertThat(leaderElection.hasLeadership(leaderSessionId)).isTrue();
 
-            leaderElectionService.stop();
+            leaderElection.close();
 
             
assertThat(leaderElection.hasLeadership(leaderSessionId)).isFalse();
+
+            assertThat(manualLeaderContender.waitForLeaderSessionId())
+                    .as("The leadership has been revoked from the contender.")
+                    .isEqualTo(ManualLeaderContender.NULL_LEADER_SESSION_ID);
         } finally {
             manualLeaderContender.rethrowError();
         }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionTest.java
index 1dc9aa80cef..e5b51688686 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionTest.java
@@ -42,8 +42,7 @@ class StandaloneLeaderElectionTest {
         TestingContender contender = new TestingContender(TEST_URL, 
leaderElectionService);
         TestingListener testingListener = new TestingListener();
 
-        try {
-            contender.startLeaderElection();
+        try (LeaderElection leaderElection = contender.startLeaderElection()) {
             leaderRetrievalService.start(testingListener);
 
             contender.waitForLeader();
@@ -58,7 +57,6 @@ class StandaloneLeaderElectionTest {
             assertThat(testingListener.getLeaderSessionID())
                     .isEqualTo(HighAvailabilityServices.DEFAULT_LEADER_ID);
         } finally {
-            leaderElectionService.stop();
             leaderRetrievalService.stop();
         }
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElection.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElection.java
index 5885975ec50..1855c20e1a2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElection.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElection.java
@@ -80,7 +80,8 @@ public class TestingLeaderElection implements LeaderElection {
         return issuedLeaderSessionId != null;
     }
 
-    synchronized void triggerContenderCleanup() {
+    @Override
+    public synchronized void close() {
         if (hasLeadership() && this.contender != null) {
             this.contender.revokeLeadership();
         }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java
index 36f5462970a..d98612c7a9f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java
@@ -36,11 +36,6 @@ public class TestingLeaderElectionService implements 
LeaderElectionService {
         return startedLeaderElection;
     }
 
-    @Override
-    public synchronized void stop() throws Exception {
-        startedLeaderElection.triggerContenderCleanup();
-    }
-
     public synchronized CompletableFuture<LeaderInformation> isLeader(UUID 
leaderSessionID) {
         return startedLeaderElection.isLeader(leaderSessionID);
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionConnectionHandlingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionConnectionHandlingTest.java
index 8344a468d71..24d0532768e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionConnectionHandlingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionConnectionHandlingTest.java
@@ -141,13 +141,13 @@ class ZooKeeperLeaderElectionConnectionHandlingTest {
                 new DefaultLeaderElectionService(leaderElectionDriverFactory);
         leaderElectionService.startLeaderElectionBackend();
 
-        try {
-            final TestingConnectionStateListener connectionStateListener =
-                    new TestingConnectionStateListener();
-            
client.getConnectionStateListenable().addListener(connectionStateListener);
+        final TestingConnectionStateListener connectionStateListener =
+                new TestingConnectionStateListener();
+        
client.getConnectionStateListenable().addListener(connectionStateListener);
 
-            final TestingContender contender = new TestingContender();
-            
leaderElectionService.createLeaderElection().startLeaderElection(contender);
+        final TestingContender contender = new TestingContender();
+        try (LeaderElection leaderElection = 
leaderElectionService.createLeaderElection()) {
+            leaderElection.startLeaderElection(contender);
 
             contender.awaitGrantLeadership();
 
@@ -165,7 +165,6 @@ class ZooKeeperLeaderElectionConnectionHandlingTest {
 
             validationLogic.accept(connectionStateListener, contender);
         } finally {
-            leaderElectionService.stop();
             leaderElectionService.close();
             curatorFrameworkWrapper.close();
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
index 7c72acdd231..e83702a960e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
@@ -172,6 +172,7 @@ class ZooKeeperLeaderElectionTest {
 
         DefaultLeaderElectionService[] leaderElectionService =
                 new DefaultLeaderElectionService[num];
+        LeaderElection[] leaderElections = new LeaderElection[num];
         TestingContender[] contenders = new TestingContender[num];
         DefaultLeaderRetrievalService leaderRetrievalService = null;
 
@@ -192,7 +193,7 @@ class ZooKeeperLeaderElectionTest {
 
                 LOG.debug("Start leader election service for contender #{}.", 
i);
 
-                contenders[i].startLeaderElection();
+                leaderElections[i] = contenders[i].startLeaderElection();
             }
 
             String pattern = LEADER_ADDRESS + "_" + "(\\d+)";
@@ -219,7 +220,8 @@ class ZooKeeperLeaderElectionTest {
                         LOG.debug(
                                 "Stop leader election service of contender 
#{}.",
                                 numberSeenLeaders);
-                        leaderElectionService[index].stop();
+                        leaderElections[index].close();
+                        leaderElections[index] = null;
                         leaderElectionService[index].close();
                         leaderElectionService[index] = null;
 
@@ -239,9 +241,14 @@ class ZooKeeperLeaderElectionTest {
                 leaderRetrievalService.stop();
             }
 
+            for (LeaderElection leaderElection : leaderElections) {
+                if (leaderElection != null) {
+                    leaderElection.close();
+                }
+            }
+
             for (DefaultLeaderElectionService electionService : 
leaderElectionService) {
                 if (electionService != null) {
-                    electionService.stop();
                     electionService.close();
                 }
             }
@@ -264,6 +271,7 @@ class ZooKeeperLeaderElectionTest {
 
         DefaultLeaderElectionService[] leaderElectionService =
                 new DefaultLeaderElectionService[num];
+        LeaderElection[] leaderElections = new LeaderElection[num];
         TestingContender[] contenders = new TestingContender[num];
         DefaultLeaderRetrievalService leaderRetrievalService = null;
 
@@ -282,7 +290,7 @@ class ZooKeeperLeaderElectionTest {
                         new TestingContender(
                                 LEADER_ADDRESS + "_" + i + "_0", 
leaderElectionService[i]);
 
-                contenders[i].startLeaderElection();
+                leaderElections[i] = contenders[i].startLeaderElection();
             }
 
             String pattern = LEADER_ADDRESS + "_" + "(\\d+)" + "_" + "(\\d+)";
@@ -303,8 +311,10 @@ class ZooKeeperLeaderElectionTest {
                             .isEqualTo(contenders[index].getLeaderSessionID());
 
                     // stop leader election service = revoke leadership
-                    leaderElectionService[index].stop();
+                    leaderElections[index].close();
+                    leaderElections[index] = null;
                     leaderElectionService[index].close();
+                    leaderElections[index] = null;
 
                     // create new leader election service which takes part in 
the leader election
                     leaderElectionService[index] =
@@ -314,7 +324,7 @@ class ZooKeeperLeaderElectionTest {
                                     LEADER_ADDRESS + "_" + index + "_" + 
(lastTry + 1),
                                     leaderElectionService[index]);
 
-                    contenders[index].startLeaderElection();
+                    leaderElections[index] = 
contenders[index].startLeaderElection();
                 } else {
                     throw new Exception("Did not find the leader's index.");
                 }
@@ -325,9 +335,14 @@ class ZooKeeperLeaderElectionTest {
                 leaderRetrievalService.stop();
             }
 
+            for (LeaderElection leaderElection : leaderElections) {
+                if (leaderElection != null) {
+                    leaderElection.close();
+                }
+            }
+
             for (DefaultLeaderElectionService electionService : 
leaderElectionService) {
                 if (electionService != null) {
-                    electionService.stop();
                     electionService.close();
                 }
             }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalTest.java
index 697be6a1c41..8964411cb3a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalTest.java
@@ -25,7 +25,7 @@ import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import 
org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperMultipleComponentLeaderElectionHaServices;
 import org.apache.flink.runtime.jobmaster.JobMaster;
-import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderelection.LeaderElection;
 import org.apache.flink.runtime.leaderelection.LeaderInformation;
 import org.apache.flink.runtime.leaderelection.TestingContender;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionListener;
@@ -119,7 +119,7 @@ class ZooKeeperLeaderRetrievalTest {
 
         long sleepingTime = 1000;
 
-        LeaderElectionService leaderElectionService = null;
+        LeaderElection leaderElection = null;
 
         Thread thread;
 
@@ -180,20 +180,18 @@ class ZooKeeperLeaderRetrievalTest {
 
                 thread.start();
 
-                leaderElectionService =
-                        
highAvailabilityServices.getJobManagerLeaderElectionService(
-                                HighAvailabilityServices.DEFAULT_JOB_ID);
                 TestingContender correctLeaderAddressContender =
-                        new TestingContender(correctAddress, 
leaderElectionService);
+                        new TestingContender(
+                                correctAddress,
+                                
highAvailabilityServices.getJobManagerLeaderElectionService(
+                                        
HighAvailabilityServices.DEFAULT_JOB_ID));
 
                 Thread.sleep(sleepingTime);
 
                 externalProcessDriver.notLeader();
                 externalProcessDriver.close();
 
-                // leaderElection is unused right now because it doesn't need 
to be closed, yet.
-                // The close call will be introduced with FLINK-31785
-                correctLeaderAddressContender.startLeaderElection();
+                leaderElection = 
correctLeaderAddressContender.startLeaderElection();
 
                 thread.join();
 
@@ -208,8 +206,8 @@ class ZooKeeperLeaderRetrievalTest {
                     socket.connect(correctInetSocketAddress, 1000);
                 }
             } finally {
-                if (leaderElectionService != null) {
-                    leaderElectionService.stop();
+                if (leaderElection != null) {
+                    leaderElection.close();
                 }
             }
         } catch (IOException e) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImplTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImplTest.java
index 0beb37c2ad8..c69ee382688 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImplTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImplTest.java
@@ -77,6 +77,7 @@ public class ResourceManagerServiceImplTest extends 
TestLogger {
 
     private TestingResourceManagerFactory.Builder rmFactoryBuilder;
     private TestingLeaderElectionService leaderElectionService;
+
     private ResourceManagerServiceImpl resourceManagerService;
 
     @BeforeClass
@@ -103,10 +104,6 @@ public class ResourceManagerServiceImplTest extends 
TestLogger {
             resourceManagerService.close();
         }
 
-        if (leaderElectionService != null) {
-            leaderElectionService.stop();
-        }
-
         if (fatalErrorHandler.hasExceptionOccurred()) {
             fatalErrorHandler.rethrowError();
         }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/DocumentingDispatcherRestEndpoint.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/DocumentingDispatcherRestEndpoint.java
index 81b7976616f..5f3b734ac5e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/DocumentingDispatcherRestEndpoint.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/DocumentingDispatcherRestEndpoint.java
@@ -94,9 +94,6 @@ public class DocumentingDispatcherRestEndpoint extends 
DispatcherRestEndpoint
         public LeaderElection createLeaderElection() {
             return NoOpLeaderElection.INSTANCE;
         }
-
-        @Override
-        public void stop() {}
     }
 
     private enum NoOpLeaderElection implements LeaderElection {
@@ -112,5 +109,8 @@ public class DocumentingDispatcherRestEndpoint extends 
DispatcherRestEndpoint
         public boolean hasLeadership(UUID leaderSessionId) {
             return false;
         }
+
+        @Override
+        public void close() {}
     }
 }

Reply via email to