This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new e2812bf99cc [fix] [broker] remove bundle-data in local metadata store.
(#21078)
e2812bf99cc is described below
commit e2812bf99cc7daaa19391eec1b02a19cc1b3bcbd
Author: thetumbled <[email protected]>
AuthorDate: Thu Aug 31 09:38:11 2023 +0800
[fix] [broker] remove bundle-data in local metadata store. (#21078)
Motivation: When deleting a namespace, the znode under the path
`/loadbalance/bundle-data` must be deleted from the `local metadata store`;
before this change it was deleted from the `global metadata store`
(the configuration store) instead.
Modifications: Delete the bundle-data znode from the local metadata store
(`deleteBundleDataAsync` and `deleteBundleDataTenantAsync` now use the local store).
---
.../broker/resources/NamespaceResources.java | 8 ++--
.../pulsar/broker/resources/PulsarResources.java | 3 +-
.../broker/resources/NamespaceResourcesTest.java | 44 ++++++++++++++++++++++
.../apache/pulsar/broker/admin/AdminApi2Test.java | 31 ++++++++++++---
.../broker/testcontext/PulsarTestContext.java | 3 +-
5 files changed, 79 insertions(+), 10 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
index 99ce288fa01..e35c208c208 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
@@ -49,16 +49,18 @@ public class NamespaceResources extends
BaseResources<Policies> {
private final IsolationPolicyResources isolationPolicies;
private final PartitionedTopicResources partitionedTopicResources;
private final MetadataStore configurationStore;
+ private final MetadataStore localStore;
public static final String POLICIES_READONLY_FLAG_PATH =
"/admin/flags/policies-readonly";
private static final String NAMESPACE_BASE_PATH = "/namespace";
private static final String BUNDLE_DATA_BASE_PATH =
"/loadbalance/bundle-data";
- public NamespaceResources(MetadataStore configurationStore, int
operationTimeoutSec) {
+ public NamespaceResources(MetadataStore localStore, MetadataStore
configurationStore, int operationTimeoutSec) {
super(configurationStore, Policies.class, operationTimeoutSec);
this.configurationStore = configurationStore;
isolationPolicies = new IsolationPolicyResources(configurationStore,
operationTimeoutSec);
partitionedTopicResources = new
PartitionedTopicResources(configurationStore, operationTimeoutSec);
+ this.localStore = localStore;
}
public CompletableFuture<List<String>> listNamespacesAsync(String tenant) {
@@ -381,13 +383,13 @@ public class NamespaceResources extends
BaseResources<Policies> {
// clear resource of `/loadbalance/bundle-data/{tenant}/{namespace}/` in
metadata-store
public CompletableFuture<Void> deleteBundleDataAsync(NamespaceName ns) {
final String namespaceBundlePath = joinPath(BUNDLE_DATA_BASE_PATH,
ns.toString());
- return getStore().deleteRecursive(namespaceBundlePath);
+ return this.localStore.deleteRecursive(namespaceBundlePath);
}
// clear resource of `/loadbalance/bundle-data/{tenant}/` in metadata-store
public CompletableFuture<Void> deleteBundleDataTenantAsync(String tenant) {
final String tenantBundlePath = joinPath(BUNDLE_DATA_BASE_PATH,
tenant);
- return getStore().deleteRecursive(tenantBundlePath);
+ return this.localStore.deleteRecursive(tenantBundlePath);
}
}
\ No newline at end of file
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
index dfcd0a4194f..a3c5633a6db 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
@@ -60,7 +60,8 @@ public class PulsarResources {
if (configurationMetadataStore != null) {
tenantResources = new TenantResources(configurationMetadataStore,
operationTimeoutSec);
clusterResources = new
ClusterResources(configurationMetadataStore, operationTimeoutSec);
- namespaceResources = new
NamespaceResources(configurationMetadataStore, operationTimeoutSec);
+ namespaceResources = new NamespaceResources(localMetadataStore,
configurationMetadataStore
+ , operationTimeoutSec);
resourcegroupResources = new
ResourceGroupResources(configurationMetadataStore, operationTimeoutSec);
} else {
tenantResources = null;
diff --git
a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/resources/NamespaceResourcesTest.java
b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/resources/NamespaceResourcesTest.java
index 85f54a76dc3..deb86e1802f 100644
---
a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/resources/NamespaceResourcesTest.java
+++
b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/resources/NamespaceResourcesTest.java
@@ -18,12 +18,34 @@
*/
package org.apache.pulsar.broker.resources;
+import static org.apache.pulsar.broker.resources.BaseResources.joinPath;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;
+
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
public class NamespaceResourcesTest {
+
+ private MetadataStore localStore;
+ private MetadataStore configurationStore;
+ private NamespaceResources namespaceResources;
+
+ private static final String BUNDLE_DATA_BASE_PATH =
"/loadbalance/bundle-data";
+
+ @BeforeMethod
+ public void setup() {
+ localStore = mock(MetadataStore.class);
+ configurationStore = mock(MetadataStore.class);
+ namespaceResources = new NamespaceResources(localStore,
configurationStore, 30);
+ }
+
@Test
public void test_pathIsFromNamespace() {
assertFalse(NamespaceResources.pathIsFromNamespace("/admin/clusters"));
@@ -31,4 +53,26 @@ public class NamespaceResourcesTest {
assertFalse(NamespaceResources.pathIsFromNamespace("/admin/policies/my-tenant"));
assertTrue(NamespaceResources.pathIsFromNamespace("/admin/policies/my-tenant/my-ns"));
}
+
+ /**
+ * Test that the bundle-data node is deleted from the local stores.
+ */
+ @Test
+ public void testDeleteBundleDataAsync() {
+ NamespaceName ns = NamespaceName.get("my-tenant/my-ns");
+ String namespaceBundlePath = joinPath(BUNDLE_DATA_BASE_PATH,
ns.toString());
+ namespaceResources.deleteBundleDataAsync(ns);
+
+ String tenant="my-tenant";
+ String tenantBundlePath = joinPath(BUNDLE_DATA_BASE_PATH, tenant);
+ namespaceResources.deleteBundleDataTenantAsync(tenant);
+
+ verify(localStore).deleteRecursive(namespaceBundlePath);
+ verify(localStore).deleteRecursive(tenantBundlePath);
+
+ assertThrows(()->
verify(configurationStore).deleteRecursive(namespaceBundlePath));
+ assertThrows(()->
verify(configurationStore).deleteRecursive(tenantBundlePath));
+ }
+
+
}
\ No newline at end of file
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index 8a5b4531dbe..b4fcd21df37 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -67,6 +67,7 @@ import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminApiTest.MockedPulsarService;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
+import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
@@ -1720,6 +1721,8 @@ public class AdminApi2Test extends
MockedPulsarServiceBaseTest {
// Set conf.
cleanup();
setNamespaceAttr(namespaceAttr);
+ this.conf.setMetadataStoreUrl("127.0.0.1:2181");
+ this.conf.setConfigurationMetadataStoreUrl("127.0.0.1:2182");
setup();
String tenant = newUniqueName("test-tenant");
@@ -1740,6 +1743,28 @@ public class AdminApi2Test extends
MockedPulsarServiceBaseTest {
admin.topics().createPartitionedTopic(topic, 10);
assertFalse(admin.topics().getList(namespace).isEmpty());
+ final String managedLedgersPath = "/managed-ledgers/" + namespace;
+ final String bundleDataPath = "/loadbalance/bundle-data/" + namespace;
+ // Trigger bundle owned by brokers.
+ pulsarClient.newProducer().topic(topic).create().close();
+ // Trigger bundle data write to ZK.
+ Awaitility.await().untilAsserted(() -> {
+ boolean bundleDataWereWriten = false;
+ for (PulsarService ps : new PulsarService[]{pulsar,
mockPulsarSetup.getPulsar()}) {
+ ModularLoadManagerWrapper loadManager =
(ModularLoadManagerWrapper) ps.getLoadManager().get();
+ ModularLoadManagerImpl loadManagerImpl =
(ModularLoadManagerImpl) loadManager.getLoadManager();
+ ps.getBrokerService().updateRates();
+ loadManagerImpl.updateLocalBrokerData();
+ loadManagerImpl.writeBundleDataOnZooKeeper();
+ bundleDataWereWriten = bundleDataWereWriten ||
ps.getLocalMetadataStore().exists(bundleDataPath).join();
+ }
+ assertTrue(bundleDataWereWriten);
+ });
+
+ // assert znode exists in metadata store
+
assertTrue(pulsar.getLocalMetadataStore().exists(bundleDataPath).join());
+
assertTrue(pulsar.getLocalMetadataStore().exists(managedLedgersPath).join());
+
try {
admin.namespaces().deleteNamespace(namespace, false);
fail("should have failed due to namespace not empty");
@@ -1756,12 +1781,8 @@ public class AdminApi2Test extends
MockedPulsarServiceBaseTest {
assertFalse(admin.namespaces().getNamespaces(tenant).contains(namespace));
assertTrue(admin.namespaces().getNamespaces(tenant).isEmpty());
-
- final String managedLedgersPath = "/managed-ledgers/" + namespace;
+ // assert znode deleted in metadata store
assertFalse(pulsar.getLocalMetadataStore().exists(managedLedgersPath).join());
-
-
- final String bundleDataPath = "/loadbalance/bundle-data/" + namespace;
assertFalse(pulsar.getLocalMetadataStore().exists(bundleDataPath).join());
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
index 062047c7133..379b5cf63ff 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
@@ -699,7 +699,8 @@ public class PulsarTestContext implements AutoCloseable {
if (metadataStore == null) {
metadataStore = builder.configurationMetadataStore;
}
- NamespaceResources nsr =
spyConfigPulsarResources.spy(NamespaceResources.class, metadataStore, 30);
+ NamespaceResources nsr =
spyConfigPulsarResources.spy(NamespaceResources.class,
+ builder.localMetadataStore, metadataStore, 30);
TopicResources tsr =
spyConfigPulsarResources.spy(TopicResources.class, metadataStore);
pulsarResources(
spyConfigPulsarResources.spy(