This is an automated email from the ASF dual-hosted git repository.

houxiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 55315247346 [refactor][broker] PIP-301 Part-3: Add QuotaResources 
(#21596)
55315247346 is described below

commit 55315247346725f3c38b11ee397071ee838db3e8
Author: houxiaoyu <[email protected]>
AuthorDate: Tue Dec 5 10:29:32 2023 +0800

    [refactor][broker] PIP-301 Part-3: Add QuotaResources (#21596)
    
    ### Motivation
    
    See PIP-301: https://github.com/apache/pulsar/pull/21129
    
    ### Modifications
    
    Add `QuotaResources` to `LoadBalanceResources` and use it for resource-quota metadata access.
---
 .../broker/resources/LoadBalanceResources.java     | 38 ++++++++++++++++++++++
 .../apache/pulsar/broker/cache/BundlesQuotas.java  | 29 ++++++-----------
 .../loadbalance/impl/ModularLoadManagerImpl.java   | 11 ++-----
 .../pulsar/broker/service/BrokerService.java       |  2 +-
 .../pulsar/broker/cache/BundlesQuotasTest.java     | 18 ++++++++--
 .../testclient/LoadSimulationController.java       | 14 ++++----
 6 files changed, 73 insertions(+), 39 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LoadBalanceResources.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LoadBalanceResources.java
index e13efefee0b..57a2d16e4e8 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LoadBalanceResources.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LoadBalanceResources.java
@@ -22,6 +22,7 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import lombok.Getter;
 import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.policies.data.ResourceQuota;
 import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.policies.data.loadbalancer.BundleData;
 import org.apache.pulsar.policies.data.loadbalancer.TimeAverageBrokerData;
@@ -30,13 +31,16 @@ import 
org.apache.pulsar.policies.data.loadbalancer.TimeAverageBrokerData;
 public class LoadBalanceResources {
     public static final String BUNDLE_DATA_BASE_PATH = 
"/loadbalance/bundle-data";
     public static final String BROKER_TIME_AVERAGE_BASE_PATH = 
"/loadbalance/broker-time-average";
+    public static final String RESOURCE_QUOTA_BASE_PATH = 
"/loadbalance/resource-quota";
 
     private final BundleDataResources bundleDataResources;
     private final BrokerTimeAverageDataResources 
brokerTimeAverageDataResources;
+    private final QuotaResources quotaResources;
 
     public LoadBalanceResources(MetadataStore store, int operationTimeoutSec) {
         bundleDataResources = new BundleDataResources(store, 
operationTimeoutSec);
         brokerTimeAverageDataResources = new 
BrokerTimeAverageDataResources(store, operationTimeoutSec);
+        quotaResources = new QuotaResources(store, operationTimeoutSec);
     }
 
     public static class BundleDataResources extends BaseResources<BundleData> {
@@ -92,4 +96,38 @@ public class LoadBalanceResources {
             return BROKER_TIME_AVERAGE_BASE_PATH + "/" + brokerLookupAddress;
         }
     }
+
+    public static class QuotaResources extends BaseResources<ResourceQuota> {
+        public QuotaResources(MetadataStore store, int operationTimeoutSec) {
+            super(store, ResourceQuota.class, operationTimeoutSec);
+        }
+
+        public CompletableFuture<Optional<ResourceQuota>> getQuota(String 
bundle) {
+            return getAsync(getBundleQuotaPath(bundle));
+        }
+
+        public CompletableFuture<Optional<ResourceQuota>> getDefaultQuota() {
+            return getAsync(getDefaultBundleQuotaPath());
+        }
+
+        public CompletableFuture<Void> setWithCreateQuotaAsync(String bundle, 
ResourceQuota quota) {
+            return setWithCreateAsync(getBundleQuotaPath(bundle), __ -> quota);
+        }
+
+        public CompletableFuture<Void> 
setWithCreateDefaultQuotaAsync(ResourceQuota quota) {
+            return setWithCreateAsync(getDefaultBundleQuotaPath(), __ -> 
quota);
+        }
+
+        public CompletableFuture<Void> deleteQuota(String bundle) {
+            return deleteAsync(getBundleQuotaPath(bundle));
+        }
+
+        private String getBundleQuotaPath(String bundle) {
+            return String.format("%s/%s", RESOURCE_QUOTA_BASE_PATH, bundle);
+        }
+
+        private String getDefaultBundleQuotaPath() {
+            return getBundleQuotaPath("default");
+        }
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/BundlesQuotas.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/BundlesQuotas.java
index d70520a09f3..d61fa0b0c81 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/BundlesQuotas.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/BundlesQuotas.java
@@ -19,18 +19,13 @@
 package org.apache.pulsar.broker.cache;
 
 import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.resources.LoadBalanceResources;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.policies.data.ResourceQuota;
-import org.apache.pulsar.metadata.api.MetadataCache;
-import org.apache.pulsar.metadata.api.MetadataStore;
 
 public class BundlesQuotas {
-
-    // Root path for resource-quota
-    private static final String RESOURCE_QUOTA_ROOT = 
"/loadbalance/resource-quota";
-    private static final String DEFAULT_RESOURCE_QUOTA_PATH = 
RESOURCE_QUOTA_ROOT + "/default";
-
-    private final MetadataCache<ResourceQuota> resourceQuotaCache;
+    LoadBalanceResources loadBalanceResources;
 
     // Default initial quota
     static final ResourceQuota INITIAL_QUOTA = new ResourceQuota();
@@ -44,24 +39,21 @@ public class BundlesQuotas {
         INITIAL_QUOTA.setDynamic(true); // allow dynamically re-calculating
     }
 
-    public BundlesQuotas(MetadataStore localStore) {
-        this.resourceQuotaCache = 
localStore.getMetadataCache(ResourceQuota.class);
+    public BundlesQuotas(PulsarService pulsar) {
+        loadBalanceResources = 
pulsar.getPulsarResources().getLoadBalanceResources();
     }
 
     public CompletableFuture<Void> setDefaultResourceQuota(ResourceQuota 
quota) {
-        return 
resourceQuotaCache.readModifyUpdateOrCreate(DEFAULT_RESOURCE_QUOTA_PATH, __ -> 
quota)
-                .thenApply(__ -> null);
+        return 
loadBalanceResources.getQuotaResources().setWithCreateDefaultQuotaAsync(quota);
     }
 
     public CompletableFuture<ResourceQuota> getDefaultResourceQuota() {
-        return resourceQuotaCache.get(DEFAULT_RESOURCE_QUOTA_PATH)
+        return loadBalanceResources.getQuotaResources().getDefaultQuota()
                 .thenApply(optResourceQuota -> 
optResourceQuota.orElse(INITIAL_QUOTA));
     }
 
     public CompletableFuture<Void> setResourceQuota(String bundle, 
ResourceQuota quota) {
-        return resourceQuotaCache.readModifyUpdateOrCreate(RESOURCE_QUOTA_ROOT 
+ "/" + bundle,
-                __ -> quota)
-                .thenApply(__ -> null);
+        return 
loadBalanceResources.getQuotaResources().setWithCreateQuotaAsync(bundle, quota);
     }
 
     public CompletableFuture<Void> setResourceQuota(NamespaceBundle bundle, 
ResourceQuota quota) {
@@ -73,7 +65,7 @@ public class BundlesQuotas {
     }
 
     public CompletableFuture<ResourceQuota> getResourceQuota(String bundle) {
-        return resourceQuotaCache.get(RESOURCE_QUOTA_ROOT + "/" + bundle)
+        return loadBalanceResources.getQuotaResources().getQuota(bundle)
                 .thenCompose(optResourceQuota -> {
                     if (optResourceQuota.isPresent()) {
                         return 
CompletableFuture.completedFuture(optResourceQuota.get());
@@ -84,7 +76,6 @@ public class BundlesQuotas {
     }
 
     public CompletableFuture<Void> resetResourceQuota(NamespaceBundle bundle) {
-        return resourceQuotaCache.delete(RESOURCE_QUOTA_ROOT + "/" + 
bundle.toString());
+        return 
loadBalanceResources.getQuotaResources().deleteQuota(bundle.toString());
     }
-
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index afe4d13215c..49c58afa3b9 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -72,7 +72,6 @@ import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.Reflections;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
-import org.apache.pulsar.metadata.api.MetadataCache;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
 import org.apache.pulsar.metadata.api.Notification;
@@ -105,9 +104,6 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager {
     // The number of effective samples to keep for observing short term data.
     public static final int NUM_SHORT_SAMPLES = 10;
 
-    // Path to ZNode whose children contain ResourceQuota jsons.
-    public static final String RESOURCE_QUOTA_ZPATH = 
"/loadbalance/resource-quota";
-
     // Set of broker candidates to reuse so that object creation is avoided.
     private final Set<String> brokerCandidateCache;
 
@@ -115,8 +111,6 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager {
     private LockManager<LocalBrokerData> brokersData;
     private ResourceLock<LocalBrokerData> brokerDataLock;
 
-    private MetadataCache<ResourceQuota> resourceQuotaCache;
-
     // Broker host usage object used to calculate system resource usage.
     private BrokerHostUsage brokerHostUsage;
 
@@ -240,7 +234,6 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager {
         this.pulsar = pulsar;
         this.pulsarResources = pulsar.getPulsarResources();
         brokersData = 
pulsar.getCoordinationService().getLockManager(LocalBrokerData.class);
-        resourceQuotaCache = 
pulsar.getLocalMetadataStore().getMetadataCache(ResourceQuota.class);
         
pulsar.getLocalMetadataStore().registerListener(this::handleDataNotification);
         
pulsar.getLocalMetadataStore().registerSessionListener(this::handleMetadataSessionEvent);
 
@@ -381,8 +374,8 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager {
                 return optBundleData.get();
             }
 
-            Optional<ResourceQuota> optQuota = resourceQuotaCache
-                    .get(String.format("%s/%s", RESOURCE_QUOTA_ZPATH, 
bundle)).join();
+            Optional<ResourceQuota> optQuota = 
pulsarResources.getLoadBalanceResources().getQuotaResources()
+                    .getQuota(bundle).join();
             if (optQuota.isPresent()) {
                 ResourceQuota quota = optQuota.get();
                 bundleData = new BundleData(NUM_SHORT_SAMPLES, 
NUM_LONG_SAMPLES);
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 43bf60f282e..caefe36073a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -419,7 +419,7 @@ public class BrokerService implements Closeable {
         this.brokerEntryPayloadProcessors = 
BrokerEntryMetadataUtils.loadInterceptors(pulsar.getConfiguration()
                         .getBrokerEntryPayloadProcessors(), 
BrokerService.class.getClassLoader());
 
-        this.bundlesQuotas = new BundlesQuotas(pulsar.getLocalMetadataStore());
+        this.bundlesQuotas = new BundlesQuotas(pulsar);
     }
 
     public void addTopicEventListener(TopicEventsListener... listeners) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/BundlesQuotasTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/BundlesQuotasTest.java
index d78e8c0914c..079ce25318a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/BundlesQuotasTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/BundlesQuotasTest.java
@@ -24,6 +24,8 @@ import static org.testng.Assert.assertEquals;
 import com.google.common.collect.Range;
 import com.google.common.hash.Hashing;
 import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.resources.LoadBalanceResources;
+import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceBundleFactory;
 import org.apache.pulsar.common.naming.NamespaceName;
@@ -41,14 +43,24 @@ public class BundlesQuotasTest {
 
     private MetadataStore store;
     private NamespaceBundleFactory bundleFactory;
+    private PulsarService pulsar;
 
     @BeforeMethod
     public void setup() throws Exception {
         store = MetadataStoreFactory.create("memory:local", 
MetadataStoreConfig.builder().build());
+        LoadBalanceResources.QuotaResources quotaResources = new 
LoadBalanceResources.QuotaResources(store, 30000);
 
-        PulsarService pulsar = mock(PulsarService.class);
+        pulsar = mock(PulsarService.class);
         
when(pulsar.getLocalMetadataStore()).thenReturn(mock(MetadataStoreExtended.class));
         
when(pulsar.getConfigurationMetadataStore()).thenReturn(mock(MetadataStoreExtended.class));
+
+        LoadBalanceResources loadBalanceResources = 
mock(LoadBalanceResources.class);
+        
when(loadBalanceResources.getQuotaResources()).thenReturn(quotaResources);
+
+        PulsarResources pulsarResources = mock(PulsarResources.class);
+        
when(pulsarResources.getLoadBalanceResources()).thenReturn(loadBalanceResources);
+
+        when(pulsar.getPulsarResources()).thenReturn(pulsarResources);
         bundleFactory = new NamespaceBundleFactory(pulsar, Hashing.crc32());
     }
 
@@ -59,7 +71,7 @@ public class BundlesQuotasTest {
 
     @Test
     public void testGetSetDefaultQuota() throws Exception {
-        BundlesQuotas bundlesQuotas = new BundlesQuotas(store);
+        BundlesQuotas bundlesQuotas = new BundlesQuotas(pulsar);
         ResourceQuota quota2 = new ResourceQuota();
         quota2.setMsgRateIn(10);
         quota2.setMsgRateOut(20);
@@ -75,7 +87,7 @@ public class BundlesQuotasTest {
 
     @Test
     public void testGetSetBundleQuota() throws Exception {
-        BundlesQuotas bundlesQuotas = new BundlesQuotas(store);
+        BundlesQuotas bundlesQuotas = new BundlesQuotas(pulsar);
         NamespaceBundle testBundle = new 
NamespaceBundle(NamespaceName.get("pulsar/test/ns-2"),
                 Range.closedOpen(0L, (long) Integer.MAX_VALUE),
                 bundleFactory);
diff --git 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationController.java
 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationController.java
index 79efbeba3bc..e967ba9e517 100644
--- 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationController.java
+++ 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationController.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.testclient;
 
 import static 
org.apache.pulsar.broker.resources.LoadBalanceResources.BUNDLE_DATA_BASE_PATH;
+import static 
org.apache.pulsar.broker.resources.LoadBalanceResources.RESOURCE_QUOTA_BASE_PATH;
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.ParameterException;
@@ -61,7 +62,6 @@ import org.slf4j.LoggerFactory;
  */
 public class LoadSimulationController {
     private static final Logger log = 
LoggerFactory.getLogger(LoadSimulationController.class);
-    private static final String QUOTA_ROOT = "/loadbalance/resource-quota";
 
     // Input streams for each client to send commands through.
     private final DataInputStream[] inputStreams;
@@ -398,7 +398,7 @@ public class LoadSimulationController {
             for (int i = 0; i < clients.length; ++i) {
                 threadLocalMaps[i] = new HashMap<>();
             }
-            getResourceQuotas(QUOTA_ROOT, sourceZKClient, threadLocalMaps);
+            getResourceQuotas(RESOURCE_QUOTA_BASE_PATH, sourceZKClient, 
threadLocalMaps);
             final List<Future> futures = new ArrayList<>(clients.length);
             int i = 0;
             log.info("Copying...");
@@ -411,7 +411,7 @@ public class LoadSimulationController {
                         // Simulation will send messages in and out at about 
the same rate, so just make the rate the
                         // average of in and out.
 
-                        final int tenantStart = QUOTA_ROOT.length() + 1;
+                        final int tenantStart = 
RESOURCE_QUOTA_BASE_PATH.length() + 1;
                         final int clusterStart = bundle.indexOf('/', 
tenantStart) + 1;
                         final String sourceTenant = 
bundle.substring(tenantStart, clusterStart - 1);
                         final int namespaceStart = bundle.indexOf('/', 
clusterStart) + 1;
@@ -424,7 +424,7 @@ public class LoadSimulationController {
                         final String mangledNamespace = String.format("%s-%s", 
manglePrefix, namespace);
                         final BundleData bundleData = 
initializeBundleData(quota, arguments);
                         final String oldAPITargetPath = String.format(
-                                
"/loadbalance/resource-quota/namespace/%s/%s/%s/0x00000000_0xffffffff", 
tenantName,
+                                "%s/namespace/%s/%s/%s/0x00000000_0xffffffff", 
BUNDLE_DATA_BASE_PATH, tenantName,
                                 cluster, mangledNamespace);
                         final String newAPITargetPath = String.format(
                                 "%s/%s/%s/%s/0x00000000_0xffffffff", 
BUNDLE_DATA_BASE_PATH, tenantName, cluster,
@@ -475,7 +475,7 @@ public class LoadSimulationController {
         for (int i = 0; i < clients.length; ++i) {
             threadLocalMaps[i] = new HashMap<>();
         }
-        getResourceQuotas(QUOTA_ROOT, zkClient, threadLocalMaps);
+        getResourceQuotas(RESOURCE_QUOTA_BASE_PATH, zkClient, threadLocalMaps);
         final List<Future> futures = new ArrayList<>(clients.length);
         int i = 0;
         log.info("Simulating...");
@@ -484,9 +484,9 @@ public class LoadSimulationController {
             futures.add(threadPool.submit(() -> {
                 for (final Map.Entry<String, ResourceQuota> entry : 
bundleToQuota.entrySet()) {
                     final String bundle = entry.getKey();
-                    final String newAPIPath = bundle.replace(QUOTA_ROOT, 
BUNDLE_DATA_BASE_PATH);
+                    final String newAPIPath = 
bundle.replace(RESOURCE_QUOTA_BASE_PATH, BUNDLE_DATA_BASE_PATH);
                     final ResourceQuota quota = entry.getValue();
-                    final int tenantStart = QUOTA_ROOT.length() + 1;
+                    final int tenantStart = RESOURCE_QUOTA_BASE_PATH.length() 
+ 1;
                     final String topic = String.format("persistent://%s/t", 
bundle.substring(tenantStart));
                     final BundleData bundleData = initializeBundleData(quota, 
arguments);
                     // Put the bundle data in the new ZooKeeper.

Reply via email to