This is an automated email from the ASF dual-hosted git repository.

heesung pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new ec6be96247d [fix][broker] fix ExtensibleLoadManager to override the ownerships concurrently without blocking load manager thread (#24156)
ec6be96247d is described below

commit ec6be96247d1d065f9c8e5db6fdb4bc10e200ddf
Author: Heesung Sohn <[email protected]>
AuthorDate: Wed Apr 9 07:04:15 2025 -0700

    [fix][broker] fix ExtensibleLoadManager to override the ownerships concurrently without blocking load manager thread (#24156)
    
    (cherry picked from commit 066a20c33fe28ed0bb5ec9b3846ed67560877302)
---
 conf/broker.conf                                   |   7 ++
 .../apache/pulsar/broker/ServiceConfiguration.java |  10 ++
 .../channel/ServiceUnitStateChannelImpl.java       | 133 +++++++++++++--------
 .../extensions/scheduler/TransferShedder.java      |   6 +-
 4 files changed, 103 insertions(+), 53 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 08cd7199275..00121ce9812 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -1539,6 +1539,13 @@ loadBalancerNamespaceBundleSplitConditionHitCountThreshold=3
 loadBalancerServiceUnitStateTombstoneDelayTimeInSeconds=3600
 
 
+
+# Specify the maximum number of concurrent orphan bundle ownership overrides.
+# The leader broker triggers these overrides upon detecting orphaned bundles.
+# It identifies orphan bundle ownerships by periodically scanning ownership data
+# and monitoring for broker shutdowns or inactive states.
+loadBalancerServiceUnitStateMaxConcurrentOverrides=64
+
 ### --- Replication --- ###
 
 # Enable replication metrics
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 955baaf73b9..f306ce5c906 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -2833,6 +2833,16 @@ public class ServiceConfiguration implements PulsarConfiguration {
     )
     private boolean loadBalancerMultiPhaseBundleUnload = true;
 
+    @FieldContext(
+            dynamic = true,
+            category = CATEGORY_LOAD_BALANCER,
+            doc = "Specify the maximum number of concurrent orphan bundle ownership overrides. "
+                    + "The leader broker triggers these overrides upon detecting orphaned bundles. "
+                    + "It identifies orphan bundle ownerships by periodically scanning ownership data "
+                    + "and monitoring for broker shutdowns or inactive states."
+    )
+    private int loadBalancerServiceUnitStateMaxConcurrentOverrides = 64;
+
     /**** --- Replication. --- ****/
     @FieldContext(
         category = CATEGORY_REPLICATION,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index 443ff6272a2..cdd016ec245 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -126,6 +126,7 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
     private static final long MAX_OWNED_BUNDLE_COUNT_DELAY_TIME_IN_MILLIS = 10 * 60 * 1000;
     private static final long MAX_BROKER_HEALTH_CHECK_RETRY = 3;
     private static final long MAX_BROKER_HEALTH_CHECK_DELAY_IN_MILLIS = 1000;
+
     private final PulsarService pulsar;
     private final ServiceConfiguration config;
     private final Schema<ServiceUnitStateData> schema;
@@ -1377,51 +1378,53 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
                 broker, delayInSecs, cleanupJobs.size());
     }
 
-    private void overrideOwnership(String serviceUnit, ServiceUnitStateData orphanData, String inactiveBroker) {
+    private CompletableFuture<Void> overrideOwnership(String serviceUnit,
+                                                      ServiceUnitStateData orphanData,
+                                                      String inactiveBroker,
+                                                      boolean gracefully) {
+
         final var version = getNextVersionId(orphanData);
-        try {
-            selectBroker(serviceUnit, inactiveBroker)
-                    .thenApply(selectedOpt ->
-                            selectedOpt.map(selectedBroker -> {
-                                if (orphanData.state() == Splitting) {
-                                    // if Splitting, set orphan.dstBroker() as dst to indicate where it was from.
-                                    // (The src broker runs handleSplitEvent.)
-                                    return new ServiceUnitStateData(Splitting, orphanData.dstBroker(), selectedBroker,
-                                            Map.copyOf(orphanData.splitServiceUnitToDestBroker()), true, version);
-                                } else if (orphanData.state() == Owned) {
-                                    // if Owned, set orphan.dstBroker() as source to clean it up in case it is still
-                                    // alive.
-                                    return new ServiceUnitStateData(Owned, selectedBroker,
-                                            selectedBroker.equals(orphanData.dstBroker()) ? null :
-                                                    orphanData.dstBroker(),
-                                            true, version);
-                                } else {
-                                    // if Assigning or Releasing, set orphan.sourceBroker() as source
-                                    // to clean it up in case it is still alive.
-                                    return new ServiceUnitStateData(Owned, selectedBroker,
-                                            selectedBroker.equals(orphanData.sourceBroker()) ? null :
-                                                    orphanData.sourceBroker(),
-                                            true, version);
-                                }
-                                // If no broker is selected(available), free the ownership.
-                                // If the previous owner is still active, it will close the bundle(topic) ownership.
-                            }).orElseGet(() -> new ServiceUnitStateData(Free, null,
-                                    orphanData.state() == Owned ? orphanData.dstBroker() : orphanData.sourceBroker(),
-                                    true,
-                                    version)))
-                    .thenCompose(override -> {
-                        log.info(
-                                "Overriding inactiveBroker:{}, ownership serviceUnit:{} from orphanData:{} to "
-                                        + "overrideData:{}",
-                                inactiveBroker, serviceUnit, orphanData, override);
-                        return publishOverrideEventAsync(serviceUnit, override);
-                    }).get(config.getMetadataStoreOperationTimeoutSeconds(), SECONDS);
-        } catch (Throwable e) {
-            log.error(
-                    "Failed to override inactiveBroker:{} ownership serviceUnit:{} orphanData:{}. "
-                            + "totalCleanupErrorCnt:{}",
-                    inactiveBroker, serviceUnit, orphanData, totalCleanupErrorCnt.incrementAndGet(), e);
-        }
+        return selectBroker(serviceUnit, inactiveBroker)
+                .thenApply(selectedOpt ->
+                        selectedOpt.map(selectedBroker -> {
+                            if (orphanData.state() == Splitting) {
+                                // if Splitting, set orphan.dstBroker() as dst to indicate where it was from.
+                                // (The src broker runs handleSplitEvent.)
+                                return new ServiceUnitStateData(Splitting, orphanData.dstBroker(), selectedBroker,
+                                        Map.copyOf(orphanData.splitServiceUnitToDestBroker()), true, version);
+                            } else if (orphanData.state() == Owned) {
+                                // if Owned, set orphan.dstBroker() as source to clean it up in case it is still
+                                // alive.
+                                var sourceBroker = selectedBroker.equals(orphanData.dstBroker()) ? null :
+                                        orphanData.dstBroker();
+                                // if gracefully, try to release ownership first
+                                var overrideState = gracefully && sourceBroker != null ? Releasing : Owned;
+                                return new ServiceUnitStateData(
+                                        overrideState,
+                                        selectedBroker,
+                                        sourceBroker,
+                                        true, version);
+                            } else {
+                                // if Assigning or Releasing, set orphan.sourceBroker() as source
+                                // to clean it up in case it is still alive.
+                                return new ServiceUnitStateData(Owned, selectedBroker,
+                                        selectedBroker.equals(orphanData.sourceBroker()) ? null :
+                                                orphanData.sourceBroker(),
+                                        true, version);
+                            }
+                            // If no broker is selected(available), free the ownership.
+                            // If the previous owner is still active, it will close the bundle(topic) ownership.
+                        }).orElseGet(() -> new ServiceUnitStateData(Free, null,
+                                orphanData.state() == Owned ? orphanData.dstBroker() : orphanData.sourceBroker(),
+                                true,
+                                version)))
+                .thenCompose(override -> {
+                    log.info(
+                            "Overriding inactiveBroker:{}, ownership serviceUnit:{} from orphanData:{} to "
+                                    + "overrideData:{}",
+                            inactiveBroker, serviceUnit, orphanData, override);
+                    return publishOverrideEventAsync(serviceUnit, override);
+                });
     }
 
     private void waitForCleanups(String broker, boolean excludeSystemTopics, int maxWaitTimeInMillis) {
@@ -1491,7 +1494,21 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
         }
     }
 
-    private synchronized void doCleanup(String broker, boolean gracefully) {
+    private void tryWaitForOverrides(List<CompletableFuture<Void>> overrideFutures, boolean force) {
+        if (overrideFutures.size() >= config.getLoadBalancerServiceUnitStateMaxConcurrentOverrides() || force) {
+            try {
+                FutureUtil.waitForAll(overrideFutures)
+                        .get(config.getMetadataStoreOperationTimeoutSeconds(), SECONDS);
+            } catch (Throwable e) {
+                log.error("Failed to override ownership: totalCleanupErrorCnt:{}",
+                        totalCleanupErrorCnt.incrementAndGet(), e);
+            } finally {
+                overrideFutures.clear();
+            }
+        }
+    }
+
+    private void doCleanup(String broker, boolean gracefully) {
         try {
             if (getChannelOwnerAsync().get(MAX_CHANNEL_OWNER_ELECTION_WAITING_TIME_IN_SECS, TimeUnit.SECONDS)
                     .isEmpty()) {
@@ -1530,8 +1547,11 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
         log.info("Started ownership cleanup for the inactive broker:{}", broker);
         int orphanServiceUnitCleanupCnt = 0;
         long totalCleanupErrorCntStart = totalCleanupErrorCnt.get();
+        List<CompletableFuture<Void>> overrideFutures = new ArrayList<>();
         Map<String, ServiceUnitStateData> orphanSystemServiceUnits = new HashMap<>();
-        for (var etr : tableview.entrySet()) {
+        var iter = tableview.entrySet().iterator();
+        while (iter.hasNext()) {
+            var etr = iter.next();
             var stateData = etr.getValue();
             var serviceUnit = etr.getKey();
             var state = state(stateData);
@@ -1540,7 +1560,8 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
                 if (serviceUnit.startsWith(SYSTEM_NAMESPACE.toString())) {
                     orphanSystemServiceUnits.put(serviceUnit, stateData);
                 } else {
-                    overrideOwnership(serviceUnit, stateData, broker);
+                    overrideFutures.add(overrideOwnership(serviceUnit, stateData, broker, gracefully));
+                    tryWaitForOverrides(overrideFutures, !iter.hasNext());
                 }
                 orphanServiceUnitCleanupCnt++;
             }
@@ -1565,9 +1586,19 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
         }
 
+        // The tableview scan only forces a wait when its final entry is itself a non-system orphan,
+        // so drain any still-pending non-system overrides before overriding the system bundles.
+        tryWaitForOverrides(overrideFutures, true);
+
         // clean system bundles in the end
-        for (var orphanSystemServiceUnit : orphanSystemServiceUnits.entrySet()) {
+        var orphanSystemServiceUnitIter = orphanSystemServiceUnits.entrySet().iterator();
+        while (orphanSystemServiceUnitIter.hasNext()) {
+            var orphanSystemServiceUnit = orphanSystemServiceUnitIter.next();
             log.info("Overriding orphan system service unit:{}", orphanSystemServiceUnit.getKey());
-            overrideOwnership(orphanSystemServiceUnit.getKey(), orphanSystemServiceUnit.getValue(), broker);
+
+            overrideFutures.add(
+                    overrideOwnership(orphanSystemServiceUnit.getKey(), orphanSystemServiceUnit.getValue(), broker,
+                            gracefully));
+            tryWaitForOverrides(overrideFutures, !orphanSystemServiceUnitIter.hasNext());
         }
 
         try {
@@ -1691,10 +1718,14 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
         // timedOutInFlightStateServiceUnits are the in-flight ones although their src and dst brokers are known to
         // be active.
         if (!timedOutInFlightStateServiceUnits.isEmpty()) {
-            for (var etr : timedOutInFlightStateServiceUnits.entrySet()) {
+            List<CompletableFuture<Void>> overrideFutures = new ArrayList<>();
+            var iter = timedOutInFlightStateServiceUnits.entrySet().iterator();
+            while (iter.hasNext()) {
+                var etr = iter.next();
                 var orphanServiceUnit = etr.getKey();
                 var orphanData = etr.getValue();
-                overrideOwnership(orphanServiceUnit, orphanData, null);
+                overrideFutures.add(overrideOwnership(orphanServiceUnit, orphanData, null, false));
+                tryWaitForOverrides(overrideFutures, !iter.hasNext());
                 orphanServiceUnitCleanupCnt++;
             }
         }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java
index 72d671aa4ca..b5255f2713a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java
@@ -463,8 +463,10 @@ public class TransferShedder implements NamespaceUnloadStrategy {
 
                 Optional<TopBundlesLoadData> bundlesLoadData = context.topBundleLoadDataStore().get(maxBroker);
                 if (bundlesLoadData.isEmpty() || bundlesLoadData.get().getTopBundlesLoadData().isEmpty()) {
-                    log.error(String.format(CANNOT_UNLOAD_BROKER_MSG
-                            + " TopBundlesLoadData is empty.", maxBroker));
+                    if (debugMode) {
+                        log.info(String.format(CANNOT_UNLOAD_BROKER_MSG
+                                + " TopBundlesLoadData is empty.", maxBroker));
+                    }
                     numOfBrokersWithEmptyLoadData++;
                     continue;
                 }

Reply via email to