This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7c7a7df1603 [feat] PIP-468: Multi-topic QueueConsumer / StreamConsumer
(#25651)
7c7a7df1603 is described below
commit 7c7a7df1603197a9492bd1222ca19f90d0d2c430
Author: Matteo Merli <[email protected]>
AuthorDate: Sat May 2 06:09:23 2026 -0700
[feat] PIP-468: Multi-topic QueueConsumer / StreamConsumer (#25651)
---
.../api/v5/V5MultiTopicQueueConsumerTest.java | 149 ++++++
.../api/v5/V5MultiTopicStreamConsumerTest.java | 208 ++++++++
.../pulsar/client/api/v5/QueueConsumerBuilder.java | 45 +-
.../client/api/v5/StreamConsumerBuilder.java | 29 +-
.../org/apache/pulsar/client/api/v5/Examples.java | 17 +-
.../client/impl/v5/AsyncQueueConsumerV5.java | 4 +-
.../apache/pulsar/client/impl/v5/MessageIdV5.java | 262 ++++++++--
.../apache/pulsar/client/impl/v5/MessageV5.java | 43 +-
.../client/impl/v5/MultiTopicQueueConsumer.java | 426 ++++++++++++++++
.../client/impl/v5/MultiTopicStreamConsumer.java | 542 +++++++++++++++++++++
.../client/impl/v5/QueueConsumerBuilderV5.java | 49 +-
.../pulsar/client/impl/v5/QueueConsumerImpl.java | 35 ++
.../client/impl/v5/ScalableQueueConsumer.java | 46 +-
.../client/impl/v5/ScalableStreamConsumer.java | 53 +-
.../client/impl/v5/StreamConsumerBuilderV5.java | 38 +-
.../pulsar/client/impl/v5/MessageIdV5Test.java | 58 +++
16 files changed, 1872 insertions(+), 132 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MultiTopicQueueConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MultiTopicQueueConsumerTest.java
new file mode 100644
index 00000000000..4cf5f0ff666
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MultiTopicQueueConsumerTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.testng.annotations.Test;
+
+/**
+ * End-to-end tests for {@link QueueConsumerBuilder#namespace}: a multi-topic
+ * QueueConsumer follows the matching set live, multiplexes from every
per-topic
+ * consumer into one user-visible queue, and routes individual acks back to the
+ * right topic for redelivery purposes.
+ */
+public class V5MultiTopicQueueConsumerTest extends V5ClientBaseTest {
+
+ private String topicName(String suffix) {
+ return "topic://" + getNamespace() + "/" + suffix + "-"
+ + UUID.randomUUID().toString().substring(0, 8);
+ }
+
+ @Test
+ public void receivesFromAllTopicsInNamespace() throws Exception {
+ String topicA = topicName("a");
+ String topicB = topicName("b");
+ admin.scalableTopics().createScalableTopic(topicA, 1);
+ admin.scalableTopics().createScalableTopic(topicB, 1);
+
+ @Cleanup
+ Producer<String> pa =
v5Client.newProducer(Schema.string()).topic(topicA).create();
+ @Cleanup
+ Producer<String> pb =
v5Client.newProducer(Schema.string()).topic(topicB).create();
+
+ @Cleanup
+ QueueConsumer<String> consumer =
v5Client.newQueueConsumer(Schema.string())
+ .namespace(getNamespace())
+ .subscriptionName("multi-q")
+
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+ .subscribe();
+
+ // Send to both topics; the multi-topic consumer must receive both
sets.
+ Set<String> expected = new HashSet<>();
+ for (int i = 0; i < 5; i++) {
+ String va = "a-" + i;
+ String vb = "b-" + i;
+ pa.newMessage().value(va).send();
+ pb.newMessage().value(vb).send();
+ expected.add(va);
+ expected.add(vb);
+ }
+
+ Set<String> received = new HashSet<>();
+ long deadline = System.currentTimeMillis() + 30_000L;
+ while (received.size() < expected.size() && System.currentTimeMillis()
< deadline) {
+ Message<String> msg = consumer.receive(Duration.ofSeconds(1));
+ if (msg != null) {
+ received.add(msg.value());
+ consumer.acknowledge(msg.id());
+ }
+ }
+ assertEquals(received, expected, "should receive every message
produced to either topic");
+ }
+
+ @Test
+ public void picksUpTopicCreatedAfterSubscribe() throws Exception {
+ // Fresh namespace, no topics yet — initial snapshot is empty.
Earliest so the
+ // race between "topic created" and "per-topic consumer attached via
Diff" can't
+ // drop messages produced in that window.
+ @Cleanup
+ QueueConsumer<String> consumer =
v5Client.newQueueConsumer(Schema.string())
+ .namespace(getNamespace())
+ .subscriptionName("multi-q-late")
+
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+ .subscribe();
+
+ // Create a topic AFTER subscribe; the watcher's Diff event must
trigger the
+ // consumer to attach.
+ String lateTopic = topicName("late");
+ admin.scalableTopics().createScalableTopic(lateTopic, 1);
+
+ @Cleanup
+ Producer<String> producer = v5Client.newProducer(Schema.string())
+ .topic(lateTopic).create();
+ producer.newMessage().value("late-message").send();
+
+ Message<String> msg = consumer.receive(Duration.ofSeconds(15));
+ assertTrue(msg != null, "expected to receive message from late-added
topic");
+ assertEquals(msg.value(), "late-message");
+ assertEquals(msg.topic(), lateTopic, "topic() should surface the
parent scalable topic");
+ consumer.acknowledge(msg.id());
+ }
+
+ @Test
+ public void filtersByPropertySoOnlyMatchingTopicsAttach() throws Exception
{
+ String aliceTopic = topicName("alice");
+ String bobTopic = topicName("bob");
+ admin.scalableTopics().createScalableTopic(aliceTopic, 1,
Map.of("owner", "alice"));
+ admin.scalableTopics().createScalableTopic(bobTopic, 1,
Map.of("owner", "bob"));
+
+ @Cleanup
+ Producer<String> pa =
v5Client.newProducer(Schema.string()).topic(aliceTopic).create();
+ @Cleanup
+ Producer<String> pb =
v5Client.newProducer(Schema.string()).topic(bobTopic).create();
+
+ @Cleanup
+ QueueConsumer<String> consumer =
v5Client.newQueueConsumer(Schema.string())
+ .namespace(getNamespace(), Map.of("owner", "alice"))
+ .subscriptionName("multi-q-filter")
+
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+ .subscribe();
+
+ pa.newMessage().value("alice-msg").send();
+ pb.newMessage().value("bob-msg").send();
+
+ // Only alice's message reaches this consumer.
+ Message<String> got = consumer.receive(Duration.ofSeconds(10));
+ assertTrue(got != null, "expected one message");
+ assertEquals(got.value(), "alice-msg");
+ consumer.acknowledge(got.id());
+
+ // Confirm bob's message never arrives within a generous window.
+ Message<String> empty = consumer.receive(Duration.ofSeconds(2));
+ assertTrue(empty == null, "bob's message must be filtered out, got " +
empty);
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MultiTopicStreamConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MultiTopicStreamConsumerTest.java
new file mode 100644
index 00000000000..f3299b1c74b
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MultiTopicStreamConsumerTest.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.testng.annotations.Test;
+
+/**
+ * End-to-end tests for {@link StreamConsumerBuilder#namespace}: a multi-topic
+ * StreamConsumer follows the matching set live, multiplexes from every
per-topic
+ * consumer into one user-visible queue, and supports cumulative ack across
+ * topics via per-message position vectors.
+ */
+public class V5MultiTopicStreamConsumerTest extends V5ClientBaseTest {
+
+ private String topicName(String suffix) {
+ return "topic://" + getNamespace() + "/" + suffix + "-"
+ + UUID.randomUUID().toString().substring(0, 8);
+ }
+
+ @Test
+ public void receivesFromAllTopicsInNamespace() throws Exception {
+ String topicA = topicName("a");
+ String topicB = topicName("b");
+ admin.scalableTopics().createScalableTopic(topicA, 1);
+ admin.scalableTopics().createScalableTopic(topicB, 1);
+
+ @Cleanup
+ Producer<String> pa =
v5Client.newProducer(Schema.string()).topic(topicA).create();
+ @Cleanup
+ Producer<String> pb =
v5Client.newProducer(Schema.string()).topic(topicB).create();
+
+ @Cleanup
+ StreamConsumer<String> consumer =
v5Client.newStreamConsumer(Schema.string())
+ .namespace(getNamespace())
+ .subscriptionName("multi-stream")
+
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+ .subscribe();
+
+ Set<String> expected = new HashSet<>();
+ for (int i = 0; i < 5; i++) {
+ String va = "a-" + i;
+ String vb = "b-" + i;
+ pa.newMessage().value(va).send();
+ pb.newMessage().value(vb).send();
+ expected.add(va);
+ expected.add(vb);
+ }
+
+ Set<String> received = new HashSet<>();
+ MessageId last = null;
+ long deadline = System.currentTimeMillis() + 30_000L;
+ while (received.size() < expected.size() && System.currentTimeMillis()
< deadline) {
+ Message<String> msg = consumer.receive(Duration.ofSeconds(1));
+ if (msg != null) {
+ received.add(msg.value());
+ last = msg.id();
+ }
+ }
+ assertEquals(received, expected, "should receive every message
produced to either topic");
+ // Cumulative ack must succeed across both per-topic consumers —
exercises the
+ // multi-topic position vector embedded in the message id.
+ assertNotNull(last);
+ consumer.acknowledgeCumulative(last);
+ }
+
+ @Test
+ public void picksUpTopicCreatedAfterSubscribe() throws Exception {
+ @Cleanup
+ StreamConsumer<String> consumer =
v5Client.newStreamConsumer(Schema.string())
+ .namespace(getNamespace())
+ .subscriptionName("multi-stream-late")
+
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+ .subscribe();
+
+ String lateTopic = topicName("late");
+ admin.scalableTopics().createScalableTopic(lateTopic, 1);
+
+ @Cleanup
+ Producer<String> producer = v5Client.newProducer(Schema.string())
+ .topic(lateTopic).create();
+ producer.newMessage().value("late-message").send();
+
+ Message<String> msg = consumer.receive(Duration.ofSeconds(15));
+ assertTrue(msg != null, "expected to receive message from late-added
topic");
+ assertEquals(msg.value(), "late-message");
+ assertEquals(msg.topic(), lateTopic, "topic() should surface the
parent scalable topic");
+ consumer.acknowledgeCumulative(msg.id());
+ }
+
+ @Test
+ public void cumulativeAckCoversEveryTopicSeenSoFar() throws Exception {
+ // Two topics, interleaved producers. After we cumulatively ack the
LAST message,
+ // closing and re-subscribing must NOT redeliver any of the previous
messages —
+ // that would only happen if the per-topic ack didn't fire for the
topic that's
+ // not the message's own.
+ String topicA = topicName("a");
+ String topicB = topicName("b");
+ admin.scalableTopics().createScalableTopic(topicA, 1);
+ admin.scalableTopics().createScalableTopic(topicB, 1);
+
+ @Cleanup
+ Producer<String> pa =
v5Client.newProducer(Schema.string()).topic(topicA).create();
+ @Cleanup
+ Producer<String> pb =
v5Client.newProducer(Schema.string()).topic(topicB).create();
+
+ StreamConsumer<String> first =
v5Client.newStreamConsumer(Schema.string())
+ .namespace(getNamespace())
+ .subscriptionName("multi-stream-cumulative")
+
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+ .subscribe();
+
+ int n = 5;
+ for (int i = 0; i < n; i++) {
+ pa.newMessage().value("a-" + i).send();
+ pb.newMessage().value("b-" + i).send();
+ }
+
+ // Drain everything, remember the last message id.
+ Set<String> drained = new HashSet<>();
+ MessageId last = null;
+ long deadline = System.currentTimeMillis() + 20_000L;
+ while (drained.size() < 2 * n && System.currentTimeMillis() <
deadline) {
+ Message<String> msg = first.receive(Duration.ofSeconds(1));
+ if (msg != null) {
+ drained.add(msg.value());
+ last = msg.id();
+ }
+ }
+ assertEquals(drained.size(), 2 * n, "first consumer should drain every
message");
+ assertNotNull(last);
+ // Cumulative ack — should fan out to BOTH topics' per-segment acks.
+ first.acknowledgeCumulative(last);
+ // Block briefly so the async ack flushes through to the broker before
we close.
+ Thread.sleep(500);
+ first.close();
+
+ // Re-subscribe with the same name. If the cumulative ack covered both
topics,
+ // there's nothing to re-deliver. If it only acked the message's OWN
topic, the
+ // other topic would re-deliver from the start.
+ @Cleanup
+ StreamConsumer<String> second =
v5Client.newStreamConsumer(Schema.string())
+ .namespace(getNamespace())
+ .subscriptionName("multi-stream-cumulative")
+ .subscribe();
+
+ Message<String> stale = second.receive(Duration.ofSeconds(2));
+ assertTrue(stale == null,
+ "cumulative ack should have covered every topic; got
redelivery: "
+ + (stale != null ? stale.value() : ""));
+ }
+
+ @Test
+ public void filtersByPropertySoOnlyMatchingTopicsAttach() throws Exception
{
+ String aliceTopic = topicName("alice");
+ String bobTopic = topicName("bob");
+ admin.scalableTopics().createScalableTopic(aliceTopic, 1,
Map.of("owner", "alice"));
+ admin.scalableTopics().createScalableTopic(bobTopic, 1,
Map.of("owner", "bob"));
+
+ @Cleanup
+ Producer<String> pa =
v5Client.newProducer(Schema.string()).topic(aliceTopic).create();
+ @Cleanup
+ Producer<String> pb =
v5Client.newProducer(Schema.string()).topic(bobTopic).create();
+
+ @Cleanup
+ StreamConsumer<String> consumer =
v5Client.newStreamConsumer(Schema.string())
+ .namespace(getNamespace(), Map.of("owner", "alice"))
+ .subscriptionName("multi-stream-filter")
+
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+ .subscribe();
+
+ pa.newMessage().value("alice-msg").send();
+ pb.newMessage().value("bob-msg").send();
+
+ Message<String> got = consumer.receive(Duration.ofSeconds(10));
+ assertTrue(got != null, "expected one message");
+ assertEquals(got.value(), "alice-msg");
+
+ Message<String> empty = consumer.receive(Duration.ofSeconds(2));
+ assertTrue(empty == null, "bob's message must be filtered out, got " +
empty);
+ }
+}
diff --git
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/QueueConsumerBuilder.java
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/QueueConsumerBuilder.java
index a6249efaca3..db03f3c5469 100644
---
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/QueueConsumerBuilder.java
+++
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/QueueConsumerBuilder.java
@@ -19,10 +19,8 @@
package org.apache.pulsar.client.api.v5;
import java.time.Duration;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
-import java.util.regex.Pattern;
import org.apache.pulsar.client.api.v5.config.BackoffPolicy;
import org.apache.pulsar.client.api.v5.config.DeadLetterPolicy;
import org.apache.pulsar.client.api.v5.config.EncryptionPolicy;
@@ -52,38 +50,37 @@ public interface QueueConsumerBuilder<T> {
CompletableFuture<QueueConsumer<T>> subscribeAsync();
// --- Topic selection ---
+ // Either {@link #topic(String)} or {@link #namespace} must be set, not
both.
/**
- * The topic(s) to subscribe to.
+ * Subscribe to a single scalable topic by name.
*
- * @param topicNames one or more topic names
+ * @param topicName the fully-qualified topic name (e.g. {@code
topic://tenant/ns/name})
* @return this builder instance for chaining
*/
- QueueConsumerBuilder<T> topic(String... topicNames);
+ QueueConsumerBuilder<T> topic(String topicName);
/**
- * The topics to subscribe to.
+ * Subscribe to every scalable topic under a namespace. The matching set
follows
+ * live: when topics are created in or deleted from the namespace, the
consumer
+ * attaches / detaches automatically.
*
- * @param topicNames the list of topic names
+ * @param namespace the namespace in {@code tenant/namespace} form
* @return this builder instance for chaining
*/
- QueueConsumerBuilder<T> topics(List<String> topicNames);
+ QueueConsumerBuilder<T> namespace(String namespace);
/**
- * Subscribe to all topics matching a regex pattern.
+ * Subscribe to scalable topics under a namespace whose properties match
every
+ * key/value pair in {@code propertyFilters} (AND semantics). An empty map
is
+ * equivalent to {@link #namespace(String)} — every topic in the namespace.
+ * The matching set follows live as topic properties change.
*
- * @param pattern the compiled regex pattern to match topic names against
+ * @param namespace the namespace in {@code tenant/namespace} form
+ * @param propertyFilters property name/value pairs that all must match
* @return this builder instance for chaining
*/
- QueueConsumerBuilder<T> topicsPattern(Pattern pattern);
-
- /**
- * Subscribe to all topics matching a regex pattern (string form).
- *
- * @param regex the regex pattern string to match topic names against
- * @return this builder instance for chaining
- */
- QueueConsumerBuilder<T> topicsPattern(String regex);
+ QueueConsumerBuilder<T> namespace(String namespace, Map<String, String>
propertyFilters);
// --- Subscription ---
@@ -190,16 +187,6 @@ public interface QueueConsumerBuilder<T> {
*/
QueueConsumerBuilder<T> deadLetterPolicy(DeadLetterPolicy policy);
- // --- Pattern subscription ---
-
- /**
- * How often to re-discover topics matching the pattern.
- *
- * @param interval the auto-discovery interval
- * @return this builder instance for chaining
- */
- QueueConsumerBuilder<T> patternAutoDiscoveryPeriod(Duration interval);
-
// --- Encryption ---
/**
diff --git
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/StreamConsumerBuilder.java
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/StreamConsumerBuilder.java
index ad6f0df0da6..0963f792035 100644
---
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/StreamConsumerBuilder.java
+++
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/StreamConsumerBuilder.java
@@ -48,14 +48,37 @@ public interface StreamConsumerBuilder<T> {
CompletableFuture<StreamConsumer<T>> subscribeAsync();
// --- Required ---
+ // Either {@link #topic(String)} or {@link #namespace} must be set, not
both.
/**
- * The topic(s) to subscribe to.
+ * Subscribe to a single scalable topic by name.
*
- * @param topicNames one or more topic names
+ * @param topicName the fully-qualified topic name (e.g. {@code
topic://tenant/ns/name})
* @return this builder instance for chaining
*/
- StreamConsumerBuilder<T> topic(String... topicNames);
+ StreamConsumerBuilder<T> topic(String topicName);
+
+ /**
+ * Subscribe to every scalable topic under a namespace. The matching set
follows
+ * live: when topics are created in or deleted from the namespace, the
consumer
+ * attaches / detaches automatically.
+ *
+ * @param namespace the namespace in {@code tenant/namespace} form
+ * @return this builder instance for chaining
+ */
+ StreamConsumerBuilder<T> namespace(String namespace);
+
+ /**
+ * Subscribe to scalable topics under a namespace whose properties match
every
+ * key/value pair in {@code propertyFilters} (AND semantics). An empty map
is
+ * equivalent to {@link #namespace(String)} — every topic in the namespace.
+ * The matching set follows live as topic properties change.
+ *
+ * @param namespace the namespace in {@code tenant/namespace} form
+ * @param propertyFilters property name/value pairs that all must match
+ * @return this builder instance for chaining
+ */
+ StreamConsumerBuilder<T> namespace(String namespace, Map<String, String>
propertyFilters);
/**
* The subscription name.
diff --git
a/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/Examples.java
b/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/Examples.java
index 2cc17d67b68..582a41a0574 100644
---
a/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/Examples.java
+++
b/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/Examples.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.client.api.v5;
import java.time.Duration;
import java.time.Instant;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.client.api.v5.async.AsyncProducer;
import org.apache.pulsar.client.api.v5.async.AsyncQueueConsumer;
@@ -394,15 +395,21 @@ public class Examples {
}
//
==================================================================================
- // 8. Multi-topic queue consumer with pattern
+ // 8. Multi-topic queue consumer over a namespace, optionally filtered by
property
//
==================================================================================
- /** Subscribe to all topics matching a pattern. */
- void patternSubscription(PulsarClient client) throws Exception {
+ /**
+ * Subscribe to every scalable topic in a namespace whose properties match
the
+ * given key/value pairs. The matching set follows live: when topics are
created
+ * with matching properties, the consumer attaches automatically; when
they're
+ * deleted or change properties out of the filter, it detaches. Pass
+ * {@code Map.of()} (or use the single-arg overload) to subscribe to every
+ * scalable topic in the namespace.
+ */
+ void namespaceSubscription(PulsarClient client) throws Exception {
try (var consumer = client.newQueueConsumer(Schema.string())
- .topicsPattern("persistent://public/default/events-.*")
+ .namespace("public/default", Map.of("kind", "events"))
.subscriptionName("all-events")
- .patternAutoDiscoveryPeriod(Duration.ofMinutes(1))
.subscribe()) {
while (true) {
diff --git
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/AsyncQueueConsumerV5.java
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/AsyncQueueConsumerV5.java
index f66e1227794..7242b5e53f7 100644
---
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/AsyncQueueConsumerV5.java
+++
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/AsyncQueueConsumerV5.java
@@ -29,9 +29,9 @@ import
org.apache.pulsar.client.api.v5.async.AsyncQueueConsumer;
*/
final class AsyncQueueConsumerV5<T> implements AsyncQueueConsumer<T> {
- private final ScalableQueueConsumer<T> consumer;
+ private final QueueConsumerImpl<T> consumer;
- AsyncQueueConsumerV5(ScalableQueueConsumer<T> consumer) {
+ AsyncQueueConsumerV5(QueueConsumerImpl<T> consumer) {
this.consumer = consumer;
}
diff --git
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MessageIdV5.java
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MessageIdV5.java
index 3a39986f3c7..97a47eff60f 100644
---
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MessageIdV5.java
+++
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MessageIdV5.java
@@ -20,9 +20,16 @@ package org.apache.pulsar.client.impl.v5;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
-import org.apache.pulsar.client.api.v5.MessageId;
+import org.apache.pulsar.client.api.MessageId;
+
+// The v5 MessageId interface and the underlying v4 MessageId class share the
same simple
+// name in different packages — a real conflict that Java imports can't
resolve. We import
+// the v4 type as `MessageId` (used heavily) and refer to v5 via FQN in the
few places it
+// appears (the `implements` clause and `compareTo`).
/**
* V5 MessageId implementation that wraps a v4 MessageId and includes a
position
@@ -36,17 +43,20 @@ import org.apache.pulsar.client.api.v5.MessageId;
* <p>For non-cumulative consumers (QueueConsumer) and readers
(CheckpointConsumer),
* only the single segment ID and v4 message ID are needed; the position
vector is
* empty.
+ *
+ * <p>Multi-topic consumer wrappers additionally tag the id with the parent
scalable
+ * topic (for ack routing) and a cross-topic position vector (for cumulative
ack
+ * across topics). Both fields are optional and round-trip through
+ * {@link #toByteArray} / {@link #fromByteArray}.
*/
-public final class MessageIdV5 implements MessageId {
+public final class MessageIdV5 implements
org.apache.pulsar.client.api.v5.MessageId {
static final long NO_SEGMENT = -1;
- static final MessageIdV5 EARLIEST = new MessageIdV5(
- org.apache.pulsar.client.api.MessageId.earliest, NO_SEGMENT,
Map.of());
- static final MessageIdV5 LATEST = new MessageIdV5(
- org.apache.pulsar.client.api.MessageId.latest, NO_SEGMENT,
Map.of());
+ static final MessageIdV5 EARLIEST = new MessageIdV5(MessageId.earliest,
NO_SEGMENT, Map.of());
+ static final MessageIdV5 LATEST = new MessageIdV5(MessageId.latest,
NO_SEGMENT, Map.of());
- private final org.apache.pulsar.client.api.MessageId v4MessageId;
+ private final MessageId v4MessageId;
private final long segmentId;
/**
@@ -54,30 +64,73 @@ public final class MessageIdV5 implements MessageId {
* taken at the moment this message was delivered to the application.
* Used by StreamConsumer for cumulative ack across all segments.
*/
- private final Map<Long, org.apache.pulsar.client.api.MessageId>
positionVector;
+ private final Map<Long, MessageId> positionVector;
+
+ /**
+ * Parent scalable topic this message was delivered from. Set only by
multi-topic
+ * consumer wrappers so they can route a subsequent ack back to the right
+ * per-topic consumer; {@code null} for single-topic consumers (where the
consumer
+ * already knows its topic).
+ */
+ private final String parentTopic;
+
+ /**
+ * Cross-topic position vector for multi-topic StreamConsumer cumulative
ack.
+ * Maps parent topic name → (segment id → latest-delivered msgId at the
moment
+ * this message entered the multiplexed queue). {@code null} for
single-topic
+ * messages — use {@link #positionVector()} instead.
+ */
+ private final Map<String, Map<Long, MessageId>> multiTopicVector;
/**
- * Create a MessageIdV5 with a position vector for cumulative ack support.
+ * Create a MessageIdV5 with a position vector for single-topic cumulative
ack.
*/
- public MessageIdV5(org.apache.pulsar.client.api.MessageId v4MessageId,
+ public MessageIdV5(MessageId v4MessageId,
long segmentId,
- Map<Long, org.apache.pulsar.client.api.MessageId>
positionVector) {
- this.v4MessageId = Objects.requireNonNull(v4MessageId);
- this.segmentId = segmentId;
- this.positionVector = Map.copyOf(positionVector);
+ Map<Long, MessageId> positionVector) {
+ this(v4MessageId, segmentId, positionVector, null, null);
+ }
+
+ /**
+ * Constructor for multi-topic queue consumer messages: parent topic tag
for ack
+ * routing; no cross-topic position vector (queue consumers don't do
cumulative
+ * ack).
+ */
+ public MessageIdV5(MessageId v4MessageId,
+ long segmentId,
+ Map<Long, MessageId> positionVector,
+ String parentTopic) {
+ this(v4MessageId, segmentId, positionVector, parentTopic, null);
}
/**
* Create a MessageIdV5 without a position vector (for individual ack /
reader use).
*/
- public MessageIdV5(org.apache.pulsar.client.api.MessageId v4MessageId,
long segmentId) {
- this(v4MessageId, segmentId, Map.of());
+ public MessageIdV5(MessageId v4MessageId, long segmentId) {
+ this(v4MessageId, segmentId, Map.of(), null, null);
+ }
+
+ /**
+ * Full constructor for multi-topic StreamConsumer messages: parent topic
+ the
+ * cross-topic position vector captured at enqueue time. Used by the
multi-topic
+ * stream consumer's pump.
+ */
+ public MessageIdV5(MessageId v4MessageId,
+ long segmentId,
+ Map<Long, MessageId> positionVector,
+ String parentTopic,
+ Map<String, Map<Long, MessageId>> multiTopicVector) {
+ this.v4MessageId = Objects.requireNonNull(v4MessageId);
+ this.segmentId = segmentId;
+ this.positionVector = Map.copyOf(positionVector);
+ this.parentTopic = parentTopic;
+ this.multiTopicVector = multiTopicVector;
}
/**
* Get the underlying v4 MessageId. Package-private for internal use.
*/
- org.apache.pulsar.client.api.MessageId v4MessageId() {
+ MessageId v4MessageId() {
return v4MessageId;
}
@@ -92,36 +145,123 @@ public final class MessageIdV5 implements MessageId {
* Get the position vector — the latest delivered message ID per segment
at the
* time this message was delivered. Used by StreamConsumer for cumulative
ack.
*/
- Map<Long, org.apache.pulsar.client.api.MessageId> positionVector() {
+ Map<Long, MessageId> positionVector() {
return positionVector;
}
+ /**
+ * Parent scalable topic when this message was delivered through a
multi-topic
+ * consumer; {@code null} for single-topic consumers. Package-private —
used by
+ * multi-topic ack routing only.
+ */
+ String parentTopic() {
+ return parentTopic;
+ }
+
+ Map<String, Map<Long, MessageId>> multiTopicVector() {
+ return multiTopicVector;
+ }
+
+ /**
+ * Wire format. All sections are length-prefixed so the reader can detect
+ * absent trailing sections (older serialised forms wrote only sections
1-3).
+ *
+ * <pre>
+ * 1. segmentId : 8 bytes
+ * 2. v4MessageId : 4-byte length + bytes
+ * 3. positionVector : 4-byte count + repeating { 8-byte segId,
+ * 4-byte len,
idBytes }
+ * 4. parentTopic : 4-byte length (-1 = null) + UTF-8 bytes
+ * 5. multiTopicVector : 4-byte count (-1 = null) + repeating {
+ * 4-byte topic name length, UTF-8 bytes,
+ * 4-byte segCount, repeating { 8-byte
segId,
+ * 4-byte
len, idBytes } }
+ * </pre>
+ *
+ * <p>Sections 4 and 5 are present in every new id; older serialisations
from a
+ * pre-multi-topic build are still readable — the reader treats the missing
+ * sections as null.
+ */
@Override
public byte[] toByteArray() {
byte[] v4Bytes = v4MessageId.toByteArray();
- // Format: [8 bytes segmentId] [4 bytes v4Length] [v4Bytes]
- // [4 bytes numPositions] [for each: [8 bytes segId] [4 bytes
idLen] [idBytes]]
- int totalSize = 8 + 4 + v4Bytes.length + 4;
- var serializedPositions = new java.util.HashMap<Long, byte[]>();
- for (var entry : positionVector.entrySet()) {
- byte[] idBytes = entry.getValue().toByteArray();
- serializedPositions.put(entry.getKey(), idBytes);
- totalSize += 8 + 4 + idBytes.length;
+
+ // Pre-serialise position-vector entries so we can size the buffer.
+ Map<Long, byte[]> serializedPositions =
serializeSegmentVector(positionVector);
+ int positionBytes = 4; // count
+ for (var entry : serializedPositions.entrySet()) {
+ positionBytes += 8 + 4 + entry.getValue().length;
+ }
+
+ // Section 4: parent topic.
+ byte[] parentTopicBytes = parentTopic == null
+ ? null : parentTopic.getBytes(StandardCharsets.UTF_8);
+
+ // Section 5: pre-serialise the multi-topic vector tree.
+ Map<byte[], Map<Long, byte[]>> serializedMulti = null;
+ int multiBytes = 4; // count or -1
+ if (multiTopicVector != null) {
+ serializedMulti = new HashMap<>(multiTopicVector.size());
+ for (var entry : multiTopicVector.entrySet()) {
+ byte[] topicBytes =
entry.getKey().getBytes(StandardCharsets.UTF_8);
+ Map<Long, byte[]> inner =
serializeSegmentVector(entry.getValue());
+ serializedMulti.put(topicBytes, inner);
+ multiBytes += 4 + topicBytes.length + 4;
+ for (var seg : inner.entrySet()) {
+ multiBytes += 8 + 4 + seg.getValue().length;
+ }
+ }
}
+ int totalSize = 8 + 4 + v4Bytes.length + positionBytes
+ + 4 + (parentTopicBytes == null ? 0 : parentTopicBytes.length)
+ + multiBytes;
+
ByteBuffer buf = ByteBuffer.allocate(totalSize);
buf.putLong(segmentId);
buf.putInt(v4Bytes.length);
buf.put(v4Bytes);
- buf.putInt(positionVector.size());
+
+ buf.putInt(serializedPositions.size());
for (var entry : serializedPositions.entrySet()) {
buf.putLong(entry.getKey());
buf.putInt(entry.getValue().length);
buf.put(entry.getValue());
}
+
+ if (parentTopicBytes == null) {
+ buf.putInt(-1);
+ } else {
+ buf.putInt(parentTopicBytes.length);
+ buf.put(parentTopicBytes);
+ }
+
+ if (serializedMulti == null) {
+ buf.putInt(-1);
+ } else {
+ buf.putInt(serializedMulti.size());
+ for (var entry : serializedMulti.entrySet()) {
+ buf.putInt(entry.getKey().length);
+ buf.put(entry.getKey());
+ buf.putInt(entry.getValue().size());
+ for (var seg : entry.getValue().entrySet()) {
+ buf.putLong(seg.getKey());
+ buf.putInt(seg.getValue().length);
+ buf.put(seg.getValue());
+ }
+ }
+ }
return buf.array();
}
+ private static Map<Long, byte[]> serializeSegmentVector(Map<Long,
MessageId> vector) {
+ Map<Long, byte[]> out = new HashMap<>(vector.size());
+ for (var entry : vector.entrySet()) {
+ out.put(entry.getKey(), entry.getValue().toByteArray());
+ }
+ return out;
+ }
+
static MessageIdV5 fromByteArray(byte[] data) throws IOException {
if (data == null || data.length < 12) {
throw new IOException("Invalid MessageIdV5 data: too short");
@@ -134,30 +274,66 @@ public final class MessageIdV5 implements MessageId {
}
byte[] v4Bytes = new byte[v4Length];
buf.get(v4Bytes);
- org.apache.pulsar.client.api.MessageId v4Id =
- org.apache.pulsar.client.api.MessageId.fromByteArray(v4Bytes);
+ MessageId v4Id = MessageId.fromByteArray(v4Bytes);
+
+ // Section 3: position vector (single-topic / per-segment).
+ Map<Long, MessageId> positions = Map.of();
+ if (buf.hasRemaining()) {
+ positions = readSegmentVector(buf);
+ }
+
+ // Section 4: parent topic. Length -1 sentinel means "absent".
+ String parentTopic = null;
+ if (buf.hasRemaining()) {
+ int parentLen = buf.getInt();
+ if (parentLen >= 0) {
+ if (parentLen > buf.remaining()) {
+ throw new IOException("Invalid MessageIdV5 data: bad
parent-topic length");
+ }
+ byte[] parentBytes = new byte[parentLen];
+ buf.get(parentBytes);
+ parentTopic = new String(parentBytes, StandardCharsets.UTF_8);
+ }
+ }
- // Read position vector if present
- Map<Long, org.apache.pulsar.client.api.MessageId> positions = Map.of();
+ // Section 5: cross-topic vector. Count -1 sentinel means "absent".
+ Map<String, Map<Long, MessageId>> multiTopic = null;
if (buf.hasRemaining()) {
- int numPositions = buf.getInt();
- var posMap = new java.util.HashMap<Long,
org.apache.pulsar.client.api.MessageId>();
- for (int i = 0; i < numPositions; i++) {
- long posSegId = buf.getLong();
- int idLen = buf.getInt();
- byte[] idBytes = new byte[idLen];
- buf.get(idBytes);
- posMap.put(posSegId,
-
org.apache.pulsar.client.api.MessageId.fromByteArray(idBytes));
+ int topicCount = buf.getInt();
+ if (topicCount >= 0) {
+ multiTopic = new HashMap<>(topicCount);
+ for (int i = 0; i < topicCount; i++) {
+ int topicLen = buf.getInt();
+ byte[] topicBytes = new byte[topicLen];
+ buf.get(topicBytes);
+ String topic = new String(topicBytes,
StandardCharsets.UTF_8);
+ Map<Long, MessageId> inner = readSegmentVector(buf);
+ multiTopic.put(topic, inner);
+ }
}
- positions = posMap;
}
- return new MessageIdV5(v4Id, segmentId, positions);
+ return new MessageIdV5(v4Id, segmentId, positions, parentTopic,
multiTopic);
+ }
+
+ private static Map<Long, MessageId> readSegmentVector(ByteBuffer buf)
throws IOException {
+ int count = buf.getInt();
+ Map<Long, MessageId> out = new HashMap<>(count);
+ for (int i = 0; i < count; i++) {
+ long segId = buf.getLong();
+ int idLen = buf.getInt();
+ if (idLen < 0 || idLen > buf.remaining()) {
+ throw new IOException("Invalid MessageIdV5 data: bad inner id
length");
+ }
+ byte[] idBytes = new byte[idLen];
+ buf.get(idBytes);
+ out.put(segId, MessageId.fromByteArray(idBytes));
+ }
+ return out;
}
@Override
- public int compareTo(MessageId other) {
+ public int compareTo(org.apache.pulsar.client.api.v5.MessageId other) {
if (!(other instanceof MessageIdV5 o)) {
throw new IllegalArgumentException("Cannot compare with " +
other.getClass());
}
diff --git
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MessageV5.java
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MessageV5.java
index 910bb114e51..1a9444911f4 100644
---
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MessageV5.java
+++
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MessageV5.java
@@ -31,13 +31,19 @@ final class MessageV5<T> implements Message<T> {
private final org.apache.pulsar.client.api.Message<T> v4Message;
private final MessageIdV5 messageId;
+ /**
+ * Optional override for {@link #topic()}. Set by multi-topic consumer
wrappers to
+ * the parent scalable topic name (the v4 message's own topic is the
segment topic,
+ * which is internal). Null for single-topic consumers — {@code topic()}
falls back
+ * to the v4 message topic in that case.
+ */
+ private final String topicOverride;
/**
* Create with a simple segment ID (for queue consumer, checkpoint
consumer, producer).
*/
MessageV5(org.apache.pulsar.client.api.Message<T> v4Message, long
segmentId) {
- this.v4Message = v4Message;
- this.messageId = new MessageIdV5(v4Message.getMessageId(), segmentId);
+ this(v4Message, new MessageIdV5(v4Message.getMessageId(), segmentId),
null);
}
/**
@@ -45,8 +51,36 @@ final class MessageV5<T> implements Message<T> {
* (for stream consumer cumulative ack support).
*/
MessageV5(org.apache.pulsar.client.api.Message<T> v4Message, MessageIdV5
messageId) {
+ this(v4Message, messageId, null);
+ }
+
+ /**
+ * Create with an explicit topic override — used by multi-topic consumer
wrappers
+ * to surface the parent scalable topic via {@link #topic()} instead of the
+ * underlying segment topic. {@code topicOverride} may be {@code null}.
+ */
+ MessageV5(org.apache.pulsar.client.api.Message<T> v4Message, MessageIdV5
messageId,
+ String topicOverride) {
this.v4Message = v4Message;
this.messageId = messageId;
+ this.topicOverride = topicOverride;
+ }
+
+ /**
+ * Re-brand this message with a parent scalable topic. Used by multi-topic
consumer
+ * wrappers when forwarding from a per-topic consumer's queue into the
shared
+ * multiplexed queue: the message id picks up the parent for ack routing,
and
+ * {@link #topic()} starts returning the parent.
+ */
+ MessageV5<T> withTopicOverride(String parentTopic) {
+ MessageIdV5 newId = new MessageIdV5(messageId.v4MessageId(),
messageId.segmentId(),
+ messageId.positionVector(), parentTopic);
+ return new MessageV5<>(v4Message, newId, parentTopic);
+ }
+
+ /** Underlying v4 message — exposed to multi-topic wrappers that re-build
with a new id. */
+ org.apache.pulsar.client.api.Message<T> v4Message() {
+ return v4Message;
}
@Override
@@ -98,7 +132,10 @@ final class MessageV5<T> implements Message<T> {
@Override
public String topic() {
- return v4Message.getTopicName();
+ // Multi-topic consumer wrappers set topicOverride to the parent
scalable topic
+ // so the user-visible topic() matches the topic they subscribed to
(the
+ // v4 message carries the internal segment topic).
+ return topicOverride != null ? topicOverride :
v4Message.getTopicName();
}
@Override
diff --git
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MultiTopicQueueConsumer.java
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MultiTopicQueueConsumer.java
new file mode 100644
index 00000000000..102d6de6c52
--- /dev/null
+++
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MultiTopicQueueConsumer.java
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.v5;
+
+import io.github.merlimat.slog.Logger;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedTransferQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.pulsar.client.api.v5.Message;
+import org.apache.pulsar.client.api.v5.MessageId;
+import org.apache.pulsar.client.api.v5.PulsarClientException;
+import org.apache.pulsar.client.api.v5.QueueConsumer;
+import org.apache.pulsar.client.api.v5.Transaction;
+import org.apache.pulsar.client.api.v5.async.AsyncQueueConsumer;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+
+/**
+ * Multi-topic {@link QueueConsumer} that subscribes to every scalable topic
in a
+ * namespace matching a (possibly empty) set of property filters. The matching
set
+ * follows live: when topics enter or leave the filter, the consumer attaches /
+ * detaches automatically via a long-lived {@link ScalableTopicsWatcher}
session
+ * to the broker.
+ *
+ * <p>Internals:
+ * <ul>
+ * <li>One {@link ScalableQueueConsumer} per matched topic.</li>
+ * <li>A pump thread per topic forwards from the per-topic queue into the
shared
+ * multiplexed queue, tagging each message with the parent topic so the
+ * subsequent ack can be routed back.</li>
+ * <li>The watcher's {@code Snapshot} replaces the active set; {@code Diff}
+ * applies removals (flushing acks first) before additions to handle a
+ * rapid remove-then-add of the same topic name.</li>
+ * <li>Per-topic add failures retry forever with exponential backoff (100 ms
+ * initial, 30 min cap).</li>
+ * </ul>
+ */
+final class MultiTopicQueueConsumer<T> implements QueueConsumerImpl<T> {
+
+ private static final Logger LOG =
Logger.get(MultiTopicQueueConsumer.class);
+ /**
+ * Cap for per-topic subscribe retries. Matches v4 consumer reconnect
semantics —
+ * the consumer never gives up; the user just sees no messages from the
topic
+ * until the broker / topic recovers.
+ */
+ private static final Duration RETRY_MAX = Duration.ofMinutes(30);
+ private final Logger log;
+
+ private final PulsarClientV5 client;
+ private final Schema<T> v5Schema;
+ private final ConsumerConfigurationData<T> consumerConf;
+ private final NamespaceName namespace;
+ private final Map<String, String> propertyFilters;
+ private final String subscriptionName;
+
+ private final ScalableTopicsWatcher watcher;
+ private final ConcurrentHashMap<String, PerTopicState<T>> perTopic = new
ConcurrentHashMap<>();
+ private final LinkedTransferQueue<MessageV5<T>> mux = new
LinkedTransferQueue<>();
+
+ private volatile boolean closed = false;
+ private final AsyncQueueConsumerV5<T> asyncView;
+
+ private MultiTopicQueueConsumer(PulsarClientV5 client,
+ Schema<T> v5Schema,
+ ConsumerConfigurationData<T> consumerConf,
+ NamespaceName namespace,
+ Map<String, String> propertyFilters,
+ ScalableTopicsWatcher watcher) {
+ this.client = client;
+ this.v5Schema = v5Schema;
+ this.consumerConf = consumerConf;
+ this.namespace = namespace;
+ this.propertyFilters = propertyFilters;
+ this.subscriptionName = consumerConf.getSubscriptionName();
+ this.watcher = watcher;
+ this.log = LOG.with()
+ .attr("namespace", namespace)
+ .attr("subscription", subscriptionName)
+ .attr("filters", propertyFilters)
+ .build();
+ this.asyncView = new AsyncQueueConsumerV5<>(this);
+ }
+
+ static <T> CompletableFuture<QueueConsumer<T>> createAsync(PulsarClientV5
client,
+ Schema<T>
v5Schema,
+
ConsumerConfigurationData<T> consumerConf,
+ NamespaceName
namespace,
+ Map<String,
String> propertyFilters) {
+ ScalableTopicsWatcher watcher = new ScalableTopicsWatcher(
+ client.v4Client(), namespace, propertyFilters);
+ MultiTopicQueueConsumer<T> consumer = new MultiTopicQueueConsumer<>(
+ client, v5Schema, consumerConf, namespace, propertyFilters,
watcher);
+ return watcher.start()
+ .thenCompose(initial -> consumer.openInitial(initial))
+ .thenApply(__ -> {
+ watcher.setListener(consumer.new WatcherListener());
+ return (QueueConsumer<T>) consumer;
+ })
+ .exceptionallyCompose(ex -> consumer.closeAsync().handle((__,
___) -> {
+ throw ex instanceof CompletionException ce ? ce : new
CompletionException(ex);
+ }));
+ }
+
+ /**
+ * Open one per-topic consumer per topic in the initial snapshot. Block on
every
+ * future so {@code subscribeAsync} only resolves once the consumer is
fully
+ * attached — gives the user the same all-or-nothing semantics as the
+ * single-topic builder.
+ */
+ private CompletableFuture<Void> openInitial(List<String> topics) {
+ if (topics.isEmpty()) {
+ return CompletableFuture.completedFuture(null);
+ }
+ List<CompletableFuture<?>> opens = new ArrayList<>(topics.size());
+ for (String t : topics) {
+ opens.add(openTopic(t, /* retry= */ false));
+ }
+ return
CompletableFuture.allOf(opens.toArray(CompletableFuture[]::new));
+ }
+
+ /**
+ * Subscribe to one topic. When {@code retry} is true, failures schedule a
+ * background retry with exponential backoff; the returned future
completes as
+ * soon as the first attempt finishes (success or failure) so we don't
hold up
+ * Snapshot / Diff processing.
+ */
+ private CompletableFuture<Void> openTopic(String topicName, boolean retry)
{
+ if (closed) {
+ return CompletableFuture.completedFuture(null);
+ }
+ if (perTopic.containsKey(topicName)) {
+ return CompletableFuture.completedFuture(null);
+ }
+ TopicName topic = V5Utils.asScalableTopicName(topicName);
+ DagWatchClient dagWatch = new DagWatchClient(client.v4Client(), topic);
+ // Per-topic message sink: tag each delivered message with the parent
scalable
+ // topic for ack routing + display, and forward into the shared mux.
No pump
+ // thread; per-segment v4 receive loops fire this sink directly.
+ java.util.function.Consumer<MessageV5<T>> sink = msg -> {
+ if (!closed) {
+ mux.add(msg.withTopicOverride(topicName));
+ }
+ };
+ return dagWatch.start()
+ .thenCompose(layout -> ScalableQueueConsumer.createAsyncImpl(
+ client, v5Schema, perTopicConf(topicName), dagWatch,
layout, sink))
+ .thenAccept(qc -> {
+ if (closed) {
+ qc.closeAsync();
+ return;
+ }
+ PerTopicState<T> state = new PerTopicState<>(topicName,
qc);
+ PerTopicState<T> existing =
perTopic.putIfAbsent(topicName, state);
+ if (existing != null) {
+ // Concurrent open; drop the dup.
+ qc.closeAsync();
+ return;
+ }
+ log.info().attr("topic", topicName).log("Per-topic
consumer attached");
+ })
+ .exceptionally(ex -> {
+ Throwable cause = ex instanceof CompletionException ce &&
ce.getCause() != null
+ ? ce.getCause() : ex;
+ if (retry && !closed) {
+ scheduleRetry(topicName);
+ }
+ log.warn().attr("topic", topicName).exceptionMessage(cause)
+ .log("Per-topic subscribe failed");
+ return null;
+ });
+ }
+
+ private void scheduleRetry(String topicName) {
+ long delayMs = nextBackoff(topicName);
+ log.info().attr("topic", topicName).attr("delayMs", delayMs)
+ .log("Retrying per-topic subscribe after backoff");
+ client.v4Client().timer().newTimeout(timeout -> openTopic(topicName,
/* retry= */ true),
+ delayMs, TimeUnit.MILLISECONDS);
+ }
+
+ private final ConcurrentHashMap<String, AtomicLong> retryDelays = new
ConcurrentHashMap<>();
+
+ /** Returns the next exponential-backoff delay (ms) for a topic and
updates the state. */
+ private long nextBackoff(String topicName) {
+ AtomicLong al = retryDelays.computeIfAbsent(topicName, t -> new
AtomicLong(100));
+ long current = al.get();
+ long next = Math.min(current * 2, RETRY_MAX.toMillis());
+ al.set(next);
+ return current;
+ }
+
+ private void resetBackoff(String topicName) {
+ retryDelays.remove(topicName);
+ }
+
+ /**
+ * Clone the user's consumer config for a per-topic consumer. Each
per-topic
+ * consumer needs a unique {@code consumerName} so the broker can
disambiguate
+ * them on the same subscription; we suffix with the topic name.
+ */
+ private ConsumerConfigurationData<T> perTopicConf(String topicName) {
+ var conf = consumerConf.clone();
+ if (consumerConf.getConsumerName() != null) {
+ // Disambiguate across topics — the broker side cares about
uniqueness on
+ // the same Shared subscription per segment.
+ String localName = TopicName.get(topicName).getLocalName();
+ conf.setConsumerName(consumerConf.getConsumerName() + "-" +
localName);
+ }
+ return conf;
+ }
+
+ /**
+ * Close per-topic consumer for a topic that has dropped out of the
matching set.
+ * No explicit ack flush — Shared subscription acks are independent per
message
+ * and the per-topic consumer's existing close already flushes pending
acks via
+ * its v4 segment consumers.
+ */
+ private CompletableFuture<Void> closeTopic(String topicName) {
+ retryDelays.remove(topicName);
+ PerTopicState<T> state = perTopic.remove(topicName);
+ if (state == null) {
+ return CompletableFuture.completedFuture(null);
+ }
+ return state.consumer.closeAsync()
+ .thenRun(() -> log.info().attr("topic", topicName)
+ .log("Per-topic consumer detached"));
+ }
+
+ // --- QueueConsumer ---
+
+ @Override
+ public String topic() {
+ return "namespace://" + namespace;
+ }
+
+ @Override
+ public String subscription() {
+ return subscriptionName;
+ }
+
+ @Override
+ public String consumerName() {
+ return consumerConf.getConsumerName();
+ }
+
+ @Override
+ public Message<T> receive() throws PulsarClientException {
+ try {
+ return mux.take();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarClientException("Receive interrupted", e);
+ }
+ }
+
+ @Override
+ public Message<T> receive(Duration timeout) throws PulsarClientException {
+ try {
+ return mux.poll(timeout.toMillis(), TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarClientException("Receive interrupted", e);
+ }
+ }
+
+ @Override
+ public void acknowledge(MessageId messageId) {
+ routeAck(messageId, ptc -> ptc.acknowledge(messageId));
+ }
+
+ @Override
+ public void acknowledge(MessageId messageId, Transaction txn) {
+ routeAck(messageId, ptc -> ptc.acknowledge(messageId, txn));
+ }
+
+ @Override
+ public void negativeAcknowledge(MessageId messageId) {
+ routeAck(messageId, ptc -> ptc.negativeAcknowledge(messageId));
+ }
+
+ /** Look up the per-topic consumer via the parent topic tag and delegate.
*/
+ private void routeAck(MessageId messageId,
java.util.function.Consumer<QueueConsumer<T>> action) {
+ if (!(messageId instanceof MessageIdV5 id)) {
+ throw new IllegalArgumentException("Expected MessageIdV5, got: " +
messageId.getClass());
+ }
+ String parent = id.parentTopic();
+ if (parent == null) {
+ throw new IllegalStateException("MessageIdV5 missing parent topic
— was the message"
+ + " delivered through a multi-topic consumer?");
+ }
+ PerTopicState<T> state = perTopic.get(parent);
+ if (state == null) {
+ // Topic was removed between deliver and ack. Fine — broker has
dropped the
+ // session for that topic. Drop the ack silently.
+ log.debug().attr("topic", parent)
+ .log("Ack for removed topic; dropping");
+ return;
+ }
+ action.accept(state.consumer);
+ }
+
+ @Override
+ public AsyncQueueConsumer<T> async() {
+ return asyncView;
+ }
+
+ @Override
+ public void close() throws PulsarClientException {
+ try {
+ closeAsync().get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarClientException("Close interrupted", e);
+ } catch (ExecutionException e) {
+ throw new PulsarClientException(e.getCause());
+ }
+ }
+
+ @Override
+ public CompletableFuture<Message<T>> receiveAsync() {
+ return CompletableFuture.supplyAsync(() -> {
+ try {
+ return receive();
+ } catch (PulsarClientException e) {
+ throw new CompletionException(e);
+ }
+ });
+ }
+
+ @Override
+ public CompletableFuture<Void> closeAsync() {
+ if (closed) {
+ return CompletableFuture.completedFuture(null);
+ }
+ closed = true;
+ watcher.close();
+ List<CompletableFuture<Void>> closes = new ArrayList<>();
+ for (var topic : new HashSet<>(perTopic.keySet())) {
+ closes.add(closeTopic(topic));
+ }
+ return
CompletableFuture.allOf(closes.toArray(CompletableFuture[]::new));
+ }
+
+ // --- Watcher listener ---
+
+ private final class WatcherListener implements
ScalableTopicsWatcher.Listener {
+ @Override
+ public void onSnapshot(List<String> topics) {
+ // Reconcile: open anything new, close anything missing. Same as
Diff but
+ // computed from the full snapshot — used on reconnect when broker
hash
+ // differs from ours.
+ Set<String> target = new HashSet<>(topics);
+ Set<String> current = new HashSet<>(perTopic.keySet());
+ // Remove first so a rapid remove-then-add of same name
closes-then-reopens.
+ for (String t : current) {
+ if (!target.contains(t)) {
+ closeTopic(t);
+ }
+ }
+ for (String t : target) {
+ if (!current.contains(t)) {
+ openTopic(t, /* retry= */ true);
+ resetBackoff(t);
+ }
+ }
+ }
+
+ @Override
+ public void onDiff(List<String> added, List<String> removed) {
+ // Apply removed before added — covers rapid remove-then-add of
same name.
+ for (String t : removed) {
+ closeTopic(t);
+ }
+ for (String t : added) {
+ openTopic(t, /* retry= */ true);
+ resetBackoff(t);
+ }
+ }
+ }
+
+ // --- Per-topic state ---
+
+ /**
+ * Per-topic bookkeeping. Messages flow directly into the shared mux via
the
+ * sink the wrapper installed on the per-topic consumer at create-time, so
+ * there's no pump thread to start/stop here — just hold a reference to the
+ * underlying consumer for ack routing and clean shutdown.
+ */
+ private static final class PerTopicState<T> {
+ private final String parentTopic;
+ private final ScalableQueueConsumer<T> consumer;
+
+ PerTopicState(String parentTopic, ScalableQueueConsumer<T> consumer) {
+ this.parentTopic = parentTopic;
+ this.consumer = consumer;
+ }
+ }
+}
diff --git
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MultiTopicStreamConsumer.java
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MultiTopicStreamConsumer.java
new file mode 100644
index 00000000000..e49b755cca4
--- /dev/null
+++
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MultiTopicStreamConsumer.java
@@ -0,0 +1,542 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.v5;
+
+import io.github.merlimat.slog.Logger;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedTransferQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.pulsar.client.api.v5.Message;
+import org.apache.pulsar.client.api.v5.MessageId;
+import org.apache.pulsar.client.api.v5.Messages;
+import org.apache.pulsar.client.api.v5.PulsarClientException;
+import org.apache.pulsar.client.api.v5.StreamConsumer;
+import org.apache.pulsar.client.api.v5.Transaction;
+import org.apache.pulsar.client.api.v5.async.AsyncStreamConsumer;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.common.api.proto.ScalableConsumerType;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+
+/**
+ * Multi-topic {@link StreamConsumer} over the union of scalable topics in a
+ * namespace matching a (possibly empty) set of property filters.
+ *
+ * <p>Cumulative ack across topics works via a per-message position-vector
+ * snapshot: each message that enters the multiplexed queue carries
+ * {@code Map<TopicName, Map<SegmentId, MessageId>>} captured at enqueue time.
+ * On {@code acknowledgeCumulative(msg)}, the wrapper fans out to every
+ * per-topic consumer with the right segment vector — same semantics as the
+ * single-topic case, just lifted one level.
+ *
+ * <p>For Removed-mid-stream topics we flush acks up to {@code latestDelivered}
+ * for that topic before closing the per-topic consumer, so the user's
+ * processing-acked invariant is preserved if the topic is later re-added.
+ */
+final class MultiTopicStreamConsumer<T> implements StreamConsumer<T> {
+
+ private static final Logger LOG =
Logger.get(MultiTopicStreamConsumer.class);
+ /**
+ * Cap for per-topic subscribe retries. Matches v4 consumer reconnect
semantics.
+ */
+ private static final Duration RETRY_MAX = Duration.ofMinutes(30);
+
+ private final Logger log;
+
+ private final PulsarClientV5 client;
+ private final Schema<T> v5Schema;
+ private final ConsumerConfigurationData<T> consumerConf;
+ private final NamespaceName namespace;
+ private final Map<String, String> propertyFilters;
+ private final String subscriptionName;
+
+ private final ScalableTopicsWatcher watcher;
+ private final ConcurrentHashMap<String, PerTopic<T>> perTopic = new
ConcurrentHashMap<>();
+ private final LinkedTransferQueue<MessageV5<T>> mux = new
LinkedTransferQueue<>();
+
+ /**
+ * Tracks the latest delivered message id per (parent topic, segment id)
across
+ * every per-topic consumer. Snapshotted at enqueue time for each delivered
+ * message so cumulative ack covers everything visible up to that message.
+ */
+ private final ConcurrentHashMap<String, ConcurrentHashMap<Long,
org.apache.pulsar.client.api.MessageId>>
+ latestDeliveredPerTopicSegment = new ConcurrentHashMap<>();
+
+ private volatile boolean closed = false;
+ private final AsyncStreamConsumerV5Multi asyncView;
+
+ private MultiTopicStreamConsumer(PulsarClientV5 client,
+ Schema<T> v5Schema,
+ ConsumerConfigurationData<T> consumerConf,
+ NamespaceName namespace,
+ Map<String, String> propertyFilters,
+ ScalableTopicsWatcher watcher) {
+ this.client = client;
+ this.v5Schema = v5Schema;
+ this.consumerConf = consumerConf;
+ this.namespace = namespace;
+ this.propertyFilters = propertyFilters;
+ this.subscriptionName = consumerConf.getSubscriptionName();
+ this.watcher = watcher;
+ this.log = LOG.with()
+ .attr("namespace", namespace)
+ .attr("subscription", subscriptionName)
+ .attr("filters", propertyFilters)
+ .build();
+ this.asyncView = new AsyncStreamConsumerV5Multi();
+ }
+
+ static <T> CompletableFuture<StreamConsumer<T>> createAsync(PulsarClientV5
client,
+ Schema<T>
v5Schema,
+
ConsumerConfigurationData<T> consumerConf,
+ NamespaceName
namespace,
+ Map<String,
String> propertyFilters) {
+ ScalableTopicsWatcher watcher = new ScalableTopicsWatcher(
+ client.v4Client(), namespace, propertyFilters);
+ MultiTopicStreamConsumer<T> consumer = new MultiTopicStreamConsumer<>(
+ client, v5Schema, consumerConf, namespace, propertyFilters,
watcher);
+ return watcher.start()
+ .thenCompose(initial -> consumer.openInitial(initial))
+ .thenApply(__ -> {
+ watcher.setListener(consumer.new WatcherListener());
+ return (StreamConsumer<T>) consumer;
+ })
+ .exceptionallyCompose(ex -> consumer.closeAsync().handle((__,
___) -> {
+ throw ex instanceof CompletionException ce ? ce : new
CompletionException(ex);
+ }));
+ }
+
+ private CompletableFuture<Void> openInitial(List<String> topics) {
+ if (topics.isEmpty()) {
+ return CompletableFuture.completedFuture(null);
+ }
+ List<CompletableFuture<?>> opens = new ArrayList<>(topics.size());
+ for (String t : topics) {
+ opens.add(openTopic(t, /* retry= */ false));
+ }
+ return
CompletableFuture.allOf(opens.toArray(CompletableFuture[]::new));
+ }
+
+ private CompletableFuture<Void> openTopic(String topicName, boolean retry)
{
+ if (closed) {
+ return CompletableFuture.completedFuture(null);
+ }
+ if (perTopic.containsKey(topicName)) {
+ return CompletableFuture.completedFuture(null);
+ }
+ TopicName topic = V5Utils.asScalableTopicName(topicName);
+ // One ScalableConsumerClient session per topic, same as the
single-topic builder.
+ ScalableConsumerClient session = new ScalableConsumerClient(
+ client.v4Client(), topic,
+ consumerConf.getSubscriptionName(),
+ perTopicConsumerName(topicName),
+ ScalableConsumerType.STREAM);
+
+ // Per-topic message sink: each delivered message arrives with its
+ // single-topic positionVector (computed by ScalableStreamConsumer).
Update
+ // our cross-topic latestDelivered map, snapshot the full cross-topic
vector,
+ // and forward to the shared mux. No pump thread.
+ java.util.function.Consumer<MessageV5<T>> sink = msg ->
+ onPerTopicMessage(topicName, msg);
+
+ return session.start()
+ .thenCompose(initialAssignment ->
ScalableStreamConsumer.createAsyncImpl(
+ client, v5Schema, perTopicConf(topicName), session,
+ topicName, initialAssignment, sink))
+ .thenAccept(sc -> {
+ if (closed) {
+ sc.closeAsync();
+ return;
+ }
+ PerTopic<T> state = new PerTopic<>(topicName, sc);
+ PerTopic<T> existing = perTopic.putIfAbsent(topicName,
state);
+ if (existing != null) {
+ sc.closeAsync();
+ return;
+ }
+ log.info().attr("topic", topicName).log("Per-topic stream
consumer attached");
+ })
+ .exceptionally(ex -> {
+ Throwable cause = ex instanceof CompletionException ce &&
ce.getCause() != null
+ ? ce.getCause() : ex;
+ if (retry && !closed) {
+ scheduleRetry(topicName);
+ }
+ log.warn().attr("topic", topicName).exceptionMessage(cause)
+ .log("Per-topic stream subscribe failed");
+ return null;
+ });
+ }
+
+ private void scheduleRetry(String topicName) {
+ long delayMs = nextBackoff(topicName);
+ log.info().attr("topic", topicName).attr("delayMs", delayMs)
+ .log("Retrying per-topic stream subscribe");
+ client.v4Client().timer().newTimeout(timeout -> openTopic(topicName,
/* retry= */ true),
+ delayMs, TimeUnit.MILLISECONDS);
+ }
+
+ private final ConcurrentHashMap<String, AtomicLong> retryDelays = new
ConcurrentHashMap<>();
+
+ private long nextBackoff(String topicName) {
+ AtomicLong al = retryDelays.computeIfAbsent(topicName, t -> new
AtomicLong(100));
+ long current = al.get();
+ long next = Math.min(current * 2, RETRY_MAX.toMillis());
+ al.set(next);
+ return current;
+ }
+
+ private void resetBackoff(String topicName) {
+ retryDelays.remove(topicName);
+ }
+
+ /**
+ * Per-topic consumer name. Each topic gets a distinct name so the
broker's per-topic
+ * coordinator can register them as separate consumers (same
Exclusive-per-segment
+ * semantics, no cross-topic identity coupling).
+ */
+ private String perTopicConsumerName(String topicName) {
+ String localName = TopicName.get(topicName).getLocalName();
+ if (consumerConf.getConsumerName() != null) {
+ return consumerConf.getConsumerName() + "-" + localName;
+ }
+ return "v5-stream-" + V5RandomIds.randomAlphanumeric(8) + "-" +
localName;
+ }
+
+ private ConsumerConfigurationData<T> perTopicConf(String topicName) {
+ var conf = consumerConf.clone();
+ conf.setConsumerName(perTopicConsumerName(topicName));
+ return conf;
+ }
+
+ /**
+ * Close per-topic consumer, flushing pending cumulative acks up to
whatever was
+ * last delivered for that topic. If the topic later re-appears
(re-Added), a
+ * fresh consumer subscribes and resumes from the broker-side cursor —
already
+ * advanced past the messages we've delivered to the user.
+ */
+ private CompletableFuture<Void> closeTopic(String topicName) {
+ retryDelays.remove(topicName);
+ PerTopic<T> state = perTopic.remove(topicName);
+ if (state == null) {
+ return CompletableFuture.completedFuture(null);
+ }
+ // Flush: ack everything we delivered for this topic.
+ ConcurrentHashMap<Long, org.apache.pulsar.client.api.MessageId> latest
=
+ latestDeliveredPerTopicSegment.remove(topicName);
+ if (latest != null && !latest.isEmpty()) {
+ state.consumer.ackUpToVector(new HashMap<>(latest));
+ }
+ return state.consumer.closeAsync()
+ .thenRun(() -> log.info().attr("topic", topicName)
+ .log("Per-topic stream consumer detached"));
+ }
+
+ // --- StreamConsumer ---
+
+ @Override
+ public String topic() {
+ return "namespace://" + namespace;
+ }
+
+ @Override
+ public String subscription() {
+ return subscriptionName;
+ }
+
+ @Override
+ public String consumerName() {
+ return consumerConf.getConsumerName();
+ }
+
+ @Override
+ public Message<T> receive() throws PulsarClientException {
+ try {
+ return mux.take();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarClientException("Receive interrupted", e);
+ }
+ }
+
+ @Override
+ public Message<T> receive(Duration timeout) throws PulsarClientException {
+ try {
+ return mux.poll(timeout.toMillis(), TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarClientException("Receive interrupted", e);
+ }
+ }
+
+ @Override
+ public Messages<T> receiveMulti(int maxNumMessages, Duration timeout)
throws PulsarClientException {
+ // Block for up to `timeout` waiting for the first message, then drain
whatever
+ // else is immediately available up to maxNumMessages. Same shape as
the single
+ // topic StreamConsumer.
+ long deadline = System.nanoTime() + timeout.toNanos();
+ List<Message<T>> batch = new ArrayList<>();
+ try {
+ long remaining = deadline - System.nanoTime();
+ while (batch.size() < maxNumMessages && remaining > 0) {
+ MessageV5<T> msg = mux.poll(remaining, TimeUnit.NANOSECONDS);
+ if (msg == null) {
+ break;
+ }
+ batch.add(msg);
+ remaining = deadline - System.nanoTime();
+ }
+ // Opportunistic drain of anything else already queued.
+ List<MessageV5<T>> tail = new ArrayList<>();
+ mux.drainTo(tail, maxNumMessages - batch.size());
+ batch.addAll(tail);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarClientException("Receive interrupted", e);
+ }
+ return new MessagesV5<>(batch);
+ }
+
+ @Override
+ public void acknowledgeCumulative(MessageId messageId) {
+ fanOutCumulativeAck(messageId, (sc, vector) ->
sc.ackUpToVector(vector));
+ }
+
+ @Override
+ public void acknowledgeCumulative(MessageId messageId, Transaction txn) {
+ // Transactions on multi-topic are best-effort across per-topic
consumers — each
+ // per-topic ack is independently transactional. See note in the
design doc.
+ fanOutCumulativeAck(messageId, (sc, vector) ->
sc.ackUpToVector(vector));
+ }
+
+ /**
+ * For a cumulative ack on a multi-topic message, look up its multi-topic
vector
+ * and invoke the per-topic ack on every parent topic.
+ */
+ private void fanOutCumulativeAck(MessageId messageId,
+
java.util.function.BiConsumer<ScalableStreamConsumer<T>,
+ Map<Long,
org.apache.pulsar.client.api.MessageId>> action) {
+ if (!(messageId instanceof MessageIdV5 id)) {
+ throw new IllegalArgumentException("Expected MessageIdV5, got: " +
messageId.getClass());
+ }
+ Map<String, Map<Long, org.apache.pulsar.client.api.MessageId>> vector
= id.multiTopicVector();
+ if (vector == null) {
+ throw new IllegalStateException("MessageIdV5 missing multi-topic
vector — was the"
+ + " message delivered through a multi-topic stream
consumer?");
+ }
+ for (var entry : vector.entrySet()) {
+ PerTopic<T> state = perTopic.get(entry.getKey());
+ if (state == null) {
+ // Topic was Removed since enqueue; closeTopic already flushed.
+ continue;
+ }
+ action.accept(state.consumer, entry.getValue());
+ }
+ }
+
+ @Override
+ public AsyncStreamConsumer<T> async() {
+ return asyncView;
+ }
+
+ @Override
+ public void close() throws PulsarClientException {
+ try {
+ closeAsync().get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarClientException("Close interrupted", e);
+ } catch (ExecutionException e) {
+ throw new PulsarClientException(e.getCause());
+ }
+ }
+
+ CompletableFuture<Void> closeAsync() {
+ if (closed) {
+ return CompletableFuture.completedFuture(null);
+ }
+ closed = true;
+ watcher.close();
+ List<CompletableFuture<Void>> closes = new ArrayList<>();
+ for (var topic : new HashSet<>(perTopic.keySet())) {
+ closes.add(closeTopic(topic));
+ }
+ return
CompletableFuture.allOf(closes.toArray(CompletableFuture[]::new));
+ }
+
+ // --- Watcher listener ---
+
+ private final class WatcherListener implements
ScalableTopicsWatcher.Listener {
+ @Override
+ public void onSnapshot(List<String> topics) {
+ Set<String> target = new HashSet<>(topics);
+ Set<String> current = new HashSet<>(perTopic.keySet());
+ for (String t : current) {
+ if (!target.contains(t)) {
+ closeTopic(t);
+ }
+ }
+ for (String t : target) {
+ if (!current.contains(t)) {
+ openTopic(t, /* retry= */ true);
+ resetBackoff(t);
+ }
+ }
+ }
+
+ @Override
+ public void onDiff(List<String> added, List<String> removed) {
+ for (String t : removed) {
+ closeTopic(t);
+ }
+ for (String t : added) {
+ openTopic(t, /* retry= */ true);
+ resetBackoff(t);
+ }
+ }
+ }
+
+ /**
+ * Per-topic message handler installed as the sink on each per-topic
+ * {@link ScalableStreamConsumer}. The single-topic consumer has already
+ * computed its per-segment position vector and stored it on the inbound
+ * message id; we adopt that as the per-topic slice of our cross-topic
+ * vector, snapshot the full map, and forward into the shared mux.
+ *
+ * <p>Runs on the netty IO thread that delivered the per-segment message —
+ * the only contention is the synchronized snapshot block which guards
+ * against torn cross-topic views during concurrent deliveries.
+ */
+ private void onPerTopicMessage(String parentTopic, MessageV5<T> msg) {
+ if (closed) {
+ return;
+ }
+ MessageIdV5 origId = (MessageIdV5) msg.id();
+
+ // Adopt the message's own positionVector as our per-topic
latest-delivered
+ // slice. ScalableStreamConsumer maintained the increasing invariant on
+ // each segment id; merging via putAll keeps the property cross-topic.
+ ConcurrentHashMap<Long, org.apache.pulsar.client.api.MessageId> ours =
+ latestDeliveredPerTopicSegment.computeIfAbsent(parentTopic,
+ k -> new ConcurrentHashMap<>());
+ ours.putAll(origId.positionVector());
+
+ // Snapshot the cross-topic vector under lock so concurrent deliveries
+ // can't observe a torn view.
+ Map<String, Map<Long, org.apache.pulsar.client.api.MessageId>>
snapshot;
+ synchronized (latestDeliveredPerTopicSegment) {
+ snapshot = new HashMap<>(latestDeliveredPerTopicSegment.size());
+ for (var e : latestDeliveredPerTopicSegment.entrySet()) {
+ snapshot.put(e.getKey(), new HashMap<>(e.getValue()));
+ }
+ }
+
+ MessageIdV5 newId = new MessageIdV5(
+ origId.v4MessageId(), origId.segmentId(),
+ origId.positionVector(), parentTopic, snapshot);
+ mux.add(new MessageV5<>(msg.v4Message(), newId, parentTopic));
+ }
+
+ // --- Per-topic state ---
+
+ /**
+ * Per-topic bookkeeping. Messages flow into the shared mux directly via
the
+ * sink installed on the per-topic consumer at create-time, so there's no
+ * pump thread to manage — this is just a holder for ack routing and clean
+ * shutdown.
+ */
+ private static final class PerTopic<T> {
+ private final String parentTopic;
+ private final ScalableStreamConsumer<T> consumer;
+
+ PerTopic(String parentTopic, ScalableStreamConsumer<T> consumer) {
+ this.parentTopic = parentTopic;
+ this.consumer = consumer;
+ }
+ }
+
+ // --- Async view ---
+
+ private final class AsyncStreamConsumerV5Multi implements
AsyncStreamConsumer<T> {
+ @Override
+ public CompletableFuture<Message<T>> receive() {
+ return CompletableFuture.supplyAsync(() -> {
+ try {
+ return MultiTopicStreamConsumer.this.receive();
+ } catch (PulsarClientException e) {
+ throw new CompletionException(e);
+ }
+ });
+ }
+
+ @Override
+ public CompletableFuture<Message<T>> receive(Duration timeout) {
+ return CompletableFuture.supplyAsync(() -> {
+ try {
+ return MultiTopicStreamConsumer.this.receive(timeout);
+ } catch (PulsarClientException e) {
+ throw new CompletionException(e);
+ }
+ });
+ }
+
+ @Override
+ public CompletableFuture<List<Message<T>>> receiveMulti(int
maxNumMessages, Duration timeout) {
+ return CompletableFuture.supplyAsync(() -> {
+ try {
+ Messages<T> ms =
MultiTopicStreamConsumer.this.receiveMulti(maxNumMessages, timeout);
+ List<Message<T>> out = new ArrayList<>();
+ for (Message<T> m : ms) {
+ out.add(m);
+ }
+ return out;
+ } catch (PulsarClientException e) {
+ throw new CompletionException(e);
+ }
+ });
+ }
+
+ @Override
+ public void acknowledgeCumulative(MessageId messageId) {
+ MultiTopicStreamConsumer.this.acknowledgeCumulative(messageId);
+ }
+
+ @Override
+ public void acknowledgeCumulative(MessageId messageId, Transaction
txn) {
+ MultiTopicStreamConsumer.this.acknowledgeCumulative(messageId,
txn);
+ }
+
+ @Override
+ public CompletableFuture<Void> close() {
+ return MultiTopicStreamConsumer.this.closeAsync();
+ }
+ }
+}
diff --git
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerBuilderV5.java
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerBuilderV5.java
index a762964c7d7..a268e70219b 100644
---
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerBuilderV5.java
+++
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerBuilderV5.java
@@ -19,11 +19,9 @@
package org.apache.pulsar.client.impl.v5;
import java.time.Duration;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
-import java.util.regex.Pattern;
import org.apache.pulsar.client.api.v5.PulsarClientException;
import org.apache.pulsar.client.api.v5.QueueConsumer;
import org.apache.pulsar.client.api.v5.QueueConsumerBuilder;
@@ -34,6 +32,7 @@ import
org.apache.pulsar.client.api.v5.config.ProcessingTimeoutPolicy;
import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.v5.schema.Schema;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
/**
@@ -44,7 +43,11 @@ final class QueueConsumerBuilderV5<T> implements
QueueConsumerBuilder<T> {
private final PulsarClientV5 client;
private final Schema<T> v5Schema;
private final ConsumerConfigurationData<T> conf = new
ConsumerConfigurationData<>();
+ // Exactly one of {topicName, namespaceName} must be set at subscribe()
time —
+ // single-topic vs multi-topic mode.
private String topicName;
+ private NamespaceName namespaceName;
+ private Map<String, String> propertyFilters;
QueueConsumerBuilderV5(PulsarClientV5 client, Schema<T> v5Schema) {
this.client = client;
@@ -65,48 +68,44 @@ final class QueueConsumerBuilderV5<T> implements
QueueConsumerBuilder<T> {
@Override
public CompletableFuture<QueueConsumer<T>> subscribeAsync() {
- if (topicName == null || topicName.isEmpty()) {
+ boolean topicSet = topicName != null && !topicName.isEmpty();
+ boolean namespaceSet = namespaceName != null;
+ if (topicSet == namespaceSet) {
return CompletableFuture.failedFuture(
- new
PulsarClientException.InvalidConfigurationException("Topic name is required"));
+ new PulsarClientException.InvalidConfigurationException(
+ "Exactly one of .topic(name) or .namespace(...)
must be set"));
}
if (conf.getSubscriptionName() == null ||
conf.getSubscriptionName().isEmpty()) {
return CompletableFuture.failedFuture(
new
PulsarClientException.InvalidConfigurationException("Subscription name is
required"));
}
+ if (namespaceSet) {
+ return MultiTopicQueueConsumer.createAsync(
+ client, v5Schema, conf, namespaceName, propertyFilters);
+ }
TopicName topic = V5Utils.asScalableTopicName(topicName);
DagWatchClient dagWatch = new DagWatchClient(client.v4Client(), topic);
-
return dagWatch.start()
.thenCompose(initialLayout ->
ScalableQueueConsumer.createAsync(
client, v5Schema, conf, dagWatch, initialLayout));
}
@Override
- public QueueConsumerBuilderV5<T> topic(String... topicNames) {
- if (topicNames.length > 0) {
- this.topicName = topicNames[0];
- }
+ public QueueConsumerBuilderV5<T> topic(String topicName) {
+ this.topicName = topicName;
return this;
}
@Override
- public QueueConsumerBuilderV5<T> topics(List<String> topicNames) {
- if (!topicNames.isEmpty()) {
- this.topicName = topicNames.get(0);
- }
- return this;
+ public QueueConsumerBuilderV5<T> namespace(String namespace) {
+ return namespace(namespace, Map.of());
}
@Override
- public QueueConsumerBuilderV5<T> topicsPattern(Pattern pattern) {
- conf.setTopicsPattern(pattern);
- return this;
- }
-
- @Override
- public QueueConsumerBuilderV5<T> topicsPattern(String regex) {
- conf.setTopicsPattern(Pattern.compile(regex));
+ public QueueConsumerBuilderV5<T> namespace(String namespace, Map<String,
String> propertyFilters) {
+ this.namespaceName = NamespaceName.get(namespace);
+ this.propertyFilters = propertyFilters == null ? Map.of() :
Map.copyOf(propertyFilters);
return this;
}
@@ -206,12 +205,6 @@ final class QueueConsumerBuilderV5<T> implements
QueueConsumerBuilder<T> {
return this;
}
- @Override
- public QueueConsumerBuilderV5<T> patternAutoDiscoveryPeriod(Duration
interval) {
- conf.setPatternAutoDiscoveryPeriod((int) interval.getSeconds());
- return this;
- }
-
@Override
public QueueConsumerBuilderV5<T> encryptionPolicy(EncryptionPolicy policy)
{
conf.setCryptoKeyReader(CryptoKeyReaderAdapter.wrap(policy.keyReader()));
diff --git
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerImpl.java
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerImpl.java
new file mode 100644
index 00000000000..61536f5b442
--- /dev/null
+++
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerImpl.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.v5;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.client.api.v5.Message;
+import org.apache.pulsar.client.api.v5.QueueConsumer;
+
+/**
+ * Internal extension of {@link QueueConsumer} that exposes the async hooks
+ * needed by {@link AsyncQueueConsumerV5}. Implemented by both
+ * {@link ScalableQueueConsumer} (single-topic) and {@link
MultiTopicQueueConsumer}
+ * so the async wrapper works against either without duplication.
+ */
+interface QueueConsumerImpl<T> extends QueueConsumer<T> {
+ CompletableFuture<Message<T>> receiveAsync();
+
+ CompletableFuture<Void> closeAsync();
+}
diff --git
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java
index 61f5e87644d..31cb556b70e 100644
---
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java
+++
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java
@@ -51,7 +51,7 @@ import org.apache.pulsar.common.util.Backoff;
* Individual acknowledgments and negative acknowledgments are routed to
* the correct segment consumer via the segment ID in {@link MessageIdV5}.
*/
-final class ScalableQueueConsumer<T> implements QueueConsumer<T>,
DagWatchClient.LayoutChangeListener {
+final class ScalableQueueConsumer<T> implements QueueConsumerImpl<T>,
DagWatchClient.LayoutChangeListener {
private static final Logger LOG = Logger.get(ScalableQueueConsumer.class);
private final Logger log;
@@ -72,6 +72,13 @@ final class ScalableQueueConsumer<T> implements
QueueConsumer<T>, DagWatchClient
private final ConcurrentHashMap<Long,
CompletableFuture<org.apache.pulsar.client.api.Consumer<T>>>
segmentConsumers = new ConcurrentHashMap<>();
private final LinkedTransferQueue<MessageV5<T>> messageQueue = new
LinkedTransferQueue<>();
+ /**
+ * Where each per-segment receive loop deposits a freshly-arrived message.
Defaults
+ * to enqueueing on {@link #messageQueue} for the user's {@link
#receive()} to pull;
+ * the multi-topic wrapper overrides this to forward directly into its
shared
+ * multiplexed queue, so no per-topic pump thread is needed.
+ */
+ private final java.util.function.Consumer<MessageV5<T>> messageSink;
private volatile boolean closed = false;
private final AsyncQueueConsumerV5<T> asyncView;
@@ -88,7 +95,8 @@ final class ScalableQueueConsumer<T> implements
QueueConsumer<T>, DagWatchClient
private ScalableQueueConsumer(PulsarClientV5 client,
Schema<T> v5Schema,
ConsumerConfigurationData<T> consumerConf,
- DagWatchClient dagWatch) {
+ DagWatchClient dagWatch,
+ java.util.function.Consumer<MessageV5<T>>
messageSink) {
this.client = client;
this.v5Schema = v5Schema;
this.v4Schema = SchemaAdapter.toV4(v5Schema);
@@ -96,6 +104,10 @@ final class ScalableQueueConsumer<T> implements
QueueConsumer<T>, DagWatchClient
this.dagWatch = dagWatch;
this.topicName = dagWatch.topicName().toString();
this.subscriptionName = consumerConf.getSubscriptionName();
+ // Default sink enqueues on the local messageQueue for
receive()/receive(timeout).
+ // Multi-topic mode passes a sink that forwards into the shared mux
instead — no
+ // per-topic pump thread needed.
+ this.messageSink = messageSink != null ? messageSink :
messageQueue::add;
this.log = LOG.with().attr("topic", topicName).attr("subscription",
subscriptionName).build();
this.asyncView = new AsyncQueueConsumerV5<>(this);
}
@@ -111,11 +123,29 @@ final class ScalableQueueConsumer<T> implements
QueueConsumer<T>, DagWatchClient
ConsumerConfigurationData<T> consumerConf,
DagWatchClient
dagWatch,
ClientSegmentLayout initialLayout) {
- ScalableQueueConsumer<T> consumer = new
ScalableQueueConsumer<>(client, v5Schema, consumerConf, dagWatch);
+ return createAsyncImpl(client, v5Schema, consumerConf, dagWatch,
initialLayout, null)
+ .thenApply(c -> c);
+ }
+
+ /**
+ * Like {@link #createAsync} but resolves to the concrete impl type and
accepts an
+ * optional external message sink. Used by {@link
MultiTopicQueueConsumer}: it
+ * passes a sink that forwards into the shared multiplexed queue, so
per-segment
+ * v4 receive loops deliver messages to the wrapper without any pump
thread.
+ */
+ static <T> CompletableFuture<ScalableQueueConsumer<T>> createAsyncImpl(
+ PulsarClientV5 client,
+ Schema<T> v5Schema,
+ ConsumerConfigurationData<T> consumerConf,
+ DagWatchClient dagWatch,
+ ClientSegmentLayout initialLayout,
+ java.util.function.Consumer<MessageV5<T>> messageSink) {
+ ScalableQueueConsumer<T> consumer = new ScalableQueueConsumer<>(
+ client, v5Schema, consumerConf, dagWatch, messageSink);
return consumer.subscribeSegments(initialLayout)
.thenApply(__ -> {
dagWatch.setListener(consumer);
- return (QueueConsumer<T>) consumer;
+ return consumer;
})
.exceptionallyCompose(ex -> consumer.closeAsync().handle((__,
___) -> {
throw ex instanceof CompletionException ce ? ce : new
CompletionException(ex);
@@ -209,7 +239,8 @@ final class ScalableQueueConsumer<T> implements
QueueConsumer<T>, DagWatchClient
// --- Async internals ---
- CompletableFuture<Message<T>> receiveAsync() {
+ @Override
+ public CompletableFuture<Message<T>> receiveAsync() {
return CompletableFuture.supplyAsync(() -> {
try {
return receive();
@@ -219,7 +250,8 @@ final class ScalableQueueConsumer<T> implements
QueueConsumer<T>, DagWatchClient
});
}
- CompletableFuture<Void> closeAsync() {
+ @Override
+ public CompletableFuture<Void> closeAsync() {
closed = true;
dagWatch.close();
@@ -358,7 +390,7 @@ final class ScalableQueueConsumer<T> implements
QueueConsumer<T>, DagWatchClient
private void startReceiveLoop(org.apache.pulsar.client.api.Consumer<T>
v4Consumer, long segmentId) {
v4Consumer.receiveAsync().thenAccept(v4Msg -> {
- messageQueue.add(new MessageV5<>(v4Msg, segmentId));
+ messageSink.accept(new MessageV5<>(v4Msg, segmentId));
if (!closed) {
startReceiveLoop(v4Consumer, segmentId);
}
diff --git
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java
index ddffea7d84d..afd703ae9a4 100644
---
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java
+++
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java
@@ -86,6 +86,13 @@ final class ScalableStreamConsumer<T>
new ConcurrentHashMap<>();
private final LinkedTransferQueue<MessageV5<T>> messageQueue = new
LinkedTransferQueue<>();
+ /**
+ * Where each per-segment receive loop deposits a freshly-arrived message.
Defaults
+ * to enqueueing on {@link #messageQueue} for the user's {@link
#receive()} to pull;
+ * the multi-topic wrapper overrides this to forward into its shared
multiplexed
+ * queue, applying its own multi-topic position-vector capture in the
process.
+ */
+ private final java.util.function.Consumer<MessageV5<T>> messageSink;
private volatile boolean closed = false;
private final AsyncStreamConsumerV5<T> asyncView;
@@ -94,7 +101,8 @@ final class ScalableStreamConsumer<T>
Schema<T> v5Schema,
ConsumerConfigurationData<T> consumerConf,
ScalableConsumerClient session,
- String topicName) {
+ String topicName,
+ java.util.function.Consumer<MessageV5<T>>
messageSink) {
this.client = client;
this.v5Schema = v5Schema;
this.v4Schema = SchemaAdapter.toV4(v5Schema);
@@ -102,6 +110,7 @@ final class ScalableStreamConsumer<T>
this.session = session;
this.topicName = topicName;
this.subscriptionName = consumerConf.getSubscriptionName();
+ this.messageSink = messageSink != null ? messageSink :
messageQueue::add;
this.log = LOG.with().attr("topic", topicName).attr("subscription",
subscriptionName).build();
this.asyncView = new AsyncStreamConsumerV5<>(this);
}
@@ -118,18 +127,54 @@ final class ScalableStreamConsumer<T>
ScalableConsumerClient session,
String
topicName,
List<ActiveSegment> initialAssignment) {
+ return createAsyncImpl(client, v5Schema, consumerConf, session,
topicName, initialAssignment, null)
+ .thenApply(c -> c);
+ }
+
+ /**
+ * Like {@link #createAsync} but resolves to the concrete impl type and
accepts an
+ * optional external message sink. Used by {@link
MultiTopicStreamConsumer}: it
+ * passes a sink that forwards into the shared multiplexed queue,
replacing the
+ * per-topic pump thread with direct delivery.
+ */
+ static <T> CompletableFuture<ScalableStreamConsumer<T>> createAsyncImpl(
+ PulsarClientV5 client,
+ Schema<T> v5Schema,
+ ConsumerConfigurationData<T> consumerConf,
+ ScalableConsumerClient session,
+ String topicName,
+ List<ActiveSegment> initialAssignment,
+ java.util.function.Consumer<MessageV5<T>> messageSink) {
ScalableStreamConsumer<T> consumer = new ScalableStreamConsumer<>(
- client, v5Schema, consumerConf, session, topicName);
+ client, v5Schema, consumerConf, session, topicName,
messageSink);
return consumer.subscribeAssigned(initialAssignment)
.thenApply(__ -> {
session.setListener(consumer);
- return (StreamConsumer<T>) consumer;
+ return consumer;
})
.exceptionallyCompose(ex -> consumer.closeAsync().handle((__,
___) -> {
throw ex instanceof CompletionException ce ? ce : new
CompletionException(ex);
}));
}
+ /**
+ * Multi-topic ack hook. Synthesises a {@link MessageIdV5} carrying the
supplied
+ * vector and routes it through the regular cumulative-ack path so
segments are
+ * acked up to the recorded positions. Used by {@link
MultiTopicStreamConsumer}
+ * to fan out a cumulative ack across every per-topic consumer.
+ */
+ void ackUpToVector(java.util.Map<Long,
org.apache.pulsar.client.api.MessageId> vector) {
+ if (vector == null || vector.isEmpty()) {
+ return;
+ }
+ // The constructed id only needs the positionVector; v4MessageId /
segmentId are
+ // unused on the cumulative-ack path. Pick any value for the
non-vector slots —
+ // earliest is convenient and won't accidentally satisfy a peer's
check.
+ var synthetic = new
MessageIdV5(org.apache.pulsar.client.api.MessageId.earliest,
+ MessageIdV5.NO_SEGMENT, vector);
+ acknowledgeCumulative(synthetic);
+ }
+
@Override
public String topic() {
return topicName;
@@ -358,7 +403,7 @@ final class ScalableStreamConsumer<T>
// Create the V5 message with the position vector embedded in the
ID
var msgId = new MessageIdV5(v4Msg.getMessageId(), segmentId,
positionVector);
- messageQueue.add(new MessageV5<>(v4Msg, msgId));
+ messageSink.accept(new MessageV5<>(v4Msg, msgId));
if (!closed) {
startReceiveLoop(v4Consumer, segmentId);
diff --git
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/StreamConsumerBuilderV5.java
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/StreamConsumerBuilderV5.java
index d3c356ecb55..6b9f6b67e07 100644
---
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/StreamConsumerBuilderV5.java
+++
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/StreamConsumerBuilderV5.java
@@ -42,7 +42,11 @@ final class StreamConsumerBuilderV5<T> implements
StreamConsumerBuilder<T> {
private final PulsarClientV5 client;
private final Schema<T> v5Schema;
private final ConsumerConfigurationData<T> conf = new
ConsumerConfigurationData<>();
+ // Exactly one of {topicName, namespaceName} must be set at subscribe()
time —
+ // single-topic vs multi-topic mode.
private String topicName;
+ private org.apache.pulsar.common.naming.NamespaceName namespaceName;
+ private Map<String, String> propertyFilters;
StreamConsumerBuilderV5(PulsarClientV5 client, Schema<T> v5Schema) {
this.client = client;
@@ -63,21 +67,29 @@ final class StreamConsumerBuilderV5<T> implements
StreamConsumerBuilder<T> {
@Override
public CompletableFuture<StreamConsumer<T>> subscribeAsync() {
- if (topicName == null || topicName.isEmpty()) {
+ boolean topicSet = topicName != null && !topicName.isEmpty();
+ boolean namespaceSet = namespaceName != null;
+ if (topicSet == namespaceSet) {
return CompletableFuture.failedFuture(
- new
PulsarClientException.InvalidConfigurationException("Topic name is required"));
+ new PulsarClientException.InvalidConfigurationException(
+ "Exactly one of .topic(name) or .namespace(...)
must be set"));
}
if (conf.getSubscriptionName() == null ||
conf.getSubscriptionName().isEmpty()) {
return CompletableFuture.failedFuture(
new
PulsarClientException.InvalidConfigurationException("Subscription name is
required"));
}
-
- TopicName topic = V5Utils.asScalableTopicName(topicName);
// Default the consumer name to a stable random when the user didn't
set one —
// ScalableConsumerClient uses it as the registration key with the
controller.
if (conf.getConsumerName() == null ||
conf.getConsumerName().isEmpty()) {
conf.setConsumerName("v5-stream-" +
V5RandomIds.randomAlphanumeric(8));
}
+
+ if (namespaceSet) {
+ return MultiTopicStreamConsumer.createAsync(
+ client, v5Schema, conf, namespaceName, propertyFilters);
+ }
+
+ TopicName topic = V5Utils.asScalableTopicName(topicName);
ScalableConsumerClient session = new ScalableConsumerClient(
client.v4Client(),
topic,
@@ -91,10 +103,20 @@ final class StreamConsumerBuilderV5<T> implements
StreamConsumerBuilder<T> {
}
@Override
- public StreamConsumerBuilderV5<T> topic(String... topicNames) {
- if (topicNames.length > 0) {
- this.topicName = topicNames[0];
- }
+ public StreamConsumerBuilderV5<T> topic(String topicName) {
+ this.topicName = topicName;
+ return this;
+ }
+
+ @Override
+ public StreamConsumerBuilderV5<T> namespace(String namespace) {
+ return namespace(namespace, Map.of());
+ }
+
+ @Override
+ public StreamConsumerBuilderV5<T> namespace(String namespace, Map<String,
String> propertyFilters) {
+ this.namespaceName =
org.apache.pulsar.common.naming.NamespaceName.get(namespace);
+ this.propertyFilters = propertyFilters == null ? Map.of() :
Map.copyOf(propertyFilters);
return this;
}
diff --git
a/pulsar-client-v5/src/test/java/org/apache/pulsar/client/impl/v5/MessageIdV5Test.java
b/pulsar-client-v5/src/test/java/org/apache/pulsar/client/impl/v5/MessageIdV5Test.java
index 577023070ad..ebdbf73c0b4 100644
---
a/pulsar-client-v5/src/test/java/org/apache/pulsar/client/impl/v5/MessageIdV5Test.java
+++
b/pulsar-client-v5/src/test/java/org/apache/pulsar/client/impl/v5/MessageIdV5Test.java
@@ -168,6 +168,64 @@ public class MessageIdV5Test {
assertThrows(java.io.IOException.class, () ->
MessageIdV5.fromByteArray(null));
}
+ @Test
+ public void testRoundtripWithParentTopic() throws Exception {
+ // Multi-topic queue consumer ids carry a parent-topic tag for ack
routing
+ // but no cross-topic vector. Both must survive serialisation.
+ MessageIdV5 original = new MessageIdV5(v4(11, 22, 0), 3L,
+ Map.of(0L, v4(1, 2, 0)),
+ "topic://tenant/ns/my-topic");
+
+ MessageIdV5 decoded =
MessageIdV5.fromByteArray(original.toByteArray());
+
+ assertEquals(decoded.segmentId(), 3L);
+ assertEquals(decoded.parentTopic(), "topic://tenant/ns/my-topic");
+ assertEquals(decoded.positionVector().size(), 1);
+ // No multi-topic vector was set; round-trip must preserve null.
+ assertEquals(decoded.multiTopicVector(), null);
+ }
+
+ @Test
+ public void testRoundtripWithMultiTopicVector() throws Exception {
+ // Multi-topic stream consumer ids carry a cross-topic position vector
and
+ // the parent topic. Whole tree must survive byte-level round trip so
an
+ // application that serialises the id and sends it through can still
call
+ // acknowledgeCumulative against the right per-topic / per-segment
positions.
+ Map<String, Map<Long, MessageId>> multi = Map.of(
+ "topic://tenant/ns/a", Map.of(0L, v4(1, 1, 0), 1L, v4(2, 2,
0)),
+ "topic://tenant/ns/b", Map.of(0L, v4(3, 3, 0)));
+ MessageIdV5 original = new MessageIdV5(v4(99, 100, 0), 5L,
+ Map.of(0L, v4(1, 1, 0), 1L, v4(2, 2, 0)),
+ "topic://tenant/ns/a",
+ multi);
+
+ MessageIdV5 decoded =
MessageIdV5.fromByteArray(original.toByteArray());
+
+ assertEquals(decoded.segmentId(), 5L);
+ assertEquals(decoded.parentTopic(), "topic://tenant/ns/a");
+ assertEquals(decoded.positionVector().size(), 2);
+ assertNotNull(decoded.multiTopicVector());
+ assertEquals(decoded.multiTopicVector().keySet(),
+ java.util.Set.of("topic://tenant/ns/a",
"topic://tenant/ns/b"));
+ assertEquals(decoded.multiTopicVector().get("topic://tenant/ns/a"),
+ Map.of(0L, v4(1, 1, 0), 1L, v4(2, 2, 0)));
+ assertEquals(decoded.multiTopicVector().get("topic://tenant/ns/b"),
+ Map.of(0L, v4(3, 3, 0)));
+ }
+
+ @Test
+ public void testRoundtripPreservesNullMultiTopicVector() throws Exception {
+ // Single-topic ids leave both new fields null; we must not
accidentally
+ // hydrate them on decode.
+ MessageIdV5 original = new MessageIdV5(v4(7, 8, 0), 1L,
+ Map.of(0L, v4(1, 2, 0)));
+
+ MessageIdV5 decoded =
MessageIdV5.fromByteArray(original.toByteArray());
+
+ assertEquals(decoded.parentTopic(), null);
+ assertEquals(decoded.multiTopicVector(), null);
+ }
+
@Test
public void testFromByteArrayRejectsTooShort() {
assertThrows(java.io.IOException.class, () ->
MessageIdV5.fromByteArray(new byte[3]));