This is an automated email from the ASF dual-hosted git repository.

zhangmingao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 0feaa451694 [improve][broker]add NamespacePolicies and AntiAffinity 
check before unload in checkNamespaceBundleSplit (#16780)
0feaa451694 is described below

commit 0feaa45169478af310615120f6b9c45faacdf670
Author: lixinyang <[email protected]>
AuthorDate: Wed Jan 11 19:08:02 2023 +0800

    [improve][broker]add NamespacePolicies and AntiAffinity check before unload 
in checkNamespaceBundleSplit (#16780)
    
    Co-authored-by: nicklixinyang <[email protected]>
---
 .../pulsar/broker/loadbalance/BundleSplitStrategy.java   |  6 +++---
 .../broker/loadbalance/impl/BundleSplitterTask.java      | 12 +++++-------
 .../broker/loadbalance/impl/ModularLoadManagerImpl.java  | 16 ++++++++++++----
 .../broker/loadbalance/impl/BundleSplitterTaskTest.java  |  5 ++---
 .../pulsar/client/api/BrokerServiceLookupTest.java       |  5 +++++
 5 files changed, 27 insertions(+), 17 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/BundleSplitStrategy.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/BundleSplitStrategy.java
index d3d76b8e92d..484eee77f89 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/BundleSplitStrategy.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/BundleSplitStrategy.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pulsar.broker.loadbalance;
 
-import java.util.Set;
+import java.util.Map;
 import org.apache.pulsar.broker.PulsarService;
 
 /**
@@ -33,7 +33,7 @@ public interface BundleSplitStrategy {
      *            leader broker).
      * @param pulsar
      *            Service to use.
-     * @return A set of the bundles that should be split.
+     * @return A map of the bundles that should be split and the brokers on 
which they reside.
      */
-    Set<String> findBundlesToSplit(LoadData loadData, PulsarService pulsar);
+    Map<String, String> findBundlesToSplit(LoadData loadData, PulsarService 
pulsar);
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTask.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTask.java
index 310f80d6f74..7f3e43d3523 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTask.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTask.java
@@ -19,9 +19,7 @@
 package org.apache.pulsar.broker.loadbalance.impl;
 
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
-import java.util.Set;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.loadbalance.BundleSplitStrategy;
@@ -38,7 +36,7 @@ import org.slf4j.LoggerFactory;
  */
 public class BundleSplitterTask implements BundleSplitStrategy {
     private static final Logger log = 
LoggerFactory.getLogger(BundleSplitStrategy.class);
-    private final Set<String> bundleCache;
+    private final Map<String, String> bundleCache;
 
     private final Map<String, Integer> namespaceBundleCount;
 
@@ -48,7 +46,7 @@ public class BundleSplitterTask implements 
BundleSplitStrategy {
      *
      */
     public BundleSplitterTask() {
-        bundleCache = new HashSet<>();
+        bundleCache = new HashMap<>();
         namespaceBundleCount = new HashMap<>();
     }
 
@@ -61,10 +59,10 @@ public class BundleSplitterTask implements 
BundleSplitStrategy {
      * @param pulsar
      *            Service to use.
      * @return All bundles who have exceeded configured thresholds in number 
of topics, number of sessions, total
-     *         message rates, or total throughput.
+     *         message rates, or total throughput and the brokers on which 
they reside.
      */
     @Override
-    public Set<String> findBundlesToSplit(final LoadData loadData, final 
PulsarService pulsar) {
+    public Map<String, String> findBundlesToSplit(final LoadData loadData, 
final PulsarService pulsar) {
         bundleCache.clear();
         namespaceBundleCount.clear();
         final ServiceConfiguration conf = pulsar.getConfiguration();
@@ -108,7 +106,7 @@ public class BundleSplitterTask implements 
BundleSplitStrategy {
                                     maxBundleSessions, totalMessageRate, 
maxBundleMsgRate,
                                     totalMessageThroughput / 
LoadManagerShared.MIBI,
                                     maxBundleBandwidth / 
LoadManagerShared.MIBI);
-                            bundleCache.add(bundle);
+                            bundleCache.put(bundle, broker);
                             int bundleNum = 
namespaceBundleCount.getOrDefault(namespace, 0);
                             namespaceBundleCount.put(namespace, bundleNum + 1);
                         } else {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 6e63643a859..e25ec981e77 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -748,10 +748,10 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager {
         }
         final boolean unloadSplitBundles = 
pulsar.getConfiguration().isLoadBalancerAutoUnloadSplitBundlesEnabled();
         synchronized (bundleSplitStrategy) {
-            final Set<String> bundlesToBeSplit = 
bundleSplitStrategy.findBundlesToSplit(loadData, pulsar);
+            final Map<String, String> bundlesToBeSplit = 
bundleSplitStrategy.findBundlesToSplit(loadData, pulsar);
             NamespaceBundleFactory namespaceBundleFactory = 
pulsar.getNamespaceService().getNamespaceBundleFactory();
             int splitCount = 0;
-            for (String bundleName : bundlesToBeSplit) {
+            for (String bundleName : bundlesToBeSplit.keySet()) {
                 try {
                     final String namespaceName = 
LoadManagerShared.getNamespaceNameFromBundleName(bundleName);
                     final String bundleRange = 
LoadManagerShared.getBundleRangeFromBundleName(bundleName);
@@ -768,9 +768,17 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager {
                             
.invalidateBundleCache(NamespaceName.get(namespaceName));
                     deleteBundleDataFromMetadataStore(bundleName);
 
-                    log.info("Load-manager splitting bundle {} and unloading 
{}", bundleName, unloadSplitBundles);
+                    // Check NamespacePolicies and AntiAffinityNamespace 
support unload bundle.
+                    boolean isUnload = false;
+                    String broker = bundlesToBeSplit.get(bundleName);
+                    if (unloadSplitBundles
+                            && shouldNamespacePoliciesUnload(namespaceName, 
bundleRange, broker)
+                            && 
shouldAntiAffinityNamespaceUnload(namespaceName, bundleRange, broker)) {
+                        isUnload = true;
+                    }
+                    log.info("Load-manager splitting bundle {} and unloading 
{}", bundleName, isUnload);
                     
pulsar.getAdminClient().namespaces().splitNamespaceBundle(namespaceName, 
bundleRange,
-                        unloadSplitBundles, null);
+                            isUnload, null);
 
                     splitCount++;
                     log.info("Successfully split namespace bundle {}", 
bundleName);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java
index fbefdb74d41..3173987a3c8 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/BundleSplitterTaskTest.java
@@ -37,7 +37,6 @@ import org.testng.annotations.Test;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
 
 /**
  * @author hezhangjian
@@ -92,7 +91,7 @@ public class BundleSplitterTaskTest {
         bundleData.setLongTermData(averageMessageData);
         loadData.getBundleData().put("ten/ns/0x00000000_0x80000000", 
bundleData);
 
-        final Set<String> bundlesToSplit = 
bundleSplitterTask.findBundlesToSplit(loadData, pulsar);
+        final Map<String, String> bundlesToSplit = 
bundleSplitterTask.findBundlesToSplit(loadData, pulsar);
         Assert.assertEquals(bundlesToSplit.size(), 0);
     }
 
@@ -142,7 +141,7 @@ public class BundleSplitterTaskTest {
         loadData.getBundleData().put("ten/ns/0x40000000_0x60000000", 
bundleData3);
 
         int currentBundleCount = 
pulsar.getNamespaceService().getBundleCount(NamespaceName.get("ten/ns"));
-        final Set<String> bundlesToSplit = 
bundleSplitterTask.findBundlesToSplit(loadData, pulsar);
+        final Map<String, String> bundlesToSplit = 
bundleSplitterTask.findBundlesToSplit(loadData, pulsar);
         Assert.assertEquals(bundlesToSplit.size() + currentBundleCount,
                 
pulsar.getConfiguration().getLoadBalancerNamespaceMaximumBundles());
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
index 53da4407362..33f0d6ee05a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/BrokerServiceLookupTest.java
@@ -752,6 +752,11 @@ public class BrokerServiceLookupTest extends 
ProducerConsumerBase {
                 
assertNotEquals(pulsar2.getNamespaceService().getBundle(topicName), 
bundleInBroker2);
             });
 
+            // Unload the NamespacePolicies and AntiAffinity check.
+            String currentBroker = String.format("%s:%d", "localhost", 
pulsar.getListenPortHTTP().get());
+            
assertTrue(loadManager.shouldNamespacePoliciesUnload(namespace,"0x00000000_0xffffffff",
 currentBroker));
+            
assertTrue(loadManager.shouldAntiAffinityNamespaceUnload(namespace,"0x00000000_0xffffffff",
 currentBroker));
+
             // (7) Make lookup request again to Broker-2 which should succeed.
             final String topic3 = "persistent://" + namespace + "/topic3";
             @Cleanup

Reply via email to