This is an automated email from the ASF dual-hosted git repository.
heesung pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 6b24bfe15dd [improve][broker] re-elect the channel owner if no channel owner is found (#23516)
6b24bfe15dd is described below
commit 6b24bfe15ddb867231fa513c9496b9a6eae8770a
Author: Heesung Sohn <[email protected]>
AuthorDate: Mon Oct 28 10:56:58 2024 -0700
[improve][broker] re-elect the channel owner if no channel owner is found (#23516)
(cherry picked from commit 266e705d29e16145ccb56a9031b4663a7ae085b7)
---
.../extensions/ExtensibleLoadManagerImpl.java | 60 +++++++++++++---
.../extensions/ExtensibleLoadManagerImplTest.java | 82 +++++++++++++++++++++-
2 files changed, 131 insertions(+), 11 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index abca2bb3982..ef29d7d9a74 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -826,6 +826,22 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS
|| topic.startsWith(TOP_BUNDLES_LOAD_DATA_STORE_TOPIC);
}
+    /**
+     * Tries to recover from a "no channel owner now" failure by restarting the
+     * LeaderElectionService and checking whether a channel owner exists afterwards.
+     *
+     * <p>Called from catch blocks around {@code isChannelOwner()}: callers retry the role
+     * check when this returns {@code true} and rethrow the original error otherwise, so a
+     * failed owner lookup is reported as {@code false} instead of escaping from here and
+     * replacing the caller's original error.
+     *
+     * @param e the failure from the ownership check; it is unwrapped with
+     *          {@code FutureUtil.unwrapCompletionException} before inspection
+     * @return {@code true} if {@code e} is a "no channel owner now" error and a channel
+     *         owner is found after the restart; {@code false} otherwise
+     */
+    private boolean handleNoChannelOwnerError(Throwable e) {
+        Throwable cause = FutureUtil.unwrapCompletionException(e);
+        // The message (or even the unwrapped cause) can be null; treat that as "not a
+        // no-owner error" rather than raising an NPE from inside the caller's catch block.
+        String message = cause == null ? null : cause.getMessage();
+        if (message == null || !message.contains("no channel owner now")) {
+            return false;
+        }
+        var leaderElectionService = getLeaderElectionService();
+        log.warn("No channel owner is found. Trying to start LeaderElectionService again.");
+        leaderElectionService.start();
+        try {
+            var channelOwner = serviceUnitStateChannel.getChannelOwnerAsync().join();
+            if (channelOwner.isEmpty()) {
+                log.error("Still no Leader is found even after LeaderElectionService restarted.");
+                return false;
+            }
+            log.info("Successfully started LeaderElectionService. The new channel owner is {}",
+                    channelOwner);
+            return true;
+        } catch (RuntimeException lookupError) {
+            // join() reports failures as unchecked exceptions. Returning false lets the
+            // caller rethrow its original error instead of having it replaced by this one.
+            log.error("Failed to look up the channel owner after restarting LeaderElectionService.",
+                    lookupError);
+            return false;
+        }
+    }
+
@VisibleForTesting
synchronized void playLeader() {
log.info("This broker:{} is setting the role from {} to {}",
@@ -837,10 +853,19 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS
if (!initWaiter.get() || disabled()) {
return;
}
- if (!serviceUnitStateChannel.isChannelOwner()) {
- becameFollower = true;
- break;
+ // isChannelOwner() can fail while no channel owner is elected. In that case
+ // handleNoChannelOwnerError() restarts the LeaderElectionService and returns true
+ // only if an owner exists afterwards; then the role is re-checked via the enclosing
+ // loop. Any other failure, or a failed recovery, rethrows the original error.
+ // NOTE(review): confirm the enclosing loop bounds these retries, since each retry
+ // restarts the LeaderElectionService again.
+ try {
+ if (!serviceUnitStateChannel.isChannelOwner()) {
+ becameFollower = true;
+ break;
+ }
+ } catch (Throwable e) {
+ if (handleNoChannelOwnerError(e)) {
+ continue;
+ } else {
+ throw e;
+ }
+ }
+
if (disabled()) {
return;
}
@@ -903,10 +928,19 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS
if (!initWaiter.get() || disabled()) {
return;
}
- if (serviceUnitStateChannel.isChannelOwner()) {
- becameLeader = true;
- break;
+ // isChannelOwner() can fail while no channel owner is elected. In that case
+ // handleNoChannelOwnerError() restarts the LeaderElectionService and returns true
+ // only if an owner exists afterwards; then the role is re-checked via the enclosing
+ // loop. Any other failure, or a failed recovery, rethrows the original error.
+ // NOTE(review): confirm the enclosing loop bounds these retries, since each retry
+ // restarts the LeaderElectionService again.
+ try {
+ if (serviceUnitStateChannel.isChannelOwner()) {
+ becameLeader = true;
+ break;
+ }
+ } catch (Throwable e) {
+ if (handleNoChannelOwnerError(e)) {
+ continue;
+ } else {
+ throw e;
+ }
+ }
+
if (disabled()) {
return;
}
@@ -995,7 +1029,17 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS
// Monitor role
// Periodically check the role in case metadata store fails.
- var isChannelOwner = serviceUnitStateChannel.isChannelOwner();
+
+            // Both paths below assign the role (or throw), so no default value is needed.
+            boolean isChannelOwner;
+            try {
+                isChannelOwner = serviceUnitStateChannel.isChannelOwner();
+            } catch (Throwable e) {
+                if (!handleNoChannelOwnerError(e)) {
+                    throw e;
+                }
+                // A channel owner exists again: re-read the role once instead of recursing
+                // into monitor(). A recursive call would return here and continue into the
+                // role handling below with a stale `false` (running the follower path even
+                // if this broker just became the owner), and it could recurse without bound.
+                // If this read fails too, the enclosing catch logs it and the next periodic
+                // monitor() run retries.
+                isChannelOwner = serviceUnitStateChannel.isChannelOwner();
+            }
if (isChannelOwner) {
// System topic config might fail due to the race condition
// with topic policy init(Topic policies cache have not init).
@@ -1023,7 +1067,7 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS
serviceUnitStateTableViewSyncer.close();
}
} catch (Throwable e) {
- log.error("Failed to get the channel ownership.", e);
+ log.error("Failed to monitor load manager state", e);
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index d8d3e5bb44f..0909944f349 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -1491,16 +1491,16 @@ public class ExtensibleLoadManagerImplTest extends ExtensibleLoadManagerImplBase
log.info("makePrimaryAsLeader");
if (channel2.isChannelOwner()) {
pulsar2.getLeaderElectionService().close();
- Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(()
-> {
+ Awaitility.await().atMost(30,
TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
assertTrue(channel1.isChannelOwner());
});
pulsar2.getLeaderElectionService().start();
}
- Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
+ Awaitility.await().atMost(30,
TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
assertTrue(channel1.isChannelOwner());
});
- Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
+ Awaitility.await().atMost(30,
TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
assertFalse(channel2.isChannelOwner());
});
}
@@ -1605,6 +1605,82 @@ public class ExtensibleLoadManagerImplTest extends ExtensibleLoadManagerImplBase
}
+
+ // Recovery path for testHandleNoChannelOwner: true = re-elect the channel owner via
+ // monitor(); false = via playLeader()/playFollower().
+ @DataProvider(name = "noChannelOwnerMonitorHandler")
+ public Object[][] noChannelOwnerMonitorHandler() {
+ return new Object[][] { { true }, { false } };
+ }
+
+ /**
+ * Verifies recovery when no broker owns the service unit state channel. Both brokers'
+ * LeaderElectionServices are closed until isChannelOwner() fails with
+ * "no channel owner now". Ownership is then re-established through either monitor()
+ * or playLeader()/playFollower(). The secondary is expected to end up as Leader and
+ * the primary as Follower.
+ * NOTE(review): each Awaitility wait below allows up to 30s while the whole test has
+ * timeOut = 30s; confirm this budget is intended.
+ */
+ @Test(dataProvider = "noChannelOwnerMonitorHandler", timeOut = 30 * 1000,
priority = 2101)
+ public void testHandleNoChannelOwner(boolean noChannelOwnerMonitorHandler)
throws Exception {
+
+ // Start from a known state: primary is Leader, secondary is Follower.
+ makePrimaryAsLeader();
+ primaryLoadManager.playLeader();
+ secondaryLoadManager.playFollower();
+
+ assertEquals(ExtensibleLoadManagerImpl.Role.Leader,
+ primaryLoadManager.getRole());
+ assertEquals(ExtensibleLoadManagerImpl.Role.Follower,
+ secondaryLoadManager.getRole());
+
+ try {
+
+ // Simulate a channel with no owner: close pulsar1's LeaderElectionService and poll
+ // until primary's isChannelOwner() fails with an ExecutionException caused by an
+ // IllegalStateException mentioning "no channel owner now".
+ // NOTE(review): close() is re-invoked on every poll; presumably idempotent — confirm.
+ Awaitility.await().atMost(30,
TimeUnit.SECONDS).ignoreExceptions().until(() -> {
+ try {
+ pulsar1.getLeaderElectionService().close();
+
primaryLoadManager.getServiceUnitStateChannel().isChannelOwner();
+ return false;
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof IllegalStateException &&
e.getMessage()
+ .contains("no channel owner now")) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+ });
+
+ // Same for pulsar2, so neither broker's LeaderElectionService is running.
+ Awaitility.await().atMost(30,
TimeUnit.SECONDS).ignoreExceptions().until(() -> {
+ try {
+ pulsar2.getLeaderElectionService().close();
+
secondaryLoadManager.getServiceUnitStateChannel().isChannelOwner();
+ return false;
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof IllegalStateException &&
e.getMessage()
+ .contains("no channel owner now")) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+ });
+
+ // Elect a new channel owner via either monitor() or playLeader()/playFollower().
+ // The secondary is driven first; presumably its restarted election service wins,
+ // which is what the assertions below expect.
+ if (noChannelOwnerMonitorHandler) {
+ secondaryLoadManager.monitor();
+ primaryLoadManager.monitor();
+ } else {
+ secondaryLoadManager.playLeader();
+ primaryLoadManager.playFollower();
+ }
+ Awaitility.await().atMost(30,
TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
+ assertEquals(ExtensibleLoadManagerImpl.Role.Leader,
+ secondaryLoadManager.getRole());
+ assertEquals(ExtensibleLoadManagerImpl.Role.Follower,
+ primaryLoadManager.getRole());
+
+
assertTrue(secondaryLoadManager.getServiceUnitStateChannel().isChannelOwner());
+
assertFalse(primaryLoadManager.getServiceUnitStateChannel().isChannelOwner());
+ });
+
+ } finally {
+ // Restart both LeaderElectionServices closed above so that subsequent tests
+ // (including the other data-provider run) have leader election active again.
+ pulsar1.getLeaderElectionService().start();
+ pulsar2.getLeaderElectionService().start();
+ }
+ }
+
@Test(timeOut = 30 * 1000, priority = 2000)
public void testRoleChange() throws Exception {
makePrimaryAsLeader();