This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 49caf878e1487dda6da509380c651366c0b8f0c7 Author: Zhanpeng Wu <[email protected]> AuthorDate: Mon Dec 13 19:44:55 2021 +0800 fix shedding heartbeat ns (#13208) Related to #12252 I found that the problem mentioned in #12252 has not been solved, because the `HEARTBEAT_NAMESPACE_PATTERN` pattern needs a namespace as input, but what is actually provided is the full name of the bundle. 1. fix the pattern matching problem 2. add a test case for it This change is already covered by existing tests. (cherry picked from commit 78e3d8f7d872746db962be36ad3de49dac1ef015) --- .../java/org/apache/pulsar/broker/loadbalance/LoadData.java | 10 ++++++++++ .../pulsar/broker/loadbalance/impl/OverloadShedder.java | 8 ++------ .../pulsar/broker/loadbalance/impl/ThresholdShedder.java | 6 +----- .../org/apache/pulsar/broker/namespace/NamespaceService.java | 6 ++++++ .../org/apache/pulsar/common/naming/NamespaceBundle.java | 12 ++++++++++++ .../apache/pulsar/broker/namespace/NamespaceServiceTest.java | 8 ++++++++ 6 files changed, 39 insertions(+), 11 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadData.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadData.java index a469c5c..4243420 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadData.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadData.java @@ -20,8 +20,11 @@ package org.apache.pulsar.broker.loadbalance; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; import org.apache.pulsar.broker.BrokerData; import org.apache.pulsar.broker.BundleData; +import org.apache.pulsar.broker.namespace.NamespaceService; +import org.apache.pulsar.common.naming.NamespaceBundle; /** * This class represents all data that could be relevant when making a load management decision. 
@@ -59,6 +62,13 @@ public class LoadData { return bundleData; } + public Map<String, BundleData> getBundleDataForLoadShedding() { + return bundleData.entrySet().stream() + .filter(e -> !NamespaceService.isSystemServiceNamespace( + NamespaceBundle.getBundleNamespace(e.getKey()))) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + public Map<String, Long> getRecentlyUnloadedBundles() { return recentlyUnloadedBundles; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java index 3f33fa3..985ed6f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java @@ -18,8 +18,6 @@ */ package org.apache.pulsar.broker.loadbalance.impl; -import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_PATTERN; -import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_PATTERN_V2; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Multimap; import java.util.Map; @@ -102,10 +100,8 @@ public class OverloadShedder implements LoadSheddingStrategy { // Sort bundles by throughput, then pick the biggest N which combined // make up for at least the minimum throughput to offload - loadData.getBundleData().entrySet().stream() - .filter(e -> !HEARTBEAT_NAMESPACE_PATTERN.matcher(e.getKey()).matches() - && !HEARTBEAT_NAMESPACE_PATTERN_V2.matcher(e.getKey()).matches() - && localData.getBundles().contains(e.getKey())) + loadData.getBundleDataForLoadShedding().entrySet().stream() + .filter(e -> localData.getBundles().contains(e.getKey())) .map((e) -> { // Map to throughput value // Consider short-term byte rate to address system resource burden diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java index 3e10326..afca708 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java @@ -18,8 +18,6 @@ */ package org.apache.pulsar.broker.loadbalance.impl; -import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_PATTERN; -import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_PATTERN_V2; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Multimap; import java.util.HashMap; @@ -105,9 +103,7 @@ public class ThresholdShedder implements LoadSheddingStrategy { MutableBoolean atLeastOneBundleSelected = new MutableBoolean(false); if (localData.getBundles().size() > 1) { - loadData.getBundleData().entrySet().stream() - .filter(e -> !HEARTBEAT_NAMESPACE_PATTERN.matcher(e.getKey()).matches() - && !HEARTBEAT_NAMESPACE_PATTERN_V2.matcher(e.getKey()).matches()) + loadData.getBundleDataForLoadShedding().entrySet().stream() .map((e) -> { String bundle = e.getKey(); BundleData bundleData = e.getValue(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index f6cba9c..8f6bfb8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -1349,6 +1349,12 @@ public class NamespaceService implements AutoCloseable { } } + public static boolean isSystemServiceNamespace(String namespace) { + return HEARTBEAT_NAMESPACE_PATTERN.matcher(namespace).matches() + || 
HEARTBEAT_NAMESPACE_PATTERN_V2.matcher(namespace).matches() + || SLA_NAMESPACE_PATTERN.matcher(namespace).matches(); + } + public boolean registerSLANamespace() throws PulsarServerException { boolean isNameSpaceRegistered = registerNamespace(getSLAMonitorNamespace(host, config), false); if (isNameSpaceRegistered) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundle.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundle.java index 1531095..98dcb93 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundle.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundle.java @@ -152,6 +152,18 @@ public class NamespaceBundle implements ServiceUnitId, Comparable<NamespaceBundl return namespaceBundle.substring(namespaceBundle.lastIndexOf('/') + 1); } + public static String getBundleNamespace(String namespaceBundle) { + int index = namespaceBundle.lastIndexOf('/'); + if (index != -1) { + try { + return NamespaceName.get(namespaceBundle.substring(0, index)).toString(); + } catch (Exception e) { + // return itself if meets invalid format + } + } + return namespaceBundle; + } + public NamespaceBundleFactory getNamespaceBundleFactory() { return factory; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java index 8d35cd3..d45dcc2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java @@ -539,6 +539,14 @@ public class NamespaceServiceTest extends BrokerTestBase { } } + @Test + public void testHeartbeatNamespaceMatch() throws Exception { + NamespaceName namespaceName = NamespaceService.getHeartbeatNamespace(pulsar.getAdvertisedAddress(), conf); + NamespaceBundle namespaceBundle = 
pulsar.getNamespaceService().getNamespaceBundleFactory().getFullBundle(namespaceName); + assertTrue(NamespaceService.isSystemServiceNamespace( + NamespaceBundle.getBundleNamespace(namespaceBundle.toString()))); + } + @SuppressWarnings("unchecked") private Pair<NamespaceBundles, List<NamespaceBundle>> splitBundles(NamespaceBundleFactory utilityFactory, NamespaceName nsname, NamespaceBundles bundles, NamespaceBundle targetBundle) throws Exception {
