zentol commented on code in PR #22380:
URL: https://github.com/apache/flink/pull/22380#discussion_r1168724716
##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElectionDriver.java:
##########
@@ -70,22 +77,37 @@ public LeaderInformation getLeaderInformation() {
}
public void isLeader() {
+ isLeader(FutureUtils.completedVoidFuture());
+ }
+
+ public void isLeader(CompletableFuture<Void> grantLeadershipFuture) {
synchronized (lock) {
isLeader.set(true);
- leaderElectionEventHandler.onGrantLeadership(UUID.randomUUID());
+ grantLeadershipFuture.thenRun(
+ () ->
leaderElectionEventHandler.onGrantLeadership(UUID.randomUUID()));
Review Comment:
Why is it not a problem that onGrantLeadership is no longer called under the
lock? Could this now result in concurrent onGrant/onRevoke calls being made?
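To illustrate the concern, here is a minimal, self-contained sketch. It is not code from this PR: `ThenRunLockDemo` and `callbackHoldsLock` are hypothetical names. It registers a `thenRun` callback inside a `synchronized` block, the way the new `isLeader(CompletableFuture)` does, and records whether the thread running the callback holds the monitor at that moment.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical demo class; not part of Flink. */
public class ThenRunLockDemo {

    private static final Object lock = new Object();

    /** Returns whether the thenRun callback ran on a thread that held {@code lock}. */
    public static boolean callbackHoldsLock(CompletableFuture<Void> future) throws Exception {
        final AtomicBoolean heldLock = new AtomicBoolean(false);
        final CompletableFuture<Void> callbackDone;

        synchronized (lock) {
            // The callback is only registered under the lock. If the future is
            // not completed yet, it runs later on whichever thread completes it.
            callbackDone = future.thenRun(() -> heldLock.set(Thread.holdsLock(lock)));
        }

        if (!future.isDone()) {
            // Assumption for this sketch: another thread completes the future.
            final Thread completer = new Thread(() -> future.complete(null));
            completer.start();
            completer.join();
        }

        callbackDone.get();
        return heldLock.get();
    }

    public static void main(String[] args) throws Exception {
        // Already completed future: the callback runs inline, under the lock.
        System.out.println(callbackHoldsLock(CompletableFuture.completedFuture(null)));
        // Future completed later by another thread: the callback runs without the lock.
        System.out.println(callbackHoldsLock(new CompletableFuture<>()));
    }
}
```

In this sketch the lock only protects the callback when the future is already complete at registration time. That is the case for the no-arg `isLeader()`, but not for a future that a test completes later.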
##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java:
##########
@@ -69,6 +71,118 @@ void testOnGrantAndRevokeLeadership() throws Exception {
};
}
+ @Test
+ void testDelayedGrantCallAfterContenderRegistration() throws Exception {
+ final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory
driverFactory =
+ new
TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory();
+ try (final DefaultLeaderElectionService testInstance =
+ new DefaultLeaderElectionService(driverFactory)) {
+ testInstance.startLeaderElectionBackend();
+
+ final TestingLeaderElectionDriver driver =
driverFactory.getCurrentLeaderDriver();
+ assertThat(driver).isNotNull();
+
+ final CompletableFuture<Void> grantLeadershipFuture = new
CompletableFuture<>();
+ driver.isLeader(grantLeadershipFuture);
+
+ final TestingContender contender = new
TestingContender("unused-address", testInstance);
+ testInstance.start(contender);
+
+ assertThat(testInstance.getLeaderSessionID())
+ .as("Leadership grant was not forwarded to the contender,
yet.")
+ .isNull();
+
+ grantLeadershipFuture.complete(null);
+
+ contender.waitForLeader();
+
+ testInstance.stop();
+ }
+ }
+
+ /**
+ * Test to cover the issue described in FLINK-31814. This test could be
removed after
+ * FLINK-31814 is resolved.
+ */
+ @Test
+ void testOnRevokeCallWhileClosingService() throws Exception {
+ final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory
driverFactory =
+ new
TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory(
+ LeaderElectionEventHandler::onRevokeLeadership);
+
+ try (final DefaultLeaderElectionService testInstance =
+ new DefaultLeaderElectionService(driverFactory)) {
+ testInstance.startLeaderElectionBackend();
+
+ final TestingLeaderElectionDriver driver =
driverFactory.getCurrentLeaderDriver();
+ assertThat(driver).isNotNull();
+
+ driver.isLeader();
+
+ final TestingContender contender = new
TestingContender("unused-address", testInstance);
+ testInstance.start(contender);
+
+ contender.waitForLeader();
+
+ testInstance.stop();
+ }
+ }
+
+ /**
+ * This issue can happen when the shutdown of the contender takes too long
and the leadership is
+ * re-acquired in the meantime (see FLINK-29234).
Review Comment:
I don't understand how this comment relates to the test.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##########
@@ -37,35 +37,58 @@
* Default implementation for leader election service. Composed with different
{@link
* LeaderElectionDriver}, we could perform a leader election for the
contender, and then persist the
* leader information to various storage.
+ *
+ * <p>{@code DefaultLeaderElectionService} handles a single {@link
LeaderContender}.
*/
public class DefaultLeaderElectionService
- implements LeaderElectionService, LeaderElectionEventHandler {
+ implements LeaderElectionService, LeaderElectionEventHandler,
AutoCloseable {
private static final Logger LOG =
LoggerFactory.getLogger(DefaultLeaderElectionService.class);
private final Object lock = new Object();
private final LeaderElectionDriverFactory leaderElectionDriverFactory;
- /** The leader contender which applies for leadership. */
+ /**
+ * {@code leaderContender} being {@code null} indicates that no {@link
LeaderContender} is
+ * registered that participates in the leader election, yet. See {@link
#start(LeaderContender)}
+ * and {@link #stop()} for lifecycle management.
+ *
+ * <p>{@code @Nullable} isn't used here to avoid having multiple warnings
spread over this class
+ * in a supporting IDE.
+ */
@GuardedBy("lock")
- // @Nullable is commented-out to avoid having multiple warnings spread
over this class
- // this.running=true ensures that leaderContender != null
- private volatile LeaderContender leaderContender;
+ private LeaderContender leaderContender;
+ /**
+ * Saves the session ID which was issued by the {@link
LeaderElectionDriver} iff the leadership
Review Comment:
```suggestion
* Saves the session ID which was issued by the {@link
LeaderElectionDriver} if the leadership
```
Not looking forward to more PRs trying to "fix a typo".
##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java:
##########
@@ -69,6 +71,118 @@ void testOnGrantAndRevokeLeadership() throws Exception {
};
}
+ @Test
+ void testDelayedGrantCallAfterContenderRegistration() throws Exception {
+ final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory
driverFactory =
+ new
TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory();
+ try (final DefaultLeaderElectionService testInstance =
+ new DefaultLeaderElectionService(driverFactory)) {
+ testInstance.startLeaderElectionBackend();
+
+ final TestingLeaderElectionDriver driver =
driverFactory.getCurrentLeaderDriver();
+ assertThat(driver).isNotNull();
+
+ final CompletableFuture<Void> grantLeadershipFuture = new
CompletableFuture<>();
+ driver.isLeader(grantLeadershipFuture);
+
+ final TestingContender contender = new
TestingContender("unused-address", testInstance);
+ testInstance.start(contender);
+
+ assertThat(testInstance.getLeaderSessionID())
+ .as("Leadership grant was not forwarded to the contender,
yet.")
+ .isNull();
+
+ grantLeadershipFuture.complete(null);
+
+ contender.waitForLeader();
+
+ testInstance.stop();
+ }
+ }
+
+ /**
+ * Test to cover the issue described in FLINK-31814. This test could be
removed after
+ * FLINK-31814 is resolved.
+ */
+ @Test
+ void testOnRevokeCallWhileClosingService() throws Exception {
+ final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory
driverFactory =
+ new
TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory(
+ LeaderElectionEventHandler::onRevokeLeadership);
+
+ try (final DefaultLeaderElectionService testInstance =
+ new DefaultLeaderElectionService(driverFactory)) {
+ testInstance.startLeaderElectionBackend();
+
+ final TestingLeaderElectionDriver driver =
driverFactory.getCurrentLeaderDriver();
+ assertThat(driver).isNotNull();
+
+ driver.isLeader();
+
+ final TestingContender contender = new
TestingContender("unused-address", testInstance);
+ testInstance.start(contender);
+
+ contender.waitForLeader();
+
+ testInstance.stop();
+ }
+ }
+
+ /**
+ * This issue can happen when the shutdown of the contender takes too long
and the leadership is
+ * re-acquired in the meantime (see FLINK-29234).
+ */
+ @Test
+ void testStopWhileHavingLeadership() throws Exception {
+ final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory
driverFactory =
+ new
TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory();
+
+ try (final DefaultLeaderElectionService testInstance =
+ new DefaultLeaderElectionService(driverFactory)) {
+ testInstance.startLeaderElectionBackend();
Review Comment:
Why isn't this just part of the constructor?
##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderContender.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import java.util.UUID;
+import java.util.function.Consumer;
+
+/**
+ * {@code TestingLeaderContender} is a more generic implementation in
comparison to {@link
+ * TestingContender}.
+ */
+public class TestingLeaderContender implements LeaderContender {
+
+ private final Consumer<UUID> grantLeadershipConsumer;
+ private final Runnable revokeLeadershipRunnable;
+ private final Consumer<Exception> handleErrorConsumer;
+
+ public TestingLeaderContender(
+ Consumer<UUID> grantLeadershipConsumer,
+ Runnable revokeLeadershipRunnable,
+ Consumer<Exception> handleErrorConsumer) {
+ this.grantLeadershipConsumer = grantLeadershipConsumer;
+ this.revokeLeadershipRunnable = revokeLeadershipRunnable;
+ this.handleErrorConsumer = handleErrorConsumer;
+ }
+
+ @Override
+ public void grantLeadership(UUID leaderSessionID) {
+ grantLeadershipConsumer.accept(leaderSessionID);
+ }
+
+ @Override
+ public void revokeLeadership() {
+ revokeLeadershipRunnable.run();
+ }
+
+ @Override
+ public void handleError(Exception exception) {
+ handleErrorConsumer.accept(exception);
+ }
+
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ /** {@code Builder} for {@code TestingLeaderContender}. */
+ public static class Builder {
Review Comment:
Add a private constructor to enforce a single instantiation path.
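A minimal sketch of what this could look like, using a reduced, hypothetical `ContenderSketch` class rather than the actual `TestingLeaderContender` (the setter name and the no-op default are assumptions made for illustration):

```java
import java.util.UUID;
import java.util.function.Consumer;

/** Hypothetical, reduced stand-in for a builder-based testing contender. */
public class ContenderSketch {

    private final Consumer<UUID> grantLeadershipConsumer;

    // Private: instances can only be obtained through the Builder.
    private ContenderSketch(Consumer<UUID> grantLeadershipConsumer) {
        this.grantLeadershipConsumer = grantLeadershipConsumer;
    }

    public void grantLeadership(UUID leaderSessionID) {
        grantLeadershipConsumer.accept(leaderSessionID);
    }

    public static Builder newBuilder() {
        return new Builder();
    }

    /** Builder for {@code ContenderSketch}. */
    public static class Builder {

        // No-op default so that tests only need to set what they care about.
        private Consumer<UUID> grantLeadershipConsumer = ignoredSessionID -> {};

        // Private: newBuilder() is the only way to create a Builder.
        private Builder() {}

        public Builder setGrantLeadershipConsumer(Consumer<UUID> grantLeadershipConsumer) {
            this.grantLeadershipConsumer = grantLeadershipConsumer;
            return this;
        }

        public ContenderSketch build() {
            return new ContenderSketch(grantLeadershipConsumer);
        }
    }
}
```

With both constructors private, `ContenderSketch.newBuilder()...build()` is the only way for code outside the class to create an instance.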
##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java:
##########
@@ -69,6 +71,118 @@ void testOnGrantAndRevokeLeadership() throws Exception {
};
}
+ @Test
+ void testDelayedGrantCallAfterContenderRegistration() throws Exception {
+ final TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory
driverFactory =
+ new
TestingLeaderElectionDriver.TestingLeaderElectionDriverFactory();
+ try (final DefaultLeaderElectionService testInstance =
+ new DefaultLeaderElectionService(driverFactory)) {
+ testInstance.startLeaderElectionBackend();
+
+ final TestingLeaderElectionDriver driver =
driverFactory.getCurrentLeaderDriver();
+ assertThat(driver).isNotNull();
+
+ final CompletableFuture<Void> grantLeadershipFuture = new
CompletableFuture<>();
+ driver.isLeader(grantLeadershipFuture);
+
+ final TestingContender contender = new
TestingContender("unused-address", testInstance);
+ testInstance.start(contender);
+
+ assertThat(testInstance.getLeaderSessionID())
+ .as("Leadership grant was not forwarded to the contender,
yet.")
+ .isNull();
+
+ grantLeadershipFuture.complete(null);
+
+ contender.waitForLeader();
+
+ testInstance.stop();
+ }
+ }
+
+ /**
+ * Test to cover the issue described in FLINK-31814. This test could be
removed after
+ * FLINK-31814 is resolved.
+ */
+ @Test
+ void testOnRevokeCallWhileClosingService() throws Exception {
Review Comment:
Is the description in FLINK-31814 based on your WIP branch? Asking since
there is neither a `DefaultLeaderElection.close()` nor a
`DefaultLeaderElectionService.close()` in the current master...
##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##########
@@ -77,22 +100,48 @@ public
DefaultLeaderElectionService(LeaderElectionDriverFactory leaderElectionDr
this.leaderElectionDriver = null;
this.confirmedLeaderInformation = LeaderInformation.empty();
+ }
- this.running = false;
+ /**
+ * Starts the leader election process. This method has to be called before
registering a {@link
+ * LeaderContender}.
+ */
+ public void startLeaderElectionBackend() throws Exception {
+ synchronized (lock) {
+ Preconditions.checkState(
+ leaderContender == null,
+ "No LeaderContender should have been registered, yet.");
+
+ leaderElectionDriver =
+ leaderElectionDriverFactory.createLeaderElectionDriver(
+ this, new LeaderElectionFatalErrorHandler());
+
+ LOG.info("Instantiating DefaultLeaderElectionService with {}.",
leaderElectionDriver);
+ }
}
@Override
public final void start(LeaderContender contender) throws Exception {
checkNotNull(contender, "Contender must not be null.");
- Preconditions.checkState(leaderContender == null, "Contender was
already set.");
synchronized (lock) {
- running = true;
+ Preconditions.checkState(
+ leaderContender == null,
+ "Only one LeaderContender is allowed to be registered to
this service.");
+ Preconditions.checkState(
+ leaderElectionDriver != null,
+ "The DefaultLeaderElectionService should have established
a connection to the backend before it's started.");
+
leaderContender = contender;
- leaderElectionDriver =
- leaderElectionDriverFactory.createLeaderElectionDriver(
- this, new LeaderElectionFatalErrorHandler());
- LOG.info("Starting DefaultLeaderElectionService with {}.",
leaderElectionDriver);
+
+ LOG.info(
+ "LeaderContender {} has been registered for {}.",
+ contender.getDescription(),
+ leaderElectionDriver);
+
+ if (hasLeadership()) {
+ notifyLeaderContenderOfLeadership();
Review Comment:
Wondering if this should really happen in `start()` :thinking: That means
it's a different thread than usual, right? Usually it'd be some HA backend
thread, but now it's the main thread?
Why isn't this calling `onGrantLeadership`?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##########
@@ -37,35 +37,58 @@
* Default implementation for leader election service. Composed with different
{@link
* LeaderElectionDriver}, we could perform a leader election for the
contender, and then persist the
* leader information to various storage.
+ *
+ * <p>{@code DefaultLeaderElectionService} handles a single {@link
LeaderContender}.
*/
public class DefaultLeaderElectionService
- implements LeaderElectionService, LeaderElectionEventHandler {
+ implements LeaderElectionService, LeaderElectionEventHandler,
AutoCloseable {
private static final Logger LOG =
LoggerFactory.getLogger(DefaultLeaderElectionService.class);
private final Object lock = new Object();
private final LeaderElectionDriverFactory leaderElectionDriverFactory;
- /** The leader contender which applies for leadership. */
+ /**
+ * {@code leaderContender} being {@code null} indicates that no {@link
LeaderContender} is
+ * registered that participates in the leader election, yet. See {@link
#start(LeaderContender)}
+ * and {@link #stop()} for lifecycle management.
+ *
+ * <p>{@code @Nullable} isn't used here to avoid having multiple warnings
spread over this class
+ * in a supporting IDE.
+ */
@GuardedBy("lock")
- // @Nullable is commented-out to avoid having multiple warnings spread
over this class
- // this.running=true ensures that leaderContender != null
- private volatile LeaderContender leaderContender;
+ private LeaderContender leaderContender;
+ /**
+ * Saves the session ID which was issued by the {@link
LeaderElectionDriver} iff the leadership
+ * is acquired by this service. {@code issuedLeaderSessionID} being {@code
null} indicates that
+ * this service isn't the leader right now (i.e. {@code
+ * leaderElectionDriver.hasLeadership(UUID)} would return {@code false}
for any session ID.
+ */
@GuardedBy("lock")
@Nullable
- private volatile UUID issuedLeaderSessionID;
+ private UUID issuedLeaderSessionID;
+ /**
+ * Saves the leader information for a registered {@link LeaderContender}
after this contender
+ * confirmed the leadership.
+ */
@GuardedBy("lock")
- private volatile LeaderInformation confirmedLeaderInformation;
+ private LeaderInformation confirmedLeaderInformation;
+ /**
+ * {@code leaderElectionDriver} being {@code null} indicates that the
connection to the
+ * LeaderElection backend isn't established, yet. See {@link
#startLeaderElectionBackend()} and
+ * {@link #close()} for lifecycle management. The lifecycle of the driver
should have been
+ * established before registering a {@link LeaderContender} and stopped
after the contender has
+ * been removed.
+ *
+ * <p>{@code @Nullable} isn't used here to avoid having multiple warnings
spread over this class
+ * in a supporting IDE.
+ */
@GuardedBy("lock")
- private volatile boolean running;
-
- // @Nullable is commented-out to avoid having multiple warnings spread
over this class
- // this.running=true ensures that leaderContender != null
- private LeaderElectionDriver leaderElectionDriver;
+ private volatile LeaderElectionDriver leaderElectionDriver;
Review Comment:
Why is this one now volatile when everything else isn't?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##########
@@ -145,96 +222,122 @@ public void confirmLeadership(UUID leaderSessionID,
String leaderAddress) {
} else {
LOG.warn(
"The leader session ID {} was confirmed even
though the "
- + "corresponding JobManager was not
elected as the leader.",
+ + "corresponding service was not elected
as the leader or has been stopped already.",
leaderSessionID);
}
}
}
}
+ @GuardedBy("lock")
+ private boolean hasLeadership() {
+ return leaderElectionDriver.hasLeadership() && issuedLeaderSessionID
!= null;
+ }
+
@Override
public boolean hasLeadership(@Nonnull UUID leaderSessionId) {
synchronized (lock) {
- if (running) {
- return leaderElectionDriver.hasLeadership()
- && leaderSessionId.equals(issuedLeaderSessionID);
+ if (leaderElectionDriver != null) {
+ if (leaderContender != null) {
+ return hasLeadership() &&
leaderSessionId.equals(issuedLeaderSessionID);
+ } else {
+ LOG.debug(
+ "hasLeadership is called after the LeaderContender
was removed, returning false.");
+ return false;
+ }
} else {
- LOG.debug("hasLeadership is called after the service is
stopped, returning false.");
+ LOG.debug("hasLeadership is called after the service is
closed, returning false.");
return false;
}
}
}
- /**
- * Returns the current leader session ID or null, if the contender is not
the leader.
- *
- * @return The last leader session ID or null, if the contender is not the
leader
- */
+ /** Returns the current leader session ID or {@code null}, if the session
wasn't confirmed. */
@VisibleForTesting
@Nullable
public UUID getLeaderSessionID() {
- return confirmedLeaderInformation.getLeaderSessionID();
- }
-
- @GuardedBy("lock")
- private void confirmLeaderInformation(UUID leaderSessionID, String
leaderAddress) {
- confirmedLeaderInformation = LeaderInformation.known(leaderSessionID,
leaderAddress);
-
leaderElectionDriver.writeLeaderInformation(confirmedLeaderInformation);
+ synchronized (lock) {
+ return confirmedLeaderInformation.getLeaderSessionID();
+ }
}
@Override
public void onGrantLeadership(UUID newLeaderSessionId) {
+ Preconditions.checkNotNull(newLeaderSessionId);
+
synchronized (lock) {
- if (running) {
- issuedLeaderSessionID = newLeaderSessionId;
- confirmedLeaderInformation = LeaderInformation.empty();
+ Preconditions.checkState(
+ issuedLeaderSessionID == null,
+ "The leadership should have been granted while not having
the leadership acquired.");
- LOG.debug(
- "Grant leadership to contender {} with session ID {}.",
- leaderContender.getDescription(),
- issuedLeaderSessionID);
+ issuedLeaderSessionID = newLeaderSessionId;
- leaderContender.grantLeadership(issuedLeaderSessionID);
+ if (leaderContender != null) {
+ notifyLeaderContenderOfLeadership();
} else {
LOG.debug(
- "Ignoring the grant leadership notification since the
{} has already been closed.",
+ "The grant leadership notification is not forwarded
because the DefaultLeaderElectionService ({}) has no contender registered.",
leaderElectionDriver);
}
}
}
+ @GuardedBy("lock")
+ private void notifyLeaderContenderOfLeadership() {
+ Preconditions.checkState(
+ confirmedLeaderInformation.isEmpty(),
+ "The leadership should have been granted while not having the
leadership acquired.");
+
+ LOG.debug(
+ "Granting leadership to contender {} with session ID {}.",
+ leaderContender.getDescription(),
+ issuedLeaderSessionID);
+
+ leaderContender.grantLeadership(issuedLeaderSessionID);
+ }
+
@Override
public void onRevokeLeadership() {
synchronized (lock) {
- if (running) {
- handleLeadershipLoss();
+ // TODO: FLINK-31814 covers adding this Precondition
+ // Preconditions.checkState(issuedLeaderSessionID != null,"The
leadership should have
+ // been revoked while having the leadership acquired.");
+
+ final UUID previousSessionID = issuedLeaderSessionID;
+ issuedLeaderSessionID = null;
+
+ if (leaderContender != null) {
Review Comment:
Is this branching also related to FLINK-31814?
##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java:
##########
@@ -772,7 +773,11 @@ void
testJobMasterServiceLeadershipRunnerCloseWhenElectionServiceGrantLeaderShip
closeAsyncCalledTrigger.await();
final CheckedThread grantLeadershipThread =
- createCheckedThread(currentLeaderDriver::isLeader);
+ createCheckedThread(
+ () -> {
+ currentLeaderDriver.notLeader();
+ currentLeaderDriver.isLeader();
Review Comment:
?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]