This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit e098c6bc51371d91d11d2d7978a111b42c8b727e Author: Heesung Sohn <[email protected]> AuthorDate: Fri Nov 22 08:49:07 2024 -0800 [improve][broker] Skip unloading when bundle throughput is zero (ExtensibleLoadManagerImpl only) (#23626) (cherry picked from commit e8657e2b94951b0b98797a6e1d943113121b1e53) --- .../loadbalance/extensions/models/TopKBundles.java | 6 +++++ .../extensions/scheduler/TransferShedder.java | 10 +++++++++ .../extensions/models/TopKBundlesTest.java | 26 ++++++++++++++++++++++ .../extensions/scheduler/TransferShedderTest.java | 20 +++++++++++++++++ 4 files changed, 62 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundles.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundles.java index ec26521af41..9c6e9634178 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundles.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundles.java @@ -72,6 +72,12 @@ public class TopKBundles { pulsar.getConfiguration().isLoadBalancerSheddingBundlesWithPoliciesEnabled(); for (var etr : bundleStats.entrySet()) { String bundle = etr.getKey(); + var stat = etr.getValue(); + + // skip zero traffic bundles + if (stat.msgThroughputIn + stat.msgThroughputOut == 0) { + continue; + } // TODO: do not filter system topic while shedding if (NamespaceService.isSystemServiceNamespace(NamespaceBundle.getBundleNamespace(bundle))) { continue; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java index 7126ccb0341..72d671aa4ca 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java @@ -528,6 +528,13 @@ public class TransferShedder implements NamespaceUnloadStrategy { var bundleData = e.stats(); double maxBrokerBundleThroughput = bundleData.msgThroughputIn + bundleData.msgThroughputOut; + if (maxBrokerBundleThroughput == 0) { + if (debugMode) { + log.info(String.format(CANNOT_UNLOAD_BUNDLE_MSG + + " It has zero throughput.", bundle)); + } + continue; + } boolean swap = false; List<Unload> minToMaxUnloads = new ArrayList<>(); double minBrokerBundleSwapThroughput = 0.0; @@ -549,6 +556,9 @@ public class TransferShedder implements NamespaceUnloadStrategy { var minBrokerBundleThroughput = minBrokerBundleData.stats().msgThroughputIn + minBrokerBundleData.stats().msgThroughputOut; + if (minBrokerBundleThroughput == 0) { + continue; + } var maxBrokerNewThroughputTmp = maxBrokerNewThroughput + minBrokerBundleThroughput; var minBrokerNewThroughputTmp = minBrokerNewThroughput - minBrokerBundleThroughput; if (maxBrokerNewThroughputTmp < maxBrokerThroughput diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundlesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundlesTest.java index 472d44df890..3445ab393be 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundlesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundlesTest.java @@ -88,14 +88,17 @@ public class TopKBundlesTest { var topKBundles = new TopKBundles(pulsar); NamespaceBundleStats stats1 = new NamespaceBundleStats(); stats1.msgRateIn = 100000; + stats1.msgThroughputOut = 10; bundleStats.put(bundle1, stats1); NamespaceBundleStats stats2 = new NamespaceBundleStats(); stats2.msgRateIn = 500; + stats2.msgThroughputOut = 10; bundleStats.put(bundle2, stats2); NamespaceBundleStats stats3 = new NamespaceBundleStats(); stats3.msgRateIn = 10000; + stats3.msgThroughputOut = 10; bundleStats.put(bundle3, stats3); NamespaceBundleStats stats4 = new NamespaceBundleStats(); @@ -118,10 +121,12 @@ public class TopKBundlesTest { var topKBundles = new TopKBundles(pulsar); NamespaceBundleStats stats1 = new NamespaceBundleStats(); stats1.msgRateIn = 500; + stats1.msgThroughputOut = 10; bundleStats.put("pulsar/system/0x00000000_0x0FFFFFFF", stats1); NamespaceBundleStats stats2 = new NamespaceBundleStats(); stats2.msgRateIn = 10000; + stats2.msgThroughputOut = 10; bundleStats.put(bundle1, stats2); topKBundles.update(bundleStats, 2); @@ -131,6 +136,21 @@ public class TopKBundlesTest { assertEquals(top0.bundleName(), bundle1); } + @Test + public void testZeroMsgThroughputBundleStats() { + Map<String, NamespaceBundleStats> bundleStats = new HashMap<>(); + var topKBundles = new TopKBundles(pulsar); + NamespaceBundleStats stats1 = new NamespaceBundleStats(); + bundleStats.put(bundle1, stats1); + + NamespaceBundleStats stats2 = new NamespaceBundleStats(); + bundleStats.put(bundle1, stats2); + + topKBundles.update(bundleStats, 2); + + assertEquals(topKBundles.getLoadData().getTopBundlesLoadData().size(), 0); + } + private void setAntiAffinityGroup() throws MetadataStoreException { LocalPolicies localPolicies = new LocalPolicies(null, null, "namespaceAntiAffinityGroup"); @@ -166,10 +186,12 @@ public class TopKBundlesTest { var topKBundles = new TopKBundles(pulsar); NamespaceBundleStats stats1 = new NamespaceBundleStats(); stats1.msgRateIn = 500; + stats1.msgThroughputOut = 10; bundleStats.put(bundle1, stats1); NamespaceBundleStats stats2 = new NamespaceBundleStats(); stats2.msgRateIn = 10000; + stats2.msgThroughputOut = 10; bundleStats.put(bundle2, stats2); topKBundles.update(bundleStats, 2); @@ -188,10 +210,12 @@ public class TopKBundlesTest { var topKBundles = new TopKBundles(pulsar); NamespaceBundleStats stats1 = new NamespaceBundleStats(); stats1.msgRateIn = 500; + stats1.msgThroughputOut = 10; bundleStats.put(bundle1, stats1); NamespaceBundleStats stats2 = new NamespaceBundleStats(); stats2.msgRateIn = 10000; + stats2.msgThroughputOut = 10; bundleStats.put(bundle2, stats2); topKBundles.update(bundleStats, 2); @@ -213,10 +237,12 @@ public class TopKBundlesTest { var topKBundles = new TopKBundles(pulsar); NamespaceBundleStats stats1 = new NamespaceBundleStats(); stats1.msgRateIn = 500; + stats1.msgThroughputOut = 10; bundleStats.put(bundle1, stats1); NamespaceBundleStats stats2 = new NamespaceBundleStats(); stats2.msgRateIn = 10000; + stats2.msgThroughputOut = 10; bundleStats.put(bundle2, stats2); topKBundles.update(bundleStats, 2); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java index 48bef15b5f8..5e20b196c9a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java @@ -919,6 +919,26 @@ public class TransferShedderTest { assertEquals(counter.getLoadStd(), setupLoadStd); } + @Test + public void testZeroBundleThroughput() { + UnloadCounter counter = new UnloadCounter(); + TransferShedder transferShedder = new TransferShedder(counter); + var ctx = setupContext(); + var topBundlesLoadDataStore = ctx.topBundleLoadDataStore(); + for (var e : topBundlesLoadDataStore.entrySet()) { + for (var stat : e.getValue().getTopBundlesLoadData()) { + stat.stats().msgThroughputOut = 0; + stat.stats().msgThroughputIn = 0; + + } + } + var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of()); + assertTrue(res.isEmpty()); + assertEquals(counter.getBreakdownCounters().get(Skip).get(NoBundles).get(), 1); + assertEquals(counter.getLoadAvg(), setupLoadAvg); + assertEquals(counter.getLoadStd(), setupLoadStd); + } + @Test public void testTargetStdAfterTransfer() {
