XComp commented on code in PR #22844:
URL: https://github.com/apache/flink/pull/22844#discussion_r1251536978
##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##########
@@ -425,59 +425,72 @@ private void onRevokeLeadershipInternal() {
}
@GuardedBy("lock")
- private void notifyLeaderContenderOfLeadershipLoss() {
+ private void notifyLeaderContenderOfLeadershipLoss(
+ String contenderID, LeaderContender leaderContender) {
Preconditions.checkState(
leaderContender != null,
"The LeaderContender should be always set when calling this method.");
- if (confirmedLeaderInformation.isEmpty()) {
+ if (!confirmedLeaderInformation.hasLeaderInformation(contenderID)) {
LOG.debug(
"Revoking leadership to contender {} while a previous leadership grant wasn't confirmed, yet.",
- leaderContender.getDescription());
+ contenderID);
} else {
LOG.debug(
"Revoking leadership to contender {} for {}.",
- leaderContender.getDescription(),
- LeaderElectionUtils.convertToString(confirmedLeaderInformation));
+ contenderID,
+ LeaderElectionUtils.convertToString(
+ confirmedLeaderInformation.forContenderIDOrEmpty(contenderID)));
}
- confirmedLeaderInformation = LeaderInformation.empty();
+ confirmedLeaderInformation =
+ LeaderInformationRegister.clear(confirmedLeaderInformation, contenderID);
leaderContender.revokeLeadership();
}
@Override
public void onLeaderInformationChange(LeaderInformation leaderInformation) {
- runInLeaderEventThread(() -> onLeaderInformationChangeInternal(leaderInformation));
+ leaderContenderRegistry
+ .keySet()
+ .forEach(
+ contenderID ->
+ notifyLeaderInformationChange(contenderID, leaderInformation));
}
@GuardedBy("lock")
- private void onLeaderInformationChangeInternal(LeaderInformation leaderInformation) {
- if (leaderContender != null) {
- LOG.trace(
- "Leader node changed while {} is the leader with {}. New leader information {}.",
- leaderContender.getDescription(),
- LeaderElectionUtils.convertToString(confirmedLeaderInformation),
- LeaderElectionUtils.convertToString(leaderInformation));
- if (!confirmedLeaderInformation.isEmpty()) {
- final LeaderInformation confirmedLeaderInfo = this.confirmedLeaderInformation;
- if (leaderInformation.isEmpty()) {
- LOG.debug(
- "Writing leader information by {} since the external storage is empty.",
- leaderContender.getDescription());
- leaderElectionDriver.publishLeaderInformation(contenderID, confirmedLeaderInfo);
- } else if (!leaderInformation.equals(confirmedLeaderInfo)) {
- // the data field does not correspond to the expected leader information
- LOG.debug(
- "Correcting leader information by {}.",
- leaderContender.getDescription());
- leaderElectionDriver.publishLeaderInformation(contenderID, confirmedLeaderInfo);
- }
- }
- } else {
- LOG.debug(
- "Ignoring change notification since the {} has already been stopped.",
- leaderElectionDriver);
- }
+ private void onLeaderInformationChangeInternal(LeaderInformationRegister leaderInformation) {
+ leaderContenderRegistry.forEach(
+ (contenderID, leaderContender) -> {
+ final LeaderInformation newLeaderInformationForContender =
+ leaderInformation
+ .forContenderID(contenderID)
+ .orElse(LeaderInformation.empty());
+ final LeaderInformation confirmedLeaderInformationForContender =
+ confirmedLeaderInformation.forContenderIDOrEmpty(contenderID);
+
+ if (confirmedLeaderInformationForContender.isEmpty()) {
+ LOG.trace(
+ "Leader information changed while there's no confirmation available by the contender for contender ID '{}' for this session. Changed leader information {} will be ignored.",
+ contenderID,
+ LeaderElectionUtils.convertToString(
+ newLeaderInformationForContender));
Review Comment:
The interface methods `onLeaderInformationChange` (for the `LeaderElectionEventHandler`) and `notifyLeaderInformationChange`/`notifyAllKnownLeaderInformation` (for the `MultipleComponentLeaderElectionDriver.Listener`) are only called if the current service has acquired leadership, and they inform the service that some other process updated the connection info (without having the right to do so). In that case, the leader has to send its confirmed information to the HA backend to overwrite the data again. That's what's happening here.
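A minimal sketch of that correction rule, in plain Java rather than Flink's classes. `LeaderInfo`, the `backend` and `confirmed` maps, and `onLeaderInformationChange` below are hypothetical stand-ins (modeled loosely on `LeaderInformation`, the HA backend, and `confirmedLeaderInformation`), not Flink's API. A contender that has confirmed its information writes it back whenever the stored entry is empty or differs, and stays silent while no confirmation exists.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/** Hypothetical sketch: a leader restoring its confirmed entry after a foreign write. */
class LeaderInfoCorrectionSketch {

    /** Hypothetical stand-in for leader information: session ID plus leader address. */
    record LeaderInfo(UUID sessionId, String address) {}

    // Hypothetical HA backend: contender ID -> stored leader information (missing key = empty).
    final Map<String, LeaderInfo> backend = new HashMap<>();
    // Information each local contender has confirmed for its current leadership session.
    final Map<String, LeaderInfo> confirmed = new HashMap<>();
    int publishCount = 0;

    /** Invoked when the backend reports that the entry of a contender changed. */
    void onLeaderInformationChange(String contenderId) {
        LeaderInfo mine = confirmed.get(contenderId);
        if (mine == null) {
            return; // no confirmation yet; the pending confirmLeadership call will publish
        }
        LeaderInfo stored = backend.get(contenderId);
        if (!mine.equals(stored)) {
            // Empty, or overwritten by a process without leadership: write our data back.
            backend.put(contenderId, mine);
            publishCount++;
        }
    }

    public static void main(String[] args) {
        LeaderInfoCorrectionSketch service = new LeaderInfoCorrectionSketch();
        LeaderInfo mine = new LeaderInfo(UUID.randomUUID(), "jm-1:6123");
        service.confirmed.put("resourcemanager", mine);
        service.backend.put("resourcemanager", mine);

        // Another process overwrites the entry: the leader corrects it.
        service.backend.put("resourcemanager", new LeaderInfo(UUID.randomUUID(), "jm-2:6123"));
        service.onLeaderInformationChange("resourcemanager");
        System.out.println(service.backend.get("resourcemanager").equals(mine)); // true

        // A change event whose data already matches the confirmed value triggers no write.
        service.onLeaderInformationChange("resourcemanager");
        System.out.println(service.publishCount); // 1
    }
}
```

Because the sketch compares the whole value, a foreign write that reuses the leader's address but carries a different session ID would also be corrected.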
As I understand the code, your case is the one where there is no confirmed information yet but leadership was already granted, i.e. a `confirmLeadership` call is on its way and will, as a consequence, write the confirmed leader information to the HA backend. I guess that this is why no data was written to the HA backend in that case (even in the past).
But thinking about it: we might want to write the empty `LeaderInformation` to the HA backend in that case, so that monitoring services don't pick up the changed (false) `LeaderInformation` from the HA backend while the `confirmLeadership` call is delayed. :thinking:
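The two options for the unconfirmed case can be compared with a similar hypothetical sketch. `Policy`, `monitorView`, and the map-based backend are illustrative assumptions, not Flink code, and removing a key stands in for writing an empty `LeaderInformation`. The only point is what a monitoring service polling the backend would see between the foreign write and the delayed `confirmLeadership` call.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical sketch of the two options for a change event that arrives before confirmation. */
class UnconfirmedLeadershipSketch {

    enum Policy {
        IGNORE, // current behavior: wait for the pending confirmLeadership call
        PUBLISH_EMPTY // proposal: replace the foreign entry with empty leader information
    }

    // Hypothetical HA backend: contender ID -> leader address; a missing key means "empty".
    final Map<String, String> backend = new HashMap<>();
    // Confirmed addresses; a missing key means "leadership granted but not confirmed yet".
    final Map<String, String> confirmed = new HashMap<>();
    final Policy policy;

    UnconfirmedLeadershipSketch(Policy policy) {
        this.policy = policy;
    }

    /** Handles a change of the backend entry for a contender that currently holds leadership. */
    void onLeaderInformationChange(String contenderId) {
        String mine = confirmed.get(contenderId);
        if (mine != null) {
            if (!mine.equals(backend.get(contenderId))) {
                backend.put(contenderId, mine); // restore the confirmed information
            }
        } else if (policy == Policy.PUBLISH_EMPTY) {
            backend.remove(contenderId); // hide the false information until confirmation
        }
        // Policy.IGNORE: the (possibly false) entry stays until confirmLeadership runs.
    }

    void confirmLeadership(String contenderId, String address) {
        confirmed.put(contenderId, address);
        backend.put(contenderId, address);
    }

    /** What a monitoring service polling the backend would observe. */
    Optional<String> monitorView(String contenderId) {
        return Optional.ofNullable(backend.get(contenderId));
    }

    public static void main(String[] args) {
        for (Policy policy : Policy.values()) {
            UnconfirmedLeadershipSketch service = new UnconfirmedLeadershipSketch(policy);
            // Leadership granted, confirmation pending; another process writes its address.
            service.backend.put("dispatcher", "other-host:6123");
            service.onLeaderInformationChange("dispatcher");
            System.out.println(policy + " before confirm: " + service.monitorView("dispatcher"));
            service.confirmLeadership("dispatcher", "this-host:6123");
            System.out.println(policy + " after confirm: " + service.monitorView("dispatcher"));
        }
    }
}
```

Running `main` prints `Optional[other-host:6123]` for `IGNORE` and `Optional.empty` for `PUBLISH_EMPTY` before confirmation; both policies report `Optional[this-host:6123]` afterwards.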
--
This is an automated message from the Apache Git Service.