This is an automated email from the ASF dual-hosted git repository.

kwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new dfbf05a5d51 [fix] [broker] fix unload bundle count metric. (#22895)
dfbf05a5d51 is described below

commit dfbf05a5d512a8643eb03f9422bfc8d8f42db23c
Author: Wenzhi Feng <[email protected]>
AuthorDate: Mon Jun 17 12:43:53 2024 +0800

    [fix] [broker] fix unload bundle count metric. (#22895)
    
    ### Motivation
    
    Bundles that are filtered out when trying to unload them should not be
    counted in the unload metrics.
    
    ### Modifications
    
    Increment the metrics only when a bundle is actually unloaded.
---
 .../loadbalance/impl/ModularLoadManagerImpl.java   | 16 +++---
 .../impl/ModularLoadManagerImplTest.java           | 59 ++++++++++++++++++++++
 2 files changed, 68 insertions(+), 7 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 764580e9b6d..e1259e97aa3 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -38,6 +38,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -625,6 +626,7 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager {
         final Multimap<String, String> bundlesToUnload = 
loadSheddingStrategy.findBundlesForUnloading(loadData, conf);
 
         bundlesToUnload.asMap().forEach((broker, bundles) -> {
+            AtomicBoolean unloadBundleForBroker = new AtomicBoolean(false);
             bundles.forEach(bundle -> {
                 final String namespaceName = 
LoadManagerShared.getNamespaceNameFromBundleName(bundle);
                 final String bundleRange = 
LoadManagerShared.getBundleRangeFromBundleName(bundle);
@@ -654,24 +656,24 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager {
                     pulsar.getAdminClient().namespaces()
                             .unloadNamespaceBundle(namespaceName, bundleRange, 
destBroker.get());
                     loadData.getRecentlyUnloadedBundles().put(bundle, 
System.currentTimeMillis());
+                    unloadBundleCount++;
+                    unloadBundleForBroker.set(true);
                 } catch (PulsarServerException | PulsarAdminException e) {
                     log.warn("Error when trying to perform load shedding on {} 
for broker {}", bundle, broker, e);
                 }
             });
+            if (unloadBundleForBroker.get()) {
+                unloadBrokerCount++;
+            }
         });
 
-        updateBundleUnloadingMetrics(bundlesToUnload);
+        updateBundleUnloadingMetrics();
     }
 
     /**
      * As leader broker, update bundle unloading metrics.
-     *
-     * @param bundlesToUnload
      */
-    private void updateBundleUnloadingMetrics(Multimap<String, String> 
bundlesToUnload) {
-        unloadBrokerCount += bundlesToUnload.keySet().size();
-        unloadBundleCount += bundlesToUnload.values().size();
-
+    private void updateBundleUnloadingMetrics() {
         List<Metrics> metrics = new ArrayList<>();
         Map<String, String> dimensions = new HashMap<>();
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
index 20a33a70bfa..6ae491c55b8 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
@@ -23,6 +23,7 @@ import static 
org.apache.pulsar.broker.resources.LoadBalanceResources.BROKER_TIM
 import static 
org.apache.pulsar.broker.resources.LoadBalanceResources.BUNDLE_DATA_BASE_PATH;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -563,6 +564,64 @@ public class ModularLoadManagerImplTest {
                 .unloadNamespaceBundle(Mockito.anyString(), 
Mockito.anyString(), Mockito.anyString());
     }
 
+    @Test
+    public void testUnloadBundleMetric() throws Exception {
+        final NamespaceBundleStats stats1 = new NamespaceBundleStats();
+        final NamespaceBundleStats stats2 = new NamespaceBundleStats();
+        stats1.msgRateIn = 100;
+        stats2.msgRateIn = 200;
+        final Map<String, NamespaceBundleStats> statsMap = new 
ConcurrentHashMap<>();
+        statsMap.put(mockBundleName(1), stats1);
+        statsMap.put(mockBundleName(2), stats2);
+        final LocalBrokerData localBrokerData = new LocalBrokerData();
+        localBrokerData.update(new SystemResourceUsage(), statsMap);
+        final Namespaces namespacesSpy1 = 
spy(pulsar1.getAdminClient().namespaces());
+        
doNothing().when(namespacesSpy1).unloadNamespaceBundle(Mockito.anyString(), 
Mockito.anyString(), Mockito.anyString());
+        setField(pulsar1.getAdminClient(), "namespaces", namespacesSpy1);
+        ModularLoadManagerImpl primaryLoadManagerSpy = spy(primaryLoadManager);
+
+        pulsar1.getConfiguration().setLoadBalancerEnabled(true);
+        final LoadData loadData = (LoadData) getField(primaryLoadManagerSpy, 
"loadData");
+
+        final Map<String, BrokerData> brokerDataMap = loadData.getBrokerData();
+        final BrokerData brokerDataSpy1 = 
spy(brokerDataMap.get(primaryBrokerId));
+        when(brokerDataSpy1.getLocalData()).thenReturn(localBrokerData);
+        brokerDataMap.put(primaryBrokerId, brokerDataSpy1);
+        // Need to update all the bundle data for the shredder to see the spy.
+        primaryLoadManagerSpy.handleDataNotification(new 
Notification(NotificationType.Created, LoadManager.LOADBALANCE_BROKERS_ROOT + 
"/broker:8080"));
+
+        sleep(100);
+
+        // Most expensive bundle will be unloaded.
+        localBrokerData.setCpu(new ResourceUsage(90, 100));
+        primaryLoadManagerSpy.doLoadShedding();
+        assertEquals(getField(primaryLoadManagerSpy, "unloadBundleCount"), 1l);
+        assertEquals(getField(primaryLoadManagerSpy, "unloadBrokerCount"), 1l);
+
+        // Now less expensive bundle will be unloaded
+        primaryLoadManagerSpy.doLoadShedding();
+        assertEquals(getField(primaryLoadManagerSpy, "unloadBundleCount"), 2l);
+        assertEquals(getField(primaryLoadManagerSpy, "unloadBrokerCount"), 2l);
+
+        // Now both are in grace period: neither should be unloaded.
+        primaryLoadManagerSpy.doLoadShedding();
+        assertEquals(getField(primaryLoadManagerSpy, "unloadBundleCount"), 2l);
+        assertEquals(getField(primaryLoadManagerSpy, "unloadBrokerCount"), 2l);
+
+        // clear the recently unloaded bundles to avoid the grace period
+        loadData.getRecentlyUnloadedBundles().clear();
+
+        // Test bundle to be unloaded is filtered.
+        doAnswer(invocation -> {
+            // return empty broker to avoid unloading the bundle
+            return Optional.empty();
+        }).when(primaryLoadManagerSpy).selectBroker(any());
+        primaryLoadManagerSpy.doLoadShedding();
+
+        assertEquals(getField(primaryLoadManagerSpy, "unloadBundleCount"), 2l);
+        assertEquals(getField(primaryLoadManagerSpy, "unloadBrokerCount"), 2l);
+    }
+
     // Test that ModularLoadManagerImpl will determine that writing local data 
to ZooKeeper is necessary if certain
     // metrics change by a percentage threshold.
 

Reply via email to