This is an automated email from the ASF dual-hosted git repository.
mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 0b8bba6117b [FLINK-31785][runtime] Moves LeaderElectionService.stop
into LeaderElectionService.LeaderElection.close
0b8bba6117b is described below
commit 0b8bba6117bf2e8fe767b03d95274bac0b33faa0
Author: Matthias Pohl <[email protected]>
AuthorDate: Wed Jan 25 15:38:50 2023 +0100
[FLINK-31785][runtime] Moves LeaderElectionService.stop into
LeaderElectionService.LeaderElection.close
Signed-off-by: Matthias Pohl <[email protected]>
---
.../dispatcher/runner/DefaultDispatcherRunner.java | 5 +-
.../nonha/embedded/EmbeddedLeaderService.java | 3 +-
.../JobMasterServiceLeadershipRunner.java | 2 +-
.../AbstractLeaderElectionService.java | 3 +
.../leaderelection/DefaultLeaderElection.java | 8 ++
.../DefaultLeaderElectionService.java | 3 +-
.../runtime/leaderelection/LeaderElection.java | 9 +-
.../leaderelection/LeaderElectionService.java | 8 --
.../StandaloneLeaderElectionService.java | 14 ++-
.../ResourceManagerServiceImpl.java | 4 +-
.../runtime/webmonitor/WebMonitorEndpoint.java | 4 +-
.../runner/DefaultDispatcherRunnerTest.java | 13 +-
.../nonha/embedded/EmbeddedLeaderServiceTest.java | 4 +-
.../DefaultLeaderElectionServiceTest.java | 86 ++++++++------
.../leaderelection/DefaultLeaderElectionTest.java | 132 ++++++++++++++-------
.../runtime/leaderelection/LeaderElectionTest.java | 6 +-
.../StandaloneLeaderElectionTest.java | 4 +-
.../leaderelection/TestingLeaderElection.java | 3 +-
.../TestingLeaderElectionService.java | 5 -
...KeeperLeaderElectionConnectionHandlingTest.java | 13 +-
.../ZooKeeperLeaderElectionTest.java | 29 +++--
.../ZooKeeperLeaderRetrievalTest.java | 20 ++--
.../ResourceManagerServiceImplTest.java | 5 +-
.../util/DocumentingDispatcherRestEndpoint.java | 6 +-
24 files changed, 239 insertions(+), 150 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunner.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunner.java
index d70ea9d9212..1998b7cafdd 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunner.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunner.java
@@ -203,12 +203,11 @@ public final class DefaultDispatcherRunner implements
DispatcherRunner, LeaderCo
private CompletableFuture<Void> stopLeaderElectionService() {
try {
- leaderElectionService.stop();
+ leaderElection.close();
+ return FutureUtils.completedVoidFuture();
} catch (Exception e) {
return FutureUtils.completedExceptionally(e);
}
-
- return FutureUtils.completedVoidFuture();
}
private void runActionIfRunning(Runnable runnable) {
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java
index fe7ede69e4c..080510917a8 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java
@@ -453,7 +453,8 @@ public class EmbeddedLeaderService {
}
@Override
- public void stop() throws Exception {
+ public void remove(LeaderContender contender) {
+ Preconditions.checkArgument(contender == this.contender);
removeContender(this);
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java
index 58576c903ef..27f68b9a1c1 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java
@@ -153,7 +153,7 @@ public class JobMasterServiceLeadershipRunner implements
JobManagerRunner, Leade
processTerminationFuture,
() -> {
classLoaderLease.release();
- leaderElectionService.stop();
+ leaderElection.close();
});
FutureUtils.forward(serviceTerminationFuture, terminationFuture);
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/AbstractLeaderElectionService.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/AbstractLeaderElectionService.java
index 02ddfa11f7d..0abad419329 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/AbstractLeaderElectionService.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/AbstractLeaderElectionService.java
@@ -38,6 +38,9 @@ public abstract class AbstractLeaderElectionService
implements LeaderElectionSer
*/
protected abstract void register(LeaderContender contender) throws
Exception;
+ /** Removes the passed {@code LeaderContender} from the {@code
LeaderElectionService}. */
+ protected abstract void remove(LeaderContender contender);
+
/** Confirms the leadership with the given session ID and address. */
protected abstract void confirmLeadership(UUID leaderSessionID, String
leaderAddress);
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElection.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElection.java
index a9a64a7e5ca..e5cc3dbe550 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElection.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElection.java
@@ -55,4 +55,12 @@ class DefaultLeaderElection implements LeaderElection {
public boolean hasLeadership(UUID leaderSessionId) {
return parentService.hasLeadership(leaderSessionId);
}
+
+ @Override
+ public void close() throws Exception {
+ if (leaderContender != null) {
+ parentService.remove(leaderContender);
+ leaderContender = null;
+ }
+ }
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java
index 085565c9be3..5c92725fc48 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java
@@ -171,7 +171,8 @@ public class DefaultLeaderElectionService extends
AbstractLeaderElectionService
}
@Override
- public final void stop() throws Exception {
+ protected final void remove(LeaderContender contender) {
+ Preconditions.checkArgument(contender == this.leaderContender);
LOG.info("Stopping DefaultLeaderElectionService.");
synchronized (lock) {
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElection.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElection.java
index 83ab1404a97..c78621d6865 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElection.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElection.java
@@ -24,7 +24,7 @@ import java.util.UUID;
* {@code LeaderElection} serves as a proxy between {@code
LeaderElectionService} and {@link
* LeaderContender}.
*/
-public interface LeaderElection {
+public interface LeaderElection extends AutoCloseable {
/** Registers the passed {@link LeaderContender} with the leader election
process. */
void startLeaderElection(LeaderContender contender) throws Exception;
@@ -50,4 +50,11 @@ public interface LeaderElection {
* @return true if the associated {@link LeaderContender} is the leader,
otherwise false
*/
boolean hasLeadership(UUID leaderSessionId);
+
+ /**
+ * Closes the {@code LeaderElection} by deregistering the {@link
LeaderContender} from the
+ * underlying leader election. {@link LeaderContender#revokeLeadership()}
will be called if the
+ * service still holds the leadership.
+ */
+ void close() throws Exception;
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java
index 284049f8439..c83625a7db0 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java
@@ -40,12 +40,4 @@ public interface LeaderElectionService {
* LeaderElectionService} instance.
*/
LeaderElection createLeaderElection();
-
- /**
- * Stops the leader election service. Stopping the {@code
LeaderElectionService} will trigger
- * {@link LeaderContender#revokeLeadership()} if the service still holds
the leadership.
- *
- * @throws Exception if an error occurs while stopping the {@code
LeaderElectionService}.
- */
- void stop() throws Exception;
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionService.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionService.java
index 82453d9d62f..d1ecfd7f9b2 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionService.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionService.java
@@ -48,10 +48,12 @@ public class StandaloneLeaderElectionService extends
AbstractLeaderElectionServi
}
@Override
- public void stop() {
- if (contender != null) {
- contender.revokeLeadership();
- contender = null;
+ protected void remove(LeaderContender contender) {
+ Preconditions.checkArgument(contender == this.contender);
+
+ if (this.contender != null) {
+ this.contender.revokeLeadership();
+ this.contender = null;
}
}
@@ -60,7 +62,7 @@ public class StandaloneLeaderElectionService extends
AbstractLeaderElectionServi
@Override
protected boolean hasLeadership(UUID leaderSessionId) {
- return (contender != null
- &&
HighAvailabilityServices.DEFAULT_LEADER_ID.equals(leaderSessionId));
+ return contender != null
+ &&
HighAvailabilityServices.DEFAULT_LEADER_ID.equals(leaderSessionId);
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImpl.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImpl.java
index c64eda4823d..0314cc68302 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImpl.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImpl.java
@@ -329,7 +329,9 @@ public class ResourceManagerServiceImpl implements
ResourceManagerService, Leade
private void stopLeaderElectionService() {
try {
- leaderElectionService.stop();
+ if (leaderElection != null) {
+ leaderElection.close();
+ }
} catch (Exception e) {
serviceTerminationFuture.completeExceptionally(
new FlinkException("Cannot stop leader election service.",
e));
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index 2f8a4c800c6..d866fd9435c 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -1093,7 +1093,9 @@ public class WebMonitorEndpoint<T extends RestfulGateway>
extends RestServerEndp
}
try {
- leaderElectionService.stop();
+ if (leaderElection != null) {
+ leaderElection.close();
+ }
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(e,
exception);
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerTest.java
index ca61c858683..87f982c52bb 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.dispatcher.runner;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.leaderelection.LeaderElection;
import org.apache.flink.runtime.leaderelection.LeaderInformation;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
@@ -44,13 +45,15 @@ import static org.junit.Assert.fail;
/** Tests for the {@link DefaultDispatcherRunner}. */
public class DefaultDispatcherRunnerTest extends TestLogger {
- private TestingLeaderElectionService testingLeaderElectionService;
+ private final TestingLeaderElectionService testingLeaderElectionService =
+ new TestingLeaderElectionService();
+ private LeaderElection leaderElection;
private TestingFatalErrorHandler testingFatalErrorHandler;
private TestingDispatcherLeaderProcessFactory
testingDispatcherLeaderProcessFactory;
@Before
public void setup() {
- testingLeaderElectionService = new TestingLeaderElectionService();
+ leaderElection = testingLeaderElectionService.createLeaderElection();
testingFatalErrorHandler = new TestingFatalErrorHandler();
testingDispatcherLeaderProcessFactory =
TestingDispatcherLeaderProcessFactory.defaultValue();
@@ -58,9 +61,9 @@ public class DefaultDispatcherRunnerTest extends TestLogger {
@After
public void teardown() throws Exception {
- if (testingLeaderElectionService != null) {
- testingLeaderElectionService.stop();
- testingLeaderElectionService = null;
+ if (leaderElection != null) {
+ leaderElection.close();
+ leaderElection = null;
}
if (testingFatalErrorHandler != null) {
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderServiceTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderServiceTest.java
index 01b54141aa8..5409812f3a7 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderServiceTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderServiceTest.java
@@ -58,7 +58,7 @@ public class EmbeddedLeaderServiceTest extends TestLogger {
final LeaderElection leaderElection =
leaderElectionService.createLeaderElection();
leaderElection.startLeaderElection(contender);
- leaderElectionService.stop();
+ leaderElection.close();
try {
// check that no exception occurred
@@ -97,7 +97,7 @@ public class EmbeddedLeaderServiceTest extends TestLogger {
final CompletableFuture<Void> revokeLeadershipFuture =
embeddedLeaderService.revokeLeadership();
- leaderElectionService.stop();
+ leaderElection.close();
try {
// check that no exception occurred
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java
index 45c19379dad..6fddfb70fb9 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java
@@ -105,7 +105,7 @@ class DefaultLeaderElectionServiceTest {
assertThat(testingContender.getLeaderSessionID()).isEqualTo(expectedLeaderSessionID);
- testInstance.stop();
+ leaderElection.close();
}
}
@@ -117,24 +117,28 @@ class DefaultLeaderElectionServiceTest {
executorService -> {
// we need to stop to deregister the contender
that was already
// registered to the service
- leaderElectionService.stop();
+ leaderElection.close();
final UUID expectedSessionID = UUID.randomUUID();
testingLeaderElectionDriver.isLeader(expectedSessionID);
- leaderElection =
testingContender.startLeaderElection();
+ try (LeaderElection anotherLeaderElection =
+ testingContender.startLeaderElection()) {
- assertThat(testingContender.getLeaderSessionID())
- .as("Leadership grant was not forwarded to
the contender, yet.")
- .isNull();
+
assertThat(testingContender.getLeaderSessionID())
+ .as(
+ "Leadership grant was not
forwarded to the contender, yet.")
+ .isNull();
- executorService.trigger();
+ executorService.trigger();
- assertThat(testingContender.getLeaderSessionID())
- .as("Leadership grant is actually
forwarded to the service.")
- .isEqualTo(expectedSessionID);
+
assertThat(testingContender.getLeaderSessionID())
+ .as(
+ "Leadership grant is actually
forwarded to the service.")
+ .isEqualTo(expectedSessionID);
- testingContender.waitForLeader();
+ testingContender.waitForLeader();
+ }
});
}
};
@@ -146,9 +150,9 @@ class DefaultLeaderElectionServiceTest {
{
runTestWithManuallyTriggeredEvents(
executorService -> {
- // we need to stop to deregister the contender
that was already
- // registered to the service
- leaderElectionService.stop();
+ // we need to close the LeaderElection to
deregister the contender that
+ // was already registered to the service
+ leaderElection.close();
final UUID expectedSessionID = UUID.randomUUID();
testingLeaderElectionDriver.isLeader(expectedSessionID);
@@ -156,7 +160,7 @@ class DefaultLeaderElectionServiceTest {
leaderElection =
testingContender.startLeaderElection();
- leaderElectionService.stop();
+ leaderElection.close();
executorService.trigger();
});
@@ -188,7 +192,7 @@ class DefaultLeaderElectionServiceTest {
contender.waitForLeader();
- testInstance.stop();
+ leaderElection.close();
contender.throwErrorIfPresent();
}
@@ -211,7 +215,7 @@ class DefaultLeaderElectionServiceTest {
final LeaderElection leaderElection =
testInstance.createLeaderElection();
leaderElection.startLeaderElection(TestingGenericLeaderContender.newBuilder().build());
- testInstance.stop();
+ leaderElection.close();
}
}
@@ -245,7 +249,7 @@ class DefaultLeaderElectionServiceTest {
}
@Test
- void testProperCleanupOnStopWhenHoldingTheLeadership() throws Exception {
+ void testProperCleanupOnLeaderElectionCloseWhenHoldingTheLeadership()
throws Exception {
new Context() {
{
runTestWithSynchronousEventHandling(
@@ -258,7 +262,7 @@ class DefaultLeaderElectionServiceTest {
assertThat(testingLeaderElectionDriver.getLeaderInformation().isEmpty())
.isFalse();
- leaderElectionService.stop();
+ leaderElection.close();
assertThat(testingContender.getLeaderSessionID())
.as(
@@ -394,7 +398,7 @@ class DefaultLeaderElectionServiceTest {
}
@Test
- void testHasLeadershipAfterStop() throws Exception {
+ void testHasLeadershipAfterLeaderElectionClose() throws Exception {
new Context() {
{
runTestWithManuallyTriggeredEvents(
@@ -403,7 +407,7 @@ class DefaultLeaderElectionServiceTest {
testingLeaderElectionDriver.isLeader(expectedSessionID);
executorService.trigger();
- leaderElectionService.stop();
+ leaderElection.close();
assertThat(leaderElectionService.hasLeadership(expectedSessionID))
.isFalse();
@@ -430,12 +434,12 @@ class DefaultLeaderElectionServiceTest {
}
@Test
- void testOnGrantLeadershipIsIgnoredAfterBeingStop() throws Exception {
+ void testOnGrantLeadershipIsIgnoredAfterLeaderElectionBeingStop() throws
Exception {
new Context() {
{
runTestWithSynchronousEventHandling(
() -> {
- leaderElectionService.stop();
+ leaderElection.close();
testingLeaderElectionDriver.isLeader();
assertThat(leaderElectionService.getLeaderSessionID())
@@ -450,14 +454,14 @@ class DefaultLeaderElectionServiceTest {
}
@Test
- void testOnLeaderInformationChangeIsIgnoredAfterBeingStop() throws
Exception {
+ void testOnLeaderInformationChangeIsIgnoredAfterLeaderElectionBeingStop()
throws Exception {
new Context() {
{
runTestWithSynchronousEventHandling(
() -> {
testingLeaderElectionDriver.isLeader();
- leaderElectionService.stop();
+ leaderElection.close();
testingLeaderElectionDriver.leaderInformationChanged(
LeaderInformation.empty());
@@ -470,7 +474,7 @@ class DefaultLeaderElectionServiceTest {
}
@Test
- void testOnRevokeLeadershipIsTriggeredAfterBeingStop() throws Exception {
+ void testOnRevokeLeadershipIsTriggeredAfterLeaderElectionBeingStop()
throws Exception {
new Context() {
{
runTestWithSynchronousEventHandling(
@@ -480,7 +484,7 @@ class DefaultLeaderElectionServiceTest {
assertThat(testingContender.getLeaderSessionID())
.isEqualTo(oldSessionId);
- leaderElectionService.stop();
+ leaderElection.close();
assertThat(testingContender.getLeaderSessionID())
.as(
@@ -555,14 +559,15 @@ class DefaultLeaderElectionServiceTest {
}
@Test
- void testErrorIsIgnoredAfterBeingStop() throws Exception {
+ void testErrorIsIgnoredAfterLeaderElectionBeingClosed() throws Exception {
new Context() {
{
runTestWithSynchronousEventHandling(
() -> {
final Exception testException = new
Exception("test leader exception");
- leaderElectionService.stop();
+ leaderElection.close();
+
testingLeaderElectionDriver.onFatalError(testException);
assertThat(testingContender.getError()).isNull();
});
@@ -581,12 +586,11 @@ class DefaultLeaderElectionServiceTest {
new
TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory();
final DefaultLeaderElectionService leaderElectionService =
new
DefaultLeaderElectionService(testingLeaderElectionDriverFactory);
+ leaderElectionService.startLeaderElectionBackend();
final TestingContender testingContender =
new TestingContender(TEST_URL, leaderElectionService);
-
- leaderElectionService.startLeaderElectionBackend();
- testingContender.startLeaderElection();
+ final LeaderElection leaderElection =
testingContender.startLeaderElection();
final TestingLeaderElectionDriver currentLeaderDriver =
Preconditions.checkNotNull(
@@ -594,7 +598,8 @@ class DefaultLeaderElectionServiceTest {
currentLeaderDriver.isLeader();
- leaderElectionService.stop();
+ leaderElection.close();
+ leaderElectionService.close();
testingContender.throwErrorIfPresent();
}
@@ -646,7 +651,7 @@ class DefaultLeaderElectionServiceTest {
latch.trigger();
- testInstance.stop();
+ leaderElection.close();
testInstance.close();
}
@@ -698,7 +703,7 @@ class DefaultLeaderElectionServiceTest {
latch.trigger();
- testInstance.stop();
+ leaderElection.close();
testInstance.close();
}
@@ -740,10 +745,15 @@ class DefaultLeaderElectionServiceTest {
assertThat(testingLeaderElectionDriver).isNotNull();
testMethod.run();
-
- leaderElectionService.stop();
- leaderElectionService.close();
} finally {
+ if (leaderElection != null) {
+ leaderElection.close();
+ }
+
+ if (leaderElectionService != null) {
+ leaderElectionService.close();
+ }
+
if (testingContender != null) {
testingContender.throwErrorIfPresent();
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionTest.java
index f15bdd069af..3e1de05856a 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionTest.java
@@ -23,8 +23,10 @@ import org.apache.flink.util.function.ThrowingConsumer;
import org.junit.jupiter.api.Test;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
+import java.util.function.Consumer;
import java.util.function.Function;
import static org.assertj.core.api.Assertions.assertThat;
@@ -39,27 +41,27 @@ class DefaultLeaderElectionTest {
TestingAbstractLeaderElectionService.newBuilder()
.setRegisterConsumer(contenderRef::set)
.build();
- final DefaultLeaderElection testInstance = new
DefaultLeaderElection(parentService);
+ try (final DefaultLeaderElection testInstance = new
DefaultLeaderElection(parentService)) {
- final LeaderContender contender =
TestingGenericLeaderContender.newBuilder().build();
- testInstance.startLeaderElection(contender);
+ final LeaderContender contender =
TestingGenericLeaderContender.newBuilder().build();
+ testInstance.startLeaderElection(contender);
- assertThat(contenderRef.get()).isSameAs(contender);
+ assertThat(contenderRef.get()).isSameAs(contender);
+ }
}
@Test
- void testContenderRegistrationNull() {
- assertThatThrownBy(
- () ->
- new DefaultLeaderElection(
-
TestingAbstractLeaderElectionService.newBuilder()
- .build())
- .startLeaderElection(null))
- .isInstanceOf(NullPointerException.class);
+ void testContenderRegistrationNull() throws Exception {
+ try (final DefaultLeaderElection testInstance =
+ new DefaultLeaderElection(
+
TestingAbstractLeaderElectionService.newBuilder().build())) {
+ assertThatThrownBy(() -> testInstance.startLeaderElection(null))
+ .isInstanceOf(NullPointerException.class);
+ }
}
@Test
- void testContenderRegistrationFailure() {
+ void testContenderRegistrationFailure() throws Exception {
final Exception expectedException =
new Exception("Expected exception during contender
registration.");
final AbstractLeaderElectionService parentService =
@@ -69,17 +71,17 @@ class DefaultLeaderElectionTest {
throw expectedException;
})
.build();
- final DefaultLeaderElection testInstance = new
DefaultLeaderElection(parentService);
-
- assertThatThrownBy(
- () ->
- testInstance.startLeaderElection(
-
TestingGenericLeaderContender.newBuilder().build()))
- .isEqualTo(expectedException);
+ try (final DefaultLeaderElection testInstance = new
DefaultLeaderElection(parentService)) {
+ assertThatThrownBy(
+ () ->
+ testInstance.startLeaderElection(
+
TestingGenericLeaderContender.newBuilder().build()))
+ .isEqualTo(expectedException);
+ }
}
@Test
- void testLeaderConfirmation() {
+ void testLeaderConfirmation() throws Exception {
final AtomicReference<UUID> leaderSessionIDRef = new
AtomicReference<>();
final AtomicReference<String> leaderAddressRef = new
AtomicReference<>();
final AbstractLeaderElectionService parentService =
@@ -90,27 +92,63 @@ class DefaultLeaderElectionTest {
leaderAddressRef.set(address);
})
.build();
+ try (final DefaultLeaderElection testInstance = new
DefaultLeaderElection(parentService)) {
+
+ final UUID expectedLeaderSessionID = UUID.randomUUID();
+ final String expectedAddress = "random-address";
+ testInstance.confirmLeadership(expectedLeaderSessionID,
expectedAddress);
+
+
assertThat(leaderSessionIDRef.get()).isEqualTo(expectedLeaderSessionID);
+ assertThat(leaderAddressRef.get()).isEqualTo(expectedAddress);
+ }
+ }
+
+ @Test
+ void testClose() throws Exception {
+ final CompletableFuture<LeaderContender> actualContender = new
CompletableFuture<>();
+ final AbstractLeaderElectionService parentService =
+ TestingAbstractLeaderElectionService.newBuilder()
+ .setRegisterConsumer(ignoredContender -> {})
+ .setRemoveConsumer(actualContender::complete)
+ .build();
+
final DefaultLeaderElection testInstance = new
DefaultLeaderElection(parentService);
- final UUID expectedLeaderSessionID = UUID.randomUUID();
- final String expectedAddress = "random-address";
- testInstance.confirmLeadership(expectedLeaderSessionID,
expectedAddress);
+ final LeaderContender contender =
TestingGenericLeaderContender.newBuilder().build();
+ testInstance.startLeaderElection(contender);
+ testInstance.close();
+
+ assertThat(actualContender).isCompletedWithValue(contender);
+ }
+
+ @Test
+ void testCloseWithoutStart() throws Exception {
+ final CompletableFuture<LeaderContender> actualContender = new
CompletableFuture<>();
+ final AbstractLeaderElectionService parentService =
+ TestingAbstractLeaderElectionService.newBuilder()
+ .setRemoveConsumer(actualContender::complete)
+ .build();
+
+ final DefaultLeaderElection testInstance = new
DefaultLeaderElection(parentService);
+ testInstance.close();
-
assertThat(leaderSessionIDRef.get()).isEqualTo(expectedLeaderSessionID);
- assertThat(leaderAddressRef.get()).isEqualTo(expectedAddress);
+ assertThat(actualContender)
+ .as(
+ "No removal should be triggered if there's no
contender that need to be deregistered.")
+ .isNotDone();
}
@Test
- void testHasLeadershipTrue() {
+ void testHasLeadershipTrue() throws Exception {
testHasLeadership(true);
}
@Test
- void testHasLeadershipFalse() {
+ void testHasLeadershipFalse() throws Exception {
testHasLeadership(false);
}
- private void testHasLeadership(boolean expectedReturnValue) {
+ private void testHasLeadership(boolean expectedReturnValue) throws
Exception {
final AtomicReference<UUID> leaderSessionIDRef = new
AtomicReference<>();
final AbstractLeaderElectionService parentService =
TestingAbstractLeaderElectionService.newBuilder()
@@ -120,28 +158,32 @@ class DefaultLeaderElectionTest {
return expectedReturnValue;
})
.build();
- final DefaultLeaderElection testInstance = new
DefaultLeaderElection(parentService);
+ try (final DefaultLeaderElection testInstance = new
DefaultLeaderElection(parentService)) {
- final UUID expectedLeaderSessionID = UUID.randomUUID();
- assertThat(testInstance.hasLeadership(expectedLeaderSessionID))
- .isEqualTo(expectedReturnValue);
-
assertThat(leaderSessionIDRef.get()).isEqualTo(expectedLeaderSessionID);
+ final UUID expectedLeaderSessionID = UUID.randomUUID();
+ assertThat(testInstance.hasLeadership(expectedLeaderSessionID))
+ .isEqualTo(expectedReturnValue);
+
assertThat(leaderSessionIDRef.get()).isEqualTo(expectedLeaderSessionID);
+ }
}
private static class TestingAbstractLeaderElectionService
extends AbstractLeaderElectionService {
private final ThrowingConsumer<LeaderContender, Exception>
registerConsumer;
+ private final Consumer<LeaderContender> removeConsumer;
private final BiConsumer<UUID, String> confirmLeadershipConsumer;
private final Function<UUID, Boolean> hasLeadershipFunction;
private TestingAbstractLeaderElectionService(
ThrowingConsumer<LeaderContender, Exception> registerConsumer,
+ Consumer<LeaderContender> removeConsumer,
BiConsumer<UUID, String> confirmLeadershipConsumer,
Function<UUID, Boolean> hasLeadershipFunction) {
super();
this.registerConsumer = registerConsumer;
+ this.removeConsumer = removeConsumer;
this.confirmLeadershipConsumer = confirmLeadershipConsumer;
this.hasLeadershipFunction = hasLeadershipFunction;
}
@@ -151,6 +193,11 @@ class DefaultLeaderElectionTest {
registerConsumer.accept(contender);
}
+ @Override
+ protected void remove(LeaderContender contender) {
+ removeConsumer.accept(contender);
+ }
+
@Override
protected void confirmLeadership(UUID leaderSessionID, String
leaderAddress) {
confirmLeadershipConsumer.accept(leaderSessionID, leaderAddress);
@@ -161,17 +208,13 @@ class DefaultLeaderElectionTest {
return hasLeadershipFunction.apply(leaderSessionId);
}
- @Override
- public void stop() {
- throw new UnsupportedOperationException("stop is not supported.");
- }
-
public static Builder newBuilder() {
return new Builder()
.setRegisterConsumer(
contender -> {
throw new
UnsupportedOperationException("register not supported");
})
+ .setRemoveConsumer(contender -> {})
.setConfirmLeadershipConsumer(
(leaderSessionID, address) -> {
throw new UnsupportedOperationException(
@@ -188,6 +231,7 @@ class DefaultLeaderElectionTest {
private ThrowingConsumer<LeaderContender, Exception>
registerConsumer =
ignoredContender -> {};
+ private Consumer<LeaderContender> removeConsumer;
private BiConsumer<UUID, String> confirmLeadershipConsumer =
(ignoredSessionID, ignoredAddress) -> {};
private Function<UUID, Boolean> hasLeadershipFunction =
ignoredSessiondID -> false;
@@ -200,6 +244,11 @@ class DefaultLeaderElectionTest {
return this;
}
+ public Builder setRemoveConsumer(Consumer<LeaderContender>
removeConsumer) {
+ this.removeConsumer = removeConsumer;
+ return this;
+ }
+
public Builder setConfirmLeadershipConsumer(
BiConsumer<UUID, String> confirmLeadershipConsumer) {
this.confirmLeadershipConsumer = confirmLeadershipConsumer;
@@ -213,7 +262,10 @@ class DefaultLeaderElectionTest {
public TestingAbstractLeaderElectionService build() {
return new TestingAbstractLeaderElectionService(
- registerConsumer, confirmLeadershipConsumer,
hasLeadershipFunction);
+ registerConsumer,
+ removeConsumer,
+ confirmLeadershipConsumer,
+ hasLeadershipFunction);
}
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java
index f5fe44c7ae5..0c1f7e038fd 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java
@@ -98,9 +98,13 @@ public class LeaderElectionTest {
assertThat(leaderElection.hasLeadership(leaderSessionId)).isTrue();
- leaderElectionService.stop();
+ leaderElection.close();
assertThat(leaderElection.hasLeadership(leaderSessionId)).isFalse();
+
+ assertThat(manualLeaderContender.waitForLeaderSessionId())
+ .as("The leadership has been revoked from the contender.")
+ .isEqualTo(ManualLeaderContender.NULL_LEADER_SESSION_ID);
} finally {
manualLeaderContender.rethrowError();
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionTest.java
index 1dc9aa80cef..e5b51688686 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionTest.java
@@ -42,8 +42,7 @@ class StandaloneLeaderElectionTest {
TestingContender contender = new TestingContender(TEST_URL,
leaderElectionService);
TestingListener testingListener = new TestingListener();
- try {
- contender.startLeaderElection();
+ try (LeaderElection leaderElection = contender.startLeaderElection()) {
leaderRetrievalService.start(testingListener);
contender.waitForLeader();
@@ -58,7 +57,6 @@ class StandaloneLeaderElectionTest {
assertThat(testingListener.getLeaderSessionID())
.isEqualTo(HighAvailabilityServices.DEFAULT_LEADER_ID);
} finally {
- leaderElectionService.stop();
leaderRetrievalService.stop();
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElection.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElection.java
index 5885975ec50..1855c20e1a2 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElection.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElection.java
@@ -80,7 +80,8 @@ public class TestingLeaderElection implements LeaderElection {
return issuedLeaderSessionId != null;
}
- synchronized void triggerContenderCleanup() {
+ @Override
+ public synchronized void close() {
if (hasLeadership() && this.contender != null) {
this.contender.revokeLeadership();
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java
index 36f5462970a..d98612c7a9f 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionService.java
@@ -36,11 +36,6 @@ public class TestingLeaderElectionService implements
LeaderElectionService {
return startedLeaderElection;
}
- @Override
- public synchronized void stop() throws Exception {
- startedLeaderElection.triggerContenderCleanup();
- }
-
public synchronized CompletableFuture<LeaderInformation> isLeader(UUID
leaderSessionID) {
return startedLeaderElection.isLeader(leaderSessionID);
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionConnectionHandlingTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionConnectionHandlingTest.java
index 8344a468d71..24d0532768e 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionConnectionHandlingTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionConnectionHandlingTest.java
@@ -141,13 +141,13 @@ class ZooKeeperLeaderElectionConnectionHandlingTest {
new DefaultLeaderElectionService(leaderElectionDriverFactory);
leaderElectionService.startLeaderElectionBackend();
- try {
- final TestingConnectionStateListener connectionStateListener =
- new TestingConnectionStateListener();
-
client.getConnectionStateListenable().addListener(connectionStateListener);
+ final TestingConnectionStateListener connectionStateListener =
+ new TestingConnectionStateListener();
+
client.getConnectionStateListenable().addListener(connectionStateListener);
- final TestingContender contender = new TestingContender();
-
leaderElectionService.createLeaderElection().startLeaderElection(contender);
+ final TestingContender contender = new TestingContender();
+ try (LeaderElection leaderElection =
leaderElectionService.createLeaderElection()) {
+ leaderElection.startLeaderElection(contender);
contender.awaitGrantLeadership();
@@ -165,7 +165,6 @@ class ZooKeeperLeaderElectionConnectionHandlingTest {
validationLogic.accept(connectionStateListener, contender);
} finally {
- leaderElectionService.stop();
leaderElectionService.close();
curatorFrameworkWrapper.close();
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
index 7c72acdd231..e83702a960e 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
@@ -172,6 +172,7 @@ class ZooKeeperLeaderElectionTest {
DefaultLeaderElectionService[] leaderElectionService =
new DefaultLeaderElectionService[num];
+ LeaderElection[] leaderElections = new LeaderElection[num];
TestingContender[] contenders = new TestingContender[num];
DefaultLeaderRetrievalService leaderRetrievalService = null;
@@ -192,7 +193,7 @@ class ZooKeeperLeaderElectionTest {
LOG.debug("Start leader election service for contender #{}.",
i);
- contenders[i].startLeaderElection();
+ leaderElections[i] = contenders[i].startLeaderElection();
}
String pattern = LEADER_ADDRESS + "_" + "(\\d+)";
@@ -219,7 +220,8 @@ class ZooKeeperLeaderElectionTest {
LOG.debug(
"Stop leader election service of contender
#{}.",
numberSeenLeaders);
- leaderElectionService[index].stop();
+ leaderElections[index].close();
+ leaderElections[index] = null;
leaderElectionService[index].close();
leaderElectionService[index] = null;
@@ -239,9 +241,14 @@ class ZooKeeperLeaderElectionTest {
leaderRetrievalService.stop();
}
+ for (LeaderElection leaderElection : leaderElections) {
+ if (leaderElection != null) {
+ leaderElection.close();
+ }
+ }
+
for (DefaultLeaderElectionService electionService :
leaderElectionService) {
if (electionService != null) {
- electionService.stop();
electionService.close();
}
}
@@ -264,6 +271,7 @@ class ZooKeeperLeaderElectionTest {
DefaultLeaderElectionService[] leaderElectionService =
new DefaultLeaderElectionService[num];
+ LeaderElection[] leaderElections = new LeaderElection[num];
TestingContender[] contenders = new TestingContender[num];
DefaultLeaderRetrievalService leaderRetrievalService = null;
@@ -282,7 +290,7 @@ class ZooKeeperLeaderElectionTest {
new TestingContender(
LEADER_ADDRESS + "_" + i + "_0",
leaderElectionService[i]);
- contenders[i].startLeaderElection();
+ leaderElections[i] = contenders[i].startLeaderElection();
}
String pattern = LEADER_ADDRESS + "_" + "(\\d+)" + "_" + "(\\d+)";
@@ -303,8 +311,10 @@ class ZooKeeperLeaderElectionTest {
.isEqualTo(contenders[index].getLeaderSessionID());
// stop leader election service = revoke leadership
- leaderElectionService[index].stop();
+ leaderElections[index].close();
+ leaderElections[index] = null;
leaderElectionService[index].close();
+ leaderElections[index] = null;
// create new leader election service which takes part in
the leader election
leaderElectionService[index] =
@@ -314,7 +324,7 @@ class ZooKeeperLeaderElectionTest {
LEADER_ADDRESS + "_" + index + "_" +
(lastTry + 1),
leaderElectionService[index]);
- contenders[index].startLeaderElection();
+ leaderElections[index] =
contenders[index].startLeaderElection();
} else {
throw new Exception("Did not find the leader's index.");
}
@@ -325,9 +335,14 @@ class ZooKeeperLeaderElectionTest {
leaderRetrievalService.stop();
}
+ for (LeaderElection leaderElection : leaderElections) {
+ if (leaderElection != null) {
+ leaderElection.close();
+ }
+ }
+
for (DefaultLeaderElectionService electionService :
leaderElectionService) {
if (electionService != null) {
- electionService.stop();
electionService.close();
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalTest.java
index 697be6a1c41..8964411cb3a 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalTest.java
@@ -25,7 +25,7 @@ import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import
org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperMultipleComponentLeaderElectionHaServices;
import org.apache.flink.runtime.jobmaster.JobMaster;
-import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderelection.LeaderElection;
import org.apache.flink.runtime.leaderelection.LeaderInformation;
import org.apache.flink.runtime.leaderelection.TestingContender;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionListener;
@@ -119,7 +119,7 @@ class ZooKeeperLeaderRetrievalTest {
long sleepingTime = 1000;
- LeaderElectionService leaderElectionService = null;
+ LeaderElection leaderElection = null;
Thread thread;
@@ -180,20 +180,18 @@ class ZooKeeperLeaderRetrievalTest {
thread.start();
- leaderElectionService =
-
highAvailabilityServices.getJobManagerLeaderElectionService(
- HighAvailabilityServices.DEFAULT_JOB_ID);
TestingContender correctLeaderAddressContender =
- new TestingContender(correctAddress,
leaderElectionService);
+ new TestingContender(
+ correctAddress,
+
highAvailabilityServices.getJobManagerLeaderElectionService(
+
HighAvailabilityServices.DEFAULT_JOB_ID));
Thread.sleep(sleepingTime);
externalProcessDriver.notLeader();
externalProcessDriver.close();
- // leaderElection is unused right now because it doesn't need
to be closed, yet.
- // The close call will be introduced with FLINK-31785
- correctLeaderAddressContender.startLeaderElection();
+ leaderElection =
correctLeaderAddressContender.startLeaderElection();
thread.join();
@@ -208,8 +206,8 @@ class ZooKeeperLeaderRetrievalTest {
socket.connect(correctInetSocketAddress, 1000);
}
} finally {
- if (leaderElectionService != null) {
- leaderElectionService.stop();
+ if (leaderElection != null) {
+ leaderElection.close();
}
}
} catch (IOException e) {
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImplTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImplTest.java
index 0beb37c2ad8..c69ee382688 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImplTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImplTest.java
@@ -77,6 +77,7 @@ public class ResourceManagerServiceImplTest extends
TestLogger {
private TestingResourceManagerFactory.Builder rmFactoryBuilder;
private TestingLeaderElectionService leaderElectionService;
+
private ResourceManagerServiceImpl resourceManagerService;
@BeforeClass
@@ -103,10 +104,6 @@ public class ResourceManagerServiceImplTest extends
TestLogger {
resourceManagerService.close();
}
- if (leaderElectionService != null) {
- leaderElectionService.stop();
- }
-
if (fatalErrorHandler.hasExceptionOccurred()) {
fatalErrorHandler.rethrowError();
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/DocumentingDispatcherRestEndpoint.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/DocumentingDispatcherRestEndpoint.java
index 81b7976616f..5f3b734ac5e 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/DocumentingDispatcherRestEndpoint.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/DocumentingDispatcherRestEndpoint.java
@@ -94,9 +94,6 @@ public class DocumentingDispatcherRestEndpoint extends
DispatcherRestEndpoint
public LeaderElection createLeaderElection() {
return NoOpLeaderElection.INSTANCE;
}
-
- @Override
- public void stop() {}
}
private enum NoOpLeaderElection implements LeaderElection {
@@ -112,5 +109,8 @@ public class DocumentingDispatcherRestEndpoint extends
DispatcherRestEndpoint
public boolean hasLeadership(UUID leaderSessionId) {
return false;
}
+
+ @Override
+ public void close() {}
}
}