+1, really nice migration path from v4 non-partitioned and partitioned topics to v5 scalable topics!
-Lari On 2026/05/08 15:50:19 Matteo Merli wrote: > https://github.com/apache/pulsar/pull/25721 > > ------ > > > # PIP-475: Regular-to-Scalable Topic Migration > > *Sub-PIP of [PIP-460: Scalable Topics](pip-460.md)* > > ## Motivation > > [PIP-460](pip-460.md) introduces scalable topics (`topic://...`) as a > new topic type that supports range splitting and merging without > breaking key ordering. For this to be adoptable in real deployments, > users with existing partitioned and non-partitioned topics need a > migration path that: > > 1. **Doesn't require recreating their topics from scratch.** Existing > topics may hold months of retained data and have many active > subscriptions. Re-create + re-publish is not a viable upgrade story. > 2. **Lets clients adopt the V5 SDK before any topic is migrated.** > Operationally, applications need to be upgraded one at a time over > weeks, while the topics they read and write keep working as-is. The V5 > SDK has to interoperate with the *old* topic types until the migration > moment. > 3. **Keeps the migration moment small and surgical.** Once all clients > are on the V5 SDK, an admin command flips a topic from regular to > scalable in a single atomic step, without copying data or moving > cursors. > 4. **Cannot be reversed.** Once a topic is scalable, regressing to a > regular topic is unsafe (the new layout can have already split, > leaving data in segments that don't map back to a fixed partition > count). The metadata transition has to be one-way. > > PIP-460 lists "Tooling for migrating existing partitioned topics to > scalable topics" in its postponed section. This PIP closes that gap. > > This PIP also clarifies the V5 SDK's behavior when given a topic name > that may or may not be scalable, and tightens the broker so that a v4 > client cannot accidentally write to (or auto-create) a regular topic > that has already been migrated. 
> > The longer-term direction for Pulsar is for scalable topics to **fully > replace** partitioned and non-partitioned topics: the existing topic > types stay supported for backward compatibility, but new development > on the topic surface targets scalable topics, and migration tooling > like this PIP is what lets existing deployments make that transition > incrementally instead of all at once. > > --- > > ## Background Knowledge > > ### Topic domains in Pulsar today > > A Pulsar topic name encodes its domain in a URI scheme: > > - `persistent://t/n/x` — durable topic backed by a managed ledger. > - `non-persistent://t/n/x` — in-memory topic, no durability. > - `topic://t/n/x` — scalable topic introduced by PIP-460. Backed by a > DAG of segments; each segment is itself a `segment://...` topic with > its own managed ledger. > > A short name like `my-topic` is normalized to > `persistent://public/default/my-topic` by the v4 client. The V5 SDK > keeps the same normalization, then opens a scalable-topic lookup > session for the resolved name (see [V5 SDK resolution > rule](#v5-sdk-resolution-rule) below). > > ### Partitioned vs. non-partitioned regular topics > > Two distinct shapes: > > - **Non-partitioned**: a single `persistent://t/n/x` with one managed ledger. > - **Partitioned**: `persistent://t/n/x` is a logical name; the actual > data lives in `persistent://t/n/x-partition-0` … > `persistent://t/n/x-partition-(N-1)`, each with its own managed > ledger. The v4 client's partitioned producer routes each message to a > partition by `signSafeMod(murmurHash3_32(key), N)`. > > ### V5 segment routing > > A scalable topic's hash space is `[0x0000, 0xFFFF]`. Each active > segment owns a contiguous sub-range. The V5 producer routes a message > with key `K` to the segment whose range contains `murmurHash3_32(K) & > 0xFFFF`. Segments split into two halves; halves can later be merged > back. The routing function is range-based, not modulo-based. 
> > This difference matters for partitioned-topic migration: the migrated > layout's initial segments line up 1:1 with the partitions, but their > hash-range routing wouldn't agree with v4's mod-N routing for all > keys. The fix is described in [Routing across > migration](#routing-across-migration) below. > > --- > > ## Goals > > ### In Scope > > - A V5 SDK that operates against existing regular topics (partitioned > or non-partitioned) without requiring source changes from the > application. > - An admin command — `admin.scalableTopics().migrateToScalable(topic)` > — that converts an existing regular topic into a scalable topic in a > single atomic step, with no data copy and no cursor migration. > - A V5 SDK resolution rule that picks between the scalable code path > and a v4-compatible fallback based on the input topic name and > broker-side state, with a strict "once scalable, always scalable" > guarantee. > - A broker-side guard that prevents new v4 clients from accidentally > writing to (or auto-creating) a regular topic whose name has already > been claimed by a scalable topic. > - Preservation of per-key ordering across the migration moment, via > the same drain-before-assign protocol the controller uses for segment > splits: old partitions become sealed parent segments and the > subscription controller drains them before activating consumers on the > new children. > - Rejection of `non-persistent://` topics in V5 builders. > Non-persistent topics are out of scope for V5 entirely. > > ### Out of Scope > > - **Cross-cluster (geo-replication) migration coordination.** A topic > migrated in one cluster while still being replicated to another is a > separate problem, deferred until geo-replication on scalable topics > lands ([PIP-460](pip-460.md) sub-PIP). > - **Reverse migration (scalable → regular).** Not supported; the > metadata transition is one-way. > - **Migration triggered by traffic or load thresholds.** Migration is > operator-initiated only. 
> - **Per-message format conversion.** Messages stay byte-identical on > disk before and after migration; only the topic-name surface above the > managed ledger changes. > > --- > > ## High-Level Design > > ### V5 SDK resolution rule > > When a V5 application calls `client.newProducer(...).topic(input)` (or > any consumer builder), the SDK opens a **scalable-topic lookup > session** for the topic — the same long-lived push-based session that > PIP-468 defines for `topic://...` discovery. The lookup session is the > single source of truth for what the topic looks like, and how it > routes; there is no separate "probe" call and no client-side TTL > cache. > > The broker responds to the lookup session based on the input form and > the topic's current state: > > | Input form | Lookup response | > |---|---| > | `topic://t/n/x` | Real DAG layout (or `NotFound` if the scalable > topic doesn't exist — same as today). | > | `persistent://t/n/x` | If scalable metadata exists for the > equivalent `topic://t/n/x`: real DAG layout, with the topic identity > promoted to `topic://...` for all subsequent operations. Otherwise: a > **synthetic layout** that models the regular topic's partitions as > **special segments** (see [Special segments](#special-segments) > below). | > | `my-topic` (or any short / unscoped form) | Normalize to > `persistent://public/default/my-topic` then apply the rule above. | > | `non-persistent://...` | Reject at `create()` / `subscribe()` with > `UnsupportedOperationException`. V5 does not support non-persistent > topics. | > > Because the lookup session is push-based, the broker can **update the > layout in place** when the topic is migrated. The V5 SDK already > handles layout changes — splits and merges go through the same > machinery — so a migration is observed as one more layout-change > event: synthetic layout → real DAG. 
The application sees its > `Producer<T>` / `Consumer<T>` keep working through the transition; the > SDK's existing reconnection logic handles the underlying connection > swap internally. > > This gives the "once scalable, always scalable" guarantee (Goal 4) for > free: once the lookup session has reported a real DAG, future updates > can only refine the DAG via splits / merges; there is no "downgrade to > synthetic" path the broker exposes. > > ### Special segments > > A special segment is the lookup-session encoding of "this slice of the > keyspace is not yet a real `segment://...` topic — it's the existing > `persistent://t/n/x[-partition-K]` topic instead." It carries the same > fields as a regular segment plus a marker that points at the > underlying `persistent://...` name. > > The V5 SDK's per-segment producer and consumer infrastructure already > attaches to a topic name; the only difference for a special segment is > that the name uses the `persistent://` domain instead of `segment://`. > No separate code path, no separate wrapper class hierarchy. > > For routing, the layout carries the routing function as data: > > - **Synthetic layout for an N-partitioned regular topic**: N special > segments, one per partition; routing is > `signSafeMod(murmurHash3_32(key), N)` — exactly v4's > partitioned-producer routing. Producers route the same way the v4 SDK > would, ensuring per-key ordering during the migration window. > - **Synthetic layout for a non-partitioned regular topic**: 1 special > segment covering the full hash range; routing is trivial (no key > matters). > - **Real DAG (post-migration or natively scalable)**: range-based > routing — the standard scalable-topic semantics. Producers route by > hash range; per-key ordering across the migration boundary is > preserved by the controller's drain-before-assign protocol (see > [Routing across migration](#routing-across-migration) below). 
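A minimal sketch of what "routing as data" could look like in a layout entry; the type and field names below are illustrative, not the actual PIP-468 schema:

```java
// Illustrative layout shape: a special segment is just a segment whose
// URI is in the persistent:// domain, plus the routing mode the layout
// carries as data. Names here are hypothetical, not PIP-468's schema.
public class LayoutSketch {
    public enum Routing { MOD_N, RANGE_BASED }

    public static final class SegmentEntry {
        public final String uri;      // segment://... or, if special, persistent://...
        public final boolean special; // true while the slice is still a regular topic
        public SegmentEntry(String uri, boolean special) {
            this.uri = uri;
            this.special = special;
        }
    }

    public static final class Layout {
        public final Routing routing;        // carried as data, per the text above
        public final SegmentEntry[] segments;
        public Layout(Routing routing, SegmentEntry[] segments) {
            this.routing = routing;
            this.segments = segments;
        }
    }

    // Synthetic layout for a regular topic: N special segments pointing
    // at the existing partition topics (or one segment for the topic
    // itself when it is non-partitioned, modeled here as n == 0).
    public static Layout syntheticLayout(String persistentBase, int n) {
        SegmentEntry[] entries = new SegmentEntry[Math.max(n, 1)];
        if (n == 0) {
            entries[0] = new SegmentEntry(persistentBase, true);
        } else {
            for (int i = 0; i < n; i++) {
                entries[i] = new SegmentEntry(persistentBase + "-partition-" + i, true);
            }
        }
        return new Layout(Routing.MOD_N, entries);
    }
}
```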
> > ### Migration protocol (operator's view) > > ``` > Pre-migration (steady state): > • Topic exists as persistent://t/n/x (with N partitions, possibly N=0) > • Some clients are still on the v4 SDK; others have been upgraded to V5. > • V5 clients see a SYNTHETIC layout from the lookup session: N special > segments (or 1 for non-partitioned) pointing at the existing > persistent://...-partition-K topics, with mod-N routing. > > Step 1 — Operator upgrades all clients to the V5 SDK. > Step 2 enforces this: the migration command inspects the topic's active > producer/consumer connections and fails with HTTP 409 if any v4 > connections remain. Old code keeps working unchanged before migration > because the synthetic layout exposes the existing persistent topics > through the V5 surface; routing is mod-N so per-key destinations are > identical to v4 partitioned-topic routing. > > Step 2 — Operator runs: > pulsar-admin scalable-topics migrate-to-scalable persistent://t/n/x > > The broker: > 2a. Validates that no v4 producer/consumer connections are attached to > the topic. If any are, fails with HTTP 409 and the connection count > in the error message. (Also fails 409 if scalable metadata already > exists, and 404 if the topic doesn't exist.) > 2b. Builds ScalableTopicMetadata with: > • N sealed parent segments (or 1 for non-partitioned), each > wrapping the existing managed ledger for > persistent://t/n/x-partition-K (or persistent://t/n/x for > non-partitioned). The managed ledgers are unchanged; no data copy. > • N active child segments (or 1) with equal-width contiguous hash > ranges covering [0x0000, 0xFFFF], using standard range-based > routing. Each child has all N parents as predecessors in the DAG. > New writes route to children; subscriptions drain the parents > before consuming from children — the same drain-before-assign > protocol the controller already uses for segment splits / merges. > 2c. 
Atomically writes the metadata, taking the topic from > "regular" to "scalable" in one CAS on the metadata store. > 2d. Pushes the new layout to every connected lookup session. > > Step 3 — Connected V5 clients receive the layout-update push on their > lookup session. Synthetic layout → real DAG. The SDK's existing > layout-change handling (used for split / merge) drives any internal > reconnection. Application-visible behaviour: nothing changes. > ``` > > ### Routing across migration > > The synthetic layout (pre-migration) routes mod-N so that V5 producers > write to the same partitions v4 producers would. The post-migration > real DAG routes by hash range — the standard scalable-topic semantics. > The two routings are not equivalent: a given key's destination segment > can change at the migration moment. > > Per-key ordering is preserved by the existing scalable-topic > drain-before-assign protocol, the same one the subscription controller > already uses for splits and merges: > > - Each old partition becomes a sealed parent segment in the new DAG. > - N new active child segments are created alongside, with equal-width > contiguous hash ranges; every child has all N parents as predecessors. > - The subscription controller fully drains the parents before > assigning their children to a consumer. By the time a consumer first > sees a message from a child, every message previously published to > those parents (including any with the same key) has already been > delivered. > > Producers route to children using standard range-based routing > immediately after the migration commit; their writes land in the new > segments while the parents are draining behind them. No special > routing flag is needed: the migration reuses the same machinery that > handles splits, with the parent fan-in (each child has N parents > instead of 1) being the only structural difference. 
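The equal-width contiguous child ranges used in the migrated layout can be sketched in a few lines (illustrative code, not the broker's implementation; `childRanges` is a hypothetical helper):

```java
// Sketch: compute the N equal-width child ranges for a migration.
// Children cover [0x0000, 0xFFFF] contiguously with no gaps or
// overlaps; in the DAG, every child additionally lists all N sealed
// parents (the old partitions) as predecessors.
public class MigrationDagSketch {
    public static int[][] childRanges(int n) {
        int[][] ranges = new int[n][2];
        long space = 0x10000L; // 65536 points in the hash space
        for (int i = 0; i < n; i++) {
            ranges[i][0] = (int) (space * i / n);
            ranges[i][1] = (int) (space * (i + 1) / n - 1);
        }
        return ranges;
    }
}
```

Because the division floors, the ranges stay contiguous even when N does not divide 65536 evenly.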
> > For non-partitioned topics (N=1) the same protocol applies trivially: > one sealed parent and one active child covering the full hash range. > > --- > > ## Detailed Design > > ### 1. V5 SDK changes > > The SDK reuses its existing scalable code path — the lookup session, > per-segment producer / consumer infrastructure, layout-change handler > — for *every* topic, including regular ones. The only new bits are: > how the lookup session is opened from `persistent://` / short-form > input, how the SDK interprets a "special segment" entry in the layout, > and how it applies the routing function carried by the layout. > > #### 1.1 Lookup session opens for any input form > > `PulsarClientV5` opens a lookup session for the user's topic > regardless of domain. The existing scalable-topic lookup-session > machinery is extended so the broker accepts `persistent://...` and > short-form names in addition to `topic://...`. The session response > carries: > > - The promoted topic identity (always `topic://t/n/x` after normalization). > - The current layout — either a real DAG or a synthetic layout (see > [Special segments](#special-segments) above). > - The routing function (`mod-N` for the synthetic layout, > `range-based` for the real DAG). > > `non-persistent://...` inputs are rejected at the V5 builder before > the lookup is opened, with `UnsupportedOperationException`. > > #### 1.2 Special-segment handling in the per-segment infrastructure > > A regular `Segment` carries a `segment://...` URI. A special segment > carries a `persistent://...` URI instead, plus a flag indicating it's > special. The SDK's per-segment producer (`PerSegmentProducer`) and > per-segment consumer (`PerSegmentConsumer`) attach to whatever URI the > segment carries — the v4 producer / consumer beneath them already > accepts `persistent://` and `segment://` alike. No separate adapter > classes are introduced. 
> > V5-specific features that don't apply on a regular topic show up as > ordinary "this layout doesn't support that" errors at the API surface: > > - `CheckpointConsumer` requires a sealed-segment history; on a > synthetic layout there is none, so subscribe fails with a clear error > pointing at the migration command. > - `splitSegment` / `mergeSegments` admin operations on a synthetic > layout fail at the broker with the same error class — the operations > require real `ScalableTopicMetadata`. > - `topic://...`-domain DLQ targets work transparently for both layouts > because the DLQ is its own scalable topic; nothing about it depends on > the source topic's layout shape. > > #### 1.3 Layout-change handling drives the migration transition > > Layout updates pushed by the lookup session already trigger the SDK's > per-segment reconcile: segments that disappeared get their per-segment > producers / consumers torn down; new segments get fresh ones; segments > whose URI changed get rebuilt on the new URI. > > A migration is exactly this: the special segment with URI > `persistent://t/n/x-partition-K` is replaced in the new layout with a > real segment whose URI is `segment://t/n/x/<descriptor>` — and that > `segment://...` resolves to the same managed ledger. The reconciler > tears down the per-segment v4 producer/consumer attached to the > `persistent://` URI and reattaches one to the `segment://` URI; from > the application's perspective the SDK's internal reconnect happens (as > it does for any layout change) but the `Producer<T>` / `Consumer<T>` > reference and the publish/receive flow are unaffected. > > There is no `TopicMigratedException` exposed to the application by > default. Applications that *want* to observe migrations can subscribe > to the lookup session's layout-change events directly via a future > hook — out of scope for this PIP. > > ### 2. 
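The reconcile step in 1.3 reduces to a diff over segment-id → URI maps; a rough sketch (hypothetical names, the real machinery is the SDK's existing layout-change handler):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Sketch of the per-segment reconcile on a layout-update push: diff the
// old and new layouts by segment id, detach per-segment producers /
// consumers whose URI changed or disappeared, attach new or rebuilt
// ones. String "actions" keep the sketch self-contained and inspectable.
public class ReconcileSketch {
    public static List<String> reconcile(Map<Integer, String> oldUris,
                                         Map<Integer, String> newUris) {
        List<String> actions = new ArrayList<>();
        for (Map.Entry<Integer, String> e : oldUris.entrySet()) {
            if (!e.getValue().equals(newUris.get(e.getKey()))) {
                actions.add("detach:" + e.getValue());
            }
        }
        for (Map.Entry<Integer, String> e : newUris.entrySet()) {
            if (!e.getValue().equals(oldUris.get(e.getKey()))) {
                actions.add("attach:" + e.getValue());
            }
        }
        return actions;
    }
}
```

Under this shape, a migration is one reconcile where each segment id's URI moves from the `persistent://` domain to the `segment://` domain.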
Migration command > > #### 2.1 Admin REST endpoint > > `POST > /admin/v2/scalable-topics/{tenant}/{namespace}/{topic}/migrate-to-scalable` > — requires `produce` permission on the topic (the same permission > needed to write to it in the first place). Migration is irreversible > but does not destroy data, so the blast radius is bounded by what a > write-permissioned user can already do; super-user is not required. > > No request body. > > The broker counts v4 producer / consumer connections via the existing > topic-stats path and rejects the migration with HTTP 409 if any are > present, with the count in the error message. > > Response: `204 No Content` on success. `409 Conflict` if scalable > metadata already exists or v4 connections are still attached. `404` if > the topic doesn't exist. > > #### 2.2 Admin client > > ```java > package org.apache.pulsar.client.admin; > > public interface ScalableTopics { > /** Migrate an existing partitioned or non-partitioned topic to a > scalable topic. */ > void migrateToScalable(String topic) throws PulsarAdminException; > CompletableFuture<Void> migrateToScalableAsync(String topic); > } > ``` > > #### 2.3 CLI > > ``` > pulsar-admin scalable-topics migrate-to-scalable persistent://t/n/x > ``` > > #### 2.4 Broker-side migration steps > > Executed by the topic's owning broker, atomically as far as possible: > > 1. **Lock**: acquire a metadata-store lock on `/topics/t/n/x` to > prevent concurrent migrations or competing admin operations. > 2. **Precheck**: > - Topic exists (as either partitioned or non-partitioned). > - No `ScalableTopicMetadata` already exists at the same path. If it > does, fail 409. > - No v4 producer / consumer connections are attached. If any are, > fail 409 with the count in the error message. > 3. **Build initial layout**: > - N sealed parent segments (or 1 for non-partitioned), each > wrapping the existing managed ledger for > `persistent://t/n/x-partition-K` (or `persistent://t/n/x`). 
> - N active child segments (or 1 for non-partitioned) with > equal-width contiguous hash ranges covering `[0x0000, 0xFFFF]`. > Routing is range-based. > - DAG edges: every child has all N parents as predecessors. The > subscription controller's drain-before-assign protocol (already used > for splits / merges) preserves per-key ordering. > - Set `epoch = 0`, `nextSegmentId = 2N`. > 4. **Atomic flip**: write `ScalableTopicMetadata` to `/topics/t/n/x` > via metadata-store CAS. This is the commit point — once it succeeds, > the topic is scalable. > 5. **Push the new layout to every connected lookup session.** V5 > clients that were seeing the synthetic layout for this topic > transition to the real DAG via the same machinery used for split / > merge layout updates. No `TerminateTopic` is needed — the underlying > managed ledgers don't change identity, only the layout-level segment > names that wrap them do. > 6. **Release the lock**. > > Failures before step 4 leave the topic untouched; failures after step > 4 leave the topic scalable. Step 5 is best-effort per session — > late-joining lookups always read the freshest metadata directly. > > ### 3. Lookup-session guard for v4 clients > > The lookup session described above is V5-only. v4 clients use the > older `CommandLookupTopic`. The broker must, for v4 lookups of > `persistent://t/n/x` where `ScalableTopicMetadata` exists for > `topic://t/n/x`, return a `TopicMigrated` redirect (binary protocol) / > HTTP 410 Gone (REST). The error carries the new `topic://...` name. v4 > clients translate this into a hard failure (they can't speak the > scalable protocol); operators see a clear signal that some clients > haven't upgraded yet. > > This guard is what makes the "once scalable, always scalable" > guarantee robust against stale v4 clients or v4-only tooling. 
V5 > clients are guarded by the lookup session itself — once it has > reported a real DAG, the broker only ever pushes refinements (split / > merge), never a downgrade. > > ### 4. `migratedFrom` on `ScalableTopicMetadata` > > An optional informational field `migratedFrom: TopicMigrationOrigin?` > records pre-migration metadata (partition count, original > `persistent://` name) for debugging, metrics labelling, and future > tooling. Not consulted on hot paths. > > No `legacyModNRouting` flag or other migration-specific routing knob > is needed: the post-migration real DAG uses standard range-based > routing and per-key ordering is preserved by the controller's existing > drain-before-assign protocol (see [Routing across > migration](#routing-across-migration)). > > --- > > ## Public-facing changes > > ### REST API > > New endpoint: > - `POST > /admin/v2/scalable-topics/{tenant}/{namespace}/{topic}/migrate-to-scalable` > — requires `produce` permission on the topic; no request body; 204 on > success, 409 if already scalable or v4 connections are still attached, > 404 if topic missing. > > Modified endpoints: > - All v4 lookup endpoints: when scalable metadata exists for the > equivalent `topic://...`, return a `TopicMigrated` redirect with the > new name. > > ### Binary protocol > > - The existing scalable-topic lookup-session command (PIP-468) is > extended to accept `persistent://...` and short-form names in addition > to `topic://...`. For non-scalable topics it returns a synthetic > layout with special-segment entries. > - New error code `TopicMigrated` — sent by the broker on the v4 lookup > command (`CommandLookupTopic`) when the requested `persistent://...` > name is shadowed by an existing scalable topic. Carries the new > `topic://...` name in the error payload. V5 clients never see this > error because they always use the scalable lookup session. 
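The v4-lookup guard reduces to a single check in front of the normal lookup path. A sketch under stated assumptions (the string return values and the `scalableTopics` set stand in for the real protocol responses and metadata-store read):

```java
// Sketch of the broker-side guard on the v4 CommandLookupTopic path:
// if scalable metadata exists for the equivalent topic:// name, answer
// with TopicMigrated (carrying the new name) instead of a broker
// address. Names and return encoding here are illustrative only.
public class V4LookupGuardSketch {
    public static String handleV4Lookup(String persistentName,
                                        java.util.Set<String> scalableTopics) {
        String scalableName = persistentName.replaceFirst("^persistent://", "topic://");
        if (scalableTopics.contains(scalableName)) {
            // v4 clients cannot speak the scalable protocol: hard failure
            // that surfaces "this client has not been upgraded yet".
            return "TopicMigrated:" + scalableName;
        }
        return "LookupOk:" + persistentName; // normal v4 lookup path
    }
}
```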
> > ### Configuration > > New broker config: > - `enableScalableTopicMigration` (default `true`) — kill switch for > the migration command. Operators on regulated infra may want to > disable. > > ### CLI > > - `pulsar-admin scalable-topics migrate-to-scalable <topic>` > > ### V5 SDK behavior > > - V5 builders accept `persistent://...` and short-form names; the SDK > opens a scalable-topic lookup session that the broker resolves to > either a real DAG or a synthetic layout. > - V5 builders reject `non-persistent://...` with > `UnsupportedOperationException`. > - Migration is observed as a layout-change push on the lookup session; > no new SDK exception is exposed to the application. > > --- > > ## Backward & forward compatibility > > ### Upgrade > > The upgrade story this PIP supports is: > > 1. Upgrade brokers to the version containing this PIP. Brokers handle > both old `persistent://` clients and new `topic://` clients side by > side; no behavior change for existing topics. > 2. Upgrade applications to the V5 SDK at the operator's pace. No topic > changes required; V5 SDK uses the wrapper path. > 3. Once all applications on a given topic are V5, run > `migrate-to-scalable`. The migration is atomic and one-way. > > Old client versions continue to work with the cluster on un-migrated topics. > > ### Downgrade / Rollback > > - A broker downgrade *before* any topic has been migrated is fully supported. > - A broker downgrade *after* a topic has been migrated is **not > supported**: older brokers don't understand the scalable-metadata > layout. Recovery would require restoring the metadata store from > backup. The migration command should be treated as a one-way commit. > - A V5 SDK client can be downgraded back to v4 only if the topic was > never migrated. Once migrated, the topic only speaks the scalable > protocol. > > ### Geo-replication > > Out of scope; see [Goals: Out of Scope](#out-of-scope). 
> > --- > > ## Alternatives > > ### Alt 1: Probe-and-wrapper in the SDK (rejected) > > An earlier draft of this PIP had the V5 SDK probe via > `admin.scalableTopics().getMetadataAsync(topic)` at every builder > call, cache the verdict with a TTL, and use a separate v4-wrapper code > path for regular topics. Migration would be observed by connected > clients as `TopicTerminatedException` from the v4 wrapper, which the > SDK would catch and use as the cue to re-probe and rebuild on the > scalable path. > > Rejected in favour of the lookup-session approach because: > - Two SDK code paths (scalable + v4 wrapper) would have to be > maintained and tested in parallel. > - The lookup session is push-based; the probe approach forces a TTL > trade-off (long TTL = slow to notice migration; short TTL = constant > load). > - Migration was visible to applications as a `TopicMigratedException` > on receive futures, however brief; the lookup-session approach makes > it strictly an internal SDK reconnect, with no application-visible > signal at all. > - The hash-routing equivalence problem ([Routing across > migration](#routing-across-migration)) had to be papered over with a > documented constraint; with the lookup session carrying the routing > function as data, the synthetic layout's mod-N routing and the real > DAG's range-based routing coexist naturally, with per-key ordering > preserved by the controller's drain-before-assign protocol. > > ### Alt 2: Migration via per-message data copy > > The migration command would create a new scalable topic alongside the > regular one and stream all retained data through. Producers and > consumers would cut over after the copy completes. > > Rejected because: > - The metadata-flip approach in this PIP achieves the same result with > no data movement. > - A retained topic with months of data could take hours to copy, > during which producers and consumers would have no clear cut-over > moment. 
> > --- > > ## General Notes > > - `migratedFrom` is an optional informational field on > `ScalableTopicMetadata` (partition count, original `persistent://` > name) for debugging and metrics labelling; not consulted on hot paths. > > --- > > ## Links > > - [PIP-460: Scalable Topics](pip-460.md) — parent PIP. > - [PIP-468: Scalable Topic Controller](pip-468.md) — sibling sub-PIP, > defines the metadata-store schema this PIP extends. > > > -- > Matteo Merli > <[email protected]> >
