This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1b43b9de72c [improve][broker] Explicitly close LB internal topics when
playing a follower (ExtensibleLoadManagerImpl only) (#23144)
1b43b9de72c is described below
commit 1b43b9de72ccf57b56375870b43d551def59d8a3
Author: Heesung Sohn <[email protected]>
AuthorDate: Sat Aug 10 02:24:26 2024 -0700
[improve][broker] Explicitly close LB internal topics when playing a
follower (ExtensibleLoadManagerImpl only) (#23144)
---
.../extensions/ExtensibleLoadManagerImpl.java | 20 +-
.../extensions/ExtensibleLoadManagerImplTest.java | 235 +++++++++++++--------
2 files changed, 162 insertions(+), 93 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index 9450c2a9cc4..95882cfb21b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -124,7 +124,7 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager, BrokerS
private static final String ELECTION_ROOT =
"/loadbalance/extension/leader";
- private static final Set<String> INTERNAL_TOPICS =
+ public static final Set<String> INTERNAL_TOPICS =
Set.of(BROKER_LOAD_DATA_STORE_TOPIC,
TOP_BUNDLES_LOAD_DATA_STORE_TOPIC, TOPIC);
@VisibleForTesting
@@ -146,7 +146,10 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager, BrokerS
@Getter
private IsolationPoliciesHelper isolationPoliciesHelper;
+ @Getter
private LoadDataStore<BrokerLoadData> brokerLoadDataStore;
+
+ @Getter
private LoadDataStore<TopBundlesLoadData> topBundlesLoadDataStore;
private LoadManagerScheduler unloadScheduler;
@@ -259,6 +262,7 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager, BrokerS
Follower
}
+ @Getter
private volatile Role role;
/**
@@ -903,6 +907,7 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager, BrokerS
}
unloadScheduler.close();
serviceUnitStateChannel.cancelOwnershipMonitor();
+ closeInternalTopics();
brokerLoadDataStore.init();
topBundlesLoadDataStore.close();
topBundlesLoadDataStore.startProducer();
@@ -1006,12 +1011,13 @@ public class ExtensibleLoadManagerImpl implements
ExtensibleLoadManager, BrokerS
private void closeInternalTopics() {
List<CompletableFuture<Void>> futures = new ArrayList<>();
for (String name : INTERNAL_TOPICS) {
- futures.add(pulsar.getBrokerService().getTopicIfExists(name)
- .thenAccept(topicOptional -> topicOptional.ifPresent(topic
-> topic.close(true)))
- .exceptionally(__ -> {
- log.warn("Failed to close internal topic:{}", name);
- return null;
- }));
+ pulsar.getBrokerService()
+ .getTopicReference(name)
+ .ifPresent(topic -> futures.add(topic.close(true)
+ .exceptionally(__ -> {
+ log.warn("Failed to close internal topic:{}",
name);
+ return null;
+ })));
}
try {
FutureUtil.waitForAll(futures)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index 4a9b80c798f..69a65caf294 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -102,7 +102,6 @@ import
org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision;
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadCounter;
import
org.apache.pulsar.broker.loadbalance.extensions.reporter.BrokerLoadDataReporter;
import
org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder;
-import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
import org.apache.pulsar.broker.lookup.LookupResult;
import org.apache.pulsar.broker.namespace.LookupOptions;
@@ -122,7 +121,6 @@ import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.LookupService;
-import org.apache.pulsar.client.impl.TableViewImpl;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
@@ -134,6 +132,7 @@ import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
import org.awaitility.Awaitility;
@@ -1186,55 +1185,49 @@ public class ExtensibleLoadManagerImplTest extends
ExtensibleLoadManagerImplBase
assertEquals(result, expectedBrokerServiceUrl);
}
- @Test(priority = 10)
- public void testTopBundlesLoadDataStoreTableViewFromChannelOwner() throws
Exception {
- var topBundlesLoadDataStorePrimary =
- (LoadDataStore)
FieldUtils.readDeclaredField(primaryLoadManager, "topBundlesLoadDataStore",
true);
- var serviceUnitStateChannelPrimary =
- (ServiceUnitStateChannelImpl)
FieldUtils.readDeclaredField(primaryLoadManager,
- "serviceUnitStateChannel", true);
- var tvPrimary =
- (TableViewImpl)
FieldUtils.readDeclaredField(topBundlesLoadDataStorePrimary, "tableView", true);
-
- var topBundlesLoadDataStoreSecondary =
- (LoadDataStore)
FieldUtils.readDeclaredField(secondaryLoadManager, "topBundlesLoadDataStore",
true);
- var tvSecondary =
- (TableViewImpl)
FieldUtils.readDeclaredField(topBundlesLoadDataStoreSecondary, "tableView",
true);
-
- if (serviceUnitStateChannelPrimary.isChannelOwnerAsync().get(5,
TimeUnit.SECONDS)) {
- assertNotNull(tvPrimary);
- assertNull(tvSecondary);
- } else {
- assertNull(tvPrimary);
- assertNotNull(tvSecondary);
+
+ private void makePrimaryAsLeader() throws Exception {
+ log.info("makePrimaryAsLeader");
+ if (channel2.isChannelOwner()) {
+ pulsar2.getLeaderElectionService().close();
+ Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(()
-> {
+ assertTrue(channel1.isChannelOwner());
+ });
+ pulsar2.getLeaderElectionService().start();
}
- restartBroker();
- pulsar1 = pulsar;
- setPrimaryLoadManager();
-
admin.namespaces().setNamespaceReplicationClusters(defaultTestNamespace,
- Sets.newHashSet(this.conf.getClusterName()));
-
- var serviceUnitStateChannelPrimaryNew =
- (ServiceUnitStateChannelImpl)
FieldUtils.readDeclaredField(primaryLoadManager,
- "serviceUnitStateChannel", true);
- var topBundlesLoadDataStorePrimaryNew =
- (LoadDataStore)
FieldUtils.readDeclaredField(primaryLoadManager, "topBundlesLoadDataStore"
- , true);
- Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
-
assertFalse(serviceUnitStateChannelPrimaryNew.isChannelOwnerAsync().get(5,
TimeUnit.SECONDS));
-
assertNotNull(FieldUtils.readDeclaredField(topBundlesLoadDataStoreSecondary,
"tableView"
- , true));
-
assertNull(FieldUtils.readDeclaredField(topBundlesLoadDataStorePrimaryNew,
"tableView"
- , true));
- }
- );
+ Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
+ assertTrue(channel1.isChannelOwner());
+ });
+ Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
+ assertFalse(channel2.isChannelOwner());
+ });
}
- @Test
- public void testRoleChange() throws Exception {
- var topBundlesLoadDataStorePrimary =
(LoadDataStore<TopBundlesLoadData>)
- FieldUtils.readDeclaredField(primaryLoadManager,
"topBundlesLoadDataStore", true);
+ private void makeSecondaryAsLeader() throws Exception {
+ log.info("makeSecondaryAsLeader");
+ if (channel1.isChannelOwner()) {
+ pulsar1.getLeaderElectionService().close();
+ Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(()
-> {
+ assertTrue(channel2.isChannelOwner());
+ });
+ pulsar1.getLeaderElectionService().start();
+ }
+
+ Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
+ assertTrue(channel2.isChannelOwner());
+ });
+ Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
+ assertFalse(channel1.isChannelOwner());
+ });
+ }
+
+ @Test(timeOut = 30 * 1000)
+ public void testRoleChangeIdempotency() throws Exception {
+
+ makePrimaryAsLeader();
+
+ var topBundlesLoadDataStorePrimary =
primaryLoadManager.getTopBundlesLoadDataStore();
var topBundlesLoadDataStorePrimarySpy =
spy(topBundlesLoadDataStorePrimary);
AtomicInteger countPri = new AtomicInteger(3);
AtomicInteger countPri2 = new AtomicInteger(3);
@@ -1255,8 +1248,7 @@ public class ExtensibleLoadManagerImplTest extends
ExtensibleLoadManagerImplBase
return null;
}).when(topBundlesLoadDataStorePrimarySpy).closeTableView();
- var topBundlesLoadDataStoreSecondary =
(LoadDataStore<TopBundlesLoadData>)
- FieldUtils.readDeclaredField(secondaryLoadManager,
"topBundlesLoadDataStore", true);
+ var topBundlesLoadDataStoreSecondary =
secondaryLoadManager.getTopBundlesLoadDataStore();
var topBundlesLoadDataStoreSecondarySpy =
spy(topBundlesLoadDataStoreSecondary);
AtomicInteger countSec = new AtomicInteger(3);
AtomicInteger countSec2 = new AtomicInteger(3);
@@ -1284,51 +1276,30 @@ public class ExtensibleLoadManagerImplTest extends
ExtensibleLoadManagerImplBase
topBundlesLoadDataStoreSecondarySpy, true);
- if (channel1.isChannelOwnerAsync().get(5, TimeUnit.SECONDS)) {
- primaryLoadManager.playLeader();
- secondaryLoadManager.playFollower();
- verify(topBundlesLoadDataStorePrimarySpy,
times(3)).startTableView();
- verify(topBundlesLoadDataStorePrimarySpy,
times(5)).closeTableView();
- verify(topBundlesLoadDataStoreSecondarySpy,
times(0)).startTableView();
- verify(topBundlesLoadDataStoreSecondarySpy,
times(3)).closeTableView();
- } else {
- primaryLoadManager.playFollower();
- secondaryLoadManager.playLeader();
- verify(topBundlesLoadDataStoreSecondarySpy,
times(3)).startTableView();
- verify(topBundlesLoadDataStoreSecondarySpy,
times(5)).closeTableView();
- verify(topBundlesLoadDataStorePrimarySpy,
times(0)).startTableView();
- verify(topBundlesLoadDataStorePrimarySpy,
times(3)).closeTableView();
- }
+
+ primaryLoadManager.playLeader();
+ secondaryLoadManager.playFollower();
+ verify(topBundlesLoadDataStorePrimarySpy,
times(3)).startTableView();
+ verify(topBundlesLoadDataStorePrimarySpy,
times(5)).closeTableView();
+ verify(topBundlesLoadDataStoreSecondarySpy,
times(0)).startTableView();
+ verify(topBundlesLoadDataStoreSecondarySpy,
times(3)).closeTableView();
+
primaryLoadManager.playFollower();
secondaryLoadManager.playFollower();
+ assertEquals(ExtensibleLoadManagerImpl.Role.Leader,
+ primaryLoadManager.getRole());
+ assertEquals(ExtensibleLoadManagerImpl.Role.Follower,
+ secondaryLoadManager.getRole());
- if (channel1.isChannelOwnerAsync().get(5, TimeUnit.SECONDS)) {
- assertEquals(ExtensibleLoadManagerImpl.Role.Leader,
- FieldUtils.readDeclaredField(primaryLoadManager,
"role", true));
- assertEquals(ExtensibleLoadManagerImpl.Role.Follower,
- FieldUtils.readDeclaredField(secondaryLoadManager,
"role", true));
- } else {
- assertEquals(ExtensibleLoadManagerImpl.Role.Follower,
- FieldUtils.readDeclaredField(primaryLoadManager,
"role", true));
- assertEquals(ExtensibleLoadManagerImpl.Role.Leader,
- FieldUtils.readDeclaredField(secondaryLoadManager,
"role", true));
- }
primaryLoadManager.playLeader();
secondaryLoadManager.playLeader();
+ assertEquals(ExtensibleLoadManagerImpl.Role.Leader,
+ primaryLoadManager.getRole());
+ assertEquals(ExtensibleLoadManagerImpl.Role.Follower,
+ secondaryLoadManager.getRole());
- if (channel1.isChannelOwnerAsync().get(5, TimeUnit.SECONDS)) {
- assertEquals(ExtensibleLoadManagerImpl.Role.Leader,
- FieldUtils.readDeclaredField(primaryLoadManager,
"role", true));
- assertEquals(ExtensibleLoadManagerImpl.Role.Follower,
- FieldUtils.readDeclaredField(secondaryLoadManager,
"role", true));
- } else {
- assertEquals(ExtensibleLoadManagerImpl.Role.Follower,
- FieldUtils.readDeclaredField(primaryLoadManager,
"role", true));
- assertEquals(ExtensibleLoadManagerImpl.Role.Leader,
- FieldUtils.readDeclaredField(secondaryLoadManager,
"role", true));
- }
} finally {
FieldUtils.writeDeclaredField(primaryLoadManager,
"topBundlesLoadDataStore",
topBundlesLoadDataStorePrimary, true);
@@ -1336,6 +1307,98 @@ public class ExtensibleLoadManagerImplTest extends
ExtensibleLoadManagerImplBase
topBundlesLoadDataStoreSecondary, true);
}
}
+ @Test(timeOut = 30 * 1000)
+ public void testRoleChange() throws Exception {
+ makePrimaryAsLeader();
+
+ var leader = primaryLoadManager;
+ var follower = secondaryLoadManager;
+
+ BrokerLoadData brokerLoadExpected = new BrokerLoadData();
+ SystemResourceUsage usage = new SystemResourceUsage();
+ var cpu = new ResourceUsage(1.0, 100.0);
+ String key = "b1";
+ usage.setCpu(cpu);
+ brokerLoadExpected.update(usage, 0, 0, 0, 0, 0, 0, conf);
+ String bundle = "public/default/0x00000000_0xffffffff";
+ TopBundlesLoadData topBundlesExpected = new TopBundlesLoadData();
+ topBundlesExpected.getTopBundlesLoadData().clear();
+ topBundlesExpected.getTopBundlesLoadData().add(new
TopBundlesLoadData.BundleLoadData(bundle, new NamespaceBundleStats()));
+
+ follower.getBrokerLoadDataStore().pushAsync(key, brokerLoadExpected);
+ follower.getTopBundlesLoadDataStore().pushAsync(bundle,
topBundlesExpected);
+
+ Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
+
+
assertNotNull(FieldUtils.readDeclaredField(leader.getTopBundlesLoadDataStore(),
"tableView", true));
+
assertNull(FieldUtils.readDeclaredField(follower.getTopBundlesLoadDataStore(),
"tableView", true));
+
+ for (String internalTopic :
ExtensibleLoadManagerImpl.INTERNAL_TOPICS) {
+
assertTrue(leader.pulsar.getBrokerService().getTopicReference(internalTopic)
+ .isPresent());
+
assertTrue(follower.pulsar.getBrokerService().getTopicReference(internalTopic)
+ .isEmpty());
+
+ assertTrue(leader.pulsar.getNamespaceService()
+
.isServiceUnitOwnedAsync(TopicName.get(internalTopic)).get());
+ assertFalse(follower.pulsar.getNamespaceService()
+
.isServiceUnitOwnedAsync(TopicName.get(internalTopic)).get());
+ }
+
+ var actualBrokerLoadLeader =
leader.getBrokerLoadDataStore().get(key);
+ if (actualBrokerLoadLeader.isPresent()) {
+ assertEquals(actualBrokerLoadLeader.get(), brokerLoadExpected);
+ }
+
+ var actualTopBundlesLeader =
leader.getTopBundlesLoadDataStore().get(bundle);
+ if (actualTopBundlesLeader.isPresent()) {
+ assertEquals(actualTopBundlesLeader.get(), topBundlesExpected);
+ }
+
+ var actualBrokerLoadFollower =
follower.getBrokerLoadDataStore().get(key);
+ if (actualBrokerLoadFollower.isPresent()) {
+ assertEquals(actualBrokerLoadFollower.get(),
brokerLoadExpected);
+ }
+ });
+
+ makeSecondaryAsLeader();
+
+ var leader2 = secondaryLoadManager;
+ var follower2 = primaryLoadManager;
+
+ brokerLoadExpected.update(usage, 1, 0, 0, 0, 0, 0, conf);
+ topBundlesExpected.getTopBundlesLoadData().get(0).stats().msgRateIn =
1;
+
+ follower.getBrokerLoadDataStore().pushAsync(key, brokerLoadExpected);
+ follower.getTopBundlesLoadDataStore().pushAsync(bundle,
topBundlesExpected);
+
+ Awaitility.await().atMost(30,
TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
+
assertNotNull(FieldUtils.readDeclaredField(leader2.getTopBundlesLoadDataStore(),
"tableView", true));
+
assertNull(FieldUtils.readDeclaredField(follower2.getTopBundlesLoadDataStore(),
"tableView", true));
+
+ for (String internalTopic :
ExtensibleLoadManagerImpl.INTERNAL_TOPICS) {
+
assertTrue(leader2.pulsar.getBrokerService().getTopicReference(internalTopic)
+ .isPresent());
+
assertTrue(follower2.pulsar.getBrokerService().getTopicReference(internalTopic)
+ .isEmpty());
+
+ assertTrue(leader2.pulsar.getNamespaceService()
+
.isServiceUnitOwnedAsync(TopicName.get(internalTopic)).get());
+ assertFalse(follower2.pulsar.getNamespaceService()
+
.isServiceUnitOwnedAsync(TopicName.get(internalTopic)).get());
+ }
+
+
+ var actualBrokerLoadLeader =
leader2.getBrokerLoadDataStore().get(key);
+ assertEquals(actualBrokerLoadLeader.get(), brokerLoadExpected);
+
+ var actualTopBundlesLeader =
leader2.getTopBundlesLoadDataStore().get(bundle);
+ assertEquals(actualTopBundlesLeader.get(), topBundlesExpected);
+
+ var actualBrokerLoadFollower =
follower2.getBrokerLoadDataStore().get(key);
+ assertEquals(actualBrokerLoadFollower.get(), brokerLoadExpected);
+ });
+ }
@Test
public void testGetMetrics() throws Exception {