This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new fa8ec29  [Broker] Remove dead brokers timeaverage data (#14411)
fa8ec29 is described below

commit fa8ec294b7bc1ef85343db457f16b435960b8103
Author: gaozhangmin <[email protected]>
AuthorDate: Sat Mar 12 10:49:48 2022 +0800

    [Broker] Remove dead brokers timeaverage data (#14411)
---
 .../loadbalance/impl/ModularLoadManagerImpl.java   | 26 +++++++++++++++++++
 .../loadbalance/ModularLoadManagerImplTest.java    | 30 ++++++++++++++++++++++
 2 files changed, 56 insertions(+)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 326f6af..9deb368 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -24,6 +24,7 @@ import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.ConcurrentModificationException;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -42,6 +43,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.SystemUtils;
 import org.apache.pulsar.broker.BrokerData;
@@ -198,6 +200,7 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager {
     private long unloadBundleCount = 0;
 
     private final Lock lock = new ReentrantLock();
+    private Set<String> knownBrokers = new HashSet<>();
 
     /**
      * Initializes fields which do not depend on PulsarService. 
initialize(PulsarService) should subsequently be called.
@@ -472,12 +475,25 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager {
         if (log.isDebugEnabled()) {
             log.debug("Updating broker and bundle data for loadreport");
         }
+        cleanupDeadBrokersData();
         updateAllBrokerData();
         updateBundleData();
         // broker has latest load-report: check if any bundle requires split
         checkNamespaceBundleSplit();
     }
 
+    private void cleanupDeadBrokersData() {
+        final Set<String> activeBrokers = getAvailableBrokers();
+        Collection<String> newBrokers = 
CollectionUtils.subtract(activeBrokers, knownBrokers);
+        knownBrokers.addAll(newBrokers);
+        Collection<String> deadBrokers = 
CollectionUtils.subtract(knownBrokers, activeBrokers);
+        knownBrokers.removeAll(deadBrokers);
+        if (pulsar.getLeaderElectionService() != null
+                && pulsar.getLeaderElectionService().isLeader()) {
+            
deadBrokers.forEach(this::deleteTimeAverageDataFromMetadataStoreAsync);
+        }
+    }
+
     // As the leader broker, update the broker data map in loadData by 
querying metadata store for the broker data put
     // there by each broker via updateLocalBrokerData.
     private void updateAllBrokerData() {
@@ -1079,6 +1095,16 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager {
         }
     }
 
+    private void deleteTimeAverageDataFromMetadataStoreAsync(String broker) {
+        final String timeAverageZPath = TIME_AVERAGE_BROKER_ZPATH + "/" + 
broker;
+        timeAverageBrokerDataCache.delete(timeAverageZPath).whenComplete((__, 
ex) -> {
+            if (ex != null && !(ex.getCause() instanceof 
MetadataStoreException.NotFoundException)) {
+                log.warn("Failed to delete dead broker {} time "
+                        + "average data from metadata store", broker, ex);
+            }
+        });
+    }
+
     private void refreshBrokerToFailureDomainMap() {
         if (!pulsar.getConfiguration().isFailureDomainsEnabled()) {
             return;
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
index 04dbd13..22629cc 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.loadbalance;
 
+import static 
org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl.TIME_AVERAGE_BROKER_ZPATH;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
@@ -35,6 +36,7 @@ import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.net.URL;
 import java.util.EnumSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -51,6 +53,7 @@ import org.apache.pulsar.broker.BundleData;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.TimeAverageBrokerData;
 import org.apache.pulsar.broker.TimeAverageMessageData;
 import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
 import 
org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLoadingPredicate;
@@ -602,4 +605,31 @@ public class ModularLoadManagerImplTest {
 
         pulsar.close();
     }
+
+    @Test
+    public void testRemoveDeadBrokerTimeAverageData() throws Exception {
+        ModularLoadManagerWrapper loadManagerWrapper = 
(ModularLoadManagerWrapper) pulsar1.getLoadManager().get();
+        ModularLoadManagerImpl lm = 
Whitebox.getInternalState(loadManagerWrapper, "loadManager");
+        assertEquals(lm.getAvailableBrokers().size(), 2);
+
+        pulsar2.close();
+
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(lm.getAvailableBrokers().size(), 1);
+        });
+        lm.updateAll();
+
+        List<String> data =  pulsar1.getLocalMetadataStore()
+                
.getMetadataCache(TimeAverageBrokerData.class).getChildren(TIME_AVERAGE_BROKER_ZPATH).join();
+
+        Awaitility.await().untilAsserted(() -> {
+            assertTrue(pulsar1.getLeaderElectionService().isLeader());
+        });
+
+        assertEquals(data.size(), 1);
+
+
+
+
+    }
 }

Reply via email to