This is an automated email from the ASF dual-hosted git repository.
xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 8d501f2849e [broker] clean inactive bundle from bundleData in loadData
and bundlesCache (#13974)
8d501f2849e is described below
commit 8d501f2849e3e51f41d8ac4dd144426bb0b209b6
Author: lixinyang <[email protected]>
AuthorDate: Mon May 9 14:20:42 2022 +0800
[broker] clean inactive bundle from bundleData in loadData and bundlesCache
(#13974)
* clean inactive bundle from bundleData in loadData and bundlesCache after
the bundle be split or unload
* update unit test clean inactive bundle from bundleData in loadData and
bundlesCache after the bundle be split or unload
* inactive bundle from bundleData in loadData and bundlesCache after the
bundle be split or unload
* clean inactive bundle from bundleData in loadData and bundlesCache after
the bundle be split or unload
* update unit test clean inactive bundle from bundleData in loadData and
bundlesCache after the bundle be split or unload
* inactive bundle from bundleData in loadData and bundlesCache after the
bundle be split or unload
* change the unit test function argument lists
* fix unit test import list
Co-authored-by: nicklixinyang <[email protected]>
(cherry picked from commit f5019c81d2680b816eac768b1896333e04ab4214)
---
.../loadbalance/impl/ModularLoadManagerImpl.java | 12 ++++
.../broker/namespace/NamespaceServiceTest.java | 70 +++++++++++++++++++++-
2 files changed, 80 insertions(+), 2 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 293ff2760f0..07408305289 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -523,6 +523,7 @@ public class ModularLoadManagerImpl implements
ModularLoadManager {
// load management decisions may be made.
private void updateBundleData() {
final Map<String, BundleData> bundleData = loadData.getBundleData();
+ final Set<String> activeBundles = new HashSet<>();
// Iterate over the broker data.
for (Map.Entry<String, BrokerData> brokerEntry :
loadData.getBrokerData().entrySet()) {
final String broker = brokerEntry.getKey();
@@ -534,6 +535,7 @@ public class ModularLoadManagerImpl implements
ModularLoadManager {
for (Map.Entry<String, NamespaceBundleStats> entry :
statsMap.entrySet()) {
final String bundle = entry.getKey();
final NamespaceBundleStats stats = entry.getValue();
+ activeBundles.add(bundle);
if (bundleData.containsKey(bundle)) {
// If we recognize the bundle, add these stats as a new
sample.
bundleData.get(bundle).update(stats);
@@ -546,6 +548,16 @@ public class ModularLoadManagerImpl implements
ModularLoadManager {
}
}
+ //Remove not active bundle from loadData
+ for (String bundle : bundleData.keySet()) {
+ if (!activeBundles.contains(bundle)){
+ bundleData.remove(bundle);
+ if (pulsar.getLeaderElectionService().isLeader()){
+ deleteBundleDataFromMetadataStore(bundle);
+ }
+ }
+ }
+
// Remove all loaded bundles from the preallocated maps.
final Map<String, BundleData> preallocatedBundleData =
brokerData.getPreallocatedBundleData();
Set<String> ownedNsBundles =
pulsar.getNamespaceService().getOwnedServiceUnits()
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
index 81b4bb556cd..8e8c6a29e70 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
@@ -45,13 +45,17 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.BundleData;
+import org.apache.pulsar.broker.loadbalance.LoadData;
import org.apache.pulsar.broker.loadbalance.LoadManager;
+import org.apache.pulsar.broker.loadbalance.ResourceUnit;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
+import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceUnit;
import org.apache.pulsar.broker.lookup.LookupResult;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.Topic;
@@ -59,9 +63,11 @@ import
org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.common.naming.NamespaceBundle;
-import org.apache.pulsar.common.naming.NamespaceBundleFactory;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
+import org.apache.pulsar.common.naming.NamespaceBundleFactory;
+import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
@@ -72,7 +78,9 @@ import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.extended.CreateOption;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
import org.apache.pulsar.policies.data.loadbalancer.LoadReport;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
@@ -679,6 +687,64 @@ public class NamespaceServiceTest extends BrokerTestBase {
NamespaceBundle.getBundleNamespace(namespaceBundle.toString())));
}
+ @Test
+ public void testModularLoadManagerRemoveInactiveBundleFromLoadData()
throws Exception {
+ final String BUNDLE_DATA_PATH = "/loadbalance/bundle-data";
+ final String namespace = "pulsar/test/ns1";
+ final String topic1 = "persistent://" + namespace + "/topic1";
+ final String topic2 = "persistent://" + namespace + "/topic2";
+
+ // configure broker with ModularLoadManager
+ conf.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
+ restartBroker();
+
+ LoadManager loadManager = spy(pulsar.getLoadManager().get());
+ Field loadManagerField =
NamespaceService.class.getDeclaredField("loadManager");
+ loadManagerField.setAccessible(true);
+ doReturn(true).when(loadManager).isCentralized();
+ SimpleResourceUnit resourceUnit = new
SimpleResourceUnit(pulsar.getSafeWebServiceAddress(), null);
+ Optional<ResourceUnit> res = Optional.of(resourceUnit);
+
doReturn(res).when(loadManager).getLeastLoaded(any(ServiceUnitId.class));
+ loadManagerField.set(pulsar.getNamespaceService(), new
AtomicReference<>(loadManager));
+
+ @Cleanup
+ PulsarClient pulsarClient =
PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build();
+ @Cleanup
+ Consumer<byte[]> consumer1 = pulsarClient.newConsumer().topic(topic1)
+ .subscriptionName("my-subscriber-name1").subscribe();
+ @Cleanup
+ Consumer<byte[]> consumer2 = pulsarClient.newConsumer().topic(topic2)
+ .subscriptionName("my-subscriber-name2").subscribe();
+ MetadataStoreExtended metadataStoreExtended =
pulsar.getLocalMetadataStore();
+ //create znode for bundle-data
+ pulsar.getBrokerService().updateRates();
+ loadManager.writeLoadReportOnZookeeper();
+ loadManager.writeResourceQuotasToZooKeeper();
+
+ //split bundle
+ NamespaceName nsname = NamespaceName.get(namespace);
+ NamespaceBundles bundles =
pulsar.getNamespaceService().getNamespaceBundleFactory().getBundles(nsname);
+ NamespaceBundle oldBundle = bundles.findBundle(TopicName.get(topic1));
+ pulsar.getNamespaceService().splitAndOwnBundle(oldBundle, false,
+ NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO).get();
+ // update broker bundle report to zk
+ pulsar.getBrokerService().updateRates();
+ loadManager.writeLoadReportOnZookeeper();
+ loadManager.writeResourceQuotasToZooKeeper();
+
+ Field loadDataFiled =
ModularLoadManagerImpl.class.getDeclaredField("loadData");
+ loadDataFiled.setAccessible(true);
+ LoadData loadData = (LoadData)loadDataFiled
+ .get((ModularLoadManagerImpl) ((ModularLoadManagerWrapper)
loadManager).getLoadManager());
+ MetadataCache<BundleData> bundlesCache =
pulsar.getLocalMetadataStore().getMetadataCache(BundleData.class);
+ // update broker bundle report to zk
+ waitResourceDataUpdateToZK(loadManager);
+ Awaitility.await().untilAsserted(() -> {
+ assertNull(loadData.getBundleData().get(oldBundle.toString()));
+ assertFalse(bundlesCache.exists(BUNDLE_DATA_PATH + "/" +
oldBundle.toString()).get());
+ });
+ }
+
@Test
public void testModularLoadManagerRemoveBundleAndLoad() throws Exception {