This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new fa8ec29 [Broker] Remove dead brokers timeaverage data (#14411)
fa8ec29 is described below
commit fa8ec294b7bc1ef85343db457f16b435960b8103
Author: gaozhangmin <[email protected]>
AuthorDate: Sat Mar 12 10:49:48 2022 +0800
[Broker] Remove dead brokers timeaverage data (#14411)
---
.../loadbalance/impl/ModularLoadManagerImpl.java | 26 +++++++++++++++++++
.../loadbalance/ModularLoadManagerImplTest.java | 30 ++++++++++++++++++++++
2 files changed, 56 insertions(+)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 326f6af..9deb368 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -24,6 +24,7 @@ import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.HashSet;
@@ -42,6 +43,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils;
import org.apache.pulsar.broker.BrokerData;
@@ -198,6 +200,7 @@ public class ModularLoadManagerImpl implements
ModularLoadManager {
private long unloadBundleCount = 0;
private final Lock lock = new ReentrantLock();
+ private Set<String> knownBrokers = new HashSet<>();
/**
* Initializes fields which do not depend on PulsarService.
initialize(PulsarService) should subsequently be called.
@@ -472,12 +475,25 @@ public class ModularLoadManagerImpl implements
ModularLoadManager {
if (log.isDebugEnabled()) {
log.debug("Updating broker and bundle data for loadreport");
}
+ cleanupDeadBrokersData();
updateAllBrokerData();
updateBundleData();
// broker has latest load-report: check if any bundle requires split
checkNamespaceBundleSplit();
}
+    /**
+     * Reconciles the locally tracked broker set with the currently available brokers and,
+     * when this broker is the leader, deletes the time-average data of brokers that are gone.
+     */
+    private void cleanupDeadBrokersData() {
+        final Set<String> liveBrokers = getAvailableBrokers();
+
+        // Start tracking brokers that appeared since the previous pass.
+        final Collection<String> joinedBrokers = CollectionUtils.subtract(liveBrokers, knownBrokers);
+        knownBrokers.addAll(joinedBrokers);
+
+        // Anything tracked but no longer available is considered dead; stop tracking it.
+        final Collection<String> goneBrokers = CollectionUtils.subtract(knownBrokers, liveBrokers);
+        knownBrokers.removeAll(goneBrokers);
+
+        // Only the leader deletes stored data; other brokers just refresh their tracking set.
+        if (pulsar.getLeaderElectionService() == null
+                || !pulsar.getLeaderElectionService().isLeader()) {
+            return;
+        }
+        for (String goneBroker : goneBrokers) {
+            deleteTimeAverageDataFromMetadataStoreAsync(goneBroker);
+        }
+    }
+
// As the leader broker, update the broker data map in loadData by
querying metadata store for the broker data put
// there by each broker via updateLocalBrokerData.
private void updateAllBrokerData() {
@@ -1079,6 +1095,16 @@ public class ModularLoadManagerImpl implements
ModularLoadManager {
}
}
+ private void deleteTimeAverageDataFromMetadataStoreAsync(String broker) {
+ final String timeAverageZPath = TIME_AVERAGE_BROKER_ZPATH + "/" +
broker;
+ timeAverageBrokerDataCache.delete(timeAverageZPath).whenComplete((__,
ex) -> {
+ if (ex != null && !(ex.getCause() instanceof
MetadataStoreException.NotFoundException)) {
+ log.warn("Failed to delete dead broker {} time "
+ + "average data from metadata store", broker, ex);
+ }
+ });
+ }
+
private void refreshBrokerToFailureDomainMap() {
if (!pulsar.getConfiguration().isFailureDomainsEnabled()) {
return;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
index 04dbd13..22629cc 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.loadbalance;
+import static
org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl.TIME_AVERAGE_BROKER_ZPATH;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
@@ -35,6 +36,7 @@ import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.net.URL;
import java.util.EnumSet;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -51,6 +53,7 @@ import org.apache.pulsar.broker.BundleData;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.TimeAverageBrokerData;
import org.apache.pulsar.broker.TimeAverageMessageData;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import
org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLoadingPredicate;
@@ -602,4 +605,31 @@ public class ModularLoadManagerImplTest {
pulsar.close();
}
+
+    /**
+     * Verifies that after the second broker shuts down, only one child remains under
+     * TIME_AVERAGE_BROKER_ZPATH once the remaining (leader) broker has run updateAll().
+     */
+    @Test
+    public void testRemoveDeadBrokerTimeAverageData() throws Exception {
+        ModularLoadManagerWrapper loadManagerWrapper =
+                (ModularLoadManagerWrapper) pulsar1.getLoadManager().get();
+        ModularLoadManagerImpl lm = Whitebox.getInternalState(loadManagerWrapper, "loadManager");
+        assertEquals(lm.getAvailableBrokers().size(), 2);
+
+        pulsar2.close();
+
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(lm.getAvailableBrokers().size(), 1);
+        });
+
+        // Make sure pulsar1 is the leader before triggering the update, so the leader
+        // check happens ahead of the state that is asserted below.
+        Awaitility.await().untilAsserted(() -> {
+            assertTrue(pulsar1.getLeaderElectionService().isLeader());
+        });
+        lm.updateAll();
+
+        // The metadata may not reflect the removal immediately after updateAll() returns,
+        // so re-read the children until the expected count is observed instead of
+        // asserting on a single early snapshot.
+        Awaitility.await().untilAsserted(() -> {
+            List<String> data = pulsar1.getLocalMetadataStore()
+                    .getMetadataCache(TimeAverageBrokerData.class)
+                    .getChildren(TIME_AVERAGE_BROKER_ZPATH).join();
+            assertEquals(data.size(), 1);
+        });
+    }
}