zentol commented on code in PR #22844:
URL: https://github.com/apache/flink/pull/22844#discussion_r1250941228


##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##########
@@ -189,66 +179,62 @@ protected void register(String contenderID, LeaderContender contender) throws Ex
         checkNotNull(contender, "Contender must not be null.");
 
         synchronized (lock) {
-            Preconditions.checkState(
-                    leaderContender == null,
-                    "Only one LeaderContender is allowed to be registered to this service.");
-            Preconditions.checkState(
-                    this.contenderID == null, "The contenderID is only allowed to be set once.");
             Preconditions.checkState(
                     running,
                     "The DefaultLeaderElectionService should have established a connection to the backend before it's started.");
 
-            leaderContender = contender;
-            this.contenderID = contenderID;
+            leaderContenderRegistry.put(contenderID, contender);
 
             LOG.info(
                     "LeaderContender {} has been registered for {}.",
-                    contender.getDescription(),
+                    contenderID,
                     leaderElectionDriver);
 
             if (issuedLeaderSessionID != null) {
                 // notifying the LeaderContender shouldn't happen in the contender's main thread
                 runInLeaderEventThread(
-                        () -> notifyLeaderContenderOfLeadership(issuedLeaderSessionID));
+                        () ->
+                                notifyLeaderContenderOfLeadership(
+                                        contenderID, issuedLeaderSessionID));
             }
         }
     }
 
     @Override
     protected final void remove(String contenderID) {
         synchronized (lock) {
-            if (this.contenderID == null) {
+            if (!leaderContenderRegistry.containsKey(contenderID)) {
                 LOG.debug(
                         "The stop procedure was called on an already stopped DefaultLeaderElectionService instance. No action necessary.");

Review Comment:
   Update the message to include the contender and clarify that the LES may still be running?
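One possible shape for the updated message is sketched below. It is a simplified, hypothetical stand-in for `remove(String)`: the class name, `notRegisteredMessage`, and the boolean return value are illustrative, and `java.util.logging` replaces Flink's SLF4J logger. In the real class, the same text would go through `LOG.debug("... '{}' ...", contenderID)`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;

// Hypothetical, simplified stand-in for DefaultLeaderElectionService#remove(String).
// java.util.logging replaces Flink's SLF4J logger so the sketch is self-contained.
class RemoveLoggingSketch {

    private static final Logger LOG = Logger.getLogger(RemoveLoggingSketch.class.getName());

    private final Object lock = new Object();
    private final Map<String, Object> leaderContenderRegistry = new HashMap<>();

    void register(String contenderID, Object contender) {
        synchronized (lock) {
            leaderContenderRegistry.put(contenderID, contender);
        }
    }

    // Names the contender and states that the service itself may still be running.
    static String notRegisteredMessage(String contenderID) {
        return "The contender '"
                + contenderID
                + "' is not (or no longer) registered; the DefaultLeaderElectionService "
                + "may still be running for other contenders. No action necessary.";
    }

    // Returns true if the contender was registered and has now been removed.
    boolean remove(String contenderID) {
        synchronized (lock) {
            if (!leaderContenderRegistry.containsKey(contenderID)) {
                LOG.fine(notRegisteredMessage(contenderID));
                return false;
            }
            leaderContenderRegistry.remove(contenderID);
            return true;
        }
    }

    public static void main(String[] args) {
        final RemoveLoggingSketch service = new RemoveLoggingSketch();
        service.register("contender-0", new Object());
        System.out.println(service.remove("contender-0")); // true
        System.out.println(service.remove("contender-0")); // false, only logs at FINE level
    }
}
```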



##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##########
@@ -189,66 +179,62 @@ protected void register(String contenderID, LeaderContender contender) throws Ex
         checkNotNull(contender, "Contender must not be null.");
 
         synchronized (lock) {
-            Preconditions.checkState(
-                    leaderContender == null,
-                    "Only one LeaderContender is allowed to be registered to this service.");
-            Preconditions.checkState(
-                    this.contenderID == null, "The contenderID is only allowed to be set once.");
             Preconditions.checkState(
                     running,
                     "The DefaultLeaderElectionService should have established a connection to the backend before it's started.");
 
-            leaderContender = contender;
-            this.contenderID = contenderID;
+            leaderContenderRegistry.put(contenderID, contender);

Review Comment:
   check that the previous value was null?
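A minimal sketch of the suggested check follows. It assumes a plain `HashMap`-backed registry guarded by a lock, as in the diff; the class name and the `get` accessor are hypothetical. It uses `putIfAbsent` rather than `put`, because `put` would already have replaced the old contender by the time the check fails. In Flink, the explicit `if`/`throw` would typically be the `Preconditions.checkState` call the diff already uses.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical, simplified stand-in for the registry part of DefaultLeaderElectionService.
class ContenderRegistrySketch {

    // Placeholder for Flink's LeaderContender interface.
    interface LeaderContender {}

    private final Object lock = new Object();
    private final Map<String, LeaderContender> leaderContenderRegistry = new HashMap<>();

    void register(String contenderID, LeaderContender contender) {
        Objects.requireNonNull(contenderID, "ContenderID must not be null.");
        Objects.requireNonNull(contender, "Contender must not be null.");

        synchronized (lock) {
            // putIfAbsent only inserts if the ID is still free and otherwise returns the
            // existing contender, so a failed check leaves the registry unchanged.
            final LeaderContender previous =
                    leaderContenderRegistry.putIfAbsent(contenderID, contender);
            if (previous != null) {
                throw new IllegalStateException(
                        "A LeaderContender is already registered under contender ID '"
                                + contenderID
                                + "'.");
            }
        }
    }

    LeaderContender get(String contenderID) {
        synchronized (lock) {
            return leaderContenderRegistry.get(contenderID);
        }
    }
}
```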



##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java:
##########
@@ -932,38 +1064,71 @@ void runTestWithManuallyTriggeredEvents(
 
         void runTest(RunnableWithException testMethod, ExecutorService leaderEventOperationExecutor)
                 throws Exception {
-            try {
-                leaderElectionService =
-                        new DefaultLeaderElectionService(
-                                driverFactory,
-                                DefaultLeaderElectionServiceTest.this.fatalErrorHandlerExtension
-                                        .getTestingFatalErrorHandler(),
-                                leaderEventOperationExecutor);
+            try (final DefaultLeaderElectionService localLeaderElectionService =
+                    new DefaultLeaderElectionService(
+                            driverFactory,
+                            DefaultLeaderElectionServiceTest.this.fatalErrorHandlerExtension
+                                    .getTestingFatalErrorHandler(),
+                            leaderEventOperationExecutor)) {
+                leaderElectionService = localLeaderElectionService;
                 leaderElectionService.startLeaderElectionBackend();
                 testingLeaderElectionDriver = driverFactory.assertAndGetOnlyCreatedDriver();
 
-                leaderElection = leaderElectionService.createLeaderElection(contenderID);
-                testingContender = new TestingContender(TEST_URL, leaderElection);
-                testingContender.startLeaderElection();
+                try (final ContenderContext localContenderContext0 =
+                                ContenderContext.create(0, leaderElectionService);
+                        final ContenderContext localContenderContext1 =
+                                ContenderContext.create(1, leaderElectionService)) {
+                    this.contenderContext0 = localContenderContext0;
+                    this.contenderContext1 = localContenderContext1;

Review Comment:
   Why are we creating 2 contenders now? Purely to have all tests also apply to cases with multiple contenders?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderInformationRegisterTest.java:
##########
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+class LeaderInformationRegisterTest {
+
+    @Test
+    void testOfWithKnownLeaderInformation() {
+        final String contenderID = "contender-id";
+        final LeaderInformation leaderInformation =
+                LeaderInformation.known(UUID.randomUUID(), "address");
+
+        final LeaderInformationRegister testInstance =
+                LeaderInformationRegister.of(contenderID, leaderInformation);
+        assertThat(testInstance.getRegisteredContenderIDs()).containsExactly(contenderID);
+        assertThat(testInstance.forContenderID(contenderID)).hasValue(leaderInformation);
+    }
+
+    @Test
+    void testOfWithEmptyLeaderInformation() {
+        final String contenderID = "contender-id";
+        final LeaderInformationRegister testInstance =
+                LeaderInformationRegister.of(contenderID, LeaderInformation.empty());
+
+        assertThat(testInstance.getRegisteredContenderIDs()).isEmpty();
+        assertThat(testInstance.forContenderID(contenderID)).isNotPresent();
+    }
+
+    @Test
+    void testMerge() {
+        final String contenderID = "contender-id";
+        final LeaderInformation leaderInformation =
+                LeaderInformation.known(UUID.randomUUID(), "address");
+
+        final String newContenderID = "new-contender-id";
+        final LeaderInformation newLeaderInformation =
+                LeaderInformation.known(UUID.randomUUID(), "new-address");
+
+        final LeaderInformationRegister initialRegister =
+                LeaderInformationRegister.of(contenderID, leaderInformation);
+
+        final LeaderInformationRegister newRegister =
+                LeaderInformationRegister.merge(
+                        initialRegister, newContenderID, newLeaderInformation);
+
+        assertThat(newRegister).isNotSameAs(initialRegister);
+        assertThat(newRegister.getRegisteredContenderIDs())
+                .containsExactlyInAnyOrder(contenderID, newContenderID);
+        assertThat(newRegister.forContenderID(contenderID)).hasValue(leaderInformation);
+        assertThat(newRegister.forContenderID(newContenderID)).hasValue(newLeaderInformation);
+    }
+
+    @Test
+    void testMergeEmptyLeaderInformation() {
+        final String contenderID = "contender-id";
+        final LeaderInformation leaderInformation =
+                LeaderInformation.known(UUID.randomUUID(), "address");
+
+        final String newContenderID = "new-contender-id";
+
+        final LeaderInformationRegister initialRegister =
+                LeaderInformationRegister.of(contenderID, leaderInformation);
+
+        final LeaderInformationRegister newRegister =
+                LeaderInformationRegister.merge(
+                        initialRegister, newContenderID, LeaderInformation.empty());
+
+        assertThat(newRegister).isNotSameAs(initialRegister);
+        assertThat(newRegister.getRegisteredContenderIDs()).containsExactly(contenderID);
+        assertThat(newRegister.forContenderID(newContenderID)).isNotPresent();
+    }
+
+    @Test
+    void testMergeEmptyLeaderInformationForExistingContenderID() {

Review Comment:
   For completeness, a test where we overwrite existing leader information with new (non-empty) leader information would be nice.
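The requested test could look roughly like the sketch below. To keep it self-contained, `RegisterSketch` is a hypothetical stand-in for `LeaderInformationRegister`. Leader information is reduced to an address `String`, and `null` models `LeaderInformation.empty()`. The merge semantics are assumptions inferred from the tests above: a new instance is returned, empty information drops the entry, and non-empty information replaces it. In the real test class, the checks would be the AssertJ calls already used there, e.g. `assertThat(newRegister.forContenderID(contenderID)).hasValue(newLeaderInformation)`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical stand-in for LeaderInformationRegister (address String; null = empty).
final class RegisterSketch {

    private final Map<String, String> leaderInformationPerContender;

    private RegisterSketch(Map<String, String> leaderInformationPerContender) {
        this.leaderInformationPerContender =
                Collections.unmodifiableMap(leaderInformationPerContender);
    }

    static RegisterSketch of(String contenderID, String leaderInformation) {
        return merge(new RegisterSketch(new HashMap<>()), contenderID, leaderInformation);
    }

    // Returns a new instance: empty information removes the entry, non-empty replaces it.
    static RegisterSketch merge(
            RegisterSketch register, String contenderID, String leaderInformation) {
        final Map<String, String> copy = new HashMap<>(register.leaderInformationPerContender);
        if (leaderInformation == null) {
            copy.remove(contenderID);
        } else {
            copy.put(contenderID, leaderInformation);
        }
        return new RegisterSketch(copy);
    }

    Set<String> getRegisteredContenderIDs() {
        return leaderInformationPerContender.keySet();
    }

    Optional<String> forContenderID(String contenderID) {
        return Optional.ofNullable(leaderInformationPerContender.get(contenderID));
    }

    // The overwrite test asked for in the review, written without JUnit/AssertJ.
    static void testMergeOverwritesExistingLeaderInformation() {
        final String contenderID = "contender-id";
        final RegisterSketch initial = of(contenderID, "address");
        final RegisterSketch updated = merge(initial, contenderID, "new-address");

        check(updated != initial, "merge must return a new instance");
        check(
                updated.getRegisteredContenderIDs().equals(Collections.singleton(contenderID)),
                "only the existing contender ID must be registered");
        check(
                updated.forContenderID(contenderID).equals(Optional.of("new-address")),
                "the new leader information must replace the old one");
        check(
                initial.forContenderID(contenderID).equals(Optional.of("address")),
                "the initial register must stay unchanged");
    }

    private static void check(boolean condition, String message) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }

    public static void main(String[] args) {
        testMergeOverwritesExistingLeaderInformation();
        System.out.println("ok");
    }
}
```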



##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##########
@@ -503,43 +516,53 @@ private void runInLeaderEventThread(Runnable callback) {
 
     private void forwardErrorToLeaderContender(Throwable t) {
         synchronized (lock) {
-            if (leaderContender == null) {
+            if (leaderContenderRegistry.isEmpty()) {
                 fallbackErrorHandler.onFatalError(t);
                 return;
             }
 
-            if (t instanceof LeaderElectionException) {
-                leaderContender.handleError((LeaderElectionException) t);
-            } else {
-                leaderContender.handleError(new LeaderElectionException(t));
-            }
+            leaderContenderRegistry
+                    .values()
+                    .forEach(
+                            leaderContender -> {
+                                if (leaderContender == null) {

Review Comment:
   :thinking: When can the value of the registry map be null? Even if that were possible (which I don't think it is), then we'd be logging this message once per contender.
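The sketch below illustrates the point that the per-value null check becomes unnecessary once `register` rejects null contenders. It is hypothetical and simplified: `LeaderElectionException` and `LeaderContender` are local placeholders for the Flink types, and a `Consumer<Throwable>` stands in for the fallback error handler. Wrapping the error once outside the loop is a design choice of this sketch, not something the PR prescribes; it means every contender receives the same exception instance.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for DefaultLeaderElectionService#forwardErrorToLeaderContender.
class ErrorForwardingSketch {

    // Placeholder for Flink's LeaderElectionException.
    static class LeaderElectionException extends Exception {
        LeaderElectionException(Throwable cause) {
            super(cause);
        }
    }

    // Placeholder for the error callback of Flink's LeaderContender.
    interface LeaderContender {
        void handleError(LeaderElectionException e);
    }

    private final Object lock = new Object();
    private final Map<String, LeaderContender> leaderContenderRegistry = new LinkedHashMap<>();
    private final Consumer<Throwable> fallbackErrorHandler;

    ErrorForwardingSketch(Consumer<Throwable> fallbackErrorHandler) {
        this.fallbackErrorHandler = fallbackErrorHandler;
    }

    void register(String contenderID, LeaderContender contender) {
        synchronized (lock) {
            // Rejecting null here is what guarantees non-null map values below.
            leaderContenderRegistry.put(contenderID, Objects.requireNonNull(contender));
        }
    }

    void forwardErrorToLeaderContender(Throwable t) {
        synchronized (lock) {
            if (leaderContenderRegistry.isEmpty()) {
                fallbackErrorHandler.accept(t);
                return;
            }

            // Wrap once, outside the loop; all contenders receive the same instance.
            final LeaderElectionException error =
                    t instanceof LeaderElectionException
                            ? (LeaderElectionException) t
                            : new LeaderElectionException(t);

            // No per-value null check: register() never stores null contenders.
            leaderContenderRegistry.values().forEach(contender -> contender.handleError(error));
        }
    }
}
```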



##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java:
##########
@@ -349,22 +380,42 @@ void testProperCleanupOnLeaderElectionCloseWhenHoldingTheLeadership() throws Exc
                             final UUID leaderSessionID = UUID.randomUUID();
                             grantLeadership(leaderSessionID);
 
-                            assertThat(testingContender.getLeaderSessionID()).isNotNull();
-                            assertThat(leaderElectionService.getLeaderSessionID(contenderID))
-                                    .isEqualTo(leaderSessionID);
-                            assertThat(storedLeaderInformation.get().forContenderID(contenderID))
-                                    .hasValue(LeaderInformation.known(leaderSessionID, TEST_URL));
+                            applyToBothContenderContexts(
+                                    ctx -> {
+                                        assertThat(ctx.contender.getLeaderSessionID())
+                                                .isEqualTo(leaderSessionID);
+                                        assertThat(
+                                                        leaderElectionService.getLeaderSessionID(
+                                                                ctx.contenderID))
+                                                .isEqualTo(leaderSessionID);
+
+                                        assertThat(
+                                                        leaderElectionService.getLeaderSessionID(
+                                                                ctx.contenderID))
+                                                .isEqualTo(leaderSessionID);
+
+                                        assertThat(
+                                                        storedLeaderInformation
+                                                                .get()
+                                                                .forContenderID(ctx.contenderID))
+                                                .hasValue(
+                                                        LeaderInformation.known(
+                                                                leaderSessionID, ctx.address));

Review Comment:
   Look at that magnificent formatting magic, going from 5 lines to 18.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##########
@@ -425,59 +425,72 @@ private void onRevokeLeadershipInternal() {
     }
 
     @GuardedBy("lock")
-    private void notifyLeaderContenderOfLeadershipLoss() {
+    private void notifyLeaderContenderOfLeadershipLoss(
+            String contenderID, LeaderContender leaderContender) {
         Preconditions.checkState(
                 leaderContender != null,
                 "The LeaderContender should be always set when calling this method.");
 
-        if (confirmedLeaderInformation.isEmpty()) {
+        if (!confirmedLeaderInformation.hasLeaderInformation(contenderID)) {
             LOG.debug(
                     "Revoking leadership to contender {} while a previous leadership grant wasn't confirmed, yet.",
-                    leaderContender.getDescription());
+                    contenderID);
         } else {
             LOG.debug(
                     "Revoking leadership to contender {} for {}.",
-                    leaderContender.getDescription(),
-                    LeaderElectionUtils.convertToString(confirmedLeaderInformation));
+                    contenderID,
+                    LeaderElectionUtils.convertToString(
+                            confirmedLeaderInformation.forContenderIDOrEmpty(contenderID)));
         }
 
-        confirmedLeaderInformation = LeaderInformation.empty();
+        confirmedLeaderInformation =
+                LeaderInformationRegister.clear(confirmedLeaderInformation, contenderID);
         leaderContender.revokeLeadership();
     }
 
     @Override
     public void onLeaderInformationChange(LeaderInformation leaderInformation) {
-        runInLeaderEventThread(() -> onLeaderInformationChangeInternal(leaderInformation));
+        leaderContenderRegistry
+                .keySet()
+                .forEach(
+                        contenderID ->
+                                notifyLeaderInformationChange(contenderID, leaderInformation));
     }
 
     @GuardedBy("lock")
-    private void onLeaderInformationChangeInternal(LeaderInformation leaderInformation) {
-        if (leaderContender != null) {
-            LOG.trace(
-                    "Leader node changed while {} is the leader with {}. New leader information {}.",
-                    leaderContender.getDescription(),
-                    LeaderElectionUtils.convertToString(confirmedLeaderInformation),
-                    LeaderElectionUtils.convertToString(leaderInformation));
-            if (!confirmedLeaderInformation.isEmpty()) {
-                final LeaderInformation confirmedLeaderInfo = this.confirmedLeaderInformation;
-                if (leaderInformation.isEmpty()) {
-                    LOG.debug(
-                            "Writing leader information by {} since the external storage is empty.",
-                            leaderContender.getDescription());
-                    leaderElectionDriver.publishLeaderInformation(contenderID, confirmedLeaderInfo);
-                } else if (!leaderInformation.equals(confirmedLeaderInfo)) {
-                    // the data field does not correspond to the expected leader information
-                    LOG.debug(
-                            "Correcting leader information by {}.",
-                            leaderContender.getDescription());
-                    leaderElectionDriver.publishLeaderInformation(contenderID, confirmedLeaderInfo);
-                }
-            }
-        } else {
-            LOG.debug(
-                    "Ignoring change notification since the {} has already been stopped.",
-                    leaderElectionDriver);
-        }
+    private void onLeaderInformationChangeInternal(LeaderInformationRegister leaderInformation) {
+        leaderContenderRegistry.forEach(
+                (contenderID, leaderContender) -> {
+                    final LeaderInformation newLeaderInformationForContender =
+                            leaderInformation
+                                    .forContenderID(contenderID)
+                                    .orElse(LeaderInformation.empty());
+                    final LeaderInformation confirmedLeaderInformationForContender =
+                            confirmedLeaderInformation.forContenderIDOrEmpty(contenderID);
+
+                    if (confirmedLeaderInformationForContender.isEmpty()) {
+                        LOG.trace(
+                                "Leader information changed while there's no confirmation available by the contender for contender ID '{}' for this session. Changed leader information {} will be ignored.",
+                                contenderID,
+                                LeaderElectionUtils.convertToString(
+                                        newLeaderInformationForContender));

Review Comment:
   I have trouble understanding this situation. A contender got leadership, but hasn't confirmed it yet? Shouldn't we then put the leadership loss on the wire anyway? When is the contender informed about the leadership loss otherwise?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java:
##########
@@ -349,15 +342,17 @@ protected boolean hasLeadership(String contenderID, UUID leaderSessionId) {
     @Nullable
     public UUID getLeaderSessionID(String contenderID) {
         synchronized (lock) {
-            return contenderID.equals(this.contenderID)
-                    ? confirmedLeaderInformation.getLeaderSessionID()
+            return leaderContenderRegistry.containsKey(contenderID)
+                    ? confirmedLeaderInformation
+                            .forContenderIDOrEmpty(contenderID)

Review Comment:
   This method name looks weird; maybe rename to `forContenderIdOrEmpty`?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java:
##########
@@ -597,14 +676,30 @@ void testOnLeaderInformationChangeIsIgnoredAfterLeaderElectionBeingStop() throws
                         () -> {
                             grantLeadership();
 
-                            assertThat(storedLeaderInformation.get().forContenderID(contenderID))
-                                    .isPresent();
+                            applyToBothContenderContexts(
+                                    ctx ->
+                                            assertThat(
+                                                            storedLeaderInformation
+                                                                    .get()
+                                                                    .forContenderID(
+                                                                            ctx.contenderID))
+                                                    .isPresent());
+
+                            contenderContext0.leaderElection.close();

Review Comment:
   Why isn't this done within `applyToBothContenderContexts`? It runs the same logic for both contenders, no?
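Closing both leader elections through the helper would be straightforward if the helper accepted a consumer that may throw, since closing a `LeaderElection` can throw a checked exception. The sketch below is hypothetical. `ThrowingConsumer`, `FakeLeaderElection`, and this `ContenderContext` are simplified local stand-ins, not the test's actual types. It also assumes both contenders are meant to be closed in the same way at this point in the test.

```java
// Hypothetical, simplified stand-ins for the test's ContenderContext and helper.
class ContenderContextSketch {

    // A consumer whose body may throw checked exceptions, such as AutoCloseable#close.
    @FunctionalInterface
    interface ThrowingConsumer<T> {
        void accept(T value) throws Exception;
    }

    // Placeholder for the LeaderElection each context holds; close() may throw.
    static class FakeLeaderElection implements AutoCloseable {
        boolean closed;

        @Override
        public void close() throws Exception {
            closed = true;
        }
    }

    static class ContenderContext {
        final String contenderID;
        final FakeLeaderElection leaderElection = new FakeLeaderElection();

        ContenderContext(String contenderID) {
            this.contenderID = contenderID;
        }
    }

    final ContenderContext contenderContext0 = new ContenderContext("contender-0");
    final ContenderContext contenderContext1 = new ContenderContext("contender-1");

    // Runs the same action for both contenders, letting checked exceptions propagate.
    void applyToBothContenderContexts(ThrowingConsumer<ContenderContext> action)
            throws Exception {
        action.accept(contenderContext0);
        action.accept(contenderContext1);
    }

    public static void main(String[] args) throws Exception {
        final ContenderContextSketch test = new ContenderContextSketch();
        test.applyToBothContenderContexts(ctx -> ctx.leaderElection.close());
        System.out.println(
                test.contenderContext0.leaderElection.closed
                        && test.contenderContext1.leaderElection.closed); // true
    }
}
```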



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.