This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new e2812bf99cc [fix] [broker] remove bundle-data in local metadata store. 
(#21078)
e2812bf99cc is described below

commit e2812bf99cc7daaa19391eec1b02a19cc1b3bcbd
Author: thetumbled <[email protected]>
AuthorDate: Thu Aug 31 09:38:11 2023 +0800

    [fix] [broker] remove bundle-data in local metadata store. (#21078)
    
    Motivation: When deleting a namespace, the znode under the path
`/loadbalance/bundle-data` must be deleted from the local metadata store,
not from the global (configuration) metadata store.
    
    Modifications: Delete bundle data znode in local metadata store.
---
 .../broker/resources/NamespaceResources.java       |  8 ++--
 .../pulsar/broker/resources/PulsarResources.java   |  3 +-
 .../broker/resources/NamespaceResourcesTest.java   | 44 ++++++++++++++++++++++
 .../apache/pulsar/broker/admin/AdminApi2Test.java  | 31 ++++++++++++---
 .../broker/testcontext/PulsarTestContext.java      |  3 +-
 5 files changed, 79 insertions(+), 10 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
index 99ce288fa01..e35c208c208 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
@@ -49,16 +49,18 @@ public class NamespaceResources extends 
BaseResources<Policies> {
     private final IsolationPolicyResources isolationPolicies;
     private final PartitionedTopicResources partitionedTopicResources;
     private final MetadataStore configurationStore;
+    private final MetadataStore localStore;
 
     public static final String POLICIES_READONLY_FLAG_PATH = 
"/admin/flags/policies-readonly";
     private static final String NAMESPACE_BASE_PATH = "/namespace";
     private static final String BUNDLE_DATA_BASE_PATH = 
"/loadbalance/bundle-data";
 
-    public NamespaceResources(MetadataStore configurationStore, int 
operationTimeoutSec) {
+    public NamespaceResources(MetadataStore localStore, MetadataStore 
configurationStore, int operationTimeoutSec) {
         super(configurationStore, Policies.class, operationTimeoutSec);
         this.configurationStore = configurationStore;
         isolationPolicies = new IsolationPolicyResources(configurationStore, 
operationTimeoutSec);
         partitionedTopicResources = new 
PartitionedTopicResources(configurationStore, operationTimeoutSec);
+        this.localStore = localStore;
     }
 
     public CompletableFuture<List<String>> listNamespacesAsync(String tenant) {
@@ -381,13 +383,13 @@ public class NamespaceResources extends 
BaseResources<Policies> {
     // clear resource of `/loadbalance/bundle-data/{tenant}/{namespace}/` in 
metadata-store
     public CompletableFuture<Void> deleteBundleDataAsync(NamespaceName ns) {
         final String namespaceBundlePath = joinPath(BUNDLE_DATA_BASE_PATH, 
ns.toString());
-        return getStore().deleteRecursive(namespaceBundlePath);
+        return this.localStore.deleteRecursive(namespaceBundlePath);
     }
 
     // clear resource of `/loadbalance/bundle-data/{tenant}/` in metadata-store
     public CompletableFuture<Void> deleteBundleDataTenantAsync(String tenant) {
         final String tenantBundlePath = joinPath(BUNDLE_DATA_BASE_PATH, 
tenant);
-        return getStore().deleteRecursive(tenantBundlePath);
+        return this.localStore.deleteRecursive(tenantBundlePath);
     }
 
 }
\ No newline at end of file
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
index dfcd0a4194f..a3c5633a6db 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
@@ -60,7 +60,8 @@ public class PulsarResources {
         if (configurationMetadataStore != null) {
             tenantResources = new TenantResources(configurationMetadataStore, 
operationTimeoutSec);
             clusterResources = new 
ClusterResources(configurationMetadataStore, operationTimeoutSec);
-            namespaceResources = new 
NamespaceResources(configurationMetadataStore, operationTimeoutSec);
+            namespaceResources = new NamespaceResources(localMetadataStore, 
configurationMetadataStore
+                    , operationTimeoutSec);
             resourcegroupResources = new 
ResourceGroupResources(configurationMetadataStore, operationTimeoutSec);
         } else {
             tenantResources = null;
diff --git 
a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/resources/NamespaceResourcesTest.java
 
b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/resources/NamespaceResourcesTest.java
index 85f54a76dc3..deb86e1802f 100644
--- 
a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/resources/NamespaceResourcesTest.java
+++ 
b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/resources/NamespaceResourcesTest.java
@@ -18,12 +18,34 @@
  */
 package org.apache.pulsar.broker.resources;
 
+import static org.apache.pulsar.broker.resources.BaseResources.joinPath;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertThrows;
 import static org.testng.Assert.assertTrue;
+
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 
 public class NamespaceResourcesTest {
+
+    private MetadataStore localStore;
+    private MetadataStore configurationStore;
+    private NamespaceResources namespaceResources;
+
+    private static final String BUNDLE_DATA_BASE_PATH = 
"/loadbalance/bundle-data";
+
+    @BeforeMethod
+    public void setup() {
+        localStore = mock(MetadataStore.class);
+        configurationStore = mock(MetadataStore.class);
+        namespaceResources = new NamespaceResources(localStore, 
configurationStore, 30);
+    }
+
     @Test
     public void test_pathIsFromNamespace() {
         assertFalse(NamespaceResources.pathIsFromNamespace("/admin/clusters"));
@@ -31,4 +53,26 @@ public class NamespaceResourcesTest {
         
assertFalse(NamespaceResources.pathIsFromNamespace("/admin/policies/my-tenant"));
         
assertTrue(NamespaceResources.pathIsFromNamespace("/admin/policies/my-tenant/my-ns"));
     }
+
+    /**
+     *  Test that the bundle-data node is deleted from the local store and not from the configuration store.
+     */
+    @Test
+    public void testDeleteBundleDataAsync() {
+        NamespaceName ns = NamespaceName.get("my-tenant/my-ns");
+        String namespaceBundlePath = joinPath(BUNDLE_DATA_BASE_PATH, 
ns.toString());
+        namespaceResources.deleteBundleDataAsync(ns);
+
+        String tenant="my-tenant";
+        String tenantBundlePath = joinPath(BUNDLE_DATA_BASE_PATH, tenant);
+        namespaceResources.deleteBundleDataTenantAsync(tenant);
+
+        verify(localStore).deleteRecursive(namespaceBundlePath);
+        verify(localStore).deleteRecursive(tenantBundlePath);
+
+        assertThrows(()-> 
verify(configurationStore).deleteRecursive(namespaceBundlePath));
+        assertThrows(()-> 
verify(configurationStore).deleteRecursive(tenantBundlePath));
+    }
+
+
 }
\ No newline at end of file
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index 8a5b4531dbe..b4fcd21df37 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -67,6 +67,7 @@ import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.admin.AdminApiTest.MockedPulsarService;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
+import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
 import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
@@ -1720,6 +1721,8 @@ public class AdminApi2Test extends 
MockedPulsarServiceBaseTest {
         // Set conf.
         cleanup();
         setNamespaceAttr(namespaceAttr);
+        this.conf.setMetadataStoreUrl("127.0.0.1:2181");
+        this.conf.setConfigurationMetadataStoreUrl("127.0.0.1:2182");
         setup();
 
         String tenant = newUniqueName("test-tenant");
@@ -1740,6 +1743,28 @@ public class AdminApi2Test extends 
MockedPulsarServiceBaseTest {
         admin.topics().createPartitionedTopic(topic, 10);
         assertFalse(admin.topics().getList(namespace).isEmpty());
 
+        final String managedLedgersPath = "/managed-ledgers/" + namespace;
+        final String bundleDataPath = "/loadbalance/bundle-data/" + namespace;
+        // Trigger bundle owned by brokers.
+        pulsarClient.newProducer().topic(topic).create().close();
+        // Trigger bundle data write to ZK.
+        Awaitility.await().untilAsserted(() -> {
+            boolean bundleDataWereWriten = false;
+            for (PulsarService ps : new PulsarService[]{pulsar, 
mockPulsarSetup.getPulsar()}) {
+                ModularLoadManagerWrapper loadManager = 
(ModularLoadManagerWrapper) ps.getLoadManager().get();
+                ModularLoadManagerImpl loadManagerImpl = 
(ModularLoadManagerImpl) loadManager.getLoadManager();
+                ps.getBrokerService().updateRates();
+                loadManagerImpl.updateLocalBrokerData();
+                loadManagerImpl.writeBundleDataOnZooKeeper();
+                bundleDataWereWriten = bundleDataWereWriten || 
ps.getLocalMetadataStore().exists(bundleDataPath).join();
+            }
+            assertTrue(bundleDataWereWriten);
+        });
+
+        // assert znode exists in metadata store
+        
assertTrue(pulsar.getLocalMetadataStore().exists(bundleDataPath).join());
+        
assertTrue(pulsar.getLocalMetadataStore().exists(managedLedgersPath).join());
+
         try {
             admin.namespaces().deleteNamespace(namespace, false);
             fail("should have failed due to namespace not empty");
@@ -1756,12 +1781,8 @@ public class AdminApi2Test extends 
MockedPulsarServiceBaseTest {
         
assertFalse(admin.namespaces().getNamespaces(tenant).contains(namespace));
         assertTrue(admin.namespaces().getNamespaces(tenant).isEmpty());
 
-
-        final String managedLedgersPath = "/managed-ledgers/" + namespace;
+        // assert znode deleted in metadata store
         
assertFalse(pulsar.getLocalMetadataStore().exists(managedLedgersPath).join());
-
-
-        final String bundleDataPath = "/loadbalance/bundle-data/" + namespace;
         
assertFalse(pulsar.getLocalMetadataStore().exists(bundleDataPath).join());
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
index 062047c7133..379b5cf63ff 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
@@ -699,7 +699,8 @@ public class PulsarTestContext implements AutoCloseable {
                     if (metadataStore == null) {
                         metadataStore = builder.configurationMetadataStore;
                     }
-                    NamespaceResources nsr = 
spyConfigPulsarResources.spy(NamespaceResources.class, metadataStore, 30);
+                    NamespaceResources nsr = 
spyConfigPulsarResources.spy(NamespaceResources.class,
+                            builder.localMetadataStore, metadataStore, 30);
                     TopicResources tsr = 
spyConfigPulsarResources.spy(TopicResources.class, metadataStore);
                     pulsarResources(
                             spyConfigPulsarResources.spy(

Reply via email to