merlimat opened a new pull request, #25650:
URL: https://github.com/apache/pulsar/pull/25650
## Summary
Every `DagWatchSession.start()` called
`resources.getStore().registerListener(this::onNotification)` directly.
`close()` only flipped a closed flag because `MetadataStore` exposes no
`unregisterListener`, so each closed session left a stale listener registered
for the broker's lifetime. That was both a memory leak and a per-notification
dispatch tax that grew linearly with the total number of sessions ever opened.
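The leak can be reproduced in miniature. The sketch below is illustrative only: `FakeStore` and `Session` are hypothetical stand-ins rather than the Pulsar classes, and the only property borrowed from the description above is that the store can register listeners but cannot remove them.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

public class StaleListenerSketch {
    // Hypothetical store: listeners can be added but never removed.
    static class FakeStore {
        final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();

        void registerListener(Consumer<String> listener) {
            listeners.add(listener);
        }
    }

    // Hypothetical session that registers itself directly with the store.
    static class Session {
        private final FakeStore store;
        private final String path;
        private volatile boolean closed;

        Session(FakeStore store, String path) {
            this.store = store;
            this.path = path;
        }

        void start() {
            store.registerListener(this::onNotification);
        }

        // Closing only flips a flag; the store still holds the listener.
        void close() {
            closed = true;
        }

        private void onNotification(String notifiedPath) {
            if (closed || !path.equals(notifiedPath)) {
                return; // a stale listener is still invoked, it just does nothing
            }
        }
    }

    // Opens and closes the given number of sessions, then reports how many
    // listeners the store is still holding.
    static int listenersAfter(int sessions) {
        FakeStore store = new FakeStore();
        for (int i = 0; i < sessions; i++) {
            Session session = new Session(store, "/topic-" + i);
            session.start();
            session.close();
        }
        return store.listeners.size();
    }

    public static void main(String[] args) {
        System.out.println(listenersAfter(1000)); // prints 1000
    }
}
```

Every notification in this toy store would visit all 1000 closed sessions, which is the linear dispatch cost described above.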
The fix mirrors the `TopicResources` / `TopicListener` pattern already used
elsewhere in the codebase: register a single shared store-level listener at
construction and dispatch through a per-listener registry.
## Changes
- `ScalableTopicResources` gains a single metadata-store listener at
construction time and a `Map<MetadataPathListener, String>` registry that maps
each listener to an exact metadata path. `registerPathListener` /
`deregisterPathListener` manage entries; the listener installed by the
constructor fans each notification out to the listeners whose registered path
matches. Listener exceptions are caught and logged so one broken subscriber
can't poison the dispatch loop.
- `DagWatchSession` implements `ScalableTopicResources.MetadataPathListener`.
`start()` registers via the registry, `close()` deregisters cleanly. The
per-event path-equality check stays in `onNotification` as a defensive
no-op against any future registry-level bug.
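A minimal sketch of the registry pattern described in the two bullets above. It is not the code from this PR: `FakeStore` and `Registry` are hypothetical stand-ins, notifications are reduced to plain path strings, and the parameter order of `registerPathListener` is an assumption. Only the names `MetadataPathListener`, `registerPathListener`, and `deregisterPathListener` and the `Map<MetadataPathListener, String>` shape come from the description.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

public class PathListenerRegistrySketch {
    interface MetadataPathListener {
        void onNotification(String path);
    }

    // Hypothetical store that, like the one described above, cannot unregister.
    static class FakeStore {
        final List<Consumer<String>> listeners = new ArrayList<>();

        void registerListener(Consumer<String> listener) {
            listeners.add(listener);
        }

        void publish(String path) {
            for (Consumer<String> listener : listeners) {
                listener.accept(path);
            }
        }
    }

    // Hypothetical stand-in for the registry side of ScalableTopicResources.
    static class Registry {
        private final Map<MetadataPathListener, String> pathListeners = new ConcurrentHashMap<>();

        Registry(FakeStore store) {
            // The only store-level listener, registered once at construction.
            store.registerListener(this::dispatch);
        }

        void registerPathListener(String path, MetadataPathListener listener) {
            pathListeners.put(listener, path);
        }

        void deregisterPathListener(MetadataPathListener listener) {
            pathListeners.remove(listener);
        }

        private void dispatch(String path) {
            for (Map.Entry<MetadataPathListener, String> entry : pathListeners.entrySet()) {
                if (!entry.getValue().equals(path)) {
                    continue; // exact-path match only
                }
                try {
                    entry.getKey().onNotification(path);
                } catch (Exception e) {
                    // One failing listener must not stop delivery to the others.
                    System.err.println("listener failed for " + path + ": " + e);
                }
            }
        }
    }

    // Returns how many notifications reached the well-behaved listener.
    static int demo() {
        FakeStore store = new FakeStore();
        Registry registry = new Registry(store);
        int[] hits = new int[1];
        MetadataPathListener broken = p -> { throw new IllegalStateException("boom"); };
        MetadataPathListener counting = p -> hits[0]++;

        registry.registerPathListener("/a", broken);
        registry.registerPathListener("/a", counting);
        store.publish("/a"); // counting is invoked even though broken throws
        store.publish("/b"); // no listener is registered for this path
        registry.deregisterPathListener(counting);
        store.publish("/a"); // counting is no longer invoked
        return hits[0];
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 1
    }
}
```

However many sessions register and deregister, the store in this sketch holds exactly one listener, so closing a session releases its entry instead of leaving it behind.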
## Test plan
- New `ScalableTopicListenerRegistryTest` covers register / deregister
lifecycle, exact-path dispatch, multiple listeners on the same path, and
isolation across listeners (one throwing must not interrupt others).
- `DagWatchSessionTest`'s old `testStartRegistersMetadataStoreListener` is
rewritten as `testStartRegistersWithResources` (verifies the registry call)
and `testCloseDeregistersPathListener` (asserts the leak is gone).
- Full `org.apache.pulsar.broker.service.scalable.*` and
`org.apache.pulsar.broker.resources.*` test packages pass.
### Matching PR(s) in forked repositories
- area/broker