This is an automated email from the ASF dual-hosted git repository.
merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1ab1331e02c [improve][broker] PIP-483: Scalable Topic Auto Split/Merge
(#25938)
1ab1331e02c is described below
commit 1ab1331e02ccc0a365cdaa3ed0383a11343637d8
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Jun 11 09:39:21 2026 -0700
[improve][broker] PIP-483: Scalable Topic Auto Split/Merge (#25938)
---
pip/pip-483.md | 285 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 285 insertions(+)
diff --git a/pip/pip-483.md b/pip/pip-483.md
new file mode 100644
index 00000000000..2575c541c02
--- /dev/null
+++ b/pip/pip-483.md
@@ -0,0 +1,285 @@
+# PIP-483: Scalable Topic Auto Split/Merge
+
+*Sub-PIP of [PIP-468: Scalable Topic Controller](pip-468.md)*
+
+## Motivation
+
+[PIP-468](pip-468.md) gives the Scalable Topic Controller the ability to
**split** and **merge** segments, but only on explicit operator request via the
admin API. An operator has to watch the topic, decide it is hot, and issue a
split — and later notice it has gone cold and issue a merge. This is the same
operational toil that partition-count management imposes on classic Pulsar
topics, which scalable topics were meant to eliminate.
+
+This PIP adds an **auto-scaling policy** to the controller: the controller
leader observes per-segment load and per-subscription consumer pressure, and
autonomously splits hot segments and merges cold ones, within hard caps that
prevent runaway growth and split/merge flip-flopping.
+
+The design is built around three principles that came out of the design
discussion:
+
+1. **Splits are fast; merges are lazy.** A split protects throughput and
latency under load, so it fires quickly — with only a short cooldown to
coalesce bursts of near-simultaneous triggers (e.g. a group of consumers
connecting in rapid succession). A merge is purely an efficiency reclaim, so it
can wait, be rate-limited, and be skipped when in doubt.
+2. **The controller reacts, it does not poll.** New stream/checkpoint
consumers register directly with the controller, so consumer-count changes are
handled event-driven within seconds. Load data is *pushed* into the metadata
store by each segment's owning broker (only when it changes materially) and
read by the controller leader. The controller never fans out RPCs to segment
owners.
+3. **The decision is a pure function.** Given a snapshot of load + layout +
policy, the split/merge decision is deterministic and unit-testable in
isolation from all I/O.
+
+---
+
+## Goals
+
+- Automatically increase segment count when a topic is under ingest/dispatch
load or has more stream/checkpoint consumers than segments.
+- Automatically decrease segment count when load subsides, reclaiming broker
resources.
+- Bound growth (`maxSegments`) and bound split↔merge churn (`maxDagDepth`,
asymmetric cooldown).
+- Default-on cluster-wide, with per-namespace and per-topic overrides,
following Pulsar's existing policy-resolution conventions.
+
+### Non-goals
+
+- **Broker placement / rebalancing.** Which broker owns a segment's bundle is
the load balancer's job; this PIP only changes *how many* segments exist.
+- **Key-aware or non-midpoint splits.** Splits use the existing midpoint-split
mechanism from PIP-468.
+- **Cross-topic global optimization.** Each topic's controller decides
independently.
+
+---
+
+## Design
+
+### Overview
+
+```
+┌────────────────────────────────────────────────────────────────┐
+│ Segment-owning broker (per active segment) │
+│ SegmentLoadReporter │
+│ - samples the segment topic's TopicStats │
+│ - writes SegmentLoadStats to metadata ONLY on material change │
+└───────────────────────────────┬────────────────────────────────┘
+ │ (metadata store, push-on-change)
+ ▼
+┌────────────────────────────────────────────────────────────────┐
+│ Controller leader (per scalable topic) │
+│ │
+│ Event-driven — within seconds: │
+│ on STREAM/CHECKPOINT consumer register/unregister │
+│ (consumers already register with the controller — no poll) │
+│ → evaluate the consumer-count split rule immediately │
+│ │
+│ Periodic AutoScaleTick — traffic, default 60s: │
+│ 1. read SegmentLoadStats for all active segments │
+│ 2. AutoScalePolicyEvaluator.decide(...) → Split|Merge|None │
+│ 3. dispatch to existing splitSegment / mergeSegments │
+└────────────────────────────────────────────────────────────────┘
+```
+
+The two trigger sources reflect their different latency needs: a new consumer
should get its own segment **within seconds**, so it is handled the instant the
consumer registers with the controller; traffic shifts up or down over **a
minute or more**, so they are evaluated on a slower periodic tick. The only new
persistent state is `SegmentLoadStats`. The split/merge *mechanics* are
entirely reused from PIP-468.
+
+### Load reporting: push-to-metadata, not pull-per-tick
+
+Each segment's owning broker runs a **`SegmentLoadReporter`** for every ACTIVE
`segment://` topic it hosts. The broker writes `SegmentLoadStats` **directly to
the metadata store** — it already has the rates in memory, so there is no REST
round-trip and no controller-initiated pull. On a fixed sampling interval it
compares the current rates to the last ones written and writes **only when a
rate changes by more than a significant threshold** (default ±25%) since the
last write. A steady-st [...]
+
+#### `SegmentLoadStats` (new metadata record)
+
+Stored at `/topics/{tenant}/{ns}/{topic}/segments/{segmentId}/load`:
+
+```json
+{
+ "msgRateIn": 12000.0,
+ "bytesRateIn": 64000000.0,
+ "msgRateOut": 48000.0,
+ "bytesRateOut": 256000000.0
+}
+```
+
+| Field | Source on the owning broker | Meaning for auto split/merge |
+|-------|------------------------------|---------------------------|
+| `msgRateIn` / `bytesRateIn` | segment topic `TopicStats` (60s rolling) | ingest load |
+| `msgRateOut` / `bytesRateOut` | segment topic `TopicStats` | dispatch/fanout load (high for topics with many subscriptions) |
+
+The record carries no timestamp of its own: the metadata store's `Stat` for
the record already exposes creation and last-modified timestamps, and the
controller uses the **modified timestamp** for windowing. A record that still
reads "cold" with an old modified time proves the segment has been cold for
`now − modifiedAt` — so split/merge **windows derive from the store's `Stat`**
with no per-tick history buffer and no extra field.
+
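As a minimal sketch of this windowing rule, the check below derives how long a segment has been cold purely from the load record's last-modified time. `ColdWindow` and `coldForAtLeast` are hypothetical names used for illustration, and the `isCold` flag stands in for the threshold comparison defined in the merge pass.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper, not part of the PIP: shows the cold window being
// derived from the store's modified time instead of a history buffer.
final class ColdWindow {
    private ColdWindow() {}

    /**
     * @param isCold     whether the last written SegmentLoadStats is below the merge thresholds
     * @param modifiedAt last-modified time of the load record, taken from the store's Stat
     * @param now        current time on the controller leader
     * @param window     required cold duration (mergeWindow, 5 minutes by default)
     */
    static boolean coldForAtLeast(boolean isCold, Instant modifiedAt, Instant now, Duration window) {
        if (!isCold) {
            return false;
        }
        // The reporter rewrites the record on any material change, so an
        // untouched cold record has been cold for (now - modifiedAt).
        return Duration.between(modifiedAt, now).compareTo(window) >= 0;
    }
}
```

Passing `now` in explicitly, rather than reading the clock inside the helper, keeps the check deterministic and easy to unit-test.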
+#### Significant-change threshold
+
+To avoid rewriting on every minor wobble, the reporter only writes when a rate
has moved by more than `scalableTopicLoadReportRateChangeThreshold` (default
25%) relative to the last value written for that segment. Sampling cadence is
`scalableTopicLoadReportInterval` (default 10s). Both are configurable via
`broker.conf`.
+
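The write-on-material-change rule could look roughly like the sketch below. `LoadReportFilter` and its methods are hypothetical names; the handling of a previously idle segment (last written rate of zero) is an assumption, since the PIP does not say how a relative change from zero is treated.

```java
// Hypothetical sketch of the reporter's "write only on material change" check.
final class LoadReportFilter {
    /** Relative change threshold, e.g. 0.25 for the 25% default. */
    private final double threshold;

    LoadReportFilter(double threshold) {
        this.threshold = threshold;
    }

    /** True if current moved by more than the threshold relative to lastWritten. */
    boolean rateChanged(double lastWritten, double current) {
        if (lastWritten == 0.0) {
            // Assumption: any traffic on a previously idle segment is material.
            return current != 0.0;
        }
        return Math.abs(current - lastWritten) / lastWritten > threshold;
    }

    /** Rates are ordered {msgRateIn, bytesRateIn, msgRateOut, bytesRateOut}. */
    boolean shouldWrite(double[] lastWritten, double[] current) {
        for (int i = 0; i < current.length; i++) {
            if (rateChanged(lastWritten[i], current[i])) {
                return true; // one materially changed rate is enough to rewrite the record
            }
        }
        return false;
    }
}
```

With the default threshold, a segment whose `msgRateIn` drifts from 12000 to 14000 (about 17%) produces no write, while a jump to 16000 (about 33%) does.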
+### Subscription types and what each load type drives
+
+Recall from PIP-468 that scalable-topic subscriptions are `STREAM`
(controller-managed, 1:1 segment↔consumer assignment; covers both
StreamConsumer and CheckpointConsumer) or `QUEUE` (controller-bypassing; every
consumer attaches to every segment and the broker round-robins).
+
+| Trigger | STREAM subscriptions | QUEUE subscriptions |
+|---------|----------------------|----------------------|
+| Consumer-count scale-up | **Yes** — more segments give more 1:1 parallelism | **No** — queue consumers share segments; more segments don't add parallelism for them |
+| Traffic (in/out, msg/bytes) | Yes | **Yes** — queue traffic still loads the segment's broker and counts toward the per-segment rate |
+
+So a topic with only QUEUE subscriptions never splits on consumer count, but
still splits when any segment's in/out rate crosses threshold.
+
+### The decision: `AutoScalePolicyEvaluator`
+
+A pure function with no I/O:
+
+```
+decide(layout, loadBySegment, streamConsumerCountBySub, policy, now)
+ → Split(segmentId) | Merge(segmentId1, segmentId2) | None
+```
+
+It runs in two passes — **split first (short cooldown), then merge (long
cooldown)** — and emits at most one action per invocation.
+
+#### Pass 1 — SPLIT (fast, lightly coalesced)
+
+Splits fire as soon as conditions are met, bounded by `maxSegments`, an
in-flight-operation guard, and a **short `splitCooldown` (default 1 min)**. The
cooldown is deliberately short: it exists only to coalesce a burst of
near-simultaneous triggers — e.g. a group of consumers connecting in rapid
succession should cause one split, not N — while still letting a genuinely
growing topic split again on the next minute.
+
+```
+if activeSegments >= maxSegments: skip split pass
+if now - lastSplitAtMs < splitCooldown: skip split pass
+
+(a) Consumer-driven:
+ required = max over STREAM subscriptions of consumerCount // per-subscription max
+ if required > activeSegments:
+ → Split(busiest active segment by msgRateIn); set lastSplitAtMs = now
+
+(b) Load-driven (if (a) didn't fire):
+ candidate segments = active segments where ANY of:
+ - msgRateIn > splitMsgRateInThreshold
+ - bytesRateIn > splitBytesRateInThreshold
+ - msgRateOut > splitMsgRateOutThreshold
+ - bytesRateOut> splitBytesRateOutThreshold
+ if candidates non-empty:
+ → Split(most-overloaded candidate); set lastSplitAtMs = now
+```
+
+The consumer-driven rule (a) is what the **event-driven path** evaluates the
moment a consumer registers, so a new consumer gets a segment within seconds
(subject to `splitCooldown`). The load-driven rule (b) runs on the periodic
tick. Because `msgRateIn` etc. are already 60-second rolling averages on the
broker, a value over threshold already represents *sustained* load — no extra
split window is needed to filter transient spikes.
+
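A minimal Java sketch of the split pass follows, assuming simplified inputs. `SplitPassSketch` is a hypothetical name and not the PIP's `AutoScalePolicyEvaluator`. Two details are assumptions: "most-overloaded" is taken to mean the highest rate-to-threshold ratio across the four metrics, and the caller records `lastSplitAtMs` when a split is returned.

```java
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical sketch of Pass 1; pure function, no I/O.
final class SplitPassSketch {
    private SplitPassSketch() {}

    /**
     * Rates and thresholds are ordered {msgRateIn, bytesRateIn, msgRateOut, bytesRateOut}.
     * Returns the segment to split, or empty if the pass is skipped or nothing qualifies.
     */
    static OptionalLong decideSplit(Map<Long, double[]> loadByActiveSegment,
                                    Map<String, Integer> streamConsumerCountBySub,
                                    double[] splitThresholds,
                                    int maxSegments,
                                    long lastSplitAtMs,
                                    long splitCooldownMs,
                                    long nowMs) {
        int activeSegments = loadByActiveSegment.size();
        if (activeSegments == 0 || activeSegments >= maxSegments) {
            return OptionalLong.empty();
        }
        if (nowMs - lastSplitAtMs < splitCooldownMs) {
            return OptionalLong.empty();
        }

        // (a) Consumer-driven: the largest STREAM subscription wants one segment per consumer.
        int required = 0;
        for (int count : streamConsumerCountBySub.values()) {
            required = Math.max(required, count);
        }
        if (required > activeSegments) {
            Long busiest = null;
            double highestMsgRateIn = Double.NEGATIVE_INFINITY;
            for (Map.Entry<Long, double[]> e : loadByActiveSegment.entrySet()) {
                if (e.getValue()[0] > highestMsgRateIn) {
                    highestMsgRateIn = e.getValue()[0];
                    busiest = e.getKey();
                }
            }
            return busiest == null ? OptionalLong.empty() : OptionalLong.of(busiest);
        }

        // (b) Load-driven: a ratio above 1.0 means at least one rate exceeds its threshold.
        Long hottest = null;
        double worstRatio = 1.0;
        for (Map.Entry<Long, double[]> e : loadByActiveSegment.entrySet()) {
            double[] rates = e.getValue();
            double ratio = 0.0;
            for (int i = 0; i < rates.length; i++) {
                ratio = Math.max(ratio, rates[i] / splitThresholds[i]);
            }
            if (ratio > worstRatio) {
                worstRatio = ratio;
                hottest = e.getKey();
            }
        }
        return hottest == null ? OptionalLong.empty() : OptionalLong.of(hottest);
    }
}
```

Because every input is passed in, including `nowMs`, the cooldown and cap guards can be exercised in unit tests without a clock or a metadata store.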
+#### Pass 2 — MERGE (lazy, rate-limited)
+
+Merges run only if no split fired this tick, the topic has more than
`minSegments` active segments, the topic is not within `mergeCooldown` of its
last merge, and the result respects `maxDagDepth`.
+
+```
+if a split fired this tick: skip merge pass
+if now - lastMergeAtMs < mergeCooldown: skip merge pass
+if activeSegments <= minSegments: skip merge pass
+
+candidate pairs = adjacent ACTIVE segment pairs where BOTH segments satisfy,
+ for at least mergeWindow (checked via the store Stat's modified time):
+ - msgRateIn < mergeMsgRateInThreshold
+ - bytesRateIn < mergeBytesRateInThreshold
+ - msgRateOut < mergeMsgRateOutThreshold
+ - bytesRateOut< mergeBytesRateOutThreshold
+ AND neither segment's lineage is already at maxDagDepth merges
+
+if candidate pairs non-empty:
+ → Merge(coldest pair by combined rate); set lastMergeAtMs = now
+```
+
+Adjacency is required because the existing `mergeSegments` API only merges
hash-range-adjacent active segments.
+
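The merge pass can be sketched the same way. `MergePassSketch` and its `Seg` type are hypothetical; the input list is assumed to be sorted by hash range so that neighbours in the list are hash-range-adjacent. "Coldest pair by combined rate" is interpreted here as the lowest sum of threshold-normalised rates, which is an assumption. The "no split fired this tick" guard is left to the caller.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical sketch of Pass 2; pure function, no I/O.
final class MergePassSketch {
    private MergePassSketch() {}

    /** Hypothetical view of one ACTIVE segment as seen by the controller. */
    static final class Seg {
        final long id;
        final double[] rates;    // {msgRateIn, bytesRateIn, msgRateOut, bytesRateOut}
        final long modifiedAtMs; // last-modified time of the load record (store Stat)
        final int mergeDepth;    // merges already in this segment's lineage

        Seg(long id, double[] rates, long modifiedAtMs, int mergeDepth) {
            this.id = id;
            this.rates = rates;
            this.modifiedAtMs = modifiedAtMs;
            this.mergeDepth = mergeDepth;
        }
    }

    /** Returns {leftId, rightId} of the pair to merge, or empty. */
    static Optional<long[]> decideMerge(List<Seg> activeByHashRange, double[] mergeThresholds,
                                        int minSegments, int maxDagDepth,
                                        long lastMergeAtMs, long mergeCooldownMs,
                                        long mergeWindowMs, long nowMs) {
        if (activeByHashRange.size() <= minSegments) {
            return Optional.empty();
        }
        if (nowMs - lastMergeAtMs < mergeCooldownMs) {
            return Optional.empty();
        }
        long[] coldestPair = null;
        double lowestScore = Double.POSITIVE_INFINITY;
        for (int i = 0; i + 1 < activeByHashRange.size(); i++) {
            Seg a = activeByHashRange.get(i);
            Seg b = activeByHashRange.get(i + 1);
            if (!eligible(a, mergeThresholds, maxDagDepth, mergeWindowMs, nowMs)
                    || !eligible(b, mergeThresholds, maxDagDepth, mergeWindowMs, nowMs)) {
                continue;
            }
            double score = normalisedLoad(a, mergeThresholds) + normalisedLoad(b, mergeThresholds);
            if (score < lowestScore) {
                lowestScore = score;
                coldestPair = new long[] {a.id, b.id};
            }
        }
        return Optional.ofNullable(coldestPair);
    }

    /** Cold on every metric, cold for the whole window, and lineage below the depth cap. */
    private static boolean eligible(Seg s, double[] thr, int maxDagDepth, long windowMs, long nowMs) {
        if (s.mergeDepth >= maxDagDepth || nowMs - s.modifiedAtMs < windowMs) {
            return false;
        }
        for (int i = 0; i < s.rates.length; i++) {
            if (s.rates[i] >= thr[i]) {
                return false;
            }
        }
        return true;
    }

    /** Sum of rate/threshold ratios, so message and byte rates are comparable. */
    private static double normalisedLoad(Seg s, double[] thr) {
        double sum = 0.0;
        for (int i = 0; i < s.rates.length; i++) {
            sum += s.rates[i] / thr[i];
        }
        return sum;
    }
}
```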
+### Anti-flip-flop: three independent guards
+
+1. **Threshold gap (hysteresis).** Split thresholds are well above merge
thresholds for every metric. The dead-band between them is what prevents a
just-merged segment from immediately re-qualifying for a split.
+2. **Asymmetric cooldown.** Splits: a short `splitCooldown` (default 1 min)
that only coalesces bursts. Merges: a longer `mergeCooldown` (default 5 min)
plus a `mergeWindow` (default 5 min) during which the segment must have stayed
cold. A pair must be *durably* cold to merge, but a segment can split again
within a minute of getting hot.
+3. **Max DAG depth on merges.** `maxDagDepth` (default 10) caps how many
merges a given lineage can accumulate. Once reached, that lineage stops being a
merge candidate — **but load-driven splits still fire.** This bounds the number
of split↔merge cycles a hash range can churn through while never blocking a
split that throughput requires.
+
+> **Design note — direction of the depth cap.** The cap restricts *merges*,
not splits. Splits are needed for correctness/performance and must always be
available; merges are the optional efficiency step and are the ones that,
combined with splits, could oscillate. `dagDepth` therefore counts **merges in
a segment's lineage**, derived from the existing `parentIds`/`childIds` DAG in
`ScalableTopicMetadata` — splits do not consume depth budget.
+
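One way the merge count for a lineage might be derived from the parent links is sketched below. `MergeDepth` is a hypothetical helper. It rests on two assumptions that the PIP does not spell out: a segment with two or more parents was produced by a merge, and a segment's depth is the largest depth among its parents plus one if it is itself a merge result.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: counts merges along a segment's ancestry in the DAG.
final class MergeDepth {
    private MergeDepth() {}

    /** parentIds maps a segment id to its parents; root segments may be absent. */
    static int mergeDepth(long segmentId, Map<Long, List<Long>> parentIds) {
        return mergeDepth(segmentId, parentIds, new HashMap<>());
    }

    private static int mergeDepth(long id, Map<Long, List<Long>> parentIds, Map<Long, Integer> memo) {
        Integer cached = memo.get(id);
        if (cached != null) {
            return cached;
        }
        List<Long> parents = parentIds.getOrDefault(id, List.of());
        int inherited = 0;
        for (long parent : parents) {
            inherited = Math.max(inherited, mergeDepth(parent, parentIds, memo));
        }
        // Split children have one parent and add nothing; merge results have two or more.
        int depth = inherited + (parents.size() >= 2 ? 1 : 0);
        memo.put(id, depth);
        return depth;
    }
}
```

For a lineage where segment 0 splits into 1 and 2, those merge into 3, 3 splits into 4 and 5, and those merge into 6, this yields depth 1 for segments 3, 4 and 5 and depth 2 for segment 6. The splits leave the count unchanged.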
+### Caps
+
+| Cap | Default | Effect |
+|-----|---------|--------|
+| `maxSegments` | 64 | Splits stop once `activeSegments >= maxSegments`. |
+| `minSegments` | 1 | Merges stop once `activeSegments <= minSegments`. |
+| `maxDagDepth` | 10 | Merges stop for a lineage at the cap; splits unaffected. |
+
+### Manual operations and cooldown
+
+- Manual `admin.scalableTopics().splitSegment(...)` **sets `lastSplitAtMs`**,
so a manual split also starts the short auto-split cooldown.
+- Manual `admin.scalableTopics().mergeSegments(...)` **sets `lastMergeAtMs`**,
so the operator's manual efficiency action also rate-limits the controller's
automatic merges.
+
+### Evaluation triggers
+
+The controller leader evaluates auto split/merge from two sources:
+
+- **Event-driven (within seconds)** — when a STREAM/CHECKPOINT consumer
registers with or unregisters from the controller, it immediately evaluates the
consumer-count split rule. No polling: consumer registration already flows
through the controller (PIP-468).
+- **Periodic tick** — a `scheduleAutoScaleTick` (separate from the GC tick
from PIP-468), default cadence `scalableTopicAutoScaleInterval = 60s`,
evaluates the traffic-driven split rules and the merge pass. Per tick it does
one metadata batch-read of the topic's `segments/*/load` records (or maintains
a watch cache), evaluates, and dispatches.
+
+Both sources call the same `AutoScalePolicyEvaluator`; the event-driven path
only needs the consumer-count rule, so it is cheap. Both are cancelled on
leadership loss / close.
+
+---
+
+## Public-Facing Changes
+
+### Configuration (`broker.conf`)
+
+Auto-scaling is **default-on cluster-wide**; these are the defaults applied to
every scalable topic that does not override them.
+
+| Property | Default | Description |
+|----------|---------|-------------|
+| `scalableTopicAutoScaleEnabled` | `true` | Master switch for auto split/merge. |
+| `scalableTopicAutoScaleInterval` | `60s` | Periodic (traffic) evaluation cadence. Consumer-count changes are handled event-driven, independent of this. |
+| `scalableTopicMaxSegments` | `64` | Hard ceiling on active segments. |
+| `scalableTopicMinSegments` | `1` | Hard floor on active segments. |
+| `scalableTopicMaxDagDepth` | `10` | Max merges in a lineage before merges are disabled for it. |
+| `scalableTopicSplitCooldown` | `1m` | Minimum time between automatic splits on a topic (coalesces bursts). |
+| `scalableTopicMergeCooldown` | `5m` | Minimum time between automatic merges on a topic. |
+| `scalableTopicMergeWindow` | `5m` | Duration a pair must stay cold before merging. |
+| `scalableTopicSplitMsgRateInThreshold` | `10000` | msg/s ingest split trigger. |
+| `scalableTopicSplitBytesRateInThreshold` | `50MB` | bytes/s ingest split trigger. |
+| `scalableTopicSplitMsgRateOutThreshold` | `50000` | msg/s dispatch split trigger. |
+| `scalableTopicSplitBytesRateOutThreshold` | `250MB` | bytes/s dispatch split trigger. |
+| `scalableTopicMergeMsgRateInThreshold` | `1000` | msg/s ingest merge trigger. |
+| `scalableTopicMergeBytesRateInThreshold` | `5MB` | bytes/s ingest merge trigger. |
+| `scalableTopicMergeMsgRateOutThreshold` | `5000` | msg/s dispatch merge trigger. |
+| `scalableTopicMergeBytesRateOutThreshold` | `25MB` | bytes/s dispatch merge trigger. |
+| `scalableTopicLoadReportInterval` | `10s` | Segment owner sampling interval. |
+| `scalableTopicLoadReportRateChangeThreshold` | `25%` | Minimum rate change since the last write that triggers a new `SegmentLoadStats` write. |
+
+### Policy resolution (namespace + topic overrides)
+
+Following the existing `autoTopicCreationOverride` pattern, an
`AutoScalePolicyOverride` can be set at two levels; resolution is
most-specific-wins, falling back to `broker.conf`:
+
+1. **Per-topic** — a new `autoScalePolicy` field on `ScalableTopicMetadata`,
set via `admin.scalableTopics().setAutoScalePolicy(topic, policy)` /
`getAutoScalePolicy(topic)`.
+2. **Per-namespace** — a new `scalableTopicAutoScalePolicy` field on
`Policies`, set via `admin.namespaces().setScalableTopicAutoScalePolicy(ns,
policy)`.
+
+`AutoScalePolicyOverride` carries the same knobs as the broker config (all
optional; unset fields fall through). Setting `enabled = false` opts a topic or
namespace out entirely.
+
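The most-specific-wins fallthrough could be implemented per field along the lines of the sketch below. `AutoScalePolicyResolver` and the two-field `PolicyOverride` class are hypothetical stand-ins for illustration. The real `AutoScalePolicyOverride` carries all the knobs listed in the configuration table, and its exact shape is not defined here.

```java
import java.util.function.Function;

// Hypothetical sketch of topic -> namespace -> broker.conf resolution.
final class AutoScalePolicyResolver {
    private AutoScalePolicyResolver() {}

    /** Hypothetical subset of an override; a null field means "unset, fall through". */
    static final class PolicyOverride {
        final Boolean enabled;
        final Integer maxSegments;

        PolicyOverride(Boolean enabled, Integer maxSegments) {
            this.enabled = enabled;
            this.maxSegments = maxSegments;
        }
    }

    /**
     * Resolves one knob. Either override may be null when none is configured
     * at that level; the broker.conf default is always the last resort.
     */
    static <T> T resolve(PolicyOverride topic, PolicyOverride namespace,
                         T brokerDefault, Function<PolicyOverride, T> field) {
        for (PolicyOverride level : new PolicyOverride[] {topic, namespace}) {
            if (level != null) {
                T value = field.apply(level);
                if (value != null) {
                    return value; // most specific level that sets the field wins
                }
            }
        }
        return brokerDefault;
    }
}
```

Resolving each field independently means a topic override that sets only `maxSegments` still inherits `enabled` from the namespace or from `broker.conf`.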
+### Admin Client API
+
+```java
+interface ScalableTopics {
+ // ... existing ...
+ void setAutoScalePolicy(String topic, AutoScalePolicyOverride policy) throws PulsarAdminException;
+ AutoScalePolicyOverride getAutoScalePolicy(String topic) throws PulsarAdminException;
+ void removeAutoScalePolicy(String topic) throws PulsarAdminException;
+}
+
+interface Namespaces {
+ // ... existing ...
+ void setScalableTopicAutoScalePolicy(String namespace, AutoScalePolicyOverride policy) ...;
+ AutoScalePolicyOverride getScalableTopicAutoScalePolicy(String namespace) ...;
+ void removeScalableTopicAutoScalePolicy(String namespace) ...;
+}
+```
+
+### Metadata Store Paths
+
+| Path | Content | Writer |
+|------|---------|--------|
+| `/topics/{tenant}/{ns}/{topic}/segments/{segmentId}/load` | `SegmentLoadStats` JSON | segment-owning broker |
+
+(`autoScalePolicy` rides inside the existing `ScalableTopicMetadata` blob; the
namespace override rides inside `Policies`. No other new paths.)
+
+### Observability
+
+- New per-topic metrics: `pulsar_scalable_topic_active_segments`,
`pulsar_scalable_topic_auto_splits_total`,
`pulsar_scalable_topic_auto_merges_total`,
`pulsar_scalable_topic_split_suppressed_max_segments_total`,
`pulsar_scalable_topic_merge_suppressed_max_depth_total`.
+- The existing `ScalableTopicStats` is extended with the most recent
`SegmentLoadStats` per segment and the resolved effective policy, so operators
can see *why* the controller did or did not act.
+
+---
+
+## Operational Safety
+
+The `maxSegments`, `maxDagDepth`, asymmetric cooldown, and threshold-gap
guards together bound both the rate and the total amount of structural change a
topic can undergo, so enabling auto split/merge cannot cause unbounded segment
growth or split/merge storms.
+
+Operators who want manual-only control set
`scalableTopicAutoScaleEnabled=false` (cluster) or an `enabled=false` override
(namespace/topic).
+
+> **Compatibility:** scalable topics are a new, as-yet-unreleased feature
([PIP-460](pip-460.md)), so there is no backward/forward compatibility to
consider — `SegmentLoadStats`, the policy fields, and the config knobs all ship
together with the rest of the scalable-topic feature.
+
+---
+
+## Security Considerations
+
+`setAutoScalePolicy` / `getAutoScalePolicy` / `removeAutoScalePolicy` (topic and namespace variants)
require the same admin permissions as the corresponding existing scalable-topic
and namespace policy operations. `SegmentLoadStats` is written by brokers via
their authenticated internal identity and is not client-writable.
+
+---
+
+## Links
+
+- Parent PIP: [PIP-468: Scalable Topic Controller](pip-468.md)
+- Grand-parent PIP: [PIP-460: Scalable Topics](pip-460.md)
+- V5 Client API: [PIP-466: New Java Client API (V5)](pip-466.md)
+- Mailing List discussion thread: TBD
+- Mailing List voting thread: TBD