This is an automated email from the ASF dual-hosted git repository.

merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3e27cc60eaf [fix][broker] PIP-468: Fix listener leak in 
DagWatchSession (#25650)
3e27cc60eaf is described below

commit 3e27cc60eafd01549474fb89d3a81018cd26958e
Author: Matteo Merli <[email protected]>
AuthorDate: Sat May 2 07:43:26 2026 -0700

    [fix][broker] PIP-468: Fix listener leak in DagWatchSession (#25650)
---
 .../broker/resources/ScalableTopicResources.java   | 132 +++++++++++-----
 .../broker/service/scalable/DagWatchSession.java   |  28 +++-
 .../ScalableTopicListenerRegistryTest.java         | 167 +++++++++++++++++++++
 .../service/scalable/DagWatchSessionTest.java      |  25 ++-
 4 files changed, 305 insertions(+), 47 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java
index 550212a4a19..7867fb90509 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java
@@ -75,13 +75,26 @@ public class ScalableTopicResources extends 
BaseResources<ScalableTopicMetadata>
     private final MetadataCache<ConsumerRegistration> 
consumerRegistrationCache;
 
     /**
-     * Per-namespace listeners for scalable topic create / modify / delete 
events.
-     * Keyed by listener so each subscriber can deregister cleanly on close. 
The map
-     * is consulted from the single store-level listener registered at 
construction
-     * time, eliminating the listener leak that would otherwise occur every 
time a
-     * watcher session ends (the metadata store API has no
-     * {@code unregisterListener}). Mirrors the {@link TopicResources}
-     * pattern for {@code TopicListener}.
+     * Per-path listeners for scalable-topic metadata events. Each listener 
watches a
+     * single exact path (typically a topic record); the resources-level 
fan-out
+     * dispatches notifications whose path equals the listener's registered 
path.
+     * Used by {@link DagWatchSession}-style subscribers that want events for 
one
+     * specific topic.
+     *
+     * <p>Hosted here — rather than letting each subscriber call
+     * {@code store.registerListener} directly — because {@code MetadataStore} 
has no
+     * {@code unregisterListener}: per-subscriber direct registration would 
leak a
+     * listener for the broker's lifetime every time a session ends, and every
+     * metadata notification would fan out to all stale listeners. Mirrors
+     * {@link TopicResources} for {@code TopicListener}.
+     */
+    private final Map<MetadataPathListener, String> pathListeners = new 
ConcurrentHashMap<>();
+
+    /**
+     * Per-namespace listeners for scalable-topic create / modify / delete 
events.
+     * Used by namespace-wide watchers (e.g. multi-topic consumer wrappers); 
the
+     * fan-out matches direct children of the listener's namespace base path. 
Same
+     * leak-avoidance rationale as {@link #pathListeners}.
      */
     private final Map<NamespaceListener, NamespaceName> namespaceListeners =
             new ConcurrentHashMap<>();
@@ -90,9 +103,9 @@ public class ScalableTopicResources extends 
BaseResources<ScalableTopicMetadata>
         super(store, ScalableTopicMetadata.class, operationTimeoutSec);
         this.subscriptionCache = 
store.getMetadataCache(SubscriptionMetadata.class);
         this.consumerRegistrationCache = 
store.getMetadataCache(ConsumerRegistration.class);
-        // Single shared metadata-store listener fans out to every registered 
watcher.
-        // Per-watcher registration happens via registerNamespaceListener; 
close() calls
-        // deregisterNamespaceListener so closed watchers are not on the 
dispatch list.
+        // Single shared metadata-store listener fans out to both per-path and
+        // per-namespace subscribers. Per-subscriber lifecycle goes through the
+        // register / deregister methods below.
         if (store instanceof MetadataStoreExtended ext) {
             ext.registerListener(this::handleNotification);
         } else {
@@ -100,6 +113,37 @@ public class ScalableTopicResources extends 
BaseResources<ScalableTopicMetadata>
         }
     }
 
+    // --- Per-path metadata listeners ---
+
+    /**
+     * Listener for metadata events on a specific scalable-topic-related path. 
The
+     * fan-out in {@link ScalableTopicResources} compares each notification's 
path
+     * against {@link #getMetadataPath()} and dispatches on exact match.
+     */
+    public interface MetadataPathListener {
+        /** Exact path this listener is interested in (no wildcard / prefix). 
*/
+        String getMetadataPath();
+
+        /** Called for every metadata event on the listener's path. */
+        void onNotification(Notification notification);
+    }
+
+    /**
+     * Register a per-path metadata listener. Idempotent — re-registering the 
same
+     * listener just refreshes its path mapping (e.g. if the listener moved 
its path).
+     */
+    public void registerPathListener(MetadataPathListener listener) {
+        pathListeners.put(listener, listener.getMetadataPath());
+    }
+
+    /**
+     * Deregister a previously-registered listener. Safe to call multiple 
times or for
+     * listeners that were never registered.
+     */
+    public void deregisterPathListener(MetadataPathListener listener) {
+        pathListeners.remove(listener);
+    }
+
     // --- Namespace-level scalable-topics listeners ---
 
     /**
@@ -135,35 +179,53 @@ public class ScalableTopicResources extends 
BaseResources<ScalableTopicMetadata>
     }
 
     /**
-     * Single fan-out path: for each registered listener, emit the 
notification iff
-     * its path is a direct child of the listener's namespace base path. 
Filters out
-     * subtree events (subscriptions, controller lock) up front.
+     * Single fan-out path. For each registered subscriber:
+     * <ul>
+     *   <li>Path listener: dispatch when the notification's path equals the 
listener's
+     *       registered path.</li>
+     *   <li>Namespace listener: dispatch when the notification's path is a 
direct
+     *       child of {@code /topics/<tenant>/<ns>} (skips subtree events like
+     *       subscriptions / controller lock).</li>
+     * </ul>
      */
     void handleNotification(Notification notification) {
-        if (namespaceListeners.isEmpty()) {
-            return;
-        }
         String path = notification.getPath();
-        if (!path.startsWith(SCALABLE_TOPIC_PATH + "/")) {
-            return;
-        }
-        for (Map.Entry<NamespaceListener, NamespaceName> entry : 
namespaceListeners.entrySet()) {
-            String basePath = namespacePath(entry.getValue());
-            if (!path.startsWith(basePath + "/")) {
-                continue;
-            }
-            // Direct child only — strip the prefix and check there's no 
further '/'.
-            String rest = path.substring(basePath.length() + 1);
-            if (rest.indexOf('/') >= 0) {
-                continue;
+
+        // Path listeners — exact match.
+        if (!pathListeners.isEmpty()) {
+            for (Map.Entry<MetadataPathListener, String> entry : 
pathListeners.entrySet()) {
+                if (entry.getValue().equals(path)) {
+                    try {
+                        entry.getKey().onNotification(notification);
+                    } catch (Exception e) {
+                        log.warn().attr("listener", entry.getKey())
+                                .attr("path", path)
+                                .exceptionMessage(e)
+                                .log("Failed to dispatch scalable-topic path 
notification");
+                    }
+                }
             }
-            try {
-                entry.getKey().onNotification(notification);
-            } catch (Exception e) {
-                log.warn().attr("listener", entry.getKey())
-                        .attr("path", path)
-                        .exceptionMessage(e)
-                        .log("Failed to dispatch scalable-topic notification");
+        }
+
+        // Namespace listeners — direct child of /topics/<ns>.
+        if (!namespaceListeners.isEmpty() && 
path.startsWith(SCALABLE_TOPIC_PATH + "/")) {
+            for (Map.Entry<NamespaceListener, NamespaceName> entry : 
namespaceListeners.entrySet()) {
+                String basePath = namespacePath(entry.getValue());
+                if (!path.startsWith(basePath + "/")) {
+                    continue;
+                }
+                String rest = path.substring(basePath.length() + 1);
+                if (rest.indexOf('/') >= 0) {
+                    continue;
+                }
+                try {
+                    entry.getKey().onNotification(notification);
+                } catch (Exception e) {
+                    log.warn().attr("listener", entry.getKey())
+                            .attr("path", path)
+                            .exceptionMessage(e)
+                            .log("Failed to dispatch scalable-topic namespace 
notification");
+                }
             }
         }
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/DagWatchSession.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/DagWatchSession.java
index ef0f8947b50..1e7e87865a3 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/DagWatchSession.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/DagWatchSession.java
@@ -48,7 +48,7 @@ import org.apache.pulsar.metadata.api.NotificationType;
  * <p>The session is tied to a connection. When the connection breaks, the 
session dies.
  * The client must reinitiate a new session (possibly with another broker).
  */
-public class DagWatchSession {
+public class DagWatchSession implements 
ScalableTopicResources.MetadataPathListener {
 
     private static final Logger LOG = Logger.get(DagWatchSession.class);
     private final Logger log;
@@ -61,7 +61,6 @@ public class DagWatchSession {
     private final BrokerService brokerService;
 
     private final String metadataPath;
-    private final java.util.function.Consumer<Notification> 
notificationListener;
     private volatile boolean closed = false;
 
     public DagWatchSession(long sessionId,
@@ -75,17 +74,22 @@ public class DagWatchSession {
         this.resources = resources;
         this.brokerService = brokerService;
         this.metadataPath = resources.topicPath(topicName);
-        this.notificationListener = this::onNotification;
         this.log = LOG.with().attr("topic", topicName).attr("sessionId", 
sessionId).build();
     }
 
+    @Override
+    public String getMetadataPath() {
+        return metadataPath;
+    }
+
     /**
      * Start the session: load current metadata, set up watch, and return
      * the initial layout response.
      */
     public CompletableFuture<ScalableTopicLayoutResponse> start() {
-        // Register metadata store listener for changes to this topic's 
metadata
-        resources.getStore().registerListener(notificationListener);
+        // Register through the resources-level fan-out so close() can 
deregister us
+        // and we don't accumulate stale store-level listeners over time.
+        resources.registerPathListener(this);
 
         return resources.getScalableTopicMetadataAsync(topicName, true)
                 .thenCompose(optMd -> {
@@ -98,8 +102,13 @@ public class DagWatchSession {
                 });
     }
 
-    // Visible for testing — invoked by the metadata-store listener registered 
in start().
-    void onNotification(Notification notification) {
+    /**
+     * Invoked by the {@link ScalableTopicResources} fan-out for every 
metadata event
+     * matching this session's topic path. The registry already path-filtered 
for us;
+     * we re-check defensively so a registry-level bug can't cause a reload 
storm.
+     */
+    @Override
+    public void onNotification(Notification notification) {
         if (closed) {
             return;
         }
@@ -184,7 +193,10 @@ public class DagWatchSession {
 
     public void close() {
         closed = true;
-        // Listener is guarded by the closed flag; MetadataStore does not 
support unregister.
+        // Drop ourselves from the resources' fan-out so the per-event 
dispatch skips
+        // us — no listener leak, no per-notification dispatch tax across the 
broker's
+        // lifetime.
+        resources.deregisterPathListener(this);
     }
 
     /**
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resources/ScalableTopicListenerRegistryTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resources/ScalableTopicListenerRegistryTest.java
new file mode 100644
index 00000000000..9f04b7ad8c4
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resources/ScalableTopicListenerRegistryTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.resources;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.NotificationType;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.apache.pulsar.metadata.impl.LocalMemoryMetadataStore;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Coverage for the {@code MetadataPathListener} registry on
+ * {@link ScalableTopicResources}: register / deregister behaviour, exact-path
+ * dispatch, and the no-leak property the registry exists to enforce.
+ *
+ * <p>The bug it prevents: pre-registry, every {@code DagWatchSession} called
+ * {@code MetadataStore.registerListener} directly. The store has no
+ * {@code unregisterListener}, so each closed session left a stale listener
+ * registered for the broker's lifetime — both a memory leak and a
+ * per-notification dispatch tax that grew linearly with total sessions ever
+ * opened.
+ */
+public class ScalableTopicListenerRegistryTest {
+
+    private MetadataStoreExtended store;
+    private ScalableTopicResources resources;
+
+    @BeforeMethod
+    public void setUp() throws Exception {
+        store = new LocalMemoryMetadataStore("memory:local",
+                MetadataStoreConfig.builder().build());
+        resources = new ScalableTopicResources(store, 30);
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() throws Exception {
+        if (store != null) {
+            store.close();
+        }
+    }
+
+    @Test
+    public void deregisterStopsDispatchToListener() {
+        // The whole point of the registry: a deregistered listener must NOT 
receive
+        // events even if the path still matches. This is the leak fix proper —
+        // pre-registry, a closed session would keep ticking on every 
namespace event
+        // for the broker's lifetime.
+        var listener = new RecordingListener("/topics/tenant/ns/some-topic");
+        resources.registerPathListener(listener);
+        resources.handleNotification(new 
Notification(NotificationType.Modified,
+                "/topics/tenant/ns/some-topic"));
+        assertEquals(listener.received.size(), 1, "registered listener should 
see its event");
+
+        resources.deregisterPathListener(listener);
+        resources.handleNotification(new 
Notification(NotificationType.Modified,
+                "/topics/tenant/ns/some-topic"));
+        assertEquals(listener.received.size(), 1,
+                "deregistered listener must not see further events");
+    }
+
+    @Test
+    public void dispatchesOnlyOnExactPathMatch() {
+        // Path filter is exact-equal — a notification for a sibling topic 
must not
+        // wake up listeners on a different topic.
+        var watching = new RecordingListener("/topics/tenant/ns/a");
+        var bystander = new RecordingListener("/topics/tenant/ns/b");
+        resources.registerPathListener(watching);
+        resources.registerPathListener(bystander);
+
+        resources.handleNotification(new 
Notification(NotificationType.Modified,
+                "/topics/tenant/ns/a"));
+
+        assertEquals(watching.received.size(), 1);
+        assertTrue(bystander.received.isEmpty(),
+                "sibling topic listener must not receive events on /a");
+    }
+
+    @Test
+    public void multipleListenersOnSamePathAllReceive() {
+        // Two sessions can watch the same topic (rare but legal) — both must 
fire.
+        String path = "/topics/tenant/ns/shared";
+        var l1 = new RecordingListener(path);
+        var l2 = new RecordingListener(path);
+        resources.registerPathListener(l1);
+        resources.registerPathListener(l2);
+
+        resources.handleNotification(new 
Notification(NotificationType.Modified, path));
+
+        assertEquals(l1.received.size(), 1);
+        assertEquals(l2.received.size(), 1);
+    }
+
+    @Test
+    public void listenerExceptionDoesNotInterruptOtherListeners() {
+        // One bad listener throwing must not poison the dispatch loop — the 
next
+        // listener on the same path must still see the event.
+        String path = "/topics/tenant/ns/x";
+        var throwing = new ScalableTopicResources.MetadataPathListener() {
+            @Override public String getMetadataPath() {
+                return path;
+            }
+            @Override public void onNotification(Notification notification) {
+                throw new RuntimeException("boom");
+            }
+        };
+        var ok = new RecordingListener(path);
+        resources.registerPathListener(throwing);
+        resources.registerPathListener(ok);
+
+        resources.handleNotification(new 
Notification(NotificationType.Modified, path));
+
+        assertEquals(ok.received.size(), 1, "well-behaved listener should 
still see the event");
+    }
+
+    @Test
+    public void deregisterIsIdempotent() {
+        var listener = new RecordingListener("/topics/tenant/ns/a");
+        resources.registerPathListener(listener);
+        resources.deregisterPathListener(listener);
+        resources.deregisterPathListener(listener); // must not throw
+        // Deregistering one we never registered must also be silent.
+        resources.deregisterPathListener(new 
RecordingListener("/topics/tenant/ns/never"));
+    }
+
+    private static final class RecordingListener
+            implements ScalableTopicResources.MetadataPathListener {
+        final String path;
+        final List<Notification> received = new ArrayList<>();
+
+        RecordingListener(String path) {
+            this.path = path;
+        }
+
+        @Override
+        public String getMetadataPath() {
+            return path;
+        }
+
+        @Override
+        public void onNotification(Notification notification) {
+            received.add(notification);
+        }
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/DagWatchSessionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/DagWatchSessionTest.java
index 9f423413003..cf14eb6b00e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/DagWatchSessionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/DagWatchSessionTest.java
@@ -125,15 +125,32 @@ public class DagWatchSessionTest {
     }
 
     @Test
-    public void testStartRegistersMetadataStoreListener() {
-        // Regardless of outcome, start() should wire up a notification 
listener so that
-        // subsequent metadata changes flow into the session.
+    public void testStartRegistersWithResources() {
+        // start() routes through the resources-level fan-out instead of 
registering
+        // directly on the metadata store — that way close() can drop the
+        // registration cleanly via deregisterPathListener.
         when(resources.getScalableTopicMetadataAsync(TOPIC, true))
                 
.thenReturn(CompletableFuture.completedFuture(Optional.empty()));
 
         session.start();
 
-        verify(resources.getStore()).registerListener(any());
+        verify(resources).registerPathListener(session);
+    }
+
+    @Test
+    public void testCloseDeregistersPathListener() {
+        // The whole point of the registry pattern: close() must remove the 
listener so
+        // the per-event fan-out skips us. Otherwise we leak a stale entry per 
session
+        // for the broker's lifetime.
+        session.close();
+        verify(resources).deregisterPathListener(session);
+    }
+
+    @Test
+    public void testGetMetadataPathExposesTopicPath() {
+        // The registry uses this for its dispatch filter — must exactly match 
the
+        // path that the resources layer would generate for the topic.
+        assertEquals(session.getMetadataPath(), TOPIC_PATH);
     }
 
     // --- onNotification filtering ---

Reply via email to