This is an automated email from the ASF dual-hosted git repository.
zhangmingao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 0feaa451694 [improve][broker]add NamespacePolicies and AntiAffinity
check before unload in checkNamespaceBundleSplit (#16780)
0feaa451694 is described below
commit 0feaa45169478af310615120f6b9c45faacdf670
Author: lixinyang <[email protected]>
AuthorDate: Wed Jan 11 19:08:02 2023 +0800
[improve][broker]add NamespacePolicies and AntiAffinity check before unload
in checkNamespaceBundleSplit (#16780)
Co-authored-by: nicklixinyang <[email protected]>
---
.../pulsar/broker/loadbalance/BundleSplitStrategy.java | 6 +++---
.../broker/loadbalance/impl/BundleSplitterTask.java | 12 +++++-------
.../broker/loadbalance/impl/ModularLoadManagerImpl.java | 16 ++++++++++++----
.../broker/loadbalance/impl/BundleSplitterTaskTest.java | 5 ++---
.../pulsar/client/api/BrokerServiceLookupTest.java | 5 +++++
5 files changed, 27 insertions(+), 17 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/BundleSplitStrategy.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/BundleSplitStrategy.java
index d3d76b8e92d..484eee77f89 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/BundleSplitStrategy.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/BundleSplitStrategy.java
@@ -18,7 +18,7 @@
*/
package org.apache.pulsar.broker.loadbalance;
-import java.util.Set;
+import java.util.Map;
import org.apache.pulsar.broker.PulsarService;
/**
@@ -33,7 +33,7 @@ public interface BundleSplitStrategy {
* leader broker).
* @param pulsar
* Service to use.
- * @return A set of the bundles that should be split.
+ * @return A map of the bundles that should be split and the brokers on
which they reside.
*/
- Set<String> findBundlesToSplit(LoadData loadData, PulsarService pulsar);
+ Map<String, String> findBundlesToSplit(LoadData loadData, PulsarService
pulsar);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTask.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTask.java
index 310f80d6f74..7f3e43d3523 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTask.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTask.java
@@ -19,9 +19,7 @@
package org.apache.pulsar.broker.loadbalance.impl;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
-import java.util.Set;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.BundleSplitStrategy;
@@ -38,7 +36,7 @@ import org.slf4j.LoggerFactory;
*/
public class BundleSplitterTask implements BundleSplitStrategy {
private static final Logger log =
LoggerFactory.getLogger(BundleSplitStrategy.class);
- private final Set<String> bundleCache;
+ private final Map<String, String> bundleCache;
private final Map<String, Integer> namespaceBundleCount;
@@ -48,7 +46,7 @@ public class BundleSplitterTask implements
BundleSplitStrategy {
*
*/
public BundleSplitterTask() {
- bundleCache = new HashSet<>();
+ bundleCache = new HashMap<>();
namespaceBundleCount = new HashMap<>();
}
@@ -61,10 +59,10 @@ public class BundleSplitterTask implements
BundleSplitStrategy {
* @param pulsar
* Service to use.
* @return All bundles who have exceeded configured thresholds in number
of topics, number of sessions, total
- * message rates, or total throughput.
+ * message rates, or total throughput and the brokers on which
they reside.
*/
@Override
- public Set<String> findBundlesToSplit(final LoadData loadData, final
PulsarService pulsar) {
+ public Map<String, String> findBundlesToSplit(final LoadData loadData,
final PulsarService pulsar) {
bundleCache.clear();
namespaceBundleCount.clear();
final ServiceConfiguration conf = pulsar.getConfiguration();
@@ -108,7 +106,7 @@ public class BundleSplitterTask implements
BundleSplitStrategy {
maxBundleSessions, totalMessageRate,
maxBundleMsgRate,
totalMessageThroughput /
LoadManagerShared.MIBI,
maxBundleBandwidth /
LoadManagerShared.MIBI);
- bundleCache.add(bundle);
+ bundleCache.put(bundle, broker);
int bundleNum =
namespaceBundleCount.getOrDefault(namespace, 0);
namespaceBundleCount.put(namespace, bundleNum + 1);
} else {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 6e63643a859..e25ec981e77 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -748,10 +748,10 @@ public class ModularLoadManagerImpl implements
ModularLoadManager {
}
final boolean unloadSplitBundles =
pulsar.getConfiguration().isLoadBalancerAutoUnloadSplitBundlesEnabled();
synchronized (bundleSplitStrategy) {
- final Set<String> bundlesToBeSplit =
bundleSplitStrategy.findBundlesToSplit(loadData, pulsar);
+ final Map<String, String> bundlesToBeSplit =
bundleSplitStrategy.findBundlesToSplit(loadData, pulsar);
NamespaceBundleFactory namespaceBundleFactory =
pulsar.getNamespaceService().getNamespaceBundleFactory();
int splitCount = 0;
- for (String bundleName : bundlesToBeSplit) {
+ for (String bundleName : bundlesToBeSplit.keySet()) {
try {
final String namespaceName =
LoadManagerShared.getNamespaceNameFromBundleName(bundleName);
final String bundleRange =
LoadManagerShared.getBundleRangeFromBundleName(bundleName);
@@ -768,9 +768,17 @@ public class ModularLoadManagerImpl implements
ModularLoadManager {
.invalidateBundleCache(NamespaceName.get(namespaceName));
deleteBundleDataFromMetadataStore(bundleName);
- log.info("Load-manager splitting bundle {} and unloading
{}", bundleName, unloadSplitBundles);
+ // Check NamespacePolicies and AntiAffinityNamespace
support unload bundle.
+ boolean isUnload = false;
+ String broker = bundlesToBeSplit.get(bundleName);
+ if (unloadSplitBundles
+ && shouldNamespacePoliciesUnload(namespaceName,
bundleRange, broker)
+ &&
shouldAntiAffinityNamespaceUnload(namespaceName, bundleRange, broker)) {
+ isUnload = true;
+ }
+ log.info("Load-manager splitting bundle {} and unloading
{}", bundleName, isUnload);
pulsar.getAdminClient().namespaces().splitNamespaceBundle(namespaceName,
bundleRange,
- unloadSplitBundles, null);
+ isUnload, null);
splitCount++;
log.info("Successfully split namespace bundle {}",
bundleName);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java
index fbefdb74d41..3173987a3c8 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java
@@ -37,7 +37,6 @@ import org.testng.annotations.Test;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
-import java.util.Set;
/**
* @author hezhangjian
@@ -92,7 +91,7 @@ public class BundleSplitterTaskTest {
bundleData.setLongTermData(averageMessageData);
loadData.getBundleData().put("ten/ns/0x00000000_0x80000000",
bundleData);
- final Set<String> bundlesToSplit =
bundleSplitterTask.findBundlesToSplit(loadData, pulsar);
+ final Map<String, String> bundlesToSplit =
bundleSplitterTask.findBundlesToSplit(loadData, pulsar);
Assert.assertEquals(bundlesToSplit.size(), 0);
}
@@ -142,7 +141,7 @@ public class BundleSplitterTaskTest {
loadData.getBundleData().put("ten/ns/0x40000000_0x60000000",
bundleData3);
int currentBundleCount =
pulsar.getNamespaceService().getBundleCount(NamespaceName.get("ten/ns"));
- final Set<String> bundlesToSplit =
bundleSplitterTask.findBundlesToSplit(loadData, pulsar);
+ final Map<String, String> bundlesToSplit =
bundleSplitterTask.findBundlesToSplit(loadData, pulsar);
Assert.assertEquals(bundlesToSplit.size() + currentBundleCount,
pulsar.getConfiguration().getLoadBalancerNamespaceMaximumBundles());
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
index 53da4407362..33f0d6ee05a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
@@ -752,6 +752,11 @@ public class BrokerServiceLookupTest extends
ProducerConsumerBase {
assertNotEquals(pulsar2.getNamespaceService().getBundle(topicName),
bundleInBroker2);
});
+ // Unload the NamespacePolicies and AntiAffinity check.
+ String currentBroker = String.format("%s:%d", "localhost",
pulsar.getListenPortHTTP().get());
+
assertTrue(loadManager.shouldNamespacePoliciesUnload(namespace,"0x00000000_0xffffffff",
currentBroker));
+
assertTrue(loadManager.shouldAntiAffinityNamespaceUnload(namespace,"0x00000000_0xffffffff",
currentBroker));
+
// (7) Make lookup request again to Broker-2 which should succeed.
final String topic3 = "persistent://" + namespace + "/topic3";
@Cleanup