This is an automated email from the ASF dual-hosted git repository.

merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d0e557f15da [feat] PIP-468: Namespace scalable-topics watcher (protocol + broker + V5 client) (#25648)
d0e557f15da is described below

commit d0e557f15da844b369b876dc0f5c98e4586f3164
Author: Matteo Merli <[email protected]>
AuthorDate: Fri May 1 15:18:20 2026 -0700

    [feat] PIP-468: Namespace scalable-topics watcher (protocol + broker + V5 client) (#25648)
---
 .../broker/resources/ScalableTopicResources.java   | 100 ++++++
 .../apache/pulsar/broker/service/ServerCnx.java    | 118 +++++++
 .../scalable/ScalableTopicsWatcherSession.java     | 315 +++++++++++++++++++
 .../ScalableTopicsWatcherSessionHashTest.java      | 147 +++++++++
 .../client/api/v5/V5ScalableTopicsWatcherTest.java | 234 ++++++++++++++
 .../client/impl/v5/ScalableTopicsWatcher.java      | 348 +++++++++++++++++++++
 .../org/apache/pulsar/client/impl/ClientCnx.java   |  83 +++++
 .../client/impl/ScalableTopicsWatcherSession.java  |  60 ++++
 .../apache/pulsar/common/protocol/Commands.java    |  89 ++++++
 .../pulsar/common/protocol/PulsarDecoder.java      |  32 ++
 pulsar-common/src/main/proto/PulsarApi.proto       |  59 ++++
 11 files changed, 1585 insertions(+)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java
index 3e015808476..550212a4a19 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import lombok.CustomLog;
@@ -35,6 +36,8 @@ import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.metadata.api.MetadataCache;
 import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 
 /**
  * Metadata store access for scalable topic metadata.
@@ -71,10 +74,98 @@ public class ScalableTopicResources extends BaseResources<ScalableTopicMetadata>
     private final MetadataCache<SubscriptionMetadata> subscriptionCache;
     private final MetadataCache<ConsumerRegistration> 
consumerRegistrationCache;
 
+    /**
+     * Per-namespace listeners for scalable topic create / modify / delete 
events.
+     * Keyed by listener so each subscriber can deregister cleanly on close. 
The map
+     * is consulted from the single store-level listener registered at 
construction
+     * time, eliminating the listener leak that would otherwise occur every 
time a
+     * watcher session ends (the metadata store API has no
+     * {@code unregisterListener}). Mirrors the {@link TopicResources}
+     * pattern for {@code TopicListener}.
+     */
+    private final Map<NamespaceListener, NamespaceName> namespaceListeners =
+            new ConcurrentHashMap<>();
+
     public ScalableTopicResources(MetadataStore store, int 
operationTimeoutSec) {
         super(store, ScalableTopicMetadata.class, operationTimeoutSec);
         this.subscriptionCache = 
store.getMetadataCache(SubscriptionMetadata.class);
         this.consumerRegistrationCache = 
store.getMetadataCache(ConsumerRegistration.class);
+        // Single shared metadata-store listener fans out to every registered 
watcher.
+        // Per-watcher registration happens via registerNamespaceListener; 
close() calls
+        // deregisterNamespaceListener so closed watchers are not on the 
dispatch list.
+        if (store instanceof MetadataStoreExtended ext) {
+            ext.registerListener(this::handleNotification);
+        } else {
+            store.registerListener(this::handleNotification);
+        }
+    }
+
+    // --- Namespace-level scalable-topics listeners ---
+
+    /**
+     * Listener for scalable-topic create / modify / delete events under a 
single
+     * namespace. The fan-out in {@link ScalableTopicResources} filters 
notifications
+     * to the listener's namespace and to direct topic records (skipping 
subtree paths
+     * like {@code <topic>/subscriptions/...} or {@code <topic>/controller}).
+     */
+    public interface NamespaceListener {
+        /** Namespace this listener is scoped to. */
+        NamespaceName getNamespaceName();
+
+        /** Called for every metadata event affecting a topic record in the 
namespace. */
+        void onNotification(Notification notification);
+    }
+
+    /**
+     * Register a per-namespace listener. The listener will receive every
+     * Created / Modified / Deleted event whose path is a direct child of
+     * {@code /topics/<tenant>/<ns>}. Idempotent — re-registering the same 
listener
+     * just updates the namespace mapping.
+     */
+    public void registerNamespaceListener(NamespaceListener listener) {
+        namespaceListeners.put(listener, listener.getNamespaceName());
+    }
+
+    /**
+     * Deregister a previously-registered namespace listener. Safe to call 
multiple
+     * times or for listeners that were never registered.
+     */
+    public void deregisterNamespaceListener(NamespaceListener listener) {
+        namespaceListeners.remove(listener);
+    }
+
+    /**
+     * Fan a metadata-store notification out to the registered namespace listeners.
+     *
+     * <p>A listener receives the notification only when the notification path is a direct
+     * child of that listener's namespace base path; deeper paths (for example subscription
+     * or controller records below a topic) are skipped. An exception thrown by one listener
+     * is logged and does not prevent delivery to the remaining listeners.
+     */
+    void handleNotification(Notification notification) {
+        if (namespaceListeners.isEmpty()) {
+            return;
+        }
+        final String eventPath = notification.getPath();
+        if (!eventPath.startsWith(SCALABLE_TOPIC_PATH + "/")) {
+            return;
+        }
+        namespaceListeners.forEach((listener, ns) -> {
+            final String childPrefix = namespacePath(ns) + "/";
+            // Must sit under the namespace, and nothing after the prefix may contain another '/'.
+            final boolean directChild = eventPath.startsWith(childPrefix)
+                    && eventPath.indexOf('/', childPrefix.length()) < 0;
+            if (!directChild) {
+                return;
+            }
+            try {
+                listener.onNotification(notification);
+            } catch (Exception e) {
+                log.warn().attr("listener", listener)
+                        .attr("path", eventPath)
+                        .exceptionMessage(e)
+                        .log("Failed to dispatch scalable-topic notification");
+            }
+        });
     }
 
     public CompletableFuture<Void> createScalableTopicAsync(TopicName tn, 
ScalableTopicMetadata metadata) {
@@ -279,6 +370,15 @@ public class ScalableTopicResources extends BaseResources<ScalableTopicMetadata>
         return joinPath(SCALABLE_TOPIC_PATH, tn.getNamespace(), 
tn.getEncodedLocalName());
     }
 
+    /**
+     * Path under which all scalable topic records for a namespace live as 
direct
+     * children. Used by namespace-wide watchers as the prefix to filter 
metadata
+     * notifications down to events that touch a topic record.
+     */
+    public String namespacePath(NamespaceName ns) {
+        return joinPath(SCALABLE_TOPIC_PATH, ns.toString());
+    }
+
     public String subscriptionPath(TopicName tn, String subscription) {
         return joinPath(topicPath(tn), SUBSCRIPTIONS_SEGMENT, subscription);
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 2ed913561b2..109cf1b489b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -486,6 +486,17 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
         });
         dagWatchSessions.clear();
 
+        // Same for namespace-wide scalable-topic watchers.
+        scalableTopicsWatchers.values().forEach(session -> {
+            try {
+                session.close();
+            } catch (Exception e) {
+                log.warn().exceptionMessage(e)
+                        .log("Error closing scalable-topics watcher on 
connection close");
+            }
+        });
+        scalableTopicsWatchers.clear();
+
         // Notify the scalable-topic controller that this connection's 
scalable consumers
         // have dropped. The controller marks them disconnected and starts the 
grace-period
         // timer; if they reconnect in time, their assignment is preserved.
@@ -832,6 +843,113 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                 });
     }
 
+    // --- Scalable topics namespace watcher ---
+
+    private final java.util.concurrent.ConcurrentHashMap<Long,
+            
org.apache.pulsar.broker.service.scalable.ScalableTopicsWatcherSession>
+            scalableTopicsWatchers = new 
java.util.concurrent.ConcurrentHashMap<>();
+
+    /**
+     * Handle a client request to watch the scalable topics of a namespace.
+     *
+     * <p>Replies with a {@code WatchScalableTopicsError} when scalable topics are disabled,
+     * the namespace string cannot be parsed, the broker is not running, the scalable-topic
+     * resources are unavailable, or the client is not authorized for
+     * {@code NamespaceOperation.GET_TOPICS}. Otherwise a watcher session is created, recorded
+     * under the request's watch id and started.
+     */
+    @Override
+    protected void handleCommandWatchScalableTopics(
+            org.apache.pulsar.common.api.proto.CommandWatchScalableTopics cmd) {
+        checkArgument(state == State.Connected);
+
+        final long watchId = cmd.getWatchId();
+        final String namespaceStr = cmd.getNamespace();
+        log.debug().attr("namespace", namespaceStr).attr("watchId", watchId)
+                .log("Received WatchScalableTopics");
+
+        if (!scalableTopicsEnabled) {
+            ctx.writeAndFlush(Commands.newWatchScalableTopicsError(watchId,
+                    ServerError.NotAllowedError, "Scalable topics are disabled on this broker"));
+            return;
+        }
+
+        final NamespaceName namespaceName;
+        try {
+            namespaceName = NamespaceName.get(namespaceStr);
+        } catch (Exception e) {
+            log.warn().attr("namespace", namespaceStr).log("Invalid namespace in WatchScalableTopics");
+            ctx.writeAndFlush(Commands.newWatchScalableTopicsError(watchId,
+                    ServerError.InvalidTopicName, "Invalid namespace: " + namespaceStr));
+            return;
+        }
+
+        final java.util.Map<String, String> propertyFilters = new java.util.HashMap<>();
+        for (int i = 0; i < cmd.getPropertyFiltersCount(); i++) {
+            var kv = cmd.getPropertyFilterAt(i);
+            propertyFilters.put(kv.getKey(), kv.getValue());
+        }
+        final String clientHash = cmd.hasCurrentHash() ? cmd.getCurrentHash() : null;
+
+        if (!this.service.getPulsar().isRunning()) {
+            log.warn("WatchScalableTopics rejected: broker not ready");
+            ctx.writeAndFlush(Commands.newWatchScalableTopicsError(watchId,
+                    ServerError.ServiceNotReady, "Broker not ready"));
+            return;
+        }
+
+        org.apache.pulsar.broker.resources.ScalableTopicResources resources =
+                service.getPulsar().getPulsarResources().getScalableTopicResources();
+        if (resources == null) {
+            log.warn("WatchScalableTopics rejected: scalable topic resources not available");
+            ctx.writeAndFlush(Commands.newWatchScalableTopicsError(watchId,
+                    ServerError.ServiceNotReady, "Scalable topic resources not available"));
+            return;
+        }
+
+        isNamespaceOperationAllowed(namespaceName, NamespaceOperation.GET_TOPICS)
+                .thenAccept(isAuthorized -> {
+                    if (!isAuthorized) {
+                        final String msg = "Client is not authorized to WatchScalableTopics";
+                        log.warn().attr("principal", getPrincipal()).attr("namespace", namespaceName)
+                                .log(msg);
+                        ctx.writeAndFlush(Commands.newWatchScalableTopicsError(watchId,
+                                ServerError.AuthorizationError, msg));
+                        return;
+                    }
+                    var session = new org.apache.pulsar.broker.service.scalable
+                            .ScalableTopicsWatcherSession(watchId, namespaceName, propertyFilters,
+                                    clientHash, this, resources, service.getPulsar().getExecutor());
+                    // NOTE(review): the connection may have gone away while authorization was in
+                    // flight, in which case the connection-close cleanup has already run and
+                    // nothing would close this session -- confirm whether a liveness re-check is
+                    // needed here.
+                    //
+                    // If the client reuses a watchId that is still live on this connection, close
+                    // the session being replaced. Dropping it from the map without close() would
+                    // leave it registered as a namespace listener with no remaining owner.
+                    var previous = scalableTopicsWatchers.put(watchId, session);
+                    if (previous != null) {
+                        log.warn().attr("namespace", namespaceName).attr("watchId", watchId)
+                                .log("Replacing existing scalable-topics watcher with the same watchId");
+                        previous.close();
+                    }
+
+                    session.start().exceptionally(ex -> {
+                        Throwable cause = ex.getCause() != null ? ex.getCause() : ex;
+                        log.warn().attr("namespace", namespaceName).exception(cause)
+                                .log("WatchScalableTopics failed");
+                        // Conditional remove: only evict the mapping if it still points at this
+                        // session, so a newer session registered under the same id survives.
+                        scalableTopicsWatchers.remove(watchId, session);
+                        session.close();
+                        // Exceptions may carry no message; fall back to toString() so the error
+                        // frame is never built from a null string.
+                        final String errorMsg =
+                                cause.getMessage() != null ? cause.getMessage() : cause.toString();
+                        ctx.executor().execute(() -> ctx.writeAndFlush(
+                                Commands.newWatchScalableTopicsError(watchId,
+                                        ServerError.UnknownError, errorMsg)));
+                        return null;
+                    });
+                })
+                .exceptionally(ex -> {
+                    logNamespaceNameAuthException(remoteAddress, "watch-scalable-topics",
+                            getPrincipal(), Optional.of(namespaceName), ex);
+                    ctx.writeAndFlush(Commands.newWatchScalableTopicsError(watchId,
+                            ServerError.AuthorizationError,
+                            "Exception occurred while authorizing WatchScalableTopics"));
+                    return null;
+                });
+    }
+
+    /**
+     * Close the scalable-topics watcher registered under the command's watch id.
+     * A watch id with no registered session is silently ignored.
+     */
+    @Override
+    protected void handleCommandWatchScalableTopicsClose(
+            org.apache.pulsar.common.api.proto.CommandWatchScalableTopicsClose cmd) {
+        checkArgument(state == State.Connected);
+        final long id = cmd.getWatchId();
+        log.debug().attr("watchId", id).log("Received WatchScalableTopicsClose");
+        // No authorization is performed for the close itself; the session is per-connection.
+        // An id that maps to nothing makes this call a no-op.
+        Optional.ofNullable(scalableTopicsWatchers.remove(id))
+                .ifPresent(watcher -> watcher.close());
+    }
+
     @Override
     protected void handleCommandScalableTopicClose(
             CommandScalableTopicClose commandScalableTopicClose) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicsWatcherSession.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicsWatcherSession.java
new file mode 100644
index 00000000000..8598170d969
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicsWatcherSession.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.scalable;
+
+import io.github.merlimat.slog.Logger;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.Getter;
+import org.apache.pulsar.broker.resources.ScalableTopicMetadata;
+import org.apache.pulsar.broker.resources.ScalableTopicResources;
+import org.apache.pulsar.broker.service.ServerCnx;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.topics.TopicList;
+import org.apache.pulsar.common.util.Codec;
+import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.NotificationType;
+
+/**
+ * Broker-side handler for a multi-topic consumer's namespace watch session.
+ *
+ * <p>Watches the metadata store for scalable-topic create / modify / delete 
events
+ * under a given namespace, evaluates them against a (possibly empty) set of 
property
+ * filters, and pushes the matching set to the client. The client receives:
+ * <ul>
+ *   <li>One {@code Snapshot} on subscribe (and on every 
reconnect-resync).</li>
+ *   <li>One {@code Diff} per coalescing window when membership changes.</li>
+ * </ul>
+ *
+ * <p>Tied to a connection — drop the session when the channel goes away. The 
client
+ * re-opens a fresh watch on reconnect; the new session emits a fresh snapshot 
and
+ * the client reconciles locally.
+ *
+ * <p>Any broker can serve this role: every broker observes the same metadata 
events
+ * via the registered listener, so no coordinator is needed at the namespace 
level.
+ */
+public class ScalableTopicsWatcherSession implements 
ScalableTopicResources.NamespaceListener {
+
+    private static final Logger LOG = 
Logger.get(ScalableTopicsWatcherSession.class);
+    /**
+     * Window over which back-to-back metadata events are batched into one 
Diff frame.
+     * Small enough to feel "live", large enough to amortise rapid bursts 
(e.g. test
+     * setups that create N topics in a tight loop).
+     */
+    private static final Duration COALESCE_WINDOW = Duration.ofMillis(50);
+
+    private final Logger log;
+
+    @Getter
+    private final long watchId;
+    private final NamespaceName namespace;
+    private final Map<String, String> propertyFilters;
+    /**
+     * Hash of the topic set the client believes it has. Optional: present on
+     * reconnect, absent on first subscribe. When equal to the freshly-computed
+     * hash on the broker, {@link #start()} skips emitting the initial 
snapshot —
+     * the client's state is already correct.
+     */
+    private final String clientHash;
+    private final ServerCnx cnx;
+    private final ScalableTopicResources resources;
+    private final ScheduledExecutorService scheduler;
+
+    /** {@code /topics/<tenant>/<namespace>} — direct children are scalable 
topic records. */
+    private final String basePath;
+
+    /**
+     * Topics currently in the matching set. Maintained server-side so we can 
detect
+     * actual membership flips on Modified events (filter changes a topic 
in/out).
+     * Topic names are fully-qualified ({@code topic://tenant/ns/name}).
+     */
+    private final Set<String> currentSet = Collections.synchronizedSet(new 
HashSet<>());
+
+    private final AtomicBoolean closed = new AtomicBoolean();
+    private final AtomicBoolean snapshotEmitted = new AtomicBoolean();
+
+    // --- Coalescing state. All three fields guarded by `coalesceLock`. ---
+    private final Object coalesceLock = new Object();
+    private final LinkedHashSet<String> pendingAdded = new LinkedHashSet<>();
+    private final LinkedHashSet<String> pendingRemoved = new LinkedHashSet<>();
+    private boolean flushScheduled = false;
+
+    public ScalableTopicsWatcherSession(long watchId,
+                                         NamespaceName namespace,
+                                         Map<String, String> propertyFilters,
+                                         String clientHash,
+                                         ServerCnx cnx,
+                                         ScalableTopicResources resources,
+                                         ScheduledExecutorService scheduler) {
+        this.watchId = watchId;
+        this.namespace = namespace;
+        this.propertyFilters = propertyFilters == null ? Map.of() : 
propertyFilters;
+        this.clientHash = clientHash;
+        this.cnx = cnx;
+        this.resources = resources;
+        this.scheduler = scheduler;
+        this.basePath = resources.namespacePath(namespace);
+        this.log = LOG.with()
+                .attr("namespace", namespace)
+                .attr("watchId", watchId)
+                .attr("filters", this.propertyFilters)
+                .build();
+    }
+
+    @Override
+    public NamespaceName getNamespaceName() {
+        return namespace;
+    }
+
+    /**
+     * Start the watch.
+     *
+     * <p>The session registers itself as a namespace listener first, then loads the topics
+     * matching the property filters, replaces {@code currentSet} with that result and sends
+     * a {@code Snapshot} frame. The frame is skipped when the client supplied a hash equal to
+     * the hash of the freshly loaded set. Nothing is sent if the session was closed while the
+     * lookup was in flight.
+     *
+     * @return a future that completes once the initial set has been processed, or fails if
+     *         the lookup fails; the caller is expected to {@link #close()} the session then
+     */
+    public CompletableFuture<Void> start() {
+        // Register BEFORE computing the initial set so events arriving mid-lookup are seen by
+        // onNotification. An Add that duplicates a snapshot entry is harmless on the client
+        // (set semantics); a genuinely newer change flows as a Diff after the Snapshot.
+        resources.registerNamespaceListener(this);
+
+        return resources.findScalableTopicsByPropertiesAsync(namespace, propertyFilters)
+                .thenAccept(initialTopics -> {
+                    if (closed.get()) {
+                        return;
+                    }
+                    // Swap the contents atomically with respect to the synchronized-set
+                    // operations performed by onNotification.
+                    // NOTE(review): a Deleted event handled before this point finds the topic
+                    // absent from currentSet and is dropped, so the snapshot can still list a
+                    // topic that was deleted during the lookup -- confirm whether that window
+                    // needs closing.
+                    synchronized (currentSet) {
+                        currentSet.clear();
+                        currentSet.addAll(initialTopics);
+                    }
+                    // Hash short-circuit: the client already holds exactly this set, so skip
+                    // the Snapshot frame. Diffs may flow immediately.
+                    if (clientHash != null
+                            && clientHash.equals(TopicList.calculateHash(initialTopics))) {
+                        log.info().attr("topics", initialTopics.size())
+                                .log("Reconnect hash matched; skipping snapshot");
+                        snapshotEmitted.set(true);
+                        return;
+                    }
+                    log.info().attr("topics", initialTopics.size()).log("Initial snapshot");
+                    cnx.ctx().writeAndFlush(
+                            Commands.newWatchScalableTopicsSnapshot(watchId, initialTopics));
+                    // Open the gate only AFTER the Snapshot write has been issued. flushPending
+                    // runs on the scheduler thread; if the flag were set before the write, it
+                    // could slip a Diff frame onto the channel ahead of the Snapshot.
+                    snapshotEmitted.set(true);
+                });
+    }
+
+    /**
+     * Handle one metadata event for a topic record in this watcher's namespace.
+     *
+     * <p>A {@code Deleted} event removes the topic from the matching set. Any other event
+     * reloads the topic metadata, evaluates the property filters against it and adds or
+     * removes the topic accordingly. Each actual membership change is queued for the next
+     * coalesced {@code Diff}. Events are ignored once the session is closed, and paths that
+     * are not under this watcher's base path are ignored.
+     */
+    @Override
+    public void onNotification(Notification notification) {
+        if (closed.get()) {
+            return;
+        }
+        final String path = notification.getPath();
+        final String childPrefix = basePath + "/";
+        if (!path.startsWith(childPrefix)) {
+            // Not one of ours. Decoding the whole path as a local topic name would only
+            // produce a bogus topic, so drop the event instead.
+            return;
+        }
+        final String encodedLocalName = path.substring(childPrefix.length());
+
+        String topicName = TopicName.get("topic", namespace, Codec.decode(encodedLocalName)).toString();
+
+        if (notification.getType() == NotificationType.Deleted) {
+            if (currentSet.remove(topicName)) {
+                enqueueRemoved(topicName);
+            }
+            return;
+        }
+
+        // Created or Modified -- fetch the new value to evaluate the filter against.
+        TopicName tn = TopicName.get(topicName);
+        resources.getScalableTopicMetadataAsync(tn, true)
+                .whenComplete((optMd, ex) -> {
+                    if (closed.get()) {
+                        return;
+                    }
+                    if (ex != null) {
+                        log.warn().attr("topic", topicName).exceptionMessage(ex)
+                                .log("Failed to load scalable topic metadata for filter eval");
+                        return;
+                    }
+                    boolean shouldBeInSet = optMd.isPresent() && matchesFilters(optMd.get());
+                    // Use the return value of add/remove as the membership test. A separate
+                    // contains() followed by add()/remove() is a check-then-act sequence that
+                    // two concurrent callbacks for the same topic could interleave.
+                    if (shouldBeInSet) {
+                        if (currentSet.add(topicName)) {
+                            enqueueAdded(topicName);
+                        }
+                    } else if (currentSet.remove(topicName)) {
+                        enqueueRemoved(topicName);
+                    }
+                });
+    }
+
+    /**
+     * Check topic metadata against the session's property filters.
+     *
+     * @return {@code true} when no filters are configured, or when every filter key maps to
+     *         an equal value in the topic's properties; {@code false} when the topic has no
+     *         properties or any filter entry does not match
+     */
+    private boolean matchesFilters(ScalableTopicMetadata metadata) {
+        if (propertyFilters.isEmpty()) {
+            return true;
+        }
+        final Map<String, String> topicProps = metadata.getProperties();
+        return topicProps != null
+                && propertyFilters.entrySet().stream()
+                        .allMatch(filter -> filter.getValue().equals(topicProps.get(filter.getKey())));
+    }
+
+    private void enqueueAdded(String topic) {
+        synchronized (coalesceLock) {
+            // Cancel out a pending remove (e.g. rapid remove-then-add of same 
topic).
+            pendingRemoved.remove(topic);
+            pendingAdded.add(topic);
+            scheduleFlush();
+        }
+    }
+
+    private void enqueueRemoved(String topic) {
+        synchronized (coalesceLock) {
+            pendingAdded.remove(topic);
+            pendingRemoved.add(topic);
+            scheduleFlush();
+        }
+    }
+
+    private void scheduleFlush() {
+        // Caller holds coalesceLock.
+        if (flushScheduled) {
+            return;
+        }
+        flushScheduled = true;
+        scheduler.schedule(this::flushPending, COALESCE_WINDOW.toMillis(), 
TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Push the coalesced add/remove sets to the client as a single {@code Diff} frame.
+     *
+     * <p>Runs on the scheduler after the coalescing window. While the initial snapshot has
+     * not been emitted yet, the pending sets are left untouched and the flush is retried
+     * after another window. Does nothing once the session is closed.
+     */
+    private void flushPending() {
+        if (closed.get()) {
+            return;
+        }
+        final Set<String> added;
+        final Set<String> removed;
+        synchronized (coalesceLock) {
+            if (pendingAdded.isEmpty() && pendingRemoved.isEmpty()) {
+                flushScheduled = false;
+                return;
+            }
+            if (!snapshotEmitted.get()) {
+                // Diffs must follow the Snapshot frame. Decide this BEFORE draining and while
+                // holding the lock: draining first and re-adding later lets a concurrent
+                // enqueue slip in between, which can leave the same topic in both pending
+                // sets and arm a second timer. flushScheduled stays true, so enqueues that
+                // arrive in the meantime simply join the batch without scheduling again.
+                scheduler.schedule(this::flushPending, COALESCE_WINDOW.toMillis(),
+                        TimeUnit.MILLISECONDS);
+                return;
+            }
+            added = pendingAdded.isEmpty() ? Set.of() : new LinkedHashSet<>(pendingAdded);
+            removed = pendingRemoved.isEmpty() ? Set.of() : new LinkedHashSet<>(pendingRemoved);
+            pendingAdded.clear();
+            pendingRemoved.clear();
+            flushScheduled = false;
+        }
+        log.info().attr("added", added.size()).attr("removed", removed.size()).log("Pushing diff");
+        cnx.ctx().writeAndFlush(Commands.newWatchScalableTopicsDiff(watchId, added, removed));
+    }
+
+    /**
+     * Close the session and remove it from the resources' namespace listener registry so
+     * the notification fan-out no longer dispatches to it. Only the first call has any
+     * effect; later calls return immediately.
+     */
+    public void close() {
+        final boolean alreadyClosed = closed.getAndSet(true);
+        if (alreadyClosed) {
+            return;
+        }
+        resources.deregisterNamespaceListener(this);
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicsWatcherSessionHashTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicsWatcherSessionHashTest.java
new file mode 100644
index 00000000000..1e9d84897cd
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicsWatcherSessionHashTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.scalable;
+
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.DefaultChannelPromise;
+import io.netty.channel.embedded.EmbeddedChannel;
+import io.netty.util.concurrent.ImmediateEventExecutor;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import org.apache.pulsar.broker.resources.ScalableTopicMetadata;
+import org.apache.pulsar.broker.resources.ScalableTopicResources;
+import org.apache.pulsar.broker.service.ServerCnx;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.topics.TopicList;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.apache.pulsar.metadata.impl.LocalMemoryMetadataStore;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Verifies the reconnect-hash short-circuit of {@link ScalableTopicsWatcherSession}.
+ *
+ * <p>When the hash supplied by the client equals the hash of the broker's current
+ * topic set, {@code start()} is expected to complete without writing a Snapshot
+ * frame; with no hash, or a different one, a Snapshot frame must be written.
+ */
+public class ScalableTopicsWatcherSessionHashTest {
+
+    private MetadataStoreExtended store;
+    private ScalableTopicResources resources;
+    private ServerCnx cnx;
+    private ChannelHandlerContext ctx;
+    private ScheduledExecutorService scheduler;
+
+    @BeforeMethod
+    public void setUp() throws Exception {
+        store = new LocalMemoryMetadataStore("memory:local",
+                MetadataStoreConfig.builder().build());
+        resources = new ScalableTopicResources(store, 30);
+
+        cnx = mock(ServerCnx.class);
+        ctx = mock(ChannelHandlerContext.class);
+        when(cnx.ctx()).thenReturn(ctx);
+
+        // writeAndFlush has to hand back a ChannelFuture; a plain promise bound to an
+        // embedded channel is enough to keep any chained call from hitting null.
+        DefaultChannelPromise writeResult =
+                new DefaultChannelPromise(new EmbeddedChannel(), ImmediateEventExecutor.INSTANCE);
+        when(ctx.writeAndFlush(org.mockito.ArgumentMatchers.any())).thenReturn(writeResult);
+
+        scheduler = Executors.newSingleThreadScheduledExecutor();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() throws Exception {
+        if (scheduler != null) {
+            scheduler.shutdownNow();
+        }
+        if (store != null) {
+            store.close();
+        }
+    }
+
+    /** Minimal topic metadata: epoch 0, next segment id 1, no properties. */
+    private ScalableTopicMetadata meta() {
+        return ScalableTopicMetadata.builder()
+                .epoch(0)
+                .nextSegmentId(1)
+                .properties(Map.of())
+                .build();
+    }
+
+    /** Creates the scalable topic {@code t1} inside {@code ns} and returns its name. */
+    private TopicName createTopicT1(NamespaceName ns) throws Exception {
+        TopicName topic = TopicName.get("topic://" + ns + "/t1");
+        resources.createScalableTopicAsync(topic, meta()).get();
+        return topic;
+    }
+
+    /** Builds an unfiltered session with the given client hash and waits for start(). */
+    private void startSession(long watchId, NamespaceName ns, String clientHash) throws Exception {
+        var session = new ScalableTopicsWatcherSession(watchId, ns, Map.of(),
+                clientHash, cnx, resources, scheduler);
+        session.start().get();
+    }
+
+    @Test
+    public void noHashOnFirstSubscribeEmitsSnapshot() throws Exception {
+        NamespaceName ns = NamespaceName.get("tenant/ns-fresh-" + suffix());
+        createTopicT1(ns);
+
+        startSession(1L, ns, /* clientHash= */ null);
+
+        // First subscribe carries no hash, so the broker has to write a Snapshot frame.
+        verify(ctx, atLeastOnce()).writeAndFlush(org.mockito.ArgumentMatchers.any(ByteBuf.class));
+    }
+
+    @Test
+    public void matchingHashOnReconnectSkipsSnapshot() throws Exception {
+        NamespaceName ns = NamespaceName.get("tenant/ns-match-" + suffix());
+        TopicName tn = createTopicT1(ns);
+
+        // The client believes the namespace holds exactly this one topic; derive the
+        // hash with TopicList.calculateHash, the same helper this test relies on for
+        // the mismatching case.
+        String matchingHash = TopicList.calculateHash(List.of(tn.toString()));
+
+        startSession(2L, ns, /* clientHash= */ matchingHash);
+
+        // Hash matched: start() itself must not write anything. Later Diffs would
+        // still go through writeAndFlush, but none are triggered here.
+        verify(ctx, never()).writeAndFlush(org.mockito.ArgumentMatchers.any(ByteBuf.class));
+    }
+
+    @Test
+    public void differingHashOnReconnectStillEmitsSnapshot() throws Exception {
+        NamespaceName ns = NamespaceName.get("tenant/ns-diff-" + suffix());
+        createTopicT1(ns);
+
+        // The client's view is stale (a topic that does not exist), so the broker
+        // must send a fresh Snapshot for the client to reconcile against.
+        String staleHash = TopicList.calculateHash(List.of("topic://" + ns + "/something-else"));
+
+        startSession(3L, ns, /* clientHash= */ staleHash);
+
+        verify(ctx, atLeastOnce()).writeAndFlush(org.mockito.ArgumentMatchers.any(ByteBuf.class));
+    }
+
+    /** Short random suffix so each test works in its own namespace. */
+    private static String suffix() {
+        return UUID.randomUUID().toString().substring(0, 8);
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ScalableTopicsWatcherTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ScalableTopicsWatcherTest.java
new file mode 100644
index 00000000000..ffdf0b0d277
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ScalableTopicsWatcherTest.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.awaitility.Awaitility;
+import org.testng.annotations.Test;
+
+/**
+ * End-to-end coverage for the broker-side namespace scalable-topics watcher.
+ *
+ * <p>Uses reflection to construct a {@code ScalableTopicsWatcher} (package-private
+ * in the v5 module) directly against the test cluster's v4 client. Verifies:
+ * <ul>
+ *   <li>Initial snapshot reflects pre-existing topics.</li>
+ *   <li>Topic create / delete fires {@code Diff} events with the right names.</li>
+ *   <li>Property filters narrow the matching set. (Updating a topic's properties
+ *       to fall outside the filter is expected to emit {@code Removed}; that path
+ *       is not exercised by the tests in this class.)</li>
+ * </ul>
+ */
+public class V5ScalableTopicsWatcherTest extends V5ClientBaseTest {
+
+    /**
+     * Captures every Snapshot/Diff so tests can assert on the cumulative state.
+     *
+     * <p>All collections are concurrent ones: they are written from the watcher's
+     * callbacks and polled from the test thread via Awaitility. Note that
+     * {@link #onSnapshot} does {@code clear()} followed by {@code addAll()}, which is
+     * not atomic, so a concurrent reader may briefly observe an empty or partial
+     * {@code currentSet}; assertions should therefore poll rather than read once.
+     */
+    private static final class CapturingListener {
+        // Cumulative view after applying every snapshot and diff seen so far.
+        final Set<String> currentSet = ConcurrentHashMap.newKeySet();
+        // Immutable copy of each snapshot, in arrival order.
+        final List<List<String>> snapshots = new CopyOnWriteArrayList<>();
+        // (added, removed) per diff
+        final List<Map.Entry<List<String>, List<String>>> diffs = new 
CopyOnWriteArrayList<>();
+
+        /** Records the snapshot and replaces {@code currentSet} with its contents. */
+        void onSnapshot(List<String> topics) {
+            snapshots.add(List.copyOf(topics));
+            currentSet.clear();
+            currentSet.addAll(topics);
+        }
+
+        /** Records the diff and applies it to {@code currentSet}: removals first, then additions. */
+        void onDiff(List<String> added, List<String> removed) {
+            diffs.add(Map.entry(List.copyOf(added), List.copyOf(removed)));
+            currentSet.removeAll(removed);
+            currentSet.addAll(added);
+        }
+    }
+
+    /**
+     * Creating two topics and then deleting one must be reflected, in that order,
+     * in the set the watcher reports.
+     */
+    @Test
+    public void watcherEmitsCreateAndDeleteEvents() throws Exception {
+        NamespaceName ns = NamespaceName.get(getNamespace());
+
+        WatcherHandle handle = openWatcher(ns, Map.of());
+        try {
+            final CapturingListener events = handle.listener;
+
+            // The shared cluster gives each test a fresh namespace, so the very first
+            // snapshot is expected to carry no topics.
+            Awaitility.await().atMost(10, TimeUnit.SECONDS)
+                    .until(() -> !events.snapshots.isEmpty());
+            List<String> firstSnapshot = events.snapshots.get(0);
+            assertEquals(firstSnapshot.size(), 0,
+                    "fresh namespace should yield empty initial snapshot");
+
+            String fullNameA = "topic://" + ns + "/a-" + UUID.randomUUID().toString().substring(0, 8);
+            String fullNameB = "topic://" + ns + "/b-" + UUID.randomUUID().toString().substring(0, 8);
+            admin.scalableTopics().createScalableTopic(fullNameA, 1);
+            admin.scalableTopics().createScalableTopic(fullNameB, 1);
+
+            Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() ->
+                    assertEquals(events.currentSet, Set.of(fullNameA, fullNameB)));
+
+            admin.scalableTopics().deleteScalableTopic(fullNameA, true);
+
+            Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() ->
+                    assertEquals(events.currentSet, Set.of(fullNameB)));
+        } finally {
+            handle.close();
+        }
+    }
+
+    /**
+     * A watcher opened with the filter {@code owner=alice} must only ever report
+     * topics whose properties match; a topic owned by bob stays invisible.
+     */
+    @Test
+    public void watcherFiltersByProperty() throws Exception {
+        NamespaceName ns = NamespaceName.get(getNamespace());
+
+        WatcherHandle handle = openWatcher(ns, Map.of("owner", "alice"));
+        try {
+            Awaitility.await().atMost(10, TimeUnit.SECONDS)
+                    .until(() -> !handle.listener.snapshots.isEmpty());
+
+            String aliceTopic = ns + "/alice-" + UUID.randomUUID().toString().substring(0, 8);
+            String bobTopic = ns + "/bob-" + UUID.randomUUID().toString().substring(0, 8);
+            admin.scalableTopics().createScalableTopic("topic://" + aliceTopic, 1,
+                    Map.of("owner", "alice"));
+            admin.scalableTopics().createScalableTopic("topic://" + bobTopic, 1,
+                    Map.of("owner", "bob"));
+
+            // Only alice's topic should surface — bob is filtered out.
+            Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() ->
+                    assertEquals(handle.listener.currentSet, Set.of("topic://" + aliceTopic)));
+
+            // bob's topic exists but never reaches the watcher's set. assertFalse (with
+            // a message) reports what went wrong instead of a bare "expected true".
+            org.testng.Assert.assertFalse(
+                    handle.listener.currentSet.contains("topic://" + bobTopic),
+                    "topic filtered out by owner=alice must not appear in the watcher's set");
+        } finally {
+            handle.close();
+        }
+    }
+
+    /**
+     * With an empty filter map the watcher must report every topic in the namespace,
+     * including one that already existed when the watch was opened.
+     */
+    @Test
+    public void watcherEmptyFilterSubscribesToEveryTopic() throws Exception {
+        NamespaceName ns = NamespaceName.get(getNamespace());
+
+        // Create the topic up front so it can only reach the watcher through the
+        // initial snapshot, not through a live create event.
+        String existingTopic = "topic://" + ns + "/pre-" + UUID.randomUUID().toString().substring(0, 8);
+        admin.scalableTopics().createScalableTopic(existingTopic, 1);
+
+        WatcherHandle handle = openWatcher(ns, Map.of());
+        try {
+            final Set<String> expected = Set.of(existingTopic);
+            Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() ->
+                    assertEquals(handle.listener.currentSet, expected));
+        } finally {
+            handle.close();
+        }
+    }
+
+    // --- Helpers ---
+
+    /**
+     * Bundle of watcher state needed to clean up at the end of a test.
+     *
+     * <p>The watcher is held as a plain {@code Object} because its class is
+     * package-private in another module; it is only ever driven through reflection.
+     */
+    private static final class WatcherHandle implements AutoCloseable {
+        // Reflectively created ScalableTopicsWatcher instance.
+        final Object watcher;
+        // Listener that records everything the watcher delivers.
+        final CapturingListener listener;
+        // Latch counted down once the initial snapshot has been handed to the listener.
+        final CountDownLatch ready;
+
+        WatcherHandle(Object watcher, CapturingListener listener, 
CountDownLatch ready) {
+            this.watcher = watcher;
+            this.listener = listener;
+            this.ready = ready;
+        }
+
+        /** Invokes the watcher's {@code close()} method reflectively. */
+        @Override
+        public void close() throws Exception {
+            Method closeMethod = watcher.getClass().getDeclaredMethod("close");
+            closeMethod.setAccessible(true);
+            closeMethod.invoke(watcher);
+        }
+    }
+
+    /**
+     * Constructs a {@code ScalableTopicsWatcher} via reflection (it is package-private
+     * inside the v5 module, but these broker tests drive it directly), wires up a
+     * {@link CapturingListener}, calls {@code start()} and returns a handle.
+     *
+     * <p>The initial snapshot arrives through the future returned by {@code start()};
+     * it is fed into the listener's {@code onSnapshot} so tests handle it the same way
+     * as later snapshots.
+     *
+     * @param ns      namespace to watch
+     * @param filters property filters passed to the watcher's constructor
+     * @return handle owning the started watcher; the caller must close it
+     * @throws AssertionError if {@code start()} fails or yields no snapshot within 10s;
+     *                        the watcher is closed before the error is thrown
+     * @throws Exception      on any reflective access failure
+     */
+    @SuppressWarnings("unchecked") // reflective results are cast to their known generic types
+    private WatcherHandle openWatcher(NamespaceName ns, Map<String, String> filters)
+            throws Exception {
+        // Pull the underlying v4 client out of v5Client so we can construct the
+        // (package-private) watcher directly.
+        Field v4Field = v5Client.getClass().getDeclaredField("v4Client");
+        v4Field.setAccessible(true);
+        PulsarClientImpl v4 = (PulsarClientImpl) v4Field.get(v5Client);
+
+        Class<?> watcherCls = Class.forName(
+                "org.apache.pulsar.client.impl.v5.ScalableTopicsWatcher");
+        Constructor<?> ctor = watcherCls.getDeclaredConstructor(
+                PulsarClientImpl.class, NamespaceName.class, Map.class);
+        ctor.setAccessible(true);
+        Object watcher = ctor.newInstance(v4, ns, filters);
+
+        CapturingListener listener = new CapturingListener();
+
+        // The watcher's Listener is a package-private nested interface; use a Proxy.
+        Class<?> listenerCls = Class.forName(
+                "org.apache.pulsar.client.impl.v5.ScalableTopicsWatcher$Listener");
+        Object listenerProxy = java.lang.reflect.Proxy.newProxyInstance(
+                listenerCls.getClassLoader(), new Class<?>[]{listenerCls}, (proxy, method, args) -> {
+                    switch (method.getName()) {
+                        case "onSnapshot":
+                            listener.onSnapshot((List<String>) args[0]);
+                            break;
+                        case "onDiff":
+                            listener.onDiff((List<String>) args[0], (List<String>) args[1]);
+                            break;
+                        default:
+                            // Object methods (toString, hashCode, equals) — defaults are fine.
+                            break;
+                    }
+                    return null;
+                });
+
+        Method setListener = watcherCls.getDeclaredMethod("setListener", listenerCls);
+        setListener.setAccessible(true);
+        setListener.invoke(watcher, listenerProxy);
+
+        Method start = watcherCls.getDeclaredMethod("start");
+        start.setAccessible(true);
+        java.util.concurrent.CompletableFuture<List<String>> startFut =
+                (java.util.concurrent.CompletableFuture<List<String>>) start.invoke(watcher);
+
+        CountDownLatch ready = new CountDownLatch(1);
+        java.util.concurrent.atomic.AtomicReference<Throwable> startFailure =
+                new java.util.concurrent.atomic.AtomicReference<>();
+        // whenComplete (rather than thenAccept) so a failed start() is observed right
+        // away instead of surfacing as an unexplained 10s timeout.
+        startFut.whenComplete((initialTopics, failure) -> {
+            try {
+                if (failure != null) {
+                    startFailure.set(failure);
+                } else {
+                    // start()'s future delivers the initial snapshot — feed it into the
+                    // listener for uniform handling with subsequent snapshots.
+                    listener.onSnapshot(initialTopics);
+                }
+            } finally {
+                ready.countDown();
+            }
+        });
+
+        WatcherHandle handle = new WatcherHandle(watcher, listener, ready);
+        boolean completed = ready.await(10, TimeUnit.SECONDS);
+        if (completed && startFailure.get() == null) {
+            return handle;
+        }
+
+        AssertionError error = completed
+                ? new AssertionError("watcher start() failed", startFailure.get())
+                : new AssertionError("watcher did not deliver initial snapshot in 10s");
+        // The caller never receives the handle on this path, so close the watcher here
+        // to avoid leaking its broker session into later tests.
+        try {
+            handle.close();
+        } catch (Exception closeFailure) {
+            error.addSuppressed(closeFailure);
+        }
+        throw error;
+    }
+}
diff --git 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicsWatcher.java
 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicsWatcher.java
new file mode 100644
index 00000000000..0350d83df52
--- /dev/null
+++ 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicsWatcher.java
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.v5;
+
+import io.github.merlimat.slog.Logger;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.ClientCnx;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.ScalableTopicsWatcherSession;
+import org.apache.pulsar.common.api.proto.ServerError;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.topics.TopicList;
+import org.apache.pulsar.common.util.Backoff;
+
+/**
+ * Client-side manager for a namespace-wide scalable-topics watch session.
+ *
+ * <p>Opens a {@code CommandWatchScalableTopics} on attach, awaits the initial
+ * {@code Snapshot}, and forwards subsequent {@code Snapshot} / {@code Diff} events
+ * to a {@link Listener}. On connection drop, schedules a reconnect with backoff.
+ * The reconnect request carries a hash of the client's current set: if it differs
+ * from the broker's, the broker pushes a fresh snapshot and the client applies it
+ * as a state replacement (no missed events); if it matches, no snapshot is sent
+ * and the client's local state is kept as is.
+ *
+ * <p>Mirrors {@link DagWatchClient} in shape; the wire path is different — watch
+ * subscribe is fire-and-forget (no request/response), with the initial snapshot
+ * arriving as the first {@code WatchScalableTopicsUpdate}.
+ */
+final class ScalableTopicsWatcher implements ScalableTopicsWatcherSession, 
AutoCloseable {
+
+    private static final Logger LOG = Logger.get(ScalableTopicsWatcher.class);
+    private static final AtomicLong WATCH_ID_GENERATOR = new AtomicLong(0);
+
+    /**
+     * Listener for membership events. The watcher delivers events on the netty IO
+     * thread; implementations should not block.
+     *
+     * <p>Exceptions thrown by an implementation are caught and logged by the
+     * watcher; they do not stop later events from being delivered.
+     */
+    interface Listener {
+        /**
+         * Full set; replace any local state.
+         *
+         * @param topics every topic currently matching the watch
+         */
+        void onSnapshot(List<String> topics);
+
+        /**
+         * Apply removed before added (covers rapid remove-then-add of same name).
+         *
+         * @param added   topics that joined the matching set
+         * @param removed topics that left the matching set
+         */
+        void onDiff(List<String> added, List<String> removed);
+    }
+
+    private final Logger log;
+    private final PulsarClientImpl v4Client;
+    private final NamespaceName namespace;
+    private final Map<String, String> propertyFilters;
+    private final long watchId;
+    private final Backoff reconnectBackoff;
+
+    private final CompletableFuture<List<String>> initialSnapshotFuture = new 
CompletableFuture<>();
+    /**
+     * Mirrors the broker's view of the matching set so we can hand a hash on
+     * reconnect — when the set hasn't changed, the broker skips emitting a
+     * fresh snapshot. Updated on every Snapshot replace and Diff apply.
+     * Synchronised because Snapshot / Diff arrive on the netty thread but the
+     * hash may be read on a reconnect callback running elsewhere.
+     */
+    private final Set<String> currentSet = Collections.synchronizedSet(new 
HashSet<>());
+    private volatile Listener listener;
+    private volatile ClientCnx cnx;
+    private volatile boolean closed = false;
+
+    /**
+     * Creates a watcher that is not yet connected; call {@link #start()} to open it.
+     *
+     * @param v4Client        the underlying v4 client used to open broker connections
+     * @param namespace       namespace to watch
+     * @param propertyFilters AND filters; empty map = match all; {@code null} is
+     *                        treated as an empty map
+     */
+    ScalableTopicsWatcher(PulsarClientImpl v4Client, NamespaceName namespace,
+                          Map<String, String> propertyFilters) {
+        final Map<String, String> effectiveFilters;
+        if (propertyFilters != null) {
+            effectiveFilters = propertyFilters;
+        } else {
+            effectiveFilters = Map.of();
+        }
+
+        this.v4Client = v4Client;
+        this.namespace = namespace;
+        this.propertyFilters = effectiveFilters;
+        this.watchId = WATCH_ID_GENERATOR.incrementAndGet();
+        // Reconnect delay starts at 100ms and is capped at 30s.
+        this.reconnectBackoff = Backoff.builder()
+                .initialDelay(Duration.ofMillis(100))
+                .maxBackoff(Duration.ofSeconds(30))
+                .build();
+        // Per-instance logger so every line carries the session's identity.
+        this.log = LOG.with()
+                .attr("namespace", namespace)
+                .attr("watchId", this.watchId)
+                .attr("filters", effectiveFilters)
+                .build();
+    }
+
+    /**
+     * Opens the watch session on a connection to the configured service URL.
+     *
+     * <p>The returned future completes with the topic list of the initial snapshot,
+     * or exceptionally if the connection (or attaching to it) fails. Snapshots and
+     * diffs that arrive afterwards are delivered to the listener installed through
+     * {@link #setListener}.
+     *
+     * @return future for the initial snapshot
+     */
+    CompletableFuture<List<String>> start() {
+        CompletableFuture<Void> attached =
+                v4Client.getConnectionToServiceUrl().thenAccept(this::attach);
+        attached.exceptionally(failure -> {
+            initialSnapshotFuture.completeExceptionally(failure);
+            return null;
+        });
+        return initialSnapshotFuture;
+    }
+
+    /**
+     * Binds this watcher to {@code newCnx} and sends the first subscribe frame.
+     *
+     * <p>Does nothing when the watcher is already closed. Fails the initial-snapshot
+     * future when the broker does not advertise scalable-topics support, or when the
+     * subscribe frame cannot be written.
+     *
+     * @param newCnx freshly obtained connection to the service URL
+     */
+    private void attach(ClientCnx newCnx) {
+        if (closed) {
+            return;
+        }
+        if (!newCnx.isSupportsScalableTopics()) {
+            initialSnapshotFuture.completeExceptionally(
+                    new PulsarClientException.FeatureNotSupportedException(
+                            "Broker does not support scalable topics",
+                            PulsarClientException.FailedFeatureCheck.SupportsScalableTopics));
+            return;
+        }
+        // Remember the connection only after the feature check passed (same order as
+        // reconnect()). Otherwise close() would later write a WatchScalableTopicsClose
+        // frame to a broker that never advertised support for the command.
+        this.cnx = newCnx;
+        newCnx.registerScalableTopicsWatcher(watchId, this);
+        // First subscribe: send no hash so the broker emits the initial snapshot
+        // unconditionally. That snapshot is what completes initialSnapshotFuture.
+        newCnx.ctx().writeAndFlush(Commands.newWatchScalableTopics(
+                        watchId, namespace.toString(), propertyFilters,
+                        /* currentHash= */ null))
+                .addListener(writeFuture -> {
+                    if (!writeFuture.isSuccess()) {
+                        // The subscribe never left: undo the registration and report.
+                        newCnx.removeScalableTopicsWatcher(watchId);
+                        if (!initialSnapshotFuture.isDone()) {
+                            initialSnapshotFuture.completeExceptionally(
+                                    new PulsarClientException(writeFuture.cause()));
+                        }
+                    }
+                });
+    }
+
+    /**
+     * Handles a full-state frame from the broker.
+     *
+     * <p>Resets the reconnect backoff, replaces the locally tracked set, and then
+     * either completes the initial-snapshot future (first snapshot) or forwards the
+     * topics to the listener (later snapshots). Ignored once the watcher is closed.
+     */
+    @Override
+    public void onSnapshot(List<String> topics) {
+        if (closed) {
+            return;
+        }
+        log.info().attr("topics", topics.size()).log("Snapshot received");
+        // A snapshot is taken as the signal that the session is live again, so the
+        // next disconnect starts from the initial backoff delay.
+        reconnectBackoff.reset();
+        // Keep the mirror in sync so a later reconnect hashes the right contents.
+        synchronized (currentSet) {
+            currentSet.clear();
+            currentSet.addAll(topics);
+        }
+        if (!initialSnapshotFuture.isDone()) {
+            // The first snapshot is handed to the caller through start()'s future and
+            // deliberately not through the listener.
+            initialSnapshotFuture.complete(topics);
+            return;
+        }
+        final Listener target = listener;
+        if (target == null) {
+            return;
+        }
+        try {
+            target.onSnapshot(topics);
+        } catch (Exception e) {
+            log.error().exception(e).log("Listener threw on snapshot");
+        }
+    }
+
+    /**
+     * Handles an incremental frame from the broker: updates the locally tracked set
+     * and forwards the change to the listener, if one is installed. Ignored once the
+     * watcher is closed.
+     */
+    @Override
+    public void onDiff(List<String> added, List<String> removed) {
+        if (closed) {
+            return;
+        }
+        log.info().attr("added", added.size()).attr("removed", removed.size())
+                .log("Diff received");
+        synchronized (currentSet) {
+            // Removals first, so a name that was removed and re-added in the same
+            // diff ends up present.
+            currentSet.removeAll(removed);
+            currentSet.addAll(added);
+        }
+        final Listener target = listener;
+        if (target == null) {
+            return;
+        }
+        try {
+            target.onDiff(added, removed);
+        } catch (Exception e) {
+            log.error().exception(e).log("Listener threw on diff");
+        }
+    }
+
+    /**
+     * Computes the hash of the locally tracked set via {@code TopicList.calculateHash}.
+     *
+     * <p>The set is read (and copied) while holding its lock so the hash reflects one
+     * consistent view even if a Snapshot or Diff is being applied concurrently.
+     *
+     * @return hash to send as {@code currentHash} on reconnect
+     */
+    private String currentSetHash() {
+        synchronized (currentSet) {
+            return currentSet.isEmpty()
+                    ? TopicList.calculateHash(List.of())
+                    : TopicList.calculateHash(new HashSet<>(currentSet));
+        }
+    }
+
+    /**
+     * Handles a rejection of the watch by the broker.
+     *
+     * <p>The error is always logged. If the initial snapshot has not arrived yet, the
+     * future returned by {@code start()} is failed with a description of the error.
+     * NOTE(review): once the initial snapshot has completed, an error is only logged —
+     * no reconnect is scheduled here; confirm that is the intended behaviour.
+     */
+    @Override
+    public void onError(ServerError error, String message) {
+        log.warn().attr("error", error).attr("message", message)
+                .log("WatchScalableTopics rejected");
+        if (initialSnapshotFuture.isDone()) {
+            return;
+        }
+        final String detail = message != null ? " - " + message : "";
+        initialSnapshotFuture.completeExceptionally(
+                new PulsarClientException("WatchScalableTopics failed: " + error + detail));
+    }
+
+    /**
+     * Invoked when the connection carrying this watch goes away.
+     *
+     * <p>Forgets the connection. If the watcher is still open, it either schedules a
+     * reconnect (initial snapshot already delivered) or fails the pending
+     * {@code start()} future (initial snapshot never arrived).
+     */
+    @Override
+    public void connectionClosed() {
+        log.warn("Scalable-topics watcher connection closed");
+        cnx = null;
+        if (closed) {
+            return;
+        }
+        if (initialSnapshotFuture.isDone()) {
+            scheduleReconnect();
+            return;
+        }
+        // The very first subscribe never completed: report that to the caller
+        // instead of retrying behind its back.
+        initialSnapshotFuture.completeExceptionally(
+                new PulsarClientException(
+                        "Connection closed before initial scalable-topics snapshot arrived"));
+    }
+
+    /**
+     * Arms a one-shot timer on the client's timer that calls {@link #reconnect()}
+     * after the next backoff delay. Does nothing once the watcher is closed.
+     */
+    private void scheduleReconnect() {
+        if (!closed) {
+            final long delayMs = reconnectBackoff.next().toMillis();
+            log.info().attr("delayMs", delayMs).log("Scheduling watcher reconnect");
+            v4Client.timer().newTimeout(ignored -> reconnect(), delayMs, TimeUnit.MILLISECONDS);
+        }
+    }
+
+    /**
+     * Tries to re-open the watch on a fresh connection; any failure along the way
+     * schedules another attempt with backoff. Does nothing once the watcher is closed.
+     */
+    private void reconnect() {
+        if (closed) {
+            return;
+        }
+        v4Client.getConnectionToServiceUrl()
+                .thenAccept(this::resubscribe)
+                .exceptionally(failure -> {
+                    log.warn().exceptionMessage(failure).log("Watcher reconnect failed");
+                    scheduleReconnect();
+                    return null;
+                });
+    }
+
+    /**
+     * Registers this watcher on {@code newCnx} and re-sends the subscribe frame,
+     * this time carrying the hash of the locally tracked set.
+     *
+     * <p>NOTE(review): if the write fails because the connection dropped, both the
+     * write listener below and {@link #connectionClosed()} may call
+     * {@code scheduleReconnect()}, which would arm two timers — confirm whether
+     * ClientCnx can deliver both for the same failure.
+     */
+    private void resubscribe(ClientCnx newCnx) {
+        if (closed) {
+            return;
+        }
+        if (!newCnx.isSupportsScalableTopics()) {
+            log.warn().log("Watcher reconnect: broker doesn't support scalable topics");
+            scheduleReconnect();
+            return;
+        }
+        this.cnx = newCnx;
+        newCnx.registerScalableTopicsWatcher(watchId, this);
+        // Send the hash of what we currently hold. When it matches the broker's own
+        // hash no Snapshot comes back and the local state stays as is; when it differs
+        // the broker answers with a Snapshot that replaces the local state. Diffs
+        // flow as usual in both cases.
+        final String hash = currentSetHash();
+        newCnx.ctx()
+                .writeAndFlush(Commands.newWatchScalableTopics(
+                        watchId, namespace.toString(), propertyFilters, hash))
+                .addListener(writeFuture -> {
+                    if (writeFuture.isSuccess()) {
+                        // The subscribe was written out on the new connection. Reset
+                        // the backoff here as well: on the hash-match path no Snapshot
+                        // arrives, so onSnapshot's reset would never run and repeated
+                        // short blips would leave the delay stuck at its maximum.
+                        reconnectBackoff.reset();
+                        return;
+                    }
+                    newCnx.removeScalableTopicsWatcher(watchId);
+                    log.warn().exceptionMessage(writeFuture.cause())
+                            .log("Watcher reconnect write failed");
+                    scheduleReconnect();
+                });
+    }
+
+    /**
+     * Set the listener that receives {@code Snapshot} / {@code Diff} events. Should
+     * be called after {@link #start()} resolves — the initial snapshot is delivered
+     * via that future, not via the listener.
+     *
+     * <p>The field is volatile, so the listener may be installed or replaced from any
+     * thread; events already being dispatched keep using the listener they read.
+     *
+     * @param listener receiver for subsequent events; {@code null} disables delivery
+     */
+    void setListener(Listener listener) {
+        this.listener = listener;
+    }
+
+    /**
+     * Visible for testing — snapshot of the current set. In production, the
+     * listener is the source of truth; this method exists so tests can poke the
+     * watcher's hash-tracking state directly.
+     *
+     * @return a copy taken under the set's lock; later changes to the watcher's
+     *         state are not reflected in it
+     */
+    Set<String> currentSetForTesting() {
+        synchronized (currentSet) {
+            return new HashSet<>(currentSet);
+        }
+    }
+
+    /**
+     * Returns the id of this watch session, drawn from {@code WATCH_ID_GENERATOR} in
+     * the constructor and sent with every watch command.
+     */
+    long watchId() {
+        return watchId;
+    }
+
+    /**
+     * Closes the watcher: stops event delivery and reconnects, fails a still-pending
+     * {@code start()} future, and tells the broker (if connected) to drop the session.
+     * Calling it again has no effect.
+     */
+    @Override
+    public void close() {
+        if (closed) {
+            return;
+        }
+        closed = true;
+        // Once closed is set, attach() and connectionClosed() both return early
+        // without touching the initial-snapshot future, so a caller still waiting on
+        // start() would hang forever. Fail it here; this is a no-op when the future
+        // has already completed.
+        initialSnapshotFuture.completeExceptionally(
+                new PulsarClientException(
+                        "Scalable-topics watcher closed before initial snapshot arrived"));
+        ClientCnx c = cnx;
+        if (c != null) {
+            c.removeScalableTopicsWatcher(watchId);
+            c.ctx().writeAndFlush(Commands.newWatchScalableTopicsClose(watchId));
+        }
+    }
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index f45fc2c91f4..40c4e12aa05 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -178,6 +178,18 @@ public class ClientCnx extends PulsarHandler {
                     .concurrencyLevel(1)
                     .build();
 
+    /**
+     * Per-watcher namespace scalable-topics watch sessions, keyed by the
+     * {@code watchId} chosen by the client. The broker tags every
+     * {@link 
org.apache.pulsar.common.api.proto.CommandWatchScalableTopicsUpdate} with
+     * this id.
+     */
+    private final ConcurrentLongHashMap<ScalableTopicsWatcherSession> 
scalableTopicsWatchers =
+            ConcurrentLongHashMap.<ScalableTopicsWatcherSession>newBuilder()
+                    .expectedItems(4)
+                    .concurrencyLevel(1)
+                    .build();
+
     private final CompletableFuture<Void> connectionFuture = new 
CompletableFuture<Void>();
     private final ConcurrentLinkedQueue<RequestTime> requestTimeoutQueue = new 
ConcurrentLinkedQueue<>();
 
@@ -375,6 +387,7 @@ public class ClientCnx extends PulsarHandler {
         topicListWatchers.forEach((__, watcher) -> 
watcher.connectionClosed(this));
         dagWatchSessions.forEach((__, session) -> session.connectionClosed());
         scalableConsumerSessions.forEach((__, session) -> 
session.connectionClosed());
+        scalableTopicsWatchers.forEach((__, session) -> 
session.connectionClosed());
 
         waitingLookupRequests.clear();
 
@@ -383,6 +396,7 @@ public class ClientCnx extends PulsarHandler {
         topicListWatchers.clear();
         dagWatchSessions.clear();
         scalableConsumerSessions.clear();
+        scalableTopicsWatchers.clear();
 
         timeoutTask.cancel(true);
     }
@@ -1427,6 +1441,75 @@ public class ClientCnx extends PulsarHandler {
         scalableConsumerSessions.remove(consumerId);
     }
 
+    /**
+     * Dispatches a broker-pushed {@code CommandWatchScalableTopicsUpdate} to the
+     * session registered under the command's {@code watchId}.
+     *
+     * <p>A command carrying an error removes the registration and is reported
+     * via {@code onError}. Otherwise the snapshot or diff payload is copied into
+     * fresh lists and handed to {@code onSnapshot} / {@code onDiff}. Updates for
+     * an unregistered {@code watchId}, or with no event payload, are logged at
+     * warn level and dropped.
+     */
+    @Override
+    protected void handleCommandWatchScalableTopicsUpdate(
+            
org.apache.pulsar.common.api.proto.CommandWatchScalableTopicsUpdate cmd) {
+        checkArgument(state == State.Ready);
+
+        long watchId = cmd.getWatchId();
+        log.debug().attr("watchId", watchId).log("Received 
WatchScalableTopicsUpdate");
+
+        if (cmd.hasError()) {
+            // Error response — terminal for this watch (subscribe was 
rejected or the
+            // server failed to compute the initial set). Drop the local 
registration so
+            // a retry from the caller starts fresh.
+            ScalableTopicsWatcherSession session = 
scalableTopicsWatchers.remove(watchId);
+            if (session != null) {
+                session.onError(cmd.getError(), cmd.hasMessage() ? 
cmd.getMessage() : null);
+            } else {
+                log.warn().attr("watchId", watchId)
+                        .log("Received scalable-topics watch error for unknown 
watcher");
+            }
+            return;
+        }
+
+        ScalableTopicsWatcherSession session = 
scalableTopicsWatchers.get(watchId);
+        if (session == null) {
+            log.warn().attr("watchId", watchId)
+                    .log("Received scalable-topics watch update for unknown 
watcher");
+            return;
+        }
+        // Snapshot and diff are mutually exclusive via the proto oneof; 
switch on the
+        // generated event-case enum and unpack accordingly.
+        switch (cmd.getEventCase()) {
+            case SNAPSHOT -> {
+                var snapshot = cmd.getSnapshot();
+                java.util.List<String> topics = new 
java.util.ArrayList<>(snapshot.getTopicsCount());
+                for (int i = 0; i < snapshot.getTopicsCount(); i++) {
+                    topics.add(snapshot.getTopicAt(i));
+                }
+                session.onSnapshot(topics);
+            }
+            case DIFF -> {
+                var diff = cmd.getDiff();
+                // LightProto pluralises count accessors with a trailing 's' 
on the field name,
+                // hence `getAddedsCount` / `getRemovedsCount`.
+                java.util.List<String> added = new 
java.util.ArrayList<>(diff.getAddedsCount());
+                for (int i = 0; i < diff.getAddedsCount(); i++) {
+                    added.add(diff.getAddedAt(i));
+                }
+                java.util.List<String> removed = new 
java.util.ArrayList<>(diff.getRemovedsCount());
+                for (int i = 0; i < diff.getRemovedsCount(); i++) {
+                    removed.add(diff.getRemovedAt(i));
+                }
+                session.onDiff(added, removed);
+            }
+            case NOT_SET -> log.warn().attr("watchId", watchId)
+                    .log("Received scalable-topics watch update with no event 
payload");
+            default -> log.warn().attr("watchId", watchId)
+                    .attr("case", cmd.getEventCase())
+                    .log("Received scalable-topics watch update with unknown 
event case");
+        }
+    }
+
+    /**
+     * Registers {@code watcher} under {@code watchId} so that incoming
+     * {@code CommandWatchScalableTopicsUpdate} commands carrying that id are
+     * dispatched to it. An existing entry for the same id is overwritten.
+     */
+    public void registerScalableTopicsWatcher(long watchId, 
ScalableTopicsWatcherSession watcher) {
+        scalableTopicsWatchers.put(watchId, watcher);
+    }
+
+    /**
+     * Drops the registration for {@code watchId}, if any. Later updates for
+     * that id are treated as addressed to an unknown watcher.
+     */
+    public void removeScalableTopicsWatcher(long watchId) {
+        scalableTopicsWatchers.remove(watchId);
+    }
+
     /**
      * check serverError and take appropriate action.
      * <ul>
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ScalableTopicsWatcherSession.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ScalableTopicsWatcherSession.java
new file mode 100644
index 00000000000..7b71dc208ec
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ScalableTopicsWatcherSession.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import java.util.List;
+import org.apache.pulsar.common.api.proto.ServerError;
+
+/**
+ * Client-side callback for a namespace-wide scalable-topics watch session.
+ *
+ * <p>The broker pushes either a full {@code Snapshot} (on initial subscribe,
+ * and on reconnect unless the hash sent by the client shows its set is still
+ * current) or an incremental {@code Diff}. Implementations apply the snapshot
+ * as a full state replacement and the diff as a set delta — both are
+ * idempotent and self-healing across reconnects.
+ *
+ * <p>Implemented by the V5 client's {@code ScalableTopicsWatcher}.
+ */
+public interface ScalableTopicsWatcherSession {
+
+    /**
+     * Full set of topics currently matching the watch's filters. The
+     * implementation should replace any local state derived from prior events
+     * with this snapshot.
+     *
+     * @param topics every topic currently in the matching set
+     */
+    void onSnapshot(List<String> topics);
+
+    /**
+     * Incremental membership change. Apply {@code removed} before {@code added}
+     * (covers a rapid remove-then-add of the same topic name within a
+     * coalescing window).
+     *
+     * @param added   topics that entered the matching set; may be empty
+     * @param removed topics that left the matching set; may be empty
+     */
+    void onDiff(List<String> added, List<String> removed);
+
+    /**
+     * The broker rejected the watch (e.g. authz, broker shutting down).
+     * Implementations should fail any pending start future and stop emitting
+     * events.
+     *
+     * @param error   error code reported by the broker
+     * @param message human-readable detail, or {@code null} when the broker
+     *                sent none
+     */
+    void onError(ServerError error, String message);
+
+    /**
+     * The underlying connection dropped. Implementations should treat any
+     * local set as possibly stale until the watch is re-established: the broker
+     * then either sends a fresh snapshot or, when the client's hash still
+     * matches, sends none and the local set remains valid.
+     */
+    void connectionClosed();
+}
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 02e03a491d0..2c0c74fb378 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -1774,6 +1774,95 @@ public class Commands {
         return serializeWithSize(cmd);
     }
 
+    /**
+     * Client -> Broker: open a scalable-topics watch session.
+     *
+     * @param watchId         client-assigned watch identifier
+     * @param namespace       tenant/namespace to scope the watch to
+     * @param propertyFilters AND filters; empty / null = match all topics in
+     *                        the namespace
+     * @param currentHash     optional hash of the client's currently-known
+     *                        topic set. Pass on reconnect to let the broker
+     *                        skip the snapshot when state hasn't changed; pass
+     *                        {@code null} on first subscribe.
+     * @return the serialized command
+     */
+    public static ByteBuf newWatchScalableTopics(long watchId, String 
namespace,
+                                                  java.util.Map<String, 
String> propertyFilters,
+                                                  String currentHash) {
+        BaseCommand cmd = localCmd(Type.WATCH_SCALABLE_TOPICS);
+        org.apache.pulsar.common.api.proto.CommandWatchScalableTopics watch =
+                cmd.setWatchScalableTopics()
+                        .setWatchId(watchId)
+                        .setNamespace(namespace);
+        if (propertyFilters != null) {
+            for (var entry : propertyFilters.entrySet()) {
+                watch.addPropertyFilter()
+                        .setKey(entry.getKey())
+                        .setValue(entry.getValue());
+            }
+        }
+        if (currentHash != null) {
+            watch.setCurrentHash(currentHash);
+        }
+        return serializeWithSize(cmd);
+    }
+
+    /**
+     * Client -> Broker: close the watch session identified by {@code watchId}.
+     *
+     * @param watchId the client-assigned id the watch was opened with
+     * @return the serialized command
+     */
+    public static ByteBuf newWatchScalableTopicsClose(long watchId) {
+        BaseCommand cmd = localCmd(Type.WATCH_SCALABLE_TOPICS_CLOSE);
+        cmd.setWatchScalableTopicsClose().setWatchId(watchId);
+        return serializeWithSize(cmd);
+    }
+
+    /**
+     * Broker -> Client: emit a full snapshot of the matching topic set. Sent on
+     * initial subscribe and on reconnect-resync, except when the hash supplied
+     * by the client lets the broker skip it; the client replaces its local
+     * state.
+     *
+     * @param watchId client-assigned watch identifier the snapshot belongs to
+     * @param topics  full matching set; iterated directly, so must not be
+     *                {@code null}
+     * @return the serialized command
+     */
+    public static ByteBuf newWatchScalableTopicsSnapshot(long watchId,
+                                                          
java.util.Collection<String> topics) {
+        BaseCommand cmd = new 
BaseCommand().setType(Type.WATCH_SCALABLE_TOPICS_UPDATE);
+        var update = cmd.setWatchScalableTopicsUpdate().setWatchId(watchId);
+        var snapshot = update.setSnapshot();
+        for (String t : topics) {
+            snapshot.addTopic(t);
+        }
+        return serializeWithSize(cmd);
+    }
+
+    /**
+     * Broker -> Client: emit an incremental membership change. Either
+     * {@code added} or {@code removed} (or both) may be empty. Apply removed
+     * before added when both appear together.
+     *
+     * @param watchId client-assigned watch identifier the diff belongs to
+     * @param added   topics that entered the set; {@code null} is treated as
+     *                empty
+     * @param removed topics that left the set; {@code null} is treated as empty
+     * @return the serialized command
+     */
+    public static ByteBuf newWatchScalableTopicsDiff(long watchId,
+                                                     
java.util.Collection<String> added,
+                                                     
java.util.Collection<String> removed) {
+        BaseCommand cmd = new 
BaseCommand().setType(Type.WATCH_SCALABLE_TOPICS_UPDATE);
+        var update = cmd.setWatchScalableTopicsUpdate().setWatchId(watchId);
+        var diff = update.setDiff();
+        if (added != null) {
+            for (String t : added) {
+                diff.addAdded(t);
+            }
+        }
+        if (removed != null) {
+            for (String t : removed) {
+                diff.addRemoved(t);
+            }
+        }
+        return serializeWithSize(cmd);
+    }
+
+    public static ByteBuf newWatchScalableTopicsError(long watchId, 
ServerError error, String message) {
+        BaseCommand cmd = new 
BaseCommand().setType(Type.WATCH_SCALABLE_TOPICS_UPDATE);
+        cmd.setWatchScalableTopicsUpdate()
+                .setWatchId(watchId)
+                .setError(error)
+                .setMessage(message);
+        return serializeWithSize(cmd);
+    }
+
     public static ByteBuf serializeWithSize(BaseCommand cmd) {
         return serializeWithPrecalculatedSerializedSize(cmd, 
cmd.getSerializedSize());
     }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
index 0eb2b1b9670..bca87683f27 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
@@ -514,6 +514,21 @@ public abstract class PulsarDecoder extends 
ChannelInboundHandlerAdapter {
                 
handleCommandScalableTopicAssignmentUpdate(cmd.getScalableTopicAssignmentUpdate());
                 break;
 
+            case WATCH_SCALABLE_TOPICS:
+                checkArgument(cmd.hasWatchScalableTopics());
+                handleCommandWatchScalableTopics(cmd.getWatchScalableTopics());
+                break;
+
+            case WATCH_SCALABLE_TOPICS_UPDATE:
+                checkArgument(cmd.hasWatchScalableTopicsUpdate());
+                
handleCommandWatchScalableTopicsUpdate(cmd.getWatchScalableTopicsUpdate());
+                break;
+
+            case WATCH_SCALABLE_TOPICS_CLOSE:
+                checkArgument(cmd.hasWatchScalableTopicsClose());
+                
handleCommandWatchScalableTopicsClose(cmd.getWatchScalableTopicsClose());
+                break;
+
             default:
                 break;
             }
@@ -807,6 +822,23 @@ public abstract class PulsarDecoder extends 
ChannelInboundHandlerAdapter {
         throw new UnsupportedOperationException();
     }
 
+    /**
+     * Default handler for {@code WATCH_SCALABLE_TOPICS}; always throws
+     * {@link UnsupportedOperationException}. Subclasses that handle this
+     * command override it.
+     */
+    protected void handleCommandWatchScalableTopics(
+            org.apache.pulsar.common.api.proto.CommandWatchScalableTopics 
commandWatchScalableTopics) {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Default handler for {@code WATCH_SCALABLE_TOPICS_UPDATE}; always throws
+     * {@link UnsupportedOperationException}. Subclasses that handle this
+     * command override it.
+     */
+    protected void handleCommandWatchScalableTopicsUpdate(
+            org.apache.pulsar.common.api.proto.CommandWatchScalableTopicsUpdate
+                    commandWatchScalableTopicsUpdate) {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Default handler for {@code WATCH_SCALABLE_TOPICS_CLOSE}; always throws
+     * {@link UnsupportedOperationException}. Subclasses that handle this
+     * command override it.
+     */
+    protected void handleCommandWatchScalableTopicsClose(
+            org.apache.pulsar.common.api.proto.CommandWatchScalableTopicsClose
+                    commandWatchScalableTopicsClose) {
+        throw new UnsupportedOperationException();
+    }
+
     private void writeAndFlush(ChannelOutboundInvoker ctx, ByteBuf cmd) {
         NettyChannelUtil.writeAndFlushWithVoidPromise(ctx, cmd);
     }
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto 
b/pulsar-common/src/main/proto/PulsarApi.proto
index 20bd8ab8185..6d135297317 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -940,6 +940,57 @@ message CommandScalableTopicAssignmentUpdate {
     required ScalableConsumerAssignment assignment = 2;
 }
 
+// Multi-topic consumer watcher: subscribes to the union of scalable topics in 
a
+// namespace that match a (possibly empty) set of property filters. The broker 
keeps
+// pushing updates as topics enter or leave the matching set. See
+// `multi-topic-consumer-design.md` for the full design.
+
+// Client -> Broker: open a watch session.
+message CommandWatchScalableTopics {
+    required uint64 watch_id          = 1;   // Client-assigned watch ID
+    required string namespace         = 2;   // tenant/namespace
+    // Optional AND filters; empty list means "match all topics in the 
namespace".
+    repeated KeyValue property_filters = 3;
+    // Hash of the topics the client believes are currently in its set. Sent on
+    // reconnect; absent on first subscribe. If it matches the broker's freshly
+    // computed hash, the broker skips emitting the initial Snapshot — the 
client's
+    // local state is already correct and future Diffs will flow as usual. Same
+    // hash function as CommandGetTopicsOfNamespace (CRC32C over sorted 
topics).
+    optional string current_hash       = 4;
+}
+
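+// Snapshot of the full matching set. Sent on initial subscribe and on
+// reconnect-resync, except when the client's `current_hash` matches the
+// broker's freshly computed hash (the snapshot is then skipped). The client
+// replaces its local set with this list.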
+message ScalableTopicsSnapshot {
+    repeated string topics = 1;
+}
+
+// Incremental membership change. Apply removed before added when both are 
present.
+message ScalableTopicsDiff {
+    repeated string added   = 1;
+    repeated string removed = 2;
+}
+
+// Broker -> Client: either Snapshot or Diff (mutually exclusive via oneof). 
When the
+// initial subscribe fails, neither variant is set and `error`/`message` carry 
the
+// failure reason.
+message CommandWatchScalableTopicsUpdate {
+    required uint64 watch_id              = 1;
+
+    oneof event {
+        ScalableTopicsSnapshot snapshot = 2;
+        ScalableTopicsDiff diff         = 3;
+    }
+
+    optional ServerError error            = 4;
+    optional string message               = 5;
+}
+
+// Client -> Broker: close the watch session.
+message CommandWatchScalableTopicsClose {
+    required uint64 watch_id = 1;
+}
+
 message CommandGetSchema {
     required uint64 request_id = 1;
     required string topic      = 2;
@@ -1185,6 +1236,10 @@ message BaseCommand {
         SCALABLE_TOPIC_SUBSCRIBE              = 73;
         SCALABLE_TOPIC_SUBSCRIBE_RESPONSE     = 74;
         SCALABLE_TOPIC_ASSIGNMENT_UPDATE      = 75;
+
+        WATCH_SCALABLE_TOPICS                 = 76;
+        WATCH_SCALABLE_TOPICS_UPDATE          = 77;
+        WATCH_SCALABLE_TOPICS_CLOSE           = 78;
     }
 
 
@@ -1276,4 +1331,8 @@ message BaseCommand {
     optional CommandScalableTopicSubscribe scalableTopicSubscribe              
       = 73;
     optional CommandScalableTopicSubscribeResponse 
scalableTopicSubscribeResponse     = 74;
     optional CommandScalableTopicAssignmentUpdate 
scalableTopicAssignmentUpdate       = 75;
+
+    optional CommandWatchScalableTopics watchScalableTopics                    
       = 76;
+    optional CommandWatchScalableTopicsUpdate watchScalableTopicsUpdate        
       = 77;
+    optional CommandWatchScalableTopicsClose watchScalableTopicsClose          
       = 78;
 }

Reply via email to