merlimat opened a new pull request, #25651:
URL: https://github.com/apache/pulsar/pull/25651
## Summary
User-facing follow-up to the namespace scalable-topics watcher (#25648):
subscribe to the union of scalable topics in a namespace that match a
(possibly empty) set of property filters, follow the matching set live,
and multiplex messages from every per-topic consumer into one
user-visible queue.
### Builder API
`QueueConsumerBuilder` / `StreamConsumerBuilder` gain:
```java
.namespace(String namespace)
.namespace(String namespace, Map<String, String> propertyFilters)
```
Mutually exclusive with `.topic(name)`: `subscribe()` rejects a builder
that sets both or neither.
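A minimal sketch of the exclusivity rule, for illustration only. `SourceSelection`, `validate()`, and the use of `IllegalArgumentException` are hypothetical stand-ins, not the real builder classes or the error type `subscribe()` actually raises.

```java
import java.util.Map;

/** Hypothetical stand-in for the v5 consumer builders; not the real Pulsar classes. */
final class SourceSelection {
    private String topic;       // set by topic(name)
    private String namespace;   // set by namespace(...)
    private Map<String, String> propertyFilters = Map.of();

    SourceSelection topic(String name) {
        this.topic = name;
        return this;
    }

    SourceSelection namespace(String namespace) {
        // No filters: every scalable topic in the namespace matches.
        return namespace(namespace, Map.of());
    }

    SourceSelection namespace(String namespace, Map<String, String> propertyFilters) {
        this.namespace = namespace;
        this.propertyFilters = Map.copyOf(propertyFilters);
        return this;
    }

    /** Exactly one of topic / namespace must be set; returns a label for the chosen mode. */
    String validate() {
        if (topic != null && namespace != null) {
            throw new IllegalArgumentException("topic and namespace are mutually exclusive");
        }
        if (topic == null && namespace == null) {
            throw new IllegalArgumentException("either topic or namespace must be set");
        }
        return topic != null ? "topic:" + topic : "namespace:" + namespace;
    }
}
```

With this sketch, `new SourceSelection().namespace("tenant/ns").validate()` succeeds, while setting both `topic` and `namespace`, or neither, throws.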
While there, dropped the legacy multi-topic / pattern surface from v5 (it
was never wired through to scalable topics): `topics(List)`,
`topicsPattern(Pattern)`, `topicsPattern(String)`,
`patternAutoDiscoveryPeriod`.
Tightened `topic(String...)` → `topic(String)` (single topic).
### MultiTopicQueueConsumer
- Wraps a `ScalableTopicsWatcher`; on Snapshot / Diff events it opens /
closes per-topic `ScalableQueueConsumer`s.
- Per-segment v4 receive loops in each per-topic consumer dispatch
through an injectable message sink. The wrapper installs a sink that
tags each `MessageV5` with the parent scalable topic via
`withTopicOverride` and forwards into the shared mux queue. **No pump
thread per topic** — same chained-async pattern as v4
`MultiTopicsConsumerImpl`.
- `MessageIdV5` carries a `parentTopic` field so `acknowledge(msgId)`
finds the right per-topic consumer.
- `msg.topic()` surfaces the parent scalable topic the user subscribed
to (not the internal segment topic).
- Per-topic subscribe failure: retry forever with exponential backoff
(100 ms initial, 30 min cap).
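The sink-based multiplexing and the retry delay described in the list above can be sketched roughly as follows. All names here (`TaggedMessage`, `MuxSketch`, `sinkFor`, `backoffMillis`) are hypothetical, and the doubling factor is an assumption; the PR only states the 100 ms initial delay and the 30 min cap.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

/** Hypothetical message: the topic label shown to the user plus a payload. */
final class TaggedMessage {
    final String topic;
    final String payload;

    TaggedMessage(String topic, String payload) {
        this.topic = topic;
        this.payload = payload;
    }
}

final class MuxSketch {
    /** Shared queue that the user-facing receive() would drain. */
    final BlockingQueue<TaggedMessage> muxQueue = new LinkedBlockingQueue<>();

    /**
     * Builds the sink handed to one per-topic consumer. The per-segment receive
     * loop invokes it directly, so no extra pump thread is needed per topic.
     */
    Consumer<TaggedMessage> sinkFor(String parentTopic) {
        // Replace the internal segment topic with the parent scalable topic.
        return msg -> muxQueue.add(new TaggedMessage(parentTopic, msg.payload));
    }

    /** Retry delay for a failed per-topic subscribe: 100 ms, doubling (assumed), capped at 30 min. */
    static long backoffMillis(int attempt) {
        long capMillis = 30L * 60 * 1000;
        int shift = Math.min(Math.max(attempt, 0), 30); // clamp so the shift cannot overflow
        return Math.min(100L << shift, capMillis);
    }
}
```

Because the sink is an ordinary `Consumer`, the single-topic path can keep enqueueing on its local queue while the wrapper swaps in the forwarding variant.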
### MultiTopicStreamConsumer
- Same wrapper shape, with a per-message **cross-topic position vector**:
`Map<TopicName, Map<SegmentId, MessageId>>` captured at enqueue time.
- `acknowledgeCumulative(msg)` fans out to every per-topic consumer with
that topic's segment vector — same semantics as the single-topic
cumulative ack, lifted one level.
- `ScalableStreamConsumer.ackUpToVector(Map<SegmentId, MessageId>)`
exposed as the multi-topic ack hook.
- Topic Removed mid-stream: flush acks up to `latestDelivered` for the
topic before closing the per-topic consumer.
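A rough sketch of how a cross-topic position vector could be captured at enqueue time and fanned out on cumulative ack. `CrossTopicAckSketch` and its members are hypothetical; `String`, `Integer`, and `Long` stand in for `TopicName`, `SegmentId`, and `MessageId`, and the `acked` map stands in for calling `ackUpToVector` on each per-topic consumer.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

final class CrossTopicAckSketch {
    /** Latest delivered position per topic and segment. */
    private final Map<String, Map<Integer, Long>> latestDelivered = new HashMap<>();

    /** Records what each (hypothetical) per-topic consumer was asked to ack. */
    final Map<String, Map<Integer, Long>> acked = new LinkedHashMap<>();

    /** Called at enqueue time; returns the immutable snapshot attached to the message. */
    Map<String, Map<Integer, Long>> onEnqueue(String topic, int segment, long position) {
        latestDelivered.computeIfAbsent(topic, t -> new HashMap<>()).put(segment, position);
        Map<String, Map<Integer, Long>> snapshot = new HashMap<>();
        latestDelivered.forEach((t, segments) -> snapshot.put(t, Map.copyOf(segments)));
        return Map.copyOf(snapshot);
    }

    /** Fans the vector out: each topic receives only its own segment vector. */
    void acknowledgeCumulative(Map<String, Map<Integer, Long>> vector) {
        vector.forEach((topic, segments) -> acked.put(topic, segments));
    }
}
```

Copying the whole vector on every enqueue keeps the sketch simple; a real implementation would likely share unchanged per-topic maps between consecutive messages.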
### Supporting changes
- `MessageV5.topicOverride` / `withTopicOverride` for parent-topic
display; `v4Message()` accessor for the wrapper to rebuild messages
with augmented ids.
- `MessageIdV5` gets `parentTopic` + `multiTopicVector` fields, plus a
5-arg full constructor used by the multi-topic stream sink. Wire format
is length-prefixed by section so older serialised forms (without the
new fields) still decode and so the new fields round-trip cleanly.
- `ScalableQueueConsumer` / `ScalableStreamConsumer` accept an optional
external `Consumer<MessageV5<T>>` sink. Default behaviour is
unchanged (enqueue on the local `messageQueue`); multi-topic mode
passes a sink that forwards into the shared mux.
- New package-private `QueueConsumerImpl` interface so
`AsyncQueueConsumerV5` is shared between `ScalableQueueConsumer` and
`MultiTopicQueueConsumer`.
- `createAsyncImpl` variants on both per-topic consumers return the
concrete impl type and accept the optional sink — used by the
multi-topic wrappers that hold typed references.
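To illustrate why length-prefixing each section keeps older serialised ids decodable, here is a minimal sketch. The layout (a 4-byte length before each section, with the parent topic as an optional trailing section) and the names `SectionedIdSketch`, `encode`, and `decodeParentTopic` are assumptions for illustration; the actual `MessageIdV5` wire format is not shown in this description.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class SectionedIdSketch {
    /** Writes the base section, then the parent-topic section only when present. */
    static byte[] encode(byte[] base, String parentTopic) {
        byte[] topic = parentTopic == null ? null : parentTopic.getBytes(StandardCharsets.UTF_8);
        int size = 4 + base.length + (topic == null ? 0 : 4 + topic.length);
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.putInt(base.length).put(base);
        if (topic != null) {
            buf.putInt(topic.length).put(topic);
        }
        return buf.array();
    }

    /** Returns the parent topic, or null when the trailing section is absent (older form). */
    static String decodeParentTopic(byte[] encoded) {
        ByteBuffer buf = ByteBuffer.wrap(encoded);
        int baseLength = buf.getInt();
        buf.position(buf.position() + baseLength); // skip the base section
        if (!buf.hasRemaining()) {
            return null; // nothing after the base section: single-topic / older id
        }
        byte[] topic = new byte[buf.getInt()];
        buf.get(topic);
        return new String(topic, StandardCharsets.UTF_8);
    }
}
```

An id encoded without a parent topic stops after the base section, so the decoder returns `null` instead of hydrating the new field.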
## Test plan
- `MessageIdV5Test` adds three round-trip cases: parent-topic only,
parent-topic + cross-topic vector, and the null-fields path (proves
the new sections don't accidentally hydrate for single-topic ids).
- `V5MultiTopicQueueConsumerTest` (3 cases): receives from every topic
in the namespace; picks up a topic created **after** subscribe via
the watcher's Diff event; property filter narrows the set so only
matching topics deliver messages.
- `V5MultiTopicStreamConsumerTest` (4 cases): same three plus
`cumulativeAckCoversEveryTopicSeenSoFar` — drains both topics, calls
`acknowledgeCumulative` on the last message, re-subscribes with the
same name, asserts no redelivery (proves the cross-topic position
vector reaches every per-topic consumer).
- All v5 single-topic regression tests still pass.
### Matching PR(s) in forked repositories
- area/client