This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 95c1dab5d15 [fix][broker] ExtensibleLoadManager: handle SessionReestablished and Reconnected events to re-register broker metadata (#24932)
95c1dab5d15 is described below

commit 95c1dab5d15bc5cc4ba908f599c77ded892c34b2
Author: Dream95 <[email protected]>
AuthorDate: Mon Nov 3 21:00:31 2025 +0800

    [fix][broker] ExtensibleLoadManager: handle SessionReestablished and Reconnected events to re-register broker metadata (#24932)
    
    Signed-off-by: Dream95 <[email protected]>
---
 .../broker/loadbalance/extensions/BrokerRegistryImpl.java  | 14 ++++++++++++++
 .../apache/pulsar/broker/service/ZkSessionExpireTest.java  |  4 ++--
 2 files changed, 16 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
index 115bbe56ffa..58559131af8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
@@ -45,6 +45,7 @@ import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.Notification;
 import org.apache.pulsar.metadata.api.NotificationType;
 import org.apache.pulsar.metadata.api.extended.CreateOption;
+import org.apache.pulsar.metadata.api.extended.SessionEvent;
 
 /**
  * The broker registry impl, base on the LockManager.
@@ -112,6 +113,7 @@ public class BrokerRegistryImpl implements BrokerRegistry {
             throw new PulsarServerException("Cannot start the broker registry in state " + state.get());
         }
         pulsar.getLocalMetadataStore().registerListener(this::handleMetadataStoreNotification);
+        pulsar.getLocalMetadataStore().registerSessionListener(this::handleMetadataSessionEvent);
         try {
             this.registerAsync().get(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
         } catch (ExecutionException | InterruptedException | TimeoutException e) {
@@ -282,6 +284,18 @@ public class BrokerRegistryImpl implements BrokerRegistry {
         }
     }
 
+    private void handleMetadataSessionEvent(SessionEvent event) {
+        if (!this.isStarted()) {
+            return;
+        }
+        if (log.isDebugEnabled()) {
+            log.debug("Handle metadata session event: [{}]", event);
+        }
+        if (event == SessionEvent.SessionReestablished || event == SessionEvent.Reconnected) {
+            this.registerAsyncWithRetries();
+        }
+    }
+
     @VisibleForTesting
     protected static boolean isVerifiedNotification(Notification t) {
        return t.getPath().startsWith(LOADBALANCE_BROKERS_ROOT + "/")
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ZkSessionExpireTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ZkSessionExpireTest.java
index f9632625e68..e19fd09d062 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ZkSessionExpireTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ZkSessionExpireTest.java
@@ -202,11 +202,11 @@ public class ZkSessionExpireTest extends NetworkErrorTestBase {
         metadataZKProxy.unRejectAllConnections();
         Awaitility.await().untilAsserted(() -> {
             Set<String> availableBrokers1 = getAvailableBrokers(pulsar1);
-            Set<String> availableBrokers2 = getAvailableBrokers(pulsar1);
+            Set<String> availableBrokers2 = getAvailableBrokers(pulsar2);
             log.info("Available brokers 1: {}", availableBrokers1);
             log.info("Available brokers 2: {}", availableBrokers2);
             assertEquals(availableBrokers1.size(), 2);
-            assertEquals(availableBrokers1.size(), 2);
+            assertEquals(availableBrokers2.size(), 2);
         });
 
         // Verify: the topic on broker-1 will be unloaded.

Reply via email to