This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 908a073  ModularLoadManager should not attempt to update the load 
report on ZK if we're not connected (#12191)
908a073 is described below

commit 908a07346a1f6edc2af23a0268e23ee2da40d433
Author: Matteo Merli <[email protected]>
AuthorDate: Mon Oct 4 15:09:14 2021 -0700

    ModularLoadManager should not attempt to update the load report on ZK if 
we're not connected (#12191)
---
 .../loadbalance/impl/ModularLoadManagerImpl.java   | 22 ++++++++++++++++------
 .../loadbalance/ModularLoadManagerImplTest.java    |  2 +-
 2 files changed, 17 insertions(+), 7 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 7522df7..81d531f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -41,7 +41,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.Consumer;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.SystemUtils;
 import org.apache.pulsar.broker.BrokerData;
@@ -81,13 +80,14 @@ import 
org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
 import org.apache.pulsar.metadata.api.Notification;
 import org.apache.pulsar.metadata.api.coordination.LockManager;
 import org.apache.pulsar.metadata.api.coordination.ResourceLock;
+import org.apache.pulsar.metadata.api.extended.SessionEvent;
 import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
 import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
 import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ModularLoadManagerImpl implements ModularLoadManager, 
Consumer<Notification> {
+public class ModularLoadManagerImpl implements ModularLoadManager {
     private static final Logger log = 
LoggerFactory.getLogger(ModularLoadManagerImpl.class);
 
     // Path to ZNode whose children contain BundleData jsons for each bundle 
(new API version of ResourceQuota).
@@ -182,6 +182,8 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager, Consumer<Noti
 
     private Map<String, String> brokerToFailureDomainMap;
 
+    private SessionEvent lastMetadataSessionEvent = SessionEvent.Reconnected;
+
     // record load balancing metrics
     private AtomicReference<List<Metrics>> loadBalancingMetrics = new 
AtomicReference<>();
     // record bundle unload metrics
@@ -239,7 +241,8 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager, Consumer<Noti
         bundlesCache = 
pulsar.getLocalMetadataStore().getMetadataCache(BundleData.class);
         resourceQuotaCache = 
pulsar.getLocalMetadataStore().getMetadataCache(ResourceQuota.class);
         timeAverageBrokerDataCache = 
pulsar.getLocalMetadataStore().getMetadataCache(TimeAverageBrokerData.class);
-        pulsar.getLocalMetadataStore().registerListener(this);
+        
pulsar.getLocalMetadataStore().registerListener(this::handleDataNotification);
+        
pulsar.getLocalMetadataStore().registerSessionListener(this::handleMetadataSessionEvent);
 
         if (SystemUtils.IS_OS_LINUX) {
             brokerHostUsage = new LinuxBrokerHostUsageImpl(pulsar);
@@ -271,8 +274,7 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager, Consumer<Noti
         loadSheddingPipeline.add(createLoadSheddingStrategy());
     }
 
-    @Override
-    public void accept(Notification t) {
+    public void handleDataNotification(Notification t) {
         if (t.getPath().startsWith(LoadManager.LOADBALANCE_BROKERS_ROOT)) {
             brokersData.listLocks(LoadManager.LOADBALANCE_BROKERS_ROOT)
                     .thenAccept(brokers -> {
@@ -287,6 +289,10 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager, Consumer<Noti
         }
     }
 
+    private void handleMetadataSessionEvent(SessionEvent e) {
+        lastMetadataSessionEvent = e;
+    }
+
     private LoadSheddingStrategy createLoadSheddingStrategy() {
         try {
             Class<?> loadSheddingClass = 
Class.forName(conf.getLoadBalancerLoadSheddingStrategy());
@@ -956,7 +962,11 @@ public class ModularLoadManagerImpl implements 
ModularLoadManager, Consumer<Noti
         lock.lock();
         try {
             updateLocalBrokerData();
-            if (needBrokerDataUpdate() || force) {
+
+            // Do not attempt to write if not connected
+            if (lastMetadataSessionEvent != null
+                    && lastMetadataSessionEvent.isConnected()
+                    && (needBrokerDataUpdate() || force)) {
                 localData.setLastUpdate(System.currentTimeMillis());
 
                 brokerDataLock.updateValue(localData).join();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
index 62b6a52..d09158e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
@@ -333,7 +333,7 @@ public class ModularLoadManagerImplTest {
         when(brokerDataSpy1.getLocalData()).thenReturn(localBrokerData);
         brokerDataMap.put(primaryHost, brokerDataSpy1);
         // Need to update all the bundle data for the shredder to see the spy.
-        primaryLoadManager.accept(new Notification(NotificationType.Created, 
LoadManager.LOADBALANCE_BROKERS_ROOT + "/broker:8080"));
+        primaryLoadManager.handleDataNotification(new 
Notification(NotificationType.Created, LoadManager.LOADBALANCE_BROKERS_ROOT + 
"/broker:8080"));
 
         Thread.sleep(100);
         localBrokerData.setCpu(new ResourceUsage(80, 100));

Reply via email to