+1

On 2026/06/04 23:36:38 Matteo Merli wrote:
> https://github.com/apache/pulsar/pull/25938
> 
> 
> ------
> 
> 
> # PIP-483: Scalable Topic Auto Split/Merge
> 
> *Sub-PIP of [PIP-468: Scalable Topic Controller](pip-468.md)*
> 
> ## Motivation
> 
> [PIP-468](pip-468.md) gives the Scalable Topic Controller the ability
> to **split** and **merge** segments, but only on explicit operator
> request via the admin API. An operator has to watch the topic, decide
> it is hot, and issue a split — and later notice it has gone cold and
> issue a merge. This is the same operational toil that partition-count
> management imposes on classic Pulsar topics, which scalable topics
> were meant to eliminate.
> 
> This PIP adds an **auto-scaling policy** to the controller: the
> controller leader observes per-segment load and per-subscription
> consumer pressure, and autonomously splits hot segments and merges
> cold ones, within hard caps that prevent runaway growth and
> split/merge flip-flopping.
> 
> The design is built around three principles that came out of the
> design discussion:
> 
> 1. **Splits are fast; merges are lazy.** A split protects throughput
> and latency under load, so it fires quickly — with only a short
> cooldown to coalesce bursts of near-simultaneous triggers (e.g. a
> group of consumers connecting in rapid succession). A merge is purely
> an efficiency reclaim, so it can wait, be rate-limited, and be skipped
> when in doubt.
> 2. **The controller reacts, it does not poll.** New stream/checkpoint
> consumers register directly with the controller, so consumer-count
> changes are handled event-driven within seconds. Load data is *pushed*
> into the metadata store by each segment's owning broker (only when it
> changes materially) and read by the controller leader. The controller
> never fans out RPCs to segment owners.
> 3. **The decision is a pure function.** Given a snapshot of load +
> layout + policy, the split/merge decision is deterministic and
> unit-testable in isolation from all I/O.
> 
> ---
> 
> ## Goals
> 
> - Automatically increase segment count when a topic is under
> ingest/dispatch load or has more stream/checkpoint consumers than
> segments.
> - Automatically decrease segment count when load subsides, reclaiming
> broker resources.
> - Bound growth (`maxSegments`) and bound split↔merge churn
> (`maxDagDepth`, asymmetric cooldown).
> - Default-on cluster-wide, with per-namespace and per-topic overrides,
> following Pulsar's existing policy-resolution conventions.
> 
> ### Non-goals
> 
> - **Broker placement / rebalancing.** Which broker owns a segment's
> bundle is the load balancer's job; this PIP only changes *how many*
> segments exist.
> - **Key-aware or non-midpoint splits.** Splits use the existing
> midpoint-split mechanism from PIP-468.
> - **Cross-topic global optimization.** Each topic's controller decides
> independently.
> 
> ---
> 
> ## Design
> 
> ### Overview
> 
> ```
> ┌────────────────────────────────────────────────────────────────┐
> │ Segment-owning broker (per active segment)                      │
> │   SegmentLoadReporter                                            │
> │   - samples the segment topic's TopicStats                      │
> │   - writes SegmentLoadStats to metadata ONLY on material change │
> └───────────────────────────────┬────────────────────────────────┘
>                                  │ (metadata store, push-on-change)
>                                  ▼
> ┌────────────────────────────────────────────────────────────────┐
> │ Controller leader (per scalable topic)                          │
> │                                                                  │
> │  Event-driven — within seconds:                                 │
> │    on STREAM/CHECKPOINT consumer register/unregister            │
> │      (consumers already register with the controller — no poll) │
> │      → evaluate the consumer-count split rule immediately       │
> │                                                                  │
> │  Periodic AutoScaleTick — traffic, default 60s:                 │
> │    1. read SegmentLoadStats for all active segments             │
> │    2. AutoScalePolicyEvaluator.decide(...) → Split|Merge|None   │
> │    3. dispatch to existing splitSegment / mergeSegments         │
> └────────────────────────────────────────────────────────────────┘
> ```
> 
> The two trigger sources reflect their different latency needs: a new
> consumer should get its own segment **within seconds**, so it is
> handled the instant the consumer registers with the controller;
> traffic shifts up or down over **a minute or more**, so it is
> evaluated on a slower periodic tick. The only new persistent state is
> `SegmentLoadStats`. The split/merge *mechanics* are entirely reused
> from PIP-468.
> 
> ### Load reporting: push-to-metadata, not pull-per-tick
> 
> Each segment's owning broker runs a **`SegmentLoadReporter`** for
> every ACTIVE `segment://` topic it hosts. The broker writes
> `SegmentLoadStats` **directly to the metadata store** — it already has
> the rates in memory, so there is no REST round-trip and no
> controller-initiated pull. On a fixed sampling interval it compares
> the current rates to the last ones written and writes **only when a
> rate changes by more than a significant threshold** (default ±25%)
> since the last write. A steady-state segment writes once and then goes
> silent, keeping metadata write volume bounded regardless of traffic.
> 
> #### `SegmentLoadStats` (new metadata record)
> 
> Stored at `/topics/{tenant}/{ns}/{topic}/segments/{segmentId}/load`:
> 
> ```json
> {
>   "msgRateIn": 12000.0,
>   "bytesRateIn": 64000000.0,
>   "msgRateOut": 48000.0,
>   "bytesRateOut": 256000000.0
> }
> ```
> 
> | Field | Source on the owning broker | Meaning for auto split/merge |
> |-------|------------------------------|---------------------------|
> | `msgRateIn` / `bytesRateIn` | segment topic `TopicStats` (60s rolling) | ingest load |
> | `msgRateOut` / `bytesRateOut` | segment topic `TopicStats` | dispatch/fanout load (high for topics with many subscriptions) |
> 
> The record carries no timestamp of its own: the metadata store's
> `Stat` for the znode already exposes creation and last-modified
> timestamps, and the controller uses the **modified timestamp** for
> windowing. Because the reporter rewrites the record only on a material
> change, a record that still reads "cold" with an old modified time
> shows the segment has stayed cold (to within the reporting threshold)
> for `now − modifiedAt`. The **merge window therefore derives from the
> store's `Stat`**, with no per-tick history buffer and no extra field.
> 
> #### Significant-change threshold
> 
> To avoid rewriting on every minor wobble, the reporter only writes
> when a rate has moved by more than
> `scalableTopicLoadReportRateChangeThreshold` (default 25%) relative to
> the last value written for that segment. Sampling cadence is
> `scalableTopicLoadReportInterval` (default 10s). Both are configurable
> via `broker.conf`.
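
To check my reading of the push-on-change rule, here is a minimal sketch of the reporter's write decision. The class and method names (`LoadReportGate`, `shouldWrite`) are hypothetical and not from the PIP, and the handling of a zero baseline is my own assumption, since the PIP does not say how a relative change from 0 is treated.

```java
/** Hypothetical sketch of the significant-change check; not the PIP's implementation. */
final class LoadReportGate {
    /** Mirrors the four fields of SegmentLoadStats. */
    record Rates(double msgRateIn, double bytesRateIn, double msgRateOut, double bytesRateOut) {}

    private final double threshold; // relative change, e.g. 0.25 for the 25% default
    private Rates lastWritten;      // null until the first write

    LoadReportGate(double threshold) {
        this.threshold = threshold;
    }

    /** Returns true when the sample should be written, and records it as the new baseline. */
    boolean shouldWrite(Rates current) {
        if (lastWritten == null
                || changed(lastWritten.msgRateIn(), current.msgRateIn())
                || changed(lastWritten.bytesRateIn(), current.bytesRateIn())
                || changed(lastWritten.msgRateOut(), current.msgRateOut())
                || changed(lastWritten.bytesRateOut(), current.bytesRateOut())) {
            lastWritten = current;
            return true;
        }
        return false;
    }

    /** Relative change against the last WRITTEN value, not the previous sample. */
    private boolean changed(double previous, double now) {
        if (previous == 0.0) {
            return now != 0.0; // assumption: any traffic after a zero baseline is material
        }
        return Math.abs(now - previous) / previous > threshold;
    }
}
```

Comparing against the last written value rather than the previous sample matters: a rate that drifts by 10% per sample still triggers a write once the cumulative drift passes the threshold.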
> 
> ### Subscription types and what each load type drives
> 
> Recall from PIP-468 that scalable-topic subscriptions are `STREAM`
> (controller-managed, 1:1 segment↔consumer assignment; covers both
> StreamConsumer and CheckpointConsumer) or `QUEUE`
> (controller-bypassing; every consumer attaches to every segment and
> the broker round-robins).
> 
> | Trigger | STREAM subscriptions | QUEUE subscriptions |
> |---------|----------------------|----------------------|
> | Consumer-count scale-up | **Yes**: more segments give more 1:1 parallelism | **No**: queue consumers share segments; more segments don't add parallelism for them |
> | Traffic (in/out, msg/bytes) | Yes | **Yes**: queue traffic still loads the segment's broker and counts toward the per-segment rate |
> 
> So a topic with only QUEUE subscriptions never splits on consumer
> count, but still splits when any segment's in/out rate crosses
> threshold.
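
A small sketch of how I read the consumer-count input: only STREAM subscriptions contribute, and the requirement is the per-subscription maximum, not the sum. The types here (`ConsumerDrivenRule`, `Subscription`) are hypothetical stand-ins for whatever the controller actually tracks.

```java
import java.util.Map;

/** Hypothetical sketch of the consumer-count rule; names are illustrative only. */
final class ConsumerDrivenRule {
    enum SubscriptionType { STREAM, QUEUE }

    record Subscription(SubscriptionType type, int consumerCount) {}

    /** Largest consumer count across STREAM subscriptions; QUEUE subscriptions are ignored. */
    static int requiredSegments(Map<String, Subscription> subscriptions) {
        return subscriptions.values().stream()
                .filter(s -> s.type() == SubscriptionType.STREAM)
                .mapToInt(Subscription::consumerCount)
                .max()
                .orElse(0); // no STREAM subscription: consumer count never asks for a split
    }

    static boolean wantsSplit(Map<String, Subscription> subscriptions, int activeSegments) {
        return requiredSegments(subscriptions) > activeSegments;
    }
}
```

With a STREAM subscription of 3 consumers and a QUEUE subscription of 10 consumers on a 2-segment topic, the requirement is 3, so the rule asks for a split; with only the QUEUE subscription it never does.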
> 
> ### The decision: `AutoScalePolicyEvaluator`
> 
> A pure function with no I/O:
> 
> ```
> decide(layout, loadBySegment, streamConsumerCountBySub, policy, now)
>     → Split(segmentId) | Merge(segmentId1, segmentId2) | None
> ```
> 
> It runs in two passes — **split first (short cooldown), then merge
> (long cooldown)** — and emits at most one action per invocation.
> 
> #### Pass 1 — SPLIT (fast, lightly coalesced)
> 
> Splits fire as soon as conditions are met, bounded by `maxSegments`,
> an in-flight-operation guard, and a **short `splitCooldown` (default 1
> min)**. The cooldown is deliberately short: it exists only to coalesce
> a burst of near-simultaneous triggers — e.g. a group of consumers
> connecting in rapid succession should cause one split, not N — while
> still letting a genuinely growing topic split again on the next
> minute.
> 
> ```
> if activeSegments >= maxSegments: skip split pass
> if now - lastSplitAtMs < splitCooldown: skip split pass
> 
> (a) Consumer-driven:
>       required = max over STREAM subscriptions of consumerCount   // per-subscription max
>       if required > activeSegments:
>           → Split(busiest active segment by msgRateIn); set lastSplitAtMs = now
> 
> (b) Load-driven (if (a) didn't fire):
>       candidate segments = active segments where ANY of:
>         - msgRateIn   > splitMsgRateInThreshold
>         - bytesRateIn > splitBytesRateInThreshold
>         - msgRateOut  > splitMsgRateOutThreshold
>         - bytesRateOut> splitBytesRateOutThreshold
>       if candidates non-empty:
>           → Split(most-overloaded candidate); set lastSplitAtMs = now
> ```
> 
> The consumer-driven rule (a) is what the **event-driven path**
> evaluates the moment a consumer registers, so a new consumer gets a
> segment within seconds (subject to `splitCooldown`). The load-driven
> rule (b) runs on the periodic tick. Because `msgRateIn` etc. are
> already 60-second rolling averages on the broker, a value over
> threshold already represents *sustained* load — no extra split window
> is needed to filter transient spikes.
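
For discussion, a sketch of the split pass as a pure function, under my own assumptions: segment ids are modelled as `long`, and "most-overloaded" is taken to mean the highest ratio of any rate to its split threshold, which the PIP does not define. The in-flight-operation guard is left out.

```java
import java.util.Comparator;
import java.util.Map;
import java.util.Optional;

/** Hypothetical sketch of Pass 1 (split); not the PIP's AutoScalePolicyEvaluator. */
final class SplitPass {
    record Load(double msgRateIn, double bytesRateIn, double msgRateOut, double bytesRateOut) {}

    record Policy(int maxSegments, long splitCooldownMs,
                  double msgIn, double bytesIn, double msgOut, double bytesOut) {}

    /** Highest ratio of a rate to its split threshold; above 1.0 means over threshold. */
    static double overload(Load l, Policy p) {
        return Math.max(
                Math.max(l.msgRateIn() / p.msgIn(), l.bytesRateIn() / p.bytesIn()),
                Math.max(l.msgRateOut() / p.msgOut(), l.bytesRateOut() / p.bytesOut()));
    }

    /** Returns the segment to split, if any. The caller records lastSplitAtMs on success. */
    static Optional<Long> pickSplit(Map<Long, Load> active, int requiredByConsumers,
                                    Policy p, long lastSplitAtMs, long nowMs) {
        if (active.isEmpty() || active.size() >= p.maxSegments()) {
            return Optional.empty(); // at the cap
        }
        if (nowMs - lastSplitAtMs < p.splitCooldownMs()) {
            return Optional.empty(); // still coalescing a burst
        }
        if (requiredByConsumers > active.size()) {
            // (a) consumer-driven: split the busiest segment by ingest message rate
            return active.entrySet().stream()
                    .max(Comparator.comparingDouble(
                            (Map.Entry<Long, Load> e) -> e.getValue().msgRateIn()))
                    .map(Map.Entry::getKey);
        }
        // (b) load-driven: split the candidate furthest over any threshold
        return active.entrySet().stream()
                .filter(e -> overload(e.getValue(), p) > 1.0)
                .max(Comparator.comparingDouble(
                        (Map.Entry<Long, Load> e) -> overload(e.getValue(), p)))
                .map(Map.Entry::getKey);
    }
}
```

Because the function takes `nowMs` and `lastSplitAtMs` as arguments, the cooldown behaviour can be unit-tested without a clock, which is the property the "pure function" principle is after.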
> 
> #### Pass 2 — MERGE (lazy, rate-limited)
> 
> Merges run only if no split fired this tick, the topic is not within
> `mergeCooldown` of its last merge, and the result respects
> `maxDagDepth`.
> 
> ```
> if a split fired this tick: skip merge pass
> if now - lastMergeAtMs < mergeCooldown: skip merge pass
> if activeSegments <= minSegments: skip merge pass
> 
> candidate pairs = adjacent ACTIVE segment pairs where BOTH segments satisfy,
>                   for at least mergeWindow (checked via the store Stat's modified time):
>     - msgRateIn   < mergeMsgRateInThreshold
>     - bytesRateIn < mergeBytesRateInThreshold
>     - msgRateOut  < mergeMsgRateOutThreshold
>     - bytesRateOut< mergeBytesRateOutThreshold
>   AND neither segment's lineage is already at maxDagDepth merges
> 
> if candidate pairs non-empty:
>     → Merge(coldest pair by combined rate); set lastMergeAtMs = now
> ```
> 
> Adjacency is required because the existing `mergeSegments` API only
> merges hash-range-adjacent active segments.
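
The merge pass can be sketched the same way. This is only my interpretation: segments arrive as a list already ordered by hash range (so list neighbours are adjacent), `modifiedAtMs` stands for the store `Stat`'s modified time, and "coldest pair by combined rate" is modelled as the lowest sum of rate-to-threshold ratios, which is an assumption on my part.

```java
import java.util.List;
import java.util.Optional;

/** Hypothetical sketch of Pass 2 (merge); not the PIP's AutoScalePolicyEvaluator. */
final class MergePass {
    record Segment(long id, double msgRateIn, double bytesRateIn, double msgRateOut,
                   double bytesRateOut, long modifiedAtMs, int dagDepth) {}

    record Policy(int minSegments, int maxDagDepth, long mergeCooldownMs, long mergeWindowMs,
                  double msgIn, double bytesIn, double msgOut, double bytesOut) {}

    record Pair(long left, long right) {}

    /** Cold on every metric, unchanged for at least mergeWindow, and below the depth cap. */
    static boolean eligible(Segment s, Policy p, long nowMs) {
        return s.msgRateIn() < p.msgIn()
                && s.bytesRateIn() < p.bytesIn()
                && s.msgRateOut() < p.msgOut()
                && s.bytesRateOut() < p.bytesOut()
                && nowMs - s.modifiedAtMs() >= p.mergeWindowMs()
                && s.dagDepth() < p.maxDagDepth();
    }

    /** Assumed coldness score: sum of each rate relative to its merge threshold. */
    static double score(Segment s, Policy p) {
        return s.msgRateIn() / p.msgIn() + s.bytesRateIn() / p.bytesIn()
                + s.msgRateOut() / p.msgOut() + s.bytesRateOut() / p.bytesOut();
    }

    static Optional<Pair> pickMerge(List<Segment> byHashRange, Policy p,
                                    boolean splitFired, long lastMergeAtMs, long nowMs) {
        if (splitFired
                || nowMs - lastMergeAtMs < p.mergeCooldownMs()
                || byHashRange.size() <= p.minSegments()) {
            return Optional.empty();
        }
        Pair best = null;
        double bestScore = Double.MAX_VALUE;
        for (int i = 0; i + 1 < byHashRange.size(); i++) {
            Segment a = byHashRange.get(i);
            Segment b = byHashRange.get(i + 1);
            if (!eligible(a, p, nowMs) || !eligible(b, p, nowMs)) {
                continue; // both halves must be durably cold
            }
            double combined = score(a, p) + score(b, p);
            if (combined < bestScore) {
                bestScore = combined;
                best = new Pair(a.id(), b.id());
            }
        }
        return Optional.ofNullable(best);
    }
}
```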
> 
> ### Anti-flip-flop: three independent guards
> 
> 1. **Threshold gap (hysteresis).** Split thresholds are well above
> merge thresholds for every metric. The dead-band between them is what
> prevents a just-merged segment from immediately re-qualifying for a
> split.
> 2. **Asymmetric cooldown.** Splits: a short `splitCooldown` (default 1
> min) that only coalesces bursts. Merges: a longer `mergeCooldown`
> (default 5 min) plus a `mergeWindow` (default 5 min) during which the
> segment must have stayed cold. A pair must be *durably* cold to merge,
> but a segment can split again within a minute of getting hot.
> 3. **Max DAG depth on merges.** `maxDagDepth` (default 10) caps how
> many merges a given lineage can accumulate. Once reached, that lineage
> stops being a merge candidate — **but load-driven splits still fire.**
> This bounds the number of split↔merge cycles a hash range can churn
> through while never blocking a split that throughput requires.
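
A quick sanity check on guard 1, assuming a merged segment carries roughly the sum of its two parents' traffic (my assumption, not stated in the PIP). Both parents were below the merge threshold, so the merged segment is below twice that threshold; the dead-band holds as long as that bound is still under the split threshold. `HysteresisCheck` is a hypothetical helper for validating a policy.

```java
/** Hypothetical policy sanity check for the split/merge dead-band. */
final class HysteresisCheck {
    record Thresholds(double msgIn, double bytesIn, double msgOut, double bytesOut) {}

    /** Upper bound on a freshly merged segment's rate, assuming rates add up. */
    static double worstCaseMergedRate(double mergeThreshold) {
        return 2 * mergeThreshold;
    }

    /** True when a just-merged segment cannot immediately exceed any split threshold. */
    static boolean hasDeadBand(Thresholds split, Thresholds merge) {
        return worstCaseMergedRate(merge.msgIn()) < split.msgIn()
                && worstCaseMergedRate(merge.bytesIn()) < split.bytesIn()
                && worstCaseMergedRate(merge.msgOut()) < split.msgOut()
                && worstCaseMergedRate(merge.bytesOut()) < split.bytesOut();
    }
}
```

With the proposed defaults the split thresholds are 10x the merge thresholds on every metric (for example 10000 vs 1000 msg/s in), so the worst-case merged rate of 2000 msg/s sits well inside the dead-band. An override that raised the ingest merge threshold to 6000 msg/s would fail this check.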
> 
> > **Design note — direction of the depth cap.** The cap restricts *merges*, 
> > not splits. Splits are needed for correctness/performance and must always 
> > be available; merges are the optional efficiency step and are the ones 
> > that, combined with splits, could oscillate. `dagDepth` therefore counts 
> > **merges in a segment's lineage**, derived from the existing 
> > `parentIds`/`childIds` DAG in `ScalableTopicMetadata` — splits do not 
> > consume depth budget.
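
One possible reading of "merges in a segment's lineage", sketched below: a segment with two or more parents was produced by a merge and adds one to the deepest count inherited from its parents, while a split child (one parent) just inherits. Taking the maximum over parents is my interpretation; the map shape (`segmentId -> parentIds`) is a hypothetical stand-in for the DAG in `ScalableTopicMetadata`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of deriving dagDepth from parent links. */
final class DagDepth {
    /** Number of merges on the deepest ancestor path of the given segment. */
    static int mergeDepth(long segmentId, Map<Long, List<Long>> parentIds) {
        return mergeDepth(segmentId, parentIds, new HashMap<>());
    }

    private static int mergeDepth(long id, Map<Long, List<Long>> parentIds,
                                  Map<Long, Integer> memo) {
        Integer cached = memo.get(id);
        if (cached != null) {
            return cached;
        }
        List<Long> parents = parentIds.getOrDefault(id, List.of());
        int inherited = 0;
        for (long parent : parents) {
            inherited = Math.max(inherited, mergeDepth(parent, parentIds, memo));
        }
        // Two or more parents means this segment is the result of a merge.
        int depth = inherited + (parents.size() >= 2 ? 1 : 0);
        memo.put(id, depth);
        return depth;
    }
}
```

For a lineage of split, merge, split, merge (0 splits into 1 and 2, which merge into 3, which splits into 4 and 5, which merge into 6), this yields depth 1 for segments 3, 4, and 5 and depth 2 for segment 6; the splits consume no budget.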
> 
> ### Caps
> 
> | Cap | Default | Effect |
> |-----|---------|--------|
> | `maxSegments` | 64 | Splits stop once `activeSegments == maxSegments`. |
> | `minSegments` | 1 | Merges stop once `activeSegments == minSegments`. |
> | `maxDagDepth` | 10 | Merges stop for a lineage at the cap; splits unaffected. |
> 
> ### Manual operations and cooldown
> 
> - Manual `admin.scalableTopics().splitSegment(...)` **sets
> `lastSplitAtMs`**, so a manual split also starts the short auto-split
> cooldown.
> - Manual `admin.scalableTopics().mergeSegments(...)` **sets
> `lastMergeAtMs`**, so the operator's manual efficiency action also
> rate-limits the controller's automatic merges.
> 
> ### Evaluation triggers
> 
> The controller leader evaluates auto split/merge from two sources:
> 
> - **Event-driven (within seconds)** — when a STREAM/CHECKPOINT
> consumer registers with or unregisters from the controller, it
> immediately evaluates the consumer-count split rule. No polling:
> consumer registration already flows through the controller (PIP-468).
> - **Periodic tick** — a `scheduleAutoScaleTick` (separate from the GC
> tick from PIP-468), default cadence `scalableTopicAutoScaleInterval =
> 60s`, evaluates the traffic-driven split rules and the merge pass. Per
> tick it does one metadata batch-read of the topic's `segments/*/load`
> records (or maintains a watch cache), evaluates, and dispatches.
> 
> Both sources call the same `AutoScalePolicyEvaluator`; the
> event-driven path only needs the consumer-count rule, so it is cheap.
> Both are cancelled on leadership loss / close.
> 
> ---
> 
> ## Public-Facing Changes
> 
> ### Configuration (`broker.conf`)
> 
> Auto-scaling is **default-on cluster-wide**; these are the defaults
> applied to every scalable topic that does not override them.
> 
> | Property | Default | Description |
> |----------|---------|-------------|
> | `scalableTopicAutoScaleEnabled` | `true` | Master switch for auto split/merge. |
> | `scalableTopicAutoScaleInterval` | `60s` | Periodic (traffic) evaluation cadence. Consumer-count changes are handled event-driven, independent of this. |
> | `scalableTopicMaxSegments` | `64` | Hard ceiling on active segments. |
> | `scalableTopicMinSegments` | `1` | Hard floor on active segments. |
> | `scalableTopicMaxDagDepth` | `10` | Max merges in a lineage before merges are disabled for it. |
> | `scalableTopicSplitCooldown` | `1m` | Minimum time between automatic splits on a topic (coalesces bursts). |
> | `scalableTopicMergeCooldown` | `5m` | Minimum time between automatic merges on a topic. |
> | `scalableTopicMergeWindow` | `5m` | Duration a pair must stay cold before merging. |
> | `scalableTopicSplitMsgRateInThreshold` | `10000` | msg/s ingest split trigger. |
> | `scalableTopicSplitBytesRateInThreshold` | `50MB` | bytes/s ingest split trigger. |
> | `scalableTopicSplitMsgRateOutThreshold` | `50000` | msg/s dispatch split trigger. |
> | `scalableTopicSplitBytesRateOutThreshold` | `250MB` | bytes/s dispatch split trigger. |
> | `scalableTopicMergeMsgRateInThreshold` | `1000` | msg/s ingest merge trigger. |
> | `scalableTopicMergeBytesRateInThreshold` | `5MB` | bytes/s ingest merge trigger. |
> | `scalableTopicMergeMsgRateOutThreshold` | `5000` | msg/s dispatch merge trigger. |
> | `scalableTopicMergeBytesRateOutThreshold` | `25MB` | bytes/s dispatch merge trigger. |
> | `scalableTopicLoadReportInterval` | `10s` | Segment owner sampling interval. |
> | `scalableTopicLoadReportRateChangeThreshold` | `25%` | Minimum rate change since the last write that triggers a new `SegmentLoadStats` write. |
> 
> ### Policy resolution (namespace + topic overrides)
> 
> Following the existing `autoTopicCreationOverride` pattern, an
> `AutoScalePolicyOverride` can be set at two levels; resolution is
> most-specific-wins, falling back to `broker.conf`:
> 
> 1. **Per-topic** — a new `autoScalePolicy` field on
> `ScalableTopicMetadata`, set via
> `admin.scalableTopics().setAutoScalePolicy(topic, policy)` /
> `getAutoScalePolicy(topic)`.
> 2. **Per-namespace** — a new `scalableTopicAutoScalePolicy` field on
> `Policies`, set via
> `admin.namespaces().setScalableTopicAutoScalePolicy(ns, policy)`.
> 
> `AutoScalePolicyOverride` carries the same knobs as the broker config
> (all optional; unset fields fall through). Setting `enabled = false`
> opts a topic or namespace out entirely.
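
The most-specific-wins resolution with per-field fall-through could look like the sketch below. `PolicyOverride` here is a hypothetical three-field subset of `AutoScalePolicyOverride` (the PIP does not spell out its shape), with `null` standing for "unset".

```java
/** Hypothetical sketch of topic > namespace > broker.conf resolution. */
final class PolicyResolution {
    /** Every knob is optional; null means the level does not override it. */
    record PolicyOverride(Boolean enabled, Integer maxSegments, Long splitCooldownMs) {}

    /** Fully resolved values; the broker.conf defaults are also expressed this way. */
    record Effective(boolean enabled, int maxSegments, long splitCooldownMs) {}

    private static final PolicyOverride UNSET = new PolicyOverride(null, null, null);

    /** First non-null value in order of specificity. */
    private static <T> T pick(T topic, T namespace, T broker) {
        if (topic != null) {
            return topic;
        }
        return namespace != null ? namespace : broker;
    }

    /** Either override may be null when that level has no policy set at all. */
    static Effective resolve(PolicyOverride topic, PolicyOverride namespace, Effective broker) {
        PolicyOverride t = topic != null ? topic : UNSET;
        PolicyOverride n = namespace != null ? namespace : UNSET;
        return new Effective(
                pick(t.enabled(), n.enabled(), broker.enabled()),
                pick(t.maxSegments(), n.maxSegments(), broker.maxSegments()),
                pick(t.splitCooldownMs(), n.splitCooldownMs(), broker.splitCooldownMs()));
    }
}
```

Resolution is per field, so a topic can set `enabled = false` while still inheriting `maxSegments` from its namespace and the cooldown from `broker.conf`.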
> 
> ### Admin Client API
> 
> ```java
> interface ScalableTopics {
>     // ... existing ...
>     void setAutoScalePolicy(String topic, AutoScalePolicyOverride policy) throws PulsarAdminException;
>     AutoScalePolicyOverride getAutoScalePolicy(String topic) throws PulsarAdminException;
>     void removeAutoScalePolicy(String topic) throws PulsarAdminException;
> }
> 
> interface Namespaces {
>     // ... existing ...
>     void setScalableTopicAutoScalePolicy(String namespace, AutoScalePolicyOverride policy) ...;
>     AutoScalePolicyOverride getScalableTopicAutoScalePolicy(String namespace) ...;
>     void removeScalableTopicAutoScalePolicy(String namespace) ...;
> }
> ```
> 
> ### Metadata Store Paths
> 
> | Path | Content | Writer |
> |------|---------|--------|
> | `/topics/{tenant}/{ns}/{topic}/segments/{segmentId}/load` | `SegmentLoadStats` JSON | segment-owning broker |
> 
> (`autoScalePolicy` rides inside the existing `ScalableTopicMetadata`
> blob; the namespace override rides inside `Policies`. No other new
> paths.)
> 
> ### Observability
> 
> - New per-topic metrics: `pulsar_scalable_topic_active_segments`,
> `pulsar_scalable_topic_auto_splits_total`,
> `pulsar_scalable_topic_auto_merges_total`,
> `pulsar_scalable_topic_split_suppressed_max_segments_total`,
> `pulsar_scalable_topic_merge_suppressed_max_depth_total`.
> - The existing `ScalableTopicStats` is extended with the most recent
> `SegmentLoadStats` per segment and the resolved effective policy, so
> operators can see *why* the controller did or did not act.
> 
> ---
> 
> ## Operational Safety
> 
> The `maxSegments`, `maxDagDepth`, asymmetric cooldown, and
> threshold-gap guards together bound both the rate and the total amount
> of structural change a topic can undergo, so enabling auto split/merge
> cannot cause unbounded segment growth or split/merge storms.
> 
> Operators who want manual-only control set
> `scalableTopicAutoScaleEnabled=false` (cluster) or an `enabled=false`
> override (namespace/topic).
> 
> > **Compatibility:** scalable topics are a new, as-yet-unreleased feature 
> > ([PIP-460](pip-460.md)), so there is no backward/forward compatibility to 
> > consider — `SegmentLoadStats`, the policy fields, and the config knobs all 
> > ship together with the rest of the scalable-topic feature.
> 
> ---
> 
> ## Security Considerations
> 
> `setAutoScalePolicy` / `getAutoScalePolicy` (topic and namespace
> variants) require the same admin permissions as the corresponding
> existing scalable-topic and namespace policy operations.
> `SegmentLoadStats` is written by brokers via their authenticated
> internal identity and is not client-writable.
> 
> ---
> 
> ## Links
> 
> - Parent PIP: [PIP-468: Scalable Topic Controller](pip-468.md)
> - Grand-parent PIP: [PIP-460: Scalable Topics](pip-460.md)
> - V5 Client API: [PIP-466: New Java Client API (V5)](pip-466.md)
> 
> --
> Matteo Merli
> <[email protected]>
> 
