XComp commented on code in PR #22380:
URL: https://github.com/apache/flink/pull/22380#discussion_r1169832741


##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##########
@@ -145,96 +222,122 @@ public void confirmLeadership(UUID leaderSessionID, 
String leaderAddress) {
                 } else {
                     LOG.warn(
                             "The leader session ID {} was confirmed even 
though the "
-                                    + "corresponding JobManager was not 
elected as the leader.",
+                                    + "corresponding service was not elected 
as the leader or has been stopped already.",
                             leaderSessionID);
                 }
             }
         }
     }
 
+    @GuardedBy("lock")
+    private boolean hasLeadership() {
+        return leaderElectionDriver.hasLeadership() && issuedLeaderSessionID 
!= null;
+    }
+
     @Override
     public boolean hasLeadership(@Nonnull UUID leaderSessionId) {
         synchronized (lock) {
-            if (running) {
-                return leaderElectionDriver.hasLeadership()
-                        && leaderSessionId.equals(issuedLeaderSessionID);
+            if (leaderElectionDriver != null) {
+                if (leaderContender != null) {
+                    return hasLeadership() && 
leaderSessionId.equals(issuedLeaderSessionID);
+                } else {
+                    LOG.debug(
+                            "hasLeadership is called after the LeaderContender 
was removed, returning false.");
+                    return false;
+                }
             } else {
-                LOG.debug("hasLeadership is called after the service is 
stopped, returning false.");
+                LOG.debug("hasLeadership is called after the service is 
closed, returning false.");
                 return false;
             }
         }
     }
 
-    /**
-     * Returns the current leader session ID or null, if the contender is not 
the leader.
-     *
-     * @return The last leader session ID or null, if the contender is not the 
leader
-     */
+    /** Returns the current leader session ID or {@code null}, if the session 
wasn't confirmed. */
     @VisibleForTesting
     @Nullable
     public UUID getLeaderSessionID() {
-        return confirmedLeaderInformation.getLeaderSessionID();
-    }
-
-    @GuardedBy("lock")
-    private void confirmLeaderInformation(UUID leaderSessionID, String 
leaderAddress) {
-        confirmedLeaderInformation = LeaderInformation.known(leaderSessionID, 
leaderAddress);
-        
leaderElectionDriver.writeLeaderInformation(confirmedLeaderInformation);
+        synchronized (lock) {
+            return confirmedLeaderInformation.getLeaderSessionID();
+        }
     }
 
     @Override
     public void onGrantLeadership(UUID newLeaderSessionId) {
+        Preconditions.checkNotNull(newLeaderSessionId);
+
         synchronized (lock) {
-            if (running) {
-                issuedLeaderSessionID = newLeaderSessionId;
-                confirmedLeaderInformation = LeaderInformation.empty();
+            Preconditions.checkState(
+                    issuedLeaderSessionID == null,
+                    "The leadership should have been granted while not having 
the leadership acquired.");
 
-                LOG.debug(
-                        "Grant leadership to contender {} with session ID {}.",
-                        leaderContender.getDescription(),
-                        issuedLeaderSessionID);
+            issuedLeaderSessionID = newLeaderSessionId;
 
-                leaderContender.grantLeadership(issuedLeaderSessionID);
+            if (leaderContender != null) {
+                notifyLeaderContenderOfLeadership();
             } else {
                 LOG.debug(
-                        "Ignoring the grant leadership notification since the 
{} has already been closed.",
+                        "The grant leadership notification is not forwarded 
because the DefaultLeaderElectionService ({}) has no contender registered.",
                         leaderElectionDriver);
             }
         }
     }
 
+    @GuardedBy("lock")
+    private void notifyLeaderContenderOfLeadership() {
+        Preconditions.checkState(
+                confirmedLeaderInformation.isEmpty(),
+                "The leadership should have been granted while not having the 
leadership acquired.");
+
+        LOG.debug(
+                "Granting leadership to contender {} with session ID {}.",
+                leaderContender.getDescription(),
+                issuedLeaderSessionID);
+
+        leaderContender.grantLeadership(issuedLeaderSessionID);
+    }
+
     @Override
     public void onRevokeLeadership() {
         synchronized (lock) {
-            if (running) {
-                handleLeadershipLoss();
+            // TODO: FLINK-31814 covers adding this Precondition
+            // Preconditions.checkState(issuedLeaderSessionID != null,"The 
leadership should have
+            // been revoked while having the leadership acquired.");
+
+            final UUID previousSessionID = issuedLeaderSessionID;
+            issuedLeaderSessionID = null;
+
+            if (leaderContender != null) {

Review Comment:
   No, it's possible that `onRevokeLeadership` is called without a contender 
being registered, because the two `on*Leadership` methods are bound to the 
`leaderElectionDriver` rather than to the registration of a contender.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to