void-ptr974 commented on code in PR #25774:
URL: https://github.com/apache/pulsar/pull/25774#discussion_r3303502497


##########
pip/pip-477.md:
##########
@@ -0,0 +1,991 @@
+# PIP-477: Dynamic Bookie Cluster Switching for Brokers
+
+> **Revision 2 (Plugin-First Architecture)** — This revision reframes PIP-477
+> per the design feedback to build on top of the storage-layer pluggability
+> introduced by [PIP-384](./pip-384.md), the per-ledger metadata extension slot
+> introduced by [PIP-404](./pip-404.md), the `CustomCommandFactory` SPI introduced
+> by [PIP-201](./pip-201.md), and the existing `AdditionalServlet` SPI — instead
+> of adding a self-contained subsystem to Pulsar core.
+
+## Motivation
+
+Apache Pulsar's broker layer is bound to a fixed BookKeeper (Bookie) cluster
+at startup through the `bookkeeperMetadataServiceUri` configuration. This
+static binding becomes a bottleneck in several real-world operational
+scenarios:
+
+1. **Slow failure recovery** — When a Bookie cluster experiences capacity
+   exhaustion, sustained latency degradation, or non-recoverable failures, the
+   only mitigation today is to wait for the Bookie cluster to recover. In
+   elastic-storage deployments where one Bookie cluster backs 100+ broker
+   clusters, the blast radius is enormous.
+2. **No capacity escape hatch** — When a Bookie cluster approaches its
+   capacity ceiling, brokers cannot "spill over" to a fresh Bookie cluster.
+3. **No runtime re-targeting** — There is no supported runtime mechanism to
+   migrate brokers from one Bookie cluster to another without restart and full
+   data migration.
+4. **Data plane coupling** — Switching deals with the data plane: terabytes of
+   ledger bytes, ongoing publishes, ongoing acknowledgments, and live cursors.
+   A copy-everything-then-cutover approach is infeasible.
+
+This PIP proposes a **safe, ledger-attribution-driven framework** that allows
+operators to switch a broker cluster's underlying Bookie cluster at runtime,
+with **zero data-plane downtime** and **per-ledger routing** that preserves the
+readability of historical data on the old cluster.
+
+The framework enables:
+
+- **Zero-downtime switching** — publish and consume continue without interruption
+- **Per-ledger routing** — each ledger carries its own cluster attribution
+- **No bulk data migration** — Topic Data Ledgers age out naturally via TTL/Retention
+- **Targeted migration of small, long-lived metadata ledgers** — only Schema and
+  Cursor Ledgers are copied, preserving the original `ledgerId`
+- **Automatic rollback** within a configurable window
+- **Operator-driven, not auto-failover** — avoids split-brain
+
+## Design Principle: Build on PIP-384, Not Around It
+
+The feature is delivered as **a `ManagedLedgerStorage` implementation that
+holds multiple `BookkeeperManagedLedgerStorageClass` instances**, packaged as
+an out-of-tree NAR plugin. The framing PIP-384 establishes —
+*"a topic resolves to one storage class"* — is generalized here to:
+
+> *"A topic resolves to one **active** storage class at a time, with
+> **attribution-driven routing for previously-written ledgers**."*
+
+Concretely, this PIP:
+
+- Lives in a separate Maven module (`pulsar-bookie-cluster-switching`) and
+  ships as a NAR; the broker loads it via the existing `managedLedgerStorageClassName`
+  configuration hook (PIP-384).
+- Re-uses **`LedgerInfo.properties`** (PIP-404, tag 6) and
+  **`ManagedCursorInfo.cursorProperties`** (existing, tag 8) for per-ledger
+  cluster attribution. **Zero new proto fields** in `MLDataFormats.proto`.
+- Ships its REST surface via **`AdditionalServlet`**; ships its CLI via
+  **`CustomCommandFactory` NAR** (PIP-201). **Zero changes** to `pulsar-broker`
+  REST classes or `pulsar-admin` CLI core.
+- Requires only **three narrow SPI hooks** in core (Section "Required Core SPI
+  Additions"), each useful to *any* custom `ManagedLedgerStorage` author and
+  much smaller than inlining the full feature.
+- When the plugin NAR is not installed, broker behavior is **byte-for-byte
+  identical** to unmodified Pulsar; the ~99% of users that never switch BK
+  clusters pay zero cost in core surface area.
+
+## Goal
+
+Provide a runtime mechanism by which a broker cluster can be re-pointed from
+one Bookie cluster (`oldCluster`) to a new Bookie cluster (`newCluster`)
+without service interruption, while preserving the ability to read historical
+data that physically resides on `oldCluster`.
+
+### In Scope
+
+- A plugin-provided `ManagedLedgerStorage` implementation
+  (`MultiClusterManagedLedgerStorage`) that holds a
+  `Map<clusterName, BookkeeperManagedLedgerStorageClass>` and exposes the
+  currently active one.
+- Per-ledger cluster attribution stored in the **existing**
+  `LedgerInfo.properties` and `ManagedCursorInfo.cursorProperties` slots under
+  the reserved key `_pulsar.bookieClusterName`.
+- Per-broker-cluster registry of multiple Bookie clusters, stored in
+  Broker-ZK under a plugin-owned path.
+- A plugin-owned `BookieClusterReadRouter` that selects a BK client at
+  ledger-handle open time based on the attribution property.
+- A plugin-owned `BookieClusterSwitchOrchestrator` that drives
+  `BUILD → PROMOTE → CLEANUP` for Schema and Cursor Ledgers (migration
+  preserves the original `ledgerId` via `BookKeeper.asyncCreateLedgerAdv`).
+- Plugin-provided CLI (`pulsar-admin bookie-clusters …`) via PIP-201
+  `CustomCommandFactory`.
+- Plugin-provided REST API under `/admin/v2/bookie-clusters/*` via
+  `AdditionalServlet`.
+- Three narrow core SPI hooks (see Section "Required Core SPI Additions").
+
+### Out of Scope
+
+- **Topic Data Ledger migration** — not migrated; ages out via TTL/Retention.
+- **Automatic failure detection and switch triggering** — operator-initiated
+  to avoid split-brain.
+- **Cross-cluster (geo-replicated) coordination** — each broker cluster
+  switches independently.
+- **BookKeeper-cluster-side internal migration tools** — delegated to BookKeeper.
+- **Changing `metadataStoreUrl` (Broker-ZK)** — only the Bookie cluster is
+  switched; Broker-ZK remains the same throughout.
+- **New proto fields in `MLDataFormats.proto`** — reuses existing extension
+  slots from PIP-404 and `cursorProperties`.
+
+## High-Level Architecture
+
+```
+┌────────────────────────────────────────────────────────────────────────┐
+│ Pulsar core (unchanged + 3 narrow SPI hooks)                           │
+│                                                                         │
+│   ManagedLedgerStorage SPI            ─── PIP-384                       │
+│   ManagedLedgerStorageClass SPI       ─── PIP-384                       │
+│   LedgerInfo.properties (KV)          ─── PIP-404                       │
+│   ManagedCursorInfo.cursorProperties  ─── pre-existing                  │
+│   AdditionalServlet SPI               ─── pre-existing                  │
+│   CustomCommandFactory SPI            ─── PIP-201                       │
+│                                                                         │
+│   + ManagedLedgerConfig.activeBookKeeperSupplier         ←── NEW (S1)   │
+│   + ManagedLedgerConfig.bookKeeperResolver               ←── NEW (S2)   │
+│   + SchemaStorageBookKeeperProvider SPI                  ←── NEW (S3)   │
+└────────────────────────────┬───────────────────────────────────────────┘
+                             │ SPI / NAR loading
+┌────────────────────────────▼───────────────────────────────────────────┐
+│ pulsar-bookie-cluster-switching (NAR, opt-in)                          │
+│                                                                         │
+│   MultiClusterManagedLedgerStorage   impl ManagedLedgerStorage         │
+│     ├─ Map<name, BookkeeperManagedLedgerStorageClass>                  │
+│     ├─ activeClusterName (volatile)                                     │
+│     └─ getDefaultStorageClass() → active                               │
+│                                                                         │
+│   RoutingManagedLedgerFactory        wraps ManagedLedgerFactoryImpl    │
+│     └─ Per-ledger BK resolution from LedgerInfo.properties             │
+│                                                                         │
+│   RoutingSchemaBookKeeperProvider    impl SchemaStorageBookKeeperProvider│
+│     └─ Per-position BK resolution from SchemaLocator extension         │
+│                                                                         │
+│   BookieClusterConfigManager / ConfigWatcher                            │
+│     └─ Owns ZK path /admin/bookie-clusters/* (plugin-owned)            │
+│                                                                         │
+│   BookieClusterSwitchOrchestrator    BUILD → PROMOTE → CLEANUP          │
+│                                                                         │
+│   REST  /admin/v2/bookie-clusters/*  via AdditionalServlet              │
+│   CLI   pulsar-admin bookie-clusters via CustomCommandFactory NAR       │
+└────────────────────────────────────────────────────────────────────────┘
+```
+
+### Key Principles
+
+| Principle | Description |
+|-----------|-------------|
+| **Attribution as Single Source of Truth** | Every ledger at creation is "stamped" with the active cluster name inside its existing metadata properties. Reads are routed by that stamp. No history table, no boundary heuristics, no side znode. |
+| **Reuse Extension Slots, Don't Add Proto Fields** | `LedgerInfo.properties` (PIP-404) and `ManagedCursorInfo.cursorProperties` carry the attribution. Only `SchemaStorageFormat.proto` needs one tiny extension (Section "Required Core SPI Additions" S3). |
+| **Topic Data: Don't Migrate** | Topic Data Ledgers have TTL/Retention; let them age out. Switching does not move terabytes of data. |
+| **Schema / Cursor: Migrate, but Preserve `ledgerId`** | We use `asyncCreateLedgerAdv(ledgerId, …)` to recreate the **same `ledgerId`** in the new cluster, copy entries with **identical `entryId`**, then CAS only the attribution property. Business keys (`cursorsLedgerId`, `position.ledgerId`, `position.entryId`) are byte-for-byte unchanged in Broker-ZK. |
+| **Plugin-First, Opt-In** | When the NAR is absent or `managedLedgerStorageClassName` ≠ the multi-cluster class, the broker is byte-for-byte unmodified Pulsar. |
+| **Operator-Driven, Single Active Cluster** | At any moment exactly one cluster is `status=ACTIVE` in Broker-ZK. Auto-failover is rejected. |
+| **Broker-ZK is the Switch Source-of-Truth** | All cluster registrations and the switch directive live in Broker-ZK under a plugin-owned path. No Global-ZK, no external coordinator. |
+
+## Required Core SPI Additions
+
+These are the **only** changes to Pulsar core required by this PIP. Each is a
+narrow, generally-useful hook that any custom `ManagedLedgerStorage`
+implementation could leverage — not specific to BK switching.
+
+### S1. `ManagedLedgerConfig.activeBookKeeperSupplier`
+
+**Problem.** Today `ManagedLedgerImpl` captures the `BookKeeper` client once
+in its constructor (via `ManagedLedgerFactoryImpl.bookkeeperFactory`). When a
+custom storage class needs to hot-swap the underlying BK client (e.g. cluster
+switch), there is no clean injection point on the write path.
+
+**Hook.** Add an optional supplier:
+
+```java
+// managed-ledger/.../ManagedLedgerConfig.java
+private Supplier<BookKeeper> activeBookKeeperSupplier;   // nullable
+
+public Supplier<BookKeeper> getActiveBookKeeperSupplier() { … }
+public ManagedLedgerConfig setActiveBookKeeperSupplier(Supplier<BookKeeper> s) { … }
+```
+
+`ManagedLedgerImpl.createLedgerAfterClosed`, `rollCurrentLedgerIfFull`, and
+`ManagedCursorImpl.doCreateNewMetadataLedger` resolve the BK client as:
+
+```java
+BookKeeper bk = config.getActiveBookKeeperSupplier() != null
+        ? config.getActiveBookKeeperSupplier().get()
+        : this.bookKeeper;     // backward-compatible default
+```
+
+**Scope.** Pure additive. When the supplier is unset (default), behavior is
+identical. ~30 lines of change in `managed-ledger/`.
+
+### S2. `ManagedLedgerConfig.bookKeeperResolver`
+
+**Problem.** Reads (and deletes) need a *per-ledger* resolution path. Today
+`ManagedLedgerImpl` uses the constructor-captured `bookKeeper` for every
+`asyncOpenLedger`/`asyncDeleteLedger`. After a switch, ledgers that physically
+live on the old cluster must be opened with the old client.
+
+**Hook.** Add an optional resolver function keyed on `LedgerInfo`:
+
+```java
+// managed-ledger/.../ManagedLedgerConfig.java
+private Function<LedgerInfo, BookKeeper> bookKeeperResolver;   // nullable
+
+// And the analogous function for cursor metadata ledgers:
+private Function<ManagedCursorInfo, BookKeeper> cursorBookKeeperResolver;
+```
+
+All call sites that today reference `this.bookKeeper` to open or delete a
+specific ledger (the comprehensive list is in
+`ManagedLedgerImpl.getLedgerHandle`, `internalAsyncOpenCursor`,
+`asyncDeleteLedger`, `asyncDeleteFromBookKeeper`, …) become:
+
+```java
+BookKeeper bk = resolveBookKeeperForLedger(ledgerInfo);
+```
+
+Where:
+
+```java
+private BookKeeper resolveBookKeeperForLedger(LedgerInfo li) {
+    var resolver = config.getBookKeeperResolver();
+    return resolver != null ? resolver.apply(li) : this.bookKeeper;
+}
+```
+
+**Scope.** Pure additive; when resolvers are unset, behavior is identical.
+~80 lines of change in `managed-ledger/`, mostly mechanical substitutions.
+
+### S3. `SchemaStorageBookKeeperProvider` SPI
+
+**Problem.** `BookkeeperSchemaStorage` captures `this.bookKeeper` at
+`start()` time (line ~101). It directly opens schema ledgers via that single
+client. To support per-ledger routing we need to inject a resolver and to
+attribute each schema ledger to a cluster.
+
+**Hook (two parts).**
+
+1. **Add a tiny extension slot to `SchemaStorageFormat.proto`** — this is the
+   *only* proto change in this PIP:
+
+   ```protobuf
+   message PositionInfo {
+       required int64 ledgerId = 1;
+       required int64 entryId = 2;
+       repeated KeyValue properties = 3;   // ★ NEW — generic extension slot,
+                                           // mirrors PIP-404 LedgerInfo.properties
+   }
+
+   message KeyValue {
+       required string key = 1;
+       required string value = 2;
+   }
+   ```
+
+   This is the minimal possible change: a generic `properties` slot symmetric
+   to PIP-404. It does **not** mention bookie clusters; any future plugin can
+   use it.
+
+2. **Add a provider SPI for the schema storage BK client:**
+
+   ```java
+   // pulsar-broker/.../service/schema/SchemaStorageBookKeeperProvider.java
+   public interface SchemaStorageBookKeeperProvider {
+       /** Returns the BK client to use for the given schema position. */
+       BookKeeper resolve(PositionInfo position);
+       /** Returns the BK client to use for *creating* a new schema ledger. */
+       BookKeeper active();
+   }
+   ```
+
+   `BookkeeperSchemaStorage` resolves its provider as:
+
+   ```java
+   var provider = pulsar.getSchemaStorageBookKeeperProvider();   // nullable
+   BookKeeper bk = provider != null ? provider.resolve(position) : this.bookKeeper;
+   ```
+
+   The default (provider == null) preserves today's behavior exactly.
+
+**Scope.** One proto field (additive, `repeated` is forward/backward
+compatible in proto2) + one SPI interface + ~50 lines in
+`BookkeeperSchemaStorage`. No new REST endpoint, no CLI change.
+
+### Summary: Core Diff Footprint
+
+| Component | LOC | Risk |
+|-----------|-----|------|
+| `ManagedLedgerConfig` (S1, S2 setters/getters) | ~40 | Trivial additive |
+| `ManagedLedgerImpl` / `ManagedCursorImpl` resolver wiring | ~80 | Mechanical, well-scoped |
+| `SchemaStorageFormat.proto` `PositionInfo.properties` (S3) | 1 field | Additive, proto2-safe |
+| `SchemaStorageBookKeeperProvider` SPI + `BookkeeperSchemaStorage` wiring | ~50 | Additive |
+| **Total core diff** | **~170 LOC, 1 new optional proto field** | **Zero behavioral change when SPI unused** |
+
+Everything else lives in the plugin NAR.
+
+## Plugin Module: `pulsar-bookie-cluster-switching`
+
+A new top-level Maven module that builds a NAR (`pulsar-bookie-cluster-switching-<version>.nar`).
+
+### Module Layout
+
+```
+pulsar-bookie-cluster-switching/
+├─ src/main/java/.../
+│   ├─ storage/
+│   │   ├─ MultiClusterManagedLedgerStorage.java    (implements ManagedLedgerStorage)
+│   │   ├─ MultiClusterStorageClass.java            (implements BookkeeperManagedLedgerStorageClass)
+│   │   └─ RoutingManagedLedgerFactory.java
+│   ├─ schema/
+│   │   └─ RoutingSchemaBookKeeperProvider.java     (implements SchemaStorageBookKeeperProvider)
+│   ├─ registry/
+│   │   ├─ BookieClusterConfigManager.java
+│   │   └─ BookieClusterConfigWatcher.java
+│   ├─ orchestrator/
+│   │   ├─ BookieClusterSwitchOrchestrator.java
+│   │   ├─ LedgerCopyUtil.java
+│   │   └─ IdgenPrecheckService.java
+│   ├─ rest/
+│   │   └─ BookieClusterAdminServlet.java           (implements AdditionalServlet)
+│   └─ cli/
+│       └─ BookieClusterCommandFactory.java         (implements CustomCommandFactory)
+└─ src/main/resources/META-INF/services/
+    ├─ org.apache.pulsar.broker.web.plugin.servlet.AdditionalServlet
+    └─ org.apache.pulsar.admin.cli.extensions.CustomCommandFactory
+```
+
+### Wiring on Broker Start
+
+The plugin is activated by setting in `broker.conf`:
+
+```properties
+managedLedgerStorageClassName=org.apache.pulsar.ext.bookieswitch.MultiClusterManagedLedgerStorage
+additionalServlets=bookie-clusters-admin
+additionalServletDirectory=./plugins
+```
+
+Then `ManagedLedgerStorage.create()` (PIP-384 entry point) reflectively loads
+`MultiClusterManagedLedgerStorage`. Inside its `initialize()`:
+
+1. Reads its own configuration namespace (prefix `bookieClusterSwitch.*`).
+2. Reads `/admin/bookie-clusters/*` from Broker-ZK to discover registered
+   clusters and resolve the initial `activeClusterName` (tier-1 →
+   Broker-ZK `status=ACTIVE` cluster; tier-2 → optional
+   `bookieClusterSwitch.currentClusterName` hint; tier-3 → derive from
+   `bookkeeperMetadataServiceUri` and auto-register as `ACTIVE`).
+3. Builds one `BookkeeperManagedLedgerStorageClass` per registered cluster
+   (each owns its own `BookKeeper` client and `StatsProvider`).
+4. Constructs a `RoutingManagedLedgerFactory` that decorates a base
+   `ManagedLedgerFactoryImpl` and injects the per-ledger BK resolver via the
+   S1/S2 hooks above.
+5. Starts `BookieClusterConfigWatcher` to receive switch directives.
+6. (Origin-broker-only after a switch) starts the orchestrator.
+
+The plugin's `AdditionalServlet` instance is independently discovered by
+`AdditionalServlets.load(…)` in `WebService`; the plugin's
+`CustomCommandFactory` is independently discovered by `pulsar-admin`'s NAR
+class loader.
+
+## Detailed Design
+
+### Per-Ledger Attribution Encoding
+
+| Ledger type | proto container | Encoding | Key |
+|-------------|-----------------|----------|-----|
+| Topic Data | `ManagedLedgerInfo.LedgerInfo.properties` (PIP-404, tag 6) | `KeyValue` | `_pulsar.bookieClusterName` |
+| Cursor | `ManagedCursorInfo.cursorProperties` (existing, tag 8) | `StringProperty` | `_pulsar.bookieClusterName` |
+| Schema | `SchemaStorageFormat.PositionInfo.properties` (NEW, tag 3, S3) | `KeyValue` | `_pulsar.bookieClusterName` |
+
+The reserved key prefix `_pulsar.` is **owned by Pulsar core** to avoid
+collisions with plugin- or user-defined properties. Plugin code uses a
+constant:
+
+```java
+public static final String ATTR_KEY = "_pulsar.bookieClusterName";
+```
+
+**Compatibility.**
+
+- **proto2 forward-compat:** `repeated KeyValue properties` already exists in
+  `LedgerInfo` (PIP-404). For `PositionInfo` it's added with a new tag (3),
+  matching the symmetric pattern.
+- **Old broker reading new metadata:** ignores `properties` it doesn't
+  understand; preserves them via `UnknownFieldSet` on rewrite (already the
+  case for PIP-404).
+- **New broker reading old metadata (no attribution):** the plugin's resolver
+  falls back to the active cluster (which equals the original cluster for
+  pre-switch ledgers; correct).
+
+### Write-Path Stamping
+
+`RoutingManagedLedgerFactory` wraps the base factory. When `ManagedLedger`
+asks for a new ledger:
+
+```java
+// Pseudocode of the stamping path
+String activeCluster = multiClusterStorage.getActiveClusterName();
+BookKeeper bk = multiClusterStorage.getBookKeeper(activeCluster);    // via S1
+LedgerHandle lh = bk.asyncCreateLedger(...);
+
+// In the same metadata CAS that records the new LedgerInfo:
+LedgerInfo info = baseInfo.toBuilder()
+    .addProperties(KeyValue.newBuilder()
+        .setKey(ATTR_KEY)
+        .setValue(activeCluster)
+        .build())
+    .build();
+```
+
+The same pattern applies to:
+
+- `ManagedCursorImpl.doCreateNewMetadataLedger` → stamps
+  `cursorProperties[_pulsar.bookieClusterName] = activeCluster`.
+- `BookkeeperSchemaStorage.createLedger` (via S3 provider) → stamps
+  `PositionInfo.properties[_pulsar.bookieClusterName] = activeCluster`.
+
+The stamp is written in the **same Broker-ZK CAS** as the business field
+(`ledgerId`, `cursorsLedgerId`, `position`), guaranteeing atomicity
+(invariant **I1**).
+
+### Read-Path Routing
+
+```java
+// RoutingManagedLedgerFactory -- injected as ManagedLedgerConfig.bookKeeperResolver (S2)
+BookKeeper resolve(LedgerInfo li) {
+    String cluster = li.getPropertiesList().stream()
+        .filter(kv -> ATTR_KEY.equals(kv.getKey()))
+        .map(KeyValue::getValue)
+        .findFirst()
+        .orElseGet(multiClusterStorage::getActiveClusterName);   // legacy fallback
+    return multiClusterStorage.getBookKeeper(cluster);
+}
+```
+
+Identical pattern for cursors (consulting `cursorProperties`) and schema
+positions (consulting `PositionInfo.properties`).
+
+**Invariant I4** (entry-level consistency within a single read): the resolver
+is consulted exactly once at `LedgerHandle` open time; the resulting
+`LedgerHandle` is bound to one BK client for the entire read of that ledger.
+
+### Switch Phases (Operator-Driven State Machine)
+
+```
+NOT_REGISTERED
+    ↓
+[Operator: pulsar-admin bookie-clusters register --name newCluster --uri … --status STANDBY]
+    ↓
+REGISTERED (STANDBY)
+    ↓
+[Operator: pulsar-admin bookie-clusters switch --target newCluster]
+    ↓ (inline precheck: idgen-long advanced)
+SWITCH_TRIGGERED ← Coordinator broker writes /admin/bookie-clusters/switch-target
+    ↓
+LIVE_DUAL_READ  ← All brokers' watchers fire; new writes stamped newCluster;
+    ↓             reads of pre-stamped ledgers routed by attribution
+BUILD_PROMOTE_CLEANUP
+    ├── BUILD     ← Coordinator copies Schema/Cursor ledger bytes to newCluster
+    │             with the SAME ledgerId; entries copied with the SAME entryId
+    ├── PROMOTE   ← Schema: coordinator CAS-rewrites SchemaLocator
+    │             (changes only PositionInfo.properties[ATTR_KEY])
+    │             Cursor: coordinator forwards POST /internal/promote-cursor
+    │             to topic owner; owner CAS in ManagedCursorImpl lock
+    └── CLEANUP   ← After rollback window, delete old-cluster copies; promote
+                  Broker-ZK status: oldCluster→DEPRECATED, newCluster→ACTIVE
+    ↓
+DONE / DONE_WITH_FAILURES
+
+(Operator-triggered, within window):
+ROLLBACK ← Reverse-CAS attribution back to oldCluster; new-cluster
+          copies scheduled for delayed deletion.
+```
+
+All states above live **inside the plugin**. The orchestrator persists its
+progress under `/admin/bookie-clusters/orchestrator/<switchId>/...`.
+
+### Phase 1: Registration
+
+```bash
+pulsar-admin bookie-clusters register \
+    --name new-cluster \
+    --metadata-service-uri zk+null://new-zk:2181/ledgers-v2 \
+    --status STANDBY
+```
+
+`BookieClusterConfigManager` (plugin) validates:
+
+- `metadata-service-uri` does not collide with any already-registered cluster.
+- For co-located deployments, the chroot must differ from existing clusters.
+
+### Phase 2: Precheck (Invariant I11)
+
+```bash
+pulsar-admin bookie-clusters precheck --name new-cluster
+```
+
+`IdgenPrecheckService` (plugin) verifies the new cluster's `/ledgers/idgen-long`
+is advanced beyond `max(ledgerId)` of the source cluster. Otherwise
+`asyncCreateLedgerAdv(ledgerId, …)` during BUILD would collide with
+newly-allocated ledgerIds. `POST /switch` invokes precheck inline; failure
+returns HTTP 409 (bypassable via plugin config in lab/staging only).
+
+### Phase 3: Switch Trigger
+
+```bash
+pulsar-admin bookie-clusters switch --target new-cluster
+```
+
+The receiving broker (origin) writes:
+
+```
+/admin/bookie-clusters/switch-target
+  body: { "target": "new-cluster", "origin": "https://broker-1:8080",
+          "epoch": 7, "ts": 1716256800000 }
+```
+
+All brokers' `BookieClusterConfigWatcher` fire and execute, in order:
+
+1. `MultiClusterManagedLedgerStorage.switchActiveCluster(newCluster)` —
+   hot-swap `activeClusterName`; new BK client lazily created if not present.
+2. First switch: enable dual-read in the read router; subsequent switches:
+   refresh epoch.
+3. Emit metrics.
+4. **Origin broker only:** start the orchestrator (`BUILD → PROMOTE → CLEANUP`).
+   Non-origin brokers do not start an orchestrator. If the directive body
+   lacks `origin`, brokers fall back to leader election among themselves
+   (using the existing `LeaderElectionService`) to pick exactly one
+   orchestrator.
+
+### Phase 4: BUILD (Schema + Cursor Ledger Bytes)
+
+The orchestrator runs **two parallel stages** with bounded concurrency.
+
+#### Schema Ledger BUILD (centralized)
+
+```
+for each schemaId in /schemas/* (parallel, bounded):
+  locator = brokerZk.get("/schemas/<schemaId>")
+  for each indexEntry in locator.indexList:
+    ledgerId = indexEntry.position.ledgerId
+    cluster  = readClusterAttr(indexEntry.position) ?? activeClusterAtFirstWrite
+    if cluster == oldCluster:
+      LedgerCopyUtil.copyLedgerPreservingIds(
+          ledgerId, oldBk, newBk,
+          customMetadata,           // identical to source ledger
+          ensembleSize, qw, qa)
+        // Internally:
+        //   newBk.asyncCreateLedgerAdv(ledgerId, …)        ← SAME ledgerId
+        //   for entryId in [0, lastAddConfirmed]:
+        //     newLh.addEntry(entryId, srcEntry)            ← SAME entryId
+        //   newLh.close()
+```
+
+#### Cursor Ledger BUILD (centralized)
+
+Identical pattern over `/managed-ledgers/<topic>/<cursorName>` znodes, copying

Review Comment:
   Thanks, the explanation makes sense to me. I agree that preserving the same 
ledgerId can make the promotion/rollback state machine much simpler, especially 
if the goal is to keep existing positions unchanged and only switch the cluster 
attribution.
   
   I have two related questions that may be worth clarifying in the PIP.
   
   First, if the design relies on copying ledgers with the same ledgerId into 
the target BK cluster, do we need to explicitly define the safety invariant 
around ledgerId collision? Since different BK clusters normally have 
independent ledgerId allocation spaces, the source ledgerId may already exist 
in the target cluster, or may need to be reserved/imported in a way that 
prevents future allocation conflicts. It would be helpful to document whether 
the target BK cluster is expected to be empty/dedicated, or whether the 
migration flow will explicitly detect/reserve/import these ledgerIds safely.
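   
   To make the invariant concrete, here is a minimal sketch of the kind of check I have in mind. It is only an illustration: `LedgerIdCollisionCheck`, `check`, and the three inputs are hypothetical names that do not come from the PIP or from BookKeeper, and it assumes the target cluster allocates ledgerIds from a monotonically increasing generator. A source ledgerId is treated as unsafe if it already exists in the target, or if the target's generator has not yet advanced past it.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Set;
   import java.util.TreeSet;

   // Hypothetical helper: names are illustrative, not from the PIP or BookKeeper.
   final class LedgerIdCollisionCheck {

       /** Outcome of comparing source ledgerIds with the target cluster's state. */
       static final class Result {
           final List<Long> alreadyPresent;   // ids that already exist in the target
           final List<Long> notYetReserved;   // ids the target generator could still hand out

           Result(List<Long> alreadyPresent, List<Long> notYetReserved) {
               this.alreadyPresent = alreadyPresent;
               this.notYetReserved = notYetReserved;
           }

           boolean isSafe() {
               return alreadyPresent.isEmpty() && notYetReserved.isEmpty();
           }
       }

       /**
        * @param sourceLedgerIds   ledgerIds that would be copied with the same id
        * @param targetExistingIds ledgerIds that already exist in the target cluster
        * @param targetNextId      next id the target's generator would allocate (assumed monotonic)
        */
       static Result check(Set<Long> sourceLedgerIds, Set<Long> targetExistingIds, long targetNextId) {
           List<Long> alreadyPresent = new ArrayList<>();
           List<Long> notYetReserved = new ArrayList<>();
           // Iterate in sorted order so the report is deterministic.
           for (long id : new TreeSet<>(sourceLedgerIds)) {
               if (targetExistingIds.contains(id)) {
                   alreadyPresent.add(id);
               } else if (id >= targetNextId) {
                   notYetReserved.add(id);
               }
           }
           return new Result(alreadyPresent, notYetReserved);
       }
   }
   ```
   
   With this framing, "empty/dedicated target" is just the special case where `alreadyPresent` is always empty and only the generator position needs to be checked.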
   
   Second, I wonder whether preserving the same ledgerId is strictly required 
for all metadata ledgers, especially schema/cursor ledgers. For normal data 
ledgers, I understand the motivation to keep "(ledgerId, entryId)" stable. But 
for schema/cursor ledgers, the ledgerId seems more like an internal storage 
reference. If same-ledgerId import is hard to guarantee, could schema/cursor 
use a newly allocated target ledgerId and record the mapping in metadata 
instead?
   
   For example, schema could potentially be modeled as "schema version -> source position / target position", with historical versions backfilled and new versions dual-written during the migration window. Cursor ledgers may also be able to use an explicitly recorded target ledgerId, as long as the recovery, delete, and cleanup paths all resolve it consistently. This would be more complex than same-ledgerId copy, so I'm not suggesting it must replace the current design. I just wonder whether it is worth documenting as an alternative/trade-off, since it may make the design more applicable to non-empty target BK clusters.
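   
   A rough sketch of the mapping idea, under the assumption that entries are copied in order so entryIds stay the same and only the ledgerId changes. `MigratedLedgerIdMap` and its methods are hypothetical names for illustration; in a real design the mapping would have to be persisted in metadata rather than held in memory.
   
   ```java
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;

   // Hypothetical illustration of "source ledgerId -> target ledgerId" resolution.
   final class MigratedLedgerIdMap {

       private final Map<Long, Long> sourceToTarget = new ConcurrentHashMap<>();

       /** Records that a source ledger was copied into a newly allocated target ledger. */
       void recordMigration(long sourceLedgerId, long targetLedgerId) {
           Long previous = sourceToTarget.putIfAbsent(sourceLedgerId, targetLedgerId);
           if (previous != null && previous != targetLedgerId) {
               throw new IllegalStateException(
                       "ledger " + sourceLedgerId + " already mapped to " + previous);
           }
       }

       /** Returns the ledgerId to open: the migrated target if one exists, else the original. */
       long resolve(long ledgerId) {
           return sourceToTarget.getOrDefault(ledgerId, ledgerId);
       }

       /** Drops the mapping so reads fall back to the source ledger again. */
       void rollback(long sourceLedgerId) {
           sourceToTarget.remove(sourceLedgerId);
       }
   }
   ```
   
   Every read, delete, and cleanup path would need to go through something like `resolve`, which is where the extra complexity compared to same-ledgerId copy comes from.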
   
   I may be missing some constraints here, so I’d be happy to hear your 
thoughts.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

