This is an automated email from the ASF dual-hosted git repository. mattisonchao pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 7a9bd8f492b4f029df6d096619df67f6b6018796 Author: LinChen <1572139...@qq.com> AuthorDate: Mon Jul 18 22:13:06 2022 +0800 [fix][broker] The configuration loadBalancerNamespaceMaximumBundles is invalid (#16552) (cherry picked from commit 5698b08d57f5497b355aa61ac33e7f1303f1ca8e) --- .../loadbalance/impl/BundleSplitterTask.java | 12 ++++- .../loadbalance/impl/BundleSplitterTaskTest.java | 52 ++++++++++++++++++++++ 2 files changed, 63 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTask.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTask.java index fa48618bd20..f910c2fe772 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTask.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTask.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.loadbalance.impl; +import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; @@ -39,12 +40,16 @@ public class BundleSplitterTask implements BundleSplitStrategy { private static final Logger log = LoggerFactory.getLogger(BundleSplitStrategy.class); private final Set<String> bundleCache; + private final Map<String, Integer> namespaceBundleCount; + + /** * Construct a BundleSplitterTask. * */ public BundleSplitterTask() { bundleCache = new HashSet<>(); + namespaceBundleCount = new HashMap<>(); } /** @@ -61,12 +66,14 @@ public class BundleSplitterTask implements BundleSplitStrategy { @Override public Set<String> findBundlesToSplit(final LoadData loadData, final PulsarService pulsar) { bundleCache.clear(); + namespaceBundleCount.clear(); final ServiceConfiguration conf = pulsar.getConfiguration(); int maxBundleCount = conf.getLoadBalancerNamespaceMaximumBundles(); long maxBundleTopics = conf.getLoadBalancerNamespaceBundleMaxTopics(); long maxBundleSessions = conf.getLoadBalancerNamespaceBundleMaxSessions(); long maxBundleMsgRate = conf.getLoadBalancerNamespaceBundleMaxMsgRate(); long maxBundleBandwidth = conf.getLoadBalancerNamespaceBundleMaxBandwidthMbytes() * LoadManagerShared.MIBI; + loadData.getBrokerData().forEach((broker, brokerData) -> { LocalBrokerData localData = brokerData.getLocalData(); for (final Map.Entry<String, NamespaceBundleStats> entry : localData.getLastStats().entrySet()) { @@ -90,8 +97,11 @@ public class BundleSplitterTask implements BundleSplitStrategy { try { final int bundleCount = pulsar.getNamespaceService() .getBundleCount(NamespaceName.get(namespace)); - if (bundleCount < maxBundleCount) { + if ((bundleCount + namespaceBundleCount.getOrDefault(namespace, 0)) + < maxBundleCount) { bundleCache.add(bundle); + int bundleNum = namespaceBundleCount.getOrDefault(namespace, 0); + namespaceBundleCount.put(namespace, bundleNum + 1); } else { log.warn( "Could not split namespace bundle {} because namespace {} has too many bundles: {}", diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java index 7480989bbb5..9ff266ba96c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java @@ -25,6 +25,7 @@ import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.TimeAverageMessageData; import org.apache.pulsar.broker.loadbalance.LoadData; +import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData; import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats; import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; @@ -94,6 +95,57 @@ public class BundleSplitterTaskTest { Assert.assertEquals(bundlesToSplit.size(), 0); } + @Test + public void testLoadBalancerNamespaceMaximumBundles() throws Exception { + pulsar.getConfiguration().setLoadBalancerNamespaceMaximumBundles(3); + + final BundleSplitterTask bundleSplitterTask = new BundleSplitterTask(); + LoadData loadData = new LoadData(); + + LocalBrokerData brokerData = new LocalBrokerData(); + Map<String, NamespaceBundleStats> lastStats = new HashMap<>(); + final NamespaceBundleStats namespaceBundleStats = new NamespaceBundleStats(); + namespaceBundleStats.topics = 5; + lastStats.put("ten/ns/0x00000000_0x20000000", namespaceBundleStats); + + final NamespaceBundleStats namespaceBundleStats2 = new NamespaceBundleStats(); + namespaceBundleStats2.topics = 5; + lastStats.put("ten/ns/0x20000000_0x40000000", namespaceBundleStats2); + + final NamespaceBundleStats namespaceBundleStats3 = new NamespaceBundleStats(); + namespaceBundleStats3.topics = 5; + lastStats.put("ten/ns/0x40000000_0x60000000", namespaceBundleStats3); + + brokerData.setLastStats(lastStats); + loadData.getBrokerData().put("broker", new BrokerData(brokerData)); + + BundleData bundleData1 = new BundleData(); + TimeAverageMessageData averageMessageData1 = new TimeAverageMessageData(); + averageMessageData1.setMsgRateIn(pulsar.getConfiguration().getLoadBalancerNamespaceBundleMaxMsgRate() * 2); + averageMessageData1.setMsgRateOut(1); + bundleData1.setLongTermData(averageMessageData1); + loadData.getBundleData().put("ten/ns/0x00000000_0x20000000", bundleData1); + + BundleData bundleData2 = new BundleData(); + TimeAverageMessageData averageMessageData2 = new TimeAverageMessageData(); + averageMessageData2.setMsgRateIn(pulsar.getConfiguration().getLoadBalancerNamespaceBundleMaxMsgRate() * 2); + averageMessageData2.setMsgRateOut(1); + bundleData2.setLongTermData(averageMessageData2); + loadData.getBundleData().put("ten/ns/0x20000000_0x40000000", bundleData2); + + BundleData bundleData3 = new BundleData(); + TimeAverageMessageData averageMessageData3 = new TimeAverageMessageData(); + averageMessageData3.setMsgRateIn(pulsar.getConfiguration().getLoadBalancerNamespaceBundleMaxMsgRate() * 2); + averageMessageData3.setMsgRateOut(1); + bundleData3.setLongTermData(averageMessageData3); + loadData.getBundleData().put("ten/ns/0x40000000_0x60000000", bundleData3); + + int currentBundleCount = pulsar.getNamespaceService().getBundleCount(NamespaceName.get("ten/ns")); + final Set<String> bundlesToSplit = bundleSplitterTask.findBundlesToSplit(loadData, pulsar); + Assert.assertEquals(bundlesToSplit.size() + currentBundleCount, + pulsar.getConfiguration().getLoadBalancerNamespaceMaximumBundles()); + } + @AfterMethod(alwaysRun = true) void shutdown() throws Exception {