This is an automated email from the ASF dual-hosted git repository.
merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d0e557f15da [feat] PIP-468: Namespace scalable-topics watcher
(protocol + broker + V5 client) (#25648)
d0e557f15da is described below
commit d0e557f15da844b369b876dc0f5c98e4586f3164
Author: Matteo Merli <[email protected]>
AuthorDate: Fri May 1 15:18:20 2026 -0700
[feat] PIP-468: Namespace scalable-topics watcher (protocol + broker + V5
client) (#25648)
---
.../broker/resources/ScalableTopicResources.java | 100 ++++++
.../apache/pulsar/broker/service/ServerCnx.java | 118 +++++++
.../scalable/ScalableTopicsWatcherSession.java | 315 +++++++++++++++++++
.../ScalableTopicsWatcherSessionHashTest.java | 147 +++++++++
.../client/api/v5/V5ScalableTopicsWatcherTest.java | 234 ++++++++++++++
.../client/impl/v5/ScalableTopicsWatcher.java | 348 +++++++++++++++++++++
.../org/apache/pulsar/client/impl/ClientCnx.java | 83 +++++
.../client/impl/ScalableTopicsWatcherSession.java | 60 ++++
.../apache/pulsar/common/protocol/Commands.java | 89 ++++++
.../pulsar/common/protocol/PulsarDecoder.java | 32 ++
pulsar-common/src/main/proto/PulsarApi.proto | 59 ++++
11 files changed, 1585 insertions(+)
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java
index 3e015808476..550212a4a19 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ScalableTopicResources.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.CustomLog;
@@ -35,6 +36,8 @@ import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
/**
* Metadata store access for scalable topic metadata.
@@ -71,10 +74,98 @@ public class ScalableTopicResources extends
BaseResources<ScalableTopicMetadata>
private final MetadataCache<SubscriptionMetadata> subscriptionCache;
private final MetadataCache<ConsumerRegistration>
consumerRegistrationCache;
+ /**
+ * Per-namespace listeners for scalable topic create / modify / delete
events.
+ * Keyed by listener so each subscriber can deregister cleanly on close.
The map
+ * is consulted from the single store-level listener registered at
construction
+ * time, eliminating the listener leak that would otherwise occur every
time a
+ * watcher session ends (the metadata store API has no
+ * {@code unregisterListener}). Mirrors the {@link TopicResources}
+ * pattern for {@code TopicListener}.
+ */
+ private final Map<NamespaceListener, NamespaceName> namespaceListeners =
+ new ConcurrentHashMap<>();
+
public ScalableTopicResources(MetadataStore store, int
operationTimeoutSec) {
super(store, ScalableTopicMetadata.class, operationTimeoutSec);
this.subscriptionCache =
store.getMetadataCache(SubscriptionMetadata.class);
this.consumerRegistrationCache =
store.getMetadataCache(ConsumerRegistration.class);
+ // A single store-level listener is registered here, once per instance; it fans out to
+ // every entry in namespaceListeners via handleNotification.
+ // Per-watcher registration happens via registerNamespaceListener; a watcher's close()
+ // calls deregisterNamespaceListener so closed watchers are not on the dispatch list.
+ // NOTE(review): both branches below call registerListener with the same handler;
+ // confirm whether the instanceof split is actually required.
+ if (store instanceof MetadataStoreExtended ext) {
+ ext.registerListener(this::handleNotification);
+ } else {
+ store.registerListener(this::handleNotification);
+ }
+ }
+
+ // --- Namespace-level scalable-topics listeners ---
+
+ /**
+ * Callback for scalable-topic create / modify / delete events under a single
+ * namespace.
+ *
+ * <p>The fan-out in {@link ScalableTopicResources} only delivers a notification
+ * when its path is a direct child of the listener's namespace path, so subtree
+ * paths such as {@code <topic>/subscriptions/...} or {@code <topic>/controller}
+ * are never passed to the listener. Exceptions thrown from
+ * {@link #onNotification} are caught and logged by the dispatcher.
+ */
+ public interface NamespaceListener {
+ /** Namespace this listener is scoped to; read when the listener is registered. */
+ NamespaceName getNamespaceName();
+
+ /** Called for every metadata event whose path is a direct child of the namespace path. */
+ void onNotification(Notification notification);
+ }
+
+    /**
+     * Adds {@code listener} to the dispatch registry, scoped to the namespace it reports
+     * through {@link NamespaceListener#getNamespaceName()}.
+     *
+     * <p>Once registered, the listener is invoked for every notification whose path is a
+     * direct child of {@code /topics/<tenant>/<ns>}. Registering the same listener again
+     * simply overwrites its namespace mapping.
+     *
+     * @param listener the listener to register
+     */
+    public void registerNamespaceListener(NamespaceListener listener) {
+        final NamespaceName scope = listener.getNamespaceName();
+        namespaceListeners.put(listener, scope);
+    }
+
+ /**
+ * Remove a namespace listener from the dispatch registry so that
+ * {@code handleNotification} no longer invokes it.
+ *
+ * <p>Removing a listener that is not in the registry has no effect, so this is
+ * safe to call multiple times or for listeners that were never registered.
+ *
+ * @param listener the listener to remove
+ */
+ public void deregisterNamespaceListener(NamespaceListener listener) {
+ namespaceListeners.remove(listener);
+ }
+
+ /**
+ * Single fan-out path: for each registered listener, emit the
notification iff
+ * its path is a direct child of the listener's namespace base path.
Filters out
+ * subtree events (subscriptions, controller lock) up front.
+ */
+ void handleNotification(Notification notification) {
+ if (namespaceListeners.isEmpty()) {
+ return;
+ }
+ String path = notification.getPath();
+ if (!path.startsWith(SCALABLE_TOPIC_PATH + "/")) {
+ return;
+ }
+ for (Map.Entry<NamespaceListener, NamespaceName> entry :
namespaceListeners.entrySet()) {
+ String basePath = namespacePath(entry.getValue());
+ if (!path.startsWith(basePath + "/")) {
+ continue;
+ }
+ // Direct child only — strip the prefix and check there's no
further '/'.
+ String rest = path.substring(basePath.length() + 1);
+ if (rest.indexOf('/') >= 0) {
+ continue;
+ }
+ try {
+ entry.getKey().onNotification(notification);
+ } catch (Exception e) {
+ log.warn().attr("listener", entry.getKey())
+ .attr("path", path)
+ .exceptionMessage(e)
+ .log("Failed to dispatch scalable-topic notification");
+ }
+ }
}
public CompletableFuture<Void> createScalableTopicAsync(TopicName tn,
ScalableTopicMetadata metadata) {
@@ -279,6 +370,15 @@ public class ScalableTopicResources extends
BaseResources<ScalableTopicMetadata>
return joinPath(SCALABLE_TOPIC_PATH, tn.getNamespace(),
tn.getEncodedLocalName());
}
+ /**
+ * Build the metadata path for a namespace by joining {@code SCALABLE_TOPIC_PATH}
+ * with the namespace's string form.
+ *
+ * <p>Scalable topic records for the namespace live as direct children of this
+ * path; namespace-wide watchers use it as the prefix for filtering metadata
+ * notifications down to events that touch a topic record.
+ *
+ * @param ns the namespace
+ * @return the metadata path for {@code ns}
+ */
+ public String namespacePath(NamespaceName ns) {
+ return joinPath(SCALABLE_TOPIC_PATH, ns.toString());
+ }
+
public String subscriptionPath(TopicName tn, String subscription) {
return joinPath(topicPath(tn), SUBSCRIPTIONS_SEGMENT, subscription);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 2ed913561b2..109cf1b489b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -486,6 +486,17 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
});
dagWatchSessions.clear();
+ // Same for namespace-wide scalable-topic watchers.
+ scalableTopicsWatchers.values().forEach(session -> {
+ try {
+ session.close();
+ } catch (Exception e) {
+ log.warn().exceptionMessage(e)
+ .log("Error closing scalable-topics watcher on
connection close");
+ }
+ });
+ scalableTopicsWatchers.clear();
+
// Notify the scalable-topic controller that this connection's
scalable consumers
// have dropped. The controller marks them disconnected and starts the
grace-period
// timer; if they reconnect in time, their assignment is preserved.
@@ -832,6 +843,113 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
});
}
+ // --- Scalable topics namespace watcher ---
+
+ private final java.util.concurrent.ConcurrentHashMap<Long,
+
org.apache.pulsar.broker.service.scalable.ScalableTopicsWatcherSession>
+ scalableTopicsWatchers = new
java.util.concurrent.ConcurrentHashMap<>();
+
+ @Override
+ protected void handleCommandWatchScalableTopics(
+ org.apache.pulsar.common.api.proto.CommandWatchScalableTopics cmd)
{
+ checkArgument(state == State.Connected);
+
+ final long watchId = cmd.getWatchId();
+ final String namespaceStr = cmd.getNamespace();
+ log.debug().attr("namespace", namespaceStr).attr("watchId", watchId)
+ .log("Received WatchScalableTopics");
+
+ if (!scalableTopicsEnabled) {
+ ctx.writeAndFlush(Commands.newWatchScalableTopicsError(watchId,
+ ServerError.NotAllowedError, "Scalable topics are disabled
on this broker"));
+ return;
+ }
+
+ final NamespaceName namespaceName;
+ try {
+ namespaceName = NamespaceName.get(namespaceStr);
+ } catch (Exception e) {
+ log.warn().attr("namespace", namespaceStr).log("Invalid namespace
in WatchScalableTopics");
+ ctx.writeAndFlush(Commands.newWatchScalableTopicsError(watchId,
+ ServerError.InvalidTopicName, "Invalid namespace: " +
namespaceStr));
+ return;
+ }
+
+ final java.util.Map<String, String> propertyFilters = new
java.util.HashMap<>();
+ for (int i = 0; i < cmd.getPropertyFiltersCount(); i++) {
+ var kv = cmd.getPropertyFilterAt(i);
+ propertyFilters.put(kv.getKey(), kv.getValue());
+ }
+ final String clientHash = cmd.hasCurrentHash() ? cmd.getCurrentHash()
: null;
+
+ if (!this.service.getPulsar().isRunning()) {
+ log.warn("WatchScalableTopics rejected: broker not ready");
+ ctx.writeAndFlush(Commands.newWatchScalableTopicsError(watchId,
+ ServerError.ServiceNotReady, "Broker not ready"));
+ return;
+ }
+
+ org.apache.pulsar.broker.resources.ScalableTopicResources resources =
+
service.getPulsar().getPulsarResources().getScalableTopicResources();
+ if (resources == null) {
+ log.warn("WatchScalableTopics rejected: scalable topic resources
not available");
+ ctx.writeAndFlush(Commands.newWatchScalableTopicsError(watchId,
+ ServerError.ServiceNotReady, "Scalable topic resources not
available"));
+ return;
+ }
+
+ isNamespaceOperationAllowed(namespaceName,
NamespaceOperation.GET_TOPICS)
+ .thenAccept(isAuthorized -> {
+ if (!isAuthorized) {
+ final String msg = "Client is not authorized to
WatchScalableTopics";
+ log.warn().attr("principal",
getPrincipal()).attr("namespace", namespaceName)
+ .log(msg);
+
ctx.writeAndFlush(Commands.newWatchScalableTopicsError(watchId,
+ ServerError.AuthorizationError, msg));
+ return;
+ }
+ var session = new org.apache.pulsar.broker.service.scalable
+ .ScalableTopicsWatcherSession(watchId,
namespaceName, propertyFilters,
+ clientHash, this, resources,
service.getPulsar().getExecutor());
+ scalableTopicsWatchers.put(watchId, session);
+
+ session.start().exceptionally(ex -> {
+ Throwable cause = ex.getCause() != null ?
ex.getCause() : ex;
+ log.warn().attr("namespace",
namespaceName).exception(cause)
+ .log("WatchScalableTopics failed");
+ scalableTopicsWatchers.remove(watchId);
+ session.close();
+ ctx.executor().execute(() -> ctx.writeAndFlush(
+ Commands.newWatchScalableTopicsError(watchId,
+ ServerError.UnknownError,
cause.getMessage())));
+ return null;
+ });
+ })
+ .exceptionally(ex -> {
+ logNamespaceNameAuthException(remoteAddress,
"watch-scalable-topics",
+ getPrincipal(), Optional.of(namespaceName), ex);
+
ctx.writeAndFlush(Commands.newWatchScalableTopicsError(watchId,
+ ServerError.AuthorizationError,
+ "Exception occurred while authorizing
WatchScalableTopics"));
+ return null;
+ });
+ }
+
+ @Override
+ protected void handleCommandWatchScalableTopicsClose(
+ org.apache.pulsar.common.api.proto.CommandWatchScalableTopicsClose
cmd) {
+ // Same idempotent-close semantics as DAG watch / consumer close:
per-cnx
+ // session, originating subscribe was authorized at create time, no
per-call
+ // authz needed. Unknown watchId is a no-op.
+ checkArgument(state == State.Connected);
+ long watchId = cmd.getWatchId();
+ log.debug().attr("watchId", watchId).log("Received
WatchScalableTopicsClose");
+ var session = scalableTopicsWatchers.remove(watchId);
+ if (session != null) {
+ session.close();
+ }
+ }
+
@Override
protected void handleCommandScalableTopicClose(
CommandScalableTopicClose commandScalableTopicClose) {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicsWatcherSession.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicsWatcherSession.java
new file mode 100644
index 00000000000..8598170d969
--- /dev/null
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicsWatcherSession.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.scalable;
+
+import io.github.merlimat.slog.Logger;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.Getter;
+import org.apache.pulsar.broker.resources.ScalableTopicMetadata;
+import org.apache.pulsar.broker.resources.ScalableTopicResources;
+import org.apache.pulsar.broker.service.ServerCnx;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.topics.TopicList;
+import org.apache.pulsar.common.util.Codec;
+import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.NotificationType;
+
+/**
+ * Broker-side handler for a multi-topic consumer's namespace watch session.
+ *
+ * <p>Watches the metadata store for scalable-topic create / modify / delete
events
+ * under a given namespace, evaluates them against a (possibly empty) set of
property
+ * filters, and pushes the matching set to the client. The client receives:
+ * <ul>
+ * <li>One {@code Snapshot} on subscribe (and on every
reconnect-resync).</li>
+ * <li>One {@code Diff} per coalescing window when membership changes.</li>
+ * </ul>
+ *
+ * <p>Tied to a connection — drop the session when the channel goes away. The
client
+ * re-opens a fresh watch on reconnect; the new session emits a fresh snapshot
and
+ * the client reconciles locally.
+ *
+ * <p>Any broker can serve this role: every broker observes the same metadata
events
+ * via the registered listener, so no coordinator is needed at the namespace
level.
+ */
+public class ScalableTopicsWatcherSession implements
ScalableTopicResources.NamespaceListener {
+
+ private static final Logger LOG =
Logger.get(ScalableTopicsWatcherSession.class);
+ /**
+ * Window over which back-to-back metadata events are batched into one
Diff frame.
+ * Small enough to feel "live", large enough to amortise rapid bursts
(e.g. test
+ * setups that create N topics in a tight loop).
+ */
+ private static final Duration COALESCE_WINDOW = Duration.ofMillis(50);
+
+ private final Logger log;
+
+ @Getter
+ private final long watchId;
+ private final NamespaceName namespace;
+ private final Map<String, String> propertyFilters;
+ /**
+ * Hash of the topic set the client believes it has. Optional: present on
+ * reconnect, absent on first subscribe. When equal to the freshly-computed
+ * hash on the broker, {@link #start()} skips emitting the initial
snapshot —
+ * the client's state is already correct.
+ */
+ private final String clientHash;
+ private final ServerCnx cnx;
+ private final ScalableTopicResources resources;
+ private final ScheduledExecutorService scheduler;
+
+ /** {@code /topics/<tenant>/<namespace>} — direct children are scalable
topic records. */
+ private final String basePath;
+
+ /**
+ * Topics currently in the matching set. Maintained server-side so we can
detect
+ * actual membership flips on Modified events (filter changes a topic
in/out).
+ * Topic names are fully-qualified ({@code topic://tenant/ns/name}).
+ */
+ private final Set<String> currentSet = Collections.synchronizedSet(new
HashSet<>());
+
+ private final AtomicBoolean closed = new AtomicBoolean();
+ private final AtomicBoolean snapshotEmitted = new AtomicBoolean();
+
+ // --- Coalescing state. All three fields guarded by `coalesceLock`. ---
+ private final Object coalesceLock = new Object();
+ private final LinkedHashSet<String> pendingAdded = new LinkedHashSet<>();
+ private final LinkedHashSet<String> pendingRemoved = new LinkedHashSet<>();
+ private boolean flushScheduled = false;
+
+    /**
+     * Creates a watch session; nothing is registered or sent until {@link #start()} is called.
+     *
+     * @param watchId client-assigned id echoed on every frame of this watch
+     * @param namespace namespace whose scalable topics are watched
+     * @param propertyFilters property key/value pairs a topic must carry to match;
+     *                        {@code null} is treated as "no filters"
+     * @param clientHash hash of the topic set the client already holds, or {@code null}
+     * @param cnx connection the frames are written to
+     * @param resources metadata access and namespace-listener registry
+     * @param scheduler executor used to run the coalesced diff flush
+     */
+    public ScalableTopicsWatcherSession(long watchId,
+                                        NamespaceName namespace,
+                                        Map<String, String> propertyFilters,
+                                        String clientHash,
+                                        ServerCnx cnx,
+                                        ScalableTopicResources resources,
+                                        ScheduledExecutorService scheduler) {
+        this.watchId = watchId;
+        this.namespace = namespace;
+        if (propertyFilters == null) {
+            this.propertyFilters = Map.of();
+        } else {
+            this.propertyFilters = propertyFilters;
+        }
+        this.clientHash = clientHash;
+        this.cnx = cnx;
+        this.resources = resources;
+        this.scheduler = scheduler;
+        this.basePath = resources.namespacePath(namespace);
+
+        // Every log line from this session carries the same identifying attributes.
+        var sessionLog = LOG.with();
+        sessionLog = sessionLog.attr("namespace", namespace);
+        sessionLog = sessionLog.attr("watchId", watchId);
+        sessionLog = sessionLog.attr("filters", this.propertyFilters);
+        this.log = sessionLog.build();
+    }
+
+    /** Returns the namespace this session watches. */
+    @Override
+    public NamespaceName getNamespaceName() {
+        return this.namespace;
+    }
+
+    /**
+     * Starts the watch: registers this session in the namespace listener registry, loads
+     * the initial filtered topic set, and emits the {@code Snapshot} frame unless the
+     * client's hash shows it already holds that exact set.
+     *
+     * <p>Registration happens before the initial load so that events arriving while the
+     * snapshot is being computed are captured by {@link #onNotification} instead of being
+     * lost. Those events are queued and flushed as a {@code Diff} after the snapshot.
+     *
+     * @return a future that completes once the initial set has been processed; it
+     *         completes exceptionally if loading the initial set fails
+     */
+    public CompletableFuture<Void> start() {
+        resources.registerNamespaceListener(this);
+
+        return resources.findScalableTopicsByPropertiesAsync(namespace, propertyFilters)
+                .thenAccept(initialTopics -> {
+                    if (closed.get()) {
+                        return;
+                    }
+                    // Merge into currentSet instead of clear-and-replace. Topics that
+                    // onNotification added while the snapshot was loading are queued to be
+                    // sent as a Diff; wiping them here would make a later Deleted event for
+                    // such a topic a no-op (currentSet.remove returns false) and leave the
+                    // client holding a stale topic.
+                    currentSet.addAll(initialTopics);
+
+                    // Hash short-circuit: if the client already has this exact set (reconnect
+                    // within an unchanged window), skip the Snapshot frame. Diffs still flow.
+                    if (clientHash != null
+                            && clientHash.equals(TopicList.calculateHash(initialTopics))) {
+                        log.info().attr("topics", initialTopics.size())
+                                .log("Reconnect hash matched; skipping snapshot");
+                        snapshotEmitted.set(true);
+                        return;
+                    }
+                    log.info().attr("topics", initialTopics.size()).log("Initial snapshot");
+                    cnx.ctx().writeAndFlush(
+                            Commands.newWatchScalableTopicsSnapshot(watchId, initialTopics));
+                    // Open the gate for flushPending only after the Snapshot write has been
+                    // issued, so a concurrently running flush cannot put a Diff on the wire
+                    // ahead of the Snapshot.
+                    snapshotEmitted.set(true);
+                });
+    }
+
+    /**
+     * Receives a metadata event for a topic record in this session's namespace and
+     * updates the matching set accordingly.
+     *
+     * <p>A {@code Deleted} event removes the topic from the set (queuing a removal if it
+     * was present). Any other event reloads the topic's metadata, re-evaluates the
+     * property filters, and queues an add or a removal only when membership flips.
+     * Events received after {@link #close()} are ignored.
+     */
+    @Override
+    public void onNotification(Notification notification) {
+        if (closed.get()) {
+            return;
+        }
+        // The registry already restricts delivery to direct children of basePath; the
+        // prefix test is kept as a defensive fallback when deriving the encoded local name.
+        final String eventPath = notification.getPath();
+        final String childPrefix = basePath + "/";
+        final String encodedLocalName;
+        if (eventPath.startsWith(childPrefix)) {
+            encodedLocalName = eventPath.substring(childPrefix.length());
+        } else {
+            encodedLocalName = eventPath;
+        }
+
+        final String topicName =
+                TopicName.get("topic", namespace, Codec.decode(encodedLocalName)).toString();
+
+        if (notification.getType() == NotificationType.Deleted) {
+            final boolean wasMember = currentSet.remove(topicName);
+            if (wasMember) {
+                enqueueRemoved(topicName);
+            }
+            return;
+        }
+
+        // Created or Modified: the filter can only be evaluated against the new value.
+        resources.getScalableTopicMetadataAsync(TopicName.get(topicName), true)
+                .whenComplete((optMd, ex) -> {
+                    if (closed.get()) {
+                        return;
+                    }
+                    if (ex != null) {
+                        log.warn().attr("topic", topicName).exceptionMessage(ex)
+                                .log("Failed to load scalable topic metadata for filter eval");
+                        return;
+                    }
+                    final boolean member = currentSet.contains(topicName);
+                    final boolean matches = optMd.isPresent() && matchesFilters(optMd.get());
+                    if (member == matches) {
+                        // Membership did not flip; nothing to tell the client.
+                        return;
+                    }
+                    if (matches) {
+                        currentSet.add(topicName);
+                        enqueueAdded(topicName);
+                    } else {
+                        currentSet.remove(topicName);
+                        enqueueRemoved(topicName);
+                    }
+                });
+    }
+
+ private boolean matchesFilters(ScalableTopicMetadata metadata) {
+ if (propertyFilters.isEmpty()) {
+ return true;
+ }
+ Map<String, String> p = metadata.getProperties();
+ if (p == null) {
+ return false;
+ }
+ for (var e : propertyFilters.entrySet()) {
+ if (!e.getValue().equals(p.get(e.getKey()))) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+    /**
+     * Queues {@code topic} as added for the next diff and makes sure a flush is scheduled.
+     * A removal still pending for the same topic is cancelled, so a rapid remove-then-add
+     * collapses to a single add.
+     */
+    private void enqueueAdded(String topic) {
+        synchronized (coalesceLock) {
+            pendingAdded.add(topic);
+            pendingRemoved.remove(topic);
+            scheduleFlush();
+        }
+    }
+
+    /**
+     * Queues {@code topic} as removed for the next diff and makes sure a flush is scheduled.
+     * An add still pending for the same topic is cancelled, so a rapid add-then-remove
+     * collapses to a single removal.
+     */
+    private void enqueueRemoved(String topic) {
+        synchronized (coalesceLock) {
+            pendingRemoved.add(topic);
+            pendingAdded.remove(topic);
+            scheduleFlush();
+        }
+    }
+
+ private void scheduleFlush() {
+ // Caller holds coalesceLock.
+ if (flushScheduled) {
+ return;
+ }
+ flushScheduled = true;
+ scheduler.schedule(this::flushPending, COALESCE_WINDOW.toMillis(),
TimeUnit.MILLISECONDS);
+ }
+
+ private void flushPending() {
+ if (closed.get()) {
+ return;
+ }
+ Set<String> added;
+ Set<String> removed;
+ synchronized (coalesceLock) {
+ added = pendingAdded.isEmpty() ? Set.of() : new
LinkedHashSet<>(pendingAdded);
+ removed = pendingRemoved.isEmpty() ? Set.of() : new
LinkedHashSet<>(pendingRemoved);
+ pendingAdded.clear();
+ pendingRemoved.clear();
+ flushScheduled = false;
+ }
+ if (added.isEmpty() && removed.isEmpty()) {
+ return;
+ }
+ // Wait until the initial snapshot was emitted: any deltas that fire
before the
+ // snapshot is sent have already been folded into currentSet via
onNotification's
+ // wasInSet check (same set we built the snapshot from), so they're
correctly
+ // represented. We just need to send them AFTER the snapshot frame.
+ if (!snapshotEmitted.get()) {
+ // Re-defer: the snapshot future is short-lived, retry shortly.
+ scheduler.schedule(this::flushPending, COALESCE_WINDOW.toMillis(),
TimeUnit.MILLISECONDS);
+ // Re-enqueue what we drained, since the next flush will rebuild
from pending.
+ synchronized (coalesceLock) {
+ pendingAdded.addAll(added);
+ pendingRemoved.addAll(removed);
+ flushScheduled = true;
+ }
+ return;
+ }
+ log.info().attr("added", added.size()).attr("removed",
removed.size()).log("Pushing diff");
+ cnx.ctx().writeAndFlush(Commands.newWatchScalableTopicsDiff(watchId,
added, removed));
+ }
+
+    /**
+     * Closes the session and removes it from the namespace listener registry, so the
+     * per-event fan-out no longer dispatches to it. Only the first call has any effect;
+     * later calls return immediately.
+     */
+    public void close() {
+        final boolean alreadyClosed = closed.getAndSet(true);
+        if (alreadyClosed) {
+            return;
+        }
+        resources.deregisterNamespaceListener(this);
+    }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicsWatcherSessionHashTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicsWatcherSessionHashTest.java
new file mode 100644
index 00000000000..1e9d84897cd
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicsWatcherSessionHashTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.scalable;
+
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.DefaultChannelPromise;
+import io.netty.channel.embedded.EmbeddedChannel;
+import io.netty.util.concurrent.ImmediateEventExecutor;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import org.apache.pulsar.broker.resources.ScalableTopicMetadata;
+import org.apache.pulsar.broker.resources.ScalableTopicResources;
+import org.apache.pulsar.broker.service.ServerCnx;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.topics.TopicList;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.apache.pulsar.metadata.impl.LocalMemoryMetadataStore;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Coverage for the reconnect-hash short-circuit in
+ * {@link ScalableTopicsWatcherSession}: when the client's reported hash
matches
+ * the freshly-computed server hash, {@code start()} returns without writing a
+ * Snapshot frame; otherwise it emits Snapshot as usual.
+ */
+public class ScalableTopicsWatcherSessionHashTest {
+
+ private MetadataStoreExtended store;
+ private ScalableTopicResources resources;
+ private ServerCnx cnx;
+ private ChannelHandlerContext ctx;
+ private ScheduledExecutorService scheduler;
+
+ @BeforeMethod
+ public void setUp() throws Exception {
+ store = new LocalMemoryMetadataStore("memory:local",
+ MetadataStoreConfig.builder().build());
+ resources = new ScalableTopicResources(store, 30);
+ cnx = mock(ServerCnx.class);
+ ctx = mock(ChannelHandlerContext.class);
+ when(cnx.ctx()).thenReturn(ctx);
+ // writeAndFlush returns a ChannelFuture; provide a no-op promise to keep the
+ // call chain happy. The ScalableTopicsWatcherSession ignores the return.
+ when(ctx.writeAndFlush(org.mockito.ArgumentMatchers.any()))
+ .thenReturn(new DefaultChannelPromise(new EmbeddedChannel(),
+ ImmediateEventExecutor.INSTANCE));
+ scheduler = Executors.newSingleThreadScheduledExecutor();
+ }
+
+ @AfterMethod(alwaysRun = true)
+ public void tearDown() throws Exception {
+ if (scheduler != null) {
+ scheduler.shutdownNow();
+ }
+ if (store != null) {
+ store.close();
+ }
+ }
+
+ private ScalableTopicMetadata meta() {
+ return ScalableTopicMetadata.builder().epoch(0).nextSegmentId(1)
+ .properties(Map.of()).build();
+ }
+
+ @Test
+ public void noHashOnFirstSubscribeEmitsSnapshot() throws Exception {
+ NamespaceName ns = NamespaceName.get("tenant/ns-fresh-" + suffix());
+ TopicName tn = TopicName.get("topic://" + ns + "/t1");
+ resources.createScalableTopicAsync(tn, meta()).get();
+
+ var session = new ScalableTopicsWatcherSession(1L, ns, Map.of(),
+ /* clientHash= */ null, cnx, resources, scheduler);
+ session.start().get();
+
+ // First subscribe: no hash → broker must emit one Snapshot frame.
+ verify(ctx,
atLeastOnce()).writeAndFlush(org.mockito.ArgumentMatchers.any(ByteBuf.class));
+ }
+
+ @Test
+ public void matchingHashOnReconnectSkipsSnapshot() throws Exception {
+ NamespaceName ns = NamespaceName.get("tenant/ns-match-" + suffix());
+ TopicName tn = TopicName.get("topic://" + ns + "/t1");
+ resources.createScalableTopicAsync(tn, meta()).get();
+
+ // Client believes the namespace contains exactly this one topic. Compute the
+ // matching hash with the same function the session's start() uses
+ // (TopicList.calculateHash).
+ String matchingHash = TopicList.calculateHash(List.of(tn.toString()));
+
+ var session = new ScalableTopicsWatcherSession(2L, ns, Map.of(),
+ /* clientHash= */ matchingHash, cnx, resources, scheduler);
+ session.start().get();
+
+ // Hash matched → no Snapshot frame written. Future Diffs would still flow
+ // through writeAndFlush, but start() itself must stay silent.
+ verify(ctx,
never()).writeAndFlush(org.mockito.ArgumentMatchers.any(ByteBuf.class));
+ }
+
+ @Test
+ public void differingHashOnReconnectStillEmitsSnapshot() throws Exception {
+ NamespaceName ns = NamespaceName.get("tenant/ns-diff-" + suffix());
+ TopicName tn = TopicName.get("topic://" + ns + "/t1");
+ resources.createScalableTopicAsync(tn, meta()).get();
+
+ // Client thinks the set is something different. Broker must emit a fresh
+ // Snapshot so the client can reconcile.
+ String staleHash = TopicList.calculateHash(List.of("topic://" + ns +
"/something-else"));
+
+ var session = new ScalableTopicsWatcherSession(3L, ns, Map.of(),
+ /* clientHash= */ staleHash, cnx, resources, scheduler);
+ session.start().get();
+
+ verify(ctx,
atLeastOnce()).writeAndFlush(org.mockito.ArgumentMatchers.any(ByteBuf.class));
+ }
+
+ // Random 8-character suffix so each test works in its own namespace.
+ private static String suffix() {
+ return UUID.randomUUID().toString().substring(0, 8);
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ScalableTopicsWatcherTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ScalableTopicsWatcherTest.java
new file mode 100644
index 00000000000..ffdf0b0d277
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ScalableTopicsWatcherTest.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.awaitility.Awaitility;
+import org.testng.annotations.Test;
+
+/**
+ * End-to-end coverage for the broker-side namespace scalable-topics watcher.
+ *
+ * <p>Uses reflection to construct a {@code ScalableTopicsWatcher}
(package-private
+ * in the v5 module) directly against the test cluster's v4 client. Verifies:
+ * <ul>
+ * <li>Initial snapshot reflects pre-existing topics.</li>
+ * <li>Topic create / delete fires {@code Diff} events with the right
names.</li>
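+ * <li>Property filters narrow the matching set: a topic whose properties do not
+ * match the filter never reaches the watcher. (Updating a topic's properties so it
+ * falls outside the filter, which should emit {@code Removed}, is not exercised
+ * by the tests in this class.)</li>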
+ * </ul>
+ */
+public class V5ScalableTopicsWatcherTest extends V5ClientBaseTest {
+
+ /** Captures every Snapshot/Diff so tests can assert on the cumulative
state. */
+ private static final class CapturingListener {
+ final Set<String> currentSet = ConcurrentHashMap.newKeySet();
+ final List<List<String>> snapshots = new CopyOnWriteArrayList<>();
+ // (added, removed) per diff
+ final List<Map.Entry<List<String>, List<String>>> diffs = new
CopyOnWriteArrayList<>();
+
+ void onSnapshot(List<String> topics) {
+ snapshots.add(List.copyOf(topics));
+ currentSet.clear();
+ currentSet.addAll(topics);
+ }
+
+ void onDiff(List<String> added, List<String> removed) {
+ diffs.add(Map.entry(List.copyOf(added), List.copyOf(removed)));
+ currentSet.removeAll(removed);
+ currentSet.addAll(added);
+ }
+ }
+
+ @Test
+ public void watcherEmitsCreateAndDeleteEvents() throws Exception {
+ NamespaceName ns = NamespaceName.get(getNamespace());
+
+ WatcherHandle handle = openWatcher(ns, Map.of());
+ try {
+ // Initial snapshot should be empty (or whatever pre-existing
topics; the
+ // shared cluster gives us a fresh per-test namespace so it should
be empty).
+ Awaitility.await().atMost(10, TimeUnit.SECONDS)
+ .until(() -> !handle.listener.snapshots.isEmpty());
+ assertEquals(handle.listener.snapshots.get(0).size(), 0,
+ "fresh namespace should yield empty initial snapshot");
+
+ String topicA = ns + "/a-" +
UUID.randomUUID().toString().substring(0, 8);
+ String topicB = ns + "/b-" +
UUID.randomUUID().toString().substring(0, 8);
+ admin.scalableTopics().createScalableTopic("topic://" + topicA, 1);
+ admin.scalableTopics().createScalableTopic("topic://" + topicB, 1);
+
+ Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() ->
+ assertEquals(handle.listener.currentSet, Set.of(
+ "topic://" + topicA, "topic://" + topicB)));
+
+ admin.scalableTopics().deleteScalableTopic("topic://" + topicA,
true);
+
+ Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() ->
+ assertEquals(handle.listener.currentSet, Set.of("topic://"
+ topicB)));
+ } finally {
+ handle.close();
+ }
+ }
+
+ @Test
+ public void watcherFiltersByProperty() throws Exception {
+ NamespaceName ns = NamespaceName.get(getNamespace());
+
+ WatcherHandle handle = openWatcher(ns, Map.of("owner", "alice"));
+ try {
+ Awaitility.await().atMost(10, TimeUnit.SECONDS)
+ .until(() -> !handle.listener.snapshots.isEmpty());
+
+ String aliceTopic = ns + "/alice-" +
UUID.randomUUID().toString().substring(0, 8);
+ String bobTopic = ns + "/bob-" +
UUID.randomUUID().toString().substring(0, 8);
+ admin.scalableTopics().createScalableTopic("topic://" +
aliceTopic, 1,
+ Map.of("owner", "alice"));
+ admin.scalableTopics().createScalableTopic("topic://" + bobTopic,
1,
+ Map.of("owner", "bob"));
+
+ // Only alice's topic should surface — bob is filtered out.
+ Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() ->
+ assertEquals(handle.listener.currentSet, Set.of("topic://"
+ aliceTopic)));
+
+ // bob's topic exists but never reaches the watcher's set.
+ assertTrue(!handle.listener.currentSet.contains("topic://" +
bobTopic));
+ } finally {
+ handle.close();
+ }
+ }
+
+ @Test
+ public void watcherEmptyFilterSubscribesToEveryTopic() throws Exception {
+ NamespaceName ns = NamespaceName.get(getNamespace());
+
+ // Pre-create a topic before the watcher opens so the initial snapshot
has work
+ // to do (proves the snapshot path, not just the live-event path).
+ String preTopic = ns + "/pre-" +
UUID.randomUUID().toString().substring(0, 8);
+ admin.scalableTopics().createScalableTopic("topic://" + preTopic, 1);
+
+ WatcherHandle handle = openWatcher(ns, Map.of());
+ try {
+ Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() ->
+ assertEquals(handle.listener.currentSet, Set.of("topic://"
+ preTopic)));
+ } finally {
+ handle.close();
+ }
+ }
+
+ // --- Helpers ---
+
+ /** Bundle of watcher state needed to clean up at the end of a test. */
+ private static final class WatcherHandle implements AutoCloseable {
+ final Object watcher;
+ final CapturingListener listener;
+ final CountDownLatch ready;
+
+ WatcherHandle(Object watcher, CapturingListener listener,
CountDownLatch ready) {
+ this.watcher = watcher;
+ this.listener = listener;
+ this.ready = ready;
+ }
+
+ @Override
+ public void close() throws Exception {
+ Method closeMethod = watcher.getClass().getDeclaredMethod("close");
+ closeMethod.setAccessible(true);
+ closeMethod.invoke(watcher);
+ }
+ }
+
+    /**
+     * Construct a {@code ScalableTopicsWatcher} via reflection (it is package-private
+     * inside the v5 module, but we drive it directly from the broker tests), attach a
+     * {@link CapturingListener} through a dynamic proxy, call {@code start()}, and
+     * return a handle once the initial snapshot has been fed into the listener.
+     *
+     * <p>The initial snapshot is delivered by the future returned from {@code start()},
+     * not through the listener, so it is forwarded to
+     * {@link CapturingListener#onSnapshot} from a completion callback chained on that
+     * future. Chaining (rather than calling it from the test thread) keeps the initial
+     * snapshot ordered before any later event delivered by the completing thread.
+     *
+     * @param ns      namespace to watch
+     * @param filters property filters passed to the watcher constructor
+     * @return handle bundling the watcher, its listener and the readiness latch
+     * @throws Exception      if reflection fails or {@code start()} completes exceptionally
+     *                        (surfaced immediately, with the cause attached)
+     * @throws AssertionError if the initial snapshot does not arrive within 10 seconds
+     */
+    @SuppressWarnings("unchecked") // reflective calls return raw types; element types are fixed by the watcher API
+    private WatcherHandle openWatcher(NamespaceName ns, Map<String, String> filters)
+            throws Exception {
+        // Pull the underlying v4 client out of v5Client so we can construct the
+        // (package-private) watcher directly.
+        Field v4Field = v5Client.getClass().getDeclaredField("v4Client");
+        v4Field.setAccessible(true);
+        PulsarClientImpl v4 = (PulsarClientImpl) v4Field.get(v5Client);
+
+        Class<?> watcherCls = Class.forName(
+                "org.apache.pulsar.client.impl.v5.ScalableTopicsWatcher");
+        Constructor<?> ctor = watcherCls.getDeclaredConstructor(
+                PulsarClientImpl.class, NamespaceName.class, Map.class);
+        ctor.setAccessible(true);
+        Object watcher = ctor.newInstance(v4, ns, filters);
+
+        CapturingListener listener = new CapturingListener();
+
+        // The watcher's Listener is a package-private nested interface; use a Proxy.
+        Class<?> listenerCls = Class.forName(
+                "org.apache.pulsar.client.impl.v5.ScalableTopicsWatcher$Listener");
+        Object listenerProxy = java.lang.reflect.Proxy.newProxyInstance(
+                listenerCls.getClassLoader(), new Class<?>[]{listenerCls},
+                (proxy, method, args) -> {
+                    switch (method.getName()) {
+                        case "onSnapshot":
+                            listener.onSnapshot((List<String>) args[0]);
+                            return null;
+                        case "onDiff":
+                            listener.onDiff((List<String>) args[0], (List<String>) args[1]);
+                            return null;
+                        // Object methods are routed through the handler too. Returning null
+                        // for a primitive return type (hashCode / equals) would make the
+                        // proxy throw NullPointerException, so answer them explicitly.
+                        case "hashCode":
+                            return System.identityHashCode(proxy);
+                        case "equals":
+                            return proxy == args[0];
+                        case "toString":
+                            return "CapturingListenerProxy@"
+                                    + Integer.toHexString(System.identityHashCode(proxy));
+                        default:
+                            return null;
+                    }
+                });
+
+        Method setListener = watcherCls.getDeclaredMethod("setListener", listenerCls);
+        setListener.setAccessible(true);
+        setListener.invoke(watcher, listenerProxy);
+
+        Method start = watcherCls.getDeclaredMethod("start");
+        start.setAccessible(true);
+        java.util.concurrent.CompletableFuture<List<String>> startFut =
+                (java.util.concurrent.CompletableFuture<List<String>>) start.invoke(watcher);
+
+        CountDownLatch ready = new CountDownLatch(1);
+        java.util.concurrent.CompletableFuture<Void> delivered = startFut.thenAccept(initialTopics -> {
+            // start()'s future delivers the initial snapshot — feed it into the listener
+            // for uniform handling with subsequent snapshots.
+            listener.onSnapshot(initialTopics);
+            ready.countDown();
+        });
+        try {
+            // Waiting on the chained future (instead of only the latch) propagates a
+            // failed start() right away rather than after a silent 10s timeout.
+            delivered.get(10, TimeUnit.SECONDS);
+        } catch (java.util.concurrent.TimeoutException e) {
+            throw new AssertionError("watcher did not deliver initial snapshot in 10s", e);
+        }
+        return new WatcherHandle(watcher, listener, ready);
+    }
+}
diff --git
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicsWatcher.java
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicsWatcher.java
new file mode 100644
index 00000000000..0350d83df52
--- /dev/null
+++
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicsWatcher.java
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.v5;
+
+import io.github.merlimat.slog.Logger;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.ClientCnx;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.ScalableTopicsWatcherSession;
+import org.apache.pulsar.common.api.proto.ServerError;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.topics.TopicList;
+import org.apache.pulsar.common.util.Backoff;
+
+/**
+ * Client-side manager for a namespace-wide scalable-topics watch session.
+ *
+ * <p>Opens a {@code CommandWatchScalableTopics} on attach, awaits the initial
+ * {@code Snapshot}, and forwards subsequent {@code Snapshot} / {@code Diff}
events
+ * to a {@link Listener}. On connection drop, schedules a reconnect with
backoff;
+ * when the new session lands, the broker pushes a fresh snapshot and the
client
+ * applies it as a state replacement (no missed events).
+ *
+ * <p>Mirrors {@link DagWatchClient} in shape; the wire path is different —
watch
+ * subscribe is fire-and-forget (no request/response), with the initial
snapshot
+ * arriving as the first {@code WatchScalableTopicsUpdate}.
+ */
+final class ScalableTopicsWatcher implements ScalableTopicsWatcherSession,
AutoCloseable {
+
+ private static final Logger LOG = Logger.get(ScalableTopicsWatcher.class);
+ private static final AtomicLong WATCH_ID_GENERATOR = new AtomicLong(0);
+
+ /**
+ * Listener for membership events. The watcher delivers events on the
netty IO
+ * thread; implementations should not block.
+ */
+ interface Listener {
+ /** Full set; replace any local state. */
+ void onSnapshot(List<String> topics);
+
+ /** Apply removed before added (covers rapid remove-then-add of same
name). */
+ void onDiff(List<String> added, List<String> removed);
+ }
+
+ private final Logger log;
+ private final PulsarClientImpl v4Client;
+ private final NamespaceName namespace;
+ private final Map<String, String> propertyFilters;
+ private final long watchId;
+ private final Backoff reconnectBackoff;
+
+ private final CompletableFuture<List<String>> initialSnapshotFuture = new
CompletableFuture<>();
+ /**
+ * Mirrors the broker's view of the matching set so we can hand a hash on
+ * reconnect — when the set hasn't changed, the broker skips emitting a
+ * fresh snapshot. Updated on every Snapshot replace and Diff apply.
+ * Synchronised because Snapshot / Diff arrive on the netty thread but the
+ * hash may be read on a reconnect callback running elsewhere.
+ */
+ private final Set<String> currentSet = Collections.synchronizedSet(new
HashSet<>());
+ private volatile Listener listener;
+ private volatile ClientCnx cnx;
+ private volatile boolean closed = false;
+
+ /**
+ * @param v4Client the underlying v4 client used to open broker
connections
+ * @param namespace namespace to watch
+ * @param propertyFilters AND filters; empty map = match all
+ */
+ ScalableTopicsWatcher(PulsarClientImpl v4Client, NamespaceName namespace,
+ Map<String, String> propertyFilters) {
+ this.v4Client = v4Client;
+ this.namespace = namespace;
+ this.propertyFilters = propertyFilters == null ? Map.of() :
propertyFilters;
+ this.watchId = WATCH_ID_GENERATOR.incrementAndGet();
+ this.reconnectBackoff = Backoff.builder()
+ .initialDelay(Duration.ofMillis(100))
+ .maxBackoff(Duration.ofSeconds(30))
+ .build();
+ this.log = LOG.with()
+ .attr("namespace", namespace)
+ .attr("watchId", watchId)
+ .attr("filters", this.propertyFilters)
+ .build();
+ }
+
+ /**
+ * Open the watch session on a connection to the configured service URL.
+ * Resolves with the initial snapshot's topic list. After this returns,
every
+ * subsequent {@code Snapshot} / {@code Diff} flows through {@link
#setListener}.
+ */
+ CompletableFuture<List<String>> start() {
+ v4Client.getConnectionToServiceUrl()
+ .thenAccept(this::attach)
+ .exceptionally(ex -> {
+ initialSnapshotFuture.completeExceptionally(ex);
+ return null;
+ });
+ return initialSnapshotFuture;
+ }
+
+    /**
+     * Binds this watcher to a freshly obtained connection and sends the first
+     * {@code WatchScalableTopics} subscribe.
+     *
+     * <p>Does nothing if the watcher is already closed. If the broker does not
+     * advertise scalable-topics support, the initial-snapshot future is failed and the
+     * connection is <em>not</em> retained, so a later {@link #close()} will not write a
+     * close frame to a broker that cannot understand it (same ordering as
+     * {@code reconnect()}, which checks the capability before adopting the connection).
+     *
+     * @param newCnx connection to open the watch session on
+     */
+    private void attach(ClientCnx newCnx) {
+        if (closed) {
+            return;
+        }
+        if (!newCnx.isSupportsScalableTopics()) {
+            initialSnapshotFuture.completeExceptionally(
+                    new PulsarClientException.FeatureNotSupportedException(
+                            "Broker does not support scalable topics",
+                            PulsarClientException.FailedFeatureCheck.SupportsScalableTopics));
+            return;
+        }
+        // Adopt the connection only once we know the watch can be opened on it.
+        this.cnx = newCnx;
+        newCnx.registerScalableTopicsWatcher(watchId, this);
+        // First subscribe: send no hash so the broker emits the initial snapshot
+        // unconditionally. That snapshot is what populates initialSnapshotFuture.
+        newCnx.ctx().writeAndFlush(Commands.newWatchScalableTopics(
+                        watchId, namespace.toString(), propertyFilters,
+                        /* currentHash= */ null))
+                .addListener(writeFuture -> {
+                    if (!writeFuture.isSuccess()) {
+                        // The subscribe never left the client: undo the registration
+                        // and surface the write failure to the caller of start().
+                        newCnx.removeScalableTopicsWatcher(watchId);
+                        if (!initialSnapshotFuture.isDone()) {
+                            initialSnapshotFuture.completeExceptionally(
+                                    new PulsarClientException(writeFuture.cause()));
+                        }
+                    }
+                });
+    }
+
+ @Override
+ public void onSnapshot(List<String> topics) {
+ if (closed) {
+ return;
+ }
+ log.info().attr("topics", topics.size()).log("Snapshot received");
+ // Reset backoff on every successful snapshot — that's the broker
confirming
+ // the session is live and our local state is consistent.
+ reconnectBackoff.reset();
+ // Replace local set so the next reconnect computes the right hash.
+ synchronized (currentSet) {
+ currentSet.clear();
+ currentSet.addAll(topics);
+ }
+ if (!initialSnapshotFuture.isDone()) {
+ initialSnapshotFuture.complete(topics);
+ // The listener is set by the caller AFTER start() resolves, so
the initial
+ // snapshot is delivered via the future, not via onSnapshot's
fan-out.
+ return;
+ }
+ Listener l = listener;
+ if (l != null) {
+ try {
+ l.onSnapshot(topics);
+ } catch (Exception e) {
+ log.error().exception(e).log("Listener threw on snapshot");
+ }
+ }
+ }
+
+ @Override
+ public void onDiff(List<String> added, List<String> removed) {
+ if (closed) {
+ return;
+ }
+ log.info().attr("added", added.size()).attr("removed", removed.size())
+ .log("Diff received");
+ // Apply removed before added — covers rapid remove-then-add of the
same name.
+ synchronized (currentSet) {
+ currentSet.removeAll(removed);
+ currentSet.addAll(added);
+ }
+ Listener l = listener;
+ if (l != null) {
+ try {
+ l.onDiff(added, removed);
+ } catch (Exception e) {
+ log.error().exception(e).log("Listener threw on diff");
+ }
+ }
+ }
+
+ /**
+ * Snapshot the current set under lock so the hash + the watch frame agree
on
+ * the same view. CRC32C of sorted topic names — same function used by
+ * {@code CommandGetTopicsOfNamespace} so behaviour matches the existing
+ * topic-list watch on the wire.
+ */
+ private String currentSetHash() {
+ synchronized (currentSet) {
+ if (currentSet.isEmpty()) {
+ return TopicList.calculateHash(java.util.List.of());
+ }
+ return TopicList.calculateHash(new HashSet<>(currentSet));
+ }
+ }
+
+ /**
+ * Logs the broker's rejection of the watch and, if the initial snapshot has not
+ * been delivered yet, fails {@code initialSnapshotFuture} with a
+ * {@link PulsarClientException} that carries the error code and optional message.
+ *
+ * <p>NOTE(review): when the error arrives after the initial snapshot already
+ * completed (e.g. in reply to the subscribe sent from {@code reconnect()}), this
+ * method only logs; no reconnect is scheduled here. Since
+ * {@code ClientCnx.handleCommandWatchScalableTopicsUpdate} removes the
+ * registration before invoking this callback, {@code connectionClosed()} is
+ * presumably not delivered for that connection either — confirm whether the
+ * watcher is meant to stay silent in that case or should retry.
+ *
+ * @param error   error code reported by the broker
+ * @param message optional detail; may be {@code null}
+ */
+ @Override
+ public void onError(ServerError error, String message) {
+ log.warn().attr("error", error).attr("message", message)
+ .log("WatchScalableTopics rejected");
+ if (!initialSnapshotFuture.isDone()) {
+ initialSnapshotFuture.completeExceptionally(
+ new PulsarClientException("WatchScalableTopics failed: " +
error
+ + (message != null ? " - " + message : "")));
+ }
+ }
+
+ @Override
+ public void connectionClosed() {
+ log.warn("Scalable-topics watcher connection closed");
+ cnx = null;
+ if (closed) {
+ return;
+ }
+ if (!initialSnapshotFuture.isDone()) {
+ // Initial subscribe never completed — surface the failure rather
than
+ // retrying silently behind the caller.
+ initialSnapshotFuture.completeExceptionally(
+ new PulsarClientException(
+ "Connection closed before initial scalable-topics
snapshot arrived"));
+ return;
+ }
+ scheduleReconnect();
+ }
+
+ private void scheduleReconnect() {
+ if (closed) {
+ return;
+ }
+ long delayMs = reconnectBackoff.next().toMillis();
+ log.info().attr("delayMs", delayMs).log("Scheduling watcher
reconnect");
+ v4Client.timer().newTimeout(timeout -> reconnect(),
+ delayMs, TimeUnit.MILLISECONDS);
+ }
+
+ private void reconnect() {
+ if (closed) {
+ return;
+ }
+ v4Client.getConnectionToServiceUrl()
+ .thenAccept(newCnx -> {
+ if (closed) {
+ return;
+ }
+ if (!newCnx.isSupportsScalableTopics()) {
+ log.warn().log("Watcher reconnect: broker doesn't
support scalable topics");
+ scheduleReconnect();
+ return;
+ }
+ this.cnx = newCnx;
+ newCnx.registerScalableTopicsWatcher(watchId, this);
+ // Reconnect: send the hash of our current set. If the
broker's
+ // freshly-computed hash matches, it skips emitting a
Snapshot —
+ // the watch is live and our local state is correct. Future
+ // Diffs flow as usual; if the hash differs the broker
sends a
+ // Snapshot which we apply as a full-state replace.
+ String hash = currentSetHash();
+ newCnx.ctx().writeAndFlush(Commands.newWatchScalableTopics(
+ watchId, namespace.toString(),
+ propertyFilters, hash))
+ .addListener(writeFuture -> {
+ if (!writeFuture.isSuccess()) {
+
newCnx.removeScalableTopicsWatcher(watchId);
+
log.warn().exceptionMessage(writeFuture.cause())
+ .log("Watcher reconnect write
failed");
+ scheduleReconnect();
+ return;
+ }
+ // Write reached the broker — connection is
healthy.
+ // Reset the backoff so the next disconnect
starts
+ // fresh. Crucial for the hash-skip path: the
broker
+ // emits no Snapshot, so onSnapshot's reset
never
+ // fires; without this, a chain of short blips
keeps
+ // the backoff at its peak forever.
+ reconnectBackoff.reset();
+ });
+ })
+ .exceptionally(ex -> {
+ log.warn().exceptionMessage(ex).log("Watcher reconnect
failed");
+ scheduleReconnect();
+ return null;
+ });
+ }
+
+ /**
+ * Set the listener that receives {@code Snapshot} / {@code Diff} events.
Should
+ * be called after {@link #start()} resolves — the initial snapshot is
delivered
+ * via that future, not via the listener.
+ */
+ void setListener(Listener listener) {
+ this.listener = listener;
+ }
+
+ /**
+ * Visible for testing — snapshot of the current set. In production, the
+ * listener is the source of truth; this method exists so tests can poke
the
+ * watcher's hash-tracking state directly.
+ */
+ Set<String> currentSetForTesting() {
+ synchronized (currentSet) {
+ return new HashSet<>(currentSet);
+ }
+ }
+
+ long watchId() {
+ return watchId;
+ }
+
+ @Override
+ public void close() {
+ if (closed) {
+ return;
+ }
+ closed = true;
+ ClientCnx c = cnx;
+ if (c != null) {
+ c.removeScalableTopicsWatcher(watchId);
+
c.ctx().writeAndFlush(Commands.newWatchScalableTopicsClose(watchId));
+ }
+ }
+}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index f45fc2c91f4..40c4e12aa05 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -178,6 +178,18 @@ public class ClientCnx extends PulsarHandler {
.concurrencyLevel(1)
.build();
+ /**
+ * Per-watcher namespace scalable-topics watch sessions, keyed by the
+ * {@code watchId} chosen by the client. The broker tags every
+ * {@link
org.apache.pulsar.common.api.proto.CommandWatchScalableTopicsUpdate} with
+ * this id.
+ */
+ private final ConcurrentLongHashMap<ScalableTopicsWatcherSession>
scalableTopicsWatchers =
+ ConcurrentLongHashMap.<ScalableTopicsWatcherSession>newBuilder()
+ .expectedItems(4)
+ .concurrencyLevel(1)
+ .build();
+
private final CompletableFuture<Void> connectionFuture = new
CompletableFuture<Void>();
private final ConcurrentLinkedQueue<RequestTime> requestTimeoutQueue = new
ConcurrentLinkedQueue<>();
@@ -375,6 +387,7 @@ public class ClientCnx extends PulsarHandler {
topicListWatchers.forEach((__, watcher) ->
watcher.connectionClosed(this));
dagWatchSessions.forEach((__, session) -> session.connectionClosed());
scalableConsumerSessions.forEach((__, session) ->
session.connectionClosed());
+ scalableTopicsWatchers.forEach((__, session) ->
session.connectionClosed());
waitingLookupRequests.clear();
@@ -383,6 +396,7 @@ public class ClientCnx extends PulsarHandler {
topicListWatchers.clear();
dagWatchSessions.clear();
scalableConsumerSessions.clear();
+ scalableTopicsWatchers.clear();
timeoutTask.cancel(true);
}
@@ -1427,6 +1441,75 @@ public class ClientCnx extends PulsarHandler {
scalableConsumerSessions.remove(consumerId);
}
+ /**
+ * Dispatches a {@code CommandWatchScalableTopicsUpdate} to the session registered
+ * under the command's {@code watchId}.
+ *
+ * <p>An update carrying an error removes the registration and calls
+ * {@code onError}. Otherwise the snapshot or diff payload is copied into fresh
+ * lists and handed to {@code onSnapshot} / {@code onDiff}. Updates for an unknown
+ * {@code watchId}, or without a recognised event payload, are logged and dropped.
+ *
+ * @param cmd the decoded update command
+ */
+ @Override
+ protected void handleCommandWatchScalableTopicsUpdate(
+
org.apache.pulsar.common.api.proto.CommandWatchScalableTopicsUpdate cmd) {
+ checkArgument(state == State.Ready);
+
+ long watchId = cmd.getWatchId();
+ log.debug().attr("watchId", watchId).log("Received
WatchScalableTopicsUpdate");
+
+ if (cmd.hasError()) {
+ // Error response — terminal for this watch (subscribe was rejected or the
+ // server failed to compute the initial set). Drop the local registration so
+ // a retry from the caller starts fresh.
+ ScalableTopicsWatcherSession session =
scalableTopicsWatchers.remove(watchId);
+ if (session != null) {
+ // The message field is optional on the wire; pass null when absent.
+ session.onError(cmd.getError(), cmd.hasMessage() ?
cmd.getMessage() : null);
+ } else {
+ log.warn().attr("watchId", watchId)
+ .log("Received scalable-topics watch error for unknown
watcher");
+ }
+ return;
+ }
+
+ ScalableTopicsWatcherSession session =
scalableTopicsWatchers.get(watchId);
+ if (session == null) {
+ log.warn().attr("watchId", watchId)
+ .log("Received scalable-topics watch update for unknown
watcher");
+ return;
+ }
+ // Snapshot and diff are mutually exclusive via the proto oneof; switch on the
+ // generated event-case enum and unpack accordingly.
+ switch (cmd.getEventCase()) {
+ case SNAPSHOT -> {
+ var snapshot = cmd.getSnapshot();
+ // Copy the names out of the command into a list owned by the session.
+ java.util.List<String> topics = new
java.util.ArrayList<>(snapshot.getTopicsCount());
+ for (int i = 0; i < snapshot.getTopicsCount(); i++) {
+ topics.add(snapshot.getTopicAt(i));
+ }
+ session.onSnapshot(topics);
+ }
+ case DIFF -> {
+ var diff = cmd.getDiff();
+ // LightProto pluralises count accessors with a trailing 's' on the field name,
+ // hence `getAddedsCount` / `getRemovedsCount`.
+ java.util.List<String> added = new
java.util.ArrayList<>(diff.getAddedsCount());
+ for (int i = 0; i < diff.getAddedsCount(); i++) {
+ added.add(diff.getAddedAt(i));
+ }
+ java.util.List<String> removed = new
java.util.ArrayList<>(diff.getRemovedsCount());
+ for (int i = 0; i < diff.getRemovedsCount(); i++) {
+ removed.add(diff.getRemovedAt(i));
+ }
+ session.onDiff(added, removed);
+ }
+ case NOT_SET -> log.warn().attr("watchId", watchId)
+ .log("Received scalable-topics watch update with no event
payload");
+ default -> log.warn().attr("watchId", watchId)
+ .attr("case", cmd.getEventCase())
+ .log("Received scalable-topics watch update with unknown
event case");
+ }
+ }
+
+ /**
+ * Registers {@code watcher} under {@code watchId} so that incoming
+ * {@code CommandWatchScalableTopicsUpdate} frames with that id are routed to it.
+ * Uses {@code put}, so a later registration under the same id replaces the
+ * earlier one.
+ *
+ * @param watchId client-chosen watch identifier
+ * @param watcher session callbacks to invoke for this watch
+ */
+ public void registerScalableTopicsWatcher(long watchId,
ScalableTopicsWatcherSession watcher) {
+ scalableTopicsWatchers.put(watchId, watcher);
+ }
+
+ /**
+ * Drops the session registered under {@code watchId}, if any. This only updates
+ * the local map; it does not send anything to the broker.
+ *
+ * @param watchId client-chosen watch identifier
+ */
+ public void removeScalableTopicsWatcher(long watchId) {
+ scalableTopicsWatchers.remove(watchId);
+ }
+
/**
* check serverError and take appropriate action.
* <ul>
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ScalableTopicsWatcherSession.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ScalableTopicsWatcherSession.java
new file mode 100644
index 00000000000..7b71dc208ec
--- /dev/null
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ScalableTopicsWatcherSession.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import java.util.List;
+import org.apache.pulsar.common.api.proto.ServerError;
+
+/**
+ * Client-side callback for a namespace-wide scalable-topics watch session.
+ *
+ * <p>The broker pushes either a full {@code Snapshot} (initial subscribe and
on every
+ * reconnect resync) or an incremental {@code Diff}. Implementations apply the
+ * snapshot as a full state replacement and the diff as a set delta — both are
+ * idempotent and self-healing across reconnects.
+ *
+ * <p>Implemented by the V5 client's {@code ScalableTopicsWatcher}.
+ */
+public interface ScalableTopicsWatcherSession {
+
+ /**
+ * Full set of topics currently matching the watch's filters. The
implementation
+ * should replace any local state derived from prior events with this
snapshot.
+ */
+ void onSnapshot(List<String> topics);
+
+ /**
+ * Incremental membership change. Apply {@code removed} before {@code
added}
+ * (covers a rapid remove-then-add of the same topic name within a
coalescing
+ * window).
+ */
+ void onDiff(List<String> added, List<String> removed);
+
+ /**
+ * The broker rejected the watch (e.g. authz, broker shutting down).
+ * Implementations should fail any pending start future and stop emitting
events.
+ */
+ void onError(ServerError error, String message);
+
+ /**
+ * The underlying connection dropped. Implementations should treat any
local set
+ * as stale until the next snapshot arrives.
+ */
+ void connectionClosed();
+}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 02e03a491d0..2c0c74fb378 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -1774,6 +1774,95 @@ public class Commands {
return serializeWithSize(cmd);
}
+ /**
+ * Client -> Broker: open a scalable-topics watch session.
+ *
+ * @param watchId client-assigned watch identifier
+ * @param namespace tenant/namespace to scope the watch to
+ * @param propertyFilters AND filters; empty / null = match all topics in the namespace
+ * @param currentHash optional hash of the client's currently-known topic set.
+ * Pass on reconnect to let the broker skip the snapshot when
+ * state hasn't changed; pass {@code null} on first subscribe.
+ * @return the serialized, size-prefixed command
+ */
+ public static ByteBuf newWatchScalableTopics(long watchId, String
namespace,
+ java.util.Map<String,
String> propertyFilters,
+ String currentHash) {
+ BaseCommand cmd = localCmd(Type.WATCH_SCALABLE_TOPICS);
+ org.apache.pulsar.common.api.proto.CommandWatchScalableTopics watch =
+ cmd.setWatchScalableTopics()
+ .setWatchId(watchId)
+ .setNamespace(namespace);
+ // A null filter map is treated like an empty one: no filter entries are added.
+ if (propertyFilters != null) {
+ for (var entry : propertyFilters.entrySet()) {
+ watch.addPropertyFilter()
+ .setKey(entry.getKey())
+ .setValue(entry.getValue());
+ }
+ }
+ // The hash field is left unset when the caller has no prior state.
+ if (currentHash != null) {
+ watch.setCurrentHash(currentHash);
+ }
+ return serializeWithSize(cmd);
+ }
+
+ /**
+ * Builds the {@code WATCH_SCALABLE_TOPICS_CLOSE} command for the given watch.
+ *
+ * @param watchId identifier of the watch session to close
+ * @return the serialized, size-prefixed command
+ */
+ public static ByteBuf newWatchScalableTopicsClose(long watchId) {
+ BaseCommand cmd = localCmd(Type.WATCH_SCALABLE_TOPICS_CLOSE);
+ cmd.setWatchScalableTopicsClose().setWatchId(watchId);
+ return serializeWithSize(cmd);
+ }
+
+    /**
+     * Broker -> Client: emit a full snapshot of the matching topic set. Sent on initial
+     * subscribe and on a reconnect resync when the client's hash does not match; the
+     * client replaces its local state with it.
+     *
+     * @param watchId identifier of the watch session the snapshot belongs to
+     * @param topics  topic names in the snapshot; {@code null} is treated as empty,
+     *                matching the null handling of {@code newWatchScalableTopicsDiff}
+     * @return the serialized, size-prefixed command
+     */
+    public static ByteBuf newWatchScalableTopicsSnapshot(long watchId,
+                                                         java.util.Collection<String> topics) {
+        BaseCommand cmd = new BaseCommand().setType(Type.WATCH_SCALABLE_TOPICS_UPDATE);
+        var update = cmd.setWatchScalableTopicsUpdate().setWatchId(watchId);
+        // Always materialise the snapshot sub-message, even when empty, so the client
+        // sees the SNAPSHOT event case rather than NOT_SET.
+        var snapshot = update.setSnapshot();
+        if (topics != null) {
+            for (String t : topics) {
+                snapshot.addTopic(t);
+            }
+        }
+        return serializeWithSize(cmd);
+    }
+
+ /**
+ * Broker -> Client: emit an incremental membership change. Either {@code
added} or
+ * {@code removed} (or both) may be empty. Apply removed before added when
both
+ * appear together.
+ */
+ public static ByteBuf newWatchScalableTopicsDiff(long watchId,
+
java.util.Collection<String> added,
+
java.util.Collection<String> removed) {
+ BaseCommand cmd = new
BaseCommand().setType(Type.WATCH_SCALABLE_TOPICS_UPDATE);
+ var update = cmd.setWatchScalableTopicsUpdate().setWatchId(watchId);
+ var diff = update.setDiff();
+ if (added != null) {
+ for (String t : added) {
+ diff.addAdded(t);
+ }
+ }
+ if (removed != null) {
+ for (String t : removed) {
+ diff.addRemoved(t);
+ }
+ }
+ return serializeWithSize(cmd);
+ }
+
+ public static ByteBuf newWatchScalableTopicsError(long watchId,
ServerError error, String message) {
+ BaseCommand cmd = new
BaseCommand().setType(Type.WATCH_SCALABLE_TOPICS_UPDATE);
+ cmd.setWatchScalableTopicsUpdate()
+ .setWatchId(watchId)
+ .setError(error)
+ .setMessage(message);
+ return serializeWithSize(cmd);
+ }
+
public static ByteBuf serializeWithSize(BaseCommand cmd) {
return serializeWithPrecalculatedSerializedSize(cmd,
cmd.getSerializedSize());
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
index 0eb2b1b9670..bca87683f27 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
@@ -514,6 +514,21 @@ public abstract class PulsarDecoder extends
ChannelInboundHandlerAdapter {
handleCommandScalableTopicAssignmentUpdate(cmd.getScalableTopicAssignmentUpdate());
break;
+ case WATCH_SCALABLE_TOPICS:
+ checkArgument(cmd.hasWatchScalableTopics());
+ handleCommandWatchScalableTopics(cmd.getWatchScalableTopics());
+ break;
+
+ case WATCH_SCALABLE_TOPICS_UPDATE:
+ checkArgument(cmd.hasWatchScalableTopicsUpdate());
+
handleCommandWatchScalableTopicsUpdate(cmd.getWatchScalableTopicsUpdate());
+ break;
+
+ case WATCH_SCALABLE_TOPICS_CLOSE:
+ checkArgument(cmd.hasWatchScalableTopicsClose());
+
handleCommandWatchScalableTopicsClose(cmd.getWatchScalableTopicsClose());
+ break;
+
default:
break;
}
@@ -807,6 +822,23 @@ public abstract class PulsarDecoder extends
ChannelInboundHandlerAdapter {
throw new UnsupportedOperationException();
}
+ /**
+ * Handles a decoded {@code WATCH_SCALABLE_TOPICS} command (client request to open a
+ * scalable-topics watch session). This base implementation always throws; a subclass
+ * that supports the command must override it.
+ *
+ * @param commandWatchScalableTopics the decoded command payload
+ * @throws UnsupportedOperationException always, unless overridden
+ */
+ protected void handleCommandWatchScalableTopics(
+ org.apache.pulsar.common.api.proto.CommandWatchScalableTopics
commandWatchScalableTopics) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Handles a decoded {@code WATCH_SCALABLE_TOPICS_UPDATE} command (broker push carrying a
+ * snapshot, a diff, or an error for a watch session). This base implementation always
+ * throws; a subclass that supports the command must override it.
+ *
+ * @param commandWatchScalableTopicsUpdate the decoded command payload
+ * @throws UnsupportedOperationException always, unless overridden
+ */
+ protected void handleCommandWatchScalableTopicsUpdate(
+ org.apache.pulsar.common.api.proto.CommandWatchScalableTopicsUpdate
+ commandWatchScalableTopicsUpdate) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Handles a decoded {@code WATCH_SCALABLE_TOPICS_CLOSE} command (client request to close
+ * a watch session). This base implementation always throws; a subclass that supports
+ * the command must override it.
+ *
+ * @param commandWatchScalableTopicsClose the decoded command payload
+ * @throws UnsupportedOperationException always, unless overridden
+ */
+ protected void handleCommandWatchScalableTopicsClose(
+ org.apache.pulsar.common.api.proto.CommandWatchScalableTopicsClose
+ commandWatchScalableTopicsClose) {
+ throw new UnsupportedOperationException();
+ }
+
private void writeAndFlush(ChannelOutboundInvoker ctx, ByteBuf cmd) {
NettyChannelUtil.writeAndFlushWithVoidPromise(ctx, cmd);
}
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto
b/pulsar-common/src/main/proto/PulsarApi.proto
index 20bd8ab8185..6d135297317 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -940,6 +940,57 @@ message CommandScalableTopicAssignmentUpdate {
required ScalableConsumerAssignment assignment = 2;
}
+// Multi-topic consumer watcher: subscribes to the union of scalable topics in a
+// namespace that match a (possibly empty) set of property filters. The broker keeps
+// pushing updates as topics enter or leave the matching set. See
+// `multi-topic-consumer-design.md` for the full design.
+
+// Client -> Broker: open a watch session.
+message CommandWatchScalableTopics {
+ required uint64 watch_id = 1; // Client-assigned watch ID; the Update and Close messages carry the same field
+ required string namespace = 2; // tenant/namespace
+ // Optional AND filters; empty list means "match all topics in the namespace".
+ repeated KeyValue property_filters = 3;
+ // Hash of the topics the client believes are currently in its set. Sent on
+ // reconnect; absent on first subscribe. If it matches the broker's freshly
+ // computed hash, the broker skips emitting the initial Snapshot — the client's
+ // local state is already correct and future Diffs will flow as usual. Same
+ // hash function as CommandGetTopicsOfNamespace (CRC32C over sorted topics).
+ optional string current_hash = 4;
+}
+
+// Snapshot of the full matching set. Sent on initial subscribe and on reconnect-resync,
+// except when the client's current_hash already matches. The client replaces its local set with this list.
+message ScalableTopicsSnapshot {
+ repeated string topics = 1;
+}
+
+// Incremental membership change. Apply removed before added when both are present.
+message ScalableTopicsDiff {
+ repeated string added = 1; // topics that entered the matching set
+ repeated string removed = 2; // topics that left the matching set
+}
+
+// Broker -> Client: either Snapshot or Diff (mutually exclusive via oneof). When the
+// initial subscribe fails, neither variant is set and `error`/`message` carry the
+// failure reason.
+message CommandWatchScalableTopicsUpdate {
+ required uint64 watch_id = 1; // watch session this update belongs to
+
+ oneof event {
+ ScalableTopicsSnapshot snapshot = 2; // full replacement of the client's set
+ ScalableTopicsDiff diff = 3; // incremental change to the client's set
+ }
+
+ optional ServerError error = 4; // failure code for the failure case described above
+ optional string message = 5; // failure description accompanying `error`
+}
+
+// Client -> Broker: close the watch session.
+message CommandWatchScalableTopicsClose {
+ required uint64 watch_id = 1; // ID of the watch session to close
+}
+
message CommandGetSchema {
required uint64 request_id = 1;
required string topic = 2;
@@ -1185,6 +1236,10 @@ message BaseCommand {
SCALABLE_TOPIC_SUBSCRIBE = 73;
SCALABLE_TOPIC_SUBSCRIBE_RESPONSE = 74;
SCALABLE_TOPIC_ASSIGNMENT_UPDATE = 75;
+
+ WATCH_SCALABLE_TOPICS = 76;
+ WATCH_SCALABLE_TOPICS_UPDATE = 77;
+ WATCH_SCALABLE_TOPICS_CLOSE = 78;
}
@@ -1276,4 +1331,8 @@ message BaseCommand {
optional CommandScalableTopicSubscribe scalableTopicSubscribe
= 73;
optional CommandScalableTopicSubscribeResponse
scalableTopicSubscribeResponse = 74;
optional CommandScalableTopicAssignmentUpdate
scalableTopicAssignmentUpdate = 75;
+
+ optional CommandWatchScalableTopics watchScalableTopics
= 76;
+ optional CommandWatchScalableTopicsUpdate watchScalableTopicsUpdate
= 77;
+ optional CommandWatchScalableTopicsClose watchScalableTopicsClose
= 78;
}