This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 38336f75dc08ef6706d839cf914e67dffaaab028
Author: Dream95 <[email protected]>
AuthorDate: Mon Nov 3 21:00:31 2025 +0800

    [fix][broker] ExtensibleLoadManager: handle SessionReestablished and Reconnected events to re-register broker metadata (#24932)

    Signed-off-by: Dream95 <[email protected]>
    (cherry picked from commit 95c1dab5d15bc5cc4ba908f599c77ded892c34b2)
---
 .../broker/loadbalance/extensions/BrokerRegistryImpl.java | 14 ++++++++++++++
 1 file changed, 14 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
index 0f16cbba71c..9ae8c61b81c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java
@@ -45,6 +45,7 @@ import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.Notification;
 import org.apache.pulsar.metadata.api.NotificationType;
 import org.apache.pulsar.metadata.api.extended.CreateOption;
+import org.apache.pulsar.metadata.api.extended.SessionEvent;
 
 /**
  * The broker registry impl, base on the LockManager.
@@ -111,6 +112,7 @@ public class BrokerRegistryImpl implements BrokerRegistry {
             throw new PulsarServerException("Cannot start the broker registry in state " + state.get());
         }
         pulsar.getLocalMetadataStore().registerListener(this::handleMetadataStoreNotification);
+        pulsar.getLocalMetadataStore().registerSessionListener(this::handleMetadataSessionEvent);
         try {
             this.registerAsync().get(conf.getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
         } catch (ExecutionException | InterruptedException | TimeoutException e) {
@@ -281,6 +283,18 @@ public class BrokerRegistryImpl implements BrokerRegistry {
         }
     }
 
+    private void handleMetadataSessionEvent(SessionEvent event) {
+        if (!this.isStarted()) {
+            return;
+        }
+        if (log.isDebugEnabled()) {
+            log.debug("Handle metadata session event: [{}]", event);
+        }
+        if (event == SessionEvent.SessionReestablished || event == SessionEvent.Reconnected) {
+            this.registerAsyncWithRetries();
+        }
+    }
+
     @VisibleForTesting
     protected static boolean isVerifiedNotification(Notification t) {
         return t.getPath().startsWith(LOADBALANCE_BROKERS_ROOT + "/")
