This is an automated email from the ASF dual-hosted git repository.
zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a362eaa8bd8 [improve][broker] Improve knownBrokers update in
ModularLoadManagerImpl (#20196)
a362eaa8bd8 is described below
commit a362eaa8bd8caf13795aba5ddbdf4edd8cec7bd0
Author: houxiaoyu <[email protected]>
AuthorDate: Sat Apr 29 01:57:43 2023 +0800
[improve][broker] Improve knownBrokers update in ModularLoadManagerImpl
(#20196)
---
.../broker/loadbalance/impl/ModularLoadManagerImpl.java | 12 +++++-------
1 file changed, 5 insertions(+), 7 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 30a2ef5cdf2..73b4f318f3a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -195,7 +195,7 @@ public class ModularLoadManagerImpl implements
ModularLoadManager {
private long unloadBundleCount = 0;
private final Lock lock = new ReentrantLock();
- private Set<String> knownBrokers = ConcurrentHashMap.newKeySet();
+ private final Set<String> knownBrokers = new HashSet<>();
private Map<String, String> bundleBrokerAffinityMap;
/**
@@ -480,13 +480,11 @@ public class ModularLoadManagerImpl implements
ModularLoadManager {
checkNamespaceBundleSplit();
}
- private void cleanupDeadBrokersData() {
+ private synchronized void cleanupDeadBrokersData() {
final Set<String> activeBrokers = getAvailableBrokers();
- final Set<String> knownBrokersCopy = new HashSet<>(this.knownBrokers);
- Collection<String> newBrokers =
CollectionUtils.subtract(activeBrokers, knownBrokersCopy);
- this.knownBrokers.addAll(newBrokers);
- Collection<String> deadBrokers =
CollectionUtils.subtract(knownBrokersCopy, activeBrokers);
- this.knownBrokers.removeAll(deadBrokers);
+ Collection<String> deadBrokers =
CollectionUtils.subtract(knownBrokers, activeBrokers);
+ this.knownBrokers.clear();
+ this.knownBrokers.addAll(activeBrokers);
if (pulsar.getLeaderElectionService() != null
&& pulsar.getLeaderElectionService().isLeader()) {
deadBrokers.forEach(this::deleteTimeAverageDataFromMetadataStoreAsync);