This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b6a73823ce2 [improve][broker] PIP-192: Support broker isolation policy 
(#19592)
b6a73823ce2 is described below

commit b6a73823ce2ed47baa606b2ecc5f1569e4c101a5
Author: Kai Wang <[email protected]>
AuthorDate: Sat Mar 11 22:59:29 2023 +0800

    [improve][broker] PIP-192: Support broker isolation policy (#19592)
---
 .../extensions/ExtensibleLoadManagerImpl.java      |  10 +-
 .../extensions/filter/BrokerFilter.java            |  12 +-
 ...ter.java => BrokerIsolationPoliciesFilter.java} |  39 ++--
 .../filter/BrokerMaxTopicCountFilter.java          |   8 +
 .../extensions/filter/BrokerVersionFilter.java     |  11 +-
 .../policies/IsolationPoliciesHelper.java          |  68 ++++++
 .../extensions/policies/package-info.java          |  19 ++
 .../extensions/scheduler/TransferShedder.java      |  79 ++++++-
 .../extensions/ExtensibleLoadManagerImplTest.java  |  61 +++++-
 .../filter/BrokerIsolationPoliciesFilterTest.java  | 222 ++++++++++++++++++++
 .../filter/BrokerMaxTopicCountFilterTest.java      |   2 +-
 .../extensions/filter/BrokerVersionFilterTest.java |  13 +-
 .../extensions/scheduler/TransferShedderTest.java  | 227 +++++++++++++++++++--
 13 files changed, 715 insertions(+), 56 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
index 2bebe203d87..82790f44fcb 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java
@@ -42,6 +42,7 @@ import 
org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
 import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
 import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData;
 import org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerFilter;
+import 
org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerIsolationPoliciesFilter;
 import 
org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerMaxTopicCountFilter;
 import 
org.apache.pulsar.broker.loadbalance.extensions.filter.BrokerVersionFilter;
 import org.apache.pulsar.broker.loadbalance.extensions.manager.UnloadManager;
@@ -139,6 +140,7 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager {
     public ExtensibleLoadManagerImpl() {
         this.brokerFilterPipeline = new ArrayList<>();
         this.brokerFilterPipeline.add(new BrokerMaxTopicCountFilter());
+        this.brokerFilterPipeline.add(new BrokerIsolationPoliciesFilter());
         this.brokerFilterPipeline.add(new BrokerVersionFilter());
         // TODO: Make brokerSelectionStrategy configurable.
         this.brokerSelectionStrategy = new LeastResourceUsageWithWeight();
@@ -225,6 +227,7 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager {
     public void initialize(PulsarService pulsar) {
         this.pulsar = pulsar;
         this.conf = pulsar.getConfiguration();
+        this.brokerFilterPipeline.forEach(brokerFilter -> 
brokerFilter.initialize(pulsar));
     }
 
     @Override
@@ -287,7 +290,6 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager {
         BrokerRegistry brokerRegistry = getBrokerRegistry();
         return brokerRegistry.getAvailableBrokerLookupDataAsync()
                 .thenCompose(availableBrokers -> {
-                    // TODO: Support isolation policies
                     LoadManagerContext context = this.getContext();
 
                     Map<String, BrokerLookupData> availableBrokerCandidates = 
new HashMap<>(availableBrokers);
@@ -296,11 +298,13 @@ public class ExtensibleLoadManagerImpl implements 
ExtensibleLoadManager {
                     List<BrokerFilter> filterPipeline = 
getBrokerFilterPipeline();
                     for (final BrokerFilter filter : filterPipeline) {
                         try {
-                            filter.filter(availableBrokerCandidates, context);
+                            filter.filter(availableBrokerCandidates, bundle, 
context);
+                            // Preserve the filter successes result.
+                            
availableBrokers.keySet().retainAll(availableBrokerCandidates.keySet());
                         } catch (BrokerFilterException e) {
                             // TODO: We may need to revisit this error case.
                             log.error("Failed to filter out brokers.", e);
-                            availableBrokerCandidates = availableBrokers;
+                            availableBrokerCandidates = new 
HashMap<>(availableBrokers);
                         }
                     }
                     if (availableBrokerCandidates.isEmpty()) {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilter.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilter.java
index 35f4b6817f1..30d25f559b1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilter.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilter.java
@@ -19,9 +19,11 @@
 package org.apache.pulsar.broker.loadbalance.extensions.filter;
 
 import java.util.Map;
+import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
 import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
 import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.common.naming.ServiceUnitId;
 
 /**
  * Filter out unqualified Brokers, which are not entered into LoadBalancer for 
decision-making.
@@ -33,14 +35,22 @@ public interface BrokerFilter {
      */
     String name();
 
+    /**
+     * Initialize this broker filter using the given pulsar service.
+     */
+    void initialize(PulsarService pulsar);
+
     /**
      * Filter out unqualified brokers based on implementation.
      *
      * @param brokers The full broker and lookup data.
+     * @param serviceUnit The current serviceUnit.
      * @param context The load manager context.
      * @return Filtered broker list.
      */
-    Map<String, BrokerLookupData> filter(Map<String, BrokerLookupData> 
brokers, LoadManagerContext context)
+    Map<String, BrokerLookupData> filter(Map<String, BrokerLookupData> brokers,
+                                         ServiceUnitId serviceUnit,
+                                         LoadManagerContext context)
             throws BrokerFilterException;
 
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilter.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerIsolationPoliciesFilter.java
similarity index 54%
copy from 
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilter.java
copy to 
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerIsolationPoliciesFilter.java
index e3f8faca324..b28c77f76f3 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilter.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerIsolationPoliciesFilter.java
@@ -19,15 +19,23 @@
 package org.apache.pulsar.broker.loadbalance.extensions.filter;
 
 import java.util.Map;
-import java.util.Optional;
+import java.util.Set;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
 import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
-import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
 import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import 
org.apache.pulsar.broker.loadbalance.extensions.policies.IsolationPoliciesHelper;
+import 
org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
+import org.apache.pulsar.common.naming.ServiceUnitId;
 
-public class BrokerMaxTopicCountFilter implements BrokerFilter {
 
-    public static final String FILTER_NAME = "broker_max_topic_count_filter";
+@Slf4j
+public class BrokerIsolationPoliciesFilter implements BrokerFilter {
+
+    public static final String FILTER_NAME = 
"broker_isolation_policies_filter";
+
+    private IsolationPoliciesHelper isolationPoliciesHelper;
 
     @Override
     public String name() {
@@ -35,15 +43,18 @@ public class BrokerMaxTopicCountFilter implements 
BrokerFilter {
     }
 
     @Override
-    public Map<String, BrokerLookupData> filter(Map<String, BrokerLookupData> 
brokers,
-                                                LoadManagerContext context) 
throws BrokerFilterException {
-        int loadBalancerBrokerMaxTopics = 
context.brokerConfiguration().getLoadBalancerBrokerMaxTopics();
-        brokers.keySet().removeIf(broker -> {
-            Optional<BrokerLoadData> brokerLoadDataOpt = 
context.brokerLoadDataStore().get(broker);
-            long topics = 
brokerLoadDataOpt.map(BrokerLoadData::getTopics).orElse(0);
-            // TODO: The broker load data might be delayed, so the max topic 
check might not accurate.
-            return topics >= loadBalancerBrokerMaxTopics;
-        });
-        return brokers;
+    public void initialize(PulsarService pulsar) {
+        this.isolationPoliciesHelper = new IsolationPoliciesHelper(new 
SimpleResourceAllocationPolicies(pulsar));
+    }
+
+    @Override
+    public Map<String, BrokerLookupData> filter(Map<String, BrokerLookupData> 
availableBrokers,
+                                                ServiceUnitId serviceUnit,
+                                                LoadManagerContext context)
+            throws BrokerFilterException {
+        Set<String> brokerCandidateCache =
+                
isolationPoliciesHelper.applyIsolationPolicies(availableBrokers, serviceUnit);
+        availableBrokers.keySet().retainAll(brokerCandidateCache);
+        return availableBrokers;
     }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilter.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilter.java
index e3f8faca324..b98edd3d425 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilter.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilter.java
@@ -20,10 +20,12 @@ package 
org.apache.pulsar.broker.loadbalance.extensions.filter;
 
 import java.util.Map;
 import java.util.Optional;
+import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
 import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
 import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
 import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.common.naming.ServiceUnitId;
 
 public class BrokerMaxTopicCountFilter implements BrokerFilter {
 
@@ -34,8 +36,14 @@ public class BrokerMaxTopicCountFilter implements 
BrokerFilter {
         return FILTER_NAME;
     }
 
+    @Override
+    public void initialize(PulsarService pulsar) {
+        // No-op
+    }
+
     @Override
     public Map<String, BrokerLookupData> filter(Map<String, BrokerLookupData> 
brokers,
+                                                ServiceUnitId serviceUnit,
                                                 LoadManagerContext context) 
throws BrokerFilterException {
         int loadBalancerBrokerMaxTopics = 
context.brokerConfiguration().getLoadBalancerBrokerMaxTopics();
         brokers.keySet().removeIf(broker -> {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerVersionFilter.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerVersionFilter.java
index 869fb049a3c..b7332a5ff10 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerVersionFilter.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerVersionFilter.java
@@ -22,11 +22,13 @@ import com.github.zafarkhaja.semver.Version;
 import java.util.Iterator;
 import java.util.Map;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.loadbalance.BrokerFilterBadVersionException;
 import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
 import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
 import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.common.naming.ServiceUnitId;
 
 /**
  * Filter by broker version.
@@ -45,7 +47,9 @@ public class BrokerVersionFilter implements BrokerFilter {
      *
      */
     @Override
-    public Map<String, BrokerLookupData> filter(Map<String, BrokerLookupData> 
brokers, LoadManagerContext context)
+    public Map<String, BrokerLookupData> filter(Map<String, BrokerLookupData> 
brokers,
+                                                ServiceUnitId serviceUnit,
+                                                LoadManagerContext context)
             throws BrokerFilterException {
         ServiceConfiguration conf = context.brokerConfiguration();
         if (!conf.isPreferLaterVersions() || brokers.isEmpty()) {
@@ -144,4 +148,9 @@ public class BrokerVersionFilter implements BrokerFilter {
     public String name() {
         return FILTER_NAME;
     }
+
+    @Override
+    public void initialize(PulsarService pulsar) {
+        // No-op
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/policies/IsolationPoliciesHelper.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/policies/IsolationPoliciesHelper.java
new file mode 100644
index 00000000000..4d7a5bf22d6
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/policies/IsolationPoliciesHelper.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.policies;
+
+import io.netty.util.concurrent.FastThreadLocal;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
+import 
org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
+import org.apache.pulsar.common.naming.ServiceUnitId;
+
+@Slf4j
+public class IsolationPoliciesHelper {
+
+    private final SimpleResourceAllocationPolicies policies;
+
+    public IsolationPoliciesHelper(SimpleResourceAllocationPolicies policies) {
+        this.policies = policies;
+    }
+
+    private static final FastThreadLocal<Set<String>> 
localBrokerCandidateCache = new FastThreadLocal<>() {
+        @Override
+        protected Set<String> initialValue() {
+            return new HashSet<>();
+        }
+    };
+
+    public Set<String> applyIsolationPolicies(Map<String, BrokerLookupData> 
availableBrokers,
+                                              ServiceUnitId serviceUnit) {
+        Set<String> brokerCandidateCache = localBrokerCandidateCache.get();
+        brokerCandidateCache.clear();
+        LoadManagerShared.applyNamespacePolicies(serviceUnit, policies, 
brokerCandidateCache,
+                availableBrokers.keySet(), new 
LoadManagerShared.BrokerTopicLoadingPredicate() {
+                    @Override
+                    public boolean isEnablePersistentTopics(String brokerUrl) {
+                        BrokerLookupData lookupData = 
availableBrokers.get(brokerUrl.replace("http://", ""));
+                        return lookupData != null && 
lookupData.persistentTopicsEnabled();
+                    }
+
+                    @Override
+                    public boolean isEnableNonPersistentTopics(String 
brokerUrl) {
+                        BrokerLookupData lookupData = 
availableBrokers.get(brokerUrl.replace("http://", ""));
+                        return lookupData != null && 
lookupData.nonPersistentTopicsEnabled();
+                    }
+                });
+        return brokerCandidateCache;
+    }
+
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/policies/package-info.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/policies/package-info.java
new file mode 100644
index 00000000000..76e0c4a8942
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/policies/package-info.java
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.policies;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java
index 7f9128de817..810fda320af 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java
@@ -21,10 +21,15 @@ package 
org.apache.pulsar.broker.loadbalance.extensions.scheduler;
 import static 
org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.CoolDown;
 import static 
org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.NoBrokers;
 import static 
org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.OutDatedData;
+import static 
org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Unknown;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.MinMaxPriorityQueue;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import lombok.Getter;
 import lombok.experimental.Accessors;
 import org.apache.commons.lang3.StringUtils;
@@ -32,12 +37,15 @@ import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
 import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
 import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData;
 import org.apache.pulsar.broker.loadbalance.extensions.models.Unload;
 import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision;
+import 
org.apache.pulsar.broker.loadbalance.extensions.policies.IsolationPoliciesHelper;
 import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
 import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
 import 
org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
+import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.slf4j.Logger;
@@ -71,6 +79,7 @@ public class TransferShedder implements 
NamespaceUnloadStrategy {
     private final LoadStats stats = new LoadStats();
     private final PulsarService pulsar;
     private final SimpleResourceAllocationPolicies allocationPolicies;
+    private final IsolationPoliciesHelper isolationPoliciesHelper;
 
     private final UnloadDecision decision = new UnloadDecision();
 
@@ -78,11 +87,13 @@ public class TransferShedder implements 
NamespaceUnloadStrategy {
     public TransferShedder(){
         this.pulsar = null;
         this.allocationPolicies = null;
+        this.isolationPoliciesHelper = null;
     }
 
     public TransferShedder(PulsarService pulsar){
         this.pulsar = pulsar;
         this.allocationPolicies = new SimpleResourceAllocationPolicies(pulsar);
+        this.isolationPoliciesHelper = new 
IsolationPoliciesHelper(allocationPolicies);
     }
 
 
@@ -265,6 +276,17 @@ public class TransferShedder implements 
NamespaceUnloadStrategy {
 
             final double targetStd = conf.getLoadBalancerBrokerLoadTargetStd();
             boolean transfer = conf.isLoadBalancerTransferEnabled();
+
+            Map<String, BrokerLookupData> availableBrokers;
+            try {
+                availableBrokers = 
context.brokerRegistry().getAvailableBrokerLookupDataAsync()
+                        
.get(context.brokerConfiguration().getMetadataStoreOperationTimeoutSeconds(), 
TimeUnit.SECONDS);
+            } catch (ExecutionException | InterruptedException | 
TimeoutException e) {
+                decision.skip(Unknown);
+                log.warn("Failed to fetch available brokers. Reason:{}. Stop 
unloading.", decision.getReason(), e);
+                return decision;
+            }
+
             while (true) {
                 if (!stats.hasTransferableBrokers()) {
                     if (debugMode) {
@@ -334,7 +356,9 @@ public class TransferShedder implements 
NamespaceUnloadStrategy {
                     int remainingTopBundles = topBundlesLoadData.size();
                     for (var e : topBundlesLoadData) {
                         String bundle = e.bundleName();
-                        if (!recentlyUnloadedBundles.containsKey(bundle) && 
isTransferable(bundle)) {
+                        if (!recentlyUnloadedBundles.containsKey(bundle)
+                                && isTransferable(context, availableBrokers,
+                                bundle, maxBroker, Optional.of(minBroker))) {
                             var bundleData = e.stats();
                             double throughput = bundleData.msgThroughputIn + 
bundleData.msgThroughputOut;
                             if (remainingTopBundles > 1
@@ -342,8 +366,7 @@ public class TransferShedder implements 
NamespaceUnloadStrategy {
                                     || !atLeastOneBundleSelected)) {
                                 if (transfer) {
                                     selectedBundlesCache.put(maxBroker,
-                                            new Unload(maxBroker, bundle,
-                                                    Optional.of(minBroker)));
+                                            new Unload(maxBroker, bundle, 
Optional.of(minBroker)));
                                 } else {
                                     selectedBundlesCache.put(maxBroker,
                                             new Unload(maxBroker, bundle));
@@ -412,18 +435,27 @@ public class TransferShedder implements 
NamespaceUnloadStrategy {
     }
 
 
-    private boolean isTransferable(String bundle) {
+    private boolean isTransferable(LoadManagerContext context,
+                                   Map<String, BrokerLookupData> 
availableBrokers,
+                                   String bundle,
+                                   String maxBroker,
+                                   Optional<String> broker) {
         if (pulsar == null || allocationPolicies == null) {
             return true;
         }
-        NamespaceName namespace = 
NamespaceName.get(LoadManagerShared.getNamespaceNameFromBundleName(bundle));
-        if (allocationPolicies.areIsolationPoliciesPresent(namespace)) {
+        String namespace = 
LoadManagerShared.getNamespaceNameFromBundleName(bundle);
+        NamespaceName namespaceName = NamespaceName.get(namespace);
+        final String bundleRange = 
LoadManagerShared.getBundleRangeFromBundleName(bundle);
+        NamespaceBundle namespaceBundle =
+                
pulsar.getNamespaceService().getNamespaceBundleFactory().getBundle(namespace, 
bundleRange);
+
+        if (!canTransferWithIsolationPoliciesToBroker(context, 
availableBrokers, namespaceBundle, maxBroker, broker)) {
             return false;
         }
 
         try {
             var localPoliciesOptional = pulsar
-                    
.getPulsarResources().getLocalPolicies().getLocalPolicies(namespace);
+                    
.getPulsarResources().getLocalPolicies().getLocalPolicies(namespaceName);
             if (localPoliciesOptional.isPresent() && StringUtils.isNotBlank(
                     localPoliciesOptional.get().namespaceAntiAffinityGroup)) {
                 return false;
@@ -434,4 +466,37 @@ public class TransferShedder implements 
NamespaceUnloadStrategy {
         }
         return true;
     }
+
+    /**
+     * Check whether the given bundle can be transferred to the given broker, or unloaded, with 
isolation policies applied.
+     *
+     * @param context The load manager context.
+     * @param availableBrokers The available brokers.
+     * @param namespaceBundle The bundle to unload or transfer.
+     * @param currentBroker The current broker.
+     * @param targetBroker The broker the bundle will be transferred to.
+     * @return Whether the bundle can be transferred/unloaded.
+     */
+    private boolean 
canTransferWithIsolationPoliciesToBroker(LoadManagerContext context,
+                                                             Map<String, 
BrokerLookupData> availableBrokers,
+                                                             NamespaceBundle 
namespaceBundle,
+                                                             String 
currentBroker,
+                                                             Optional<String> 
targetBroker) {
+        if (isolationPoliciesHelper == null
+                || 
!allocationPolicies.areIsolationPoliciesPresent(namespaceBundle.getNamespaceObject()))
 {
+            return true;
+        }
+        boolean transfer = 
context.brokerConfiguration().isLoadBalancerTransferEnabled();
+        Set<String> candidates = 
isolationPoliciesHelper.applyIsolationPolicies(availableBrokers, 
namespaceBundle);
+
+        // Remove the current bundle owner broker.
+        candidates.remove(currentBroker);
+
+        // Unload: Check if there are any more candidates available for 
selection.
+        if (targetBroker.isEmpty() || !transfer) {
+            return !candidates.isEmpty();
+        }
+        // Transfer: Check if this broker is among the candidates.
+        return candidates.contains(targetBroker.get());
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
index ec82f5c383e..441415a9d35 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
@@ -90,6 +90,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.impl.TableViewImpl;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.ServiceUnitId;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.Policies;
@@ -290,12 +291,19 @@ public class ExtensibleLoadManagerImplTest extends 
MockedPulsarServiceBaseTest {
                 return "Mock broker filter";
             }
 
+            @Override
+            public void initialize(PulsarService pulsar) {
+                // No-op
+            }
+
             @Override
             public Map<String, BrokerLookupData> filter(Map<String, 
BrokerLookupData> brokers,
-                                                        LoadManagerContext 
context) {
+                                                        ServiceUnitId 
serviceUnit,
+                                                        LoadManagerContext 
context) throws BrokerFilterException {
                 brokers.remove(pulsar1.getLookupServiceAddress());
                 return brokers;
             }
+
         })).when(primaryLoadManager).getBrokerFilterPipeline();
 
         Optional<BrokerLookupData> brokerLookupData = 
primaryLoadManager.assign(Optional.empty(), bundle).get();
@@ -308,14 +316,10 @@ public class ExtensibleLoadManagerImplTest extends 
MockedPulsarServiceBaseTest {
         TopicName topicName = TopicName.get("test-filter-has-exception");
         NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
 
-        doReturn(List.of(new BrokerFilter() {
-            @Override
-            public String name() {
-                return "Mock broker filter";
-            }
-
+        doReturn(List.of(new MockBrokerFilter() {
             @Override
             public Map<String, BrokerLookupData> filter(Map<String, 
BrokerLookupData> brokers,
+                                                        ServiceUnitId 
serviceUnit,
                                                         LoadManagerContext 
context) throws BrokerFilterException {
                 brokers.clear();
                 throw new BrokerFilterException("Test");
@@ -379,6 +383,35 @@ public class ExtensibleLoadManagerImplTest extends 
MockedPulsarServiceBaseTest {
     }
 
 
+    @Test
+    public void testMoreThenOneFilter() throws Exception {
+        TopicName topicName = TopicName.get("test-filter-has-exception");
+        NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
+
+        String lookupServiceAddress1 = pulsar1.getLookupServiceAddress();
+        doReturn(List.of(new MockBrokerFilter() {
+            @Override
+            public Map<String, BrokerLookupData> filter(Map<String, 
BrokerLookupData> brokers,
+                                                        ServiceUnitId 
serviceUnit,
+                                                        LoadManagerContext 
context) throws BrokerFilterException {
+                brokers.remove(lookupServiceAddress1);
+                return brokers;
+            }
+        },new MockBrokerFilter() {
+            @Override
+            public Map<String, BrokerLookupData> filter(Map<String, 
BrokerLookupData> brokers,
+                                                        ServiceUnitId 
serviceUnit,
+                                                        LoadManagerContext 
context) throws BrokerFilterException {
+                brokers.clear();
+                throw new BrokerFilterException("Test");
+            }
+        })).when(primaryLoadManager).getBrokerFilterPipeline();
+
+        Optional<BrokerLookupData> brokerLookupData = 
primaryLoadManager.assign(Optional.empty(), bundle).get();
+        assertTrue(brokerLookupData.isPresent());
+        assertEquals(brokerLookupData.get().getWebServiceUrl(), 
pulsar2.getWebServiceAddress());
+    }
+
     @Test
     public void testGetMetrics() throws Exception {
         {
@@ -565,6 +598,20 @@ public class ExtensibleLoadManagerImplTest extends 
MockedPulsarServiceBaseTest {
         assertEquals(actual, expected);
     }
 
+    private static abstract class MockBrokerFilter implements BrokerFilter {
+
+        @Override
+        public String name() {
+            return "Mock-broker-filter";
+        }
+
+        @Override
+        public void initialize(PulsarService pulsar) {
+            // No-op
+        }
+
+    }
+
     private static void cleanTableView(ServiceUnitStateChannel channel)
             throws IllegalAccessException {
         var tv = (TableViewImpl<ServiceUnitStateData>)
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerIsolationPoliciesFilterTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerIsolationPoliciesFilterTest.java
new file mode 100644
index 00000000000..a079a23bcea
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerIsolationPoliciesFilterTest.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance.extensions.filter;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.lang.reflect.FieldUtils;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
+import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
+import 
org.apache.pulsar.broker.loadbalance.extensions.policies.IsolationPoliciesHelper;
+import 
org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
+import org.apache.pulsar.common.naming.NamespaceBundle;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
+import org.testng.annotations.Test;
+
+/**
+ * Unit test for {@link BrokerIsolationPoliciesFilter}.
+ */
+@Test(groups = "broker")
+public class BrokerIsolationPoliciesFilterTest {
+
+    /**
+     * Verifies namespace-isolation policies with primary and secondary 
brokers.
+     *
+     * Use cases:
+     *
+     * <pre>
+     *  1. Namespace: primary=broker1, secondary=broker2, shared=broker3, 
min_limit = 1
+     *     a. available-brokers: broker1, broker2, broker3 => result: broker1
+     *     b. available-brokers: broker2, broker3          => result: broker2
+     *     c. available-brokers: broker3                   => result: empty
+     *  2. Namespace: primary=broker1, secondary=broker2, shared=broker3, 
min_limit = 2
+     *     a. available-brokers: broker1, broker2, broker3 => result: broker1, 
broker2
+     *     b. available-brokers: broker2, broker3          => result: broker2
+     *     c. available-brokers: broker3                   => result: empty
+     * </pre>
+     */
+    @Test
+    public void 
testFilterWithNamespaceIsolationPoliciesForPrimaryAndSecondaryBrokers()
+            throws IllegalAccessException, BrokerFilterException {
+        var namespace = "my-tenant/my-ns";
+        NamespaceName namespaceName = NamespaceName.get(namespace);
+
+        BrokerIsolationPoliciesFilter filter = new 
BrokerIsolationPoliciesFilter();
+        var policies = mock(SimpleResourceAllocationPolicies.class);
+
+        // 1. Namespace: primary=broker1, secondary=broker2, shared=broker3, 
min_limit = 1
+        setIsolationPolicies(policies, namespaceName, Set.of("broker1"), 
Set.of("broker2"), Set.of("broker3"), 1);
+        IsolationPoliciesHelper isolationPoliciesHelper = new 
IsolationPoliciesHelper(policies);
+        FieldUtils.writeDeclaredField(filter, "isolationPoliciesHelper", 
isolationPoliciesHelper, true);
+
+        // a. available-brokers: broker1, broker2, broker3 => result: broker1
+        Map<String, BrokerLookupData> result = filter.filter(new 
HashMap<>(Map.of(
+                "broker1", getLookupData(),
+                "broker2", getLookupData(),
+                "broker3", getLookupData())), namespaceName, getContext());
+        assertEquals(result.keySet(), Set.of("broker1"));
+
+        // b. available-brokers: broker2, broker3          => result: broker2
+        result = filter.filter(new HashMap<>(Map.of(
+                "broker2", getLookupData(),
+                "broker3", getLookupData())), namespaceName, getContext());
+        assertEquals(result.keySet(), Set.of("broker2"));
+
+        // c. available-brokers: broker3                   => result: empty
+        result = filter.filter(new HashMap<>(Map.of(
+                "broker3", getLookupData())), namespaceName, getContext());
+        assertTrue(result.isEmpty());
+
+        // 2. Namespace: primary=broker1, secondary=broker2, shared=broker3, 
min_limit = 2
+        setIsolationPolicies(policies, namespaceName, Set.of("broker1"), 
Set.of("broker2"), Set.of("broker3"), 2);
+
+        // a. available-brokers: broker1, broker2, broker3 => result: broker1, 
broker2
+        result = filter.filter(new HashMap<>(Map.of(
+                "broker1", getLookupData(),
+                "broker2", getLookupData(),
+                "broker3", getLookupData())), namespaceName, getContext());
+        assertEquals(result.keySet(), Set.of("broker1", "broker2"));
+
+        // b. available-brokers: broker2, broker3          => result: broker2
+        result = filter.filter(new HashMap<>(Map.of(
+                "broker2", getLookupData(),
+                "broker3", getLookupData())), namespaceName, getContext());
+        assertEquals(result.keySet(), Set.of("broker2"));
+
+        // c. available-brokers: broker3                   => result: empty
+        result = filter.filter(new HashMap<>(Map.of(
+                "broker3", getLookupData())), namespaceName, getContext());
+        assertTrue(result.isEmpty());
+    }
+
+    @Test
+    public void testFilterWithPersistentOrNonPersistentDisabled()
+            throws IllegalAccessException, BrokerFilterException {
+        var namespace = "my-tenant/my-ns";
+        NamespaceName namespaceName = NamespaceName.get(namespace);
+        NamespaceBundle namespaceBundle = mock(NamespaceBundle.class);
+        doReturn(true).when(namespaceBundle).hasNonPersistentTopic();
+        doReturn(namespaceName).when(namespaceBundle).getNamespaceObject();
+
+        BrokerIsolationPoliciesFilter filter = new 
BrokerIsolationPoliciesFilter();
+
+        var policies = mock(SimpleResourceAllocationPolicies.class);
+        
doReturn(false).when(policies).areIsolationPoliciesPresent(eq(namespaceName));
+        doReturn(true).when(policies).isSharedBroker(any());
+        IsolationPoliciesHelper isolationPoliciesHelper = new 
IsolationPoliciesHelper(policies);
+        FieldUtils.writeDeclaredField(filter, "isolationPoliciesHelper", 
isolationPoliciesHelper, true);
+
+        Map<String, BrokerLookupData> result = filter.filter(new 
HashMap<>(Map.of(
+                "broker1", getLookupData(),
+                "broker2", getLookupData(),
+                "broker3", getLookupData())), namespaceBundle, getContext());
+        assertEquals(result.keySet(), Set.of("broker1", "broker2", "broker3"));
+
+
+        result = filter.filter(new HashMap<>(Map.of(
+                "broker1", getLookupData(true, false),
+                "broker2", getLookupData(true, false),
+                "broker3", getLookupData())), namespaceBundle, getContext());
+        assertEquals(result.keySet(), Set.of("broker3"));
+
+        doReturn(false).when(namespaceBundle).hasNonPersistentTopic();
+
+        result = filter.filter(new HashMap<>(Map.of(
+                "broker1", getLookupData(),
+                "broker2", getLookupData(),
+                "broker3", getLookupData())), namespaceBundle, getContext());
+        assertEquals(result.keySet(), Set.of("broker1", "broker2", "broker3"));
+
+        result = filter.filter(new HashMap<>(Map.of(
+                "broker1", getLookupData(false, true),
+                "broker2", getLookupData(),
+                "broker3", getLookupData())), namespaceBundle, getContext());
+        assertEquals(result.keySet(), Set.of("broker2", "broker3"));
+    }
+
+    private void setIsolationPolicies(SimpleResourceAllocationPolicies 
policies,
+                                      NamespaceName namespaceName,
+                                      Set<String> primary,
+                                      Set<String> secondary,
+                                      Set<String> shared,
+                                      int min_limit) {
+        reset(policies);
+        
doReturn(true).when(policies).areIsolationPoliciesPresent(eq(namespaceName));
+        doReturn(false).when(policies).isPrimaryBroker(eq(namespaceName), 
any());
+        doReturn(false).when(policies).isSecondaryBroker(eq(namespaceName), 
any());
+        doReturn(false).when(policies).isSharedBroker(any());
+
+        primary.forEach(broker -> {
+            doReturn(true).when(policies).isPrimaryBroker(eq(namespaceName), 
eq(broker));
+        });
+
+        secondary.forEach(broker -> {
+            doReturn(true).when(policies).isSecondaryBroker(eq(namespaceName), 
eq(broker));
+        });
+
+        shared.forEach(broker -> {
+            doReturn(true).when(policies).isSharedBroker(eq(broker));
+        });
+
+        doAnswer(invocationOnMock -> {
+            Integer totalPrimaryCandidates = invocationOnMock.getArgument(1, 
Integer.class);
+            return totalPrimaryCandidates < min_limit;
+        }).when(policies).shouldFailoverToSecondaries(eq(namespaceName), 
anyInt());
+    }
+
+    public BrokerLookupData getLookupData() {
+        return getLookupData(true, true);
+    }
+
+    public BrokerLookupData getLookupData(boolean persistentTopicsEnabled,
+                                          boolean nonPersistentTopicsEnabled) {
+        String webServiceUrl = "http://localhost:8080";;
+        String webServiceUrlTls = "https://localhoss:8081";;
+        String pulsarServiceUrl = "pulsar://localhost:6650";
+        String pulsarServiceUrlTls = "pulsar+ssl://localhost:6651";
+        Map<String, AdvertisedListener> advertisedListeners = new HashMap<>();
+        Map<String, String> protocols = new HashMap<>(){{
+            put("kafka", "9092");
+        }};
+        return new BrokerLookupData(
+                webServiceUrl, webServiceUrlTls, pulsarServiceUrl,
+                pulsarServiceUrlTls, advertisedListeners, protocols,
+                persistentTopicsEnabled, nonPersistentTopicsEnabled, "3.0.0");
+    }
+
+    public LoadManagerContext getContext() {
+        LoadManagerContext mockContext = mock(LoadManagerContext.class);
+        doReturn(new 
ServiceConfiguration()).when(mockContext).brokerConfiguration();
+        return mockContext;
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilterTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilterTest.java
index 4c3255341b7..da13a9526a8 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilterTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerMaxTopicCountFilterTest.java
@@ -58,7 +58,7 @@ public class BrokerMaxTopicCountFilterTest extends 
BrokerFilterTestBase {
                 "broker3", getLookupData(),
                 "broker4", getLookupData()
         );
-        Map<String, BrokerLookupData> result = filter.filter(new 
HashMap<>(originalBrokers), context);
+        Map<String, BrokerLookupData> result = filter.filter(new 
HashMap<>(originalBrokers), null, context);
         assertEquals(result, Map.of(
                 "broker2", getLookupData(),
                 "broker4", getLookupData()
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerVersionFilterTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerVersionFilterTest.java
index d36c79d60ed..cafd8f0ea7a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerVersionFilterTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerVersionFilterTest.java
@@ -41,7 +41,7 @@ public class BrokerVersionFilterTest extends 
BrokerFilterTestBase {
     @Test
     public void testFilterEmptyBrokerList() throws BrokerFilterException {
         BrokerVersionFilter brokerVersionFilter = new BrokerVersionFilter();
-        Map<String, BrokerLookupData> result = brokerVersionFilter.filter(new 
HashMap<>(), getContext());
+        Map<String, BrokerLookupData> result = brokerVersionFilter.filter(new 
HashMap<>(), null, getContext());
         assertTrue(result.isEmpty());
     }
 
@@ -58,7 +58,7 @@ public class BrokerVersionFilterTest extends 
BrokerFilterTestBase {
         );
         Map<String, BrokerLookupData> brokers = new HashMap<>(originalBrokers);
         BrokerVersionFilter brokerVersionFilter = new BrokerVersionFilter();
-        Map<String, BrokerLookupData> result = 
brokerVersionFilter.filter(brokers, context);
+        Map<String, BrokerLookupData> result = 
brokerVersionFilter.filter(brokers, null, context);
         assertEquals(result, originalBrokers);
     }
 
@@ -71,7 +71,8 @@ public class BrokerVersionFilterTest extends 
BrokerFilterTestBase {
                 "localhost:6653", getLookupData("2.10.1")
         );
         BrokerVersionFilter brokerVersionFilter = new BrokerVersionFilter();
-        Map<String, BrokerLookupData> result = brokerVersionFilter.filter(new 
HashMap<>(originalBrokers), getContext());
+        Map<String, BrokerLookupData> result = brokerVersionFilter.filter(
+                new HashMap<>(originalBrokers), null, getContext());
         assertEquals(result, Map.of(
                 "localhost:6651", getLookupData("2.10.1"),
                 "localhost:6652", getLookupData("2.10.1"),
@@ -84,7 +85,7 @@ public class BrokerVersionFilterTest extends 
BrokerFilterTestBase {
                 "localhost:6652", getLookupData("2.10.1"),
                 "localhost:6653", getLookupData("2.10.1")
         );
-        result = brokerVersionFilter.filter(new HashMap<>(originalBrokers), 
getContext());
+        result = brokerVersionFilter.filter(new HashMap<>(originalBrokers), 
null, getContext());
 
         assertEquals(result, Map.of(
                 "localhost:6652", getLookupData("2.10.1"),
@@ -98,7 +99,7 @@ public class BrokerVersionFilterTest extends 
BrokerFilterTestBase {
                 "localhost:6653", getLookupData("2.10.2-SNAPSHOT")
         );
 
-        result = brokerVersionFilter.filter(new HashMap<>(originalBrokers), 
getContext());
+        result = brokerVersionFilter.filter(new HashMap<>(originalBrokers), 
null, getContext());
         assertEquals(result, Map.of(
                 "localhost:6653", getLookupData("2.10.2-SNAPSHOT")
         ));
@@ -111,6 +112,6 @@ public class BrokerVersionFilterTest extends 
BrokerFilterTestBase {
                 "localhost:6650", getLookupData("xxx")
         );
         BrokerVersionFilter brokerVersionFilter = new BrokerVersionFilter();
-        brokerVersionFilter.filter(new HashMap<>(originalBrokers), 
getContext());
+        brokerVersionFilter.filter(new HashMap<>(originalBrokers), null, 
getContext());
     }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
index 709a1113f35..a668d85f007 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java
@@ -28,9 +28,15 @@ import static 
org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecis
 import static 
org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.OutDatedData;
 import static 
org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Overloaded;
 import static 
org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Underloaded;
+import static 
org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision.Reason.Unknown;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
@@ -42,24 +48,37 @@ import java.util.Optional;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
 import java.util.function.BiConsumer;
+import com.google.common.collect.BoundType;
+import com.google.common.collect.Range;
 import org.apache.commons.lang.reflect.FieldUtils;
 import org.apache.commons.math3.stat.descriptive.moment.Mean;
 import org.apache.commons.math3.stat.descriptive.moment.StandardDeviation;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.loadbalance.extensions.BrokerRegistry;
 import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
 import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
+import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
 import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData;
 import org.apache.pulsar.broker.loadbalance.extensions.models.TopKBundles;
 import org.apache.pulsar.broker.loadbalance.extensions.models.Unload;
 import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision;
+import 
org.apache.pulsar.broker.loadbalance.extensions.policies.IsolationPoliciesHelper;
 import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
 import 
org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
+import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.resources.LocalPoliciesResources;
 import org.apache.pulsar.broker.resources.PulsarResources;
+import org.apache.pulsar.common.naming.NamespaceBundle;
+import org.apache.pulsar.common.naming.NamespaceBundleFactory;
+import org.apache.pulsar.common.naming.NamespaceBundles;
+import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.policies.data.LocalPolicies;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
 import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
 import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
 import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
@@ -145,6 +164,19 @@ public class TransferShedderTest {
         return topKBundles.getLoadData();
     }
 
+    public TopBundlesLoadData getTopBundlesLoadWithOutSuffix(String namespace,
+                                                             int load1,
+                                                             int load2) {
+        var namespaceBundleStats1 = new NamespaceBundleStats();
+        namespaceBundleStats1.msgThroughputOut = load1 * 1e6;
+        var namespaceBundleStats2 = new NamespaceBundleStats();
+        namespaceBundleStats2.msgThroughputOut = load2 * 1e6;
+        var topKBundles = new TopKBundles();
+        topKBundles.update(Map.of(namespace + "/0x00000000_0x7FFFFFF", 
namespaceBundleStats1,
+                namespace + "/0x7FFFFFF_0xFFFFFFF", namespaceBundleStats2), 2);
+        return topKBundles.getLoadData();
+    }
+
     public LoadManagerContext getContext(){
         var ctx = mock(LoadManagerContext.class);
         var conf = new ServiceConfiguration();
@@ -236,6 +268,10 @@ public class TransferShedderTest {
         doReturn(conf).when(ctx).brokerConfiguration();
         doReturn(brokerLoadDataStore).when(ctx).brokerLoadDataStore();
         doReturn(topBundleLoadDataStore).when(ctx).topBundleLoadDataStore();
+        var brokerRegister = mock(BrokerRegistry.class);
+        doReturn(brokerRegister).when(ctx).brokerRegistry();
+        BrokerRegistry registry = ctx.brokerRegistry();
+        
doReturn(CompletableFuture.completedFuture(Map.of())).when(registry).getAvailableBrokerLookupDataAsync();
         return ctx;
     }
 
@@ -350,33 +386,187 @@ public class TransferShedderTest {
     }
 
     @Test
-    public void testBundlesWithIsolationPolicies() throws 
IllegalAccessException {
-        var pulsar = mock(PulsarService.class);
+    public void testGetAvailableBrokersFailed() {
+        TransferShedder transferShedder = new TransferShedder();
+        var ctx = setupContext();
+        BrokerRegistry registry = ctx.brokerRegistry();
+        doReturn(FutureUtil.failedFuture(new 
TimeoutException())).when(registry).getAvailableBrokerLookupDataAsync();
+        var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), 
Map.of());
+
+        var expected = new UnloadDecision();
+        expected.setLabel(Skip);
+        expected.skip(Unknown);
+        expected.setLoadAvg(setupLoadAvg);
+        expected.setLoadStd(setupLoadStd);
+        assertEquals(res, expected);
+    }
+
+    @Test(timeOut = 30 * 1000)
+    public void testBundlesWithIsolationPolicies() throws 
IllegalAccessException, MetadataStoreException {
+        var pulsar = getMockPulsar();
+
         TransferShedder transferShedder = new TransferShedder(pulsar);
+
+        var pulsarResourcesMock = mock(PulsarResources.class);
+        var localPoliciesResourcesMock = mock(LocalPoliciesResources.class);
+        doReturn(pulsarResourcesMock).when(pulsar).getPulsarResources();
+        
doReturn(localPoliciesResourcesMock).when(pulsarResourcesMock).getLocalPolicies();
+        
doReturn(Optional.empty()).when(localPoliciesResourcesMock).getLocalPolicies(any());
+
         var allocationPoliciesSpy = (SimpleResourceAllocationPolicies)
                 spy(FieldUtils.readDeclaredField(transferShedder, 
"allocationPolicies", true));
-        
doReturn(true).when(allocationPoliciesSpy).areIsolationPoliciesPresent(any());
         FieldUtils.writeDeclaredField(transferShedder, "allocationPolicies", 
allocationPoliciesSpy, true);
+        IsolationPoliciesHelper isolationPoliciesHelper = new 
IsolationPoliciesHelper(allocationPoliciesSpy);
+        FieldUtils.writeDeclaredField(transferShedder, 
"isolationPoliciesHelper", isolationPoliciesHelper, true);
+
+        // Test transfer when a namespace has isolation policies.
+        setIsolationPolicies(allocationPoliciesSpy, "my-tenant/my-namespaceE",
+                Set.of("broker5"), Set.of(), Set.of(), 1);
         var ctx = setupContext();
+        BrokerRegistry registry = ctx.brokerRegistry();
+        doReturn(CompletableFuture.completedFuture(Map.of(
+                "broker1", getLookupData(),
+                "broker2", getLookupData(),
+                "broker3", getLookupData(),
+                "broker4", getLookupData(),
+                "broker5", 
getLookupData()))).when(registry).getAvailableBrokerLookupDataAsync();
+
         var topBundlesLoadDataStore = ctx.topBundleLoadDataStore();
-        topBundlesLoadDataStore.pushAsync("broker1", 
getTopBundlesLoad("my-tenant/my-namespaceA/0x00000000_0xFFFFFFF", 1, 3));
-        topBundlesLoadDataStore.pushAsync("broker2", 
getTopBundlesLoad("my-tenant/my-namespaceB/0x00000000_0xFFFFFFF", 2, 8));
-        topBundlesLoadDataStore.pushAsync("broker3", 
getTopBundlesLoad("my-tenant/my-namespaceC/0x00000000_0xFFFFFFF", 6, 10));
-        topBundlesLoadDataStore.pushAsync("broker4", 
getTopBundlesLoad("my-tenant/my-namespaceD/0x00000000_0xFFFFFFF", 10, 20));
-        topBundlesLoadDataStore.pushAsync("broker5", 
getTopBundlesLoad("my-tenant/my-namespaceE/0x00000000_0xFFFFFFF", 70, 20));
+        topBundlesLoadDataStore.pushAsync("broker1",
+                getTopBundlesLoadWithOutSuffix("my-tenant/my-namespaceA", 1, 
3));
+        topBundlesLoadDataStore.pushAsync("broker2",
+                getTopBundlesLoadWithOutSuffix("my-tenant/my-namespaceB", 2, 
8));
+        topBundlesLoadDataStore.pushAsync("broker3",
+                getTopBundlesLoadWithOutSuffix("my-tenant/my-namespaceC", 6, 
10));
+        topBundlesLoadDataStore.pushAsync("broker4",
+                getTopBundlesLoadWithOutSuffix("my-tenant/my-namespaceD", 10, 
20));
+        topBundlesLoadDataStore.pushAsync("broker5",
+                getTopBundlesLoadWithOutSuffix("my-tenant/my-namespaceE", 70, 
20));
         var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), 
Map.of());
 
         var expected = new UnloadDecision();
-        expected.setLabel(Skip);
-        expected.skip(NoBundles);
+        var unloads = expected.getUnloads();
+        unloads.put("broker4",
+                new Unload("broker4", 
"my-tenant/my-namespaceD/0x7FFFFFF_0xFFFFFFF", Optional.of("broker2")));
+        expected.setLabel(Success);
+        expected.setReason(Overloaded);
+        expected.setLoadAvg(setupLoadAvg);
+        expected.setLoadStd(setupLoadStd);
+        assertEquals(res, expected);
+
+        // Test unload (transfer disabled) when a namespace has isolation policies.
+
+        setIsolationPolicies(allocationPoliciesSpy, "my-tenant/my-namespaceE",
+                Set.of("broker5"), Set.of(), Set.of(), 1);
+        ctx = setupContext();
+        registry = ctx.brokerRegistry();
+        doReturn(CompletableFuture.completedFuture(Map.of(
+                "broker1", getLookupData(),
+                "broker2", getLookupData(),
+                "broker3", getLookupData(),
+                "broker4", getLookupData(),
+                "broker5", 
getLookupData()))).when(registry).getAvailableBrokerLookupDataAsync();
+
+        ctx.brokerConfiguration().setLoadBalancerTransferEnabled(false);
+
+        topBundlesLoadDataStore = ctx.topBundleLoadDataStore();
+        topBundlesLoadDataStore.pushAsync("broker1",
+                getTopBundlesLoadWithOutSuffix("my-tenant/my-namespaceA", 1, 
3));
+        topBundlesLoadDataStore.pushAsync("broker2",
+                getTopBundlesLoadWithOutSuffix("my-tenant/my-namespaceB", 2, 
8));
+        topBundlesLoadDataStore.pushAsync("broker3",
+                getTopBundlesLoadWithOutSuffix("my-tenant/my-namespaceC", 6, 
10));
+        topBundlesLoadDataStore.pushAsync("broker4",
+                getTopBundlesLoadWithOutSuffix("my-tenant/my-namespaceD", 10, 
20));
+        topBundlesLoadDataStore.pushAsync("broker5",
+                getTopBundlesLoadWithOutSuffix("my-tenant/my-namespaceE", 70, 
20));
+        res = transferShedder.findBundlesForUnloading(ctx, Map.of(), Map.of());
+
+        expected = new UnloadDecision();
+        unloads = expected.getUnloads();
+        unloads.put("broker4",
+                new Unload("broker4", 
"my-tenant/my-namespaceD/0x7FFFFFF_0xFFFFFFF", Optional.empty()));
+        expected.setLabel(Success);
+        expected.setReason(Overloaded);
         expected.setLoadAvg(setupLoadAvg);
         expected.setLoadStd(setupLoadStd);
         assertEquals(res, expected);
     }
 
+    public BrokerLookupData getLookupData() {
+        String webServiceUrl = "http://localhost:8080";;
+        String webServiceUrlTls = "https://localhoss:8081";;
+        String pulsarServiceUrl = "pulsar://localhost:6650";
+        String pulsarServiceUrlTls = "pulsar+ssl://localhost:6651";
+        Map<String, AdvertisedListener> advertisedListeners = new HashMap<>();
+        Map<String, String> protocols = new HashMap<>(){{
+            put("kafka", "9092");
+        }};
+        return new BrokerLookupData(
+                webServiceUrl, webServiceUrlTls, pulsarServiceUrl,
+                pulsarServiceUrlTls, advertisedListeners, protocols,
+                true, true, "3.0.0");
+    }
+
+    private void setIsolationPolicies(SimpleResourceAllocationPolicies 
policies,
+                                      String namespace,
+                                      Set<String> primary,
+                                      Set<String> secondary,
+                                      Set<String> shared,
+                                      int min_limit) {
+        reset(policies);
+        NamespaceName namespaceName = NamespaceName.get(namespace);
+        NamespaceBundle namespaceBundle = mock(NamespaceBundle.class);
+        doReturn(true).when(namespaceBundle).hasNonPersistentTopic();
+        doReturn(namespaceName).when(namespaceBundle).getNamespaceObject();
+        doReturn(false).when(policies).areIsolationPoliciesPresent(any());
+        doReturn(false).when(policies).isPrimaryBroker(any(), any());
+        doReturn(false).when(policies).isSecondaryBroker(any(), any());
+        doReturn(true).when(policies).isSharedBroker(any());
+
+        
doReturn(true).when(policies).areIsolationPoliciesPresent(eq(namespaceName));
+
+        primary.forEach(broker -> {
+            doReturn(true).when(policies).isPrimaryBroker(eq(namespaceName), 
eq(broker));
+        });
+
+        secondary.forEach(broker -> {
+            doReturn(true).when(policies).isSecondaryBroker(eq(namespaceName), 
eq(broker));
+        });
+
+        shared.forEach(broker -> {
+            doReturn(true).when(policies).isSharedBroker(eq(broker));
+        });
+
+        doAnswer(invocationOnMock -> {
+            Integer totalPrimaryCandidates = invocationOnMock.getArgument(1, 
Integer.class);
+            return totalPrimaryCandidates < min_limit;
+        }).when(policies).shouldFailoverToSecondaries(eq(namespaceName), 
anyInt());
+    }
+
+    private PulsarService getMockPulsar() {
+        var pulsar = mock(PulsarService.class);
+        var namespaceService = mock(NamespaceService.class);
+        doReturn(namespaceService).when(pulsar).getNamespaceService();
+        NamespaceBundleFactory factory = mock(NamespaceBundleFactory.class);
+        doReturn(factory).when(namespaceService).getNamespaceBundleFactory();
+        doAnswer(answer -> {
+            String namespace = answer.getArgument(0, String.class);
+            String bundleRange = answer.getArgument(1, String.class);
+            String[] boundaries = bundleRange.split("_");
+            Long lowerEndpoint = Long.decode(boundaries[0]);
+            Long upperEndpoint = Long.decode(boundaries[1]);
+            Range<Long> hashRange = Range.range(lowerEndpoint, 
BoundType.CLOSED, upperEndpoint,
+                    (upperEndpoint.equals(NamespaceBundles.FULL_UPPER_BOUND)) 
? BoundType.CLOSED : BoundType.OPEN);
+            return new NamespaceBundle(NamespaceName.get(namespace), 
hashRange, factory);
+        }).when(factory).getBundle(anyString(), anyString());
+        return pulsar;
+    }
+
+
     @Test
     public void testBundlesWithAntiAffinityGroup() throws 
IllegalAccessException, MetadataStoreException {
-        var pulsar = mock(PulsarService.class);
+        var pulsar = getMockPulsar();
         TransferShedder transferShedder = new TransferShedder(pulsar);
         var allocationPoliciesSpy = (SimpleResourceAllocationPolicies)
                 spy(FieldUtils.readDeclaredField(transferShedder, 
"allocationPolicies", true));
@@ -392,11 +582,16 @@ public class TransferShedderTest {
 
         var ctx = setupContext();
         var topBundlesLoadDataStore = ctx.topBundleLoadDataStore();
-        topBundlesLoadDataStore.pushAsync("broker1", 
getTopBundlesLoad("my-tenant/my-namespaceA/0x00000000_0xFFFFFFF", 1, 3));
-        topBundlesLoadDataStore.pushAsync("broker2", 
getTopBundlesLoad("my-tenant/my-namespaceB/0x00000000_0xFFFFFFF", 2, 8));
-        topBundlesLoadDataStore.pushAsync("broker3", 
getTopBundlesLoad("my-tenant/my-namespaceC/0x00000000_0xFFFFFFF", 6, 10));
-        topBundlesLoadDataStore.pushAsync("broker4", 
getTopBundlesLoad("my-tenant/my-namespaceD/0x00000000_0xFFFFFFF", 10, 20));
-        topBundlesLoadDataStore.pushAsync("broker5", 
getTopBundlesLoad("my-tenant/my-namespaceE/0x00000000_0xFFFFFFF", 70, 20));
+        topBundlesLoadDataStore.pushAsync("broker1",
+                getTopBundlesLoadWithOutSuffix("my-tenant/my-namespaceA", 1, 
3));
+        topBundlesLoadDataStore.pushAsync("broker2",
+                getTopBundlesLoadWithOutSuffix("my-tenant/my-namespaceB", 2, 
8));
+        topBundlesLoadDataStore.pushAsync("broker3",
+                getTopBundlesLoadWithOutSuffix("my-tenant/my-namespaceC", 6, 
10));
+        topBundlesLoadDataStore.pushAsync("broker4",
+                getTopBundlesLoadWithOutSuffix("my-tenant/my-namespaceD", 10, 
20));
+        topBundlesLoadDataStore.pushAsync("broker5",
+                getTopBundlesLoadWithOutSuffix("my-tenant/my-namespaceE", 70, 
20));
         var res = transferShedder.findBundlesForUnloading(ctx, Map.of(), 
Map.of());
 
         var expected = new UnloadDecision();

Reply via email to