This is an automated email from the ASF dual-hosted git repository.
heesung pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new ec6be96247d [fix][broker] fix ExtensibleLoadManager to override the
ownerships concurrently without blocking load manager thread (#24156)
ec6be96247d is described below
commit ec6be96247d1d065f9c8e5db6fdb4bc10e200ddf
Author: Heesung Sohn <[email protected]>
AuthorDate: Wed Apr 9 07:04:15 2025 -0700
[fix][broker] fix ExtensibleLoadManager to override the ownerships
concurrently without blocking load manager thread (#24156)
(cherry picked from commit 066a20c33fe28ed0bb5ec9b3846ed67560877302)
---
conf/broker.conf | 7 ++
.../apache/pulsar/broker/ServiceConfiguration.java | 10 ++
.../channel/ServiceUnitStateChannelImpl.java | 133 +++++++++++++--------
.../extensions/scheduler/TransferShedder.java | 6 +-
4 files changed, 103 insertions(+), 53 deletions(-)
diff --git a/conf/broker.conf b/conf/broker.conf
index 08cd7199275..00121ce9812 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -1539,6 +1539,13 @@
loadBalancerNamespaceBundleSplitConditionHitCountThreshold=3
loadBalancerServiceUnitStateTombstoneDelayTimeInSeconds=3600
+
+# Specify the maximum number of concurrent orphan bundle ownership overrides.
+# The leader broker triggers these overrides upon detecting orphaned bundles.
+# It identifies orphan bundle ownerships by periodically scanning ownership
data
+# and monitoring for broker shutdowns or inactive states.
+loadBalancerServiceUnitStateMaxConcurrentOverrides = 64
+
### --- Replication --- ###
# Enable replication metrics
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 955baaf73b9..f306ce5c906 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -2833,6 +2833,16 @@ public class ServiceConfiguration implements
PulsarConfiguration {
)
private boolean loadBalancerMultiPhaseBundleUnload = true;
+ @FieldContext(
+ dynamic = true,
+ category = CATEGORY_LOAD_BALANCER,
+ doc = "Specify the maximum number of concurrent orphan bundle
ownership overrides. "
+ + "The leader broker triggers these overrides upon
detecting orphaned bundles. "
+ + "It identifies orphan bundle ownerships by periodically
scanning ownership data "
+ + "and monitoring for broker shutdowns or inactive states."
+ )
+ private int loadBalancerServiceUnitStateMaxConcurrentOverrides = 64;
+
/**** --- Replication. --- ****/
@FieldContext(
category = CATEGORY_REPLICATION,
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
index 443ff6272a2..cdd016ec245 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
@@ -126,6 +126,7 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
private static final long MAX_OWNED_BUNDLE_COUNT_DELAY_TIME_IN_MILLIS = 10
* 60 * 1000;
private static final long MAX_BROKER_HEALTH_CHECK_RETRY = 3;
private static final long MAX_BROKER_HEALTH_CHECK_DELAY_IN_MILLIS = 1000;
+
private final PulsarService pulsar;
private final ServiceConfiguration config;
private final Schema<ServiceUnitStateData> schema;
@@ -1377,51 +1378,53 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
broker, delayInSecs, cleanupJobs.size());
}
- private void overrideOwnership(String serviceUnit, ServiceUnitStateData
orphanData, String inactiveBroker) {
+ private CompletableFuture<Void> overrideOwnership(String serviceUnit,
+ ServiceUnitStateData
orphanData,
+ String inactiveBroker,
+ boolean gracefully) {
+
final var version = getNextVersionId(orphanData);
- try {
- selectBroker(serviceUnit, inactiveBroker)
- .thenApply(selectedOpt ->
- selectedOpt.map(selectedBroker -> {
- if (orphanData.state() == Splitting) {
- // if Splitting, set orphan.dstBroker() as
dst to indicate where it was from.
- // (The src broker runs handleSplitEvent.)
- return new ServiceUnitStateData(Splitting,
orphanData.dstBroker(), selectedBroker,
-
Map.copyOf(orphanData.splitServiceUnitToDestBroker()), true, version);
- } else if (orphanData.state() == Owned) {
- // if Owned, set orphan.dstBroker() as
source to clean it up in case it is still
- // alive.
- return new ServiceUnitStateData(Owned,
selectedBroker,
-
selectedBroker.equals(orphanData.dstBroker()) ? null :
- orphanData.dstBroker(),
- true, version);
- } else {
- // if Assigning or Releasing, set
orphan.sourceBroker() as source
- // to clean it up in case it is still
alive.
- return new ServiceUnitStateData(Owned,
selectedBroker,
-
selectedBroker.equals(orphanData.sourceBroker()) ? null :
- orphanData.sourceBroker(),
- true, version);
- }
- // If no broker is selected(available), free
the ownership.
- // If the previous owner is still active, it
will close the bundle(topic) ownership.
- }).orElseGet(() -> new ServiceUnitStateData(Free,
null,
- orphanData.state() == Owned ?
orphanData.dstBroker() : orphanData.sourceBroker(),
- true,
- version)))
- .thenCompose(override -> {
- log.info(
- "Overriding inactiveBroker:{}, ownership
serviceUnit:{} from orphanData:{} to "
- + "overrideData:{}",
- inactiveBroker, serviceUnit, orphanData,
override);
- return publishOverrideEventAsync(serviceUnit,
override);
- }).get(config.getMetadataStoreOperationTimeoutSeconds(),
SECONDS);
- } catch (Throwable e) {
- log.error(
- "Failed to override inactiveBroker:{} ownership
serviceUnit:{} orphanData:{}. "
- + "totalCleanupErrorCnt:{}",
- inactiveBroker, serviceUnit, orphanData,
totalCleanupErrorCnt.incrementAndGet(), e);
- }
+ return selectBroker(serviceUnit, inactiveBroker)
+ .thenApply(selectedOpt ->
+ selectedOpt.map(selectedBroker -> {
+ if (orphanData.state() == Splitting) {
+ // if Splitting, set orphan.dstBroker() as dst
to indicate where it was from.
+ // (The src broker runs handleSplitEvent.)
+ return new ServiceUnitStateData(Splitting,
orphanData.dstBroker(), selectedBroker,
+
Map.copyOf(orphanData.splitServiceUnitToDestBroker()), true, version);
+ } else if (orphanData.state() == Owned) {
+ // if Owned, set orphan.dstBroker() as source
to clean it up in case it is still
+ // alive.
+ var sourceBroker =
selectedBroker.equals(orphanData.dstBroker()) ? null :
+ orphanData.dstBroker();
+ // if gracefully, try to release ownership
first
+ var overrideState = gracefully && sourceBroker
!= null ? Releasing : Owned;
+ return new ServiceUnitStateData(
+ overrideState,
+ selectedBroker,
+ sourceBroker,
+ true, version);
+ } else {
+ // if Assigning or Releasing, set
orphan.sourceBroker() as source
+ // to clean it up in case it is still alive.
+ return new ServiceUnitStateData(Owned,
selectedBroker,
+
selectedBroker.equals(orphanData.sourceBroker()) ? null :
+ orphanData.sourceBroker(),
+ true, version);
+ }
+ // If no broker is selected(available), free the
ownership.
+ // If the previous owner is still active, it will
close the bundle(topic) ownership.
+ }).orElseGet(() -> new ServiceUnitStateData(Free, null,
+ orphanData.state() == Owned ?
orphanData.dstBroker() : orphanData.sourceBroker(),
+ true,
+ version)))
+ .thenCompose(override -> {
+ log.info(
+ "Overriding inactiveBroker:{}, ownership
serviceUnit:{} from orphanData:{} to "
+ + "overrideData:{}",
+ inactiveBroker, serviceUnit, orphanData, override);
+ return publishOverrideEventAsync(serviceUnit, override);
+ });
}
private void waitForCleanups(String broker, boolean excludeSystemTopics,
int maxWaitTimeInMillis) {
@@ -1491,7 +1494,21 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
}
}
- private synchronized void doCleanup(String broker, boolean gracefully) {
+ private void tryWaitForOverrides(List<CompletableFuture<Void>>
overrideFutures, boolean force) {
+ if (overrideFutures.size() >=
config.getLoadBalancerServiceUnitStateMaxConcurrentOverrides() || force) {
+ try {
+ FutureUtil.waitForAll(overrideFutures)
+ .get(config.getMetadataStoreOperationTimeoutSeconds(),
SECONDS);
+ } catch (Throwable e) {
+ log.error("Failed to override ownership:
totalCleanupErrorCnt:{}",
+ totalCleanupErrorCnt.incrementAndGet(), e);
+ } finally {
+ overrideFutures.clear();
+ }
+ }
+ }
+
+ private void doCleanup(String broker, boolean gracefully) {
try {
if
(getChannelOwnerAsync().get(MAX_CHANNEL_OWNER_ELECTION_WAITING_TIME_IN_SECS,
TimeUnit.SECONDS)
.isEmpty()) {
@@ -1530,8 +1547,11 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
log.info("Started ownership cleanup for the inactive broker:{}",
broker);
int orphanServiceUnitCleanupCnt = 0;
long totalCleanupErrorCntStart = totalCleanupErrorCnt.get();
+ List<CompletableFuture<Void>> overrideFutures = new ArrayList<>();
Map<String, ServiceUnitStateData> orphanSystemServiceUnits = new
HashMap<>();
- for (var etr : tableview.entrySet()) {
+ var iter = tableview.entrySet().iterator();
+ while (iter.hasNext()) {
+ var etr = iter.next();
var stateData = etr.getValue();
var serviceUnit = etr.getKey();
var state = state(stateData);
@@ -1540,7 +1560,8 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
if (serviceUnit.startsWith(SYSTEM_NAMESPACE.toString())) {
orphanSystemServiceUnits.put(serviceUnit, stateData);
} else {
- overrideOwnership(serviceUnit, stateData, broker);
+ overrideFutures.add(overrideOwnership(serviceUnit,
stateData, broker, gracefully));
+ tryWaitForOverrides(overrideFutures, !iter.hasNext());
}
orphanServiceUnitCleanupCnt++;
}
@@ -1565,9 +1586,15 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
}
// clean system bundles in the end
- for (var orphanSystemServiceUnit :
orphanSystemServiceUnits.entrySet()) {
+ var orphanSystemServiceUnitIter =
orphanSystemServiceUnits.entrySet().iterator();
+ while (orphanSystemServiceUnitIter.hasNext()) {
+ var orphanSystemServiceUnit = orphanSystemServiceUnitIter.next();
log.info("Overriding orphan system service unit:{}",
orphanSystemServiceUnit.getKey());
- overrideOwnership(orphanSystemServiceUnit.getKey(),
orphanSystemServiceUnit.getValue(), broker);
+
+ overrideFutures.add(
+ overrideOwnership(orphanSystemServiceUnit.getKey(),
orphanSystemServiceUnit.getValue(), broker,
+ gracefully));
+ tryWaitForOverrides(overrideFutures,
!orphanSystemServiceUnitIter.hasNext());
}
try {
@@ -1691,10 +1718,14 @@ public class ServiceUnitStateChannelImpl implements
ServiceUnitStateChannel {
// timedOutInFlightStateServiceUnits are the in-flight ones although
their src and dst brokers are known to
// be active.
if (!timedOutInFlightStateServiceUnits.isEmpty()) {
- for (var etr : timedOutInFlightStateServiceUnits.entrySet()) {
+ List<CompletableFuture<Void>> overrideFutures = new ArrayList<>();
+ var iter = timedOutInFlightStateServiceUnits.entrySet().iterator();
+ while (iter.hasNext()) {
+ var etr = iter.next();
var orphanServiceUnit = etr.getKey();
var orphanData = etr.getValue();
- overrideOwnership(orphanServiceUnit, orphanData, null);
+ overrideFutures.add(overrideOwnership(orphanServiceUnit,
orphanData, null, false));
+ tryWaitForOverrides(overrideFutures, !iter.hasNext());
orphanServiceUnitCleanupCnt++;
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java
index 72d671aa4ca..b5255f2713a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedder.java
@@ -463,8 +463,10 @@ public class TransferShedder implements
NamespaceUnloadStrategy {
Optional<TopBundlesLoadData> bundlesLoadData =
context.topBundleLoadDataStore().get(maxBroker);
if (bundlesLoadData.isEmpty() ||
bundlesLoadData.get().getTopBundlesLoadData().isEmpty()) {
- log.error(String.format(CANNOT_UNLOAD_BROKER_MSG
- + " TopBundlesLoadData is empty.", maxBroker));
+ if (debugMode) {
+ log.info(String.format(CANNOT_UNLOAD_BROKER_MSG
+ + " TopBundlesLoadData is empty.", maxBroker));
+ }
numOfBrokersWithEmptyLoadData++;
continue;
}