This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 908a073 ModularLoadManager should not attempt to update the load
report on ZK if we're not connected (#12191)
908a073 is described below
commit 908a07346a1f6edc2af23a0268e23ee2da40d433
Author: Matteo Merli <[email protected]>
AuthorDate: Mon Oct 4 15:09:14 2021 -0700
ModularLoadManager should not attempt to update the load report on ZK if
we're not connected (#12191)
---
.../loadbalance/impl/ModularLoadManagerImpl.java | 22 ++++++++++++++++------
.../loadbalance/ModularLoadManagerImplTest.java | 2 +-
2 files changed, 17 insertions(+), 7 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
index 7522df7..81d531f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java
@@ -41,7 +41,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.Consumer;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils;
import org.apache.pulsar.broker.BrokerData;
@@ -81,13 +80,14 @@ import
org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.coordination.LockManager;
import org.apache.pulsar.metadata.api.coordination.ResourceLock;
+import org.apache.pulsar.metadata.api.extended.SessionEvent;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ModularLoadManagerImpl implements ModularLoadManager,
Consumer<Notification> {
+public class ModularLoadManagerImpl implements ModularLoadManager {
private static final Logger log =
LoggerFactory.getLogger(ModularLoadManagerImpl.class);
// Path to ZNode whose children contain BundleData jsons for each bundle
(new API version of ResourceQuota).
@@ -182,6 +182,8 @@ public class ModularLoadManagerImpl implements
ModularLoadManager, Consumer<Noti
private Map<String, String> brokerToFailureDomainMap;
+ private SessionEvent lastMetadataSessionEvent = SessionEvent.Reconnected;
+
// record load balancing metrics
private AtomicReference<List<Metrics>> loadBalancingMetrics = new
AtomicReference<>();
// record bundle unload metrics
@@ -239,7 +241,8 @@ public class ModularLoadManagerImpl implements
ModularLoadManager, Consumer<Noti
bundlesCache =
pulsar.getLocalMetadataStore().getMetadataCache(BundleData.class);
resourceQuotaCache =
pulsar.getLocalMetadataStore().getMetadataCache(ResourceQuota.class);
timeAverageBrokerDataCache =
pulsar.getLocalMetadataStore().getMetadataCache(TimeAverageBrokerData.class);
- pulsar.getLocalMetadataStore().registerListener(this);
+
pulsar.getLocalMetadataStore().registerListener(this::handleDataNotification);
+
pulsar.getLocalMetadataStore().registerSessionListener(this::handleMetadataSessionEvent);
if (SystemUtils.IS_OS_LINUX) {
brokerHostUsage = new LinuxBrokerHostUsageImpl(pulsar);
@@ -271,8 +274,7 @@ public class ModularLoadManagerImpl implements
ModularLoadManager, Consumer<Noti
loadSheddingPipeline.add(createLoadSheddingStrategy());
}
- @Override
- public void accept(Notification t) {
+ public void handleDataNotification(Notification t) {
if (t.getPath().startsWith(LoadManager.LOADBALANCE_BROKERS_ROOT)) {
brokersData.listLocks(LoadManager.LOADBALANCE_BROKERS_ROOT)
.thenAccept(brokers -> {
@@ -287,6 +289,10 @@ public class ModularLoadManagerImpl implements
ModularLoadManager, Consumer<Noti
}
}
+ private void handleMetadataSessionEvent(SessionEvent e) {
+ lastMetadataSessionEvent = e;
+ }
+
private LoadSheddingStrategy createLoadSheddingStrategy() {
try {
Class<?> loadSheddingClass =
Class.forName(conf.getLoadBalancerLoadSheddingStrategy());
@@ -956,7 +962,11 @@ public class ModularLoadManagerImpl implements
ModularLoadManager, Consumer<Noti
lock.lock();
try {
updateLocalBrokerData();
- if (needBrokerDataUpdate() || force) {
+
+ // Do not attempt to write if not connected
+ if (lastMetadataSessionEvent != null
+ && lastMetadataSessionEvent.isConnected()
+ && (needBrokerDataUpdate() || force)) {
localData.setLastUpdate(System.currentTimeMillis());
brokerDataLock.updateValue(localData).join();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
index 62b6a52..d09158e 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
@@ -333,7 +333,7 @@ public class ModularLoadManagerImplTest {
when(brokerDataSpy1.getLocalData()).thenReturn(localBrokerData);
brokerDataMap.put(primaryHost, brokerDataSpy1);
// Need to update all the bundle data for the shredder to see the spy.
- primaryLoadManager.accept(new Notification(NotificationType.Created,
LoadManager.LOADBALANCE_BROKERS_ROOT + "/broker:8080"));
+ primaryLoadManager.handleDataNotification(new
Notification(NotificationType.Created, LoadManager.LOADBALANCE_BROKERS_ROOT +
"/broker:8080"));
Thread.sleep(100);
localBrokerData.setCpu(new ResourceUsage(80, 100));