heesung-sn commented on code in PR #19958:
URL: https://github.com/apache/pulsar/pull/19958#discussion_r1153936140


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java:
##########
@@ -682,54 +686,48 @@ private boolean isTransferable(LoadManagerContext context,
         NamespaceBundle namespaceBundle =
                 
pulsar.getNamespaceService().getNamespaceBundleFactory().getBundle(namespace, 
bundleRange);
 
-        if (!canTransferWithIsolationPoliciesToBroker(
-                context, availableBrokers, namespaceBundle, srcBroker, 
dstBroker)) {
-            return false;
-        }
-
-        if (antiAffinityGroupPolicyHelper != null
-                && !antiAffinityGroupPolicyHelper.canUnload(availableBrokers, 
bundle, srcBroker, dstBroker)) {
+        if (!isLoadBalancerSheddingBundlesWithPoliciesEnabled(context, 
namespaceBundle)) {
             return false;
         }
-        return true;
-    }
-
-    /**
-     * Check the gave bundle and broker can be transfer or unload with 
isolation policies applied.
-     *
-     * @param context The load manager context.
-     * @param availableBrokers The available brokers.
-     * @param namespaceBundle The bundle try to unload or transfer.
-     * @param currentBroker The current broker.
-     * @param targetBroker The broker will be transfer to.
-     * @return Can be transfer/unload or not.
-     */
-    private boolean 
canTransferWithIsolationPoliciesToBroker(LoadManagerContext context,
-                                                             Map<String, 
BrokerLookupData> availableBrokers,
-                                                             NamespaceBundle 
namespaceBundle,
-                                                             String 
currentBroker,
-                                                             Optional<String> 
targetBroker) {
-        if (isolationPoliciesHelper == null
-                || 
!allocationPolicies.areIsolationPoliciesPresent(namespaceBundle.getNamespaceObject()))
 {
-            return true;
-        }
 
-        // bundle has isolation policies.
-        if 
(!context.brokerConfiguration().isLoadBalancerSheddingBundlesWithPoliciesEnabled())
 {
-            return false;
+        Map<String, BrokerLookupData> candidates = new 
HashMap<>(availableBrokers);
+        brokerFilterPipeline.forEach(filter -> {
+            try {
+                filter.filter(candidates, namespaceBundle, context);
+            } catch (BrokerFilterException e) {
+                log.error("Failed to filter brokers with filter: {}", 
filter.getClass().getName(), e);

Review Comment:
   let's return false if fails.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to