This is an automated email from the ASF dual-hosted git repository.

houxiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 30d59e3ff25 [refactor][broker ] PIP-301 Part-2: Add BrokerTimeAverageDataResources (#21353)
30d59e3ff25 is described below

commit 30d59e3ff257ca00aafb773011a087b6f201cfcf
Author: houxiaoyu <[email protected]>
AuthorDate: Sun Oct 22 21:31:06 2023 -0500

    [refactor][broker ] PIP-301 Part-2: Add BrokerTimeAverageDataResources (#21353)
    
    ### Motivation
    
    See PIP-301: https://github.com/apache/pulsar/pull/21129
    
    ### Modifications
    
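    Add `BrokerTimeAverageDataResources` to `LoadBalanceResources` and use it in `ModularLoadManagerImpl` in place of the direct `MetadataCache<TimeAverageBrokerData>` access.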
---
 .../broker/resources/LoadBalanceResources.java     | 23 ++++++++++++++++
 .../loadbalance/impl/ModularLoadManagerImpl.java   | 32 +++++++++-------------
 .../impl/ModularLoadManagerImplTest.java           |  4 +--
 .../apache/pulsar/testclient/BrokerMonitor.java    |  6 ++--
 4 files changed, 41 insertions(+), 24 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LoadBalanceResources.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LoadBalanceResources.java
index 839997a7035..e13efefee0b 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LoadBalanceResources.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LoadBalanceResources.java
@@ -24,15 +24,19 @@ import lombok.Getter;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.policies.data.loadbalancer.BundleData;
+import org.apache.pulsar.policies.data.loadbalancer.TimeAverageBrokerData;
 
 @Getter
 public class LoadBalanceResources {
     public static final String BUNDLE_DATA_BASE_PATH = 
"/loadbalance/bundle-data";
+    public static final String BROKER_TIME_AVERAGE_BASE_PATH = 
"/loadbalance/broker-time-average";
 
     private final BundleDataResources bundleDataResources;
+    private final BrokerTimeAverageDataResources 
brokerTimeAverageDataResources;
 
     public LoadBalanceResources(MetadataStore store, int operationTimeoutSec) {
         bundleDataResources = new BundleDataResources(store, 
operationTimeoutSec);
+        brokerTimeAverageDataResources = new 
BrokerTimeAverageDataResources(store, operationTimeoutSec);
     }
 
     public static class BundleDataResources extends BaseResources<BundleData> {
@@ -69,4 +73,23 @@ public class LoadBalanceResources {
             return BUNDLE_DATA_BASE_PATH + "/" + bundle;
         }
     }
+
+    public static class BrokerTimeAverageDataResources extends 
BaseResources<TimeAverageBrokerData> {
+        public BrokerTimeAverageDataResources(MetadataStore store, int 
operationTimeoutSec) {
+            super(store, TimeAverageBrokerData.class, operationTimeoutSec);
+        }
+
+        public CompletableFuture<Void> updateTimeAverageBrokerData(String 
brokerLookupAddress,
+                TimeAverageBrokerData data) {
+            return 
setWithCreateAsync(getTimeAverageBrokerDataPath(brokerLookupAddress), __ -> 
data);
+        }
+
+        public CompletableFuture<Void> deleteTimeAverageBrokerData(String 
brokerLookupAddress) {
+            return 
deleteAsync(getTimeAverageBrokerDataPath(brokerLookupAddress));
+        }
+
+        private String getTimeAverageBrokerDataPath(final String 
brokerLookupAddress) {
+            return BROKER_TIME_AVERAGE_BASE_PATH + "/" + brokerLookupAddress;
+        }
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 586478efa50..491941d497c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -108,9 +108,6 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager {
     // Path to ZNode whose children contain ResourceQuota jsons.
     public static final String RESOURCE_QUOTA_ZPATH = 
"/loadbalance/resource-quota/namespace";
 
-    // Path to ZNode containing TimeAverageBrokerData jsons for each broker.
-    public static final String TIME_AVERAGE_BROKER_ZPATH = 
"/loadbalance/broker-time-average";
-
     // Set of broker candidates to reuse so that object creation is avoided.
     private final Set<String> brokerCandidateCache;
 
@@ -119,7 +116,6 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager {
     private ResourceLock<LocalBrokerData> brokerDataLock;
 
     private MetadataCache<ResourceQuota> resourceQuotaCache;
-    private MetadataCache<TimeAverageBrokerData> timeAverageBrokerDataCache;
 
     // Broker host usage object used to calculate system resource usage.
     private BrokerHostUsage brokerHostUsage;
@@ -245,7 +241,6 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager {
         this.pulsarResources = pulsar.getPulsarResources();
         brokersData = 
pulsar.getCoordinationService().getLockManager(LocalBrokerData.class);
         resourceQuotaCache = 
pulsar.getLocalMetadataStore().getMetadataCache(ResourceQuota.class);
-        timeAverageBrokerDataCache = 
pulsar.getLocalMetadataStore().getMetadataCache(TimeAverageBrokerData.class);
         
pulsar.getLocalMetadataStore().registerListener(this::handleDataNotification);
         
pulsar.getLocalMetadataStore().registerSessionListener(this::handleMetadataSessionEvent);
 
@@ -991,13 +986,13 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager {
 
             String lookupServiceAddress = pulsar.getLookupServiceAddress();
             brokerZnodePath = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + 
lookupServiceAddress;
-            final String timeAverageZPath = TIME_AVERAGE_BROKER_ZPATH + "/" + 
lookupServiceAddress;
             updateLocalBrokerData();
 
             brokerDataLock = brokersData.acquireLock(brokerZnodePath, 
localData).join();
-
-            
timeAverageBrokerDataCache.readModifyUpdateOrCreate(timeAverageZPath,
-                    __ -> new TimeAverageBrokerData()).join();
+            pulsarResources.getLoadBalanceResources()
+                    .getBrokerTimeAverageDataResources()
+                    .updateTimeAverageBrokerData(lookupServiceAddress, new 
TimeAverageBrokerData())
+                    .join();
             updateAll();
         } catch (Exception e) {
             log.error("Unable to acquire lock for broker: [{}]", 
brokerZnodePath, e);
@@ -1154,9 +1149,8 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager {
         for (Map.Entry<String, BrokerData> entry : 
loadData.getBrokerData().entrySet()) {
             final String broker = entry.getKey();
             final TimeAverageBrokerData data = 
entry.getValue().getTimeAverageData();
-            futures.add(timeAverageBrokerDataCache.readModifyUpdateOrCreate(
-                    TIME_AVERAGE_BROKER_ZPATH + "/" + broker, __ -> data)
-                    .thenApply(__ -> null));
+            futures.add(pulsarResources.getLoadBalanceResources()
+                    
.getBrokerTimeAverageDataResources().updateTimeAverageBrokerData(broker, data));
         }
 
         try {
@@ -1177,13 +1171,13 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager {
     }
 
     private void deleteTimeAverageDataFromMetadataStoreAsync(String broker) {
-        final String timeAverageZPath = TIME_AVERAGE_BROKER_ZPATH + "/" + 
broker;
-        timeAverageBrokerDataCache.delete(timeAverageZPath).whenComplete((__, 
ex) -> {
-            if (ex != null && !(ex.getCause() instanceof 
MetadataStoreException.NotFoundException)) {
-                log.warn("Failed to delete dead broker {} time "
-                        + "average data from metadata store", broker, ex);
-            }
-        });
+        pulsarResources.getLoadBalanceResources()
+                
.getBrokerTimeAverageDataResources().deleteTimeAverageBrokerData(broker).whenComplete((__,
 ex) -> {
+                    if (ex != null && !(ex.getCause() instanceof 
MetadataStoreException.NotFoundException)) {
+                        log.warn("Failed to delete dead broker {} time "
+                                + "average data from metadata store", broker, 
ex);
+                    }
+                });
     }
 
     @Override
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
index 557393682fb..f8b5c125830 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
@@ -19,7 +19,7 @@
 package org.apache.pulsar.broker.loadbalance.impl;
 
 import static java.lang.Thread.sleep;
-import static 
org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl.TIME_AVERAGE_BROKER_ZPATH;
+import static 
org.apache.pulsar.broker.resources.LoadBalanceResources.BROKER_TIME_AVERAGE_BASE_PATH;
 import static 
org.apache.pulsar.broker.resources.LoadBalanceResources.BUNDLE_DATA_BASE_PATH;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doAnswer;
@@ -778,7 +778,7 @@ public class ModularLoadManagerImplTest {
 
         List<String> data =  pulsar1.getLocalMetadataStore()
                 .getMetadataCache(TimeAverageBrokerData.class)
-                .getChildren(TIME_AVERAGE_BROKER_ZPATH)
+                .getChildren(BROKER_TIME_AVERAGE_BASE_PATH)
                 .join();
 
         Awaitility.await().untilAsserted(() -> 
assertTrue(pulsar1.getLeaderElectionService().isLeader()));
diff --git 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/BrokerMonitor.java
 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/BrokerMonitor.java
index 3f896986016..a3e5a14a416 100644
--- 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/BrokerMonitor.java
+++ 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/BrokerMonitor.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.testclient;
 
 import static 
org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.BROKER_LOAD_DATA_STORE_TOPIC;
+import static 
org.apache.pulsar.broker.resources.LoadBalanceResources.BROKER_TIME_AVERAGE_BASE_PATH;
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.ParameterException;
@@ -34,7 +35,6 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
-import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SizeUnit;
@@ -172,7 +172,7 @@ public class BrokerMonitor {
                     final LocalBrokerData localData = (LocalBrokerData) data;
                     numBundles = localData.getNumBundles();
                     messageRate = localData.getMsgRateIn() + 
localData.getMsgRateOut();
-                    final String timeAveragePath = 
ModularLoadManagerImpl.TIME_AVERAGE_BROKER_ZPATH + "/" + broker;
+                    final String timeAveragePath = 
BROKER_TIME_AVERAGE_BASE_PATH + "/" + broker;
                     try {
                         final TimeAverageBrokerData timeAverageData = 
gson.fromJson(
                                 new String(zkClient.getData(timeAveragePath, 
false, null)),
@@ -314,7 +314,7 @@ public class BrokerMonitor {
                 printLoadReport(broker, gson.fromJson(jsonString, 
LoadReport.class));
             } else {
                 final LocalBrokerData localBrokerData = 
gson.fromJson(jsonString, LocalBrokerData.class);
-                final String timeAveragePath = 
ModularLoadManagerImpl.TIME_AVERAGE_BROKER_ZPATH + "/" + broker;
+                final String timeAveragePath = BROKER_TIME_AVERAGE_BASE_PATH + 
"/" + broker;
                 try {
                     final TimeAverageBrokerData timeAverageData = 
gson.fromJson(
                             new String(zkClient.getData(timeAveragePath, 
false, null)), TimeAverageBrokerData.class);

Reply via email to