This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new ae484ec7d feat(consensus): enable independent commit progress across 
namespaces (#2765)
ae484ec7d is described below

commit ae484ec7d53ab64e377e878294032c2715b818d6
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Fri Feb 20 14:56:48 2026 +0100

    feat(consensus): enable independent commit progress across namespaces 
(#2765)
    
    LocalPipeline's single VecDeque blocked independent commit
    progress across namespaces. NamespacedPipeline uses per-namespace
    VecDeques under one global op sequence and hash chain, so
    drain_committable_all drains each namespace independently.
    
    Additional fixes: wire last_prepare_checksum through Project
    and on_replicate instead of a hardcoded 0; walk consecutive ops
    in ack_quorum_reached to prevent a premature drain; propagate
    the namespace in build_reply_message; enforce namespace
    registration before push; add a namespace field to the view
    change headers; replace magic-number dispatch with an
    Operation enum in the simulator.
---
 core/common/src/types/consensus/header.rs |  26 +-
 core/consensus/src/impls.rs               | 147 ++++----
 core/consensus/src/lib.rs                 |  10 +-
 core/consensus/src/namespaced_pipeline.rs | 540 ++++++++++++++++++++++++++++++
 core/consensus/src/plane_helpers.rs       | 159 ++++++++-
 core/consensus/src/vsr_timeout.rs         |   1 +
 core/metadata/src/impls/metadata.rs       |  89 ++---
 core/partitions/src/iggy_partitions.rs    | 206 +++++++-----
 core/simulator/src/deps.rs                |  18 +-
 core/simulator/src/lib.rs                 |  28 +-
 core/simulator/src/replica.rs             |  57 ++--
 11 files changed, 1035 insertions(+), 246 deletions(-)

diff --git a/core/common/src/types/consensus/header.rs 
b/core/common/src/types/consensus/header.rs
index c8d311fa7..2991f89c3 100644
--- a/core/common/src/types/consensus/header.rs
+++ b/core/common/src/types/consensus/header.rs
@@ -21,6 +21,8 @@ use thiserror::Error;
 const HEADER_SIZE: usize = 256;
 pub trait ConsensusHeader: Sized + Pod + Zeroable {
     const COMMAND: Command2;
+    // TODO: Trait consts are never evaluated unless explicitly accessed (e.g. 
`<T as ConsensusHeader>::_SIZE_CHECK`).
+    // The size invariant is enforced by repr(C) layout + bytemuck Pod derive; 
consider adding a static_assert in each impl.
     const _SIZE_CHECK: () = assert!(std::mem::size_of::<Self>() == 
HEADER_SIZE);
 
     fn validate(&self) -> Result<(), ConsensusError>;
@@ -135,8 +137,7 @@ pub struct GenericHeader {
     pub replica: u8,
     pub reserved_frame: [u8; 66],
 
-    pub namespace: u64,
-    pub reserved_command: [u8; 120],
+    pub reserved_command: [u8; 128],
 }
 
 unsafe impl Pod for GenericHeader {}
@@ -489,18 +490,17 @@ impl Default for ReplyHeader {
 #[repr(C)]
 pub struct StartViewChangeHeader {
     pub checksum: u128,
-    pub checksum_padding: u128,
     pub checksum_body: u128,
-    pub checksum_body_padding: u128,
     pub cluster: u128,
     pub size: u32,
     pub view: u32,
     pub release: u32,
     pub command: Command2,
     pub replica: u8,
-    pub reserved_frame: [u8; 42],
+    pub reserved_frame: [u8; 66],
 
-    pub reserved: [u8; 128],
+    pub namespace: u64,
+    pub reserved: [u8; 120],
 }
 
 unsafe impl Pod for StartViewChangeHeader {}
@@ -536,16 +536,14 @@ impl ConsensusHeader for StartViewChangeHeader {
 #[repr(C)]
 pub struct DoViewChangeHeader {
     pub checksum: u128,
-    pub checksum_padding: u128,
     pub checksum_body: u128,
-    pub checksum_body_padding: u128,
     pub cluster: u128,
     pub size: u32,
     pub view: u32,
     pub release: u32,
     pub command: Command2,
     pub replica: u8,
-    pub reserved_frame: [u8; 42],
+    pub reserved_frame: [u8; 66],
 
     /// The highest op-number in this replica's log.
     /// Used to select the most complete log when log_view values are equal.
@@ -553,11 +551,12 @@ pub struct DoViewChangeHeader {
     /// The replica's commit number (highest committed op).
     /// The new primary sets its commit to max(commit) across all DVCs.
     pub commit: u64,
+    pub namespace: u64,
     /// The view number when this replica's status was last normal.
     /// This is the key field for log selection: the replica with the
     /// highest log_view has the most authoritative log.
     pub log_view: u32,
-    pub reserved: [u8; 108],
+    pub reserved: [u8; 100],
 }
 
 unsafe impl Pod for DoViewChangeHeader {}
@@ -609,16 +608,14 @@ impl ConsensusHeader for DoViewChangeHeader {
 #[repr(C)]
 pub struct StartViewHeader {
     pub checksum: u128,
-    pub checksum_padding: u128,
     pub checksum_body: u128,
-    pub checksum_body_padding: u128,
     pub cluster: u128,
     pub size: u32,
     pub view: u32,
     pub release: u32,
     pub command: Command2,
     pub replica: u8,
-    pub reserved_frame: [u8; 42],
+    pub reserved_frame: [u8; 66],
 
     /// The op-number of the highest entry in the new primary's log.
     /// Backups set their op to this value.
@@ -627,7 +624,8 @@ pub struct StartViewHeader {
     /// This is max(commit) from all DVCs received by the primary.
     /// Backups set their commit to this value.
     pub commit: u64,
-    pub reserved: [u8; 112],
+    pub namespace: u64,
+    pub reserved: [u8; 104],
 }
 
 unsafe impl Pod for StartViewHeader {}
diff --git a/core/consensus/src/impls.rs b/core/consensus/src/impls.rs
index 2b6a8c410..500a522c7 100644
--- a/core/consensus/src/impls.rs
+++ b/core/consensus/src/impls.rs
@@ -317,20 +317,6 @@ impl LocalPipeline {
     pub fn clear(&mut self) {
         self.prepare_queue.clear();
     }
-
-    /// Extract and remove a message by op number.
-    /// Returns None if op is not in the pipeline.
-    pub fn extract_by_op(&mut self, op: u64) -> Option<PipelineEntry> {
-        let head_op = self.prepare_queue.front()?.header.op;
-        if op < head_op {
-            return None;
-        }
-        let index = (op - head_op) as usize;
-        if index >= self.prepare_queue.len() {
-            return None;
-        }
-        self.prepare_queue.remove(index)
-    }
 }
 
 impl Pipeline for LocalPipeline {
@@ -345,10 +331,6 @@ impl Pipeline for LocalPipeline {
         LocalPipeline::pop_message(self)
     }
 
-    fn extract_by_op(&mut self, op: u64) -> Option<Self::Entry> {
-        LocalPipeline::extract_by_op(self, op)
-    }
-
     fn clear(&mut self) {
         LocalPipeline::clear(self)
     }
@@ -365,6 +347,10 @@ impl Pipeline for LocalPipeline {
         LocalPipeline::message_by_op_and_checksum(self, op, checksum)
     }
 
+    fn head(&self) -> Option<&Self::Entry> {
+        LocalPipeline::head(self)
+    }
+
     fn is_full(&self) -> bool {
         LocalPipeline::is_full(self)
     }
@@ -389,7 +375,7 @@ pub enum Status {
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub enum VsrAction {
     /// Send StartViewChange to all replicas.
-    SendStartViewChange { view: u32 },
+    SendStartViewChange { view: u32, namespace: u64 },
     /// Send DoViewChange to primary.
     SendDoViewChange {
         view: u32,
@@ -397,11 +383,22 @@ pub enum VsrAction {
         log_view: u32,
         op: u64,
         commit: u64,
+        namespace: u64,
     },
     /// Send StartView to all backups (as new primary).
-    SendStartView { view: u32, op: u64, commit: u64 },
+    SendStartView {
+        view: u32,
+        op: u64,
+        commit: u64,
+        namespace: u64,
+    },
     /// Send PrepareOK to primary.
-    SendPrepareOk { view: u32, op: u64, target: u8 },
+    SendPrepareOk {
+        view: u32,
+        op: u64,
+        target: u8,
+        namespace: u64,
+    },
 }
 
 #[allow(unused)]
@@ -414,6 +411,7 @@ where
     cluster: u128,
     replica: u8,
     replica_count: u8,
+    namespace: u64,
 
     view: Cell<u32>,
 
@@ -455,16 +453,28 @@ where
 }
 
 impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
-    pub fn new(cluster: u128, replica: u8, replica_count: u8, message_bus: B, 
pipeline: P) -> Self {
+    pub fn new(
+        cluster: u128,
+        replica: u8,
+        replica_count: u8,
+        namespace: u64,
+        message_bus: B,
+        pipeline: P,
+    ) -> Self {
         assert!(
             replica < replica_count,
             "replica index must be < replica_count"
         );
         assert!(replica_count >= 1, "need at least 1 replica");
+        // TODO: Verify that XOR-based seeding provides sufficient jitter 
diversity
+        // across groups. Consider using a proper hash (e.g., Murmur3) of
+        // (replica_id, namespace) for production.
+        let timeout_seed = replica as u128 ^ namespace as u128;
         Self {
             cluster,
             replica,
             replica_count,
+            namespace,
             view: Cell::new(0),
             log_view: Cell::new(0),
             status: Cell::new(Status::Recovering),
@@ -479,7 +489,7 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> 
VsrConsensus<B, P> {
             do_view_change_quorum: Cell::new(false),
             sent_own_start_view_change: Cell::new(false),
             sent_own_do_view_change: Cell::new(false),
-            timeouts: RefCell::new(TimeoutManager::new(replica as u128)),
+            timeouts: RefCell::new(TimeoutManager::new(timeout_seed)),
         }
     }
 
@@ -489,7 +499,7 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> 
VsrConsensus<B, P> {
     }
 
     pub fn primary_index(&self, view: u32) -> u8 {
-        view as u8 % self.replica_count
+        (view % self.replica_count as u32) as u8
     }
 
     pub fn is_primary(&self) -> bool {
@@ -563,6 +573,18 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> 
VsrConsensus<B, P> {
         self.replica_count
     }
 
+    pub fn namespace(&self) -> u64 {
+        self.namespace
+    }
+
+    pub fn last_prepare_checksum(&self) -> u128 {
+        self.last_prepare_checksum.get()
+    }
+
+    pub fn set_last_prepare_checksum(&self, checksum: u128) {
+        self.last_prepare_checksum.set(checksum);
+    }
+
     pub fn log_view(&self) -> u32 {
         self.log_view.get()
     }
@@ -678,7 +700,10 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> 
VsrConsensus<B, P> {
             timeouts.start(TimeoutKind::ViewChangeStatus);
         }
 
-        vec![VsrAction::SendStartViewChange { view: new_view }]
+        vec![VsrAction::SendStartViewChange {
+            view: new_view,
+            namespace: self.namespace,
+        }]
     }
 
     /// Resend SVC message if we've started view change.
@@ -693,6 +718,7 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> 
VsrConsensus<B, P> {
 
         vec![VsrAction::SendStartViewChange {
             view: self.view.get(),
+            namespace: self.namespace,
         }]
     }
 
@@ -725,6 +751,7 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> 
VsrConsensus<B, P> {
             log_view: self.log_view.get(),
             op: current_op,
             commit: current_commit,
+            namespace: self.namespace,
         }]
     }
 
@@ -748,7 +775,10 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> 
VsrConsensus<B, P> {
             .borrow_mut()
             .reset(TimeoutKind::ViewChangeStatus);
 
-        vec![VsrAction::SendStartViewChange { view: next_view }]
+        vec![VsrAction::SendStartViewChange {
+            view: next_view,
+            namespace: self.namespace,
+        }]
     }
 
     /// Handle a received StartViewChange message.
@@ -757,6 +787,10 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> 
VsrConsensus<B, P> {
     /// from f OTHER replicas, it sends a DOVIEWCHANGE message to the node
     /// that will be the primary in the new view."
     pub fn handle_start_view_change(&self, header: &StartViewChangeHeader) -> 
Vec<VsrAction> {
+        assert_eq!(
+            header.namespace, self.namespace,
+            "SVC routed to wrong group"
+        );
         let from_replica = header.replica;
         let msg_view = header.view;
 
@@ -786,7 +820,10 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> 
VsrConsensus<B, P> {
             }
 
             // Send our own SVC
-            actions.push(VsrAction::SendStartViewChange { view: msg_view });
+            actions.push(VsrAction::SendStartViewChange {
+                view: msg_view,
+                namespace: self.namespace,
+            });
         }
 
         // Record the SVC from sender
@@ -816,6 +853,7 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> 
VsrConsensus<B, P> {
                 log_view: self.log_view.get(),
                 op: current_op,
                 commit: current_commit,
+                namespace: self.namespace,
             });
 
             // If we are the primary candidate, record our own DVC
@@ -848,6 +886,10 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> 
VsrConsensus<B, P> {
     /// replicas (including itself), it sets its view-number to that in the 
messages
     /// and selects as the new log the one contained in the message with the 
largest v'..."
     pub fn handle_do_view_change(&self, header: &DoViewChangeHeader) -> 
Vec<VsrAction> {
+        assert_eq!(
+            header.namespace, self.namespace,
+            "DVC routed to wrong group"
+        );
         let from_replica = header.replica;
         let msg_view = header.view;
         let msg_log_view = header.log_view;
@@ -880,7 +922,10 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> 
VsrConsensus<B, P> {
             }
 
             // Send our own SVC
-            actions.push(VsrAction::SendStartViewChange { view: msg_view });
+            actions.push(VsrAction::SendStartViewChange {
+                view: msg_view,
+                namespace: self.namespace,
+            });
         }
 
         // Only the primary candidate processes DVCs for quorum
@@ -939,6 +984,7 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> 
VsrConsensus<B, P> {
     /// in the log, set their view-number to the view number in the message, 
change
     /// their status to normal, and send PrepareOK for any uncommitted ops."
     pub fn handle_start_view(&self, header: &StartViewHeader) -> 
Vec<VsrAction> {
+        assert_eq!(header.namespace, self.namespace, "SV routed to wrong 
group");
         let from_replica = header.replica;
         let msg_view = header.view;
         let msg_op = header.op;
@@ -966,6 +1012,9 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> 
VsrConsensus<B, P> {
         self.advance_commit_number(msg_commit);
         self.reset_view_change_state();
 
+        // Stale pipeline entries from the old view must be discarded
+        self.pipeline.borrow_mut().clear();
+
         // Update our op to match the new primary's log
         self.sequencer.set_sequence(msg_op);
 
@@ -984,7 +1033,8 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> 
VsrConsensus<B, P> {
             actions.push(VsrAction::SendPrepareOk {
                 view: msg_view,
                 op: op_num,
-                target: from_replica, // Send to new primary
+                target: from_replica,
+                namespace: self.namespace,
             });
         }
 
@@ -1006,6 +1056,11 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> 
VsrConsensus<B, P> {
         self.log_view.set(self.view.get());
         self.status.set(Status::Normal);
         self.advance_commit_number(max_commit);
+        self.sequencer.set_sequence(new_op);
+
+        // Stale pipeline entries from the old view are invalid in the new 
view.
+        // Log reconciliation replays from the journal, not the pipeline.
+        self.pipeline.borrow_mut().clear();
 
         // Update timeouts for normal primary operation
         {
@@ -1020,6 +1075,7 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> 
VsrConsensus<B, P> {
             view: self.view.get(),
             op: new_op,
             commit: max_commit,
+            namespace: self.namespace,
         }]
     }
 
@@ -1107,7 +1163,7 @@ where
                 command: Command2::Prepare,
                 replica: consensus.replica,
                 client: old.client,
-                parent: 0, // TODO: Get parent checksum from the previous 
entry in the journal (figure out how to pass that ctx here)
+                parent: consensus.last_prepare_checksum(),
                 request_checksum: old.request_checksum,
                 request: old.request,
                 commit: consensus.commit.get(),
@@ -1184,37 +1240,6 @@ where
         pipeline.verify();
     }
 
-    fn post_replicate_verify(&self, message: 
&Self::Message<Self::ReplicateHeader>) {
-        let header = message.header();
-
-        // verify the message belongs to our cluster
-        assert_eq!(header.cluster, self.cluster, "cluster mismatch");
-
-        // verify view is not from the future
-        assert!(
-            header.view <= self.view.get(),
-            "prepare view {} is ahead of replica view {}",
-            header.view,
-            self.view.get()
-        );
-
-        // verify op is sequential
-        assert_eq!(
-            header.op,
-            self.sequencer.current_sequence(),
-            "op must be sequential: expected {}, got {}",
-            self.sequencer.current_sequence(),
-            header.op
-        );
-
-        // verify hash chain
-        assert_eq!(
-            header.parent,
-            self.last_prepare_checksum.get(),
-            "parent checksum mismatch"
-        );
-    }
-
     fn is_follower(&self) -> bool {
         !self.is_primary()
     }
diff --git a/core/consensus/src/lib.rs b/core/consensus/src/lib.rs
index 767ee9bf8..0ee87eab6 100644
--- a/core/consensus/src/lib.rs
+++ b/core/consensus/src/lib.rs
@@ -31,9 +31,6 @@ pub trait Pipeline {
 
     fn pop_message(&mut self) -> Option<Self::Entry>;
 
-    /// Extract and remove a message by op number.
-    fn extract_by_op(&mut self, op: u64) -> Option<Self::Entry>;
-
     fn clear(&mut self);
 
     fn message_by_op(&self, op: u64) -> Option<&Self::Entry>;
@@ -42,6 +39,8 @@ pub trait Pipeline {
 
     fn message_by_op_and_checksum(&self, op: u64, checksum: u128) -> 
Option<&Self::Entry>;
 
+    fn head(&self) -> Option<&Self::Entry>;
+
     fn is_full(&self) -> bool;
 
     fn is_empty(&self) -> bool;
@@ -65,9 +64,6 @@ pub trait Consensus: Sized {
     fn pipeline_message(&self, message: Self::Message<Self::ReplicateHeader>);
     fn verify_pipeline(&self);
 
-    // TODO: Figure out how we can achieve that without exposing such methods 
in the Consensus trait.
-    fn post_replicate_verify(&self, message: 
&Self::Message<Self::ReplicateHeader>);
-
     fn is_follower(&self) -> bool;
     fn is_normal(&self) -> bool;
     fn is_syncing(&self) -> bool;
@@ -97,6 +93,8 @@ where
 
 mod impls;
 pub use impls::*;
+mod namespaced_pipeline;
+pub use namespaced_pipeline::*;
 mod plane_helpers;
 pub use plane_helpers::*;
 
diff --git a/core/consensus/src/namespaced_pipeline.rs 
b/core/consensus/src/namespaced_pipeline.rs
new file mode 100644
index 000000000..05d47d80f
--- /dev/null
+++ b/core/consensus/src/namespaced_pipeline.rs
@@ -0,0 +1,540 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::Pipeline;
+use crate::impls::{PIPELINE_PREPARE_QUEUE_MAX, PipelineEntry};
+use iggy_common::header::PrepareHeader;
+use iggy_common::message::Message;
+use std::collections::{HashMap, VecDeque};
+
+/// Pipeline that partitions entries by namespace for independent commit 
draining.
+///
+/// A single global op sequence and hash chain spans all namespaces, but 
entries
+/// are stored in per-namespace VecDeques. Each namespace tracks its own commit
+/// frontier so `drain_committable_all` drains quorum'd entries per-namespace
+/// without waiting for the global commit to advance past unrelated namespaces.
+///
+/// The global commit (on `VsrConsensus`) remains a conservative lower bound
+/// for the VSR protocol (view change, follower commit piggybacking). It only
+/// advances when all ops up to that point are drained. Per-namespace draining
+/// can run ahead of the global commit.
+///
+/// An alternative (simpler) approach would drain purely by per-entry quorum
+/// flag without tracking per-namespace commit numbers, relying solely on
+/// `global_commit_frontier` for the protocol commit. We track per-namespace
+/// commits explicitly for observability and to make the independence model
+/// visible in the data structure.
+#[derive(Debug)]
+pub struct NamespacedPipeline {
+    queues: HashMap<u64, VecDeque<PipelineEntry>>,
+    /// Per-namespace commit frontier: highest drained op per namespace.
+    pub(crate) ns_commits: HashMap<u64, u64>,
+    pub(crate) total_count: usize,
+    last_push_checksum: u128,
+    last_push_op: u64,
+    /// Lower bound of ops pushed to this pipeline instance.
+    /// Used by `global_commit_frontier` to distinguish "never pushed" from 
"drained."
+    first_push_op: u64,
+}
+
+impl Default for NamespacedPipeline {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl NamespacedPipeline {
+    pub fn new() -> Self {
+        Self {
+            queues: HashMap::new(),
+            ns_commits: HashMap::new(),
+            total_count: 0,
+            last_push_checksum: 0,
+            last_push_op: 0,
+            first_push_op: 0,
+        }
+    }
+
+    pub fn register_namespace(&mut self, ns: u64) {
+        self.queues.entry(ns).or_default();
+        self.ns_commits.entry(ns).or_insert(0);
+    }
+
+    /// Per-namespace commit frontier for the given namespace.
+    pub fn ns_commit(&self, ns: u64) -> Option<u64> {
+        self.ns_commits.get(&ns).copied()
+    }
+
+    /// Drain entries that have achieved quorum, independently per namespace.
+    ///
+    /// For each namespace queue, drains from the front while entries have
+    /// `ok_quorum_received == true`. Returns entries sorted by global op
+    /// for deterministic processing.
+    pub fn drain_committable_all(&mut self) -> Vec<PipelineEntry> {
+        let mut drained = Vec::new();
+
+        let Self {
+            queues,
+            ns_commits,
+            total_count,
+            ..
+        } = self;
+
+        for (ns, queue) in queues.iter_mut() {
+            while let Some(front) = queue.front() {
+                if !front.ok_quorum_received {
+                    break;
+                }
+                let entry = queue.pop_front().expect("front exists");
+                *total_count -= 1;
+                if let Some(ns_commit) = ns_commits.get_mut(ns) {
+                    *ns_commit = entry.header.op;
+                }
+                drained.push(entry);
+            }
+        }
+
+        drained.sort_by_key(|entry| entry.header.op);
+        drained
+    }
+
+    /// Compute the global commit frontier after draining.
+    ///
+    /// Walks forward from `current_commit + 1`, treating any op not found
+    /// in the pipeline (already drained) as committed. Stops at the first
+    /// op still present in a queue or past `last_push_op`.
+    pub fn global_commit_frontier(&self, current_commit: u64) -> u64 {
+        let mut commit = current_commit;
+        loop {
+            let next = commit + 1;
+            if next > self.last_push_op {
+                break;
+            }
+            // Ops below first_push_op were never in this pipeline instance
+            // and must not be mistaken for drained entries.
+            if next < self.first_push_op {
+                break;
+            }
+            // Still in a queue means not yet drained
+            if self.message_by_op(next).is_some() {
+                break;
+            }
+            commit = next;
+        }
+        commit
+    }
+}
+
+impl Pipeline for NamespacedPipeline {
+    type Message = Message<PrepareHeader>;
+    type Entry = PipelineEntry;
+
+    fn push_message(&mut self, message: Self::Message) {
+        assert!(
+            self.total_count < PIPELINE_PREPARE_QUEUE_MAX,
+            "namespaced pipeline full"
+        );
+
+        let header = *message.header();
+        let ns = header.namespace;
+
+        if self.total_count > 0 {
+            assert_eq!(
+                header.op,
+                self.last_push_op + 1,
+                "global ops must be sequential: expected {}, got {}",
+                self.last_push_op + 1,
+                header.op
+            );
+            assert_eq!(
+                header.parent, self.last_push_checksum,
+                "parent must chain to previous global checksum"
+            );
+        } else {
+            self.first_push_op = header.op;
+        }
+
+        let queue = self
+            .queues
+            .get_mut(&ns)
+            .expect("push_message: namespace not registered");
+        if let Some(tail) = queue.back() {
+            assert!(
+                header.op > tail.header.op,
+                "op must increase within namespace queue"
+            );
+        }
+
+        queue.push_back(PipelineEntry::new(header));
+        self.total_count += 1;
+        self.last_push_checksum = header.checksum;
+        self.last_push_op = header.op;
+    }
+
+    fn pop_message(&mut self) -> Option<Self::Entry> {
+        let min_ns = self
+            .queues
+            .iter()
+            .filter_map(|(ns, q)| q.front().map(|entry| (*ns, 
entry.header.op)))
+            .min_by_key(|(_, op)| *op)
+            .map(|(ns, _)| ns)?;
+
+        let entry = self.queues.get_mut(&min_ns)?.pop_front()?;
+        self.total_count -= 1;
+        Some(entry)
+    }
+
+    fn clear(&mut self) {
+        for queue in self.queues.values_mut() {
+            queue.clear();
+        }
+        self.total_count = 0;
+        self.last_push_checksum = 0;
+        self.last_push_op = 0;
+        self.first_push_op = 0;
+    }
+
+    /// Linear scan all queues. Ops are globally unique; max 8 entries total.
+    fn message_by_op(&self, op: u64) -> Option<&Self::Entry> {
+        for queue in self.queues.values() {
+            for entry in queue {
+                if entry.header.op == op {
+                    return Some(entry);
+                }
+            }
+        }
+        None
+    }
+
+    fn message_by_op_mut(&mut self, op: u64) -> Option<&mut Self::Entry> {
+        for queue in self.queues.values_mut() {
+            for entry in queue.iter_mut() {
+                if entry.header.op == op {
+                    return Some(entry);
+                }
+            }
+        }
+        None
+    }
+
+    fn message_by_op_and_checksum(&self, op: u64, checksum: u128) -> 
Option<&Self::Entry> {
+        let entry = self.message_by_op(op)?;
+        if entry.header.checksum == checksum {
+            Some(entry)
+        } else {
+            None
+        }
+    }
+
+    fn head(&self) -> Option<&Self::Entry> {
+        self.queues
+            .values()
+            .filter_map(|q| q.front())
+            .min_by_key(|entry| entry.header.op)
+    }
+
+    fn is_full(&self) -> bool {
+        self.total_count >= PIPELINE_PREPARE_QUEUE_MAX
+    }
+
+    fn is_empty(&self) -> bool {
+        self.total_count == 0
+    }
+
+    fn verify(&self) {
+        assert!(self.total_count <= PIPELINE_PREPARE_QUEUE_MAX);
+
+        let actual_count: usize = self.queues.values().map(|q| q.len()).sum();
+        assert_eq!(actual_count, self.total_count, "total_count mismatch");
+
+        // Per-namespace: ops must be monotonically increasing
+        for queue in self.queues.values() {
+            let mut prev_op = None;
+            for entry in queue {
+                if let Some(prev) = prev_op {
+                    assert!(
+                        entry.header.op > prev,
+                        "ops must increase within namespace queue"
+                    );
+                }
+                prev_op = Some(entry.header.op);
+            }
+        }
+
+        // Global: collect all entries, sort by op, verify sequential ops and 
hash chain
+        let mut all_entries: Vec<&PipelineEntry> =
+            self.queues.values().flat_map(|q| q.iter()).collect();
+        all_entries.sort_by_key(|e| e.header.op);
+
+        for window in all_entries.windows(2) {
+            let prev = &window[0].header;
+            let curr = &window[1].header;
+            assert_eq!(
+                curr.op,
+                prev.op + 1,
+                "global ops must be sequential: {} -> {}",
+                prev.op,
+                curr.op
+            );
+            assert_eq!(
+                curr.parent, prev.checksum,
+                "global hash chain broken at op {}: parent={} expected={}",
+                curr.op, curr.parent, prev.checksum
+            );
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use iggy_common::header::Command2;
+
+    fn make_prepare(
+        op: u64,
+        parent: u128,
+        checksum: u128,
+        namespace: u64,
+    ) -> Message<PrepareHeader> {
+        
Message::<PrepareHeader>::new(std::mem::size_of::<PrepareHeader>()).transmute_header(
+            |_, new| {
+                *new = PrepareHeader {
+                    command: Command2::Prepare,
+                    op,
+                    parent,
+                    checksum,
+                    namespace,
+                    ..Default::default()
+                };
+            },
+        )
+    }
+
+    fn mark_quorum(pipeline: &mut NamespacedPipeline, op: u64) {
+        pipeline
+            .message_by_op_mut(op)
+            .expect("mark_quorum: op not in pipeline")
+            .ok_quorum_received = true;
+    }
+
+    #[test]
+    fn multi_namespace_push_pop() {
+        let mut pipeline = NamespacedPipeline::new();
+        pipeline.register_namespace(100);
+        pipeline.register_namespace(200);
+
+        pipeline.push_message(make_prepare(1, 0, 10, 100));
+        pipeline.push_message(make_prepare(2, 10, 20, 200));
+        pipeline.push_message(make_prepare(3, 20, 30, 100));
+        pipeline.push_message(make_prepare(4, 30, 40, 200));
+
+        assert_eq!(pipeline.total_count, 4);
+        assert!(!pipeline.is_empty());
+
+        // head is the entry with the smallest op
+        assert_eq!(pipeline.head().unwrap().header.op, 1);
+
+        // pop returns entries in global op order
+        assert_eq!(pipeline.pop_message().unwrap().header.op, 1);
+        assert_eq!(pipeline.pop_message().unwrap().header.op, 2);
+        assert_eq!(pipeline.pop_message().unwrap().header.op, 3);
+        assert_eq!(pipeline.pop_message().unwrap().header.op, 4);
+        assert!(pipeline.is_empty());
+    }
+
+    #[test]
+    fn drain_committable_all() {
+        let mut pipeline = NamespacedPipeline::new();
+        pipeline.register_namespace(100);
+        pipeline.register_namespace(200);
+
+        // Interleaved ops across two namespaces: [ns_a:1, ns_b:2, ns_a:3, 
ns_b:4]
+        pipeline.push_message(make_prepare(1, 0, 10, 100));
+        pipeline.push_message(make_prepare(2, 10, 20, 200));
+        pipeline.push_message(make_prepare(3, 20, 30, 100));
+        pipeline.push_message(make_prepare(4, 30, 40, 200));
+
+        // Mark ops 1,2,3 as quorum'd (not 4)
+        mark_quorum(&mut pipeline, 1);
+        mark_quorum(&mut pipeline, 2);
+        mark_quorum(&mut pipeline, 3);
+
+        // ns_100 drains [1,3], ns_200 drains [2] (stops at non-quorum'd 4)
+        let drained = pipeline.drain_committable_all();
+        let drained_ops: Vec<_> = drained.iter().map(|e| 
e.header.op).collect();
+        assert_eq!(drained_ops, vec![1, 2, 3]);
+
+        assert_eq!(pipeline.total_count, 1);
+        assert_eq!(pipeline.head().unwrap().header.op, 4);
+
+        // Per-namespace commits track highest drained op
+        assert_eq!(pipeline.ns_commit(100), Some(3));
+        assert_eq!(pipeline.ns_commit(200), Some(2));
+
+        // Global commit advances past contiguously drained ops
+        assert_eq!(pipeline.global_commit_frontier(0), 3);
+    }
+
+    #[test]
+    fn drain_committable_all_full() {
+        let mut pipeline = NamespacedPipeline::new();
+        pipeline.register_namespace(100);
+        pipeline.register_namespace(200);
+
+        pipeline.push_message(make_prepare(1, 0, 10, 100));
+        pipeline.push_message(make_prepare(2, 10, 20, 200));
+        pipeline.push_message(make_prepare(3, 20, 30, 100));
+        pipeline.push_message(make_prepare(4, 30, 40, 200));
+
+        mark_quorum(&mut pipeline, 1);
+        mark_quorum(&mut pipeline, 2);
+        mark_quorum(&mut pipeline, 3);
+        mark_quorum(&mut pipeline, 4);
+
+        let drained = pipeline.drain_committable_all();
+        assert_eq!(drained.len(), 4);
+        assert!(pipeline.is_empty());
+        assert_eq!(pipeline.global_commit_frontier(0), 4);
+    }
+
+    #[test]
+    fn independent_namespace_progress() {
+        let mut pipeline = NamespacedPipeline::new();
+        pipeline.register_namespace(100);
+        pipeline.register_namespace(200);
+
+        // [ns_a:1, ns_b:2, ns_a:3, ns_b:4]
+        pipeline.push_message(make_prepare(1, 0, 10, 100));
+        pipeline.push_message(make_prepare(2, 10, 20, 200));
+        pipeline.push_message(make_prepare(3, 20, 30, 100));
+        pipeline.push_message(make_prepare(4, 30, 40, 200));
+
+        // Only ns_a (ops 1,3) gets quorum, ns_b (ops 2,4) does not
+        mark_quorum(&mut pipeline, 1);
+        mark_quorum(&mut pipeline, 3);
+
+        let drained = pipeline.drain_committable_all();
+        let drained_ops: Vec<_> = drained.iter().map(|e| 
e.header.op).collect();
+        assert_eq!(drained_ops, vec![1, 3]);
+
+        // ns_a progressed independently, ns_b untouched
+        assert_eq!(pipeline.ns_commit(100), Some(3));
+        assert_eq!(pipeline.ns_commit(200), Some(0));
+        assert_eq!(pipeline.total_count, 2);
+
+        // Global commit only advances to 1 (can't skip ns_b's op 2)
+        assert_eq!(pipeline.global_commit_frontier(0), 1);
+
+        // Now ns_b gets quorum
+        mark_quorum(&mut pipeline, 2);
+        mark_quorum(&mut pipeline, 4);
+
+        let drained = pipeline.drain_committable_all();
+        let drained_ops: Vec<_> = drained.iter().map(|e| 
e.header.op).collect();
+        assert_eq!(drained_ops, vec![2, 4]);
+
+        // Global commit jumps to 4 (ops 1,3 already drained, 2,4 just drained)
+        assert_eq!(pipeline.global_commit_frontier(1), 4);
+        assert_eq!(pipeline.ns_commit(200), Some(4));
+    }
+
+    #[test]
+    fn message_by_op_cross_namespace() {
+        let mut pipeline = NamespacedPipeline::new();
+        pipeline.register_namespace(100);
+        pipeline.register_namespace(200);
+        pipeline.register_namespace(300);
+
+        pipeline.push_message(make_prepare(1, 0, 10, 100));
+        pipeline.push_message(make_prepare(2, 10, 20, 200));
+        pipeline.push_message(make_prepare(3, 20, 30, 300));
+
+        assert_eq!(pipeline.message_by_op(1).unwrap().header.namespace, 100);
+        assert_eq!(pipeline.message_by_op(2).unwrap().header.namespace, 200);
+        assert_eq!(pipeline.message_by_op(3).unwrap().header.namespace, 300);
+        assert!(pipeline.message_by_op(4).is_none());
+    }
+
+    #[test]
+    fn message_by_op_and_checksum() {
+        let mut pipeline = NamespacedPipeline::new();
+        pipeline.register_namespace(100);
+        pipeline.push_message(make_prepare(1, 0, 10, 100));
+
+        assert!(pipeline.message_by_op_and_checksum(1, 10).is_some());
+        assert!(pipeline.message_by_op_and_checksum(1, 99).is_none());
+        assert!(pipeline.message_by_op_and_checksum(2, 10).is_none());
+    }
+
+    #[test]
+    fn verify_passes() {
+        let mut pipeline = NamespacedPipeline::new();
+        pipeline.register_namespace(100);
+        pipeline.register_namespace(200);
+        pipeline.push_message(make_prepare(1, 0, 10, 100));
+        pipeline.push_message(make_prepare(2, 10, 20, 200));
+        pipeline.push_message(make_prepare(3, 20, 30, 100));
+        pipeline.verify();
+    }
+
+    #[test]
+    fn is_full() {
+        let mut pipeline = NamespacedPipeline::new();
+        pipeline.register_namespace(0);
+        pipeline.register_namespace(1);
+        for i in 0..PIPELINE_PREPARE_QUEUE_MAX as u128 {
+            let parent = if i == 0 { 0 } else { i * 10 };
+            let checksum = (i + 1) * 10;
+            pipeline.push_message(make_prepare(i as u64 + 1, parent, checksum, 
i as u64 % 2));
+        }
+        assert!(pipeline.is_full());
+    }
+
+    #[test]
+    #[should_panic(expected = "namespaced pipeline full")]
+    fn push_when_full_panics() {
+        let mut pipeline = NamespacedPipeline::new();
+        pipeline.register_namespace(0);
+        for i in 0..PIPELINE_PREPARE_QUEUE_MAX as u128 {
+            let parent = if i == 0 { 0 } else { i * 10 };
+            let checksum = (i + 1) * 10;
+            pipeline.push_message(make_prepare(i as u64 + 1, parent, checksum, 
0));
+        }
+        pipeline.push_message(make_prepare(100, 80, 1000, 0));
+    }
+
+    #[test]
+    fn clear_preserves_ns_commits() {
+        let mut pipeline = NamespacedPipeline::new();
+        pipeline.register_namespace(100);
+        pipeline.register_namespace(200);
+        pipeline.push_message(make_prepare(1, 0, 10, 100));
+        pipeline.push_message(make_prepare(2, 10, 20, 200));
+
+        // Mark op 1 as committed in ns 100 before clearing
+        pipeline.ns_commits.insert(100, 1);
+
+        pipeline.clear();
+        assert!(pipeline.is_empty());
+        assert_eq!(pipeline.total_count, 0);
+
+        // ns_commits must survive clear -- they represent durable knowledge
+        // about already-drained ops, not pipeline state
+        assert_eq!(pipeline.ns_commits.get(&100), Some(&1));
+        assert_eq!(pipeline.ns_commits.get(&200), Some(&0));
+    }
+}
diff --git a/core/consensus/src/plane_helpers.rs 
b/core/consensus/src/plane_helpers.rs
index 4b3986067..2307accb1 100644
--- a/core/consensus/src/plane_helpers.rs
+++ b/core/consensus/src/plane_helpers.rs
@@ -40,7 +40,6 @@ pub async fn pipeline_prepare_common<C, F>(
     consensus.verify_pipeline();
     consensus.pipeline_message(prepare.clone());
     on_replicate(prepare.clone()).await;
-    consensus.post_replicate_verify(&prepare);
 }
 
 /// Shared commit-based old-prepare fence.
@@ -77,6 +76,7 @@ pub async fn replicate_to_next_in_chain<B, P>(
 
     assert_ne!(next, consensus.replica());
 
+    // TODO: Propagate send error instead of panicking; requires bus error 
design.
     consensus
         .message_bus()
         .send_to_replica(next, message.into_generic())
@@ -135,7 +135,11 @@ where
     Ok(())
 }
 
-/// Shared quorum + extraction flow for ack handling.
+/// Shared quorum tracking flow for ack handling.
+///
+/// After recording the ack, walks forward from `current_commit + 1` advancing
+/// the commit number only while consecutive ops have achieved quorum. This
+/// prevents committing ops that have gaps in quorum acknowledgment.
 pub fn ack_quorum_reached<B, P>(consensus: &VsrConsensus<B, P>, ack: 
&PrepareOkHeader) -> bool
 where
     B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
@@ -145,8 +149,49 @@ where
         return false;
     }
 
-    consensus.advance_commit_number(ack.op);
-    true
+    let pipeline = consensus.pipeline().borrow();
+    let mut new_commit = consensus.commit();
+    while let Some(entry) = pipeline.message_by_op(new_commit + 1) {
+        if !entry.ok_quorum_received {
+            break;
+        }
+        new_commit += 1;
+    }
+    drop(pipeline);
+
+    if new_commit > consensus.commit() {
+        consensus.advance_commit_number(new_commit);
+        return true;
+    }
+
+    false
+}
+
+/// Drain and return committable prepares from the pipeline head.
+///
+/// Entries are drained only from the head and only while their op is covered
+/// by the current commit frontier.
+pub fn drain_committable_prefix<B, P>(consensus: &VsrConsensus<B, P>) -> 
Vec<PipelineEntry>
+where
+    B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
+    P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>,
+{
+    let commit = consensus.commit();
+    let mut drained = Vec::new();
+    let mut pipeline = consensus.pipeline().borrow_mut();
+
+    while let Some(head_op) = pipeline.head().map(|entry| entry.header.op) {
+        if head_op > commit {
+            break;
+        }
+
+        let entry = pipeline
+            .pop_message()
+            .expect("drain_committable_prefix: head exists");
+        drained.push(entry);
+    }
+
+    drained
 }
 
 /// Shared reply-message construction for committed prepare.
@@ -176,6 +221,7 @@ where
             timestamp: prepare_header.timestamp,
             request: prepare_header.request,
             operation: prepare_header.operation,
+            namespace: prepare_header.namespace,
             ..Default::default()
         };
     })
@@ -253,6 +299,7 @@ pub async fn send_prepare_ok<B, P>(
     let generic_message = message.into_generic();
     let primary = consensus.primary_index(consensus.view());
 
+    // TODO: Propagate send errors instead of panicking; requires bus error 
design.
     if primary == consensus.replica() {
         // TODO: Queue for self-processing or call handle_prepare_ok directly.
         // TODO: This is temporary, for testing the simulator; we should send
         // the message to ourselves properly.
@@ -269,3 +316,107 @@ pub async fn send_prepare_ok<B, P>(
             .unwrap();
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::{Consensus, LocalPipeline};
+    use iggy_common::IggyError;
+
+    #[derive(Debug, Default)]
+    struct NoopBus;
+
+    impl MessageBus for NoopBus {
+        type Client = u128;
+        type Replica = u8;
+        type Data = Message<GenericHeader>;
+        type Sender = ();
+
+        fn add_client(&mut self, _client: Self::Client, _sender: Self::Sender) 
-> bool {
+            true
+        }
+
+        fn remove_client(&mut self, _client: Self::Client) -> bool {
+            true
+        }
+
+        fn add_replica(&mut self, _replica: Self::Replica) -> bool {
+            true
+        }
+
+        fn remove_replica(&mut self, _replica: Self::Replica) -> bool {
+            true
+        }
+
+        async fn send_to_client(
+            &self,
+            _client_id: Self::Client,
+            _data: Self::Data,
+        ) -> Result<(), IggyError> {
+            Ok(())
+        }
+
+        async fn send_to_replica(
+            &self,
+            _replica: Self::Replica,
+            _data: Self::Data,
+        ) -> Result<(), IggyError> {
+            Ok(())
+        }
+    }
+
+    fn prepare_message(op: u64, parent: u128, checksum: u128) -> 
Message<PrepareHeader> {
+        
Message::<PrepareHeader>::new(std::mem::size_of::<PrepareHeader>()).transmute_header(
+            |_, new| {
+                *new = PrepareHeader {
+                    command: Command2::Prepare,
+                    op,
+                    parent,
+                    checksum,
+                    ..Default::default()
+                };
+            },
+        )
+    }
+
+    #[test]
+    fn drains_head_prefix_by_commit_frontier() {
+        let consensus = VsrConsensus::new(1, 0, 3, 0, NoopBus, 
LocalPipeline::new());
+        consensus.init();
+
+        consensus.pipeline_message(prepare_message(1, 0, 10));
+        consensus.pipeline_message(prepare_message(2, 10, 20));
+        consensus.pipeline_message(prepare_message(3, 20, 30));
+
+        consensus.advance_commit_number(3);
+
+        let drained = drain_committable_prefix(&consensus);
+        let drained_ops: Vec<_> = drained.into_iter().map(|entry| 
entry.header.op).collect();
+        assert_eq!(drained_ops, vec![1, 2, 3]);
+        assert!(consensus.pipeline().borrow().is_empty());
+    }
+
+    #[test]
+    fn drains_only_up_to_commit_frontier_even_without_quorum_flags() {
+        let consensus = VsrConsensus::new(1, 0, 3, 0, NoopBus, 
LocalPipeline::new());
+        consensus.init();
+
+        consensus.pipeline_message(prepare_message(5, 0, 50));
+        consensus.pipeline_message(prepare_message(6, 50, 60));
+        consensus.pipeline_message(prepare_message(7, 60, 70));
+
+        consensus.advance_commit_number(6);
+        let drained = drain_committable_prefix(&consensus);
+        let drained_ops: Vec<_> = drained.into_iter().map(|entry| 
entry.header.op).collect();
+
+        assert_eq!(drained_ops, vec![5, 6]);
+        assert_eq!(
+            consensus
+                .pipeline()
+                .borrow()
+                .head()
+                .map(|entry| entry.header.op),
+            Some(7)
+        );
+    }
+}
diff --git a/core/consensus/src/vsr_timeout.rs 
b/core/consensus/src/vsr_timeout.rs
index de08a44dd..f301ec7d2 100644
--- a/core/consensus/src/vsr_timeout.rs
+++ b/core/consensus/src/vsr_timeout.rs
@@ -166,6 +166,7 @@ impl TimeoutManager {
         self.start_view_change_message.tick();
         self.do_view_change_message.tick();
         self.request_start_view_message.tick();
+        self.view_change_status.tick();
     }
 
     pub fn fired(&self, kind: TimeoutKind) -> bool {
diff --git a/core/metadata/src/impls/metadata.rs 
b/core/metadata/src/impls/metadata.rs
index 4ff74f4e2..35386b182 100644
--- a/core/metadata/src/impls/metadata.rs
+++ b/core/metadata/src/impls/metadata.rs
@@ -18,7 +18,7 @@ use crate::stm::StateMachine;
 use crate::stm::snapshot::{FillSnapshot, MetadataSnapshot, Snapshot, 
SnapshotError};
 use consensus::{
     Consensus, Pipeline, PipelineEntry, Plane, Project, Sequencer, 
VsrConsensus, ack_preflight,
-    ack_quorum_reached, build_reply_message, fence_old_prepare_by_commit,
+    ack_quorum_reached, build_reply_message, drain_committable_prefix, 
fence_old_prepare_by_commit,
     panic_if_hash_chain_would_break_in_same_view, pipeline_prepare_common, 
replicate_preflight,
     replicate_to_next_in_chain, send_prepare_ok as send_prepare_ok_common,
 };
@@ -156,6 +156,7 @@ where
         assert_eq!(header.op, current_op + 1);
 
         consensus.sequencer().set_sequence(header.op);
+        consensus.set_last_prepare_checksum(header.checksum);
 
         // Append to journal.
         journal.handle().append(message.clone()).await;
@@ -194,47 +195,51 @@ where
 
             debug!("on_ack: quorum received for op={}", header.op);
 
-            // Extract the header from the pipeline, fetch the full message 
from journal
-            // TODO: Commit from the head. ALWAYS
-            let entry = 
consensus.pipeline().borrow_mut().extract_by_op(header.op);
-            let Some(entry) = entry else {
-                warn!("on_ack: prepare not found in pipeline for op={}", 
header.op);
-                return;
-            };
-
-            let prepare_header = entry.header;
-            // TODO(hubcio): should we replace this with graceful fallback 
(warn + return)?
-            // When journal compaction is implemented compaction could race
-            // with this lookup if it removes entries below the commit number.
-            let prepare = journal
-                .handle()
-                .entry(&prepare_header)
-                .await
-                .unwrap_or_else(|| {
-                    panic!(
-                        "on_ack: committed prepare op={} checksum={} must be 
in journal",
-                        prepare_header.op, prepare_header.checksum
-                    )
-                });
-
-            // Apply the state (consumes prepare)
-            // TODO: Handle appending result to response
-            let _result = self.mux_stm.update(prepare);
-            debug!("on_ack: state applied for op={}", prepare_header.op);
-
-            // Send reply to client
-            let generic_reply = build_reply_message(consensus, 
&prepare_header).into_generic();
-            debug!(
-                "on_ack: sending reply to client={} for op={}",
-                prepare_header.client, prepare_header.op
-            );
-
-            // TODO: Error handling
-            consensus
-                .message_bus()
-                .send_to_client(prepare_header.client, generic_reply)
-                .await
-                .unwrap()
+            let drained = drain_committable_prefix(consensus);
+            if let (Some(first), Some(last)) = (drained.first(), 
drained.last()) {
+                debug!(
+                    "on_ack: draining committed prefix ops=[{}..={}] count={}",
+                    first.header.op,
+                    last.header.op,
+                    drained.len()
+                );
+            }
+
+            for entry in drained {
+                let prepare_header = entry.header;
+                // TODO(hubcio): should we replace this with graceful fallback 
(warn + return)?
+                // When journal compaction is implemented compaction could race
+                // with this lookup if it removes entries below the commit 
number.
+                let prepare = journal
+                    .handle()
+                    .entry(&prepare_header)
+                    .await
+                    .unwrap_or_else(|| {
+                        panic!(
+                            "on_ack: committed prepare op={} checksum={} must 
be in journal",
+                            prepare_header.op, prepare_header.checksum
+                        )
+                    });
+
+                // Apply the state (consumes prepare)
+                // TODO: Handle appending result to response
+                let _result = self.mux_stm.update(prepare);
+                debug!("on_ack: state applied for op={}", prepare_header.op);
+
+                // Send reply to client
+                let generic_reply = build_reply_message(consensus, 
&prepare_header).into_generic();
+                debug!(
+                    "on_ack: sending reply to client={} for op={}",
+                    prepare_header.client, prepare_header.op
+                );
+
+                // TODO: Propagate send error instead of panicking; requires 
bus error design.
+                consensus
+                    .message_bus()
+                    .send_to_client(prepare_header.client, generic_reply)
+                    .await
+                    .unwrap()
+            }
         }
     }
 }
diff --git a/core/partitions/src/iggy_partitions.rs 
b/core/partitions/src/iggy_partitions.rs
index 16ed124ab..2c646072d 100644
--- a/core/partitions/src/iggy_partitions.rs
+++ b/core/partitions/src/iggy_partitions.rs
@@ -21,9 +21,10 @@ use crate::IggyPartition;
 use crate::Partition;
 use crate::types::PartitionsConfig;
 use consensus::{
-    Consensus, PipelineEntry, Plane, Project, Sequencer, VsrConsensus, 
ack_preflight,
-    ack_quorum_reached, build_reply_message, fence_old_prepare_by_commit, 
pipeline_prepare_common,
-    replicate_preflight, replicate_to_next_in_chain, send_prepare_ok as 
send_prepare_ok_common,
+    Consensus, NamespacedPipeline, Pipeline, PipelineEntry, Plane, Project, 
Sequencer,
+    VsrConsensus, ack_preflight, build_reply_message, 
fence_old_prepare_by_commit,
+    pipeline_prepare_common, replicate_preflight, replicate_to_next_in_chain,
+    send_prepare_ok as send_prepare_ok_common,
 };
 use iggy_common::{
     INDEX_SIZE, IggyByteSize, IggyIndexesMut, IggyMessagesBatchMut, 
PartitionStats, PooledBuffer,
@@ -34,7 +35,7 @@ use iggy_common::{
 };
 use message_bus::MessageBus;
 use std::cell::UnsafeCell;
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 use std::sync::Arc;
 use std::sync::atomic::Ordering;
 use tracing::{debug, warn};
@@ -58,33 +59,27 @@ pub struct IggyPartitions<C> {
     /// mutate partition state (segments, offsets, journal).
     partitions: UnsafeCell<Vec<IggyPartition>>,
     namespace_to_local: HashMap<IggyNamespace, LocalIdx>,
-    /// Some on shard0, None on other shards
-    pub consensus: Option<C>,
+    consensus: Option<C>,
 }
 
 impl<C> IggyPartitions<C> {
-    pub fn new(shard_id: ShardId, config: PartitionsConfig, consensus: 
Option<C>) -> Self {
+    pub fn new(shard_id: ShardId, config: PartitionsConfig) -> Self {
         Self {
             shard_id,
             config,
             partitions: UnsafeCell::new(Vec::new()),
             namespace_to_local: HashMap::new(),
-            consensus,
+            consensus: None,
         }
     }
 
-    pub fn with_capacity(
-        shard_id: ShardId,
-        config: PartitionsConfig,
-        consensus: Option<C>,
-        capacity: usize,
-    ) -> Self {
+    pub fn with_capacity(shard_id: ShardId, config: PartitionsConfig, 
capacity: usize) -> Self {
         Self {
             shard_id,
             config,
             partitions: UnsafeCell::new(Vec::with_capacity(capacity)),
             namespace_to_local: HashMap::with_capacity(capacity),
-            consensus,
+            consensus: None,
         }
     }
 
@@ -231,16 +226,23 @@ impl<C> IggyPartitions<C> {
         &mut self.partitions_mut()[idx]
     }
 
+    pub fn consensus(&self) -> Option<&C> {
+        self.consensus.as_ref()
+    }
+
+    pub fn set_consensus(&mut self, consensus: C) {
+        self.consensus = Some(consensus);
+    }
+
     /// Initialize a new partition with in-memory storage (for 
testing/simulation).
     ///
-    /// This is a simplified version that doesn't create file-backed storage.
-    /// Use `init_partition()` for production use with real files.
+    /// Idempotent: subsequent calls for the same namespace are no-ops 
returning
+    /// the existing index. Consensus must be set separately via 
`set_consensus`.
     ///
     /// TODO: Make the log generic over its storage backend to support both
     /// in-memory (for testing) and file-backed (for production) storage 
without
     /// needing separate initialization methods.
     pub fn init_partition_in_memory(&mut self, namespace: IggyNamespace) -> 
LocalIdx {
-        // Check if already initialized
         if let Some(idx) = self.local_idx(&namespace) {
             return idx;
         }
@@ -261,7 +263,6 @@ impl<C> IggyPartitions<C> {
         partition.should_increment_offset = false;
         partition.stats.increment_segments_count(1);
 
-        // Insert and return local index
         self.insert(namespace, partition)
     }
 
@@ -275,9 +276,9 @@ impl<C> IggyPartitions<C> {
     /// 2. Control plane: broadcast to shards (SKIPPED in this method)
     /// 3. Data plane: INITIATE PARTITION (THIS METHOD)
     ///
-    /// Idempotent - returns existing LocalIdx if partition already exists.
+    /// Idempotent: subsequent calls for the same namespace are no-ops.
+    /// Consensus must be set separately via `set_consensus`.
     pub async fn init_partition(&mut self, namespace: IggyNamespace) -> 
LocalIdx {
-        // Check if already initialized
         if let Some(idx) = self.local_idx(&namespace) {
             return idx;
         }
@@ -324,27 +325,38 @@ impl<C> IggyPartitions<C> {
         partition.should_increment_offset = false;
         partition.stats.increment_segments_count(1);
 
-        // Insert and return local index
         self.insert(namespace, partition)
     }
 }
 
-impl<B> Plane<VsrConsensus<B>> for IggyPartitions<VsrConsensus<B>>
+impl<B> Plane<VsrConsensus<B, NamespacedPipeline>>
+    for IggyPartitions<VsrConsensus<B, NamespacedPipeline>>
 where
     B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
 {
-    async fn on_request(&self, message: <VsrConsensus<B> as 
Consensus>::Message<RequestHeader>) {
-        let consensus = self.consensus.as_ref().unwrap();
+    async fn on_request(
+        &self,
+        message: <VsrConsensus<B, NamespacedPipeline> as 
Consensus>::Message<RequestHeader>,
+    ) {
+        let namespace = IggyNamespace::from_raw(message.header().namespace);
+        let consensus = self
+            .consensus()
+            .expect("on_request: consensus not initialized");
 
-        debug!("handling partition request");
+        debug!(?namespace, "handling partition request");
         let prepare = message.project(consensus);
         pipeline_prepare_common(consensus, prepare, |prepare| 
self.on_replicate(prepare)).await;
     }
 
-    async fn on_replicate(&self, message: <VsrConsensus<B> as 
Consensus>::Message<PrepareHeader>) {
-        let consensus = self.consensus.as_ref().unwrap();
-
+    async fn on_replicate(
+        &self,
+        message: <VsrConsensus<B, NamespacedPipeline> as 
Consensus>::Message<PrepareHeader>,
+    ) {
         let header = message.header();
+        let namespace = IggyNamespace::from_raw(header.namespace);
+        let consensus = self
+            .consensus()
+            .expect("on_replicate: consensus not initialized");
 
         let current_op = match replicate_preflight(consensus, header) {
             Ok(current_op) => current_op,
@@ -367,25 +379,26 @@ where
         // TODO: Make these assertions toggleable through a feature flag, so
         // they can be used only by the simulator/tests.
         debug_assert_eq!(header.op, current_op + 1);
         consensus.sequencer().set_sequence(header.op);
+        consensus.set_last_prepare_checksum(header.checksum);
 
         // TODO: Figure out the flow of the partition operations.
         // In the metadata layer we assume that when `on_request` or
         // `on_replicate` is called, it is called from the correct shard.
         // We need to do the same here, which means the code below is
         // infallible: the partition should always exist by now!
-        let namespace = IggyNamespace::from_raw(header.namespace);
-        self.apply_replicated_operation(&message, &namespace).await;
+        self.apply_replicated_operation(&namespace, &message).await;
 
-        // After successful journal write, send prepare_ok to primary.
         self.send_prepare_ok(header).await;
 
-        // If follower, commit any newly committable entries.
         if consensus.is_follower() {
-            self.commit_journal();
+            self.commit_journal(&namespace);
         }
     }
 
-    async fn on_ack(&self, message: <VsrConsensus<B> as 
Consensus>::Message<PrepareOkHeader>) {
-        let consensus = self.consensus.as_ref().unwrap();
+    async fn on_ack(
+        &self,
+        message: <VsrConsensus<B, NamespacedPipeline> as 
Consensus>::Message<PrepareOkHeader>,
+    ) {
         let header = message.header();
+        let consensus = self.consensus().expect("on_ack: consensus not 
initialized");
 
         if let Err(reason) = ack_preflight(consensus) {
             warn!("on_ack: ignoring ({reason})");
@@ -394,42 +407,74 @@ where
 
         {
             let pipeline = consensus.pipeline().borrow();
-            let Some(entry) =
-                pipeline.message_by_op_and_checksum(header.op, 
header.prepare_checksum)
-            else {
+            if pipeline
+                .message_by_op_and_checksum(header.op, header.prepare_checksum)
+                .is_none()
+            {
                 debug!("on_ack: prepare not in pipeline op={}", header.op);
                 return;
-            };
-
-            if entry.header.checksum != header.prepare_checksum {
-                warn!("on_ack: checksum mismatch");
-                return;
             }
         }
 
-        if ack_quorum_reached(consensus, header) {
-            debug!("on_ack: quorum received for op={}", header.op);
-
-            // Extract the prepare message from the pipeline by op
-            // TODO: Commit from the head. ALWAYS
-            let entry = 
consensus.pipeline().borrow_mut().extract_by_op(header.op);
-            let Some(PipelineEntry {
-                header: prepare_header,
-                ..
-            }) = entry
-            else {
-                warn!("on_ack: prepare not found in pipeline for op={}", 
header.op);
-                return;
-            };
+        consensus.handle_prepare_ok(header);
+
+        // SAFETY(IGGY-66): Per-namespace drain independent of global commit.
+        //
+        // drain_committable_all() drains each namespace queue independently by
+        // quorum flag, so ns_a ops can be drained and replied to clients while
+        // ns_b ops block the global commit (e.g., ns_a ops 1,3 drain while
+        // ns_b op 2 is pending). This is intentional for partition 
independence.
+        //
+        // View change risk: if a view change occurs before the global commit
+        // covers a drained op, the new primary replays from max_commit+1 and
+        // re-executes it. append_messages is NOT idempotent -- re-execution
+        // produces duplicate partition data.
+        //
+        // Before this path handles real traffic, two guards are required:
+        //   1. Op-based dedup in apply_replicated_operation: skip append if
+        //      the partition journal already contains data for this op.
+        //   2. Client reply dedup by (client_id, request_id): prevent
+        //      duplicate replies after view change re-execution.
+        let drained = {
+            let mut pipeline = consensus.pipeline().borrow_mut();
+            pipeline.drain_committable_all()
+        };
+
+        if drained.is_empty() {
+            return;
+        }
+
+        // Advance global commit for VSR protocol correctness
+        {
+            let pipeline = consensus.pipeline().borrow();
+            let new_commit = 
pipeline.global_commit_frontier(consensus.commit());
+            drop(pipeline);
+            consensus.advance_commit_number(new_commit);
+        }
+
+        if let (Some(first), Some(last)) = (drained.first(), drained.last()) {
+            debug!(
+                "on_ack: draining committed ops=[{}..={}] count={}",
+                first.header.op,
+                last.header.op,
+                drained.len()
+            );
+        }
 
-            // Data was already appended to the partition journal during
-            // on_replicate. Now that quorum is reached, update the partition's
-            // current offset and check whether the journal needs flushing.
-            let namespace = IggyNamespace::from_raw(prepare_header.namespace);
+        let mut committed_ns: HashSet<IggyNamespace> = HashSet::new();
+
+        for PipelineEntry {
+            header: prepare_header,
+            ..
+        } in drained
+        {
+            let entry_namespace = 
IggyNamespace::from_raw(prepare_header.namespace);
 
             match prepare_header.operation {
                 Operation::SendMessages => {
-                    self.commit_messages(&namespace).await;
+                    if committed_ns.insert(entry_namespace) {
+                        self.commit_messages(&entry_namespace).await;
+                    }
                     debug!("on_ack: messages committed for op={}", 
prepare_header.op,);
                 }
                 Operation::StoreConsumerOffset => {
@@ -447,14 +492,13 @@ where
                 }
             }
 
-            // Send reply to client
             let generic_reply = build_reply_message(consensus, 
&prepare_header).into_generic();
             debug!(
                 "on_ack: sending reply to client={} for op={}",
                 prepare_header.client, prepare_header.op
             );
 
-            // TODO: Error handling
+            // TODO: Propagate send error instead of panicking; requires bus 
error design.
             consensus
                 .message_bus()
                 .send_to_client(prepare_header.client, generic_reply)
@@ -464,10 +508,18 @@ where
     }
 }
 
-impl<B> IggyPartitions<VsrConsensus<B>>
+impl<B> IggyPartitions<VsrConsensus<B, NamespacedPipeline>>
 where
     B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
 {
+    pub fn register_namespace_in_pipeline(&self, ns: u64) {
+        self.consensus()
+            .expect("register_namespace_in_pipeline: consensus not 
initialized")
+            .pipeline()
+            .borrow_mut()
+            .register_namespace(ns);
+    }
+
    // TODO: Move this elsewhere; also avoid the reallocation. We currently reallocate 
because we use PooledBuffer for the batch body
     // but `Bytes` for `Message` payload.
     fn batch_from_body(body: &[u8]) -> IggyMessagesBatchMut {
@@ -488,10 +540,12 @@ where
 
     async fn apply_replicated_operation(
         &self,
-        message: &Message<PrepareHeader>,
         namespace: &IggyNamespace,
+        message: &Message<PrepareHeader>,
     ) {
-        let consensus = self.consensus.as_ref().unwrap();
+        let consensus = self
+            .consensus()
+            .expect("apply_replicated_operation: consensus not initialized");
         let header = message.header();
 
         match header.operation {
@@ -552,16 +606,16 @@ where
 
     /// Replicate a prepare message to the next replica in the chain.
     ///
-    /// Chain replication pattern:
-    /// - Primary sends to first backup
-    /// - Each backup forwards to the next
-    /// - Stops when we would forward back to primary
+    /// Chain replication: primary -> first backup -> ... -> last backup.
+    /// Stops when the next replica would be the primary.
     async fn replicate(&self, message: Message<PrepareHeader>) {
-        let consensus = self.consensus.as_ref().unwrap();
+        let consensus = self
+            .consensus()
+            .expect("replicate: consensus not initialized");
         replicate_to_next_in_chain(consensus, message).await;
     }
 
-    fn commit_journal(&self) {
+    fn commit_journal(&self, _namespace: &IggyNamespace) {
         // TODO: Implement commit logic for followers.
         // Walk through journal from last committed to current commit number
         // Apply each entry to the partition state
@@ -787,7 +841,9 @@ where
     }
 
     async fn send_prepare_ok(&self, header: &PrepareHeader) {
-        let consensus = self.consensus.as_ref().unwrap();
+        let consensus = self
+            .consensus()
+            .expect("send_prepare_ok: consensus not initialized");
         // TODO: Verify the prepare is persisted in the partition journal.
         // The partition journal uses MessageLookup headers, so we cannot
         // check by PrepareHeader.op directly. For now, skip this check.
diff --git a/core/simulator/src/deps.rs b/core/simulator/src/deps.rs
index 25175ea8e..c4afc1bcf 100644
--- a/core/simulator/src/deps.rs
+++ b/core/simulator/src/deps.rs
@@ -17,7 +17,7 @@
 
 use crate::bus::SharedMemBus;
 use bytes::Bytes;
-use consensus::VsrConsensus;
+use consensus::{NamespacedPipeline, VsrConsensus};
 use iggy_common::header::PrepareHeader;
 use iggy_common::message::Message;
 use journal::{Journal, JournalHandle, Storage};
@@ -61,6 +61,7 @@ pub struct SimJournal<S: Storage> {
     storage: S,
     headers: UnsafeCell<HashMap<u64, PrepareHeader>>,
     offsets: UnsafeCell<HashMap<u64, usize>>,
+    write_offset: Cell<usize>,
 }
 
 impl<S: Storage + Default> Default for SimJournal<S> {
@@ -69,6 +70,7 @@ impl<S: Storage + Default> Default for SimJournal<S> {
             storage: S::default(),
             headers: UnsafeCell::new(HashMap::new()),
             offsets: UnsafeCell::new(HashMap::new()),
+            write_offset: Cell::new(0),
         }
     }
 }
@@ -79,6 +81,7 @@ impl<S: Storage> std::fmt::Debug for SimJournal<S> {
             .field("storage", &"<Storage>")
             .field("headers", &"<UnsafeCell>")
             .field("offsets", &"<UnsafeCell>")
+            .field("write_offset", &self.write_offset.get())
             .finish()
     }
 }
@@ -121,14 +124,10 @@ impl<S: Storage<Buffer = Vec<u8>>> Journal<S> for 
SimJournal<S> {
 
         let bytes_written = self.storage.write(message_bytes.to_vec()).await;
 
-        let current_offset = unsafe { &mut *self.offsets.get() }
-            .values()
-            .last()
-            .cloned()
-            .unwrap_or_default();
-
+        let offset = self.write_offset.get();
         unsafe { &mut *self.headers.get() }.insert(header.op, header);
-        unsafe { &mut *self.offsets.get() }.insert(header.op, current_offset + 
bytes_written);
+        unsafe { &mut *self.offsets.get() }.insert(header.op, offset);
+        self.write_offset.set(offset + bytes_written);
     }
 
     fn header(&self, idx: usize) -> Option<Self::HeaderRef<'_>> {
@@ -159,4 +158,5 @@ pub type SimMetadata = IggyMetadata<
 >;
 
 /// Type alias for simulator partitions
-pub type ReplicaPartitions = 
partitions::IggyPartitions<VsrConsensus<SharedMemBus>>;
+pub type ReplicaPartitions =
+    partitions::IggyPartitions<VsrConsensus<SharedMemBus, NamespacedPipeline>>;
diff --git a/core/simulator/src/lib.rs b/core/simulator/src/lib.rs
index 44e063f78..332b7fc31 100644
--- a/core/simulator/src/lib.rs
+++ b/core/simulator/src/lib.rs
@@ -22,7 +22,7 @@ pub mod replica;
 
 use bus::MemBus;
 use consensus::Plane;
-use iggy_common::header::{GenericHeader, ReplyHeader};
+use iggy_common::header::{GenericHeader, Operation, ReplyHeader};
 use iggy_common::message::{Message, MessageBag};
 use message_bus::MessageBus;
 use replica::Replica;
@@ -34,10 +34,10 @@ pub struct Simulator {
 }
 
 impl Simulator {
-    /// Initialize a partition on all replicas (in-memory for simulation)
+    /// Initialize a partition with its own consensus group on all replicas.
     pub fn init_partition(&mut self, namespace: 
iggy_common::sharding::IggyNamespace) {
         for replica in &mut self.replicas {
-            replica.partitions.init_partition_in_memory(namespace);
+            replica.init_partition(namespace);
         }
     }
 
@@ -120,13 +120,16 @@ impl Simulator {
             MessageBag::Request(message) => message.header().operation,
             MessageBag::Prepare(message) => message.header().operation,
             MessageBag::PrepareOk(message) => message.header().operation,
-        } as u8;
+        };
 
-        if operation < 200 {
-            self.dispatch_to_metadata_on_replica(replica, message).await;
-        } else {
-            self.dispatch_to_partition_on_replica(replica, message)
-                .await;
+        match operation {
+            Operation::SendMessages | Operation::StoreConsumerOffset => {
+                self.dispatch_to_partition_on_replica(replica, message)
+                    .await;
+            }
+            _ => {
+                self.dispatch_to_metadata_on_replica(replica, message).await;
+            }
         }
     }
 
@@ -158,3 +161,10 @@ impl Simulator {
         }
     }
 }
+
+// TODO(IGGY-66): Add acceptance test for per-partition consensus independence.
+// Setup: 3-replica simulator, two partitions (ns_a, ns_b).
+// 1. Fill ns_a's pipeline to PIPELINE_PREPARE_QUEUE_MAX without delivering 
acks.
+// 2. Send a request to ns_b, step until ns_b reply arrives.
+// 3. Assert ns_b committed while ns_a pipeline is still full.
+// Requires namespace-aware stepping (filter bus by namespace) or two-phase 
delivery.
diff --git a/core/simulator/src/replica.rs b/core/simulator/src/replica.rs
index ae4631e95..9f8081656 100644
--- a/core/simulator/src/replica.rs
+++ b/core/simulator/src/replica.rs
@@ -19,9 +19,9 @@ use crate::bus::{MemBus, SharedMemBus};
 use crate::deps::{
     MemStorage, ReplicaPartitions, SimJournal, SimMetadata, 
SimMuxStateMachine, SimSnapshot,
 };
-use consensus::{LocalPipeline, VsrConsensus};
+use consensus::{LocalPipeline, NamespacedPipeline, VsrConsensus};
 use iggy_common::IggyByteSize;
-use iggy_common::sharding::ShardId;
+use iggy_common::sharding::{IggyNamespace, ShardId};
 use metadata::stm::consumer_group::{ConsumerGroups, ConsumerGroupsInner};
 use metadata::stm::stream::{Streams, StreamsInner};
 use metadata::stm::user::{Users, UsersInner};
@@ -29,9 +29,13 @@ use metadata::{IggyMetadata, variadic};
 use partitions::PartitionsConfig;
 use std::sync::Arc;
 
+// TODO: Make configurable
+const CLUSTER_ID: u128 = 1;
+
 pub struct Replica {
     pub id: u8,
     pub name: String,
+    pub replica_count: u8,
     pub metadata: SimMetadata,
     pub partitions: ReplicaPartitions,
     pub bus: Arc<MemBus>,
@@ -44,48 +48,43 @@ impl Replica {
         let consumer_groups: ConsumerGroups = 
ConsumerGroupsInner::new().into();
         let mux = SimMuxStateMachine::new(variadic!(users, streams, 
consumer_groups));
 
-        let cluster_id: u128 = 1; // TODO: Make configurable
+        // Metadata uses namespace=0 (not partition-scoped)
         let metadata_consensus = VsrConsensus::new(
-            cluster_id,
+            CLUSTER_ID,
             id,
             replica_count,
+            0,
             SharedMemBus(Arc::clone(&bus)),
             LocalPipeline::new(),
         );
         metadata_consensus.init();
 
-        // Create separate consensus instance for partitions
-        let partitions_consensus = VsrConsensus::new(
-            cluster_id,
-            id,
-            replica_count,
-            SharedMemBus(Arc::clone(&bus)),
-            LocalPipeline::new(),
-        );
-        partitions_consensus.init();
-
-        // Configure partitions
         let partitions_config = PartitionsConfig {
             messages_required_to_save: 1000,
             size_of_messages_required_to_save: IggyByteSize::from(4 * 1024 * 
1024),
             enforce_fsync: false, // Disable fsync for simulation
-            segment_size: IggyByteSize::from(1024 * 1024 * 1024), // 1GB 
segments
+            segment_size: IggyByteSize::from(1024 * 1024 * 1024), // 1GiB 
segments
         };
 
-        // Only replica 0 gets consensus (primary shard for now)
-        let partitions = if id == 0 {
-            ReplicaPartitions::new(
-                ShardId::new(id as u16),
-                partitions_config,
-                Some(partitions_consensus),
-            )
-        } else {
-            ReplicaPartitions::new(ShardId::new(id as u16), partitions_config, 
None)
-        };
+        let mut partitions = ReplicaPartitions::new(ShardId::new(id as u16), 
partitions_config);
+
+        // TODO: namespace=0 collides with metadata consensus. Safe for now 
because the simulator
+        // routes by Operation type, but a shared view change bus would 
produce namespace collisions.
+        let partition_consensus = VsrConsensus::new(
+            CLUSTER_ID,
+            id,
+            replica_count,
+            0,
+            SharedMemBus(Arc::clone(&bus)),
+            NamespacedPipeline::new(),
+        );
+        partition_consensus.init();
+        partitions.set_consensus(partition_consensus);
 
         Self {
             id,
             name,
+            replica_count,
             metadata: IggyMetadata {
                 consensus: Some(metadata_consensus),
                 journal: Some(SimJournal::<MemStorage>::default()),
@@ -96,4 +95,10 @@ impl Replica {
             bus,
         }
     }
+
+    pub fn init_partition(&mut self, namespace: IggyNamespace) {
+        self.partitions.init_partition_in_memory(namespace);
+        self.partitions
+            .register_namespace_in_pipeline(namespace.inner());
+    }
 }

Reply via email to