This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 28f53b0214c fix: bundle-data metadata leak because of bundlestats was not clean (#17095)
28f53b0214c is described below

commit 28f53b0214c3af8a8ea3ee58ca7d61e60831819f
Author: Lei Zhiyuan <[email protected]>
AuthorDate: Tue Aug 16 10:53:31 2022 +0800

    fix: bundle-data metadata leak because of bundlestats was not clean (#17095)
    
    Co-authored-by: zhiyuanlei <[email protected]>
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   |  4 +-
 .../broker/namespace/NamespaceServiceTest.java     | 58 ++++++++++++++++++++++
 2 files changed, 61 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index fe6328dd102..4d8f49be965 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -649,7 +649,9 @@ public abstract class NamespacesBase extends AdminResource {
                                         deleteTopicsFuture = 
FutureUtil.waitForAll(futures);
                                     }
                                     return deleteTopicsFuture.thenCompose(
-                                            ___ -> 
pulsar().getNamespaceService().removeOwnedServiceUnitAsync(bundle));
+                                            ___ -> 
pulsar().getNamespaceService().removeOwnedServiceUnitAsync(bundle))
+                                            .thenRun(() -> 
pulsar().getBrokerService().getBundleStats()
+                                                    
.remove(bundle.toString()));
                                 });
                     });
                 });
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
index 14fef03a30d..9f03c026225 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
@@ -721,6 +721,64 @@ public class NamespaceServiceTest extends BrokerTestBase {
         });
     }
 
+    @Test
+    public void testModularLoadManagerRemoveBundleAndLoad() throws Exception {
+        final String BUNDLE_DATA_PATH = "/loadbalance/bundle-data";
+        final String namespace = "prop/ns-abc";
+        final String topic1 = "persistent://" + namespace + "/topic1";
+        final String topic2 = "persistent://" + namespace + "/topic2";
+
+        // configure broker with ModularLoadManager
+        conf.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
+        conf.setForceDeleteNamespaceAllowed(true);
+        restartBroker();
+
+        LoadManager loadManager = spy(pulsar.getLoadManager().get());
+        Field loadManagerField = 
NamespaceService.class.getDeclaredField("loadManager");
+        loadManagerField.setAccessible(true);
+        doReturn(true).when(loadManager).isCentralized();
+        loadManagerField.set(pulsar.getNamespaceService(), new 
AtomicReference<>(loadManager));
+        NamespaceName nsname = NamespaceName.get(namespace);
+
+        @Cleanup
+        PulsarClient pulsarClient = 
PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build();
+        @Cleanup
+        Consumer<byte[]> consumer1 = pulsarClient.newConsumer().topic(topic1)
+                .subscriptionName("my-subscriber-name1").subscribe();
+        @Cleanup
+        Consumer<byte[]> consumer2 = pulsarClient.newConsumer().topic(topic2)
+                .subscriptionName("my-subscriber-name2").subscribe();
+
+
+        NamespaceBundle bundle =
+                
pulsar.getNamespaceService().getNamespaceBundleFactory().getBundle(TopicName.get(topic1));
+
+        loadManager.getLeastLoaded(bundle);
+
+        //create znode for bundle-data
+        pulsar.getBrokerService().updateRates();
+        loadManager.writeLoadReportOnZookeeper();
+        loadManager.writeResourceQuotasToZooKeeper();
+
+        String path = BUNDLE_DATA_PATH + "/" + nsname.toString() + 
"/0x00000000_0xffffffff";
+
+        Optional<GetResult> getResult = 
pulsar.getLocalMetadataStore().get(path).get();
+        assertTrue(getResult.isPresent());
+
+        //delete namespace which will remove bundle and load
+        
pulsar.getAdminClient().namespaces().deleteNamespace(nsname.toString(),true);
+
+        TimeUnit.SECONDS.sleep(5);
+
+        // update broker bundle report to zk
+        loadManager.writeLoadReportOnZookeeper();
+        loadManager.writeResourceQuotasToZooKeeper();
+
+        getResult = pulsar.getLocalMetadataStore().get(path).get();
+        assertFalse(getResult.isPresent());
+
+    }
+
     @SuppressWarnings("unchecked")
     private Pair<NamespaceBundles, List<NamespaceBundle>> 
splitBundles(NamespaceBundleFactory utilityFactory,
             NamespaceName nsname, NamespaceBundles bundles, NamespaceBundle 
targetBundle) throws Exception {

Reply via email to