heesung-sn commented on code in PR #19592:
URL: https://github.com/apache/pulsar/pull/19592#discussion_r1119197844


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java:
##########
@@ -334,16 +344,16 @@ public UnloadDecision findBundlesForUnloading(LoadManagerContext context,
                     int remainingTopBundles = topBundlesLoadData.size();
                     for (var e : topBundlesLoadData) {
                         String bundle = e.bundleName();
-                        if (!recentlyUnloadedBundles.containsKey(bundle) && isTransferable(bundle)) {
+                        if (!recentlyUnloadedBundles.containsKey(bundle)
+                                && isTransferable(context, bundle, maxBroker, Optional.ofNullable(minBroker))) {
                             var bundleData = e.stats();
                             double throughput = bundleData.msgThroughputIn + bundleData.msgThroughputOut;
                             if (remainingTopBundles > 1
                                     && (trafficMarkedToOffload < offloadThroughput
                                     || !atLeastOneBundleSelected)) {
                                 if (transfer) {
                                     selectedBundlesCache.put(maxBroker,
-                                            new Unload(maxBroker, bundle,
-                                                    Optional.of(minBroker)));
+                                            new Unload(maxBroker, bundle, Optional.ofNullable(minBroker)));

Review Comment:
   In what cases do we expect a null minBroker? If minBroker is null, we
should skip this Unload.
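
   A minimal sketch of the guard I have in mind. The `Unload` record and the `selectForTransfer` helper below are simplified, hypothetical stand-ins for illustration, not the actual classes or methods in this PR:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class NullMinBrokerGuard {
    // Simplified, hypothetical stand-in for the real Unload type.
    record Unload(String sourceBroker, String bundle, Optional<String> destBroker) {}

    // Builds transfer decisions for the given bundles.
    // Returns an empty list when no destination broker is known.
    static List<Unload> selectForTransfer(String maxBroker, String minBroker, List<String> bundles) {
        List<Unload> selected = new ArrayList<>();
        if (minBroker == null) {
            // No destination broker: skip rather than emit an Unload with an empty Optional.
            return selected;
        }
        for (String bundle : bundles) {
            // minBroker is known to be non-null here, so Optional.of is safe.
            selected.add(new Unload(maxBroker, bundle, Optional.of(minBroker)));
        }
        return selected;
    }
}
```

   With a guard like this, `Optional.of(minBroker)` could stay as it was on the transfer path, assuming a transfer always requires a destination.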



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java:
##########
@@ -434,4 +452,45 @@ private boolean isTransferable(String bundle) {
         }
         return true;
     }
+
+    /**
+     * Check the gave bundle and broker can be transfer or unload with isolation policies applied.
+     *
+     * @param context The load manager context.
+     * @param namespaceBundle The bundle try to unload or transfer.
+     * @param maxBroker The current broker.
+     * @param minBroker The broker will be transfer to.
+     * @return Can be transfer/unload or not.
+     */
+    private boolean canTransferWithIsolationPoliciesToBroker(LoadManagerContext context,
+                                                             NamespaceBundle namespaceBundle,
+                                                             String maxBroker,
+                                                             Optional<String> minBroker) {
+        if (isolationPoliciesHelper == null ||
+                !allocationPolicies.areIsolationPoliciesPresent(namespaceBundle.getNamespaceObject())) {
+            return true;
+        }
+        int timeout = context.brokerConfiguration().getMetadataStoreOperationTimeoutSeconds();
+        boolean transfer = context.brokerConfiguration().isLoadBalancerTransferEnabled();
+        Map<String, BrokerLookupData> availableBrokers;
+        try {
+            availableBrokers =

Review Comment:
   Can we pass `availableBrokers` in as a function argument? Fetching it
again for every candidate transfer seems redundant.
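
   Roughly what I mean, as a sketch under simplifying assumptions. `canTransfer`, `countTransferable`, and the `Map<String, String>` broker map are hypothetical placeholders, not the real signatures or the real `BrokerLookupData` type:

```java
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

public class HoistedBrokerLookup {
    // Hypothetical per-candidate check: it receives the broker map instead of fetching it.
    // The real isolation-policy logic is omitted; this only checks the target is available.
    static boolean canTransfer(Map<String, String> availableBrokers, String bundle, String minBroker) {
        return availableBrokers.containsKey(minBroker);
    }

    // Fetches the broker map once, then reuses it for every candidate bundle.
    static long countTransferable(Supplier<Map<String, String>> brokerLookup,
                                  List<String> bundles,
                                  String minBroker) {
        Map<String, String> availableBrokers = brokerLookup.get(); // single lookup per pass
        return bundles.stream()
                .filter(bundle -> canTransfer(availableBrokers, bundle, minBroker))
                .count();
    }
}
```

   The caller pays for one metadata lookup per shedding pass rather than one per candidate bundle.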



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
