codelipenghui commented on code in PR #20822:
URL: https://github.com/apache/pulsar/pull/20822#discussion_r1267677128
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java:
##########
@@ -659,12 +660,25 @@ public synchronized void doLoadShedding() {
if (!shouldAntiAffinityNamespaceUnload(namespaceName,
bundleRange, broker)) {
return;
}
+ NamespaceBundle bundleToUnload =
LoadManagerShared.getNamespaceBundle(pulsar, bundle);
+ Optional<String> destBroker =
this.selectBroker(bundleToUnload);
+ if (!destBroker.isPresent()) {
Review Comment:
```suggestion
if (destBroker.isEmpty()) {
```
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java:
##########
@@ -659,12 +660,25 @@ public synchronized void doLoadShedding() {
if (!shouldAntiAffinityNamespaceUnload(namespaceName,
bundleRange, broker)) {
return;
}
+ NamespaceBundle bundleToUnload =
LoadManagerShared.getNamespaceBundle(pulsar, bundle);
+ Optional<String> destBroker =
this.selectBroker(bundleToUnload);
+ if (!destBroker.isPresent()) {
+ log.warn("No broker available to unload bundle {} from
broker {}", bundle, broker);
+ return;
+ }
+ if (destBroker.get().equals(broker)) {
+ log.warn("Bundle {} destination broker {} is the same
as the source broker {}",
Review Comment:
```suggestion
log.warn("The destination broker {} is the same as
the current owner broker for Bundle {}",
```
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java:
##########
@@ -659,12 +660,25 @@ public synchronized void doLoadShedding() {
if (!shouldAntiAffinityNamespaceUnload(namespaceName,
bundleRange, broker)) {
return;
}
+ NamespaceBundle bundleToUnload =
LoadManagerShared.getNamespaceBundle(pulsar, bundle);
+ Optional<String> destBroker =
this.selectBroker(bundleToUnload);
+ if (!destBroker.isPresent()) {
+ log.warn("No broker available to unload bundle {} from
broker {}", bundle, broker);
+ return;
+ }
+ if (destBroker.get().equals(broker)) {
+ log.warn("Bundle {} destination broker {} is the same
as the source broker {}",
+ bundle, destBroker.get(), broker);
+ return;
+ }
- log.info("[{}] Unloading bundle: {} from broker {}",
- strategy.getClass().getSimpleName(), bundle,
broker);
+ log.info("[{}] Unloading bundle: {} from broker {} to dest
broker {}",
+ strategy.getClass().getSimpleName(), bundle,
broker, destBroker.get());
Review Comment:
Is it better to print the strategy class name for the above two logs?
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java:
##########
@@ -659,12 +660,25 @@ public synchronized void doLoadShedding() {
if (!shouldAntiAffinityNamespaceUnload(namespaceName,
bundleRange, broker)) {
return;
}
+ NamespaceBundle bundleToUnload =
LoadManagerShared.getNamespaceBundle(pulsar, bundle);
+ Optional<String> destBroker =
this.selectBroker(bundleToUnload);
+ if (!destBroker.isPresent()) {
+ log.warn("No broker available to unload bundle {} from
broker {}", bundle, broker);
+ return;
+ }
+ if (destBroker.get().equals(broker)) {
+ log.warn("Bundle {} destination broker {} is the same
as the source broker {}",
+ bundle, destBroker.get(), broker);
+ return;
+ }
- log.info("[{}] Unloading bundle: {} from broker {}",
- strategy.getClass().getSimpleName(), bundle,
broker);
+ log.info("[{}] Unloading bundle: {} from broker {} to dest
broker {}",
+ strategy.getClass().getSimpleName(), bundle,
broker, destBroker.get());
try {
-
pulsar.getAdminClient().namespaces().unloadNamespaceBundle(namespaceName,
bundleRange);
+ pulsar.getAdminClient().namespaces()
+ .unloadNamespaceBundle(namespaceName,
bundleRange, destBroker.get());
Review Comment:
Or maybe we don't need to ensure the bundle will actually go to the dest
broker? We just want to avoid the infinite loop as possible
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java:
##########
@@ -659,12 +660,25 @@ public synchronized void doLoadShedding() {
if (!shouldAntiAffinityNamespaceUnload(namespaceName,
bundleRange, broker)) {
return;
}
+ NamespaceBundle bundleToUnload =
LoadManagerShared.getNamespaceBundle(pulsar, bundle);
+ Optional<String> destBroker =
this.selectBroker(bundleToUnload);
+ if (!destBroker.isPresent()) {
+ log.warn("No broker available to unload bundle {} from
broker {}", bundle, broker);
Review Comment:
```suggestion
log.info("No broker available to unload bundle {}
from broker {}", bundle, broker);
```
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java:
##########
@@ -659,12 +660,25 @@ public synchronized void doLoadShedding() {
if (!shouldAntiAffinityNamespaceUnload(namespaceName,
bundleRange, broker)) {
return;
}
+ NamespaceBundle bundleToUnload =
LoadManagerShared.getNamespaceBundle(pulsar, bundle);
+ Optional<String> destBroker =
this.selectBroker(bundleToUnload);
+ if (!destBroker.isPresent()) {
+ log.warn("No broker available to unload bundle {} from
broker {}", bundle, broker);
+ return;
+ }
+ if (destBroker.get().equals(broker)) {
+ log.warn("Bundle {} destination broker {} is the same
as the source broker {}",
+ bundle, destBroker.get(), broker);
+ return;
+ }
- log.info("[{}] Unloading bundle: {} from broker {}",
- strategy.getClass().getSimpleName(), bundle,
broker);
+ log.info("[{}] Unloading bundle: {} from broker {} to dest
broker {}",
+ strategy.getClass().getSimpleName(), bundle,
broker, destBroker.get());
try {
-
pulsar.getAdminClient().namespaces().unloadNamespaceBundle(namespaceName,
bundleRange);
+ pulsar.getAdminClient().namespaces()
+ .unloadNamespaceBundle(namespaceName,
bundleRange, destBroker.get());
loadData.getRecentlyUnloadedBundles().put(bundle,
System.currentTimeMillis());
+ this.preallocateBundle(bundle, destBroker.get());
Review Comment:
Maybe we can skip it for the unloading stage? It should be fine if the load
manager gets the wrong owner at the unloading stage for a short period of time.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java:
##########
@@ -659,12 +660,25 @@ public synchronized void doLoadShedding() {
if (!shouldAntiAffinityNamespaceUnload(namespaceName,
bundleRange, broker)) {
return;
}
+ NamespaceBundle bundleToUnload =
LoadManagerShared.getNamespaceBundle(pulsar, bundle);
+ Optional<String> destBroker =
this.selectBroker(bundleToUnload);
+ if (!destBroker.isPresent()) {
+ log.warn("No broker available to unload bundle {} from
broker {}", bundle, broker);
+ return;
+ }
+ if (destBroker.get().equals(broker)) {
+ log.warn("Bundle {} destination broker {} is the same
as the source broker {}",
+ bundle, destBroker.get(), broker);
+ return;
+ }
- log.info("[{}] Unloading bundle: {} from broker {}",
- strategy.getClass().getSimpleName(), bundle,
broker);
+ log.info("[{}] Unloading bundle: {} from broker {} to dest
broker {}",
+ strategy.getClass().getSimpleName(), bundle,
broker, destBroker.get());
try {
-
pulsar.getAdminClient().namespaces().unloadNamespaceBundle(namespaceName,
bundleRange);
+ pulsar.getAdminClient().namespaces()
+ .unloadNamespaceBundle(namespaceName,
bundleRange, destBroker.get());
Review Comment:
Just a reminder, The `destBroker` is only available after 2.11.0. We are not
able to cherry-pick to branch-2.10
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]