merlimat opened a new pull request, #25651:
URL: https://github.com/apache/pulsar/pull/25651

   ## Summary
   
   User-facing follow-up to the namespace scalable-topics watcher (#25648):
   subscribe to the union of scalable topics in a namespace that match a
   (possibly empty) set of property filters, follow the matching set live,
   multiplex from every per-topic consumer into one user-visible queue.
   
   ### Builder API
   
   `QueueConsumerBuilder` / `StreamConsumerBuilder` gain:
   
   ```java
   .namespace(String namespace)
   .namespace(String namespace, Map<String, String> propertyFilters)
   ```
   
   Mutually exclusive with `.topic(name)` — `subscribe()` rejects either-both
   or neither.
   
   While there, dropped the legacy multi-topic / pattern surface from v5 (it
   was never wired through to scalable topics): `topics(List)`,
   `topicsPattern(Pattern)`, `topicsPattern(String)`, 
`patternAutoDiscoveryPeriod`.
   Tightened `topic(String...)` → `topic(String)` (single topic).
   
   ### MultiTopicQueueConsumer
   
   - Wraps a `ScalableTopicsWatcher`; on Snapshot / Diff opens / closes
     per-topic `ScalableQueueConsumer`s.
   - Per-segment v4 receive loops in each per-topic consumer dispatch
     through an injectable message sink. The wrapper installs a sink that
     tags each `MessageV5` with the parent scalable topic via
     `withTopicOverride` and forwards into the shared mux queue. **No pump
     thread per topic** — same chained-async pattern as v4
     `MultiTopicsConsumerImpl`.
   - `MessageIdV5` carries a `parentTopic` field so `acknowledge(msgId)`
     finds the right per-topic consumer.
   - `msg.topic()` surfaces the parent scalable topic the user subscribed
     to (not the internal segment topic).
   - Per-topic subscribe failure: retry forever with exp backoff (100 ms
     initial, 30 min cap).
   
   ### MultiTopicStreamConsumer
   
   - Same wrapper shape, with a per-message **cross-topic position vector**:
     `Map<TopicName, Map<SegmentId, MessageId>>` captured at enqueue time.
   - `acknowledgeCumulative(msg)` fans out to every per-topic consumer with
     that topic's segment vector — same semantics as the single-topic
     cumulative ack, lifted one level.
   - `ScalableStreamConsumer.ackUpToVector(Map<SegmentId, MessageId>)`
     exposed as the multi-topic ack hook.
   - Topic Removed mid-stream: flush acks up to `latestDelivered` for the
     topic before closing the per-topic consumer.
   
   ### Supporting changes
   
   - `MessageV5.topicOverride` / `withTopicOverride` for parent-topic
     display; `v4Message()` accessor for the wrapper to rebuild messages
     with augmented ids.
   - `MessageIdV5` gets `parentTopic` + `multiTopicVector` fields, plus a
     5-arg full constructor used by the multi-topic stream sink. Wire format
     is length-prefixed by section so older serialised forms (without the
     new fields) still decode and so the new fields round-trip cleanly.
   - `ScalableQueueConsumer` / `ScalableStreamConsumer` accept an optional
     external `Consumer<MessageV5<T>>` sink. Default behaviour is
     unchanged (enqueue on the local `messageQueue`); multi-topic mode
     passes a sink that forwards into the shared mux.
   - New package-private `QueueConsumerImpl` interface so
     `AsyncQueueConsumerV5` is shared between `ScalableQueueConsumer` and
     `MultiTopicQueueConsumer`.
   - `createAsyncImpl` variants on both per-topic consumers return the
     concrete impl type and accept the optional sink — used by the
     multi-topic wrappers that hold typed references.
   
   ## Test plan
   
   - `MessageIdV5Test` adds three round-trip cases: parent-topic only,
     parent-topic + cross-topic vector, and the null-fields path (proves
     the new sections don't accidentally hydrate for single-topic ids).
   - `V5MultiTopicQueueConsumerTest` (3 cases): receives from every topic
     in the namespace; picks up a topic created **after** subscribe via
     the watcher's Diff event; property filter narrows the set so only
     matching topics deliver messages.
   - `V5MultiTopicStreamConsumerTest` (4 cases): same three plus
     `cumulativeAckCoversEveryTopicSeenSoFar` — drains both topics, calls
     `acknowledgeCumulative` on the last message, re-subscribes with the
     same name, asserts no redelivery (proves the cross-topic position
     vector reaches every per-topic consumer).
   - All v5 single-topic regression tests still pass.
   
   ### Matching PR(s) in forked repositories
   
   - area/client


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to