XComp commented on code in PR #22844:
URL: https://github.com/apache/flink/pull/22844#discussion_r1251536978


##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##########
@@ -425,59 +425,72 @@ private void onRevokeLeadershipInternal() {
     }
 
     @GuardedBy("lock")
-    private void notifyLeaderContenderOfLeadershipLoss() {
+    private void notifyLeaderContenderOfLeadershipLoss(
+            String contenderID, LeaderContender leaderContender) {
         Preconditions.checkState(
                 leaderContender != null,
                 "The LeaderContender should be always set when calling this 
method.");
 
-        if (confirmedLeaderInformation.isEmpty()) {
+        if (!confirmedLeaderInformation.hasLeaderInformation(contenderID)) {
             LOG.debug(
                     "Revoking leadership to contender {} while a previous 
leadership grant wasn't confirmed, yet.",
-                    leaderContender.getDescription());
+                    contenderID);
         } else {
             LOG.debug(
                     "Revoking leadership to contender {} for {}.",
-                    leaderContender.getDescription(),
-                    
LeaderElectionUtils.convertToString(confirmedLeaderInformation));
+                    contenderID,
+                    LeaderElectionUtils.convertToString(
+                            
confirmedLeaderInformation.forContenderIDOrEmpty(contenderID)));
         }
 
-        confirmedLeaderInformation = LeaderInformation.empty();
+        confirmedLeaderInformation =
+                LeaderInformationRegister.clear(confirmedLeaderInformation, 
contenderID);
         leaderContender.revokeLeadership();
     }
 
     @Override
     public void onLeaderInformationChange(LeaderInformation leaderInformation) 
{
-        runInLeaderEventThread(() -> 
onLeaderInformationChangeInternal(leaderInformation));
+        leaderContenderRegistry
+                .keySet()
+                .forEach(
+                        contenderID ->
+                                notifyLeaderInformationChange(contenderID, 
leaderInformation));
     }
 
     @GuardedBy("lock")
-    private void onLeaderInformationChangeInternal(LeaderInformation 
leaderInformation) {
-        if (leaderContender != null) {
-            LOG.trace(
-                    "Leader node changed while {} is the leader with {}. New 
leader information {}.",
-                    leaderContender.getDescription(),
-                    
LeaderElectionUtils.convertToString(confirmedLeaderInformation),
-                    LeaderElectionUtils.convertToString(leaderInformation));
-            if (!confirmedLeaderInformation.isEmpty()) {
-                final LeaderInformation confirmedLeaderInfo = 
this.confirmedLeaderInformation;
-                if (leaderInformation.isEmpty()) {
-                    LOG.debug(
-                            "Writing leader information by {} since the 
external storage is empty.",
-                            leaderContender.getDescription());
-                    leaderElectionDriver.publishLeaderInformation(contenderID, 
confirmedLeaderInfo);
-                } else if (!leaderInformation.equals(confirmedLeaderInfo)) {
-                    // the data field does not correspond to the expected 
leader information
-                    LOG.debug(
-                            "Correcting leader information by {}.",
-                            leaderContender.getDescription());
-                    leaderElectionDriver.publishLeaderInformation(contenderID, 
confirmedLeaderInfo);
-                }
-            }
-        } else {
-            LOG.debug(
-                    "Ignoring change notification since the {} has already 
been stopped.",
-                    leaderElectionDriver);
-        }
+    private void onLeaderInformationChangeInternal(LeaderInformationRegister 
leaderInformation) {
+        leaderContenderRegistry.forEach(
+                (contenderID, leaderContender) -> {
+                    final LeaderInformation newLeaderInformationForContender =
+                            leaderInformation
+                                    .forContenderID(contenderID)
+                                    .orElse(LeaderInformation.empty());
+                    final LeaderInformation 
confirmedLeaderInformationForContender =
+                            
confirmedLeaderInformation.forContenderIDOrEmpty(contenderID);
+
+                    if (confirmedLeaderInformationForContender.isEmpty()) {
+                        LOG.trace(
+                                "Leader information changed while there's no 
confirmation available by the contender for contender ID '{}' for this session. 
Changed leader information {} will be ignored.",
+                                contenderID,
+                                LeaderElectionUtils.convertToString(
+                                        newLeaderInformationForContender));

Review Comment:
   The interface methods `onLeaderInformationChange` (for the 
`LeaderElectionEventHandler`) and 
`notifyLeaderInformationChange`/`notifyAllKnownLeaderInformation` (for the 
`MultipleComponentLeaderElectionDriver.Listener`) are only called if the 
current service has leadership acquired and informs the service that some other 
process updated the connection info (w/o having the right to do so). In that 
case, the leader has to sent its confirmed information to the HA backend to 
overwrite the data again. That's what's happening here.
   
   I understand the code in a way for your case, that there is no 
confirmedInformation, yet, but the leadership was granted, i.e. there is a 
`confirmLeadership` call on the way which, as a consequence, will write the 
confirmed leadership to the HA backend. I guess, that this is why there was no 
data written to the HA backend in that case (even in the past).
   
   But thinking about it: We might want to write the empty LeaderInformation to 
the HA backend in that case to avoid that monitoring services pick up the 
changed (false) LeaderInformation from the HA backend while the 
`confirmLeadership` call is delayed. :thinking: 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to