This is an automated email from the ASF dual-hosted git repository.
houxiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 55315247346 [refactor][broker] PIP-301 Part-3: Add QuotaResources
(#21596)
55315247346 is described below
commit 55315247346725f3c38b11ee397071ee838db3e8
Author: houxiaoyu <[email protected]>
AuthorDate: Tue Dec 5 10:29:32 2023 +0800
[refactor][broker] PIP-301 Part-3: Add QuotaResources (#21596)
### Motivation
See PIP-301: https://github.com/apache/pulsar/pull/21129
### Modifications
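Add `QuotaResources` to `LoadBalanceResources`, and use it in `BundlesQuotas` and `ModularLoadManagerImpl` in place of their own `MetadataCache<ResourceQuota>` and hard-coded paths. `LoadSimulationController` now uses the shared `RESOURCE_QUOTA_BASE_PATH` constant instead of its private `QUOTA_ROOT`.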
---
.../broker/resources/LoadBalanceResources.java | 38 ++++++++++++++++++++++
.../apache/pulsar/broker/cache/BundlesQuotas.java | 29 ++++++-----------
.../loadbalance/impl/ModularLoadManagerImpl.java | 11 ++-----
.../pulsar/broker/service/BrokerService.java | 2 +-
.../pulsar/broker/cache/BundlesQuotasTest.java | 18 ++++++++--
.../testclient/LoadSimulationController.java | 14 ++++----
6 files changed, 73 insertions(+), 39 deletions(-)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LoadBalanceResources.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LoadBalanceResources.java
index e13efefee0b..57a2d16e4e8 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LoadBalanceResources.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LoadBalanceResources.java
@@ -22,6 +22,7 @@ import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import lombok.Getter;
import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.policies.data.ResourceQuota;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.policies.data.loadbalancer.BundleData;
import org.apache.pulsar.policies.data.loadbalancer.TimeAverageBrokerData;
@@ -30,13 +31,16 @@ import
org.apache.pulsar.policies.data.loadbalancer.TimeAverageBrokerData;
public class LoadBalanceResources {
public static final String BUNDLE_DATA_BASE_PATH =
"/loadbalance/bundle-data";
public static final String BROKER_TIME_AVERAGE_BASE_PATH =
"/loadbalance/broker-time-average";
+ public static final String RESOURCE_QUOTA_BASE_PATH =
"/loadbalance/resource-quota";
private final BundleDataResources bundleDataResources;
private final BrokerTimeAverageDataResources
brokerTimeAverageDataResources;
+ private final QuotaResources quotaResources;
public LoadBalanceResources(MetadataStore store, int operationTimeoutSec) {
bundleDataResources = new BundleDataResources(store,
operationTimeoutSec);
brokerTimeAverageDataResources = new
BrokerTimeAverageDataResources(store, operationTimeoutSec);
+ quotaResources = new QuotaResources(store, operationTimeoutSec);
}
public static class BundleDataResources extends BaseResources<BundleData> {
@@ -92,4 +96,38 @@ public class LoadBalanceResources {
return BROKER_TIME_AVERAGE_BASE_PATH + "/" + brokerLookupAddress;
}
}
+
+ public static class QuotaResources extends BaseResources<ResourceQuota> {
+ public QuotaResources(MetadataStore store, int operationTimeoutSec) {
+ super(store, ResourceQuota.class, operationTimeoutSec);
+ }
+
+ public CompletableFuture<Optional<ResourceQuota>> getQuota(String
bundle) {
+ return getAsync(getBundleQuotaPath(bundle));
+ }
+
+ public CompletableFuture<Optional<ResourceQuota>> getDefaultQuota() {
+ return getAsync(getDefaultBundleQuotaPath());
+ }
+
+ public CompletableFuture<Void> setWithCreateQuotaAsync(String bundle,
ResourceQuota quota) {
+ return setWithCreateAsync(getBundleQuotaPath(bundle), __ -> quota);
+ }
+
+ public CompletableFuture<Void>
setWithCreateDefaultQuotaAsync(ResourceQuota quota) {
+ return setWithCreateAsync(getDefaultBundleQuotaPath(), __ ->
quota);
+ }
+
+ public CompletableFuture<Void> deleteQuota(String bundle) {
+ return deleteAsync(getBundleQuotaPath(bundle));
+ }
+
+ private String getBundleQuotaPath(String bundle) {
+ return String.format("%s/%s", RESOURCE_QUOTA_BASE_PATH, bundle);
+ }
+
+ private String getDefaultBundleQuotaPath() {
+ return getBundleQuotaPath("default");
+ }
+ }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/BundlesQuotas.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/BundlesQuotas.java
index d70520a09f3..d61fa0b0c81 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/BundlesQuotas.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/BundlesQuotas.java
@@ -19,18 +19,13 @@
package org.apache.pulsar.broker.cache;
import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.resources.LoadBalanceResources;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.policies.data.ResourceQuota;
-import org.apache.pulsar.metadata.api.MetadataCache;
-import org.apache.pulsar.metadata.api.MetadataStore;
public class BundlesQuotas {
-
- // Root path for resource-quota
- private static final String RESOURCE_QUOTA_ROOT =
"/loadbalance/resource-quota";
- private static final String DEFAULT_RESOURCE_QUOTA_PATH =
RESOURCE_QUOTA_ROOT + "/default";
-
- private final MetadataCache<ResourceQuota> resourceQuotaCache;
+ LoadBalanceResources loadBalanceResources;
// Default initial quota
static final ResourceQuota INITIAL_QUOTA = new ResourceQuota();
@@ -44,24 +39,21 @@ public class BundlesQuotas {
INITIAL_QUOTA.setDynamic(true); // allow dynamically re-calculating
}
- public BundlesQuotas(MetadataStore localStore) {
- this.resourceQuotaCache =
localStore.getMetadataCache(ResourceQuota.class);
+ public BundlesQuotas(PulsarService pulsar) {
+ loadBalanceResources =
pulsar.getPulsarResources().getLoadBalanceResources();
}
public CompletableFuture<Void> setDefaultResourceQuota(ResourceQuota
quota) {
- return
resourceQuotaCache.readModifyUpdateOrCreate(DEFAULT_RESOURCE_QUOTA_PATH, __ ->
quota)
- .thenApply(__ -> null);
+ return
loadBalanceResources.getQuotaResources().setWithCreateDefaultQuotaAsync(quota);
}
public CompletableFuture<ResourceQuota> getDefaultResourceQuota() {
- return resourceQuotaCache.get(DEFAULT_RESOURCE_QUOTA_PATH)
+ return loadBalanceResources.getQuotaResources().getDefaultQuota()
.thenApply(optResourceQuota ->
optResourceQuota.orElse(INITIAL_QUOTA));
}
public CompletableFuture<Void> setResourceQuota(String bundle,
ResourceQuota quota) {
- return resourceQuotaCache.readModifyUpdateOrCreate(RESOURCE_QUOTA_ROOT
+ "/" + bundle,
- __ -> quota)
- .thenApply(__ -> null);
+ return
loadBalanceResources.getQuotaResources().setWithCreateQuotaAsync(bundle, quota);
}
public CompletableFuture<Void> setResourceQuota(NamespaceBundle bundle,
ResourceQuota quota) {
@@ -73,7 +65,7 @@ public class BundlesQuotas {
}
public CompletableFuture<ResourceQuota> getResourceQuota(String bundle) {
- return resourceQuotaCache.get(RESOURCE_QUOTA_ROOT + "/" + bundle)
+ return loadBalanceResources.getQuotaResources().getQuota(bundle)
.thenCompose(optResourceQuota -> {
if (optResourceQuota.isPresent()) {
return
CompletableFuture.completedFuture(optResourceQuota.get());
@@ -84,7 +76,6 @@ public class BundlesQuotas {
}
public CompletableFuture<Void> resetResourceQuota(NamespaceBundle bundle) {
- return resourceQuotaCache.delete(RESOURCE_QUOTA_ROOT + "/" +
bundle.toString());
+ return
loadBalanceResources.getQuotaResources().deleteQuota(bundle.toString());
}
-
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index afe4d13215c..49c58afa3b9 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -72,7 +72,6 @@ import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
-import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
import org.apache.pulsar.metadata.api.Notification;
@@ -105,9 +104,6 @@ public class ModularLoadManagerImpl implements
ModularLoadManager {
// The number of effective samples to keep for observing short term data.
public static final int NUM_SHORT_SAMPLES = 10;
- // Path to ZNode whose children contain ResourceQuota jsons.
- public static final String RESOURCE_QUOTA_ZPATH =
"/loadbalance/resource-quota";
-
// Set of broker candidates to reuse so that object creation is avoided.
private final Set<String> brokerCandidateCache;
@@ -115,8 +111,6 @@ public class ModularLoadManagerImpl implements
ModularLoadManager {
private LockManager<LocalBrokerData> brokersData;
private ResourceLock<LocalBrokerData> brokerDataLock;
- private MetadataCache<ResourceQuota> resourceQuotaCache;
-
// Broker host usage object used to calculate system resource usage.
private BrokerHostUsage brokerHostUsage;
@@ -240,7 +234,6 @@ public class ModularLoadManagerImpl implements
ModularLoadManager {
this.pulsar = pulsar;
this.pulsarResources = pulsar.getPulsarResources();
brokersData =
pulsar.getCoordinationService().getLockManager(LocalBrokerData.class);
- resourceQuotaCache =
pulsar.getLocalMetadataStore().getMetadataCache(ResourceQuota.class);
pulsar.getLocalMetadataStore().registerListener(this::handleDataNotification);
pulsar.getLocalMetadataStore().registerSessionListener(this::handleMetadataSessionEvent);
@@ -381,8 +374,8 @@ public class ModularLoadManagerImpl implements
ModularLoadManager {
return optBundleData.get();
}
- Optional<ResourceQuota> optQuota = resourceQuotaCache
- .get(String.format("%s/%s", RESOURCE_QUOTA_ZPATH,
bundle)).join();
+ Optional<ResourceQuota> optQuota =
pulsarResources.getLoadBalanceResources().getQuotaResources()
+ .getQuota(bundle).join();
if (optQuota.isPresent()) {
ResourceQuota quota = optQuota.get();
bundleData = new BundleData(NUM_SHORT_SAMPLES,
NUM_LONG_SAMPLES);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 43bf60f282e..caefe36073a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -419,7 +419,7 @@ public class BrokerService implements Closeable {
this.brokerEntryPayloadProcessors =
BrokerEntryMetadataUtils.loadInterceptors(pulsar.getConfiguration()
.getBrokerEntryPayloadProcessors(),
BrokerService.class.getClassLoader());
- this.bundlesQuotas = new BundlesQuotas(pulsar.getLocalMetadataStore());
+ this.bundlesQuotas = new BundlesQuotas(pulsar);
}
public void addTopicEventListener(TopicEventsListener... listeners) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/BundlesQuotasTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/BundlesQuotasTest.java
index d78e8c0914c..079ce25318a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/BundlesQuotasTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/BundlesQuotasTest.java
@@ -24,6 +24,8 @@ import static org.testng.Assert.assertEquals;
import com.google.common.collect.Range;
import com.google.common.hash.Hashing;
import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.resources.LoadBalanceResources;
+import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceName;
@@ -41,14 +43,24 @@ public class BundlesQuotasTest {
private MetadataStore store;
private NamespaceBundleFactory bundleFactory;
+ private PulsarService pulsar;
@BeforeMethod
public void setup() throws Exception {
store = MetadataStoreFactory.create("memory:local",
MetadataStoreConfig.builder().build());
+ LoadBalanceResources.QuotaResources quotaResources = new
LoadBalanceResources.QuotaResources(store, 30000);
- PulsarService pulsar = mock(PulsarService.class);
+ pulsar = mock(PulsarService.class);
when(pulsar.getLocalMetadataStore()).thenReturn(mock(MetadataStoreExtended.class));
when(pulsar.getConfigurationMetadataStore()).thenReturn(mock(MetadataStoreExtended.class));
+
+ LoadBalanceResources loadBalanceResources =
mock(LoadBalanceResources.class);
+
when(loadBalanceResources.getQuotaResources()).thenReturn(quotaResources);
+
+ PulsarResources pulsarResources = mock(PulsarResources.class);
+
when(pulsarResources.getLoadBalanceResources()).thenReturn(loadBalanceResources);
+
+ when(pulsar.getPulsarResources()).thenReturn(pulsarResources);
bundleFactory = new NamespaceBundleFactory(pulsar, Hashing.crc32());
}
@@ -59,7 +71,7 @@ public class BundlesQuotasTest {
@Test
public void testGetSetDefaultQuota() throws Exception {
- BundlesQuotas bundlesQuotas = new BundlesQuotas(store);
+ BundlesQuotas bundlesQuotas = new BundlesQuotas(pulsar);
ResourceQuota quota2 = new ResourceQuota();
quota2.setMsgRateIn(10);
quota2.setMsgRateOut(20);
@@ -75,7 +87,7 @@ public class BundlesQuotasTest {
@Test
public void testGetSetBundleQuota() throws Exception {
- BundlesQuotas bundlesQuotas = new BundlesQuotas(store);
+ BundlesQuotas bundlesQuotas = new BundlesQuotas(pulsar);
NamespaceBundle testBundle = new
NamespaceBundle(NamespaceName.get("pulsar/test/ns-2"),
Range.closedOpen(0L, (long) Integer.MAX_VALUE),
bundleFactory);
diff --git
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationController.java
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationController.java
index 79efbeba3bc..e967ba9e517 100644
---
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationController.java
+++
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationController.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.testclient;
import static
org.apache.pulsar.broker.resources.LoadBalanceResources.BUNDLE_DATA_BASE_PATH;
+import static
org.apache.pulsar.broker.resources.LoadBalanceResources.RESOURCE_QUOTA_BASE_PATH;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
@@ -61,7 +62,6 @@ import org.slf4j.LoggerFactory;
*/
public class LoadSimulationController {
private static final Logger log =
LoggerFactory.getLogger(LoadSimulationController.class);
- private static final String QUOTA_ROOT = "/loadbalance/resource-quota";
// Input streams for each client to send commands through.
private final DataInputStream[] inputStreams;
@@ -398,7 +398,7 @@ public class LoadSimulationController {
for (int i = 0; i < clients.length; ++i) {
threadLocalMaps[i] = new HashMap<>();
}
- getResourceQuotas(QUOTA_ROOT, sourceZKClient, threadLocalMaps);
+ getResourceQuotas(RESOURCE_QUOTA_BASE_PATH, sourceZKClient,
threadLocalMaps);
final List<Future> futures = new ArrayList<>(clients.length);
int i = 0;
log.info("Copying...");
@@ -411,7 +411,7 @@ public class LoadSimulationController {
// Simulation will send messages in and out at about
the same rate, so just make the rate the
// average of in and out.
- final int tenantStart = QUOTA_ROOT.length() + 1;
+ final int tenantStart =
RESOURCE_QUOTA_BASE_PATH.length() + 1;
final int clusterStart = bundle.indexOf('/',
tenantStart) + 1;
final String sourceTenant =
bundle.substring(tenantStart, clusterStart - 1);
final int namespaceStart = bundle.indexOf('/',
clusterStart) + 1;
@@ -424,7 +424,7 @@ public class LoadSimulationController {
final String mangledNamespace = String.format("%s-%s",
manglePrefix, namespace);
final BundleData bundleData =
initializeBundleData(quota, arguments);
final String oldAPITargetPath = String.format(
-
"/loadbalance/resource-quota/namespace/%s/%s/%s/0x00000000_0xffffffff",
tenantName,
+ "%s/namespace/%s/%s/%s/0x00000000_0xffffffff",
RESOURCE_QUOTA_BASE_PATH, tenantName,
cluster, mangledNamespace);
final String newAPITargetPath = String.format(
"%s/%s/%s/%s/0x00000000_0xffffffff",
BUNDLE_DATA_BASE_PATH, tenantName, cluster,
@@ -475,7 +475,7 @@ public class LoadSimulationController {
for (int i = 0; i < clients.length; ++i) {
threadLocalMaps[i] = new HashMap<>();
}
- getResourceQuotas(QUOTA_ROOT, zkClient, threadLocalMaps);
+ getResourceQuotas(RESOURCE_QUOTA_BASE_PATH, zkClient, threadLocalMaps);
final List<Future> futures = new ArrayList<>(clients.length);
int i = 0;
log.info("Simulating...");
@@ -484,9 +484,9 @@ public class LoadSimulationController {
futures.add(threadPool.submit(() -> {
for (final Map.Entry<String, ResourceQuota> entry :
bundleToQuota.entrySet()) {
final String bundle = entry.getKey();
- final String newAPIPath = bundle.replace(QUOTA_ROOT,
BUNDLE_DATA_BASE_PATH);
+ final String newAPIPath =
bundle.replace(RESOURCE_QUOTA_BASE_PATH, BUNDLE_DATA_BASE_PATH);
final ResourceQuota quota = entry.getValue();
- final int tenantStart = QUOTA_ROOT.length() + 1;
+ final int tenantStart = RESOURCE_QUOTA_BASE_PATH.length()
+ 1;
final String topic = String.format("persistent://%s/t",
bundle.substring(tenantStart));
final BundleData bundleData = initializeBundleData(quota,
arguments);
// Put the bundle data in the new ZooKeeper.