This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 38336f75dc08ef6706d839cf914e67dffaaab028
Author: Dream95 <[email protected]>
AuthorDate: Mon Nov 3 21:00:31 2025 +0800

    [fix][broker] ExtensibleLoadManager: handle SessionReestablished and Reconnected events to re-register broker metadata (#24932)
    
    Signed-off-by: Dream95 <[email protected]>
    (cherry picked from commit 95c1dab5d15bc5cc4ba908f599c77ded892c34b2)
---
 .../broker/loadbalance/extensions/BrokerRegistryImpl.java  | 14 ++++++++++++++
 1 file changed, 14 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
index 0f16cbba71c..9ae8c61b81c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
@@ -45,6 +45,7 @@ import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.Notification;
 import org.apache.pulsar.metadata.api.NotificationType;
 import org.apache.pulsar.metadata.api.extended.CreateOption;
+import org.apache.pulsar.metadata.api.extended.SessionEvent;
 
 /**
  * The broker registry impl, base on the LockManager.
@@ -111,6 +112,7 @@ public class BrokerRegistryImpl implements BrokerRegistry {
             throw new PulsarServerException("Cannot start the broker registry 
in state " + state.get());
         }
         
pulsar.getLocalMetadataStore().registerListener(this::handleMetadataStoreNotification);
+        
pulsar.getLocalMetadataStore().registerSessionListener(this::handleMetadataSessionEvent);
         try {
             
this.registerAsync().get(conf.getMetadataStoreOperationTimeoutSeconds(), 
TimeUnit.SECONDS);
         } catch (ExecutionException | InterruptedException | TimeoutException 
e) {
@@ -281,6 +283,18 @@ public class BrokerRegistryImpl implements BrokerRegistry {
         }
     }
 
+    private void handleMetadataSessionEvent(SessionEvent event) {
+        if (!this.isStarted()) {
+            return;
+        }
+        if (log.isDebugEnabled()) {
+            log.debug("Handle metadata session event: [{}]", event);
+        }
+        if (event == SessionEvent.SessionReestablished || event == 
SessionEvent.Reconnected) {
+            this.registerAsyncWithRetries();
+        }
+    }
+
     @VisibleForTesting
     protected static boolean isVerifiedNotification(Notification t) {
        return t.getPath().startsWith(LOADBALANCE_BROKERS_ROOT + "/")

Reply via email to