https://github.com/apache/pulsar/pull/25721

------


# PIP-475: Regular-to-Scalable Topic Migration

*Sub-PIP of [PIP-460: Scalable Topics](pip-460.md)*

## Motivation

[PIP-460](pip-460.md) introduces scalable topics (`topic://...`) as a
new topic type that supports range splitting and merging without
breaking key ordering. For this to be adoptable in real deployments,
users with existing partitioned and non-partitioned topics need a
migration path that:

1. **Doesn't require recreating their topics from scratch.** Existing
topics may hold months of retained data and have many active
subscriptions. Re-create + re-publish is not a viable upgrade story.
2. **Lets clients adopt the V5 SDK before any topic is migrated.**
Operationally, applications need to be upgraded one at a time over
weeks, while the topics they read and write keep working as-is. The V5
SDK has to interoperate with the *old* topic types until the migration
moment.
3. **Keeps the migration moment small and surgical.** Once all clients
are on the V5 SDK, an admin command flips a topic from regular to
scalable in a single atomic step, without copying data or moving
cursors.
4. **Cannot be reversed.** Once a topic is scalable, regressing to a
regular topic is unsafe (the new layout may already have split,
leaving data in segments that don't map back to a fixed partition
count). The metadata transition has to be one-way.

PIP-460 lists "Tooling for migrating existing partitioned topics to
scalable topics" in its postponed section. This PIP closes that gap.

This PIP also clarifies the V5 SDK's behavior when given a topic name
that may or may not be scalable, and tightens the broker so that a v4
client cannot accidentally write to (or auto-create) a regular topic
that has already been migrated.

The longer-term direction for Pulsar is for scalable topics to **fully
replace** partitioned and non-partitioned topics: the existing topic
types stay supported for backward compatibility, but new development
on the topic surface targets scalable topics, and migration tooling
like this PIP is what lets existing deployments make that transition
incrementally instead of all at once.

---

## Background Knowledge

### Topic domains in Pulsar today

A Pulsar topic name encodes its domain in a URI scheme:

- `persistent://t/n/x` — durable topic backed by a managed ledger.
- `non-persistent://t/n/x` — in-memory topic, no durability.
- `topic://t/n/x` — scalable topic introduced by PIP-460. Backed by a
DAG of segments; each segment is itself a `segment://...` topic with
its own managed ledger.

A short name like `my-topic` is normalized to
`persistent://public/default/my-topic` by the v4 client. The V5 SDK
keeps the same normalization, then opens a scalable-topic lookup
session for the resolved name (see [V5 SDK resolution
rule](#v5-sdk-resolution-rule) below).

### Partitioned vs. non-partitioned regular topics

Two distinct shapes:

- **Non-partitioned**: a single `persistent://t/n/x` with one managed ledger.
- **Partitioned**: `persistent://t/n/x` is a logical name; the actual
data lives in `persistent://t/n/x-partition-0` …
`persistent://t/n/x-partition-(N-1)`, each with its own managed
ledger. The v4 client's partitioned producer routes each message to a
partition by `signSafeMod(murmurHash3_32(key), N)`.
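
For a concrete picture, here is a minimal sketch of that routing step, with the Murmur3 hash of the key assumed to have been computed already (the hash helper itself is not shown):

```java
/**
 * Sketch of the v4 partitioned-producer routing described above.
 * keyHash is murmurHash3_32(key); numPartitions is the topic's partition count N.
 */
static int v4Partition(int keyHash, int numPartitions) {
    int mod = keyHash % numPartitions;
    return mod < 0 ? mod + numPartitions : mod; // signSafeMod: non-negative even for negative hashes
}
```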

### V5 segment routing

A scalable topic's hash space is `[0x0000, 0xFFFF]`. Each active
segment owns a contiguous sub-range. The V5 producer routes a message
with key `K` to the segment whose range contains `murmurHash3_32(K) &
0xFFFF`. Segments split into two halves; halves can later be merged
back. The routing function is range-based, not modulo-based.
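
A minimal sketch of the range lookup, assuming the active segments are kept sorted by range start; the `SegmentRange` type is illustrative, not the SDK's actual class:

```java
import java.util.List;

/** Illustrative range entry; not the SDK's actual type. */
record SegmentRange(int start, int end, String segmentUri) {}

/** Route a key hash to the active segment whose range contains (keyHash & 0xFFFF).
 *  Assumes the ranges are contiguous and cover [0x0000, 0xFFFF]. */
static SegmentRange route(int keyHash, List<SegmentRange> activeSegments) {
    int h = keyHash & 0xFFFF;
    for (SegmentRange s : activeSegments) {
        if (h >= s.start() && h <= s.end()) {
            return s;
        }
    }
    throw new IllegalStateException("active segments must cover the full hash space");
}
```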

This difference matters for partitioned-topic migration: the migrated
layout's initial segments line up 1:1 with the partitions, but their
hash-range routing wouldn't agree with v4's mod-N routing for all
keys. The fix is described in [Routing across
migration](#routing-across-migration) below.

---

## Goals

### In Scope

- A V5 SDK that operates against existing regular topics (partitioned
or non-partitioned) without requiring application source changes.
- An admin command — `admin.scalableTopics().migrateToScalable(topic)`
— that converts an existing regular topic into a scalable topic in a
single atomic step, with no data copy and no cursor migration.
- A V5 SDK resolution rule that picks between the scalable code path
and a v4-compatible fallback based on the input topic name and
broker-side state, with a strict "once scalable, always scalable"
guarantee.
- A broker-side guard that prevents new v4 clients from accidentally
writing to (or auto-creating) a regular topic whose name has already
been claimed by a scalable topic.
- Preservation of per-key ordering across the migration moment, via
the same drain-before-assign protocol the controller uses for segment
splits: old partitions become sealed parent segments and the
subscription controller drains them before activating consumers on the
new children.
- Rejection of `non-persistent://` topics in V5 builders.
Non-persistent topics are out of scope for V5 entirely.

### Out of Scope

- **Cross-cluster (geo-replication) migration coordination.** A topic
migrated in one cluster while still being replicated to another is a
separate problem, deferred until geo-replication on scalable topics
lands ([PIP-460](pip-460.md) sub-PIP).
- **Reverse migration (scalable → regular).** Not supported; the
metadata transition is one-way.
- **Migration triggered by traffic or load thresholds.** Migration is
operator-initiated only.
- **Per-message format conversion.** Messages stay byte-identical on
disk before and after migration; only the topic-name surface above the
managed ledger changes.

---

## High-Level Design

### V5 SDK resolution rule

When a V5 application calls `client.newProducer(...).topic(input)` (or
any consumer builder), the SDK opens a **scalable-topic lookup
session** for the topic — the same long-lived push-based session that
PIP-468 defines for `topic://...` discovery. The lookup session is the
single source of truth for what the topic looks like, and how it
routes; there is no separate "probe" call and no client-side TTL
cache.

The broker responds to the lookup session based on the input form and
the topic's current state:

| Input form | Lookup response |
|---|---|
| `topic://t/n/x` | Real DAG layout (or `NotFound` if the scalable topic doesn't exist — same as today). |
| `persistent://t/n/x` | If scalable metadata exists for the equivalent `topic://t/n/x`: real DAG layout, with the topic identity promoted to `topic://...` for all subsequent operations. Otherwise: a **synthetic layout** that models the regular topic's partitions as **special segments** (see [Special segments](#special-segments) below). |
| `my-topic` (or any short / unscoped form) | Normalize to `persistent://public/default/my-topic`, then apply the rule above. |
| `non-persistent://...` | Reject at `create()` / `subscribe()` with `UnsupportedOperationException`. V5 does not support non-persistent topics. |

Because the lookup session is push-based, the broker can **update the
layout in place** when the topic is migrated. The V5 SDK already
handles layout changes — splits and merges go through the same
machinery — so a migration is observed as one more layout-change
event: synthetic layout → real DAG. The application sees its
`Producer<T>` / `Consumer<T>` keep working through the transition; the
SDK's existing reconnection logic handles the underlying connection
swap internally.
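
As a usage illustration, the application code is identical before and after migration; `client` here is assumed to be a V5 client instance (PIP-460), and the topic name is hypothetical, but the builder shape follows the existing client API:

```java
// The same code works whether the lookup session serves a synthetic layout
// (special segments over the existing partitions) or the real DAG after migration.
Producer<byte[]> producer = client.newProducer()
        .topic("persistent://public/default/orders")   // regular topic name, hypothetical
        .create();

producer.newMessage()
        .key("order-42")
        .value("order-payload".getBytes())
        .send();
```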

This gives the "once scalable, always scalable" guarantee (Goal 4) for
free: once the lookup session has reported a real DAG, future updates
can only refine the DAG via splits / merges; there is no "downgrade to
synthetic" path the broker exposes.

### Special segments

A special segment is the lookup-session encoding of "this slice of the
keyspace is not yet a real `segment://...` topic — it's the existing
`persistent://t/n/x[-partition-K]` topic instead." It carries the same
fields as a regular segment plus a marker that points at the
underlying `persistent://...` name.

The V5 SDK's per-segment producer and consumer infrastructure already
attaches to a topic name; the only difference for a special segment is
that the name uses the `persistent://` domain instead of `segment://`.
No separate code path, no separate wrapper class hierarchy.

For routing, the layout carries the routing function as data:

- **Synthetic layout for an N-partitioned regular topic**: N special
segments, one per partition; routing is
`signSafeMod(murmurHash3_32(key), N)` — exactly v4's
partitioned-producer routing. Producers route the same way the v4 SDK
would, ensuring per-key ordering during the migration window.
- **Synthetic layout for a non-partitioned regular topic**: 1 special
segment covering the full hash range; routing is trivial (the key
doesn't matter).
- **Real DAG (post-migration or natively scalable)**: range-based
routing — the standard scalable-topic semantics. Producers route by
hash range; per-key ordering across the migration boundary is
preserved by the controller's drain-before-assign protocol (see
[Routing across migration](#routing-across-migration) below).
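
As an illustration of what the lookup session carries, the layout entries could be pictured like this; the field names are illustrative, not the actual wire format:

```java
import java.util.List;

/** How the layout routes: v4-compatible mod-N for synthetic layouts, range-based for real DAGs. */
enum Routing { MOD_N, RANGE_BASED }

/** One entry in the layout. A special segment differs only in pointing at a persistent:// name. */
record SegmentEntry(
        String uri,        // "segment://t/n/x/<descriptor>" or "persistent://t/n/x-partition-K"
        boolean special,   // true while this slice is still backed by the regular topic
        int rangeStart,    // inclusive, 0x0000..0xFFFF
        int rangeEnd) {}   // inclusive

record Layout(Routing routing, List<SegmentEntry> segments) {}
```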

### Migration protocol (operator's view)

```
Pre-migration (steady state):
  • Topic exists as persistent://t/n/x (with N partitions, possibly N=0)
  • Some clients are still on the v4 SDK; others have been upgraded to V5.
  • V5 clients see a SYNTHETIC layout from the lookup session: N special
    segments (or 1 for non-partitioned) pointing at the existing
    persistent://...-partition-K topics, with mod-N routing.

Step 1 — Operator upgrades all clients to the V5 SDK.
  Step 2 enforces this: the migration command inspects the topic's active
  producer/consumer connections and fails with HTTP 409 if any v4
  connections remain. Old code keeps working unchanged before migration
  because the synthetic layout exposes the existing persistent topics
  through the V5 surface; routing is mod-N so per-key destinations are
  identical to v4 partitioned-topic routing.

Step 2 — Operator runs:
    pulsar-admin scalable-topics migrate-to-scalable persistent://t/n/x

  The broker:
   2a. Validates that no v4 producer/consumer connections are attached to
       the topic. If any are, fails with HTTP 409 and the connection count
       in the error message. (Also fails 409 if scalable metadata already
       exists, and 404 if the topic doesn't exist.)
   2b. Builds ScalableTopicMetadata with:
        • N sealed parent segments (or 1 for non-partitioned), each
          wrapping the existing managed ledger for
          persistent://t/n/x-partition-K (or persistent://t/n/x for
          non-partitioned). The managed ledgers are unchanged; no data copy.
        • N active child segments (or 1) with equal-width contiguous hash
          ranges covering [0x0000, 0xFFFF], using standard range-based
          routing. Each child has all N parents as predecessors in the DAG.
       New writes route to children; subscriptions drain the parents
       before consuming from children — the same drain-before-assign
       protocol the controller already uses for segment splits / merges.
   2c. Atomically writes the metadata, taking the topic from
       "regular" to "scalable" in one CAS on the metadata store.
   2d. Pushes the new layout to every connected lookup session.

Step 3 — Connected V5 clients receive the layout-update push on their
  lookup session. Synthetic layout → real DAG. The SDK's existing
  layout-change handling (used for split / merge) drives any internal
  reconnection. Application-visible behaviour: nothing changes.
```

### Routing across migration

The synthetic layout (pre-migration) routes mod-N so that V5 producers
write to the same partitions v4 producers would. The post-migration
real DAG routes by hash range — the standard scalable-topic semantics.
The two routings are not equivalent: a given key's destination segment
can change at the migration moment.
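
As a concrete (purely hypothetical) illustration with N = 4: suppose `murmurHash3_32(K) = 0x18000` for some key `K`. Mod-N routing sends `K` to partition `0x18000 mod 4 = 0`, while range-based routing uses `0x18000 & 0xFFFF = 0x8000`, which falls in the third child's range `[0x8000, 0xBFFF]`. Same key, different destination, which is exactly why the drain step below matters.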

Per-key ordering is preserved by the existing scalable-topic
drain-before-assign protocol, the same one the subscription controller
already uses for splits and merges:

- Each old partition becomes a sealed parent segment in the new DAG.
- N new active child segments are created alongside, with equal-width
contiguous hash ranges; every child has all N parents as predecessors.
- The subscription controller fully drains the parents before
assigning their children to a consumer. By the time a consumer first
sees a message from a child, every message previously published to
those parents (including any with the same key) has already been
delivered.

Producers route to children using standard range-based routing
immediately after the migration commit; their writes land in the new
segments while the parents are draining behind them. No special
routing flag is needed: the migration reuses the same machinery that
handles splits, with the parent fan-in (each child has N parents
instead of 1) being the only structural difference.

For non-partitioned topics (N=1) the same protocol applies trivially:
one sealed parent and one active child covering the full hash range.
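
A minimal sketch of the controller's gate, assuming it tracks which parent segments have been fully drained for a given subscription (names are hypothetical):

```java
import java.util.List;
import java.util.Set;

/** A child segment may be assigned to a consumer only once all of its parents are drained. */
static boolean childAssignable(List<String> parentSegmentIds, Set<String> drainedSegmentIds) {
    return drainedSegmentIds.containsAll(parentSegmentIds);
}
```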

---

## Detailed Design

### 1. V5 SDK changes

The SDK reuses its existing scalable code path — the lookup session,
per-segment producer / consumer infrastructure, layout-change handler
— for *every* topic, including regular ones. The only new bits are:
how the lookup session is opened from `persistent://` / short-form
input, how the SDK interprets a "special segment" entry in the layout,
and how it applies the routing function carried by the layout.

#### 1.1 Lookup session opens for any input form

`PulsarClientV5` opens a lookup session for the user's topic
regardless of domain. The existing scalable-topic lookup-session
machinery is extended so the broker accepts `persistent://...` and
short-form names in addition to `topic://...`. The session response
carries:

- The promoted topic identity (always `topic://t/n/x` after normalization).
- The current layout — either a real DAG or a synthetic layout (see
[Special segments](#special-segments) above).
- The routing function (`mod-N` for the synthetic layout,
`range-based` for the real DAG).

`non-persistent://...` inputs are rejected at the V5 builder before
the lookup is opened, with `UnsupportedOperationException`.
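
The rejection itself needs nothing more than a look at the topic name; a sketch (the builder wiring around it is assumed):

```java
/** Called by the V5 builders before opening a lookup session. */
static void rejectNonPersistent(String topic) {
    if (topic.startsWith("non-persistent://")) {
        throw new UnsupportedOperationException(
                "non-persistent topics are not supported by the V5 SDK: " + topic);
    }
}
```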

#### 1.2 Special-segment handling in the per-segment infrastructure

A regular `Segment` carries a `segment://...` URI. A special segment
carries a `persistent://...` URI instead, plus a flag indicating it's
special. The SDK's per-segment producer (`PerSegmentProducer`) and
per-segment consumer (`PerSegmentConsumer`) attach to whatever URI the
segment carries — the v4 producer / consumer beneath them already
accepts `persistent://` and `segment://` alike. No separate adapter
classes are introduced.

V5-specific features that don't apply to a regular topic show up as
ordinary "this layout doesn't support that" errors at the API surface:

- `CheckpointConsumer` requires a sealed-segment history; on a
synthetic layout there is none, so subscribe fails with a clear error
pointing at the migration command.
- `splitSegment` / `mergeSegments` admin operations on a synthetic
layout fail at the broker with the same error class — the operations
require real `ScalableTopicMetadata`.
- `topic://...`-domain DLQ targets work transparently for both layouts
because the DLQ is its own scalable topic; nothing about it depends on
the source topic's layout shape.

#### 1.3 Layout-change handling drives the migration transition

Layout updates pushed by the lookup session already trigger the SDK's
per-segment reconcile: segments that disappeared get their per-segment
producers / consumers torn down; new segments get fresh ones; segments
whose URI changed get rebuilt on the new URI.

A migration is exactly this: the special segment with URI
`persistent://t/n/x-partition-K` is replaced in the new layout with a
real segment whose URI is `segment://t/n/x/<descriptor>` — and that
`segment://...` resolves to the same managed ledger. The reconciler
tears down the per-segment v4 producer/consumer attached to the
`persistent://` URI and reattaches one to the `segment://` URI; from
the application's perspective the SDK's internal reconnect happens (as
it does for any layout change) but the `Producer<T>` / `Consumer<T>`
reference and the publish/receive flow are unaffected.
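
A sketch of that reconcile step, with hypothetical names; the real handler is the same one that services split and merge layout updates:

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative reconciler: ranges are keyed by a string for brevity; values are segment URIs. */
final class LayoutReconciler {
    private final Map<String, String> attached = new HashMap<>(); // range -> URI currently attached

    void onLayoutUpdate(Map<String, String> newLayout) {
        // Tear down ranges that disappeared from the layout.
        attached.keySet().removeIf(range -> {
            if (!newLayout.containsKey(range)) {
                close(range, attached.get(range));
                return true;
            }
            return false;
        });
        // Attach new ranges, and rebuild ranges whose URI changed
        // (e.g. persistent://t/n/x-partition-K -> segment://t/n/x/<descriptor> after migration).
        newLayout.forEach((range, uri) -> {
            String current = attached.get(range);
            if (!uri.equals(current)) {
                if (current != null) {
                    close(range, current);
                }
                open(range, uri);
                attached.put(range, uri);
            }
        });
    }

    private void close(String range, String uri) { /* tear down the per-segment producer/consumer */ }
    private void open(String range, String uri)  { /* attach a per-segment producer/consumer to uri */ }
}
```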

There is no `TopicMigratedException` exposed to the application by
default. Applications that *want* to observe migrations can subscribe
to the lookup session's layout-change events directly via a future
hook — out of scope for this PIP.

### 2. Migration command

#### 2.1 Admin REST endpoint

`POST 
/admin/v2/scalable-topics/{tenant}/{namespace}/{topic}/migrate-to-scalable`
— requires `produce` permission on the topic (the same permission
needed to write to it in the first place). Migration is irreversible
but does not destroy data, so the blast radius is bounded by what a
write-permissioned user can already do; super-user is not required.

No request body.

The broker counts v4 producer / consumer connections via the existing
topic-stats path and rejects the migration with HTTP 409 if any are
present, with the count in the error message.

Response: `204 No Content` on success. `409 Conflict` if scalable
metadata already exists or v4 connections are still attached. `404` if
the topic doesn't exist.

#### 2.2 Admin client

```java
package org.apache.pulsar.client.admin;

public interface ScalableTopics {
    /** Migrate an existing partitioned or non-partitioned topic to a scalable topic. */
    void migrateToScalable(String topic) throws PulsarAdminException;
    CompletableFuture<Void> migrateToScalableAsync(String topic);
}
```
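
Usage, for illustration (the service URL is a placeholder; `scalableTopics()` is the accessor this PIP adds):

```java
import org.apache.pulsar.client.admin.PulsarAdmin;

try (PulsarAdmin admin = PulsarAdmin.builder()
        .serviceHttpUrl("http://broker.example.com:8080") // placeholder
        .build()) {
    // One-way: fails with 409 if v4 connections are still attached
    // or the topic is already scalable, 404 if the topic doesn't exist.
    admin.scalableTopics().migrateToScalable("persistent://t/n/x");
}
```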

#### 2.3 CLI

```
pulsar-admin scalable-topics migrate-to-scalable persistent://t/n/x
```

#### 2.4 Broker-side migration steps

Executed by the topic's owning broker, as atomically as possible:

1. **Lock**: acquire a metadata-store lock on `/topics/t/n/x` to
prevent concurrent migrations or competing admin operations.
2. **Precheck**:
   - Topic exists (as either partitioned or non-partitioned).
   - No `ScalableTopicMetadata` already exists at the same path. If it
does, fail 409.
   - No v4 producer / consumer connections are attached. If any are,
fail 409 with the count in the error message.
3. **Build initial layout**:
   - N sealed parent segments (or 1 for non-partitioned), each
wrapping the existing managed ledger for
`persistent://t/n/x-partition-K` (or `persistent://t/n/x`).
   - N active child segments (or 1 for non-partitioned) with
equal-width contiguous hash ranges covering `[0x0000, 0xFFFF]`.
Routing is range-based.
   - DAG edges: every child has all N parents as predecessors. The
subscription controller's drain-before-assign protocol (already used
for splits / merges) preserves per-key ordering.
   - Set `epoch = 0`, `nextSegmentId = 2N`.
4. **Atomic flip**: write `ScalableTopicMetadata` to `/topics/t/n/x`
via metadata-store CAS. This is the commit point — once it succeeds,
the topic is scalable.
5. **Push the new layout to every connected lookup session.** V5
clients that were seeing the synthetic layout for this topic
transition to the real DAG via the same machinery used for split /
merge layout updates. No `TerminateTopic` is needed — the underlying
managed ledgers don't change identity, only the layout-level segment
names that wrap them do.
6. **Release the lock**.

Failures before step 4 leave the topic untouched; failures after step
4 leave the topic scalable. Step 5 is best-effort per session —
late-joining lookups always read the freshest metadata directly.
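
A sketch of the range arithmetic in step 3 (metadata classes omitted); the only property that matters is that the N children get equal-width, contiguous ranges covering the whole hash space:

```java
/** Split [0x0000, 0xFFFF] into n equal-width, contiguous child ranges (inclusive bounds). */
static int[][] childRanges(int n) {
    int[][] ranges = new int[n][2];
    int span = 0x10000; // 65536 hash values in total
    for (int i = 0; i < n; i++) {
        ranges[i][0] = (int) ((long) span * i / n);           // inclusive start
        ranges[i][1] = (int) ((long) span * (i + 1) / n) - 1; // inclusive end
    }
    return ranges;
}
// childRanges(4) -> [0x0000,0x3FFF], [0x4000,0x7FFF], [0x8000,0xBFFF], [0xC000,0xFFFF]
```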

### 3. Lookup-session guard for v4 clients

The lookup session described above is V5-only. v4 clients use the
older `CommandLookupTopic`. The broker must, for v4 lookups of
`persistent://t/n/x` where `ScalableTopicMetadata` exists for
`topic://t/n/x`, return a `TopicMigrated` redirect (binary protocol) /
HTTP 410 Gone (REST). The error carries the new `topic://...` name. v4
clients translate this into a hard failure (they can't speak the
scalable protocol); operators see a clear signal that some clients
haven't upgraded yet.
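
The guard itself is one check on the v4 lookup path, sketched here with a pluggable metadata check (names are hypothetical):

```java
import java.util.Optional;
import java.util.function.Predicate;

/** Returns the topic:// name a v4 lookup of persistentName must be failed with
 *  (TopicMigrated / HTTP 410), or empty if the regular lookup may proceed. */
static Optional<String> migratedTarget(String persistentName,
                                       Predicate<String> scalableMetadataExists) {
    String scalableName = persistentName.replaceFirst("^persistent://", "topic://");
    return scalableMetadataExists.test(scalableName)
            ? Optional.of(scalableName)
            : Optional.empty();
}
```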

This guard is what makes the "once scalable, always scalable"
guarantee robust against stale v4 clients or v4-only tooling. V5
clients are guarded by the lookup session itself — once it has
reported a real DAG, the broker only ever pushes refinements (split /
merge), never a downgrade.

### 4. `migratedFrom` on `ScalableTopicMetadata`

An optional informational field `migratedFrom: TopicMigrationOrigin?`
records pre-migration metadata (partition count, original
`persistent://` name) for debugging, metrics labelling, and future
tooling. Not consulted on hot paths.

No `legacyModNRouting` flag or other migration-specific routing knob
is needed: the post-migration real DAG uses standard range-based
routing and per-key ordering is preserved by the controller's existing
drain-before-assign protocol (see [Routing across
migration](#routing-across-migration)).

---

## Public-facing changes

### REST API

New endpoint:
- `POST 
/admin/v2/scalable-topics/{tenant}/{namespace}/{topic}/migrate-to-scalable`
— requires `produce` permission on the topic; no request body; 204 on
success, 409 if already scalable or v4 connections are still attached,
404 if topic missing.

Modified endpoints:
- All v4 lookup endpoints: when scalable metadata exists for the
equivalent `topic://...`, return a `TopicMigrated` redirect with the
new name.

### Binary protocol

- The existing scalable-topic lookup-session command (PIP-468) is
extended to accept `persistent://...` and short-form names in addition
to `topic://...`. For non-scalable topics it returns a synthetic
layout with special-segment entries.
- New error code `TopicMigrated` — sent by the broker on the v4 lookup
command (`CommandLookupTopic`) when the requested `persistent://...`
name is shadowed by an existing scalable topic. Carries the new
`topic://...` name in the error payload. V5 clients never see this
error because they always use the scalable lookup session.

### Configuration

New broker config:
- `enableScalableTopicMigration` (default `true`) — kill switch for
the migration command. Operators on regulated infra may want to
disable it.

### CLI

- `pulsar-admin scalable-topics migrate-to-scalable <topic>`

### V5 SDK behavior

- V5 builders accept `persistent://...` and short-form names; the SDK
opens a scalable-topic lookup session that the broker resolves to
either a real DAG or a synthetic layout.
- V5 builders reject `non-persistent://...` with
`UnsupportedOperationException`.
- Migration is observed as a layout-change push on the lookup session;
no new SDK exception is exposed to the application.

---

## Backward & forward compatibility

### Upgrade

The upgrade story this PIP supports is:

1. Upgrade brokers to the version containing this PIP. Brokers handle
both old `persistent://` clients and new `topic://` clients side by
side; no behavior change for existing topics.
2. Upgrade applications to the V5 SDK at the operator's pace. No topic
changes are required; the V5 SDK operates against the synthetic layout
(special segments) served by the lookup session.
3. Once all applications on a given topic are V5, run
`migrate-to-scalable`. The migration is atomic and one-way.

Old client versions continue to work with the cluster on un-migrated topics.

### Downgrade / Rollback

- A broker downgrade *before* any topic has been migrated is fully supported.
- A broker downgrade *after* a topic has been migrated is **not
supported**: older brokers don't understand the scalable-metadata
layout. Recovery would require restoring the metadata store from
backup. The migration command should be treated as a one-way commit.
- A V5 SDK client can be downgraded back to v4 only if the topic was
never migrated. Once migrated, the topic only speaks the scalable
protocol.

### Geo-replication

Out of scope; see [Goals: Out of Scope](#out-of-scope).

---

## Alternatives

### Alt 1: Probe-and-wrapper in the SDK (rejected)

An earlier draft of this PIP had the V5 SDK probe via
`admin.scalableTopics().getMetadataAsync(topic)` at every builder
call, cache the verdict with a TTL, and use a separate v4-wrapper code
path for regular topics. Migration would be observed by connected
clients as `TopicTerminatedException` from the v4 wrapper, which the
SDK would catch and use as the cue to re-probe and rebuild on the
scalable path.

Rejected in favour of the lookup-session approach because:
- Two SDK code paths (scalable + v4 wrapper) would have to be
maintained and tested in parallel.
- The lookup session is push-based; the probe approach forces a TTL
trade-off (long TTL = slow to notice migration; short TTL = constant
load).
- Migration was visible to applications as a `TopicMigratedException`
on receive futures, however brief; the lookup-session approach makes
it strictly an internal SDK reconnect, with no application-visible
signal at all.
- The hash-routing equivalence problem ([Routing across
migration](#routing-across-migration)) had to be papered over with a
documented constraint; with the lookup session carrying the routing
function as data, the synthetic layout's mod-N routing and the real
DAG's range-based routing coexist naturally, with per-key ordering
preserved by the controller's drain-before-assign protocol.

### Alt 2: Migration via per-message data copy

The migration command would create a new scalable topic alongside the
regular one and stream all retained data through. Producers and
consumers would cut over after the copy completes.

Rejected because:
- The metadata-flip approach in this PIP achieves the same result with
no data movement.
- A retained topic with months of data could take hours to copy,
during which producers and consumers would have no clear cut-over
moment.

---

## General Notes

- `migratedFrom` is an optional informational field on
`ScalableTopicMetadata` (partition count, original `persistent://`
name) for debugging and metrics labelling; not consulted on hot paths.

---

## Links

- [PIP-460: Scalable Topics](pip-460.md) — parent PIP.
- [PIP-468: Scalable Topic Controller](pip-468.md) — sibling sub-PIP,
defines the metadata-store schema this PIP extends.

