This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit f7138b9db4a7c8a904d82d9b5e53189306d883ef
Author: Lei Zhiyuan <[email protected]>
AuthorDate: Tue Aug 16 10:53:31 2022 +0800

    fix: bundle-data metadata leak because of bundlestats was not clean (#17095)
    
    Co-authored-by: zhiyuanlei <[email protected]>
    (cherry picked from commit e23a4c7135090d29c069e4ef8deb389f038c520d)
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   |  2 +
 .../broker/namespace/NamespaceServiceTest.java     | 62 ++++++++++++++++++++++
 2 files changed, 64 insertions(+)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index f325fd26a94..ce8791ff8d6 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -744,6 +744,7 @@ public abstract class NamespacesBase extends AdminResource {
 
             // remove from owned namespace map and ephemeral node from ZK
             pulsar().getNamespaceService().removeOwnedServiceUnit(bundle);
+            
pulsar().getBrokerService().getBundleStats().remove(bundle.toString());
         } catch (WebApplicationException wae) {
             throw wae;
         } catch (Exception e) {
@@ -810,6 +811,7 @@ public abstract class NamespacesBase extends AdminResource {
                 authoritative, true);
             // directly remove from owned namespace map and ephemeral node 
from ZK
             pulsar().getNamespaceService().removeOwnedServiceUnit(bundle);
+            
pulsar().getBrokerService().getBundleStats().remove(bundle.toString());
         } catch (WebApplicationException wae) {
             throw wae;
         } catch (Exception e) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
index 1fc42214401..90c76937413 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
@@ -44,6 +44,8 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.Cleanup;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.tuple.Pair;
@@ -57,6 +59,7 @@ import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceBundleFactory;
 import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
@@ -676,6 +679,65 @@ public class NamespaceServiceTest extends BrokerTestBase {
                         
NamespaceBundle.getBundleNamespace(namespaceBundle.toString())));
     }
 
+
+    @Test
+    public void testModularLoadManagerRemoveBundleAndLoad() throws Exception {
+        final String BUNDLE_DATA_PATH = "/loadbalance/bundle-data";
+        final String namespace = "prop/ns-abc";
+        final String topic1 = "persistent://" + namespace + "/topic1";
+        final String topic2 = "persistent://" + namespace + "/topic2";
+
+        // configure broker with ModularLoadManager
+        conf.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
+        conf.setForceDeleteNamespaceAllowed(true);
+        restartBroker();
+
+        LoadManager loadManager = spy(pulsar.getLoadManager().get());
+        Field loadManagerField = 
NamespaceService.class.getDeclaredField("loadManager");
+        loadManagerField.setAccessible(true);
+        doReturn(true).when(loadManager).isCentralized();
+        loadManagerField.set(pulsar.getNamespaceService(), new 
AtomicReference<>(loadManager));
+        NamespaceName nsname = NamespaceName.get(namespace);
+
+        @Cleanup
+        PulsarClient pulsarClient = 
PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build();
+        @Cleanup
+        Consumer<byte[]> consumer1 = pulsarClient.newConsumer().topic(topic1)
+                .subscriptionName("my-subscriber-name1").subscribe();
+        @Cleanup
+        Consumer<byte[]> consumer2 = pulsarClient.newConsumer().topic(topic2)
+                .subscriptionName("my-subscriber-name2").subscribe();
+
+
+        NamespaceBundle bundle =
+                
pulsar.getNamespaceService().getNamespaceBundleFactory().getBundle(TopicName.get(topic1));
+
+        loadManager.getLeastLoaded(bundle);
+
+        //create znode for bundle-data
+        pulsar.getBrokerService().updateRates();
+        loadManager.writeLoadReportOnZookeeper();
+        loadManager.writeResourceQuotasToZooKeeper();
+
+        String path = BUNDLE_DATA_PATH + "/" + nsname.toString() + 
"/0x00000000_0xffffffff";
+
+        Optional<GetResult> getResult = 
pulsar.getLocalMetadataStore().get(path).get();
+        assertTrue(getResult.isPresent());
+
+        //delete namespace which will remove bundle and load
+        
pulsar.getAdminClient().namespaces().deleteNamespace(nsname.toString(),true);
+
+        TimeUnit.SECONDS.sleep(5);
+
+        // update broker bundle report to zk
+        loadManager.writeLoadReportOnZookeeper();
+        loadManager.writeResourceQuotasToZooKeeper();
+
+        getResult = pulsar.getLocalMetadataStore().get(path).get();
+        assertFalse(getResult.isPresent());
+
+    }
+
     @SuppressWarnings("unchecked")
     private Pair<NamespaceBundles, List<NamespaceBundle>> 
splitBundles(NamespaceBundleFactory utilityFactory,
             NamespaceName nsname, NamespaceBundles bundles, NamespaceBundle 
targetBundle) throws Exception {

Reply via email to