codelipenghui commented on code in PR #20822:
URL: https://github.com/apache/pulsar/pull/20822#discussion_r1267677128


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java:
##########
@@ -659,12 +660,25 @@ public synchronized void doLoadShedding() {
                     if (!shouldAntiAffinityNamespaceUnload(namespaceName, bundleRange, broker)) {
                         return;
                     }
+                    NamespaceBundle bundleToUnload = LoadManagerShared.getNamespaceBundle(pulsar, bundle);
+                    Optional<String> destBroker = this.selectBroker(bundleToUnload);
+                    if (!destBroker.isPresent()) {

Review Comment:
   ```suggestion
                       if (destBroker.isEmpty()) {
   ```
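
For context on the suggestion above: `Optional.isEmpty()` was added in Java 11 and is the logical negation of `isPresent()`, so the change only affects readability. A minimal sketch, assuming a hypothetical `selectBroker(boolean)` stand-in (not the real `ModularLoadManagerImpl` method) and a made-up broker address:

```java
import java.util.Optional;

public class OptionalEmptyCheck {
    // Hypothetical stand-in for a broker selection that may find no candidate.
    static Optional<String> selectBroker(boolean available) {
        return available ? Optional.of("broker-1:8080") : Optional.empty();
    }

    // Form used in the diff.
    static boolean missingOld(Optional<String> destBroker) {
        return !destBroker.isPresent();
    }

    // Suggested form; requires Java 11 or newer.
    static boolean missingNew(Optional<String> destBroker) {
        return destBroker.isEmpty();
    }

    public static void main(String[] args) {
        for (boolean available : new boolean[] {true, false}) {
            Optional<String> dest = selectBroker(available);
            // Both checks always agree for the same Optional.
            System.out.println(missingOld(dest) + " " + missingNew(dest));
        }
    }
}
```

If the target branch still has to compile with a language level below 11, the `!isPresent()` form is the one that works there.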



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java:
##########
@@ -659,12 +660,25 @@ public synchronized void doLoadShedding() {
                     if (!shouldAntiAffinityNamespaceUnload(namespaceName, bundleRange, broker)) {
                         return;
                     }
+                    NamespaceBundle bundleToUnload = LoadManagerShared.getNamespaceBundle(pulsar, bundle);
+                    Optional<String> destBroker = this.selectBroker(bundleToUnload);
+                    if (!destBroker.isPresent()) {
+                        log.warn("No broker available to unload bundle {} from broker {}", bundle, broker);
+                        return;
+                    }
+                    if (destBroker.get().equals(broker)) {
+                        log.warn("Bundle {} destination broker {} is the same as the source broker {}",

Review Comment:
   ```suggestion
                           log.warn("The destination broker {} is the same as the current owner broker for Bundle {}",
   ```
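
One thing to keep in mind with the reworded message: `{}` placeholders are filled positionally, so the argument list on the following line (`bundle, destBroker.get(), broker` in the diff) would presumably need to become `destBroker.get(), bundle` to match. The sketch below illustrates this with a simplified stand-in formatter; `format` is a hypothetical helper written for this example, not the SLF4J implementation, and the broker and bundle names are made up.

```java
public class PlaceholderOrder {
    // Simplified stand-in for "{}"-style substitution: each "{}" is replaced
    // by the next argument in order, and surplus arguments are ignored.
    static String format(String pattern, Object... args) {
        StringBuilder out = new StringBuilder();
        int from = 0;
        int argIndex = 0;
        int at;
        while (argIndex < args.length && (at = pattern.indexOf("{}", from)) >= 0) {
            out.append(pattern, from, at).append(args[argIndex++]);
            from = at + 2;
        }
        return out.append(pattern.substring(from)).toString();
    }

    static final String MESSAGE =
            "The destination broker {} is the same as the current owner broker for Bundle {}";

    public static void main(String[] args) {
        String bundle = "tenant/ns/0x00000000_0x80000000";
        String broker = "broker-1:8080";
        // Old argument order: the bundle lands in the broker slot.
        System.out.println(format(MESSAGE, bundle, broker, broker));
        // Reordered arguments: each value lands in the slot that names it.
        System.out.println(format(MESSAGE, broker, bundle));
    }
}
```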



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java:
##########
@@ -659,12 +660,25 @@ public synchronized void doLoadShedding() {
                     if (!shouldAntiAffinityNamespaceUnload(namespaceName, bundleRange, broker)) {
                         return;
                     }
+                    NamespaceBundle bundleToUnload = LoadManagerShared.getNamespaceBundle(pulsar, bundle);
+                    Optional<String> destBroker = this.selectBroker(bundleToUnload);
+                    if (!destBroker.isPresent()) {
+                        log.warn("No broker available to unload bundle {} from broker {}", bundle, broker);
+                        return;
+                    }
+                    if (destBroker.get().equals(broker)) {
+                        log.warn("Bundle {} destination broker {} is the same as the source broker {}",
+                                bundle, destBroker.get(), broker);
+                        return;
+                    }
 
-                    log.info("[{}] Unloading bundle: {} from broker {}",
-                            strategy.getClass().getSimpleName(), bundle, broker);
+                    log.info("[{}] Unloading bundle: {} from broker {} to dest broker {}",
+                            strategy.getClass().getSimpleName(), bundle, broker, destBroker.get());

Review Comment:
   Would it be better to also print the strategy class name in the two logs 
above?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java:
##########
@@ -659,12 +660,25 @@ public synchronized void doLoadShedding() {
                     if (!shouldAntiAffinityNamespaceUnload(namespaceName, bundleRange, broker)) {
                         return;
                     }
+                    NamespaceBundle bundleToUnload = LoadManagerShared.getNamespaceBundle(pulsar, bundle);
+                    Optional<String> destBroker = this.selectBroker(bundleToUnload);
+                    if (!destBroker.isPresent()) {
+                        log.warn("No broker available to unload bundle {} from broker {}", bundle, broker);
+                        return;
+                    }
+                    if (destBroker.get().equals(broker)) {
+                        log.warn("Bundle {} destination broker {} is the same as the source broker {}",
+                                bundle, destBroker.get(), broker);
+                        return;
+                    }
 
-                    log.info("[{}] Unloading bundle: {} from broker {}",
-                            strategy.getClass().getSimpleName(), bundle, broker);
+                    log.info("[{}] Unloading bundle: {} from broker {} to dest broker {}",
+                            strategy.getClass().getSimpleName(), bundle, broker, destBroker.get());
                     try {
-                        pulsar.getAdminClient().namespaces().unloadNamespaceBundle(namespaceName, bundleRange);
+                        pulsar.getAdminClient().namespaces()
+                                .unloadNamespaceBundle(namespaceName, bundleRange, destBroker.get());

Review Comment:
   Or maybe we don't need to ensure that the bundle actually goes to the dest 
broker? We just want to avoid the infinite loop as much as possible.
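
To make the alternative in this comment concrete, here is a minimal sketch of the guard on its own: the selected destination is used only to decide whether unloading is worthwhile, not to force placement. It assumes the loop in question is a bundle being unloaded and then assigned straight back to its current owner; `UnloadGuard`, `Decision`, and `decide` are hypothetical names for illustration, not part of the Pulsar code base.

```java
import java.util.Optional;

public class UnloadGuard {
    enum Decision { SKIP_NO_BROKER, SKIP_SAME_BROKER, UNLOAD }

    // Decide whether to unload, given the current owner and the broker the
    // selection logic would pick. The destination is only consulted here; the
    // unload call itself could still use the two-argument form.
    static Decision decide(String sourceBroker, Optional<String> destBroker) {
        if (destBroker.isEmpty()) {
            return Decision.SKIP_NO_BROKER;   // nowhere to move the bundle
        }
        if (destBroker.get().equals(sourceBroker)) {
            return Decision.SKIP_SAME_BROKER; // unloading would just bounce it back
        }
        return Decision.UNLOAD;
    }

    public static void main(String[] args) {
        System.out.println(decide("broker-1", Optional.empty()));
        System.out.println(decide("broker-1", Optional.of("broker-1")));
        System.out.println(decide("broker-1", Optional.of("broker-2")));
    }
}
```

With this shape the guard does not depend on the three-argument `unloadNamespaceBundle` overload, at the cost that the bundle may end up on a broker other than the one selected during the check.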



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java:
##########
@@ -659,12 +660,25 @@ public synchronized void doLoadShedding() {
                     if (!shouldAntiAffinityNamespaceUnload(namespaceName, bundleRange, broker)) {
                         return;
                     }
+                    NamespaceBundle bundleToUnload = LoadManagerShared.getNamespaceBundle(pulsar, bundle);
+                    Optional<String> destBroker = this.selectBroker(bundleToUnload);
+                    if (!destBroker.isPresent()) {
+                        log.warn("No broker available to unload bundle {} from broker {}", bundle, broker);

Review Comment:
   ```suggestion
                           log.info("No broker available to unload bundle {} from broker {}", bundle, broker);
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java:
##########
@@ -659,12 +660,25 @@ public synchronized void doLoadShedding() {
                     if (!shouldAntiAffinityNamespaceUnload(namespaceName, bundleRange, broker)) {
                         return;
                     }
+                    NamespaceBundle bundleToUnload = LoadManagerShared.getNamespaceBundle(pulsar, bundle);
+                    Optional<String> destBroker = this.selectBroker(bundleToUnload);
+                    if (!destBroker.isPresent()) {
+                        log.warn("No broker available to unload bundle {} from broker {}", bundle, broker);
+                        return;
+                    }
+                    if (destBroker.get().equals(broker)) {
+                        log.warn("Bundle {} destination broker {} is the same as the source broker {}",
+                                bundle, destBroker.get(), broker);
+                        return;
+                    }
 
-                    log.info("[{}] Unloading bundle: {} from broker {}",
-                            strategy.getClass().getSimpleName(), bundle, broker);
+                    log.info("[{}] Unloading bundle: {} from broker {} to dest broker {}",
+                            strategy.getClass().getSimpleName(), bundle, broker, destBroker.get());
                     try {
-                        pulsar.getAdminClient().namespaces().unloadNamespaceBundle(namespaceName, bundleRange);
+                        pulsar.getAdminClient().namespaces()
+                                .unloadNamespaceBundle(namespaceName, bundleRange, destBroker.get());
                         loadData.getRecentlyUnloadedBundles().put(bundle, System.currentTimeMillis());
+                        this.preallocateBundle(bundle, destBroker.get());

Review Comment:
   Maybe we can skip the preallocation at the unloading stage? It should be 
fine if the load manager sees the wrong owner for a short period of time 
during unloading.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java:
##########
@@ -659,12 +660,25 @@ public synchronized void doLoadShedding() {
                     if (!shouldAntiAffinityNamespaceUnload(namespaceName, bundleRange, broker)) {
                         return;
                     }
+                    NamespaceBundle bundleToUnload = LoadManagerShared.getNamespaceBundle(pulsar, bundle);
+                    Optional<String> destBroker = this.selectBroker(bundleToUnload);
+                    if (!destBroker.isPresent()) {
+                        log.warn("No broker available to unload bundle {} from broker {}", bundle, broker);
+                        return;
+                    }
+                    if (destBroker.get().equals(broker)) {
+                        log.warn("Bundle {} destination broker {} is the same as the source broker {}",
+                                bundle, destBroker.get(), broker);
+                        return;
+                    }
 
-                    log.info("[{}] Unloading bundle: {} from broker {}",
-                            strategy.getClass().getSimpleName(), bundle, broker);
+                    log.info("[{}] Unloading bundle: {} from broker {} to dest broker {}",
+                            strategy.getClass().getSimpleName(), bundle, broker, destBroker.get());
                     try {
-                        pulsar.getAdminClient().namespaces().unloadNamespaceBundle(namespaceName, bundleRange);
+                        pulsar.getAdminClient().namespaces()
+                                .unloadNamespaceBundle(namespaceName, bundleRange, destBroker.get());

Review Comment:
   Just a reminder: the `destBroker` argument is only available after 2.11.0, 
so we are not able to cherry-pick this to branch-2.10.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
