reswqa commented on code in PR #22380:
URL: https://github.com/apache/flink/pull/22380#discussion_r1168903359
##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##########
@@ -37,35 +37,58 @@
* Default implementation for leader election service. Composed with different
{@link
* LeaderElectionDriver}, we could perform a leader election for the
contender, and then persist the
* leader information to various storage.
+ *
+ * <p>{@code DefaultLeaderElectionService} handles a single {@link
LeaderContender}.
*/
public class DefaultLeaderElectionService
- implements LeaderElectionService, LeaderElectionEventHandler {
+ implements LeaderElectionService, LeaderElectionEventHandler,
AutoCloseable {
private static final Logger LOG =
LoggerFactory.getLogger(DefaultLeaderElectionService.class);
private final Object lock = new Object();
private final LeaderElectionDriverFactory leaderElectionDriverFactory;
- /** The leader contender which applies for leadership. */
+ /**
+ * {@code leaderContender} being {@code null} indicates that no {@link
LeaderContender} is
+ * registered that participates in the leader election, yet. See {@link
#start(LeaderContender)}
+ * and {@link #stop()} for lifecycle management.
+ *
+ * <p>{@code @Nullable} isn't used here to avoid having multiple warnings
spread over this class
+ * in a supporting IDE.
+ */
@GuardedBy("lock")
- // @Nullable is commented-out to avoid having multiple warnings spread
over this class
- // this.running=true ensures that leaderContender != null
- private volatile LeaderContender leaderContender;
+ private LeaderContender leaderContender;
+ /**
+ * Saves the session ID which was issued by the {@link
LeaderElectionDriver} iff the leadership
+ * is acquired by this service. {@code issuedLeaderSessionID} being {@code
null} indicates that
+ * this service isn't the leader right now (i.e. {@code
+ * leaderElectionDriver.hasLeadership(UUID)} would return {@code false}
for any session ID.
+ */
@GuardedBy("lock")
@Nullable
- private volatile UUID issuedLeaderSessionID;
+ private UUID issuedLeaderSessionID;
+ /**
+ * Saves the leader information for a registered {@link LeaderContender}
after this contender
+ * confirmed the leadership.
+ */
@GuardedBy("lock")
- private volatile LeaderInformation confirmedLeaderInformation;
+ private LeaderInformation confirmedLeaderInformation;
+ /**
+ * {@code leaderElectionDriver} being {@code null} indicates that the
connection to the
+ * LeaderElection backend isn't established, yet. See {@link
#startLeaderElectionBackend()} and
+ * {@link #close()} for lifecycle management. The lifecycle of the driver
should have been
+ * established before registering a {@link LeaderContender} and stopped
after the contender has
+ * been removed.
+ *
+ * <p>{@code @Nullable} isn't used here to avoid having multiple warnings
spread over this class
+ * in a supporting IDE.
+ */
@GuardedBy("lock")
- private volatile boolean running;
-
- // @Nullable is commented-out to avoid having multiple warnings spread
over this class
- // this.running=true ensures that leaderContender != null
- private LeaderElectionDriver leaderElectionDriver;
+ private volatile LeaderElectionDriver leaderElectionDriver;
Review Comment:
It seems that this field is already guarded by `lock`. Why does it still need
to be `volatile`?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##########
@@ -37,35 +37,58 @@
* Default implementation for leader election service. Composed with different
{@link
* LeaderElectionDriver}, we could perform a leader election for the
contender, and then persist the
* leader information to various storage.
+ *
+ * <p>{@code DefaultLeaderElectionService} handles a single {@link
LeaderContender}.
*/
public class DefaultLeaderElectionService
- implements LeaderElectionService, LeaderElectionEventHandler {
+ implements LeaderElectionService, LeaderElectionEventHandler,
AutoCloseable {
private static final Logger LOG =
LoggerFactory.getLogger(DefaultLeaderElectionService.class);
private final Object lock = new Object();
private final LeaderElectionDriverFactory leaderElectionDriverFactory;
- /** The leader contender which applies for leadership. */
+ /**
+ * {@code leaderContender} being {@code null} indicates that no {@link
LeaderContender} is
+ * registered that participates in the leader election, yet. See {@link
#start(LeaderContender)}
+ * and {@link #stop()} for lifecycle management.
+ *
+ * <p>{@code @Nullable} isn't used here to avoid having multiple warnings
spread over this class
+ * in a supporting IDE.
+ */
@GuardedBy("lock")
- // @Nullable is commented-out to avoid having multiple warnings spread
over this class
- // this.running=true ensures that leaderContender != null
- private volatile LeaderContender leaderContender;
+ private LeaderContender leaderContender;
+ /**
+ * Saves the session ID which was issued by the {@link
LeaderElectionDriver} iff the leadership
+ * is acquired by this service. {@code issuedLeaderSessionID} being {@code
null} indicates that
+ * this service isn't the leader right now (i.e. {@code
Review Comment:
Unpaired symbol: the `(` opened before `i.e.` is never closed; the matching `)` seems to be missing.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##########
@@ -257,7 +360,7 @@ public void onLeaderInformationChange(LeaderInformation
leaderInformation) {
}
} else {
LOG.debug(
- "Ignoring change notification since the {} has " +
"already been closed.",
+ "Ignoring change notification since the {} has already
been closed.",
Review Comment:
```suggestion
"Ignoring change notification since the {} has
already been stopped.",
```
##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##########
@@ -145,96 +222,122 @@ public void confirmLeadership(UUID leaderSessionID,
String leaderAddress) {
} else {
LOG.warn(
"The leader session ID {} was confirmed even
though the "
- + "corresponding JobManager was not
elected as the leader.",
+ + "corresponding service was not elected
as the leader or has been stopped already.",
leaderSessionID);
}
}
}
}
+ @GuardedBy("lock")
+ private boolean hasLeadership() {
+ return leaderElectionDriver.hasLeadership() && issuedLeaderSessionID
!= null;
+ }
+
@Override
public boolean hasLeadership(@Nonnull UUID leaderSessionId) {
synchronized (lock) {
- if (running) {
- return leaderElectionDriver.hasLeadership()
- && leaderSessionId.equals(issuedLeaderSessionID);
+ if (leaderElectionDriver != null) {
+ if (leaderContender != null) {
+ return hasLeadership() &&
leaderSessionId.equals(issuedLeaderSessionID);
+ } else {
+ LOG.debug(
+ "hasLeadership is called after the LeaderContender
was removed, returning false.");
Review Comment:
nit: maybe `hasLeadership is called after the service is stopped, returning
false.`
##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java:
##########
@@ -302,6 +303,8 @@ void testZooKeeperReelectionWithReplacement() throws
Exception {
// stop leader election service = revoke leadership
leaderElectionService[index].stop();
+ leaderElectionService[index].close();
Review Comment:
Do we also need to close it in a `finally` block?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##########
@@ -145,96 +222,122 @@ public void confirmLeadership(UUID leaderSessionID,
String leaderAddress) {
} else {
LOG.warn(
"The leader session ID {} was confirmed even
though the "
- + "corresponding JobManager was not
elected as the leader.",
+ + "corresponding service was not elected
as the leader or has been stopped already.",
leaderSessionID);
}
}
}
}
+ @GuardedBy("lock")
+ private boolean hasLeadership() {
+ return leaderElectionDriver.hasLeadership() && issuedLeaderSessionID
!= null;
+ }
+
@Override
public boolean hasLeadership(@Nonnull UUID leaderSessionId) {
synchronized (lock) {
- if (running) {
- return leaderElectionDriver.hasLeadership()
- && leaderSessionId.equals(issuedLeaderSessionID);
+ if (leaderElectionDriver != null) {
+ if (leaderContender != null) {
+ return hasLeadership() &&
leaderSessionId.equals(issuedLeaderSessionID);
+ } else {
+ LOG.debug(
+ "hasLeadership is called after the LeaderContender
was removed, returning false.");
+ return false;
+ }
} else {
- LOG.debug("hasLeadership is called after the service is
stopped, returning false.");
+ LOG.debug("hasLeadership is called after the service is
closed, returning false.");
return false;
}
}
}
- /**
- * Returns the current leader session ID or null, if the contender is not
the leader.
- *
- * @return The last leader session ID or null, if the contender is not the
leader
- */
+ /** Returns the current leader session ID or {@code null}, if the session
wasn't confirmed. */
@VisibleForTesting
@Nullable
public UUID getLeaderSessionID() {
- return confirmedLeaderInformation.getLeaderSessionID();
- }
-
- @GuardedBy("lock")
- private void confirmLeaderInformation(UUID leaderSessionID, String
leaderAddress) {
- confirmedLeaderInformation = LeaderInformation.known(leaderSessionID,
leaderAddress);
-
leaderElectionDriver.writeLeaderInformation(confirmedLeaderInformation);
+ synchronized (lock) {
+ return confirmedLeaderInformation.getLeaderSessionID();
+ }
}
@Override
public void onGrantLeadership(UUID newLeaderSessionId) {
+ Preconditions.checkNotNull(newLeaderSessionId);
+
synchronized (lock) {
- if (running) {
- issuedLeaderSessionID = newLeaderSessionId;
- confirmedLeaderInformation = LeaderInformation.empty();
+ Preconditions.checkState(
+ issuedLeaderSessionID == null,
+ "The leadership should have been granted while not having
the leadership acquired.");
- LOG.debug(
- "Grant leadership to contender {} with session ID {}.",
- leaderContender.getDescription(),
- issuedLeaderSessionID);
+ issuedLeaderSessionID = newLeaderSessionId;
- leaderContender.grantLeadership(issuedLeaderSessionID);
+ if (leaderContender != null) {
+ notifyLeaderContenderOfLeadership();
} else {
LOG.debug(
- "Ignoring the grant leadership notification since the
{} has already been closed.",
+ "The grant leadership notification is not forwarded
because the DefaultLeaderElectionService ({}) has no contender registered.",
leaderElectionDriver);
}
}
}
+ @GuardedBy("lock")
+ private void notifyLeaderContenderOfLeadership() {
+ Preconditions.checkState(
+ confirmedLeaderInformation.isEmpty(),
+ "The leadership should have been granted while not having the
leadership acquired.");
+
+ LOG.debug(
+ "Granting leadership to contender {} with session ID {}.",
+ leaderContender.getDescription(),
+ issuedLeaderSessionID);
+
+ leaderContender.grantLeadership(issuedLeaderSessionID);
+ }
+
@Override
public void onRevokeLeadership() {
synchronized (lock) {
- if (running) {
- handleLeadershipLoss();
+ // TODO: FLINK-31814 covers adding this Precondition
+ // Preconditions.checkState(issuedLeaderSessionID != null,"The
leadership should have
+ // been revoked while having the leadership acquired.");
+
+ final UUID previousSessionID = issuedLeaderSessionID;
+ issuedLeaderSessionID = null;
+
+ if (leaderContender != null) {
+ notifyLeaderContenderOfLeadershipLoss();
} else {
LOG.debug(
- "Ignoring the revoke leadership notification since the
{} "
- + "has already been closed.",
+ "The revoke leadership for session {} notification is
not forwarded because the DefaultLeaderElectionService({}) has no contender
registered.",
+ previousSessionID,
leaderElectionDriver);
}
}
}
@GuardedBy("lock")
- private void handleLeadershipLoss() {
- LOG.debug(
- "Revoke leadership of {} ({}@{}).",
- leaderContender.getDescription(),
- confirmedLeaderInformation.getLeaderSessionID(),
- confirmedLeaderInformation.getLeaderAddress());
+ private void notifyLeaderContenderOfLeadershipLoss() {
+ if (confirmedLeaderInformation.isEmpty()) {
+ LOG.debug(
+ "Revoking leadership to contender {} while a previous
leadership grant wasn't confirmed, yet.",
+ leaderContender.getDescription());
+ } else {
+ LOG.debug(
+ "Revoking leadership to contender {} for {}.",
+ leaderContender.getDescription(),
+
LeaderElectionUtils.convertToString(confirmedLeaderInformation));
+ }
- issuedLeaderSessionID = null;
confirmedLeaderInformation = LeaderInformation.empty();
-
leaderContender.revokeLeadership();
}
@Override
public void onLeaderInformationChange(LeaderInformation leaderInformation)
{
synchronized (lock) {
- if (running) {
+ if (leaderContender != null) {
LOG.trace(
"Leader node changed while {} is the leader with
session ID {}. New leader information {}.",
Review Comment:
Maybe we can pass `LeaderElectionUtils.convertToString(leaderInformation)`
instead of `leaderInformation` as the format parameter.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]