This is an automated email from the ASF dual-hosted git repository.

daojun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d3c615c494d [fix][broker] Fix race condition in 
MetadataStoreCacheLoader causing inconsistent availableBroker list caching 
(#24639)
d3c615c494d is described below

commit d3c615c494ddd3d2c0ba6005951e486d7b76e7f3
Author: zzb <48124861+zhaizh...@users.noreply.github.com>
AuthorDate: Thu Aug 21 13:01:11 2025 +0800

    [fix][broker] Fix race condition in MetadataStoreCacheLoader causing 
inconsistent availableBroker list caching (#24639)
    
    Co-authored-by: zhaizhibo <zhaizh...@kuaishou.com>
---
 .../broker/resources/MetadataStoreCacheLoader.java | 35 +++++++++++++---------
 1 file changed, 21 insertions(+), 14 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/MetadataStoreCacheLoader.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/MetadataStoreCacheLoader.java
index 43376f40550..29451148da4 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/MetadataStoreCacheLoader.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/MetadataStoreCacheLoader.java
@@ -25,10 +25,12 @@ import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.metadata.api.NotificationType;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,6 +45,7 @@ public class MetadataStoreCacheLoader implements Closeable {
     private final int operationTimeoutMs;
 
     private volatile List<LoadManagerReport> availableBrokers;
+    private final FutureUtil.Sequencer<Void> sequencer;
 
     private final OrderedScheduler orderedExecutor = 
OrderedScheduler.newSchedulerBuilder().numThreads(8)
             .name("pulsar-metadata-cache-loader-ordered-cache").build();
@@ -52,6 +55,7 @@ public class MetadataStoreCacheLoader implements Closeable {
     public MetadataStoreCacheLoader(PulsarResources pulsarResources, int 
operationTimeoutMs) throws Exception {
         this.loadReportResources = pulsarResources.getLoadReportResources();
         this.operationTimeoutMs = operationTimeoutMs;
+        this.sequencer = FutureUtil.Sequencer.create();
         init();
     }
 
@@ -61,26 +65,29 @@ public class MetadataStoreCacheLoader implements Closeable {
      * @throws Exception
      */
     public void init() throws Exception {
-        loadReportResources.getStore().registerListener((n) -> {
-            if (LOADBALANCE_BROKERS_ROOT.equals(n.getPath()) && 
NotificationType.ChildrenChanged.equals(n.getType())) {
-                
loadReportResources.getChildrenAsync(LOADBALANCE_BROKERS_ROOT).thenApplyAsync((brokerNodes)->{
-                    updateBrokerList(brokerNodes).thenRun(() -> {
-                        log.info("Successfully updated broker info {}", 
brokerNodes);
-                    }).exceptionally(ex -> {
+        Supplier<CompletableFuture<Void>> tryUpdate = () -> {
+            return 
loadReportResources.getChildrenAsync(LOADBALANCE_BROKERS_ROOT)
+                    .thenComposeAsync(brokerNodes -> {
+                        return updateBrokerList(brokerNodes).thenRun(() -> {
+                            log.info("Successfully updated broker info {}", 
brokerNodes);
+                        });
+                    })
+                    .exceptionally(ex -> {
                         log.warn("Error updating broker info after broker list 
changed", ex);
                         return null;
                     });
-                    return null;
-                }).exceptionally(ex -> {
-                    log.warn("Error updating broker info after broker list 
changed", ex);
-                    return null;
-                });
+        };
+        loadReportResources.getStore().registerListener((n) -> {
+            if (LOADBALANCE_BROKERS_ROOT.equals(n.getPath()) && 
NotificationType.ChildrenChanged.equals(n.getType())) {
+                sequencer.sequential(tryUpdate);
             }
         });
-
+        if (loadReportResources.getStore() instanceof MetadataStoreExtended) {
+            ((MetadataStoreExtended) 
loadReportResources.getStore()).registerSessionListener(sessionEvent ->
+                    sequencer.sequential(tryUpdate));
+        }
         // Do initial fetch of brokers list
-        
updateBrokerList(loadReportResources.getChildren(LOADBALANCE_BROKERS_ROOT)).get(operationTimeoutMs,
-                TimeUnit.SECONDS);
+        tryUpdate.get().get(operationTimeoutMs, TimeUnit.MILLISECONDS);
     }
 
     public List<LoadManagerReport> getAvailableBrokers() {

Reply via email to