This is an automated email from the ASF dual-hosted git repository.

mmarshall pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.8 by this push:
     new 418589b  fix shedding heartbeat ns (#13208)
418589b is described below

commit 418589bd1464b6eb24ec29740ae7e7a268c323dc
Author: Zhanpeng Wu <[email protected]>
AuthorDate: Mon Dec 13 19:44:55 2021 +0800

    fix shedding heartbeat ns (#13208)
    
    Related to #12252
    
    I found that the problem mentioned in #12252 has not been solved, because 
the `HEARTBEAT_NAMESPACE_PATTERN` pattern needs a namespace as input, but what 
actually provides is the full name of the bundle.
    
    1. fix the parttern matching problem
    2. add a test case for it
    
    This change is already covered by existing tests.
    
    (cherry picked from commit 78e3d8f7d872746db962be36ad3de49dac1ef015)
---
 .../java/org/apache/pulsar/broker/loadbalance/LoadData.java  | 10 ++++++++++
 .../pulsar/broker/loadbalance/impl/OverloadShedder.java      |  6 ++----
 .../pulsar/broker/loadbalance/impl/ThresholdShedder.java     |  4 +---
 .../org/apache/pulsar/broker/namespace/NamespaceService.java |  5 +++++
 .../org/apache/pulsar/common/naming/NamespaceBundle.java     | 12 ++++++++++++
 .../apache/pulsar/broker/namespace/NamespaceServiceTest.java | 11 +++++++++++
 6 files changed, 41 insertions(+), 7 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadData.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadData.java
index a469c5c..4243420 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadData.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadData.java
@@ -20,8 +20,11 @@ package org.apache.pulsar.broker.loadbalance;
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
 import org.apache.pulsar.broker.BrokerData;
 import org.apache.pulsar.broker.BundleData;
+import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.common.naming.NamespaceBundle;
 
 /**
  * This class represents all data that could be relevant when making a load 
management decision.
@@ -59,6 +62,13 @@ public class LoadData {
         return bundleData;
     }
 
+    public Map<String, BundleData> getBundleDataForLoadShedding() {
+        return bundleData.entrySet().stream()
+                .filter(e -> !NamespaceService.isSystemServiceNamespace(
+                        NamespaceBundle.getBundleNamespace(e.getKey())))
+                .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+    }
+
     public Map<String, Long> getRecentlyUnloadedBundles() {
         return recentlyUnloadedBundles;
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java
index e9c827b..2016f3a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.broker.loadbalance.impl;
 
-import static 
org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_PATTERN;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Multimap;
 import java.util.Map;
@@ -98,9 +97,8 @@ public class OverloadShedder implements LoadSheddingStrategy {
                 // Sort bundles by throughput, then pick the biggest N which 
combined
                 // make up for at least the minimum throughput to offload
 
-                loadData.getBundleData().entrySet().stream()
-                    .filter(e -> 
!HEARTBEAT_NAMESPACE_PATTERN.matcher(e.getKey()).matches()
-                            && localData.getBundles().contains(e.getKey()))
+                loadData.getBundleDataForLoadShedding().entrySet().stream()
+                    .filter(e -> localData.getBundles().contains(e.getKey()))
                     .map((e) -> {
                         // Map to throughput value
                         // Consider short-term byte rate to address system 
resource burden
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java
index 727be9b..cd233db 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.broker.loadbalance.impl;
 
-import static 
org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_PATTERN;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Multimap;
 import java.util.HashMap;
@@ -92,8 +91,7 @@ public class ThresholdShedder implements LoadSheddingStrategy 
{
             MutableBoolean atLeastOneBundleSelected = new 
MutableBoolean(false);
 
             if (localData.getBundles().size() > 1) {
-                loadData.getBundleData().entrySet().stream()
-                    .filter(e -> 
!HEARTBEAT_NAMESPACE_PATTERN.matcher(e.getKey()).matches())
+                loadData.getBundleDataForLoadShedding().entrySet().stream()
                     .map((e) -> {
                         String bundle = e.getKey();
                         BundleData bundleData = e.getValue();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 1a42c19..d2c87be 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -1364,6 +1364,11 @@ public class NamespaceService implements AutoCloseable {
         }
     }
 
+    public static boolean isSystemServiceNamespace(String namespace) {
+        return HEARTBEAT_NAMESPACE_PATTERN.matcher(namespace).matches()
+                || SLA_NAMESPACE_PATTERN.matcher(namespace).matches();
+    }
+
     public boolean registerSLANamespace() throws PulsarServerException {
         boolean isNameSpaceRegistered = 
registerNamespace(getSLAMonitorNamespace(host, config), false);
         if (isNameSpaceRegistered) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundle.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundle.java
index 1531095..98dcb93 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundle.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundle.java
@@ -152,6 +152,18 @@ public class NamespaceBundle implements ServiceUnitId, 
Comparable<NamespaceBundl
         return namespaceBundle.substring(namespaceBundle.lastIndexOf('/') + 1);
     }
 
+    public static String getBundleNamespace(String namespaceBundle) {
+        int index = namespaceBundle.lastIndexOf('/');
+        if (index != -1) {
+            try {
+                return NamespaceName.get(namespaceBundle.substring(0, 
index)).toString();
+            } catch (Exception e) {
+                // return itself if meets invalid format
+            }
+        }
+        return namespaceBundle;
+    }
+
     public NamespaceBundleFactory getNamespaceBundleFactory() {
         return factory;
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
index eeedef5..146f417 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
@@ -19,7 +19,9 @@
 package org.apache.pulsar.broker.namespace;
 
 import static 
org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT;
+import static 
org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_PATTERN;
 import static org.apache.pulsar.broker.web.PulsarWebResource.joinPath;
+import static org.junit.Assert.assertNotEquals;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
@@ -509,6 +511,15 @@ public class NamespaceServiceTest extends BrokerTestBase {
         }
     }
 
+    @Test
+    public void testHeartbeatNamespaceMatch() throws Exception {
+        String namespaceNameString = 
NamespaceService.getHeartbeatNamespace(pulsar.getAdvertisedAddress(), conf);
+        NamespaceName namespaceName = NamespaceName.get(namespaceNameString);
+        NamespaceBundle namespaceBundle = 
pulsar.getNamespaceService().getNamespaceBundleFactory().getFullBundle(namespaceName);
+        assertTrue(NamespaceService.isSystemServiceNamespace(
+                        
NamespaceBundle.getBundleNamespace(namespaceBundle.toString())));
+    }
+
     @SuppressWarnings("unchecked")
     private Pair<NamespaceBundles, List<NamespaceBundle>> 
splitBundles(NamespaceBundleFactory utilityFactory,
             NamespaceName nsname, NamespaceBundles bundles, NamespaceBundle 
targetBundle) throws Exception {

Reply via email to