This is an automated email from the ASF dual-hosted git repository. xiangying pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit f7138b9db4a7c8a904d82d9b5e53189306d883ef Author: Lei Zhiyuan <[email protected]> AuthorDate: Tue Aug 16 10:53:31 2022 +0800 fix: bundle-data metadata leak because of bundlestats was not clean (#17095) Co-authored-by: zhiyuanlei <[email protected]> (cherry picked from commit e23a4c7135090d29c069e4ef8deb389f038c520d) --- .../pulsar/broker/admin/impl/NamespacesBase.java | 2 + .../broker/namespace/NamespaceServiceTest.java | 62 ++++++++++++++++++++++ 2 files changed, 64 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index f325fd26a94..ce8791ff8d6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -744,6 +744,7 @@ public abstract class NamespacesBase extends AdminResource { // remove from owned namespace map and ephemeral node from ZK pulsar().getNamespaceService().removeOwnedServiceUnit(bundle); + pulsar().getBrokerService().getBundleStats().remove(bundle.toString()); } catch (WebApplicationException wae) { throw wae; } catch (Exception e) { @@ -810,6 +811,7 @@ public abstract class NamespacesBase extends AdminResource { authoritative, true); // directly remove from owned namespace map and ephemeral node from ZK pulsar().getNamespaceService().removeOwnedServiceUnit(bundle); + pulsar().getBrokerService().getBundleStats().remove(bundle.toString()); } catch (WebApplicationException wae) { throw wae; } catch (Exception e) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java index 1fc42214401..90c76937413 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java @@ -44,6 +44,8 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import lombok.Cleanup; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.tuple.Pair; @@ -57,6 +59,7 @@ import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceBundleFactory; import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm; @@ -676,6 +679,65 @@ public class NamespaceServiceTest extends BrokerTestBase { NamespaceBundle.getBundleNamespace(namespaceBundle.toString()))); } + + @Test + public void testModularLoadManagerRemoveBundleAndLoad() throws Exception { + final String BUNDLE_DATA_PATH = "/loadbalance/bundle-data"; + final String namespace = "prop/ns-abc"; + final String topic1 = "persistent://" + namespace + "/topic1"; + final String topic2 = "persistent://" + namespace + "/topic2"; + + // configure broker with ModularLoadManager + conf.setLoadManagerClassName(ModularLoadManagerImpl.class.getName()); + conf.setForceDeleteNamespaceAllowed(true); + restartBroker(); + + LoadManager loadManager = spy(pulsar.getLoadManager().get()); + Field loadManagerField = NamespaceService.class.getDeclaredField("loadManager"); + loadManagerField.setAccessible(true); + doReturn(true).when(loadManager).isCentralized(); + loadManagerField.set(pulsar.getNamespaceService(), new AtomicReference<>(loadManager)); + NamespaceName nsname = NamespaceName.get(namespace); + + @Cleanup + PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build(); + @Cleanup + Consumer<byte[]> consumer1 = pulsarClient.newConsumer().topic(topic1) + .subscriptionName("my-subscriber-name1").subscribe(); + @Cleanup + Consumer<byte[]> consumer2 = pulsarClient.newConsumer().topic(topic2) + .subscriptionName("my-subscriber-name2").subscribe(); + + + NamespaceBundle bundle = + pulsar.getNamespaceService().getNamespaceBundleFactory().getBundle(TopicName.get(topic1)); + + loadManager.getLeastLoaded(bundle); + + //create znode for bundle-data + pulsar.getBrokerService().updateRates(); + loadManager.writeLoadReportOnZookeeper(); + loadManager.writeResourceQuotasToZooKeeper(); + + String path = BUNDLE_DATA_PATH + "/" + nsname.toString() + "/0x00000000_0xffffffff"; + + Optional<GetResult> getResult = pulsar.getLocalMetadataStore().get(path).get(); + assertTrue(getResult.isPresent()); + + //delete namespace which will remove bundle and load + pulsar.getAdminClient().namespaces().deleteNamespace(nsname.toString(),true); + + TimeUnit.SECONDS.sleep(5); + + // update broker bundle report to zk + loadManager.writeLoadReportOnZookeeper(); + loadManager.writeResourceQuotasToZooKeeper(); + + getResult = pulsar.getLocalMetadataStore().get(path).get(); + assertFalse(getResult.isPresent()); + + } + @SuppressWarnings("unchecked") private Pair<NamespaceBundles, List<NamespaceBundle>> splitBundles(NamespaceBundleFactory utilityFactory, NamespaceName nsname, NamespaceBundles bundles, NamespaceBundle targetBundle) throws Exception {
