reswqa commented on code in PR #22380:
URL: https://github.com/apache/flink/pull/22380#discussion_r1168903359


##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##########
@@ -37,35 +37,58 @@
  * Default implementation for leader election service. Composed with different {@link
  * LeaderElectionDriver}, we could perform a leader election for the contender, and then persist the
  * leader information to various storage.
+ *
+ * <p>{@code DefaultLeaderElectionService} handles a single {@link LeaderContender}.
  */
 public class DefaultLeaderElectionService
-        implements LeaderElectionService, LeaderElectionEventHandler {
+        implements LeaderElectionService, LeaderElectionEventHandler, AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(DefaultLeaderElectionService.class);
 
     private final Object lock = new Object();
 
     private final LeaderElectionDriverFactory leaderElectionDriverFactory;
 
-    /** The leader contender which applies for leadership. */
+    /**
+     * {@code leaderContender} being {@code null} indicates that no {@link LeaderContender} is
+     * registered that participates in the leader election, yet. See {@link #start(LeaderContender)}
+     * and {@link #stop()} for lifecycle management.
+     *
+     * <p>{@code @Nullable} isn't used here to avoid having multiple warnings spread over this class
+     * in a supporting IDE.
+     */
     @GuardedBy("lock")
-    // @Nullable is commented-out to avoid having multiple warnings spread over this class
-    // this.running=true ensures that leaderContender != null
-    private volatile LeaderContender leaderContender;
+    private LeaderContender leaderContender;
 
+    /**
+     * Saves the session ID which was issued by the {@link LeaderElectionDriver} iff the leadership
+     * is acquired by this service. {@code issuedLeaderSessionID} being {@code null} indicates that
+     * this service isn't the leader right now (i.e. {@code
+     * leaderElectionDriver.hasLeadership(UUID)} would return {@code false} for any session ID.
+     */
     @GuardedBy("lock")
     @Nullable
-    private volatile UUID issuedLeaderSessionID;
+    private UUID issuedLeaderSessionID;
 
+    /**
+     * Saves the leader information for a registered {@link LeaderContender} after this contender
+     * confirmed the leadership.
+     */
     @GuardedBy("lock")
-    private volatile LeaderInformation confirmedLeaderInformation;
+    private LeaderInformation confirmedLeaderInformation;
 
+    /**
+     * {@code leaderElectionDriver} being {@code null} indicates that the connection to the
+     * LeaderElection backend isn't established, yet. See {@link #startLeaderElectionBackend()} and
+     * {@link #close()} for lifecycle management. The lifecycle of the driver should have been
+     * established before registering a {@link LeaderContender} and stopped after the contender has
+     * been removed.
+     *
+     * <p>{@code @Nullable} isn't used here to avoid having multiple warnings spread over this class
+     * in a supporting IDE.
+     */
     @GuardedBy("lock")
-    private volatile boolean running;
-
-    // @Nullable is commented-out to avoid having multiple warnings spread over this class
-    // this.running=true ensures that leaderContender != null
-    private LeaderElectionDriver leaderElectionDriver;
+    private volatile LeaderElectionDriver leaderElectionDriver;

Review Comment:
   It seems that this field is guarded by `lock`; why does it still need to be volatile?
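   Background for the question above: under the Java Memory Model, unlocking a monitor happens-before every later locking of the same monitor, so a field that is read and written only inside `synchronized (lock)` blocks is already visible to subsequent holders of the lock, and `volatile` adds nothing. `volatile` would only matter if some code path accessed the field without holding `lock`. A minimal sketch of that pattern, using a hypothetical class `LockGuardedSessionHolder` that is not part of Flink:

```java
import java.util.UUID;

// Hypothetical class (not Flink code): issuedLeaderSessionID is only read and
// written while holding `lock`. Under the Java Memory Model, an unlock of a
// monitor happens-before every later lock of that same monitor, so writes made
// inside one synchronized block are visible inside the next one. `volatile`
// is therefore deliberately omitted.
final class LockGuardedSessionHolder {
    private final Object lock = new Object();

    // Guarded by lock; never accessed outside synchronized (lock).
    private UUID issuedLeaderSessionID;

    void grant(UUID sessionId) {
        synchronized (lock) {
            issuedLeaderSessionID = sessionId;
        }
    }

    void revoke() {
        synchronized (lock) {
            issuedLeaderSessionID = null;
        }
    }

    boolean hasLeadership(UUID sessionId) {
        synchronized (lock) {
            // equals(null) returns false, so a revoked session never matches.
            return sessionId.equals(issuedLeaderSessionID);
        }
    }

    public static void main(String[] args) {
        LockGuardedSessionHolder holder = new LockGuardedSessionHolder();
        UUID sessionId = UUID.randomUUID();
        holder.grant(sessionId);
        System.out.println(holder.hasLeadership(sessionId)); // true
        holder.revoke();
        System.out.println(holder.hasLeadership(sessionId)); // false
    }
}
```

   Keeping `volatile` on a `@GuardedBy("lock")` field is cheap, but it suggests to readers that the field is accessed outside the lock, which contradicts the annotation.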



##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##########
@@ -37,35 +37,58 @@
  * Default implementation for leader election service. Composed with different {@link
  * LeaderElectionDriver}, we could perform a leader election for the contender, and then persist the
  * leader information to various storage.
+ *
+ * <p>{@code DefaultLeaderElectionService} handles a single {@link LeaderContender}.
  */
 public class DefaultLeaderElectionService
-        implements LeaderElectionService, LeaderElectionEventHandler {
+        implements LeaderElectionService, LeaderElectionEventHandler, AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(DefaultLeaderElectionService.class);
 
     private final Object lock = new Object();
 
     private final LeaderElectionDriverFactory leaderElectionDriverFactory;
 
-    /** The leader contender which applies for leadership. */
+    /**
+     * {@code leaderContender} being {@code null} indicates that no {@link LeaderContender} is
+     * registered that participates in the leader election, yet. See {@link #start(LeaderContender)}
+     * and {@link #stop()} for lifecycle management.
+     *
+     * <p>{@code @Nullable} isn't used here to avoid having multiple warnings spread over this class
+     * in a supporting IDE.
+     */
     @GuardedBy("lock")
-    // @Nullable is commented-out to avoid having multiple warnings spread over this class
-    // this.running=true ensures that leaderContender != null
-    private volatile LeaderContender leaderContender;
+    private LeaderContender leaderContender;
 
+    /**
+     * Saves the session ID which was issued by the {@link LeaderElectionDriver} iff the leadership
+     * is acquired by this service. {@code issuedLeaderSessionID} being {@code null} indicates that
+     * this service isn't the leader right now (i.e. {@code

Review Comment:
   Unpaired symbol: `)` seems to be missing.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##########
@@ -257,7 +360,7 @@ public void onLeaderInformationChange(LeaderInformation leaderInformation) {
                 }
             } else {
                 LOG.debug(
-                        "Ignoring change notification since the {} has " + "already been closed.",
+                        "Ignoring change notification since the {} has already been closed.",

Review Comment:
   ```suggestion
                           "Ignoring change notification since the {} has already been stopped.",
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##########
@@ -145,96 +222,122 @@ public void confirmLeadership(UUID leaderSessionID, String leaderAddress) {
                 } else {
                     LOG.warn(
                             "The leader session ID {} was confirmed even though the "
-                                    + "corresponding JobManager was not elected as the leader.",
+                                    + "corresponding service was not elected as the leader or has been stopped already.",
                             leaderSessionID);
                 }
             }
         }
     }
 
+    @GuardedBy("lock")
+    private boolean hasLeadership() {
+        return leaderElectionDriver.hasLeadership() && issuedLeaderSessionID != null;
+    }
+
     @Override
     public boolean hasLeadership(@Nonnull UUID leaderSessionId) {
         synchronized (lock) {
-            if (running) {
-                return leaderElectionDriver.hasLeadership()
-                        && leaderSessionId.equals(issuedLeaderSessionID);
+            if (leaderElectionDriver != null) {
+                if (leaderContender != null) {
+                    return hasLeadership() && leaderSessionId.equals(issuedLeaderSessionID);
+                } else {
+                    LOG.debug(
+                            "hasLeadership is called after the LeaderContender was removed, returning false.");

Review Comment:
   nit: maybe `hasLeadership is called after the service is stopped, returning false.`
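   The wording question hinges on the two lifecycles documented in the field Javadocs: the driver (backend) is set up by `startLeaderElectionBackend()` and torn down by `close()`, while the contender is registered by `start(LeaderContender)` and removed by `stop()`. A `null` contender with a non-`null` driver therefore means "stopped but not yet closed". The sketch below models only that decision logic with hypothetical names (`LifecycleSketch`, `DriverStub`, `lastDebugMessage`); it is not Flink's implementation, and it records the would-be debug message in a field instead of logging so the branch taken is observable.

```java
import java.util.UUID;

// Hypothetical model (not Flink code) of the null-based lifecycle checks
// in DefaultLeaderElectionService#hasLeadership(UUID).
final class LifecycleSketch {

    // Hypothetical stand-in for LeaderElectionDriver; only the method used here.
    interface DriverStub {
        boolean hasLeadership();
    }

    private final Object lock = new Object();
    private DriverStub driver;           // null => backend not started, or closed
    private Object contender;            // null => no contender registered, or stopped
    private UUID issuedLeaderSessionID;  // null => this service is not the leader
    String lastDebugMessage;             // records what would be passed to LOG.debug

    void startBackend(DriverStub newDriver) {
        synchronized (lock) { driver = newDriver; }
    }

    void start(Object newContender) {
        synchronized (lock) { contender = newContender; }
    }

    void stop() {
        synchronized (lock) { contender = null; }
    }

    void close() {
        synchronized (lock) { driver = null; }
    }

    void onGrantLeadership(UUID sessionId) {
        synchronized (lock) { issuedLeaderSessionID = sessionId; }
    }

    boolean hasLeadership(UUID leaderSessionId) {
        synchronized (lock) {
            if (driver == null) {
                lastDebugMessage = "hasLeadership is called after the service is closed, returning false.";
                return false;
            }
            if (contender == null) {
                // Wording proposed in the review nit above.
                lastDebugMessage = "hasLeadership is called after the service is stopped, returning false.";
                return false;
            }
            return driver.hasLeadership()
                    && issuedLeaderSessionID != null
                    && leaderSessionId.equals(issuedLeaderSessionID);
        }
    }

    public static void main(String[] args) {
        LifecycleSketch service = new LifecycleSketch();
        UUID session = UUID.randomUUID();
        service.startBackend(() -> true); // driver stub that always reports leadership
        service.start(new Object());
        service.onGrantLeadership(session);
        System.out.println(service.hasLeadership(session)); // true

        service.stop(); // contender removed, backend still running
        System.out.println(service.hasLeadership(session)); // false
        System.out.println(service.lastDebugMessage);
    }
}
```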



##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java:
##########
@@ -302,6 +303,8 @@ void testZooKeeperReelectionWithReplacement() throws Exception {
 
                     // stop leader election service = revoke leadership
                     leaderElectionService[index].stop();
+                    leaderElectionService[index].close();

Review Comment:
   Do we also need to close it in the finally block?
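   Closing every service in a `finally` block keeps a failing assertion from leaking a running backend. Because the `AutoCloseable` contract (unlike `java.io.Closeable`) does not require `close()` to be idempotent, calling it both in the test body and again in `finally` is only safe if the implementation tolerates a second call; this diff does not show whether `DefaultLeaderElectionService.close()` does. A minimal sketch, assuming a hypothetical `FakeElectionService` with an explicitly idempotent `close()` and a hypothetical `runScenario` method standing in for the test body:

```java
// Hypothetical sketch (not Flink code) of cleaning up in a finally block.
final class CloseInFinallySketch {

    // Stand-in for DefaultLeaderElectionService. AutoCloseable does not require
    // close() to be idempotent, so this stub makes it idempotent explicitly.
    static final class FakeElectionService implements AutoCloseable {
        private boolean closed;

        @Override
        public void close() {
            closed = true; // calling close() a second time has no further effect
        }

        boolean isClosed() {
            return closed;
        }
    }

    // Stand-in for the test body: closes one service explicitly, then may fail.
    static void runScenario(FakeElectionService[] services, boolean failMidway) {
        try {
            services[0].close();
            if (failMidway) {
                throw new IllegalStateException("simulated assertion failure");
            }
        } finally {
            // Runs on success and on failure, so no service is leaked.
            for (FakeElectionService service : services) {
                if (service != null) {
                    service.close();
                }
            }
        }
    }

    public static void main(String[] args) {
        FakeElectionService[] services = {new FakeElectionService(), new FakeElectionService()};
        try {
            runScenario(services, true);
        } catch (IllegalStateException expected) {
            // The simulated failure still propagates, but cleanup has already run.
        }
        System.out.println(services[1].isClosed()); // true
    }
}
```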



##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##########
@@ -145,96 +222,122 @@ public void confirmLeadership(UUID leaderSessionID, String leaderAddress) {
                 } else {
                     LOG.warn(
                             "The leader session ID {} was confirmed even though the "
-                                    + "corresponding JobManager was not elected as the leader.",
+                                    + "corresponding service was not elected as the leader or has been stopped already.",
                             leaderSessionID);
                 }
             }
         }
     }
 
+    @GuardedBy("lock")
+    private boolean hasLeadership() {
+        return leaderElectionDriver.hasLeadership() && issuedLeaderSessionID != null;
+    }
+
     @Override
     public boolean hasLeadership(@Nonnull UUID leaderSessionId) {
         synchronized (lock) {
-            if (running) {
-                return leaderElectionDriver.hasLeadership()
-                        && leaderSessionId.equals(issuedLeaderSessionID);
+            if (leaderElectionDriver != null) {
+                if (leaderContender != null) {
+                    return hasLeadership() && leaderSessionId.equals(issuedLeaderSessionID);
+                } else {
+                    LOG.debug(
+                            "hasLeadership is called after the LeaderContender was removed, returning false.");
+                    return false;
+                }
             } else {
-                LOG.debug("hasLeadership is called after the service is stopped, returning false.");
+                LOG.debug("hasLeadership is called after the service is closed, returning false.");
                 return false;
             }
         }
     }
 
-    /**
-     * Returns the current leader session ID or null, if the contender is not the leader.
-     *
-     * @return The last leader session ID or null, if the contender is not the leader
-     */
+    /** Returns the current leader session ID or {@code null}, if the session wasn't confirmed. */
     @VisibleForTesting
     @Nullable
     public UUID getLeaderSessionID() {
-        return confirmedLeaderInformation.getLeaderSessionID();
-    }
-
-    @GuardedBy("lock")
-    private void confirmLeaderInformation(UUID leaderSessionID, String leaderAddress) {
-        confirmedLeaderInformation = LeaderInformation.known(leaderSessionID, leaderAddress);
-        leaderElectionDriver.writeLeaderInformation(confirmedLeaderInformation);
+        synchronized (lock) {
+            return confirmedLeaderInformation.getLeaderSessionID();
+        }
     }
 
     @Override
     public void onGrantLeadership(UUID newLeaderSessionId) {
+        Preconditions.checkNotNull(newLeaderSessionId);
+
         synchronized (lock) {
-            if (running) {
-                issuedLeaderSessionID = newLeaderSessionId;
-                confirmedLeaderInformation = LeaderInformation.empty();
+            Preconditions.checkState(
+                    issuedLeaderSessionID == null,
+                    "The leadership should have been granted while not having the leadership acquired.");
 
-                LOG.debug(
-                        "Grant leadership to contender {} with session ID {}.",
-                        leaderContender.getDescription(),
-                        issuedLeaderSessionID);
+            issuedLeaderSessionID = newLeaderSessionId;
 
-                leaderContender.grantLeadership(issuedLeaderSessionID);
+            if (leaderContender != null) {
+                notifyLeaderContenderOfLeadership();
             } else {
                 LOG.debug(
-                        "Ignoring the grant leadership notification since the {} has already been closed.",
+                        "The grant leadership notification is not forwarded because the DefaultLeaderElectionService ({}) has no contender registered.",
                         leaderElectionDriver);
             }
         }
     }
 
+    @GuardedBy("lock")
+    private void notifyLeaderContenderOfLeadership() {
+        Preconditions.checkState(
+                confirmedLeaderInformation.isEmpty(),
+                "The leadership should have been granted while not having the leadership acquired.");
+
+        LOG.debug(
+                "Granting leadership to contender {} with session ID {}.",
+                leaderContender.getDescription(),
+                issuedLeaderSessionID);
+
+        leaderContender.grantLeadership(issuedLeaderSessionID);
+    }
+
     @Override
     public void onRevokeLeadership() {
         synchronized (lock) {
-            if (running) {
-                handleLeadershipLoss();
+            // TODO: FLINK-31814 covers adding this Precondition
+            // Preconditions.checkState(issuedLeaderSessionID != null,"The leadership should have
+            // been revoked while having the leadership acquired.");
+
+            final UUID previousSessionID = issuedLeaderSessionID;
+            issuedLeaderSessionID = null;
+
+            if (leaderContender != null) {
+                notifyLeaderContenderOfLeadershipLoss();
             } else {
                 LOG.debug(
-                        "Ignoring the revoke leadership notification since the {} "
-                                + "has already been closed.",
+                        "The revoke leadership for session {} notification is not forwarded because the DefaultLeaderElectionService({}) has no contender registered.",
+                        previousSessionID,
                         leaderElectionDriver);
             }
         }
     }
 
     @GuardedBy("lock")
-    private void handleLeadershipLoss() {
-        LOG.debug(
-                "Revoke leadership of {} ({}@{}).",
-                leaderContender.getDescription(),
-                confirmedLeaderInformation.getLeaderSessionID(),
-                confirmedLeaderInformation.getLeaderAddress());
+    private void notifyLeaderContenderOfLeadershipLoss() {
+        if (confirmedLeaderInformation.isEmpty()) {
+            LOG.debug(
+                    "Revoking leadership to contender {} while a previous leadership grant wasn't confirmed, yet.",
+                    leaderContender.getDescription());
+        } else {
+            LOG.debug(
+                    "Revoking leadership to contender {} for {}.",
+                    leaderContender.getDescription(),
+                    LeaderElectionUtils.convertToString(confirmedLeaderInformation));
+        }
 
-        issuedLeaderSessionID = null;
         confirmedLeaderInformation = LeaderInformation.empty();
-
         leaderContender.revokeLeadership();
     }
 
     @Override
     public void onLeaderInformationChange(LeaderInformation leaderInformation) {
         synchronized (lock) {
-            if (running) {
+            if (leaderContender != null) {
                 LOG.trace(
                         "Leader node changed while {} is the leader with session ID {}. New leader information {}.",

Review Comment:
   Maybe we can use `LeaderElectionUtils.convertToString(leaderInformation)` instead of `leaderInformation` as the format param.
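   Beyond the logging nit, the hunk above separates the driver's grant/revoke callbacks from forwarding them to a contender: `issuedLeaderSessionID` is always updated, the contender is only notified when one is registered, and in the code shown `confirmedLeaderInformation` is reset in `notifyLeaderContenderOfLeadershipLoss()`, i.e. only on the forwarded revoke path. The sketch below models those transitions with hypothetical names (`GrantRevokeSketch`, a plain `UUID` in place of `LeaderInformation`, counters in place of the contender callbacks, and a much simplified `confirmLeadership`). It mirrors the diff's control flow under those assumptions and is not Flink's code.

```java
import java.util.Objects;
import java.util.UUID;

// Hypothetical model (not Flink code) of the grant/revoke transitions in the
// hunk above. A UUID stands in for LeaderInformation, and counters stand in
// for the LeaderContender callbacks.
final class GrantRevokeSketch {
    private final Object lock = new Object();

    private UUID issuedLeaderSessionID;  // set on grant, cleared on every revoke
    private UUID confirmedSessionID;     // stand-in for confirmedLeaderInformation
    private boolean contenderRegistered; // stand-in for leaderContender != null
    int grantsForwarded;                 // stand-in for leaderContender.grantLeadership(...)
    int revokesForwarded;                // stand-in for leaderContender.revokeLeadership()

    void registerContender() {
        synchronized (lock) { contenderRegistered = true; }
    }

    void onGrantLeadership(UUID newLeaderSessionId) {
        Objects.requireNonNull(newLeaderSessionId);
        synchronized (lock) {
            if (issuedLeaderSessionID != null) {
                throw new IllegalStateException("Leadership granted while already holding it.");
            }
            issuedLeaderSessionID = newLeaderSessionId;
            if (contenderRegistered) {
                if (confirmedSessionID != null) {
                    throw new IllegalStateException("Previous leadership is still confirmed.");
                }
                grantsForwarded++;
            }
        }
    }

    void confirmLeadership(UUID leaderSessionId) {
        synchronized (lock) {
            if (leaderSessionId.equals(issuedLeaderSessionID)) {
                confirmedSessionID = leaderSessionId;
            }
        }
    }

    void onRevokeLeadership() {
        synchronized (lock) {
            issuedLeaderSessionID = null;
            if (contenderRegistered) {
                confirmedSessionID = null; // reset only on the forwarded path, as in the diff
                revokesForwarded++;
            }
        }
    }

    UUID getConfirmedSessionID() {
        synchronized (lock) { return confirmedSessionID; }
    }

    public static void main(String[] args) {
        GrantRevokeSketch service = new GrantRevokeSketch();
        service.registerContender();
        UUID first = UUID.randomUUID();
        service.onGrantLeadership(first);
        service.confirmLeadership(first);
        System.out.println(first.equals(service.getConfirmedSessionID())); // true
        service.onRevokeLeadership();
        System.out.println(service.getConfirmedSessionID()); // null
        service.onGrantLeadership(UUID.randomUUID()); // allowed again after the revoke
        System.out.println(service.grantsForwarded + " " + service.revokesForwarded); // 2 1
    }
}
```

   The `IllegalStateException` thrown on a double grant plays the role of the `Preconditions.checkState` call in the diff.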



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
