This is an automated email from the ASF dual-hosted git repository.
mmarshall pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.8 by this push:
new 418589b fix shedding heartbeat ns (#13208)
418589b is described below
commit 418589bd1464b6eb24ec29740ae7e7a268c323dc
Author: Zhanpeng Wu <[email protected]>
AuthorDate: Mon Dec 13 19:44:55 2021 +0800
fix shedding heartbeat ns (#13208)
Related to #12252
I found that the problem mentioned in #12252 has not been solved, because
the `HEARTBEAT_NAMESPACE_PATTERN` pattern needs a namespace as input, but what
is actually provided is the full name of the bundle.
1. fix the pattern matching problem
2. add a test case for it
This change is already covered by existing tests.
(cherry picked from commit 78e3d8f7d872746db962be36ad3de49dac1ef015)
---
.../java/org/apache/pulsar/broker/loadbalance/LoadData.java | 10 ++++++++++
.../pulsar/broker/loadbalance/impl/OverloadShedder.java | 6 ++----
.../pulsar/broker/loadbalance/impl/ThresholdShedder.java | 4 +---
.../org/apache/pulsar/broker/namespace/NamespaceService.java | 5 +++++
.../org/apache/pulsar/common/naming/NamespaceBundle.java | 12 ++++++++++++
.../apache/pulsar/broker/namespace/NamespaceServiceTest.java | 11 +++++++++++
6 files changed, 41 insertions(+), 7 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadData.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadData.java
index a469c5c..4243420 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadData.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadData.java
@@ -20,8 +20,11 @@ package org.apache.pulsar.broker.loadbalance;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
import org.apache.pulsar.broker.BrokerData;
import org.apache.pulsar.broker.BundleData;
+import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.common.naming.NamespaceBundle;
/**
* This class represents all data that could be relevant when making a load
management decision.
@@ -59,6 +62,13 @@ public class LoadData {
return bundleData;
}
+ public Map<String, BundleData> getBundleDataForLoadShedding() {
+ return bundleData.entrySet().stream()
+ .filter(e -> !NamespaceService.isSystemServiceNamespace(
+ NamespaceBundle.getBundleNamespace(e.getKey())))
+ .collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue));
+ }
+
public Map<String, Long> getRecentlyUnloadedBundles() {
return recentlyUnloadedBundles;
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java
index e9c827b..2016f3a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/OverloadShedder.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.broker.loadbalance.impl;
-import static
org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_PATTERN;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import java.util.Map;
@@ -98,9 +97,8 @@ public class OverloadShedder implements LoadSheddingStrategy {
// Sort bundles by throughput, then pick the biggest N which
combined
// make up for at least the minimum throughput to offload
- loadData.getBundleData().entrySet().stream()
- .filter(e ->
!HEARTBEAT_NAMESPACE_PATTERN.matcher(e.getKey()).matches()
- && localData.getBundles().contains(e.getKey()))
+ loadData.getBundleDataForLoadShedding().entrySet().stream()
+ .filter(e -> localData.getBundles().contains(e.getKey()))
.map((e) -> {
// Map to throughput value
// Consider short-term byte rate to address system
resource burden
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java
index 727be9b..cd233db 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ThresholdShedder.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.broker.loadbalance.impl;
-import static
org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_PATTERN;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import java.util.HashMap;
@@ -92,8 +91,7 @@ public class ThresholdShedder implements LoadSheddingStrategy
{
MutableBoolean atLeastOneBundleSelected = new
MutableBoolean(false);
if (localData.getBundles().size() > 1) {
- loadData.getBundleData().entrySet().stream()
- .filter(e ->
!HEARTBEAT_NAMESPACE_PATTERN.matcher(e.getKey()).matches())
+ loadData.getBundleDataForLoadShedding().entrySet().stream()
.map((e) -> {
String bundle = e.getKey();
BundleData bundleData = e.getValue();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 1a42c19..d2c87be 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -1364,6 +1364,11 @@ public class NamespaceService implements AutoCloseable {
}
}
+ public static boolean isSystemServiceNamespace(String namespace) {
+ return HEARTBEAT_NAMESPACE_PATTERN.matcher(namespace).matches()
+ || SLA_NAMESPACE_PATTERN.matcher(namespace).matches();
+ }
+
public boolean registerSLANamespace() throws PulsarServerException {
boolean isNameSpaceRegistered =
registerNamespace(getSLAMonitorNamespace(host, config), false);
if (isNameSpaceRegistered) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundle.java
b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundle.java
index 1531095..98dcb93 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundle.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundle.java
@@ -152,6 +152,18 @@ public class NamespaceBundle implements ServiceUnitId,
Comparable<NamespaceBundl
return namespaceBundle.substring(namespaceBundle.lastIndexOf('/') + 1);
}
+ public static String getBundleNamespace(String namespaceBundle) {
+ int index = namespaceBundle.lastIndexOf('/');
+ if (index != -1) {
+ try {
+ return NamespaceName.get(namespaceBundle.substring(0,
index)).toString();
+ } catch (Exception e) {
+ // return itself if meets invalid format
+ }
+ }
+ return namespaceBundle;
+ }
+
public NamespaceBundleFactory getNamespaceBundleFactory() {
return factory;
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
index eeedef5..146f417 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
@@ -19,7 +19,9 @@
package org.apache.pulsar.broker.namespace;
import static
org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT;
+import static
org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_PATTERN;
import static org.apache.pulsar.broker.web.PulsarWebResource.joinPath;
+import static org.junit.Assert.assertNotEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
@@ -509,6 +511,15 @@ public class NamespaceServiceTest extends BrokerTestBase {
}
}
+ @Test
+ public void testHeartbeatNamespaceMatch() throws Exception {
+ String namespaceNameString =
NamespaceService.getHeartbeatNamespace(pulsar.getAdvertisedAddress(), conf);
+ NamespaceName namespaceName = NamespaceName.get(namespaceNameString);
+ NamespaceBundle namespaceBundle =
pulsar.getNamespaceService().getNamespaceBundleFactory().getFullBundle(namespaceName);
+ assertTrue(NamespaceService.isSystemServiceNamespace(
+
NamespaceBundle.getBundleNamespace(namespaceBundle.toString())));
+ }
+
@SuppressWarnings("unchecked")
private Pair<NamespaceBundles, List<NamespaceBundle>>
splitBundles(NamespaceBundleFactory utilityFactory,
NamespaceName nsname, NamespaceBundles bundles, NamespaceBundle
targetBundle) throws Exception {