lhotari commented on PR #25648: URL: https://github.com/apache/pulsar/pull/25648#issuecomment-4360787352
> _Note: this is analysis by Claude Code, reviewed by me before posting._

A few points worth checking before this merges.

**1. Metadata listener leak per `ScalableTopicsWatcherSession`** (`ScalableTopicsWatcherSession.start():305`, `close():466-472`)

`start()` calls `resources.getStore().registerListener(this::onNotification)` once per session, but `close()` only flips a `closed` flag, and `MetadataStore` exposes no `unregisterListener`. Every closed session therefore leaves a stale `Consumer<Notification>` registered, and **every** metadata notification fans out to all stale listeners for the broker's lifetime. Each stale listener short-circuits on `closed.get()`, but the dispatch cost still scales linearly with the total number of sessions ever opened. This is not just memory growth: it is a real per-event throughput tax for long-running brokers serving many namespace watches.

**Suggested fix (primary):** mirror what `TopicResources` does for `TopicListService`:

- Have `ScalableTopicResources` register **one** `handleNotification` listener at construction time (`TopicResources.java:52`).
- Maintain a `Map<ScalableTopicNamespaceListener, NamespaceName>` (or similar) inside `ScalableTopicResources`, with `register…Listener(...)` / `deregister…Listener(...)` methods (`TopicResources.java:136-142`).
- `ScalableTopicsWatcherSession.start()` calls `register…`, and `close()` calls `deregister…`.

The single fan-out filters by the watcher's namespace base path. This is the established pattern in the codebase, and it removes both the leak and the linear dispatch cost without needing a `MetadataStore` API change.

**Alternative:** #24256 adds `registerCancellableListener`, which returns a handle that callers cancel on shutdown. Once that lands, `ScalableTopicsWatcherSession` could store the handle and cancel it in `close()`. This works, but it keeps one metadata-store listener per session; the fan-out approach above is cheaper at runtime and consistent with `TopicListService`.

**2.
`consumer_name` described in the PR body but absent from the proto** (`PulsarApi.proto:1558-1569`)

The "Wire protocol" section of the PR description states that `CommandWatchScalableTopics` carries `consumer_name`, and the deferred "Cross-topic load balancing" section says "`consumer_name` is part of `CommandWatchScalableTopics` *now*, so the future coordinator has the identity it needs." The proto in this PR has only `watch_id`, `namespace`, `property_filters`, and `current_hash`. Either add the field now (cheap: `optional string consumer_name = 5;`), so the future coordinator doesn't need another wire bump, or remove that design language from the description.

**3. Reconnect backoff never reset on hash-matched reconnect** (`ScalableTopicsWatcher.java:1037`)

`reconnectBackoff.reset()` runs only inside `onSnapshot`. The hash-skip optimisation is the common happy path for short blips: when the hashes match, the broker emits no `Snapshot`, so the backoff keeps its last value across successive successful reconnects. The next disconnect then waits much longer than expected. Suggest resetting on successful reconnect (after the watch frame's write completes, or on any successful broker→client traffic).
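For point 1, the single-listener fan-out can be sketched in plain Java. This is a minimal, self-contained illustration under stated assumptions, not Pulsar code: `FakeStore`, `Notification`, `NamespaceListener`, `NamespaceListenerRegistry`, and its `registerListener`/`deregisterListener` methods are all hypothetical stand-ins for `MetadataStore`, the metadata notification type, `ScalableTopicNamespaceListener`, and `ScalableTopicResources`. Namespaces are modeled as plain path-prefix strings (e.g. the made-up `/ns/a`) rather than `NamespaceName`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

public class FanOutSketch {

    // Hypothetical stand-in for a metadata notification; only the changed path is modeled.
    record Notification(String path) {}

    // Hypothetical per-session callback (plays the role of ScalableTopicNamespaceListener).
    interface NamespaceListener {
        void onChange(Notification notification);
    }

    // Hypothetical store that, like MetadataStore, can register listeners but never unregister them.
    static final class FakeStore {
        private final List<Consumer<Notification>> listeners = new CopyOnWriteArrayList<>();

        void registerListener(Consumer<Notification> listener) {
            listeners.add(listener);
        }

        int listenerCount() {
            return listeners.size();
        }

        void publish(Notification notification) {
            listeners.forEach(listener -> listener.accept(notification));
        }
    }

    // Hypothetical registry (plays the role of ScalableTopicResources): one store listener, N sessions.
    static final class NamespaceListenerRegistry {
        private final Map<NamespaceListener, String> sessions = new ConcurrentHashMap<>();

        NamespaceListenerRegistry(FakeStore store) {
            // Registered exactly once, at construction time, no matter how many sessions come and go.
            store.registerListener(this::handleNotification);
        }

        void registerListener(NamespaceListener listener, String namespaceBasePath) {
            sessions.put(listener, namespaceBasePath);
        }

        void deregisterListener(NamespaceListener listener) {
            sessions.remove(listener);
        }

        int activeSessions() {
            return sessions.size();
        }

        private void handleNotification(Notification notification) {
            // Only live sessions are visited, so dispatch cost tracks open watches, not total ever opened.
            sessions.forEach((listener, basePath) -> {
                if (notification.path().startsWith(basePath + "/")) {
                    listener.onChange(notification);
                }
            });
        }
    }

    public static void main(String[] args) {
        FakeStore store = new FakeStore();
        NamespaceListenerRegistry registry = new NamespaceListenerRegistry(store);
        List<String> delivered = new ArrayList<>();

        // Simulate 1,000 sessions that open and close (start() registers, close() deregisters).
        for (int i = 0; i < 1_000; i++) {
            NamespaceListener session = n -> delivered.add(n.path());
            registry.registerListener(session, "/ns/a");
            registry.deregisterListener(session);
        }

        NamespaceListener live = n -> delivered.add(n.path());
        registry.registerListener(live, "/ns/a");

        store.publish(new Notification("/ns/a/topic-1"));  // matches the live session
        store.publish(new Notification("/ns/ab/topic-2")); // sibling namespace, filtered out

        // prints "1 1 [/ns/a/topic-1]"
        System.out.println(store.listenerCount() + " " + registry.activeSessions() + " " + delivered);
    }
}
```

Opening and closing 1,000 sessions leaves exactly one store listener, and each notification only visits sessions that are still open. Filtering on `basePath + "/"` keeps `/ns/a` from also matching `/ns/ab`.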

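For point 3, a minimal sketch of resetting the backoff once the watch frame's write succeeds, so the hash-match path (where no `Snapshot` arrives) also resets it. `Backoff`, `Watcher`, `onReconnected`, `delayBeforeNextReconnectMs`, and the 100 ms / 60 s bounds are hypothetical; the write is modeled as a `CompletableFuture<Void>` rather than the client's actual channel future, and the real `ScalableTopicsWatcher` may be structured differently.

```java
import java.util.concurrent.CompletableFuture;

public class BackoffResetSketch {

    // Hypothetical exponential backoff: starts at initialMs and doubles up to maxMs.
    static final class Backoff {
        private final long initialMs;
        private final long maxMs;
        private long nextMs;

        Backoff(long initialMs, long maxMs) {
            this.initialMs = initialMs;
            this.maxMs = maxMs;
            this.nextMs = initialMs;
        }

        synchronized long next() {
            long current = nextMs;
            nextMs = Math.min(nextMs * 2, maxMs);
            return current;
        }

        synchronized void reset() {
            nextMs = initialMs;
        }
    }

    // Hypothetical watcher; only the reconnect/backoff bookkeeping is modeled.
    static final class Watcher {
        private final Backoff reconnectBackoff = new Backoff(100, 60_000);

        // Hash mismatch: the broker sends a full snapshot. This was the only reset point before.
        void onSnapshot() {
            reconnectBackoff.reset();
        }

        // Reset once the watch frame's write succeeds, which also covers the hash-match path
        // where no snapshot ever arrives. A failed write skips the reset.
        CompletableFuture<Void> onReconnected(CompletableFuture<Void> watchFrameWrite) {
            return watchFrameWrite.thenRun(reconnectBackoff::reset);
        }

        long delayBeforeNextReconnectMs() {
            return reconnectBackoff.next();
        }
    }

    public static void main(String[] args) {
        Watcher watcher = new Watcher();
        // Three quick disconnects grow the delay: 100, 200, 400 ms.
        watcher.delayBeforeNextReconnectMs();
        watcher.delayBeforeNextReconnectMs();
        watcher.delayBeforeNextReconnectMs();
        // Hash-matched reconnect: the write succeeds and no snapshot follows.
        watcher.onReconnected(CompletableFuture.completedFuture(null)).join();
        System.out.println(watcher.delayBeforeNextReconnectMs()); // prints 100 (800 without the reset)
    }
}
```

Tying the reset to the write's successful completion means a failed reconnect attempt keeps growing the delay, while any reconnect that succeeds (with or without a snapshot) starts the next outage from the initial delay.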