XComp commented on code in PR #22844:
URL: https://github.com/apache/flink/pull/22844#discussion_r1252040025


##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##########
@@ -425,59 +425,72 @@ private void onRevokeLeadershipInternal() {
     }
 
     @GuardedBy("lock")
-    private void notifyLeaderContenderOfLeadershipLoss() {
+    private void notifyLeaderContenderOfLeadershipLoss(
+            String contenderID, LeaderContender leaderContender) {
         Preconditions.checkState(
                 leaderContender != null,
                 "The LeaderContender should be always set when calling this 
method.");
 
-        if (confirmedLeaderInformation.isEmpty()) {
+        if (!confirmedLeaderInformation.hasLeaderInformation(contenderID)) {
             LOG.debug(
                     "Revoking leadership to contender {} while a previous 
leadership grant wasn't confirmed, yet.",
-                    leaderContender.getDescription());
+                    contenderID);
         } else {
             LOG.debug(
                     "Revoking leadership to contender {} for {}.",
-                    leaderContender.getDescription(),
-                    
LeaderElectionUtils.convertToString(confirmedLeaderInformation));
+                    contenderID,
+                    LeaderElectionUtils.convertToString(
+                            
confirmedLeaderInformation.forContenderIDOrEmpty(contenderID)));
         }
 
-        confirmedLeaderInformation = LeaderInformation.empty();
+        confirmedLeaderInformation =
+                LeaderInformationRegister.clear(confirmedLeaderInformation, 
contenderID);
         leaderContender.revokeLeadership();
     }
 
     @Override
     public void onLeaderInformationChange(LeaderInformation leaderInformation) 
{
-        runInLeaderEventThread(() -> 
onLeaderInformationChangeInternal(leaderInformation));
+        leaderContenderRegistry
+                .keySet()
+                .forEach(
+                        contenderID ->
+                                notifyLeaderInformationChange(contenderID, 
leaderInformation));
     }
 
     @GuardedBy("lock")
-    private void onLeaderInformationChangeInternal(LeaderInformation 
leaderInformation) {
-        if (leaderContender != null) {
-            LOG.trace(
-                    "Leader node changed while {} is the leader with {}. New 
leader information {}.",
-                    leaderContender.getDescription(),
-                    
LeaderElectionUtils.convertToString(confirmedLeaderInformation),
-                    LeaderElectionUtils.convertToString(leaderInformation));
-            if (!confirmedLeaderInformation.isEmpty()) {
-                final LeaderInformation confirmedLeaderInfo = 
this.confirmedLeaderInformation;
-                if (leaderInformation.isEmpty()) {
-                    LOG.debug(
-                            "Writing leader information by {} since the 
external storage is empty.",
-                            leaderContender.getDescription());
-                    leaderElectionDriver.publishLeaderInformation(contenderID, 
confirmedLeaderInfo);
-                } else if (!leaderInformation.equals(confirmedLeaderInfo)) {
-                    // the data field does not correspond to the expected 
leader information
-                    LOG.debug(
-                            "Correcting leader information by {}.",
-                            leaderContender.getDescription());
-                    leaderElectionDriver.publishLeaderInformation(contenderID, 
confirmedLeaderInfo);
-                }
-            }
-        } else {
-            LOG.debug(
-                    "Ignoring change notification since the {} has already 
been stopped.",
-                    leaderElectionDriver);
-        }
+    private void onLeaderInformationChangeInternal(LeaderInformationRegister 
leaderInformation) {
+        leaderContenderRegistry.forEach(
+                (contenderID, leaderContender) -> {
+                    final LeaderInformation newLeaderInformationForContender =
+                            leaderInformation
+                                    .forContenderID(contenderID)
+                                    .orElse(LeaderInformation.empty());
+                    final LeaderInformation 
confirmedLeaderInformationForContender =
+                            
confirmedLeaderInformation.forContenderIDOrEmpty(contenderID);
+
+                    if (confirmedLeaderInformationForContender.isEmpty()) {
+                        LOG.trace(
+                                "Leader information changed while there's no 
confirmation available by the contender for contender ID '{}' for this session. 
Changed leader information {} will be ignored.",
+                                contenderID,
+                                LeaderElectionUtils.convertToString(
+                                        newLeaderInformationForContender));

Review Comment:
   > On the flip-side, we don't know if the leader is ready yet to accept 
requests yet? (because for example the dispatcher is still starting) thinking
   Yes, that would only be the case after the `confirmLeadership` method call 
finally happened that would set the `confirmedLeaderInformation` within the 
service and in the HA backend.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to