This is an automated email from the ASF dual-hosted git repository.
kwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new dfbf05a5d51 [fix] [broker] fix unload bundle count metric. (#22895)
dfbf05a5d51 is described below
commit dfbf05a5d512a8643eb03f9422bfc8d8f42db23c
Author: Wenzhi Feng <[email protected]>
AuthorDate: Mon Jun 17 12:43:53 2024 +0800
[fix] [broker] fix unload bundle count metric. (#22895)
### Motivation
Those bundles that are filtered when try to unload them should not be
included in the indicator.
### Modifications
Increment the metric only when the bundle are unloaded.
---
.../loadbalance/impl/ModularLoadManagerImpl.java | 16 +++---
.../impl/ModularLoadManagerImplTest.java | 59 ++++++++++++++++++++++
2 files changed, 68 insertions(+), 7 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 764580e9b6d..e1259e97aa3 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -38,6 +38,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -625,6 +626,7 @@ public class ModularLoadManagerImpl implements
ModularLoadManager {
final Multimap<String, String> bundlesToUnload =
loadSheddingStrategy.findBundlesForUnloading(loadData, conf);
bundlesToUnload.asMap().forEach((broker, bundles) -> {
+ AtomicBoolean unloadBundleForBroker = new AtomicBoolean(false);
bundles.forEach(bundle -> {
final String namespaceName =
LoadManagerShared.getNamespaceNameFromBundleName(bundle);
final String bundleRange =
LoadManagerShared.getBundleRangeFromBundleName(bundle);
@@ -654,24 +656,24 @@ public class ModularLoadManagerImpl implements
ModularLoadManager {
pulsar.getAdminClient().namespaces()
.unloadNamespaceBundle(namespaceName, bundleRange,
destBroker.get());
loadData.getRecentlyUnloadedBundles().put(bundle,
System.currentTimeMillis());
+ unloadBundleCount++;
+ unloadBundleForBroker.set(true);
} catch (PulsarServerException | PulsarAdminException e) {
log.warn("Error when trying to perform load shedding on {}
for broker {}", bundle, broker, e);
}
});
+ if (unloadBundleForBroker.get()) {
+ unloadBrokerCount++;
+ }
});
- updateBundleUnloadingMetrics(bundlesToUnload);
+ updateBundleUnloadingMetrics();
}
/**
* As leader broker, update bundle unloading metrics.
- *
- * @param bundlesToUnload
*/
- private void updateBundleUnloadingMetrics(Multimap<String, String>
bundlesToUnload) {
- unloadBrokerCount += bundlesToUnload.keySet().size();
- unloadBundleCount += bundlesToUnload.values().size();
-
+ private void updateBundleUnloadingMetrics() {
List<Metrics> metrics = new ArrayList<>();
Map<String, String> dimensions = new HashMap<>();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
index 20a33a70bfa..6ae491c55b8 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java
@@ -23,6 +23,7 @@ import static
org.apache.pulsar.broker.resources.LoadBalanceResources.BROKER_TIM
import static
org.apache.pulsar.broker.resources.LoadBalanceResources.BUNDLE_DATA_BASE_PATH;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -563,6 +564,64 @@ public class ModularLoadManagerImplTest {
.unloadNamespaceBundle(Mockito.anyString(),
Mockito.anyString(), Mockito.anyString());
}
+ @Test
+ public void testUnloadBundleMetric() throws Exception {
+ final NamespaceBundleStats stats1 = new NamespaceBundleStats();
+ final NamespaceBundleStats stats2 = new NamespaceBundleStats();
+ stats1.msgRateIn = 100;
+ stats2.msgRateIn = 200;
+ final Map<String, NamespaceBundleStats> statsMap = new
ConcurrentHashMap<>();
+ statsMap.put(mockBundleName(1), stats1);
+ statsMap.put(mockBundleName(2), stats2);
+ final LocalBrokerData localBrokerData = new LocalBrokerData();
+ localBrokerData.update(new SystemResourceUsage(), statsMap);
+ final Namespaces namespacesSpy1 =
spy(pulsar1.getAdminClient().namespaces());
+
doNothing().when(namespacesSpy1).unloadNamespaceBundle(Mockito.anyString(),
Mockito.anyString(), Mockito.anyString());
+ setField(pulsar1.getAdminClient(), "namespaces", namespacesSpy1);
+ ModularLoadManagerImpl primaryLoadManagerSpy = spy(primaryLoadManager);
+
+ pulsar1.getConfiguration().setLoadBalancerEnabled(true);
+ final LoadData loadData = (LoadData) getField(primaryLoadManagerSpy,
"loadData");
+
+ final Map<String, BrokerData> brokerDataMap = loadData.getBrokerData();
+ final BrokerData brokerDataSpy1 =
spy(brokerDataMap.get(primaryBrokerId));
+ when(brokerDataSpy1.getLocalData()).thenReturn(localBrokerData);
+ brokerDataMap.put(primaryBrokerId, brokerDataSpy1);
+ // Need to update all the bundle data for the shredder to see the spy.
+ primaryLoadManagerSpy.handleDataNotification(new
Notification(NotificationType.Created, LoadManager.LOADBALANCE_BROKERS_ROOT +
"/broker:8080"));
+
+ sleep(100);
+
+ // Most expensive bundle will be unloaded.
+ localBrokerData.setCpu(new ResourceUsage(90, 100));
+ primaryLoadManagerSpy.doLoadShedding();
+ assertEquals(getField(primaryLoadManagerSpy, "unloadBundleCount"), 1l);
+ assertEquals(getField(primaryLoadManagerSpy, "unloadBrokerCount"), 1l);
+
+ // Now less expensive bundle will be unloaded
+ primaryLoadManagerSpy.doLoadShedding();
+ assertEquals(getField(primaryLoadManagerSpy, "unloadBundleCount"), 2l);
+ assertEquals(getField(primaryLoadManagerSpy, "unloadBrokerCount"), 2l);
+
+ // Now both are in grace period: neither should be unloaded.
+ primaryLoadManagerSpy.doLoadShedding();
+ assertEquals(getField(primaryLoadManagerSpy, "unloadBundleCount"), 2l);
+ assertEquals(getField(primaryLoadManagerSpy, "unloadBrokerCount"), 2l);
+
+ // clear the recently unloaded bundles to avoid the grace period
+ loadData.getRecentlyUnloadedBundles().clear();
+
+ // Test bundle to be unloaded is filtered.
+ doAnswer(invocation -> {
+ // return empty broker to avoid unloading the bundle
+ return Optional.empty();
+ }).when(primaryLoadManagerSpy).selectBroker(any());
+ primaryLoadManagerSpy.doLoadShedding();
+
+ assertEquals(getField(primaryLoadManagerSpy, "unloadBundleCount"), 2l);
+ assertEquals(getField(primaryLoadManagerSpy, "unloadBrokerCount"), 2l);
+ }
+
// Test that ModularLoadManagerImpl will determine that writing local data
to ZooKeeper is necessary if certain
// metrics change by a percentage threshold.