This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 8d501f2849e [broker] clean inactive bundle from bundleData in loadData 
and bundlesCache (#13974)
8d501f2849e is described below

commit 8d501f2849e3e51f41d8ac4dd144426bb0b209b6
Author: lixinyang <[email protected]>
AuthorDate: Mon May 9 14:20:42 2022 +0800

    [broker] clean inactive bundle from bundleData in loadData and bundlesCache 
(#13974)
    
    * clean inactive bundle from bundleData in loadData and bundlesCache after 
the bundle be split or unload
    
    * update unit test clean inactive bundle from bundleData in loadData and 
bundlesCache after the bundle be split or unload
    
    * inactive bundle from bundleData in loadData and bundlesCache after the 
bundle be split or unload
    
    * clean inactive bundle from bundleData in loadData and bundlesCache after 
the bundle be split or unload
    
    * update unit test clean inactive bundle from bundleData in loadData and 
bundlesCache after the bundle be split or unload
    
    * inactive bundle from bundleData in loadData and bundlesCache after the 
bundle be split or unload
    
    * change the unit test function argument lists
    
    * fix unit test import list
    
    Co-authored-by: nicklixinyang <[email protected]>
    (cherry picked from commit f5019c81d2680b816eac768b1896333e04ab4214)
---
 .../loadbalance/impl/ModularLoadManagerImpl.java   | 12 ++++
 .../broker/namespace/NamespaceServiceTest.java     | 70 +++++++++++++++++++++-
 2 files changed, 80 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 293ff2760f0..07408305289 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -523,6 +523,7 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager {
     // load management decisions may be made.
     private void updateBundleData() {
         final Map<String, BundleData> bundleData = loadData.getBundleData();
+        final Set<String> activeBundles = new HashSet<>();
         // Iterate over the broker data.
         for (Map.Entry<String, BrokerData> brokerEntry : 
loadData.getBrokerData().entrySet()) {
             final String broker = brokerEntry.getKey();
@@ -534,6 +535,7 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager {
             for (Map.Entry<String, NamespaceBundleStats> entry : 
statsMap.entrySet()) {
                 final String bundle = entry.getKey();
                 final NamespaceBundleStats stats = entry.getValue();
+                activeBundles.add(bundle);
                 if (bundleData.containsKey(bundle)) {
                     // If we recognize the bundle, add these stats as a new 
sample.
                     bundleData.get(bundle).update(stats);
@@ -546,6 +548,16 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager {
                 }
             }
 
+            //Remove not active bundle from loadData
+            for (String bundle : bundleData.keySet()) {
+                if (!activeBundles.contains(bundle)){
+                    bundleData.remove(bundle);
+                    if (pulsar.getLeaderElectionService().isLeader()){
+                        deleteBundleDataFromMetadataStore(bundle);
+                    }
+                }
+            }
+
             // Remove all loaded bundles from the preallocated maps.
             final Map<String, BundleData> preallocatedBundleData = 
brokerData.getPreallocatedBundleData();
             Set<String> ownedNsBundles = 
pulsar.getNamespaceService().getOwnedServiceUnits()
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
index 81b4bb556cd..8e8c6a29e70 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
@@ -45,13 +45,17 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.BundleData;
+import org.apache.pulsar.broker.loadbalance.LoadData;
 import org.apache.pulsar.broker.loadbalance.LoadManager;
+import org.apache.pulsar.broker.loadbalance.ResourceUnit;
 import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
 import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
+import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceUnit;
 import org.apache.pulsar.broker.lookup.LookupResult;
 import org.apache.pulsar.broker.service.BrokerTestBase;
 import org.apache.pulsar.broker.service.Topic;
@@ -59,9 +63,11 @@ import 
org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.common.naming.NamespaceBundle;
-import org.apache.pulsar.common.naming.NamespaceBundleFactory;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.common.naming.ServiceUnitId;
 import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
+import org.apache.pulsar.common.naming.NamespaceBundleFactory;
+import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceBundles;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
@@ -72,7 +78,9 @@ import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.metadata.api.GetResult;
 import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.MetadataCache;
 import org.apache.pulsar.metadata.api.extended.CreateOption;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
 import org.apache.pulsar.policies.data.loadbalancer.LoadReport;
 import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
@@ -679,6 +687,64 @@ public class NamespaceServiceTest extends BrokerTestBase {
                         
NamespaceBundle.getBundleNamespace(namespaceBundle.toString())));
     }
 
+    @Test
+    public void testModularLoadManagerRemoveInactiveBundleFromLoadData() 
throws Exception {
+        final String BUNDLE_DATA_PATH = "/loadbalance/bundle-data";
+        final String namespace = "pulsar/test/ns1";
+        final String topic1 = "persistent://" + namespace + "/topic1";
+        final String topic2 = "persistent://" + namespace + "/topic2";
+
+        // configure broker with ModularLoadManager
+        conf.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
+        restartBroker();
+
+        LoadManager loadManager = spy(pulsar.getLoadManager().get());
+        Field loadManagerField = 
NamespaceService.class.getDeclaredField("loadManager");
+        loadManagerField.setAccessible(true);
+        doReturn(true).when(loadManager).isCentralized();
+        SimpleResourceUnit resourceUnit = new 
SimpleResourceUnit(pulsar.getSafeWebServiceAddress(), null);
+        Optional<ResourceUnit> res = Optional.of(resourceUnit);
+        
doReturn(res).when(loadManager).getLeastLoaded(any(ServiceUnitId.class));
+        loadManagerField.set(pulsar.getNamespaceService(), new 
AtomicReference<>(loadManager));
+
+        @Cleanup
+        PulsarClient pulsarClient = 
PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build();
+        @Cleanup
+        Consumer<byte[]> consumer1 = pulsarClient.newConsumer().topic(topic1)
+                .subscriptionName("my-subscriber-name1").subscribe();
+        @Cleanup
+        Consumer<byte[]> consumer2 = pulsarClient.newConsumer().topic(topic2)
+                .subscriptionName("my-subscriber-name2").subscribe();
+        MetadataStoreExtended metadataStoreExtended = 
pulsar.getLocalMetadataStore();
+        //create znode for bundle-data
+        pulsar.getBrokerService().updateRates();
+        loadManager.writeLoadReportOnZookeeper();
+        loadManager.writeResourceQuotasToZooKeeper();
+
+        //split bundle
+        NamespaceName nsname = NamespaceName.get(namespace);
+        NamespaceBundles bundles = 
pulsar.getNamespaceService().getNamespaceBundleFactory().getBundles(nsname);
+        NamespaceBundle oldBundle = bundles.findBundle(TopicName.get(topic1));
+        pulsar.getNamespaceService().splitAndOwnBundle(oldBundle, false,
+                NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO).get();
+        // update broker bundle report to zk
+        pulsar.getBrokerService().updateRates();
+        loadManager.writeLoadReportOnZookeeper();
+        loadManager.writeResourceQuotasToZooKeeper();
+
+        Field loadDataFiled = 
ModularLoadManagerImpl.class.getDeclaredField("loadData");
+        loadDataFiled.setAccessible(true);
+        LoadData loadData = (LoadData)loadDataFiled
+                .get((ModularLoadManagerImpl) ((ModularLoadManagerWrapper) 
loadManager).getLoadManager());
+        MetadataCache<BundleData> bundlesCache = 
pulsar.getLocalMetadataStore().getMetadataCache(BundleData.class);
+        // update broker bundle report to zk
+        waitResourceDataUpdateToZK(loadManager);
+        Awaitility.await().untilAsserted(() -> {
+            assertNull(loadData.getBundleData().get(oldBundle.toString()));
+            assertFalse(bundlesCache.exists(BUNDLE_DATA_PATH + "/" + 
oldBundle.toString()).get());
+        });
+    }
+
 
     @Test
     public void testModularLoadManagerRemoveBundleAndLoad() throws Exception {

Reply via email to