This is an automated email from the ASF dual-hosted git repository.

heesung pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 6b24bfe15dd [improve][broker] re-elect the channel owner if no channel owner is found (#23516)
6b24bfe15dd is described below

commit 6b24bfe15ddb867231fa513c9496b9a6eae8770a
Author: Heesung Sohn <[email protected]>
AuthorDate: Mon Oct 28 10:56:58 2024 -0700

    [improve][broker] re-elect the channel owner if no channel owner is found (#23516)
    
    (cherry picked from commit 266e705d29e16145ccb56a9031b4663a7ae085b7)
---
 .../extensions/ExtensibleLoadManagerImpl.java      | 60 +++++++++++++---
 .../extensions/ExtensibleLoadManagerImplTest.java  | 82 +++++++++++++++++++++-
 2 files changed, 131 insertions(+), 11 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index abca2bb3982..ef29d7d9a74 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -826,6 +826,22 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS
                 || topic.startsWith(TOP_BUNDLES_LOAD_DATA_STORE_TOPIC);
     }
 
+    private boolean handleNoChannelOwnerError(Throwable e) {
+        if (FutureUtil.unwrapCompletionException(e).getMessage().contains("no channel owner now")) {
+            var leaderElectionService = getLeaderElectionService();
+            log.warn("No channel owner is found. Trying to start LeaderElectionService again.");
+            leaderElectionService.start();
+            var channelOwner = serviceUnitStateChannel.getChannelOwnerAsync().join();
+            if (channelOwner.isEmpty()) {
+                log.error("Still no Leader is found even after LeaderElectionService restarted.");
+                return false;
+            }
+            log.info("Successfully started LeaderElectionService. The new channel owner is {}", channelOwner);
+            return true;
+        }
+        return false;
+    }
+
     @VisibleForTesting
     synchronized void playLeader() {
         log.info("This broker:{} is setting the role from {} to {}",
@@ -837,10 +853,19 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS
                 if (!initWaiter.get() || disabled()) {
                     return;
                 }
-                if (!serviceUnitStateChannel.isChannelOwner()) {
-                    becameFollower = true;
-                    break;
+                try {
+                    if (!serviceUnitStateChannel.isChannelOwner()) {
+                        becameFollower = true;
+                        break;
+                    }
+                } catch (Throwable e) {
+                    if (handleNoChannelOwnerError(e)) {
+                        continue;
+                    } else {
+                        throw e;
+                    }
                 }
+
                 if (disabled()) {
                     return;
                 }
@@ -903,10 +928,19 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS
                 if (!initWaiter.get() || disabled()) {
                     return;
                 }
-                if (serviceUnitStateChannel.isChannelOwner()) {
-                    becameLeader = true;
-                    break;
+                try {
+                    if (serviceUnitStateChannel.isChannelOwner()) {
+                        becameLeader = true;
+                        break;
+                    }
+                } catch (Throwable e) {
+                    if (handleNoChannelOwnerError(e)) {
+                        continue;
+                    } else {
+                        throw e;
+                    }
                 }
+
                 if (disabled()) {
                     return;
                 }
@@ -995,7 +1029,17 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS
 
             // Monitor role
             // Periodically check the role in case metadata store fails.
-            var isChannelOwner = serviceUnitStateChannel.isChannelOwner();
+
+            boolean isChannelOwner = false;
+            try {
+                isChannelOwner = serviceUnitStateChannel.isChannelOwner();
+            } catch (Throwable e) {
+                if (handleNoChannelOwnerError(e)) {
+                    monitor();
+                } else {
+                    throw e;
+                }
+            }
             if (isChannelOwner) {
                 // System topic config might fail due to the race condition
                 // with topic policy init(Topic policies cache have not init).
@@ -1023,7 +1067,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS
                 serviceUnitStateTableViewSyncer.close();
             }
         } catch (Throwable e) {
-            log.error("Failed to get the channel ownership.", e);
+            log.error("Failed to monitor load manager state", e);
         }
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index d8d3e5bb44f..0909944f349 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -1491,16 +1491,16 @@ public class ExtensibleLoadManagerImplTest extends ExtensibleLoadManagerImplBase
         log.info("makePrimaryAsLeader");
         if (channel2.isChannelOwner()) {
             pulsar2.getLeaderElectionService().close();
-            Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
+            Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
                 assertTrue(channel1.isChannelOwner());
             });
             pulsar2.getLeaderElectionService().start();
         }
 
-        Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
+        Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
             assertTrue(channel1.isChannelOwner());
         });
-        Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
+        Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
             assertFalse(channel2.isChannelOwner());
         });
     }
@@ -1605,6 +1605,82 @@ public class ExtensibleLoadManagerImplTest extends ExtensibleLoadManagerImplBase
 
 
     }
+
+    @DataProvider(name = "noChannelOwnerMonitorHandler")
+    public Object[][] noChannelOwnerMonitorHandler() {
+        return new Object[][] { { true }, { false } };
+    }
+
+    @Test(dataProvider = "noChannelOwnerMonitorHandler", timeOut = 30 * 1000, priority = 2101)
+    public void testHandleNoChannelOwner(boolean noChannelOwnerMonitorHandler) throws Exception {
+
+        makePrimaryAsLeader();
+        primaryLoadManager.playLeader();
+        secondaryLoadManager.playFollower();
+
+        assertEquals(ExtensibleLoadManagerImpl.Role.Leader,
+                primaryLoadManager.getRole());
+        assertEquals(ExtensibleLoadManagerImpl.Role.Follower,
+                secondaryLoadManager.getRole());
+
+        try {
+
+            // simulate no owner in the channel
+            Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().until(() -> {
+                try {
+                    pulsar1.getLeaderElectionService().close();
+                    primaryLoadManager.getServiceUnitStateChannel().isChannelOwner();
+                    return false;
+                } catch (ExecutionException e) {
+                    if (e.getCause() instanceof IllegalStateException && e.getMessage()
+                            .contains("no channel owner now")) {
+                        return true;
+                    } else {
+                        return false;
+                    }
+                }
+            });
+
+            Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().until(() -> {
+                try {
+                    pulsar2.getLeaderElectionService().close();
+                    secondaryLoadManager.getServiceUnitStateChannel().isChannelOwner();
+                    return false;
+                } catch (ExecutionException e) {
+                    if (e.getCause() instanceof IllegalStateException && e.getMessage()
+                            .contains("no channel owner now")) {
+                        return true;
+                    } else {
+                        return false;
+                    }
+                }
+            });
+
+            // elect new channel owner by either monitor or playLeader/playFollower
+            if (noChannelOwnerMonitorHandler) {
+                secondaryLoadManager.monitor();
+                primaryLoadManager.monitor();
+            } else {
+                secondaryLoadManager.playLeader();
+                primaryLoadManager.playFollower();
+            }
+            Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
+                assertEquals(ExtensibleLoadManagerImpl.Role.Leader,
+                        secondaryLoadManager.getRole());
+                assertEquals(ExtensibleLoadManagerImpl.Role.Follower,
+                        primaryLoadManager.getRole());
+
+                assertTrue(secondaryLoadManager.getServiceUnitStateChannel().isChannelOwner());
+                assertFalse(primaryLoadManager.getServiceUnitStateChannel().isChannelOwner());
+            });
+
+        } finally {
+            // clean up for monitor test
+            pulsar1.getLeaderElectionService().start();
+            pulsar2.getLeaderElectionService().start();
+        }
+    }
+
     @Test(timeOut = 30 * 1000, priority = 2000)
     public void testRoleChange() throws Exception {
         makePrimaryAsLeader();

Reply via email to