This is an automated email from the ASF dual-hosted git repository.

houxiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new bbdc173e801 [refactor][broker] PIP-301 Part-1: Add BundleDataResources (#21119)
bbdc173e801 is described below

commit bbdc173e80157296ff0475cfdc4e36f5d063a7df
Author: houxiaoyu <[email protected]>
AuthorDate: Sun Oct 8 19:38:29 2023 +0800

    [refactor][broker] PIP-301 Part-1: Add BundleDataResources (#21119)
    
    ### Motivation
    
    See PIP-301: https://github.com/apache/pulsar/pull/21129
    
    ### Modifications
    
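    Add `LoadBalanceResources` with a nested `BundleDataResources`, and route
    bundle-data reads, writes and deletes through it instead of
    `NamespaceResources` and the `BundleData` metadata cache previously held by
    `ModularLoadManagerImpl`.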
---
 .../broker/resources/LoadBalanceResources.java     | 72 ++++++++++++++++++++++
 .../broker/resources/NamespaceResources.java       | 18 +-----
 .../pulsar/broker/resources/PulsarResources.java   |  7 ++-
 ...rcesTest.java => LoadBalanceResourcesTest.java} | 32 +++-------
 .../broker/resources/NamespaceResourcesTest.java   | 42 -------------
 .../pulsar/broker/admin/impl/NamespacesBase.java   |  3 +-
 .../pulsar/broker/admin/impl/TenantsBase.java      |  2 +-
 .../loadbalance/impl/ModularLoadManagerImpl.java   | 25 +++-----
 .../pulsar/broker/web/PulsarWebResource.java       |  5 ++
 .../apache/pulsar/broker/admin/AdminApi2Test.java  |  5 +-
 .../impl/ModularLoadManagerImplTest.java           |  7 ++-
 .../broker/namespace/NamespaceServiceTest.java     |  9 ++-
 .../broker/testcontext/PulsarTestContext.java      |  3 +-
 .../testclient/LoadSimulationController.java       |  6 +-
 14 files changed, 119 insertions(+), 117 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LoadBalanceResources.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LoadBalanceResources.java
new file mode 100644
index 00000000000..839997a7035
--- /dev/null
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LoadBalanceResources.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.resources;
+
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import lombok.Getter;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.policies.data.loadbalancer.BundleData;
+
+@Getter
+public class LoadBalanceResources {
+    public static final String BUNDLE_DATA_BASE_PATH = 
"/loadbalance/bundle-data";
+
+    private final BundleDataResources bundleDataResources;
+
+    public LoadBalanceResources(MetadataStore store, int operationTimeoutSec) {
+        bundleDataResources = new BundleDataResources(store, 
operationTimeoutSec);
+    }
+
+    public static class BundleDataResources extends BaseResources<BundleData> {
+        public BundleDataResources(MetadataStore store, int 
operationTimeoutSec) {
+            super(store, BundleData.class, operationTimeoutSec);
+        }
+
+        public CompletableFuture<Optional<BundleData>> getBundleData(String 
bundle) {
+            return getAsync(getBundleDataPath(bundle));
+        }
+
+        public CompletableFuture<Void> updateBundleData(String bundle, 
BundleData data) {
+            return setWithCreateAsync(getBundleDataPath(bundle), __ -> data);
+        }
+
+        public CompletableFuture<Void> deleteBundleData(String bundle) {
+            return deleteAsync(getBundleDataPath(bundle));
+        }
+
+        // clear resource of `/loadbalance/bundle-data/{tenant}/{namespace}/` 
in metadata-store
+        public CompletableFuture<Void> deleteBundleDataAsync(NamespaceName ns) 
{
+            final String namespaceBundlePath = joinPath(BUNDLE_DATA_BASE_PATH, 
ns.toString());
+            return getStore().deleteRecursive(namespaceBundlePath);
+        }
+
+        // clear resource of `/loadbalance/bundle-data/{tenant}/` in 
metadata-store
+        public CompletableFuture<Void> deleteBundleDataTenantAsync(String 
tenant) {
+            final String tenantBundlePath = joinPath(BUNDLE_DATA_BASE_PATH, 
tenant);
+            return getStore().deleteRecursive(tenantBundlePath);
+        }
+
+        // Get the metadata store path for the given bundle full name.
+        private String getBundleDataPath(final String bundle) {
+            return BUNDLE_DATA_BASE_PATH + "/" + bundle;
+        }
+    }
+}
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
index b5ccc9a5a90..1ba353dccaa 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
@@ -49,18 +49,15 @@ public class NamespaceResources extends 
BaseResources<Policies> {
     private final IsolationPolicyResources isolationPolicies;
     private final PartitionedTopicResources partitionedTopicResources;
     private final MetadataStore configurationStore;
-    private final MetadataStore localStore;
 
     public static final String POLICIES_READONLY_FLAG_PATH = 
"/admin/flags/policies-readonly";
     private static final String NAMESPACE_BASE_PATH = "/namespace";
-    private static final String BUNDLE_DATA_BASE_PATH = 
"/loadbalance/bundle-data";
 
-    public NamespaceResources(MetadataStore localStore, MetadataStore 
configurationStore, int operationTimeoutSec) {
+    public NamespaceResources(MetadataStore configurationStore, int 
operationTimeoutSec) {
         super(configurationStore, Policies.class, operationTimeoutSec);
         this.configurationStore = configurationStore;
         isolationPolicies = new IsolationPolicyResources(configurationStore, 
operationTimeoutSec);
         partitionedTopicResources = new 
PartitionedTopicResources(configurationStore, operationTimeoutSec);
-        this.localStore = localStore;
     }
 
     public CompletableFuture<List<String>> listNamespacesAsync(String tenant) {
@@ -379,17 +376,4 @@ public class NamespaceResources extends 
BaseResources<Policies> {
             return future;
         }
     }
-
-    // clear resource of `/loadbalance/bundle-data/{tenant}/{namespace}/` in 
metadata-store
-    public CompletableFuture<Void> deleteBundleDataAsync(NamespaceName ns) {
-        final String namespaceBundlePath = joinPath(BUNDLE_DATA_BASE_PATH, 
ns.toString());
-        return this.localStore.deleteRecursive(namespaceBundlePath);
-    }
-
-    // clear resource of `/loadbalance/bundle-data/{tenant}/` in metadata-store
-    public CompletableFuture<Void> deleteBundleDataTenantAsync(String tenant) {
-        final String tenantBundlePath = joinPath(BUNDLE_DATA_BASE_PATH, 
tenant);
-        return this.localStore.deleteRecursive(tenantBundlePath);
-    }
-
 }
\ No newline at end of file
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
index a3c5633a6db..ad872a5356c 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
@@ -48,6 +48,8 @@ public class PulsarResources {
     @Getter
     private final TopicResources topicResources;
     @Getter
+    private final LoadBalanceResources loadBalanceResources;
+    @Getter
     private final Optional<MetadataStore> localMetadataStore;
     @Getter
     private final Optional<MetadataStore> configurationMetadataStore;
@@ -60,8 +62,7 @@ public class PulsarResources {
         if (configurationMetadataStore != null) {
             tenantResources = new TenantResources(configurationMetadataStore, 
operationTimeoutSec);
             clusterResources = new 
ClusterResources(configurationMetadataStore, operationTimeoutSec);
-            namespaceResources = new NamespaceResources(localMetadataStore, 
configurationMetadataStore
-                    , operationTimeoutSec);
+            namespaceResources = new 
NamespaceResources(configurationMetadataStore, operationTimeoutSec);
             resourcegroupResources = new 
ResourceGroupResources(configurationMetadataStore, operationTimeoutSec);
         } else {
             tenantResources = null;
@@ -76,12 +77,14 @@ public class PulsarResources {
             loadReportResources = new 
LoadManagerReportResources(localMetadataStore, operationTimeoutSec);
             bookieResources = new BookieResources(localMetadataStore, 
operationTimeoutSec);
             topicResources = new TopicResources(localMetadataStore);
+            loadBalanceResources = new 
LoadBalanceResources(localMetadataStore, operationTimeoutSec);
         } else {
             dynamicConfigResources = null;
             localPolicies = null;
             loadReportResources = null;
             bookieResources = null;
             topicResources = null;
+            loadBalanceResources = null;
         }
 
         this.localMetadataStore = Optional.ofNullable(localMetadataStore);
diff --git 
a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/resources/NamespaceResourcesTest.java
 
b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/resources/LoadBalanceResourcesTest.java
similarity index 71%
copy from 
pulsar-broker-common/src/test/java/org/apache/pulsar/broker/resources/NamespaceResourcesTest.java
copy to 
pulsar-broker-common/src/test/java/org/apache/pulsar/broker/resources/LoadBalanceResourcesTest.java
index deb86e1802f..cd7dd01b665 100644
--- 
a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/resources/NamespaceResourcesTest.java
+++ 
b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/resources/LoadBalanceResourcesTest.java
@@ -19,39 +19,25 @@
 package org.apache.pulsar.broker.resources;
 
 import static org.apache.pulsar.broker.resources.BaseResources.joinPath;
+import static 
org.apache.pulsar.broker.resources.LoadBalanceResources.BUNDLE_DATA_BASE_PATH;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
-import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertThrows;
-import static org.testng.Assert.assertTrue;
-
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.metadata.api.MetadataStore;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-
-public class NamespaceResourcesTest {
-
-    private MetadataStore localStore;
+public class LoadBalanceResourcesTest {
     private MetadataStore configurationStore;
-    private NamespaceResources namespaceResources;
-
-    private static final String BUNDLE_DATA_BASE_PATH = 
"/loadbalance/bundle-data";
+    private MetadataStore localStore;
+    private LoadBalanceResources loadBalanceResources;
 
     @BeforeMethod
     public void setup() {
         localStore = mock(MetadataStore.class);
         configurationStore = mock(MetadataStore.class);
-        namespaceResources = new NamespaceResources(localStore, 
configurationStore, 30);
-    }
-
-    @Test
-    public void test_pathIsFromNamespace() {
-        assertFalse(NamespaceResources.pathIsFromNamespace("/admin/clusters"));
-        assertFalse(NamespaceResources.pathIsFromNamespace("/admin/policies"));
-        
assertFalse(NamespaceResources.pathIsFromNamespace("/admin/policies/my-tenant"));
-        
assertTrue(NamespaceResources.pathIsFromNamespace("/admin/policies/my-tenant/my-ns"));
+        loadBalanceResources = new LoadBalanceResources(localStore, 30);
     }
 
     /**
@@ -61,11 +47,11 @@ public class NamespaceResourcesTest {
     public void testDeleteBundleDataAsync() {
         NamespaceName ns = NamespaceName.get("my-tenant/my-ns");
         String namespaceBundlePath = joinPath(BUNDLE_DATA_BASE_PATH, 
ns.toString());
-        namespaceResources.deleteBundleDataAsync(ns);
+        
loadBalanceResources.getBundleDataResources().deleteBundleDataAsync(ns);
 
         String tenant="my-tenant";
         String tenantBundlePath = joinPath(BUNDLE_DATA_BASE_PATH, tenant);
-        namespaceResources.deleteBundleDataTenantAsync(tenant);
+        
loadBalanceResources.getBundleDataResources().deleteBundleDataTenantAsync(tenant);
 
         verify(localStore).deleteRecursive(namespaceBundlePath);
         verify(localStore).deleteRecursive(tenantBundlePath);
@@ -73,6 +59,4 @@ public class NamespaceResourcesTest {
         assertThrows(()-> 
verify(configurationStore).deleteRecursive(namespaceBundlePath));
         assertThrows(()-> 
verify(configurationStore).deleteRecursive(tenantBundlePath));
     }
-
-
-}
\ No newline at end of file
+}
diff --git 
a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/resources/NamespaceResourcesTest.java
 
b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/resources/NamespaceResourcesTest.java
index deb86e1802f..7fb9e2c476d 100644
--- 
a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/resources/NamespaceResourcesTest.java
+++ 
b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/resources/NamespaceResourcesTest.java
@@ -18,34 +18,12 @@
  */
 package org.apache.pulsar.broker.resources;
 
-import static org.apache.pulsar.broker.resources.BaseResources.joinPath;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
 import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertThrows;
 import static org.testng.Assert.assertTrue;
-
-import org.apache.pulsar.common.naming.NamespaceName;
-import org.apache.pulsar.metadata.api.MetadataStore;
-import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-
 public class NamespaceResourcesTest {
 
-    private MetadataStore localStore;
-    private MetadataStore configurationStore;
-    private NamespaceResources namespaceResources;
-
-    private static final String BUNDLE_DATA_BASE_PATH = 
"/loadbalance/bundle-data";
-
-    @BeforeMethod
-    public void setup() {
-        localStore = mock(MetadataStore.class);
-        configurationStore = mock(MetadataStore.class);
-        namespaceResources = new NamespaceResources(localStore, 
configurationStore, 30);
-    }
-
     @Test
     public void test_pathIsFromNamespace() {
         assertFalse(NamespaceResources.pathIsFromNamespace("/admin/clusters"));
@@ -54,25 +32,5 @@ public class NamespaceResourcesTest {
         
assertTrue(NamespaceResources.pathIsFromNamespace("/admin/policies/my-tenant/my-ns"));
     }
 
-    /**
-     *  Test that the bundle-data node is deleted from the local stores.
-     */
-    @Test
-    public void testDeleteBundleDataAsync() {
-        NamespaceName ns = NamespaceName.get("my-tenant/my-ns");
-        String namespaceBundlePath = joinPath(BUNDLE_DATA_BASE_PATH, 
ns.toString());
-        namespaceResources.deleteBundleDataAsync(ns);
-
-        String tenant="my-tenant";
-        String tenantBundlePath = joinPath(BUNDLE_DATA_BASE_PATH, tenant);
-        namespaceResources.deleteBundleDataTenantAsync(tenant);
-
-        verify(localStore).deleteRecursive(namespaceBundlePath);
-        verify(localStore).deleteRecursive(tenantBundlePath);
-
-        assertThrows(()-> 
verify(configurationStore).deleteRecursive(namespaceBundlePath));
-        assertThrows(()-> 
verify(configurationStore).deleteRecursive(tenantBundlePath));
-    }
-
 
 }
\ No newline at end of file
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 4c364068077..8ab1f4dc860 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -468,7 +468,8 @@ public abstract class NamespacesBase extends AdminResource {
                 // clear z-node of local policies
                 .thenCompose(ignore -> 
getLocalPolicies().deleteLocalPoliciesAsync(namespaceName))
                 // clear /loadbalance/bundle-data
-                .thenCompose(ignore -> 
namespaceResources().deleteBundleDataAsync(namespaceName));
+                .thenCompose(ignore ->
+                        
loadBalanceResources().getBundleDataResources().deleteBundleDataAsync(namespaceName));
 
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java
index b93f3e3c6eb..74c0367e0b9 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java
@@ -236,7 +236,7 @@ public class TenantsBase extends PulsarWebResource {
                             
.getPartitionedTopicResources().clearPartitionedTopicTenantAsync(tenant))
                 .thenCompose(__ -> 
pulsar().getPulsarResources().getLocalPolicies()
                             .deleteLocalPoliciesTenantAsync(tenant))
-                .thenCompose(__ -> 
pulsar().getPulsarResources().getNamespaceResources()
+                .thenCompose(__ -> 
pulsar().getPulsarResources().getLoadBalanceResources().getBundleDataResources()
                             .deleteBundleDataTenantAsync(tenant));
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 0d5dbf489e9..586478efa50 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -58,6 +58,7 @@ import 
org.apache.pulsar.broker.loadbalance.LoadSheddingStrategy;
 import org.apache.pulsar.broker.loadbalance.ModularLoadManager;
 import org.apache.pulsar.broker.loadbalance.ModularLoadManagerStrategy;
 import 
org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLoadingPredicate;
+import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.util.ExecutorProvider;
@@ -91,9 +92,6 @@ import org.slf4j.LoggerFactory;
 public class ModularLoadManagerImpl implements ModularLoadManager {
     private static final Logger log = 
LoggerFactory.getLogger(ModularLoadManagerImpl.class);
 
-    // Path to ZNode whose children contain BundleData jsons for each bundle 
(new API version of ResourceQuota).
-    public static final String BUNDLE_DATA_PATH = "/loadbalance/bundle-data";
-
     // Default message rate to assume for unseen bundles.
     public static final double DEFAULT_MESSAGE_RATE = 50;
 
@@ -120,7 +118,6 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager {
     private LockManager<LocalBrokerData> brokersData;
     private ResourceLock<LocalBrokerData> brokerDataLock;
 
-    private MetadataCache<BundleData> bundlesCache;
     private MetadataCache<ResourceQuota> resourceQuotaCache;
     private MetadataCache<TimeAverageBrokerData> timeAverageBrokerDataCache;
 
@@ -172,6 +169,8 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager {
     // Pulsar service used to initialize this.
     private PulsarService pulsar;
 
+    private PulsarResources pulsarResources;
+
     // Executor service used to update broker data.
     private final ExecutorService executors;
 
@@ -243,8 +242,8 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager {
     @Override
     public void initialize(final PulsarService pulsar) {
         this.pulsar = pulsar;
+        this.pulsarResources = pulsar.getPulsarResources();
         brokersData = 
pulsar.getCoordinationService().getLockManager(LocalBrokerData.class);
-        bundlesCache = 
pulsar.getLocalMetadataStore().getMetadataCache(BundleData.class);
         resourceQuotaCache = 
pulsar.getLocalMetadataStore().getMetadataCache(ResourceQuota.class);
         timeAverageBrokerDataCache = 
pulsar.getLocalMetadataStore().getMetadataCache(TimeAverageBrokerData.class);
         
pulsar.getLocalMetadataStore().registerListener(this::handleDataNotification);
@@ -273,7 +272,7 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager {
 
         LoadManagerShared.refreshBrokerToFailureDomainMap(pulsar, 
brokerToFailureDomainMap);
         // register listeners for domain changes
-        
pulsar.getPulsarResources().getClusterResources().getFailureDomainResources()
+        pulsarResources.getClusterResources().getFailureDomainResources()
                 .registerListener(__ -> {
                     executors.execute(
                             () -> 
LoadManagerShared.refreshBrokerToFailureDomainMap(pulsar, 
brokerToFailureDomainMap));
@@ -381,7 +380,8 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager {
     public BundleData getBundleDataOrDefault(final String bundle) {
         BundleData bundleData = null;
         try {
-            Optional<BundleData> optBundleData = 
bundlesCache.get(getBundleDataPath(bundle)).join();
+            Optional<BundleData> optBundleData =
+                    
pulsarResources.getLoadBalanceResources().getBundleDataResources().getBundleData(bundle).join();
             if (optBundleData.isPresent()) {
                 return optBundleData.get();
             }
@@ -418,11 +418,6 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager {
         return bundleData;
     }
 
-    // Get the metadata store path for the given bundle full name.
-    public static String getBundleDataPath(final String bundle) {
-        return BUNDLE_DATA_PATH + "/" + bundle;
-    }
-
     // Use the Pulsar client to acquire the namespace bundle stats.
     private Map<String, NamespaceBundleStats> getBundleStats() {
         return pulsar.getBrokerService().getBundleStats();
@@ -1151,8 +1146,8 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager {
         for (Map.Entry<String, BundleData> entry : 
loadData.getBundleData().entrySet()) {
             final String bundle = entry.getKey();
             final BundleData data = entry.getValue();
-            
futures.add(bundlesCache.readModifyUpdateOrCreate(getBundleDataPath(bundle), __ 
-> data)
-                    .thenApply(__ -> null));
+            futures.add(
+                    
pulsarResources.getLoadBalanceResources().getBundleDataResources().updateBundleData(bundle,
 data));
         }
 
         // Write the time average broker data to metadata store.
@@ -1173,7 +1168,7 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager {
 
     private void deleteBundleDataFromMetadataStore(String bundle) {
         try {
-            bundlesCache.delete(getBundleDataPath(bundle)).join();
+            
pulsarResources.getLoadBalanceResources().getBundleDataResources().deleteBundleData(bundle).join();
         } catch (Exception e) {
             if (!(e.getCause() instanceof NotFoundException)) {
                 log.warn("Failed to delete bundle-data {} from metadata 
store", bundle, e);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 0e25c5ce9e6..5602f662f50 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -62,6 +62,7 @@ import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.resources.BookieResources;
 import org.apache.pulsar.broker.resources.ClusterResources;
 import org.apache.pulsar.broker.resources.DynamicConfigurationResources;
+import org.apache.pulsar.broker.resources.LoadBalanceResources;
 import org.apache.pulsar.broker.resources.LocalPoliciesResources;
 import org.apache.pulsar.broker.resources.NamespaceResources;
 import 
org.apache.pulsar.broker.resources.NamespaceResources.IsolationPolicyResources;
@@ -1112,6 +1113,10 @@ public abstract class PulsarWebResource {
         return pulsar().getPulsarResources().getNamespaceResources();
     }
 
+    protected LoadBalanceResources loadBalanceResources() {
+        return pulsar().getPulsarResources().getLoadBalanceResources();
+    }
+
     protected ResourceGroupResources resourceGroupResources() {
         return pulsar().getPulsarResources().getResourcegroupResources();
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index c68010f967b..5abb0e02e58 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.admin;
 import static java.util.concurrent.TimeUnit.MINUTES;
 import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.pulsar.broker.BrokerTestUtil.newUniqueName;
+import static 
org.apache.pulsar.broker.resources.LoadBalanceResources.BUNDLE_DATA_BASE_PATH;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -1671,7 +1672,7 @@ public class AdminApi2Test extends 
MockedPulsarServiceBaseTest {
         final String managedLedgersPath = "/managed-ledgers/" + tenant;
         final String partitionedTopicPath = "/admin/partitioned-topics/" + 
tenant;
         final String localPoliciesPath = "/admin/local-policies/" + tenant;
-        final String bundleDataPath = "/loadbalance/bundle-data/" + tenant;
+        final String bundleDataPath = BUNDLE_DATA_BASE_PATH + "/" + tenant;
         
assertFalse(pulsar.getLocalMetadataStore().exists(managedLedgersPath).join());
         
assertFalse(pulsar.getLocalMetadataStore().exists(partitionedTopicPath).join());
         
assertFalse(pulsar.getLocalMetadataStore().exists(localPoliciesPath).join());
@@ -1738,7 +1739,7 @@ public class AdminApi2Test extends 
MockedPulsarServiceBaseTest {
         assertFalse(admin.topics().getList(namespace).isEmpty());
 
         final String managedLedgersPath = "/managed-ledgers/" + namespace;
-        final String bundleDataPath = "/loadbalance/bundle-data/" + namespace;
+        final String bundleDataPath = BUNDLE_DATA_BASE_PATH + "/" + namespace;
         // Trigger bundle owned by brokers.
         pulsarClient.newProducer().topic(topic).create().close();
         // Trigger bundle data write to ZK.
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
index d8acb6d24e9..557393682fb 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.loadbalance.impl;
 
 import static java.lang.Thread.sleep;
 import static 
org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl.TIME_AVERAGE_BROKER_ZPATH;
+import static 
org.apache.pulsar.broker.resources.LoadBalanceResources.BUNDLE_DATA_BASE_PATH;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
@@ -290,7 +291,7 @@ public class ModularLoadManagerImplTest {
         final TimeAverageMessageData longTermMessageData = new 
TimeAverageMessageData(1000);
         longTermMessageData.setMsgRateIn(1000);
         bundleData.setLongTermData(longTermMessageData);
-        final String firstBundleDataPath = String.format("%s/%s", 
ModularLoadManagerImpl.BUNDLE_DATA_PATH, bundles[0]);
+        final String firstBundleDataPath = String.format("%s/%s", 
BUNDLE_DATA_BASE_PATH, bundles[0]);
         // Write long message rate for first bundle to ensure that even bundle 
distribution is not a coincidence of
         // balancing by message rate. If we were balancing by message rate, 
one of the brokers should only have this
         // one bundle.
@@ -386,7 +387,7 @@ public class ModularLoadManagerImplTest {
         final TimeAverageMessageData longTermMessageData = new 
TimeAverageMessageData(1000);
         longTermMessageData.setMsgRateIn(1000);
         bundleData.setLongTermData(longTermMessageData);
-        final String firstBundleDataPath = String.format("%s/%s", 
ModularLoadManagerImpl.BUNDLE_DATA_PATH, bundles[0]);
+        final String firstBundleDataPath = String.format("%s/%s", 
BUNDLE_DATA_BASE_PATH, bundles[0]);
         
pulsar1.getLocalMetadataStore().getMetadataCache(BundleData.class).create(firstBundleDataPath,
 bundleData).join();
         String maxTopicOwnedBroker = 
primaryLoadManager.selectBrokerForAssignment(bundles[0]).get();
 
@@ -843,7 +844,7 @@ public class ModularLoadManagerImplTest {
         String topicToFindBundle = topicName + 0;
         NamespaceBundle bundleWillBeSplit = 
pulsar1.getNamespaceService().getBundle(TopicName.get(topicToFindBundle));
 
-        String bundleDataPath = ModularLoadManagerImpl.BUNDLE_DATA_PATH + "/" 
+ tenant + "/" + namespace;
+        String bundleDataPath = BUNDLE_DATA_BASE_PATH + "/" + tenant + "/" + 
namespace;
         CompletableFuture<List<String>> children = 
bundlesCache.getChildren(bundleDataPath);
         List<String> bundles = children.join();
         assertTrue(bundles.contains(bundleWillBeSplit.getBundleRange()));
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
index 03bb53eb9da..2e584489c06 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.namespace;
 
+import static 
org.apache.pulsar.broker.resources.LoadBalanceResources.BUNDLE_DATA_BASE_PATH;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
@@ -649,7 +650,7 @@ public class NamespaceServiceTest extends BrokerTestBase {
 
         NamespaceBundle targetNamespaceBundle =  
bundles.findBundle(TopicName.get(topic + "0"));
         String bundle = targetNamespaceBundle.getBundleRange();
-        String path = ModularLoadManagerImpl.getBundleDataPath(namespace + "/" 
+ bundle);
+        String path = BUNDLE_DATA_BASE_PATH + "/" + namespace + "/" + bundle;
         NamespaceBundleStats defaultStats = new NamespaceBundleStats();
         defaultStats.msgThroughputIn = 100000;
         defaultStats.msgThroughputOut = 100000;
@@ -691,7 +692,6 @@ public class NamespaceServiceTest extends BrokerTestBase {
 
     @Test
     public void testModularLoadManagerRemoveInactiveBundleFromLoadData() 
throws Exception {
-        final String BUNDLE_DATA_PATH = "/loadbalance/bundle-data";
         final String namespace = "pulsar/test/ns1";
         final String topic1 = "persistent://" + namespace + "/topic1";
         final String topic2 = "persistent://" + namespace + "/topic2";
@@ -742,13 +742,12 @@ public class NamespaceServiceTest extends BrokerTestBase {
 
         Awaitility.await().untilAsserted(() -> {
             assertNull(loadData.getBundleData().get(oldBundle.toString()));
-            assertFalse(bundlesCache.exists(BUNDLE_DATA_PATH + "/" + 
oldBundle.toString()).get());
+            assertFalse(bundlesCache.exists(BUNDLE_DATA_BASE_PATH + "/" + 
oldBundle.toString()).get());
         });
     }
 
     @Test
     public void testModularLoadManagerRemoveBundleAndLoad() throws Exception {
-        final String BUNDLE_DATA_PATH = "/loadbalance/bundle-data";
         final String namespace = "prop/ns-abc";
         final String bundleName = namespace + "/0x00000000_0xffffffff";
         final String topic1 = "persistent://" + namespace + "/topic1";
@@ -783,7 +782,7 @@ public class NamespaceServiceTest extends BrokerTestBase {
         pulsar.getBrokerService().updateRates();
 
         waitResourceDataUpdateToZK(loadManager);
-        String path = BUNDLE_DATA_PATH + "/" + bundleName;
+        String path = BUNDLE_DATA_BASE_PATH + "/" + bundleName;
 
         Optional<GetResult> getResult = 
pulsar.getLocalMetadataStore().get(path).get();
         assertTrue(getResult.isPresent());
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
index db09465dc10..c927a2e61d8 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
@@ -712,8 +712,7 @@ public class PulsarTestContext implements AutoCloseable {
                     if (metadataStore == null) {
                         metadataStore = builder.configurationMetadataStore;
                     }
-                    NamespaceResources nsr = 
spyConfigPulsarResources.spy(NamespaceResources.class,
-                            builder.localMetadataStore, metadataStore, 30);
+                    NamespaceResources nsr = 
spyConfigPulsarResources.spy(NamespaceResources.class,metadataStore, 30);
                     TopicResources tsr = 
spyConfigPulsarResources.spy(TopicResources.class, metadataStore);
                     pulsarResources(
                             spyConfigPulsarResources.spy(
diff --git 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationController.java
 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationController.java
index bbe535df5e2..f2ccd82b390 100644
--- 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationController.java
+++ 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationController.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.testclient;
 
+import static 
org.apache.pulsar.broker.resources.LoadBalanceResources.BUNDLE_DATA_BASE_PATH;
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.ParameterException;
@@ -61,7 +62,6 @@ import org.slf4j.LoggerFactory;
 public class LoadSimulationController {
     private static final Logger log = 
LoggerFactory.getLogger(LoadSimulationController.class);
     private static final String QUOTA_ROOT = 
"/loadbalance/resource-quota/namespace";
-    private static final String BUNDLE_DATA_ROOT = "/loadbalance/bundle-data";
 
     // Input streams for each client to send commands through.
     private final DataInputStream[] inputStreams;
@@ -427,7 +427,7 @@ public class LoadSimulationController {
                                 
"/loadbalance/resource-quota/namespace/%s/%s/%s/0x00000000_0xffffffff", 
tenantName,
                                 cluster, mangledNamespace);
                         final String newAPITargetPath = String.format(
-                                
"/loadbalance/bundle-data/%s/%s/%s/0x00000000_0xffffffff", tenantName, cluster,
+                                "%s/%s/%s/%s/0x00000000_0xffffffff", 
BUNDLE_DATA_BASE_PATH, tenantName, cluster,
                                 mangledNamespace);
                         try {
                             ZkUtils.createFullPathOptimistic(targetZKClient, 
oldAPITargetPath,
@@ -484,7 +484,7 @@ public class LoadSimulationController {
             futures.add(threadPool.submit(() -> {
                 for (final Map.Entry<String, ResourceQuota> entry : 
bundleToQuota.entrySet()) {
                     final String bundle = entry.getKey();
-                    final String newAPIPath = bundle.replace(QUOTA_ROOT, 
BUNDLE_DATA_ROOT);
+                    final String newAPIPath = bundle.replace(QUOTA_ROOT, 
BUNDLE_DATA_BASE_PATH);
                     final ResourceQuota quota = entry.getValue();
                     final int tenantStart = QUOTA_ROOT.length() + 1;
                     final String topic = String.format("persistent://%s/t", 
bundle.substring(tenantStart));


Reply via email to