dao-jun commented on code in PR #25774:
URL: https://github.com/apache/pulsar/pull/25774#discussion_r3241892479


##########
pip/pip-470.md:
##########
@@ -0,0 +1,587 @@
+# PIP-470: Dynamic Bookie Cluster Switching for Brokers
+
+## Motivation
+
+Apache Pulsar's broker layer is bound to a fixed BookKeeper (Bookie) cluster 
at startup time through the `bookkeeperMetadataServiceUri` configuration. This 
static binding becomes a bottleneck in several real-world operational scenarios:
+
+1. **Slow Failure Recovery**: When a Bookie cluster experiences capacity 
exhaustion, sustained latency degradation, or non-recoverable failures, the 
only mitigation today is to wait for the Bookie cluster to recover. In 
elastic-storage deployments where a single Bookie cluster backs 100+ broker 
clusters, the blast radius is enormous.
+
+2. **No Capacity Escape Hatch**: When a Bookie cluster approaches its capacity 
ceiling, brokers cannot "spill over" to a fresh Bookie cluster. Operators must 
scale the existing cluster in place, which is slow and risky.
+
+3. **No Runtime Re-targeting**: There is no supported runtime mechanism to 
migrate brokers from one Bookie cluster to another without restart and full 
data migration.
+
+4. **Data Plane Coupling**: Bookie cluster switching deals with the data 
plane: terabytes of ledger bytes, ongoing publishes, ongoing acknowledgments, 
and live cursors. A copy-everything-then-cutover approach is infeasible.
+
+This PIP proposes a **safe, ledger-attribution-driven framework** that allows 
operators to switch a broker cluster's underlying Bookie cluster at runtime, 
with **zero data-plane downtime** and **per-ledger routing** that preserves the 
readability of historical data on the old cluster.
+
+The framework enables:
+
+- **Zero-downtime switching** — publish and consume continue without 
interruption
+- **Per-ledger routing** — each ledger carries its own cluster attribution; 
reads are routed to the cluster that physically holds the bytes
+- **No bulk data migration** — Topic Data Ledgers are NOT migrated; they age 
out naturally via TTL/Retention
+- **Targeted migration of small, long-lived metadata ledgers** — only Schema 
Ledgers and Cursor Ledgers are migrated, while preserving their original 
`ledgerId`
+- **Automatic rollback** within a configurable window
+- **Operator-driven, not auto-failover** — avoids split-brain
+
+## Goal
+
+Provide a runtime mechanism by which a broker cluster can be re-pointed from 
one Bookie cluster (`oldCluster`) to a new Bookie cluster (`newCluster`) 
without service interruption, while preserving the ability to read historical 
data that physically resides on `oldCluster`.
+
+### In Scope
+
+- Per-broker-cluster configuration of multiple registered Bookie clusters via 
Broker-ZK metadata
+- A `DynamicBookKeeperClientFactory` that holds multiple BK clients 
simultaneously
+- Per-ledger cluster attribution stored in proto fields 
(`LedgerInfo.bookieClusterName`, `ManagedCursorInfo.bookieClusterName`, 
`PositionInfo.bookieClusterName`)
+- Read-path routing keyed by attribution field (`BookieClusterReadRouter`)
+- Asynchronous post-switch migration of Schema and Cursor Ledgers, 
**preserving the original `ledgerId`** via 
`BookKeeper.asyncCreateLedgerAdv(ledgerId, ...)`
+- CLI (`pulsar-admin bookie-clusters`) + REST API + Java Admin SDK 
(three-layer consistency)
+- Automatic rollback within a configurable window
+- Support for both **co-located deployments** (Broker-ZK and Bookie-ZK share 
the same physical ZK ensemble with different chroots) and **split deployments**
+
+### Out of Scope
+
+- **Topic Data Ledger migration**: Topic data is NOT migrated; old Topic Data 
Ledgers age out via the topic's TTL/Retention policy. Old data is read from the 
old cluster via attribution-based routing for as long as the old cluster is 
reachable.
+- **Automatic failure detection and switch triggering**: Switching is 
operator-initiated to avoid split-brain.
+- **Cross-cluster (geo-replicated) coordination**: Each broker cluster 
switches its own Bookie cluster independently.
+- **BookKeeper-cluster-side internal migration tools**: This PIP only 
orchestrates from the broker side; recovering data within a Bookie cluster 
(e.g., underreplication) is delegated to BookKeeper itself.
+- **Changing `metadataStoreUrl` (Broker-ZK)**: Only the Bookie cluster 
(`bookkeeperMetadataServiceUri`) is switched; Broker-ZK remains the same 
throughout.
+
+## High Level Design
+
+The framework introduces three new long-lived broker components:
+
+1. **`DynamicBookKeeperClientFactory`** — Maintains a `Map<clusterName, 
BookKeeper>` and exposes the currently active client. Replaces the singleton 
`BookKeeper` previously held by `ManagedLedgerClientFactory`.
+2. **`BookieClusterReadRouter`** — On every read, resolves the target cluster 
from the per-ledger attribution field and returns the matching BK client.
+3. **`BookieClusterSwitchOrchestrator`** — Drives the post-switch state 
machine (`BUILD → PROMOTE → CLEANUP`) for Schema and Cursor Ledgers.
+
+### Key Principles
+
+| Principle | Description |
+|-----------|-------------|
+| **BookieClusterName as Single Source of Truth** | At ledger creation time, 
the active cluster name is "stamped" into the ledger's metadata. Reads are 
routed by that stamp. No history table, no boundary heuristics. |
+| **Attribution Field, Not Side ZNode** | The attribution lives directly 
inside `MLDataFormats.proto`'s `LedgerInfo` / `ManagedCursorInfo` and 
`SchemaStorageFormat.proto`'s `PositionInfo`. No 
"/admin/bookie-clusters/schema-ledger-clusters" side path is introduced. This 
eliminates dual-source-of-truth and cache-coherence risks. |
+| **Topic Data: Don't Migrate** | Topic Data Ledgers have TTL/Retention; let 
them age out. Switching does not move terabytes of data. |
+| **Schema / Cursor: Migrate, but Preserve `ledgerId`** | Schema/Cursor are 
tied to topic/subscription lifecycles and cannot be left behind. We use 
`asyncCreateLedgerAdv(ledgerId, ...)` to recreate the **same `ledgerId`** in 
the new cluster, copy entries with **identical `entryId`**, then CAS only the 
`bookieClusterName` field. **`cursorsLedgerId`, `position.ledgerId`, 
`position.entryId` are byte-for-byte unchanged** in Broker-ZK. |
+| **Operator-Driven, Single Active Cluster** | Switching is triggered by 
`pulsar-admin bookie-clusters switch`; auto-failover is rejected. At any moment 
exactly one cluster is `status=ACTIVE` in Broker-ZK. |
+| **Broker-ZK is the Switch Source-of-Truth** | All cluster registrations and 
the switch directive live in Broker-ZK. No Global-ZK, no external coordinator. |
+| **Minimum proto Change** | Only `optional` fields are added 
(`bookieClusterName`, `properties`). proto2 forward/backward compatibility 
holds. |
+
+## Detailed Design
+
+### Switch Phases
+
+```
+NOT_REGISTERED
+    ↓
+[Operator: pulsar-admin bookie-clusters register --name newCluster --uri ... 
--status STANDBY]
+    ↓
+REGISTERED (STANDBY)
+    ↓
+[Operator: pulsar-admin bookie-clusters switch --target newCluster]
+    ↓ (POST /switch internally invokes /precheck for idgen-long, see I11)
+SWITCH_TRIGGERED ← Coordinator broker writes 
/admin/bookie-clusters/switch-target
+    ↓
+LIVE_DUAL_READ ← All brokers' watchers fire; new writes go to newCluster;
+    ↓             reads still go to oldCluster for ledgers attributed to it
+BUILD_PROMOTE_CLEANUP
+    ├── BUILD     ← Coordinator centrally copies Schema/Cursor ledger bytes to 
newCluster
+    │             with the SAME ledgerId; entries copied with the SAME entryId
+    ├── PROMOTE   ← Schema: coordinator CAS-rewrites SchemaLocator (changes 
only
+    │             position.bookieClusterName)
+    │             Cursor: coordinator forwards POST /internal/promote-cursor
+    │             to topic owner broker; owner does CAS in ManagedCursorImpl 
lock
+    └── CLEANUP   ← After rollback window, delete old-cluster copies; promote
+                  Broker-ZK status: oldCluster→DEPRECATED, newCluster→ACTIVE
+    ↓
+DONE / DONE_WITH_FAILURES
+
+(Operator-triggered, within rollback window):
+ROLLBACK ← CAS attribution fields back to oldCluster; new-cluster copies are
+          deleted asynchronously
+```
+
+### Phase 1: Cluster Registration
+
+```bash
+pulsar-admin bookie-clusters register \
+    --name new-cluster \
+    --metadata-service-uri zk+null://new-zk:2181/ledgers-v2 \
+    --status STANDBY
+```
+
+The `BookieClusterConfigManager` validates that:
+- The `metadata-service-uri` does not collide with any already-registered 
cluster (prevents accidental aliasing in the BK client map).
+- For co-located deployments, the chroot must differ from existing clusters.
+
+### Phase 2: Pre-Switch Validation (`precheck`)
+
+```bash
+pulsar-admin bookie-clusters precheck --name new-cluster
+```
+
+`IdgenPrecheckService` verifies invariant **I11**: the new cluster's 
`/ledgers/idgen-long` must be advanced beyond `max(ledgerId)` of the source 
cluster. Otherwise, `asyncCreateLedgerAdv(ledgerId, ...)` during BUILD will 
collide with newly-allocated ledgerIds. The precheck returns a structured 
result:
+
+```json
+{
+  "ready": true,
+  "idgenAdvancedBeyondSource": true,
+  "sourceClusterName": "old-cluster",
+  "targetClusterName": "new-cluster",
+  "sourceMaxHighOrderBit": 12345,
+  "targetMaxHighOrderBit": 1012345
+}
+```
+
+`POST /switch` invokes `precheck` inline; failure returns HTTP 409 (bypassable 
via `bookieClusterSwitchSkipIdgenPrecheck=true` in lab/staging only).
+
+### Phase 3: Switch Trigger
+
+```bash
+pulsar-admin bookie-clusters switch --target new-cluster
+```
+
+The receiving broker (origin) writes the `switch-target` znode in Broker-ZK 
with CAS:
+
+```
+/admin/bookie-clusters/switch-target
+  body: "new-cluster\norigin=https://broker-1:8080";
+```
+
+All brokers' `BookieClusterConfigWatcher` fire, executing the callback chain:
+
+1. `DynamicBookKeeperClientFactory.switchToCluster(newCluster)` — hot-swap 
`activeClusterName`; new BK client lazily created if not yet present
+2. First switch: `BookieClusterReadRouter.enableDualRead()` (records 
`initialClusterName`, `switchLedgerIdBoundary`, `switchTimestampMillis`)
+   Subsequent switches: `BookieClusterReadRouter.recordSwitchEpoch()` (only 
refreshes `switchTimestampMillis`)
+3. `BookieClusterSwitchMetrics.recordSwitchSuccess()`
+4. **Origin broker only**: 
`BookieClusterSwitchOrchestrator.runStateMachine(BUILD → PROMOTE → CLEANUP)`
+
+Non-origin brokers do not start the orchestrator. If the `switch-target` znode 
body lacks the `origin=` line (e.g., legacy or operator-edited), brokers fall 
back to leader election to pick exactly one orchestrator.
+
+### Phase 4: BUILD (Schema + Cursor Ledger Bytes Copy)
+
+The orchestrator runs **two stages in parallel** with bounded concurrency 
(default 16):
+
+#### Schema Ledger BUILD (centralized)
+
+```
+for each schemaId in /schemas/* (parallel, bounded):
+  locator = brokerZk.get("/schemas/<schemaId>")
+  for each indexEntry in locator.indexList:
+    ledgerId = indexEntry.position.ledgerId
+    cluster  = indexEntry.position.bookieClusterName ?? activeCluster
+    if cluster == oldCluster:
+      LedgerCopyUtil.copyLedgerPreservingIds(
+          ledgerId, oldBk, newBk,
+          customMetadata,  // identical to source ledger
+          ensembleSize, qw, qa)
+        // Internally:
+        //   newBk.asyncCreateLedgerAdv(ledgerId, ...)        ← SAME ledgerId
+        //   for entryId in [0, lastAddConfirmed]:
+        //     newLh.addEntry(entryId, srcEntry)              ← SAME entryId
+        //   newLh.close()
+```
+
+#### Cursor Ledger BUILD (centralized)
+
+Identical pattern, scanning `/managed-ledgers/<topic>/<cursorName>` znodes, 
copying each `cursorsLedgerId`.
+
+**Why BUILD is safe to centralize**:
+1. Reads of the old ledger are read-only — no contention with owner brokers' 
write paths.
+2. Writes to the new cluster are to a **dormant copy** — no online reader sees 
it yet (attribution is still `oldCluster`).
+3. The orchestrator uses dedicated BK clients with isolated thread pools.
+
+#### Idempotency on `LedgerExistException`
+
+If `asyncCreateLedgerAdv(ledgerId, ...)` returns `LedgerExistException` (BUILD 
retry / partial prior run), `LedgerCopyUtil.verifyDstMatchesSrc` opens both 
ledgers and compares `getLastAddConfirmed()` and `getLength()`. If both match, 
the existing ledger is treated as a successful idempotent copy 
(`migration_ledger_id_conflict_total` is incremented for visibility). Mismatch 
→ record as `failedLedgers`; attribution remains on the old cluster.
+
+### Phase 5: PROMOTE (CAS Attribution Field)
+
+#### Schema PROMOTE (centralized)
+
+```
+for each schemaId built successfully:
+  retry up to MAX_PROMOTE_RETRIES (5):
+    (locator, version) = brokerZk.getWithVersion("/schemas/<schemaId>")
+    updated = locator.toBuilder()
+        .clearIndex()
+        .addAllIndex(locator.indexList.map { ie ->
+            if (ie.position.bookieClusterName == oldCluster):
+              ie.toBuilder().setPosition(
+                  ie.position.toBuilder()
+                      .setBookieClusterName(newCluster)  // ★ ONLY field 
changed
+                      .build()
+              ).build()
+            else: ie  // already newCluster (concurrent putSchema) — leave 
alone
+        })
+        .build()
+    try brokerZk.put("/schemas/<schemaId>", updated.toByteArray(), version)
+    catch BadVersionException: continue  // re-read and retry
+```
+
+**Crucially**, `position.ledgerId`, `position.entryId`, `info.version`, and 
`info.hash` are **byte-for-byte unchanged** (invariant **I10**). Only 
`bookieClusterName` is rewritten.
+
+#### Cursor PROMOTE (forwarded to topic owner)
+
+Cursor is more delicate because `ManagedCursorImpl` is a **stateful object 
owned by exactly one broker** (the topic owner) and holds an in-process lock 
that serializes markDelete flushes. Letting the orchestrator broker write 
`ManagedCursorInfo` directly would race with the owner's pending markDelete CAS.
+
+```
+[orchestrator broker]
+  for each (topic, cursorName, cursorsLedgerId) built successfully:
+    ownerUrl = namespaceService.findOwnerHttpUrl(topic)
+    POST {ownerUrl}/admin/v2/bookie-clusters/internal/promote-cursor
+        body: {topic, cursorName, targetClusterName=newCluster,
+               expectedCursorsLedgerId=cursorsLedgerId}
+
+[owner broker]
+  cursor = brokerService.getTopic(topic).getCursor(cursorName)
+  synchronized (cursor):
+    if cursor.cursorsLedgerId != expectedCursorsLedgerId:
+      return 409  // cursor rolled over since BUILD; coordinator records as
+                  // promotedViaRolloverFallback (the new cursorsLedgerId
+                  // was already stamped into newCluster at rollover time
+                  // by the writer-side attribution stamping)
+    metaStore.asyncUpdateCursorInfo(
+        cursor.path,
+        ManagedCursorInfo.newBuilder(cursor.managedCursorInfo)
+            .setBookieClusterName(newCluster)  // ★ ONLY field changed
+            .build(),
+        expectedZkVersion)
+    // BadVersion → re-read; bookieClusterName is orthogonal to
+    //   markDeleteLedgerId/markDeleteEntryId, so retry converges.
+```
+
+**Invariant I9**: PROMOTE must change only `bookieClusterName`; 
`cursorsLedgerId`, `markDeleteLedgerId`, `markDeleteEntryId`, individual 
deleted ranges, batched deletion indices, and cursor properties are preserved.
+
+#### Concurrent putSchema and PROMOTE
+
+For Schema, concurrent `putSchema` writes are naturally serialized via the 
`/schemas/<schemaId>` znode CAS:
+- If `putSchema` lands first, PROMOTE re-reads and sees the newly appended 
`IndexEntry` (already stamped `newCluster` because writes go to the active BK 
client) — leaves it alone, rewrites the rest.
+- If PROMOTE lands first, `putSchema` re-reads and appends a new `IndexEntry` 
with `newCluster` stamp.
+
+Either way, the converged state is "all entries stamped `newCluster`".
+
+### Phase 6: CLEANUP
+
+After the rollback window (`bookieClusterDualReadTimeoutDays`, default 15) 
elapses, or upon explicit `pulsar-admin bookie-clusters cleanup`:
+
+1. Delete old-cluster ledgers via `oldBk.asyncDeleteLedger(ledgerId)`.
+2. CAS-flip Broker-ZK status: `oldCluster.status = DEPRECATED`, 
`newCluster.status = ACTIVE` (invariant **I12**, two sequential CAS — 
`MetadataStore` API does not expose multi-op).
+3. Optionally close the old BK client via `pulsar-admin bookie-clusters 
close-client --name oldCluster`.
+
+`POST /admin/v2/bookie-clusters/cleanup` accepts:
+
+```json
+{
+  "dryRun": true,           // default; preview only
+  "srcClusterFilter": "...", // optional; restrict to one source cluster
+  "maxDeletions": 0,         // 0 = unlimited
+  "force": false             // bypass dual-read-window guard (lab only)
+}
+```
+
+### Phase 7: ROLLBACK (within window)
+
+If issues are detected, the operator triggers rollback. The reverse-CAS flips 
`bookieClusterName` back to `oldCluster` for affected Schema/Cursor; 
new-cluster ledgers are scheduled for delayed deletion. Because **`ledgerId` 
and `entryId` were never touched** in Broker-ZK, rollback is a constant-time 
metadata flip — no data movement.
+
+### Failure Handling
+
+| Stage | Failure | Behavior |
+|-------|---------|----------|
+| precheck | idgen-long not advanced | HTTP 409; switch refused |
+| Switch trigger | New BK client init fails | `activeClusterName` stays on 
old; switch rejected |
+| BUILD | Source ledger unreadable | Record in `failedLedgers`; attribution 
stays on old cluster (still readable) |
+| BUILD | `LedgerExistException` + LAC mismatch | Record in `failedLedgers`; 
do not PROMOTE; operator can retry switch |
+| PROMOTE | CAS BadVersion (5 retries exhausted) | Log warn; not counted in 
`failedLedgers`; next `switch --target <same>` retries (orchestrator is 
idempotent) |
+| PROMOTE | Cursor owner returns 409 (rollover) | Recorded as 
`promotedViaRolloverFallback`; old `cursorsLedgerId` ages out naturally |
+| CLEANUP | `promoteActiveCluster` CAS fails | Orchestrator finishes in `DONE` 
/ `DONE_WITH_FAILURES`; in-memory `activeClusterName` is correct, but Broker-ZK 
ACTIVE may diverge → operator runs `register --status ACTIVE` to reconcile |
+
+## Implementation Details
+
+### Data Structures
+
+#### Cluster Registry (Broker-ZK)
+
+```
+/admin/bookie-clusters/                ← bookieClusterConfigPath (configurable)
+    ├── old-cluster                    ← znode body = JSON
+    │     {
+    │       "name": "old-cluster",
+    │       "metadataServiceUri": "zk://old-zk:2181/ledgers",
+    │       "status": "ACTIVE"          // exactly ONE cluster ACTIVE at a time
+    │     }
+    └── new-cluster
+          {
+            "name": "new-cluster",
+            "metadataServiceUri": "zk+null://new-zk:2181/ledgers-v2",
+            "status": "STANDBY"
+          }
+
+/admin/bookie-clusters/switch-target   ← written on switch
+    body: "new-cluster\norigin=https://broker-1:8080";
+```
+
+#### proto Changes
+
+**`managed-ledger/src/main/proto/MLDataFormats.proto`**:
+
+```protobuf
+message ManagedLedgerInfo {
+  message LedgerInfo {
+    required int64 ledgerId = 1;
+    optional int64 entries = 2;
+    optional int64 size = 3;
+    optional int64 timestamp = 4;
+    optional OffloadContext offloadContext = 5;
+    optional string bookieClusterName = 6;   // ★ new (Topic Data Ledger 
attribution)
+    repeated KeyValue properties = 7;        // ★ new (extension slot)

Review Comment:
   we already have the field `properties` in PIP-404. And maybe we can also use 
the field to store `bookieClusterName`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to