This is an automated email from the ASF dual-hosted git repository.
houxiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new bbdc173e801 [refactor][broker] PIP-301 Part-1: Add BundleDataResources
(#21119)
bbdc173e801 is described below
commit bbdc173e80157296ff0475cfdc4e36f5d063a7df
Author: houxiaoyu <[email protected]>
AuthorDate: Sun Oct 8 19:38:29 2023 +0800
[refactor][broker] PIP-301 Part-1: Add BundleDataResources (#21119)
### Motivation
See PIP-301: https://github.com/apache/pulsar/pull/21129
### Modifications
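Add `LoadBalanceResources` with a nested `BundleDataResources`, and move the `/loadbalance/bundle-data` metadata-store access out of `NamespaceResources` and `ModularLoadManagerImpl` into it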
---
.../broker/resources/LoadBalanceResources.java | 72 ++++++++++++++++++++++
.../broker/resources/NamespaceResources.java | 18 +-----
.../pulsar/broker/resources/PulsarResources.java | 7 ++-
...rcesTest.java => LoadBalanceResourcesTest.java} | 32 +++-------
.../broker/resources/NamespaceResourcesTest.java | 42 -------------
.../pulsar/broker/admin/impl/NamespacesBase.java | 3 +-
.../pulsar/broker/admin/impl/TenantsBase.java | 2 +-
.../loadbalance/impl/ModularLoadManagerImpl.java | 25 +++-----
.../pulsar/broker/web/PulsarWebResource.java | 5 ++
.../apache/pulsar/broker/admin/AdminApi2Test.java | 5 +-
.../impl/ModularLoadManagerImplTest.java | 7 ++-
.../broker/namespace/NamespaceServiceTest.java | 9 ++-
.../broker/testcontext/PulsarTestContext.java | 3 +-
.../testclient/LoadSimulationController.java | 6 +-
14 files changed, 119 insertions(+), 117 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LoadBalanceResources.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LoadBalanceResources.java
new file mode 100644
index 00000000000..839997a7035
--- /dev/null
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LoadBalanceResources.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.resources;
+
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import lombok.Getter;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.policies.data.loadbalancer.BundleData;
+
+@Getter
+public class LoadBalanceResources {
+ public static final String BUNDLE_DATA_BASE_PATH =
"/loadbalance/bundle-data";
+
+ private final BundleDataResources bundleDataResources;
+
+ public LoadBalanceResources(MetadataStore store, int operationTimeoutSec) {
+ bundleDataResources = new BundleDataResources(store,
operationTimeoutSec);
+ }
+
+ public static class BundleDataResources extends BaseResources<BundleData> {
+ public BundleDataResources(MetadataStore store, int
operationTimeoutSec) {
+ super(store, BundleData.class, operationTimeoutSec);
+ }
+
+ public CompletableFuture<Optional<BundleData>> getBundleData(String
bundle) {
+ return getAsync(getBundleDataPath(bundle));
+ }
+
+ public CompletableFuture<Void> updateBundleData(String bundle,
BundleData data) {
+ return setWithCreateAsync(getBundleDataPath(bundle), __ -> data);
+ }
+
+ public CompletableFuture<Void> deleteBundleData(String bundle) {
+ return deleteAsync(getBundleDataPath(bundle));
+ }
+
+ // clear resource of `/loadbalance/bundle-data/{tenant}/{namespace}/`
in metadata-store
+ public CompletableFuture<Void> deleteBundleDataAsync(NamespaceName ns)
{
+ final String namespaceBundlePath = joinPath(BUNDLE_DATA_BASE_PATH,
ns.toString());
+ return getStore().deleteRecursive(namespaceBundlePath);
+ }
+
+ // clear resource of `/loadbalance/bundle-data/{tenant}/` in
metadata-store
+ public CompletableFuture<Void> deleteBundleDataTenantAsync(String
tenant) {
+ final String tenantBundlePath = joinPath(BUNDLE_DATA_BASE_PATH,
tenant);
+ return getStore().deleteRecursive(tenantBundlePath);
+ }
+
+ // Get the metadata store path for the given bundle full name.
+ private String getBundleDataPath(final String bundle) {
+ return BUNDLE_DATA_BASE_PATH + "/" + bundle;
+ }
+ }
+}
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
index b5ccc9a5a90..1ba353dccaa 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java
@@ -49,18 +49,15 @@ public class NamespaceResources extends
BaseResources<Policies> {
private final IsolationPolicyResources isolationPolicies;
private final PartitionedTopicResources partitionedTopicResources;
private final MetadataStore configurationStore;
- private final MetadataStore localStore;
public static final String POLICIES_READONLY_FLAG_PATH =
"/admin/flags/policies-readonly";
private static final String NAMESPACE_BASE_PATH = "/namespace";
- private static final String BUNDLE_DATA_BASE_PATH =
"/loadbalance/bundle-data";
- public NamespaceResources(MetadataStore localStore, MetadataStore
configurationStore, int operationTimeoutSec) {
+ public NamespaceResources(MetadataStore configurationStore, int
operationTimeoutSec) {
super(configurationStore, Policies.class, operationTimeoutSec);
this.configurationStore = configurationStore;
isolationPolicies = new IsolationPolicyResources(configurationStore,
operationTimeoutSec);
partitionedTopicResources = new
PartitionedTopicResources(configurationStore, operationTimeoutSec);
- this.localStore = localStore;
}
public CompletableFuture<List<String>> listNamespacesAsync(String tenant) {
@@ -379,17 +376,4 @@ public class NamespaceResources extends
BaseResources<Policies> {
return future;
}
}
-
- // clear resource of `/loadbalance/bundle-data/{tenant}/{namespace}/` in
metadata-store
- public CompletableFuture<Void> deleteBundleDataAsync(NamespaceName ns) {
- final String namespaceBundlePath = joinPath(BUNDLE_DATA_BASE_PATH,
ns.toString());
- return this.localStore.deleteRecursive(namespaceBundlePath);
- }
-
- // clear resource of `/loadbalance/bundle-data/{tenant}/` in metadata-store
- public CompletableFuture<Void> deleteBundleDataTenantAsync(String tenant) {
- final String tenantBundlePath = joinPath(BUNDLE_DATA_BASE_PATH,
tenant);
- return this.localStore.deleteRecursive(tenantBundlePath);
- }
-
}
\ No newline at end of file
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
index a3c5633a6db..ad872a5356c 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
@@ -48,6 +48,8 @@ public class PulsarResources {
@Getter
private final TopicResources topicResources;
@Getter
+ private final LoadBalanceResources loadBalanceResources;
+ @Getter
private final Optional<MetadataStore> localMetadataStore;
@Getter
private final Optional<MetadataStore> configurationMetadataStore;
@@ -60,8 +62,7 @@ public class PulsarResources {
if (configurationMetadataStore != null) {
tenantResources = new TenantResources(configurationMetadataStore,
operationTimeoutSec);
clusterResources = new
ClusterResources(configurationMetadataStore, operationTimeoutSec);
- namespaceResources = new NamespaceResources(localMetadataStore,
configurationMetadataStore
- , operationTimeoutSec);
+ namespaceResources = new
NamespaceResources(configurationMetadataStore, operationTimeoutSec);
resourcegroupResources = new
ResourceGroupResources(configurationMetadataStore, operationTimeoutSec);
} else {
tenantResources = null;
@@ -76,12 +77,14 @@ public class PulsarResources {
loadReportResources = new
LoadManagerReportResources(localMetadataStore, operationTimeoutSec);
bookieResources = new BookieResources(localMetadataStore,
operationTimeoutSec);
topicResources = new TopicResources(localMetadataStore);
+ loadBalanceResources = new
LoadBalanceResources(localMetadataStore, operationTimeoutSec);
} else {
dynamicConfigResources = null;
localPolicies = null;
loadReportResources = null;
bookieResources = null;
topicResources = null;
+ loadBalanceResources = null;
}
this.localMetadataStore = Optional.ofNullable(localMetadataStore);
diff --git
a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/resources/NamespaceResourcesTest.java
b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/resources/LoadBalanceResourcesTest.java
similarity index 71%
copy from
pulsar-broker-common/src/test/java/org/apache/pulsar/broker/resources/NamespaceResourcesTest.java
copy to
pulsar-broker-common/src/test/java/org/apache/pulsar/broker/resources/LoadBalanceResourcesTest.java
index deb86e1802f..cd7dd01b665 100644
---
a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/resources/NamespaceResourcesTest.java
+++
b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/resources/LoadBalanceResourcesTest.java
@@ -19,39 +19,25 @@
package org.apache.pulsar.broker.resources;
import static org.apache.pulsar.broker.resources.BaseResources.joinPath;
+import static
org.apache.pulsar.broker.resources.LoadBalanceResources.BUNDLE_DATA_BASE_PATH;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
-import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertThrows;
-import static org.testng.Assert.assertTrue;
-
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-
-public class NamespaceResourcesTest {
-
- private MetadataStore localStore;
+public class LoadBalanceResourcesTest {
private MetadataStore configurationStore;
- private NamespaceResources namespaceResources;
-
- private static final String BUNDLE_DATA_BASE_PATH =
"/loadbalance/bundle-data";
+ private MetadataStore localStore;
+ private LoadBalanceResources loadBalanceResources;
@BeforeMethod
public void setup() {
localStore = mock(MetadataStore.class);
configurationStore = mock(MetadataStore.class);
- namespaceResources = new NamespaceResources(localStore,
configurationStore, 30);
- }
-
- @Test
- public void test_pathIsFromNamespace() {
- assertFalse(NamespaceResources.pathIsFromNamespace("/admin/clusters"));
- assertFalse(NamespaceResources.pathIsFromNamespace("/admin/policies"));
-
assertFalse(NamespaceResources.pathIsFromNamespace("/admin/policies/my-tenant"));
-
assertTrue(NamespaceResources.pathIsFromNamespace("/admin/policies/my-tenant/my-ns"));
+ loadBalanceResources = new LoadBalanceResources(localStore, 30);
}
/**
@@ -61,11 +47,11 @@ public class NamespaceResourcesTest {
public void testDeleteBundleDataAsync() {
NamespaceName ns = NamespaceName.get("my-tenant/my-ns");
String namespaceBundlePath = joinPath(BUNDLE_DATA_BASE_PATH,
ns.toString());
- namespaceResources.deleteBundleDataAsync(ns);
+
loadBalanceResources.getBundleDataResources().deleteBundleDataAsync(ns);
String tenant="my-tenant";
String tenantBundlePath = joinPath(BUNDLE_DATA_BASE_PATH, tenant);
- namespaceResources.deleteBundleDataTenantAsync(tenant);
+
loadBalanceResources.getBundleDataResources().deleteBundleDataTenantAsync(tenant);
verify(localStore).deleteRecursive(namespaceBundlePath);
verify(localStore).deleteRecursive(tenantBundlePath);
@@ -73,6 +59,4 @@ public class NamespaceResourcesTest {
assertThrows(()->
verify(configurationStore).deleteRecursive(namespaceBundlePath));
assertThrows(()->
verify(configurationStore).deleteRecursive(tenantBundlePath));
}
-
-
-}
\ No newline at end of file
+}
diff --git
a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/resources/NamespaceResourcesTest.java
b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/resources/NamespaceResourcesTest.java
index deb86e1802f..7fb9e2c476d 100644
---
a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/resources/NamespaceResourcesTest.java
+++
b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/resources/NamespaceResourcesTest.java
@@ -18,34 +18,12 @@
*/
package org.apache.pulsar.broker.resources;
-import static org.apache.pulsar.broker.resources.BaseResources.joinPath;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;
-
-import org.apache.pulsar.common.naming.NamespaceName;
-import org.apache.pulsar.metadata.api.MetadataStore;
-import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-
public class NamespaceResourcesTest {
- private MetadataStore localStore;
- private MetadataStore configurationStore;
- private NamespaceResources namespaceResources;
-
- private static final String BUNDLE_DATA_BASE_PATH =
"/loadbalance/bundle-data";
-
- @BeforeMethod
- public void setup() {
- localStore = mock(MetadataStore.class);
- configurationStore = mock(MetadataStore.class);
- namespaceResources = new NamespaceResources(localStore,
configurationStore, 30);
- }
-
@Test
public void test_pathIsFromNamespace() {
assertFalse(NamespaceResources.pathIsFromNamespace("/admin/clusters"));
@@ -54,25 +32,5 @@ public class NamespaceResourcesTest {
assertTrue(NamespaceResources.pathIsFromNamespace("/admin/policies/my-tenant/my-ns"));
}
- /**
- * Test that the bundle-data node is deleted from the local stores.
- */
- @Test
- public void testDeleteBundleDataAsync() {
- NamespaceName ns = NamespaceName.get("my-tenant/my-ns");
- String namespaceBundlePath = joinPath(BUNDLE_DATA_BASE_PATH,
ns.toString());
- namespaceResources.deleteBundleDataAsync(ns);
-
- String tenant="my-tenant";
- String tenantBundlePath = joinPath(BUNDLE_DATA_BASE_PATH, tenant);
- namespaceResources.deleteBundleDataTenantAsync(tenant);
-
- verify(localStore).deleteRecursive(namespaceBundlePath);
- verify(localStore).deleteRecursive(tenantBundlePath);
-
- assertThrows(()->
verify(configurationStore).deleteRecursive(namespaceBundlePath));
- assertThrows(()->
verify(configurationStore).deleteRecursive(tenantBundlePath));
- }
-
}
\ No newline at end of file
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 4c364068077..8ab1f4dc860 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -468,7 +468,8 @@ public abstract class NamespacesBase extends AdminResource {
// clear z-node of local policies
.thenCompose(ignore ->
getLocalPolicies().deleteLocalPoliciesAsync(namespaceName))
// clear /loadbalance/bundle-data
- .thenCompose(ignore ->
namespaceResources().deleteBundleDataAsync(namespaceName));
+ .thenCompose(ignore ->
+
loadBalanceResources().getBundleDataResources().deleteBundleDataAsync(namespaceName));
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java
index b93f3e3c6eb..74c0367e0b9 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TenantsBase.java
@@ -236,7 +236,7 @@ public class TenantsBase extends PulsarWebResource {
.getPartitionedTopicResources().clearPartitionedTopicTenantAsync(tenant))
.thenCompose(__ ->
pulsar().getPulsarResources().getLocalPolicies()
.deleteLocalPoliciesTenantAsync(tenant))
- .thenCompose(__ ->
pulsar().getPulsarResources().getNamespaceResources()
+ .thenCompose(__ ->
pulsar().getPulsarResources().getLoadBalanceResources().getBundleDataResources()
.deleteBundleDataTenantAsync(tenant));
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 0d5dbf489e9..586478efa50 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -58,6 +58,7 @@ import
org.apache.pulsar.broker.loadbalance.LoadSheddingStrategy;
import org.apache.pulsar.broker.loadbalance.ModularLoadManager;
import org.apache.pulsar.broker.loadbalance.ModularLoadManagerStrategy;
import
org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLoadingPredicate;
+import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.util.ExecutorProvider;
@@ -91,9 +92,6 @@ import org.slf4j.LoggerFactory;
public class ModularLoadManagerImpl implements ModularLoadManager {
private static final Logger log =
LoggerFactory.getLogger(ModularLoadManagerImpl.class);
- // Path to ZNode whose children contain BundleData jsons for each bundle
(new API version of ResourceQuota).
- public static final String BUNDLE_DATA_PATH = "/loadbalance/bundle-data";
-
// Default message rate to assume for unseen bundles.
public static final double DEFAULT_MESSAGE_RATE = 50;
@@ -120,7 +118,6 @@ public class ModularLoadManagerImpl implements
ModularLoadManager {
private LockManager<LocalBrokerData> brokersData;
private ResourceLock<LocalBrokerData> brokerDataLock;
- private MetadataCache<BundleData> bundlesCache;
private MetadataCache<ResourceQuota> resourceQuotaCache;
private MetadataCache<TimeAverageBrokerData> timeAverageBrokerDataCache;
@@ -172,6 +169,8 @@ public class ModularLoadManagerImpl implements
ModularLoadManager {
// Pulsar service used to initialize this.
private PulsarService pulsar;
+ private PulsarResources pulsarResources;
+
// Executor service used to update broker data.
private final ExecutorService executors;
@@ -243,8 +242,8 @@ public class ModularLoadManagerImpl implements
ModularLoadManager {
@Override
public void initialize(final PulsarService pulsar) {
this.pulsar = pulsar;
+ this.pulsarResources = pulsar.getPulsarResources();
brokersData =
pulsar.getCoordinationService().getLockManager(LocalBrokerData.class);
- bundlesCache =
pulsar.getLocalMetadataStore().getMetadataCache(BundleData.class);
resourceQuotaCache =
pulsar.getLocalMetadataStore().getMetadataCache(ResourceQuota.class);
timeAverageBrokerDataCache =
pulsar.getLocalMetadataStore().getMetadataCache(TimeAverageBrokerData.class);
pulsar.getLocalMetadataStore().registerListener(this::handleDataNotification);
@@ -273,7 +272,7 @@ public class ModularLoadManagerImpl implements
ModularLoadManager {
LoadManagerShared.refreshBrokerToFailureDomainMap(pulsar,
brokerToFailureDomainMap);
// register listeners for domain changes
-
pulsar.getPulsarResources().getClusterResources().getFailureDomainResources()
+ pulsarResources.getClusterResources().getFailureDomainResources()
.registerListener(__ -> {
executors.execute(
() ->
LoadManagerShared.refreshBrokerToFailureDomainMap(pulsar,
brokerToFailureDomainMap));
@@ -381,7 +380,8 @@ public class ModularLoadManagerImpl implements
ModularLoadManager {
public BundleData getBundleDataOrDefault(final String bundle) {
BundleData bundleData = null;
try {
- Optional<BundleData> optBundleData =
bundlesCache.get(getBundleDataPath(bundle)).join();
+ Optional<BundleData> optBundleData =
+
pulsarResources.getLoadBalanceResources().getBundleDataResources().getBundleData(bundle).join();
if (optBundleData.isPresent()) {
return optBundleData.get();
}
@@ -418,11 +418,6 @@ public class ModularLoadManagerImpl implements
ModularLoadManager {
return bundleData;
}
- // Get the metadata store path for the given bundle full name.
- public static String getBundleDataPath(final String bundle) {
- return BUNDLE_DATA_PATH + "/" + bundle;
- }
-
// Use the Pulsar client to acquire the namespace bundle stats.
private Map<String, NamespaceBundleStats> getBundleStats() {
return pulsar.getBrokerService().getBundleStats();
@@ -1151,8 +1146,8 @@ public class ModularLoadManagerImpl implements
ModularLoadManager {
for (Map.Entry<String, BundleData> entry :
loadData.getBundleData().entrySet()) {
final String bundle = entry.getKey();
final BundleData data = entry.getValue();
-
futures.add(bundlesCache.readModifyUpdateOrCreate(getBundleDataPath(bundle), __
-> data)
- .thenApply(__ -> null));
+ futures.add(
+
pulsarResources.getLoadBalanceResources().getBundleDataResources().updateBundleData(bundle,
data));
}
// Write the time average broker data to metadata store.
@@ -1173,7 +1168,7 @@ public class ModularLoadManagerImpl implements
ModularLoadManager {
private void deleteBundleDataFromMetadataStore(String bundle) {
try {
- bundlesCache.delete(getBundleDataPath(bundle)).join();
+
pulsarResources.getLoadBalanceResources().getBundleDataResources().deleteBundleData(bundle).join();
} catch (Exception e) {
if (!(e.getCause() instanceof NotFoundException)) {
log.warn("Failed to delete bundle-data {} from metadata
store", bundle, e);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 0e25c5ce9e6..5602f662f50 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -62,6 +62,7 @@ import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.resources.BookieResources;
import org.apache.pulsar.broker.resources.ClusterResources;
import org.apache.pulsar.broker.resources.DynamicConfigurationResources;
+import org.apache.pulsar.broker.resources.LoadBalanceResources;
import org.apache.pulsar.broker.resources.LocalPoliciesResources;
import org.apache.pulsar.broker.resources.NamespaceResources;
import
org.apache.pulsar.broker.resources.NamespaceResources.IsolationPolicyResources;
@@ -1112,6 +1113,10 @@ public abstract class PulsarWebResource {
return pulsar().getPulsarResources().getNamespaceResources();
}
+ protected LoadBalanceResources loadBalanceResources() {
+ return pulsar().getPulsarResources().getLoadBalanceResources();
+ }
+
protected ResourceGroupResources resourceGroupResources() {
return pulsar().getPulsarResources().getResourcegroupResources();
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index c68010f967b..5abb0e02e58 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.admin;
import static java.util.concurrent.TimeUnit.MINUTES;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.pulsar.broker.BrokerTestUtil.newUniqueName;
+import static
org.apache.pulsar.broker.resources.LoadBalanceResources.BUNDLE_DATA_BASE_PATH;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -1671,7 +1672,7 @@ public class AdminApi2Test extends
MockedPulsarServiceBaseTest {
final String managedLedgersPath = "/managed-ledgers/" + tenant;
final String partitionedTopicPath = "/admin/partitioned-topics/" +
tenant;
final String localPoliciesPath = "/admin/local-policies/" + tenant;
- final String bundleDataPath = "/loadbalance/bundle-data/" + tenant;
+ final String bundleDataPath = BUNDLE_DATA_BASE_PATH + "/" + tenant;
assertFalse(pulsar.getLocalMetadataStore().exists(managedLedgersPath).join());
assertFalse(pulsar.getLocalMetadataStore().exists(partitionedTopicPath).join());
assertFalse(pulsar.getLocalMetadataStore().exists(localPoliciesPath).join());
@@ -1738,7 +1739,7 @@ public class AdminApi2Test extends
MockedPulsarServiceBaseTest {
assertFalse(admin.topics().getList(namespace).isEmpty());
final String managedLedgersPath = "/managed-ledgers/" + namespace;
- final String bundleDataPath = "/loadbalance/bundle-data/" + namespace;
+ final String bundleDataPath = BUNDLE_DATA_BASE_PATH + "/" + namespace;
// Trigger bundle owned by brokers.
pulsarClient.newProducer().topic(topic).create().close();
// Trigger bundle data write to ZK.
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
index d8acb6d24e9..557393682fb 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.loadbalance.impl;
import static java.lang.Thread.sleep;
import static
org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl.TIME_AVERAGE_BROKER_ZPATH;
+import static
org.apache.pulsar.broker.resources.LoadBalanceResources.BUNDLE_DATA_BASE_PATH;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
@@ -290,7 +291,7 @@ public class ModularLoadManagerImplTest {
final TimeAverageMessageData longTermMessageData = new
TimeAverageMessageData(1000);
longTermMessageData.setMsgRateIn(1000);
bundleData.setLongTermData(longTermMessageData);
- final String firstBundleDataPath = String.format("%s/%s",
ModularLoadManagerImpl.BUNDLE_DATA_PATH, bundles[0]);
+ final String firstBundleDataPath = String.format("%s/%s",
BUNDLE_DATA_BASE_PATH, bundles[0]);
// Write long message rate for first bundle to ensure that even bundle
distribution is not a coincidence of
// balancing by message rate. If we were balancing by message rate,
one of the brokers should only have this
// one bundle.
@@ -386,7 +387,7 @@ public class ModularLoadManagerImplTest {
final TimeAverageMessageData longTermMessageData = new
TimeAverageMessageData(1000);
longTermMessageData.setMsgRateIn(1000);
bundleData.setLongTermData(longTermMessageData);
- final String firstBundleDataPath = String.format("%s/%s",
ModularLoadManagerImpl.BUNDLE_DATA_PATH, bundles[0]);
+ final String firstBundleDataPath = String.format("%s/%s",
BUNDLE_DATA_BASE_PATH, bundles[0]);
pulsar1.getLocalMetadataStore().getMetadataCache(BundleData.class).create(firstBundleDataPath,
bundleData).join();
String maxTopicOwnedBroker =
primaryLoadManager.selectBrokerForAssignment(bundles[0]).get();
@@ -843,7 +844,7 @@ public class ModularLoadManagerImplTest {
String topicToFindBundle = topicName + 0;
NamespaceBundle bundleWillBeSplit =
pulsar1.getNamespaceService().getBundle(TopicName.get(topicToFindBundle));
- String bundleDataPath = ModularLoadManagerImpl.BUNDLE_DATA_PATH + "/"
+ tenant + "/" + namespace;
+ String bundleDataPath = BUNDLE_DATA_BASE_PATH + "/" + tenant + "/" +
namespace;
CompletableFuture<List<String>> children =
bundlesCache.getChildren(bundleDataPath);
List<String> bundles = children.join();
assertTrue(bundles.contains(bundleWillBeSplit.getBundleRange()));
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
index 03bb53eb9da..2e584489c06 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.namespace;
+import static
org.apache.pulsar.broker.resources.LoadBalanceResources.BUNDLE_DATA_BASE_PATH;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
@@ -649,7 +650,7 @@ public class NamespaceServiceTest extends BrokerTestBase {
NamespaceBundle targetNamespaceBundle =
bundles.findBundle(TopicName.get(topic + "0"));
String bundle = targetNamespaceBundle.getBundleRange();
- String path = ModularLoadManagerImpl.getBundleDataPath(namespace + "/"
+ bundle);
+ String path = BUNDLE_DATA_BASE_PATH + "/" + namespace + "/" + bundle;
NamespaceBundleStats defaultStats = new NamespaceBundleStats();
defaultStats.msgThroughputIn = 100000;
defaultStats.msgThroughputOut = 100000;
@@ -691,7 +692,6 @@ public class NamespaceServiceTest extends BrokerTestBase {
@Test
public void testModularLoadManagerRemoveInactiveBundleFromLoadData()
throws Exception {
- final String BUNDLE_DATA_PATH = "/loadbalance/bundle-data";
final String namespace = "pulsar/test/ns1";
final String topic1 = "persistent://" + namespace + "/topic1";
final String topic2 = "persistent://" + namespace + "/topic2";
@@ -742,13 +742,12 @@ public class NamespaceServiceTest extends BrokerTestBase {
Awaitility.await().untilAsserted(() -> {
assertNull(loadData.getBundleData().get(oldBundle.toString()));
- assertFalse(bundlesCache.exists(BUNDLE_DATA_PATH + "/" +
oldBundle.toString()).get());
+ assertFalse(bundlesCache.exists(BUNDLE_DATA_BASE_PATH + "/" +
oldBundle.toString()).get());
});
}
@Test
public void testModularLoadManagerRemoveBundleAndLoad() throws Exception {
- final String BUNDLE_DATA_PATH = "/loadbalance/bundle-data";
final String namespace = "prop/ns-abc";
final String bundleName = namespace + "/0x00000000_0xffffffff";
final String topic1 = "persistent://" + namespace + "/topic1";
@@ -783,7 +782,7 @@ public class NamespaceServiceTest extends BrokerTestBase {
pulsar.getBrokerService().updateRates();
waitResourceDataUpdateToZK(loadManager);
- String path = BUNDLE_DATA_PATH + "/" + bundleName;
+ String path = BUNDLE_DATA_BASE_PATH + "/" + bundleName;
Optional<GetResult> getResult =
pulsar.getLocalMetadataStore().get(path).get();
assertTrue(getResult.isPresent());
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
index db09465dc10..c927a2e61d8 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
@@ -712,8 +712,7 @@ public class PulsarTestContext implements AutoCloseable {
if (metadataStore == null) {
metadataStore = builder.configurationMetadataStore;
}
- NamespaceResources nsr =
spyConfigPulsarResources.spy(NamespaceResources.class,
- builder.localMetadataStore, metadataStore, 30);
+ NamespaceResources nsr =
spyConfigPulsarResources.spy(NamespaceResources.class,metadataStore, 30);
TopicResources tsr =
spyConfigPulsarResources.spy(TopicResources.class, metadataStore);
pulsarResources(
spyConfigPulsarResources.spy(
diff --git
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationController.java
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationController.java
index bbe535df5e2..f2ccd82b390 100644
---
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationController.java
+++
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationController.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.testclient;
+import static
org.apache.pulsar.broker.resources.LoadBalanceResources.BUNDLE_DATA_BASE_PATH;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
@@ -61,7 +62,6 @@ import org.slf4j.LoggerFactory;
public class LoadSimulationController {
private static final Logger log =
LoggerFactory.getLogger(LoadSimulationController.class);
private static final String QUOTA_ROOT =
"/loadbalance/resource-quota/namespace";
- private static final String BUNDLE_DATA_ROOT = "/loadbalance/bundle-data";
// Input streams for each client to send commands through.
private final DataInputStream[] inputStreams;
@@ -427,7 +427,7 @@ public class LoadSimulationController {
"/loadbalance/resource-quota/namespace/%s/%s/%s/0x00000000_0xffffffff",
tenantName,
cluster, mangledNamespace);
final String newAPITargetPath = String.format(
-
"/loadbalance/bundle-data/%s/%s/%s/0x00000000_0xffffffff", tenantName, cluster,
+ "%s/%s/%s/%s/0x00000000_0xffffffff",
BUNDLE_DATA_BASE_PATH, tenantName, cluster,
mangledNamespace);
try {
ZkUtils.createFullPathOptimistic(targetZKClient,
oldAPITargetPath,
@@ -484,7 +484,7 @@ public class LoadSimulationController {
futures.add(threadPool.submit(() -> {
for (final Map.Entry<String, ResourceQuota> entry :
bundleToQuota.entrySet()) {
final String bundle = entry.getKey();
- final String newAPIPath = bundle.replace(QUOTA_ROOT,
BUNDLE_DATA_ROOT);
+ final String newAPIPath = bundle.replace(QUOTA_ROOT,
BUNDLE_DATA_BASE_PATH);
final ResourceQuota quota = entry.getValue();
final int tenantStart = QUOTA_ROOT.length() + 1;
final String topic = String.format("persistent://%s/t",
bundle.substring(tenantStart));