This is an automated email from the ASF dual-hosted git repository. daojun pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new d3c615c494d [fix][broker] Fix race condition in MetadataStoreCacheLoader causing inconsistent availableBroker list caching (#24639) d3c615c494d is described below commit d3c615c494ddd3d2c0ba6005951e486d7b76e7f3 Author: zzb <48124861+zhaizh...@users.noreply.github.com> AuthorDate: Thu Aug 21 13:01:11 2025 +0800 [fix][broker] Fix race condition in MetadataStoreCacheLoader causing inconsistent availableBroker list caching (#24639) Co-authored-by: zhaizhibo <zhaizh...@kuaishou.com> --- .../broker/resources/MetadataStoreCacheLoader.java | 35 +++++++++++++--------- 1 file changed, 21 insertions(+), 14 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/MetadataStoreCacheLoader.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/MetadataStoreCacheLoader.java index 43376f40550..29451148da4 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/MetadataStoreCacheLoader.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/MetadataStoreCacheLoader.java @@ -25,10 +25,12 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.commons.collections4.CollectionUtils; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.metadata.api.NotificationType; +import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,6 +45,7 @@ public class MetadataStoreCacheLoader implements Closeable { private final int operationTimeoutMs; private volatile List<LoadManagerReport> availableBrokers; + private final FutureUtil.Sequencer<Void> sequencer; private final OrderedScheduler orderedExecutor = OrderedScheduler.newSchedulerBuilder().numThreads(8) .name("pulsar-metadata-cache-loader-ordered-cache").build(); @@ -52,6 +55,7 @@ public class MetadataStoreCacheLoader implements Closeable { public MetadataStoreCacheLoader(PulsarResources pulsarResources, int operationTimeoutMs) throws Exception { this.loadReportResources = pulsarResources.getLoadReportResources(); this.operationTimeoutMs = operationTimeoutMs; + this.sequencer = FutureUtil.Sequencer.create(); init(); } @@ -61,26 +65,29 @@ public class MetadataStoreCacheLoader implements Closeable { * @throws Exception */ public void init() throws Exception { - loadReportResources.getStore().registerListener((n) -> { - if (LOADBALANCE_BROKERS_ROOT.equals(n.getPath()) && NotificationType.ChildrenChanged.equals(n.getType())) { - loadReportResources.getChildrenAsync(LOADBALANCE_BROKERS_ROOT).thenApplyAsync((brokerNodes)->{ - updateBrokerList(brokerNodes).thenRun(() -> { - log.info("Successfully updated broker info {}", brokerNodes); - }).exceptionally(ex -> { + Supplier<CompletableFuture<Void>> tryUpdate = () -> { + return loadReportResources.getChildrenAsync(LOADBALANCE_BROKERS_ROOT) + .thenComposeAsync(brokerNodes -> { + return updateBrokerList(brokerNodes).thenRun(() -> { + log.info("Successfully updated broker info {}", brokerNodes); + }); + }) + .exceptionally(ex -> { log.warn("Error updating broker info after broker list changed", ex); return null; }); - return null; - }).exceptionally(ex -> { - log.warn("Error updating broker info after broker list changed", ex); - return null; - }); + }; + loadReportResources.getStore().registerListener((n) -> { + if (LOADBALANCE_BROKERS_ROOT.equals(n.getPath()) && NotificationType.ChildrenChanged.equals(n.getType())) { + sequencer.sequential(tryUpdate); } }); - + if (loadReportResources.getStore() instanceof MetadataStoreExtended) { + ((MetadataStoreExtended) loadReportResources.getStore()).registerSessionListener(sessionEvent -> + sequencer.sequential(tryUpdate)); + } // Do initial fetch of brokers list - updateBrokerList(loadReportResources.getChildren(LOADBALANCE_BROKERS_ROOT)).get(operationTimeoutMs, - TimeUnit.SECONDS); + tryUpdate.get().get(operationTimeoutMs, TimeUnit.MILLISECONDS); } public List<LoadManagerReport> getAvailableBrokers() {