XComp commented on code in PR #22844:
URL: https://github.com/apache/flink/pull/22844#discussion_r1251550146
##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##########
@@ -425,59 +425,72 @@ private void onRevokeLeadershipInternal() {
}
@GuardedBy("lock")
- private void notifyLeaderContenderOfLeadershipLoss() {
+ private void notifyLeaderContenderOfLeadershipLoss(
+ String contenderID, LeaderContender leaderContender) {
Preconditions.checkState(
leaderContender != null,
"The LeaderContender should be always set when calling this
method.");
- if (confirmedLeaderInformation.isEmpty()) {
+ if (!confirmedLeaderInformation.hasLeaderInformation(contenderID)) {
LOG.debug(
"Revoking leadership to contender {} while a previous
leadership grant wasn't confirmed, yet.",
- leaderContender.getDescription());
+ contenderID);
} else {
LOG.debug(
"Revoking leadership to contender {} for {}.",
- leaderContender.getDescription(),
-
LeaderElectionUtils.convertToString(confirmedLeaderInformation));
+ contenderID,
+ LeaderElectionUtils.convertToString(
+
confirmedLeaderInformation.forContenderIDOrEmpty(contenderID)));
}
- confirmedLeaderInformation = LeaderInformation.empty();
+ confirmedLeaderInformation =
+ LeaderInformationRegister.clear(confirmedLeaderInformation,
contenderID);
leaderContender.revokeLeadership();
}
@Override
public void onLeaderInformationChange(LeaderInformation leaderInformation)
{
- runInLeaderEventThread(() ->
onLeaderInformationChangeInternal(leaderInformation));
+ leaderContenderRegistry
+ .keySet()
+ .forEach(
+ contenderID ->
+ notifyLeaderInformationChange(contenderID,
leaderInformation));
}
@GuardedBy("lock")
- private void onLeaderInformationChangeInternal(LeaderInformation
leaderInformation) {
- if (leaderContender != null) {
- LOG.trace(
- "Leader node changed while {} is the leader with {}. New
leader information {}.",
- leaderContender.getDescription(),
-
LeaderElectionUtils.convertToString(confirmedLeaderInformation),
- LeaderElectionUtils.convertToString(leaderInformation));
- if (!confirmedLeaderInformation.isEmpty()) {
- final LeaderInformation confirmedLeaderInfo =
this.confirmedLeaderInformation;
- if (leaderInformation.isEmpty()) {
- LOG.debug(
- "Writing leader information by {} since the
external storage is empty.",
- leaderContender.getDescription());
- leaderElectionDriver.publishLeaderInformation(contenderID,
confirmedLeaderInfo);
- } else if (!leaderInformation.equals(confirmedLeaderInfo)) {
- // the data field does not correspond to the expected
leader information
- LOG.debug(
- "Correcting leader information by {}.",
- leaderContender.getDescription());
- leaderElectionDriver.publishLeaderInformation(contenderID,
confirmedLeaderInfo);
- }
- }
- } else {
- LOG.debug(
- "Ignoring change notification since the {} has already
been stopped.",
- leaderElectionDriver);
- }
+ private void onLeaderInformationChangeInternal(LeaderInformationRegister
leaderInformation) {
+ leaderContenderRegistry.forEach(
+ (contenderID, leaderContender) -> {
+ final LeaderInformation newLeaderInformationForContender =
+ leaderInformation
+ .forContenderID(contenderID)
+ .orElse(LeaderInformation.empty());
+ final LeaderInformation
confirmedLeaderInformationForContender =
+
confirmedLeaderInformation.forContenderIDOrEmpty(contenderID);
+
+ if (confirmedLeaderInformationForContender.isEmpty()) {
+ LOG.trace(
+ "Leader information changed while there's no
confirmation available by the contender for contender ID '{}' for this session.
Changed leader information {} will be ignored.",
+ contenderID,
+ LeaderElectionUtils.convertToString(
+ newLeaderInformationForContender));
Review Comment:
I also noticed that the `notifyLeaderInformationChange` method is implemented
incorrectly: it does not use the `LeaderInformationRegister` the way it is
meant to be used. I'll have to get back to that one. :thinking:
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]