PIP PR: https://github.com/apache/pulsar/pull/25196
PR with implementation: https://github.com/apache/pulsar/pull/25219 ---- # PIP-454: Metadata Store Migration Framework ## Motivation Apache Pulsar currently uses Apache ZooKeeper as its metadata store for broker coordination, topic metadata, namespace policies, and BookKeeper ledger management. While ZooKeeper has served well, there are several motivations for enabling migration to alternative metadata stores: 1. **Operational Simplicity**: Alternative metadata stores like Oxia may offer simpler operations, better observability, or reduced operational overhead compared to ZooKeeper ensembles. 2. **Performance Characteristics**: Different metadata stores have different performance profiles. Some workloads may benefit from stores optimized for high throughput or low latency. 3. **Deployment Flexibility**: Organizations may prefer metadata stores that align better with their existing infrastructure and expertise. 4. **Zero-Downtime Migration**: Operators need a safe, automated way to migrate metadata between stores without service interruption. Currently, there is no supported path to migrate from one metadata store to another without cluster downtime. This PIP proposes a **safe, simple migration framework** that ensures metadata consistency by avoiding complex dual-write/dual-read patterns. The framework enables: - **Zero-downtime migration** from any metadata store to any other supported store - **Automatic ephemeral node recreation** in the target store - **Version preservation** to ensure conditional writes continue working - **Automatic failure recovery** if issues are detected - **Minimal configuration changes** - no config updates needed until after migration completes ## Goal Provide a safe, automated framework for migrating Apache Pulsar's metadata from one store implementation (e.g., ZooKeeper) to another (e.g., Oxia) with zero service interruption. ### In Scope - Migration framework supporting any source → any target metadata store - Automatic ephemeral node recreation by brokers and bookies - Persistent data copy with version preservation - CLI commands for migration control and monitoring - Automatic failure recovery during migration - Support for broker and bookie participation - Read-only mode during migration for consistency ### Out of Scope - Developing new metadata store implementations (Oxia, Etcd support already exists) - Cross-cluster metadata synchronization (different use case) - Automated rollback after COMPLETED phase (requires manual intervention) - Migration of configuration metadata store and geo-replicated clusters (can be done separately) ## High Level Design The migration framework introduces a **DualMetadataStore** wrapper that transparently handles migration without modifying existing metadata store implementations. ### Key Principles 1. **Transparent Wrapping**: The `DualMetadataStore` wraps the existing source store (e.g., `ZKMetadataStore`) without modifying its implementation. 2. **Lazy Target Initialization**: The target store is only initialized when migration begins, triggered by a flag in the source store. 3. **Ephemeral-First Approach**: Before copying persistent data, all brokers and bookies recreate their ephemeral nodes in the target store. This ensures the cluster is "live" in both stores during migration. 4. **Read-Only Mode During Migration**: To ensure consistency, all metadata writes are blocked during PREPARATION and COPYING phases. Components receive `SessionLost` events to defer non-critical operations (e.g., ledger rollovers). 5. **Phase-Based Migration**: Migration proceeds through well-defined phases (PREPARATION → COPYING → COMPLETED). 6. **Generic Framework**: The framework is agnostic to specific store implementations - it works with any source and target that implement the `MetadataStore` interface. 7. **Guaranteed Consistency**: By blocking writes during migration and using atomic copy, metadata is **always in a consistent state**. No dual-write complexity, no data divergence, no consistency issues. ## Detailed Design ### Migration Phases ``` NOT_STARTED ↓ PREPARATION ← All brokers/bookies recreate ephemeral nodes in target ← Metadata writes are BLOCKED (read-only mode) ↓ COPYING ← Coordinator copies persistent data source → target ← Metadata writes still BLOCKED ↓ COMPLETED ← Migration complete, all services using target store ← Metadata writes ENABLED on target ↓ After validation period: * Update config and restart brokers & bookies * Decommission source store (If errors occur): FAILED ← Rollback to source store, writes ENABLED ``` ### Phase 1: NOT_STARTED → PREPARATION **Participant Registration (at startup):** Each broker and bookie registers itself as a migration participant by creating a sequential ephemeral node: - Path: `/pulsar/migration-coordinator/participants/id-NNNN` (sequential) - This allows the coordinator to know how many participants exist before migration starts **Administrator triggers migration:** ```bash pulsar-admin metadata-migration start --target oxia://oxia1:6648 ``` **Coordinator actions:** 1. Creates migration flag in source store: `/pulsar/migration-coordinator/migration` ```json { "phase": "PREPARATION", "targetUrl": "oxia://oxia1:6648" } ``` **Broker/Bookie actions (automatic, triggered by watching the flag):** 1. Detect migration flag via watch on `/pulsar/migration-coordinator/migration` 2. Defer non-critical metadata writes (e.g., ledger rollovers, bundle ownership changes) 3. Initialize connection to target store 4. Recreate ALL ephemeral nodes in target store 5. **Delete** participant registration node to signal "ready" **Coordinator waits for all participant nodes to be deleted (indicating all participants are ready)** ### Phase 2: PREPARATION → COPYING **Coordinator actions:** 1. Updates phase to `COPYING` 2. Performs recursive copy of persistent data from source → target: - Skips ephemeral nodes (already recreated) - Concurrent operations limited by semaphore (default: 1000 pending ops) - Breadth-first traversal to process all paths - Progress logged periodically **During this phase:** - Brokers/bookies continue normal READ operations - Metadata WRITES are BLOCKED (return failure) - Ephemeral nodes remain alive in both stores - All reads still go to source store **During this phase:** - Metadata writes are BLOCKED (return error to clients) - Metadata reads continue normally from source store - **Data plane operations unaffected**: Publish/consume, ledger writes continue normally - Version-id and modification count preserved using direct Oxia client - Breadth-first traversal with max 1000 concurrent operations **Estimated duration:** - **< 30 seconds** for typical deployments with up to **500 MB of metadata** in ZooKeeper **Impact on operations:** - ✅ Existing topics: Publish and consume continue without interruption - ✅ BookKeeper: Ledger writes and reads continue normally - ✅ Clients: Connected producers and consumers unaffected - ❌ Admin operations: Topic/namespace creation blocked temporarily - ❌ Bundle operations: Load balancing deferred until completion ### Phase 3: COPYING → COMPLETED **Coordinator actions:** 1. Updates phase to `COMPLETED` 2. Logs success message with total copied node count **Broker/Bookie actions (automatic, triggered by phase update):** 1. Detect `COMPLETED` phase 2. Deferred operations can now proceed 3. Switch routing: - **Writes**: Go to target store only - **Reads**: Go to target store only **At this point:** - Cluster is running on target store - Source store remains available for safety - Metadata writes are enabled again **Operator follow-up (after validation period):** 1. Update configuration files: ```properties # Before (ZooKeeper): metadataStoreUrl=zk://zk1:2181,zk2:2181/pulsar # After (Oxia): metadataStoreUrl=oxia://oxia1:6648 ``` 2. Perform rolling restart with new config 3. After all services restarted, decommission source store ### Failure Handling: ANY_PHASE → FAILED **If migration fails at any point:** 1. Coordinator updates phase to `FAILED` 2. Broker/Bookie actions: - Detect `FAILED` phase - Discard target store connection - Continue using source store - Metadata writes enabled again **Operator actions:** 1. Review logs to understand failure cause 2. Fix underlying issue 3. Retry migration with `pulsar-admin metadata-migration start --target <url>` ## Implementation Details ### Key Implementation Details: 1. **Direct Oxia Client Usage**: The coordinator uses `AsyncOxiaClient` directly instead of going through `MetadataStore` interface. This allows setting version-id and modification count to match the source values, ensuring conditional writes (compare-and-set operations) continue to work correctly after migration. 2. **Breadth-First Traversal**: Processes paths level by level using a work queue, enabling high concurrency while preventing deep recursion. 3. **Concurrent Operations**: Uses a semaphore to limit pending operations (default: 1000), balancing throughput with memory usage. ### Data Structures **Migration State** (`/pulsar/migration-coordinator/migration`): ```json { "phase": "PREPARATION", "targetUrl": "oxia://oxia1:6648/default" } ``` Fields: - `phase`: Current migration phase (NOT_STARTED, PREPARATION, COPYING, COMPLETED, FAILED) - `targetUrl`: Target metadata store URL (e.g., `oxia://oxia1:6648/default`) **Participant Registration** (`/pulsar/migration-coordinator/participants/id-NNNN`): - Sequential ephemeral node created by each broker/bookie at startup - Empty data (presence indicates participation) - Deleted by participant when preparation complete (signals "ready") - Coordinator waits for all to be deleted before proceeding to COPYING phase **No additional state tracking**: The simplified design removes complex state tracking and checksums. Migration state is kept minimal. ### CLI Commands ```bash # Start migration pulsar-admin metadata-migration start --target <target-url> # Check status pulsar-admin metadata-migration status ``` The simplified design only requires two commands. Rollback happens automatically if migration fails (phase transitions to FAILED). ### REST API ``` POST /admin/v2/metadata/migration/start Body: { "targetUrl": "oxia://..." } GET /admin/v2/metadata/migration/status Returns: { "phase": "COPYING", "targetUrl": "oxia://..." } ``` ## Safety Guarantees ### Why This Approach is Safe **The migration design guarantees metadata consistency by avoiding dual-write and dual-read patterns entirely:** 1. **Single Source of Truth**: At any given time, there is exactly ONE active metadata store: - Before migration: Source store (ZooKeeper) - During PREPARATION and COPYING: Source store (read-only) - After COMPLETED: Target store (Oxia) 2. **No Dual-Write Complexity**: Unlike approaches that write to both stores simultaneously, this design eliminates: - Write synchronization issues - Conflict resolution between stores - Data divergence problems - Partial failure handling complexity 3. **No Dual-Read Complexity**: Unlike approaches that read from both stores, this design eliminates: - Read consistency issues - Cache invalidation across stores - Stale data problems - Complex fallback logic 4. **Atomic Cutover**: All participants switch stores simultaneously when COMPLETED phase is detected. There is no ambiguous state where some participants use one store and others use another. 5. **Fast Migration Window**: With **< 30 seconds** for typical metadata sizes (even up to 500 MB), the read-only window is minimal and acceptable for most production environments. **Bottom line**: Metadata is **always in a consistent state** - either fully in the source store or fully in the target store, never split or diverged between them. ### Data Integrity 1. **Version Preservation**: All persistent data is copied with original version-id and modification count preserved. This ensures conditional writes (compare-and-set operations) continue working after migration. 2. **Ephemeral Node Recreation**: All ephemeral nodes are recreated by their owning brokers/bookies before persistent data copy begins. 3. **Read-Only Mode**: All metadata writes are blocked during PREPARATION and COPYING phases, ensuring no data inconsistencies during migration. **Important**: Read-only mode only affects metadata operations. Data plane operations continue normally: - ✅ **Publishing and consuming messages** works without interruption - ✅ **Reading from existing topics and subscriptions** works normally - ✅ **Ledger writes to BookKeeper** continue unaffected - ❌ **Creating new topics or subscriptions** will be blocked temporarily - ❌ **Namespace/policy updates** will be blocked temporarily - ❌ **Bundle ownership changes** will be deferred until migration completes ### Operational Safety 1. **No Downtime**: Brokers and bookies remain online throughout the migration. **Data plane operations (publish/consume) continue without interruption.** Only metadata operations are temporarily blocked during the migration phases. 2. **Graceful Failure**: If migration fails at any point, phase transitions to FAILED and cluster returns to source store automatically. 3. **Session Events**: Components receive `SessionLost` event during migration to defer non-critical writes (e.g., ledger rollovers), and `SessionReestablished` when migration completes or fails. 4. **Participant Coordination**: Migration waits for all participants to complete preparation before copying data. ### Consistency 1. **Atomic Cutover**: All participants switch to target store simultaneously when COMPLETED phase is detected. 2. **Ephemeral Session Consistency**: Each participant manages its own ephemeral nodes in target store with proper session management. 3. **No Dual-Write Complexity**: By blocking writes during migration, we avoid complex dual-write error handling and data divergence issues. ## Configuration ### No Configuration Changes for Migration The beauty of this design is that **no configuration changes are needed to start migration**: - Brokers and bookies continue using their existing `metadataStoreUrl` config - The `DualMetadataStore` wrapper is automatically applied when using ZooKeeper - Target URL is provided only when triggering migration via CLI ### Post-Migration Configuration After migration completes and validation period ends, update config files: ```properties # Before migration metadataStoreUrl=zk://zk1:2181,zk2:2181,zk3:2181/pulsar # After migration (update and rolling restart) metadataStoreUrl=oxia://oxia1:6648 ``` ## Comparison with Kafka's ZooKeeper → KRaft Migration Apache Kafka faced a similar challenge migrating from ZooKeeper to KRaft (Kafka Raft). Their approach provides useful comparison points: ### Kafka's Approach (KIP-866) **Migration Strategy:** - **Dual-mode operation**: Kafka brokers run in a hybrid mode where the KRaft controller reads from ZooKeeper - **Metadata synchronization**: KRaft controller actively mirrors metadata from ZooKeeper to KRaft - **Phased cutover**: Operators manually transition from ZK_MIGRATION mode to KRAFT mode - **Write forwarding**: During migration, metadata writes go to ZooKeeper and are replicated to KRaft **Timeline:** - Migration can take hours or days as metadata is continuously synchronized - Requires careful monitoring of lag between ZooKeeper and KRaft - Rollback possible until final KRAFT mode is committed ### Pulsar's Approach (This PIP) **Migration Strategy:** - **Transparent wrapper**: DualMetadataStore wraps existing store without broker code changes - **Read-only migration**: Metadata writes blocked during migration (< 30 seconds for most clusters) - **Atomic copy**: All persistent data copied in one operation with version preservation - **Single source of truth**: No dual-write or dual-read - metadata always consistent - **Automatic cutover**: All participants switch simultaneously when COMPLETED phase detected **Timeline:** - Migration completes in **< 30 seconds** for typical deployments (even up to 500 MB metadata) - No lag monitoring needed - Automatic rollback on failure (FAILED phase) ### Key Differences | Aspect | Kafka (ZK → KRaft) | Pulsar (ZK → Oxia) | |--------|-------------------|-------------------| | **Migration Duration** | Hours to days | **< 30 seconds** (up to 500 MB) | | **Metadata Writes** | Continue during migration | Blocked during migration | | **Data Plane** | Unaffected | Unaffected (publish/consume continues) | | **Approach** | Continuous sync + dual-mode | Atomic copy + read-only mode | | **Consistency** | Dual-write (eventual consistency) | **Single source of truth (always consistent)** | | **Complexity** | High (dual-mode broker logic) | Low (transparent wrapper) | | **Version Preservation** | Not applicable (different metadata models) | Yes (conditional writes preserved) | | **Rollback** | Manual, complex | Automatic on failure | | **Monitoring** | Requires lag tracking | Simple phase monitoring | ### Why Pulsar's Approach Differs 1. **Data Plane Independence**: **The key insight is that Pulsar's data plane (publish/consume, ledger writes) does not require metadata writes to function.** This architectural property allows pausing metadata writes for a brief period (< 30 seconds) without affecting data operations. This is what makes the migration **provably safe and consistent**, not the metadata size. 2. **Write-Pause Safety**: Pausing writes during copy ensures: - No dual-write complexity - No data divergence between stores - No conflict resolution needed - Guaranteed consistency This works regardless of metadata size - whether 50K nodes or millions of topics. The migration handles large metadata volumes through high concurrency (1000 parallel operations), completing in < 30 seconds even for 500 MB. 3. **Ephemeral Node Handling**: Pulsar has significant ephemeral metadata (broker registrations, bundle ownership), making dual-write complex. Read-only mode simplifies this. 4. **Conditional Writes**: Pulsar relies heavily on compare-and-set operations. Version preservation ensures these continue working post-migration, which Kafka doesn't need to address. 5. **Architectural Enabler**: Pulsar's separation of data plane and metadata plane allows brief metadata write pauses without data plane impact, enabling a simpler, safer migration approach. ### Lessons from Kafka's Experience Pulsar's design incorporates lessons from Kafka's migration: - ✅ **Avoid dual-write complexity**: Kafka found dual-mode operation added significant code complexity. Pulsar's read-only approach is simpler **and guarantees consistency**. - ✅ **Clear phase boundaries**: Kafka's migration has unclear "completion" point. Pulsar has explicit COMPLETED phase. - ✅ **Automatic participant coordination**: Kafka requires manual broker restarts. Pulsar participants coordinate automatically. - ✅ **Fast migration**: **< 30 seconds** read-only window is acceptable for most production environments - ❌ **Brief write unavailability**: Pulsar accepts brief metadata write unavailability (< 30 sec) vs Kafka's continuous operation, but gains guaranteed consistency and simplicity. ## References - [PIP-45: Pluggable metadata interface](https://github.com/apache/pulsar/wiki/PIP-45%3A-Pluggable-metadata-interface) - [Oxia: A Scalable Metadata Store](https://github.com/streamnative/oxia) - [MetadataStore Interface](https://github.com/apache/pulsar/blob/master/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java) - [KIP-866: ZooKeeper to KRaft Migration](https://cwiki.apache.org/confluence/display/KAFKA/KIP-866+ZooKeeper+to+KRaft+Migration) - Kafka's approach to metadata store migration -- Matteo Merli <[email protected]>
