+1
On 2026/06/04 23:36:38 Matteo Merli wrote:
> https://github.com/apache/pulsar/pull/25938
>
> ------
>
> # PIP-483: Scalable Topic Auto Split/Merge
>
> *Sub-PIP of [PIP-468: Scalable Topic Controller](pip-468.md)*
>
> ## Motivation
>
> [PIP-468](pip-468.md) gives the Scalable Topic Controller the ability to **split** and **merge** segments, but only on explicit operator request via the admin API. An operator has to watch the topic, decide it is hot, and issue a split — and later notice it has gone cold and issue a merge. This is the same operational toil that partition-count management imposes on classic Pulsar topics, which scalable topics were meant to eliminate.
>
> This PIP adds an **auto-scaling policy** to the controller: the controller leader observes per-segment load and per-subscription consumer pressure, and autonomously splits hot segments and merges cold ones, within hard caps that prevent runaway growth and split/merge flip-flopping.
>
> The design is built around three principles that came out of the design discussion:
>
> 1. **Splits are fast; merges are lazy.** A split protects throughput and latency under load, so it fires quickly — with only a short cooldown to coalesce bursts of near-simultaneous triggers (e.g. a group of consumers connecting in rapid succession). A merge is purely an efficiency reclaim, so it can wait, be rate-limited, and be skipped when in doubt.
> 2. **The controller reacts, it does not poll.** New stream/checkpoint consumers register directly with the controller, so consumer-count changes are handled event-driven within seconds. Load data is *pushed* into the metadata store by each segment's owning broker (only when it changes materially) and read by the controller leader. The controller never fans out RPCs to segment owners.
> 3. **The decision is a pure function.** Given a snapshot of load + layout + policy, the split/merge decision is deterministic and unit-testable in isolation from all I/O.
>
> ---
>
> ## Goals
>
> - Automatically increase segment count when a topic is under ingest/dispatch load or has more stream/checkpoint consumers than segments.
> - Automatically decrease segment count when load subsides, reclaiming broker resources.
> - Bound growth (`maxSegments`) and bound split↔merge churn (`maxDagDepth`, asymmetric cooldown, threshold hysteresis).
> - Default-on cluster-wide, with per-namespace and per-topic overrides, following Pulsar's existing policy-resolution conventions.
>
> ### Non-goals
>
> - **Broker placement / rebalancing.** Which broker owns a segment's bundle is the load balancer's job; this PIP only changes *how many* segments exist.
> - **Key-aware or non-midpoint splits.** Splits use the existing midpoint-split mechanism from PIP-468.
> - **Cross-topic global optimization.** Each topic's controller decides independently.
>
> ---
>
> ## Design
>
> ### Overview
>
> ```
> ┌──────────────────────────────────────────────────────────────────┐
> │ Segment-owning broker (per active segment)                       │
> │   SegmentLoadReporter                                            │
> │   - samples the segment topic's TopicStats                       │
> │   - writes SegmentLoadStats to metadata ONLY on material change  │
> └────────────────────────────────┬─────────────────────────────────┘
>                                  │ (metadata store, push-on-change)
>                                  ▼
> ┌──────────────────────────────────────────────────────────────────┐
> │ Controller leader (per scalable topic)                           │
> │                                                                  │
> │ Event-driven — within seconds:                                   │
> │   on STREAM/CHECKPOINT consumer register/unregister              │
> │   (consumers already register with the controller — no poll)     │
> │   → evaluate the consumer-count split rule immediately           │
> │                                                                  │
> │ Periodic AutoScaleTick — traffic, default 60s:                   │
> │   1. read SegmentLoadStats for all active segments               │
> │   2. AutoScalePolicyEvaluator.decide(...) → Split|Merge|None     │
> │   3. dispatch to existing splitSegment / mergeSegments           │
> └──────────────────────────────────────────────────────────────────┘
> ```
>
> The two trigger sources reflect their different latency needs: a new consumer should get its own segment **within seconds**, so it is handled the instant the consumer registers with the controller; traffic shifts up or down over **a minute or more**, so traffic-driven decisions are evaluated on a slower periodic tick. The only new persistent state is `SegmentLoadStats`. The split/merge *mechanics* are entirely reused from PIP-468.
>
> ### Load reporting: push-to-metadata, not pull-per-tick
>
> Each segment's owning broker runs a **`SegmentLoadReporter`** for every ACTIVE `segment://` topic it hosts. The broker writes `SegmentLoadStats` **directly to the metadata store** — it already has the rates in memory, so there is no REST round-trip and no controller-initiated pull. On a fixed sampling interval it compares the current rates to the last ones written and writes **only when a rate changes by more than a significant threshold** (default ±25%) since the last write. A steady-state segment writes once and then goes silent, keeping metadata write volume bounded regardless of traffic.
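The push-on-change rule compares each new sample against the last value *written*, not the last value sampled, so a slow drift still produces a write once it accumulates past the threshold. A minimal sketch with hypothetical names (`LoadRates`, `ChangeDetector`), assuming the metadata write always succeeds; it illustrates the rule only and is not the broker's `SegmentLoadReporter`.

```java
/** Hypothetical snapshot of the four rates a segment owner reports. */
record LoadRates(double msgRateIn, double bytesRateIn, double msgRateOut, double bytesRateOut) {}

/**
 * Sketch of the push-on-change rule: report a write only when some rate moved by
 * more than `threshold` (0.25 for 25%) relative to the last value written.
 */
final class ChangeDetector {
    private final double threshold;
    private LoadRates lastWritten; // null until the first write

    ChangeDetector(double threshold) {
        this.threshold = threshold;
    }

    /** Called once per sampling interval; returns true if the caller should write. */
    boolean sample(LoadRates current) {
        if (lastWritten == null
                || moved(lastWritten.msgRateIn(), current.msgRateIn())
                || moved(lastWritten.bytesRateIn(), current.bytesRateIn())
                || moved(lastWritten.msgRateOut(), current.msgRateOut())
                || moved(lastWritten.bytesRateOut(), current.bytesRateOut())) {
            lastWritten = current; // assumption: the metadata write succeeds
            return true;
        }
        return false;
    }

    private boolean moved(double last, double now) {
        if (last == 0.0) {
            return now != 0.0; // any traffic on a previously idle segment counts as a change
        }
        return Math.abs(now - last) / last > threshold;
    }
}
```

A segment sampled at 1000, 1200, 1300, 1300 msg/s ingest writes on the first and third samples only: +20% stays under the threshold, +30% crosses it, and the steady value after that is silent.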
>
> #### `SegmentLoadStats` (new metadata record)
>
> Stored at `/topics/{tenant}/{ns}/{topic}/segments/{segmentId}/load`:
>
> ```json
> {
>   "msgRateIn": 12000.0,
>   "bytesRateIn": 64000000.0,
>   "msgRateOut": 48000.0,
>   "bytesRateOut": 256000000.0
> }
> ```
>
> | Field | Source on the owning broker | Meaning for auto split/merge |
> |-------|------------------------------|---------------------------|
> | `msgRateIn` / `bytesRateIn` | segment topic `TopicStats` (60s rolling) | ingest load |
> | `msgRateOut` / `bytesRateOut` | segment topic `TopicStats` | dispatch/fanout load (high for topics with many subscriptions) |
>
> The record carries no timestamp of its own: the metadata store's `Stat` for the record already exposes creation and last-modified timestamps, and the controller uses the **modified timestamp** for windowing. A record that still reads "cold" with an old modified time shows that the segment's rates have stayed within the change threshold of that cold reading for `now − modifiedAt`, so the merge window **derives from the store's `Stat`** with no per-tick history buffer and no extra field.
>
> #### Significant-change threshold
>
> To avoid rewriting on every minor wobble, the reporter only writes when a rate has moved by more than `scalableTopicLoadReportRateChangeThreshold` (default 25%) relative to the last value written for that segment. Sampling cadence is `scalableTopicLoadReportInterval` (default 10s). Both are configurable via `broker.conf`.
>
> ### Subscription types and what each load type drives
>
> Recall from PIP-468 that scalable-topic subscriptions are `STREAM` (controller-managed, 1:1 segment↔consumer assignment; covers both StreamConsumer and CheckpointConsumer) or `QUEUE` (controller-bypassing; every consumer attaches to every segment and the broker round-robins).
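Because STREAM consumers are assigned 1:1 to segments while QUEUE consumers share every segment, only STREAM subscriptions can create demand for more segments. A minimal sketch with hypothetical names (`SubscriptionType`, `ConsumerDrivenRule`): the consumer-driven requirement is the largest STREAM consumer count on any one subscription, and QUEUE subscriptions are ignored entirely.

```java
import java.util.Map;

/** Hypothetical subscription kinds; STREAM also covers checkpoint consumers. */
enum SubscriptionType { STREAM, QUEUE }

final class ConsumerDrivenRule {
    private ConsumerDrivenRule() {}

    /** Largest STREAM consumer count across subscriptions; QUEUE subscriptions are skipped. */
    static int requiredSegments(Map<String, SubscriptionType> typeBySub,
                                Map<String, Integer> consumerCountBySub) {
        int required = 0;
        for (Map.Entry<String, Integer> e : consumerCountBySub.entrySet()) {
            if (typeBySub.get(e.getKey()) == SubscriptionType.STREAM) {
                required = Math.max(required, e.getValue());
            }
        }
        return required;
    }

    /** A consumer-driven split fires only below maxSegments and when demand exceeds supply. */
    static boolean shouldSplit(int activeSegments, int maxSegments, int required) {
        return activeSegments < maxSegments && required > activeSegments;
    }
}
```

With STREAM subscriptions of 3 and 5 consumers plus a QUEUE subscription of 50, the requirement is 5: a 4-segment topic splits, while a 5-segment topic (or one already at `maxSegments`) does not.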
>
> | Trigger | STREAM subscriptions | QUEUE subscriptions |
> |---------|----------------------|----------------------|
> | Consumer-count scale-up | **Yes** — more segments give more 1:1 parallelism | **No** — queue consumers share segments; more segments don't add parallelism for them |
> | Traffic (in/out, msg/bytes) | Yes | **Yes** — queue traffic still loads the segment's broker and counts toward the per-segment rate |
>
> So a topic with only QUEUE subscriptions never splits on consumer count, but still splits when any segment's in/out rate crosses a split threshold.
>
> ### The decision: `AutoScalePolicyEvaluator`
>
> A pure function with no I/O:
>
> ```
> decide(layout, loadBySegment, streamConsumerCountBySub, policy, now)
>   → Split(segmentId) | Merge(segmentId1, segmentId2) | None
> ```
>
> It runs in two passes — **split first (short cooldown), then merge (long cooldown)** — and emits at most one action per invocation.
>
> #### Pass 1 — SPLIT (fast, lightly coalesced)
>
> Splits fire as soon as conditions are met, bounded by `maxSegments`, an in-flight-operation guard, and a **short `splitCooldown` (default 1 min)**. The cooldown is deliberately short: it exists only to coalesce a burst of near-simultaneous triggers — e.g. a group of consumers connecting in rapid succession should cause one split, not N — while still letting a genuinely growing topic split again a minute later.
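The short `splitCooldown` can be sketched as a small stateful gate. A minimal illustration with a hypothetical `SplitGate` class, assuming the caller supplies timestamps in milliseconds: the first trigger of a burst acquires the gate and records `lastSplitAtMs`, and later triggers inside the cooldown are coalesced into that one split.

```java
/**
 * Hypothetical helper mirroring the lastSplitAtMs / splitCooldown check in
 * Pass 1: the first trigger in a burst splits, later ones are coalesced.
 */
final class SplitGate {
    private final long splitCooldownMs;
    private boolean hasSplit;    // false until the first split is recorded
    private long lastSplitAtMs;

    SplitGate(long splitCooldownMs) {
        this.splitCooldownMs = splitCooldownMs;
    }

    /** Returns true, and records the split time, if a split may fire at nowMs. */
    boolean tryAcquire(long nowMs) {
        if (hasSplit && nowMs - lastSplitAtMs < splitCooldownMs) {
            return false; // inside the cooldown: coalesced with the earlier split
        }
        hasSplit = true;
        lastSplitAtMs = nowMs;
        return true;
    }
}
```

With the default 60 s cooldown, eight consumers registering within four seconds yield a single split, and a trigger 60 seconds after that split may split again.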
>
> ```
> if a split or merge is already in flight:   skip split pass
> if activeSegments >= maxSegments:           skip split pass
> if now - lastSplitAtMs < splitCooldown:     skip split pass
>
> (a) Consumer-driven:
>     required = max over STREAM subscriptions of consumerCount   // per-subscription max
>     if required > activeSegments:
>         → Split(busiest active segment by msgRateIn); set lastSplitAtMs = now
>
> (b) Load-driven (if (a) didn't fire):
>     candidate segments = active segments where ANY of:
>         - msgRateIn    > splitMsgRateInThreshold
>         - bytesRateIn  > splitBytesRateInThreshold
>         - msgRateOut   > splitMsgRateOutThreshold
>         - bytesRateOut > splitBytesRateOutThreshold
>     if candidates non-empty:
>         → Split(most-overloaded candidate); set lastSplitAtMs = now
> ```
>
> The consumer-driven rule (a) is what the **event-driven path** evaluates the moment a consumer registers, so a new consumer gets a segment within seconds (subject to `splitCooldown`). The load-driven rule (b) runs on the periodic tick. Because `msgRateIn` etc. are already 60-second rolling averages on the broker, a value over threshold already represents *sustained* load — no extra split window is needed to filter transient spikes.
>
> #### Pass 2 — MERGE (lazy, rate-limited)
>
> Merges run only if no split fired this tick, the topic is not within `mergeCooldown` of its last merge, the topic has more than `minSegments` active segments, and the result respects `maxDagDepth`.
>
> ```
> if a split fired this tick:                skip merge pass
> if now - lastMergeAtMs < mergeCooldown:    skip merge pass
> if activeSegments <= minSegments:          skip merge pass
>
> candidate pairs = adjacent ACTIVE segment pairs where BOTH segments satisfy,
>                   for at least mergeWindow (checked via the store Stat's modified time):
>     - msgRateIn    < mergeMsgRateInThreshold
>     - bytesRateIn  < mergeBytesRateInThreshold
>     - msgRateOut   < mergeMsgRateOutThreshold
>     - bytesRateOut < mergeBytesRateOutThreshold
>   AND neither segment's lineage is already at maxDagDepth merges
>
> if candidate pairs non-empty:
>     → Merge(coldest pair by combined rate); set lastMergeAtMs = now
> ```
>
> Adjacency is required because the existing `mergeSegments` API only merges hash-range-adjacent active segments.
>
> ### Anti-flip-flop: three independent guards
>
> 1. **Threshold gap (hysteresis).** Split thresholds are well above merge thresholds for every metric. The dead-band between them is what prevents a just-merged segment from immediately re-qualifying for a split. With the defaults, every split threshold is 10× the corresponding merge threshold, so a segment formed from two segments that were each below the merge thresholds carries less than 2× those thresholds, still well below the split thresholds.
> 2. **Asymmetric cooldown.** Splits: a short `splitCooldown` (default 1 min) that only coalesces bursts. Merges: a longer `mergeCooldown` (default 5 min) plus a `mergeWindow` (default 5 min) during which both segments must have stayed cold. A pair must be *durably* cold to merge, but a segment can split again within a minute of getting hot.
> 3. **Max DAG depth on merges.** `maxDagDepth` (default 10) caps how many merges a given lineage can accumulate. Once reached, that lineage stops being a merge candidate — **but load-driven splits still fire.** This bounds the number of split↔merge cycles a hash range can churn through while never blocking a split that throughput requires.
>
> > **Design note — direction of the depth cap.** The cap restricts *merges*, not splits. Splits are needed for correctness/performance and must always be available (up to `maxSegments`); merges are the optional efficiency step and are the ones that, combined with splits, could oscillate. `dagDepth` therefore counts **merges in a segment's lineage**, derived from the existing `parentIds`/`childIds` DAG in `ScalableTopicMetadata` — splits do not consume depth budget.
>
> ### Caps
>
> | Cap | Default | Effect |
> |-----|---------|--------|
> | `maxSegments` | 64 | Splits stop once `activeSegments == maxSegments`. |
> | `minSegments` | 1 | Merges stop once `activeSegments == minSegments`. |
> | `maxDagDepth` | 10 | Merges stop for a lineage at the cap; splits unaffected. |
>
> ### Manual operations and cooldown
>
> - Manual `admin.scalableTopics().splitSegment(...)` **sets `lastSplitAtMs`**, so a manual split also starts the short auto-split cooldown.
> - Manual `admin.scalableTopics().mergeSegments(...)` **sets `lastMergeAtMs`**, so the operator's manual efficiency action also rate-limits the controller's automatic merges.
>
> ### Evaluation triggers
>
> The controller leader evaluates auto split/merge from two sources:
>
> - **Event-driven (within seconds)** — when a STREAM/CHECKPOINT consumer registers with or unregisters from the controller, the controller immediately evaluates the consumer-count split rule. No polling: consumer registration already flows through the controller (PIP-468).
> - **Periodic tick** — a `scheduleAutoScaleTick` (separate from the GC tick from PIP-468), default cadence `scalableTopicAutoScaleInterval = 60s`, evaluates the traffic-driven split rules and the merge pass. Per tick it does one metadata batch-read of the topic's `segments/*/load` records (or maintains a watch cache), evaluates, and dispatches.
>
> Both sources call the same `AutoScalePolicyEvaluator`; the event-driven path only needs the consumer-count rule, so it is cheap. Both are cancelled on leadership loss / close.
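Both trigger sources can share one single-threaded scheduler, which keeps evaluations for a topic serialized and makes cancellation on leadership loss a single call. A minimal sketch using `java.util.concurrent`; the class `AutoScaleTriggers` and its methods are hypothetical, and the counters stand in for real calls to `AutoScalePolicyEvaluator`.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical wiring of the event-driven and periodic triggers on the controller leader. */
final class AutoScaleTriggers implements AutoCloseable {
    // One thread serializes both paths, so evaluations for a topic never overlap.
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();
    final AtomicInteger consumerEvaluations = new AtomicInteger();
    final AtomicInteger tickEvaluations = new AtomicInteger();
    private volatile boolean leader;
    private ScheduledFuture<?> tick;

    /** Becoming leader starts the periodic (traffic) tick. */
    synchronized void onLeadershipAcquired(long intervalSeconds) {
        leader = true;
        tick = executor.scheduleAtFixedRate(
                this::onTick, intervalSeconds, intervalSeconds, TimeUnit.SECONDS);
    }

    /** Event-driven path: a STREAM/CHECKPOINT consumer registered or unregistered. */
    void onStreamConsumerChanged() {
        if (leader) {
            // Stand-in for evaluating only the cheap consumer-count split rule.
            executor.execute(consumerEvaluations::incrementAndGet);
        }
    }

    private void onTick() {
        // Stand-in for: batch-read segments/*/load, decide(...), dispatch.
        tickEvaluations.incrementAndGet();
    }

    /** Stops both sources; returns true if a pending tick was cancelled. */
    synchronized boolean onLeadershipLost() {
        leader = false;
        return tick != null && tick.cancel(false);
    }

    @Override
    public void close() {
        onLeadershipLost();
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Running both paths on one thread is a choice made for this sketch, not something the PIP specifies; it keeps an event-driven split and a tick-driven merge from racing to dispatch conflicting actions for the same topic.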
>
> ---
>
> ## Public-Facing Changes
>
> ### Configuration (`broker.conf`)
>
> Auto-scaling is **default-on cluster-wide**; these are the defaults applied to every scalable topic that does not override them.
>
> | Property | Default | Description |
> |----------|---------|-------------|
> | `scalableTopicAutoScaleEnabled` | `true` | Master switch for auto split/merge. |
> | `scalableTopicAutoScaleInterval` | `60s` | Periodic (traffic) evaluation cadence. Consumer-count changes are handled event-driven, independent of this. |
> | `scalableTopicMaxSegments` | `64` | Hard ceiling on active segments. |
> | `scalableTopicMinSegments` | `1` | Hard floor on active segments. |
> | `scalableTopicMaxDagDepth` | `10` | Max merges in a lineage before merges are disabled for it. |
> | `scalableTopicSplitCooldown` | `1m` | Minimum time between automatic splits on a topic (coalesces bursts). |
> | `scalableTopicMergeCooldown` | `5m` | Minimum time between automatic merges on a topic. |
> | `scalableTopicMergeWindow` | `5m` | Duration a pair must stay cold before merging. |
> | `scalableTopicSplitMsgRateInThreshold` | `10000` | msg/s ingest split trigger. |
> | `scalableTopicSplitBytesRateInThreshold` | `50MB` | bytes/s ingest split trigger. |
> | `scalableTopicSplitMsgRateOutThreshold` | `50000` | msg/s dispatch split trigger. |
> | `scalableTopicSplitBytesRateOutThreshold` | `250MB` | bytes/s dispatch split trigger. |
> | `scalableTopicMergeMsgRateInThreshold` | `1000` | msg/s ingest merge trigger. |
> | `scalableTopicMergeBytesRateInThreshold` | `5MB` | bytes/s ingest merge trigger. |
> | `scalableTopicMergeMsgRateOutThreshold` | `5000` | msg/s dispatch merge trigger. |
> | `scalableTopicMergeBytesRateOutThreshold` | `25MB` | bytes/s dispatch merge trigger. |
> | `scalableTopicLoadReportInterval` | `10s` | Segment owner sampling interval. |
> | `scalableTopicLoadReportRateChangeThreshold` | `25%` | Minimum rate change since the last write that triggers a new `SegmentLoadStats` write. |
>
> ### Policy resolution (namespace + topic overrides)
>
> Following the existing `autoTopicCreationOverride` pattern, an `AutoScalePolicyOverride` can be set at two levels; resolution is most-specific-wins, falling back to `broker.conf`:
>
> 1. **Per-topic** — a new `autoScalePolicy` field on `ScalableTopicMetadata`, set via `admin.scalableTopics().setAutoScalePolicy(topic, policy)` / `getAutoScalePolicy(topic)`.
> 2. **Per-namespace** — a new `scalableTopicAutoScalePolicy` field on `Policies`, set via `admin.namespaces().setScalableTopicAutoScalePolicy(ns, policy)`.
>
> `AutoScalePolicyOverride` carries the same knobs as the broker config (all optional; unset fields fall through). Setting `enabled = false` opts a topic or namespace out entirely.
>
> ### Admin Client API
>
> ```java
> interface ScalableTopics {
>     // ... existing ...
>     void setAutoScalePolicy(String topic, AutoScalePolicyOverride policy) throws PulsarAdminException;
>     AutoScalePolicyOverride getAutoScalePolicy(String topic) throws PulsarAdminException;
>     void removeAutoScalePolicy(String topic) throws PulsarAdminException;
> }
>
> interface Namespaces {
>     // ... existing ...
>     void setScalableTopicAutoScalePolicy(String namespace, AutoScalePolicyOverride policy) ...;
>     AutoScalePolicyOverride getScalableTopicAutoScalePolicy(String namespace) ...;
>     void removeScalableTopicAutoScalePolicy(String namespace) ...;
> }
> ```
>
> ### Metadata Store Paths
>
> | Path | Content | Writer |
> |------|---------|--------|
> | `/topics/{tenant}/{ns}/{topic}/segments/{segmentId}/load` | `SegmentLoadStats` JSON | segment-owning broker |
>
> (`autoScalePolicy` rides inside the existing `ScalableTopicMetadata` blob; the namespace override rides inside `Policies`. No other new paths.)
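Most-specific-wins resolution works per knob: a value set on the topic wins, otherwise the namespace value, otherwise `broker.conf`. A minimal sketch, assuming a hypothetical two-field `PolicyOverride` record standing in for `AutoScalePolicyOverride` (with `null` meaning "unset"); the real override carries every knob in the configuration table.

```java
/** Hypothetical stand-in for AutoScalePolicyOverride; null fields are unset. */
record PolicyOverride(Boolean enabled, Integer maxSegments) {
    static final PolicyOverride NONE = new PolicyOverride(null, null);
}

/** Hypothetical fully resolved policy the evaluator would consume. */
record EffectivePolicy(boolean enabled, int maxSegments) {}

final class PolicyResolver {
    private PolicyResolver() {}

    /** Resolves each field independently: topic, then namespace, then broker defaults. */
    static EffectivePolicy resolve(PolicyOverride topic, PolicyOverride namespace,
                                   EffectivePolicy brokerDefaults) {
        PolicyOverride t = topic != null ? topic : PolicyOverride.NONE;
        PolicyOverride n = namespace != null ? namespace : PolicyOverride.NONE;
        return new EffectivePolicy(
                firstSet(t.enabled(), n.enabled(), brokerDefaults.enabled()),
                firstSet(t.maxSegments(), n.maxSegments(), brokerDefaults.maxSegments()));
    }

    private static <T> T firstSet(T topicValue, T namespaceValue, T brokerValue) {
        if (topicValue != null) {
            return topicValue;
        }
        return namespaceValue != null ? namespaceValue : brokerValue;
    }
}
```

Because fields fall through independently, a topic that only sets `enabled = false` still inherits the namespace's `maxSegments`.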
>
> ### Observability
>
> - New per-topic metrics: `pulsar_scalable_topic_active_segments`, `pulsar_scalable_topic_auto_splits_total`, `pulsar_scalable_topic_auto_merges_total`, `pulsar_scalable_topic_split_suppressed_max_segments_total`, `pulsar_scalable_topic_merge_suppressed_max_depth_total`.
> - The existing `ScalableTopicStats` is extended with the most recent `SegmentLoadStats` per segment and the resolved effective policy, so operators can see *why* the controller did or did not act.
>
> ---
>
> ## Operational Safety
>
> The `maxSegments`, `maxDagDepth`, asymmetric cooldown, and threshold-gap guards together bound both the rate and the total amount of structural change a topic can undergo, so enabling auto split/merge cannot cause unbounded segment growth or split/merge storms.
>
> Operators who want manual-only control set `scalableTopicAutoScaleEnabled=false` (cluster) or an `enabled=false` override (namespace/topic).
>
> > **Compatibility:** scalable topics are a new, as-yet-unreleased feature ([PIP-460](pip-460.md)), so there is no backward/forward compatibility to consider — `SegmentLoadStats`, the policy fields, and the config knobs all ship together with the rest of the scalable-topic feature.
>
> ---
>
> ## Security Considerations
>
> `setAutoScalePolicy` / `getAutoScalePolicy` (topic and namespace variants) require the same admin permissions as the corresponding existing scalable-topic and namespace policy operations. `SegmentLoadStats` is written by brokers via their authenticated internal identity and is not client-writable.
>
> ---
>
> ## Links
>
> - Parent PIP: [PIP-468: Scalable Topic Controller](pip-468.md)
> - Grand-parent PIP: [PIP-460: Scalable Topics](pip-460.md)
> - V5 Client API: [PIP-466: New Java Client API (V5)](pip-466.md)
>
> --
> Matteo Merli
