This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch nonblocking-consensus
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 7dec4e9e620eef6e87344f3ea157d4a5e1c7c20a
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Wed Feb 18 11:02:31 2026 +0100

    feat(consensus): implement per-partition consensus
    
    A single pipeline shared across all partitions caps
    throughput at 8 in-flight ops. Each partition now gets
    its own VsrConsensus with an independent pipeline and
    sequencing.
    
    IggyPartitions stores consensus_groups keyed by namespace,
    and the Plane impl routes each message to its group via the
    namespace in its header. View-change headers (SVC/DVC/SV)
    gain a namespace field and align their frame layout with
    GenericHeader.
---
 core/common/src/types/consensus/header.rs |  24 ++--
 core/consensus/src/impls.rs               |  96 +++++++++----
 core/consensus/src/lib.rs                 |   5 +-
 core/consensus/src/plane_helpers.rs       | 133 ++++++++++++++++-
 core/metadata/src/impls/metadata.rs       |  88 ++++++------
 core/partitions/src/iggy_partitions.rs    | 230 +++++++++++++++++-------------
 core/simulator/src/lib.rs                 |  11 +-
 core/simulator/src/replica.rs             |  70 +++++----
 8 files changed, 443 insertions(+), 214 deletions(-)
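
At its core, the change replaces the single shared consensus instance with one
VsrConsensus per partition namespace, looked up via the namespace carried in
every consensus-plane header. A simplified sketch of the routing shape
(illustrative types only, not the exact iggy API):

    // One consensus group per partition namespace (simplified).
    use std::collections::HashMap;

    struct ConsensusGroup; // stands in for VsrConsensus<B, P>

    struct Partitions {
        consensus_groups: HashMap<u64, ConsensusGroup>,
    }

    impl Partitions {
        // Consensus-plane messages carry a namespace in their header, so a
        // handler resolves the owning group before doing any VSR work.
        fn route(&self, namespace: u64) -> Option<&ConsensusGroup> {
            self.consensus_groups.get(&namespace)
        }
    }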

diff --git a/core/common/src/types/consensus/header.rs b/core/common/src/types/consensus/header.rs
index c8d311fa7..066b9b681 100644
--- a/core/common/src/types/consensus/header.rs
+++ b/core/common/src/types/consensus/header.rs
@@ -135,8 +135,7 @@ pub struct GenericHeader {
     pub replica: u8,
     pub reserved_frame: [u8; 66],
 
-    pub namespace: u64,
-    pub reserved_command: [u8; 120],
+    pub reserved_command: [u8; 128],
 }
 
 unsafe impl Pod for GenericHeader {}
@@ -489,18 +488,17 @@ impl Default for ReplyHeader {
 #[repr(C)]
 pub struct StartViewChangeHeader {
     pub checksum: u128,
-    pub checksum_padding: u128,
     pub checksum_body: u128,
-    pub checksum_body_padding: u128,
     pub cluster: u128,
     pub size: u32,
     pub view: u32,
     pub release: u32,
     pub command: Command2,
     pub replica: u8,
-    pub reserved_frame: [u8; 42],
+    pub reserved_frame: [u8; 66],
 
-    pub reserved: [u8; 128],
+    pub namespace: u64,
+    pub reserved: [u8; 120],
 }
 
 unsafe impl Pod for StartViewChangeHeader {}
@@ -536,16 +534,14 @@ impl ConsensusHeader for StartViewChangeHeader {
 #[repr(C)]
 pub struct DoViewChangeHeader {
     pub checksum: u128,
-    pub checksum_padding: u128,
     pub checksum_body: u128,
-    pub checksum_body_padding: u128,
     pub cluster: u128,
     pub size: u32,
     pub view: u32,
     pub release: u32,
     pub command: Command2,
     pub replica: u8,
-    pub reserved_frame: [u8; 42],
+    pub reserved_frame: [u8; 66],
 
     /// The highest op-number in this replica's log.
     /// Used to select the most complete log when log_view values are equal.
@@ -553,11 +549,12 @@ pub struct DoViewChangeHeader {
     /// The replica's commit number (highest committed op).
     /// The new primary sets its commit to max(commit) across all DVCs.
     pub commit: u64,
+    pub namespace: u64,
     /// The view number when this replica's status was last normal.
     /// This is the key field for log selection: the replica with the
     /// highest log_view has the most authoritative log.
     pub log_view: u32,
-    pub reserved: [u8; 108],
+    pub reserved: [u8; 100],
 }
 
 unsafe impl Pod for DoViewChangeHeader {}
@@ -609,16 +606,14 @@ impl ConsensusHeader for DoViewChangeHeader {
 #[repr(C)]
 pub struct StartViewHeader {
     pub checksum: u128,
-    pub checksum_padding: u128,
     pub checksum_body: u128,
-    pub checksum_body_padding: u128,
     pub cluster: u128,
     pub size: u32,
     pub view: u32,
     pub release: u32,
     pub command: Command2,
     pub replica: u8,
-    pub reserved_frame: [u8; 42],
+    pub reserved_frame: [u8; 66],
 
     /// The op-number of the highest entry in the new primary's log.
     /// Backups set their op to this value.
@@ -627,7 +622,8 @@ pub struct StartViewHeader {
     /// This is max(commit) from all DVCs received by the primary.
     /// Backups set their commit to this value.
     pub commit: u64,
-    pub reserved: [u8; 112],
+    pub namespace: u64,
+    pub reserved: [u8; 104],
 }
 
 unsafe impl Pod for StartViewHeader {}
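
Dropping the two 16-byte checksum padding fields and widening reserved_frame
from 42 to 66 bytes lines each view-change header's frame region up with
GenericHeader (the field sums above give 128 bytes), and namespace is carved
out of the reserved tail so the command region stays 128 bytes as well (for
DoViewChangeHeader: 8 + 8 + 8 + 4 + 100 = 128). A compile-time guard along
these lines could pin that invariant; it is a sketch, not part of the commit,
and assumes only the struct definitions shown above:

    // Keep the realigned view-change headers byte-compatible with GenericHeader.
    const _: () = {
        assert!(std::mem::size_of::<StartViewChangeHeader>() == std::mem::size_of::<GenericHeader>());
        assert!(std::mem::size_of::<DoViewChangeHeader>() == std::mem::size_of::<GenericHeader>());
        assert!(std::mem::size_of::<StartViewHeader>() == std::mem::size_of::<GenericHeader>());
    };
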
diff --git a/core/consensus/src/impls.rs b/core/consensus/src/impls.rs
index 4b1e620c8..4bb5e7d02 100644
--- a/core/consensus/src/impls.rs
+++ b/core/consensus/src/impls.rs
@@ -317,20 +317,6 @@ impl LocalPipeline {
     pub fn clear(&mut self) {
         self.prepare_queue.clear();
     }
-
-    /// Extract and remove a message by op number.
-    /// Returns None if op is not in the pipeline.
-    pub fn extract_by_op(&mut self, op: u64) -> Option<PipelineEntry> {
-        let head_op = self.prepare_queue.front()?.header.op;
-        if op < head_op {
-            return None;
-        }
-        let index = (op - head_op) as usize;
-        if index >= self.prepare_queue.len() {
-            return None;
-        }
-        self.prepare_queue.remove(index)
-    }
 }
 
 impl Pipeline for LocalPipeline {
@@ -345,10 +331,6 @@ impl Pipeline for LocalPipeline {
         LocalPipeline::pop_message(self)
     }
 
-    fn extract_by_op(&mut self, op: u64) -> Option<Self::Entry> {
-        LocalPipeline::extract_by_op(self, op)
-    }
-
     fn clear(&mut self) {
         LocalPipeline::clear(self)
     }
@@ -365,6 +347,10 @@ impl Pipeline for LocalPipeline {
         LocalPipeline::message_by_op_and_checksum(self, op, checksum)
     }
 
+    fn head(&self) -> Option<&Self::Entry> {
+        LocalPipeline::head(self)
+    }
+
     fn is_full(&self) -> bool {
         LocalPipeline::is_full(self)
     }
@@ -389,7 +375,7 @@ pub enum Status {
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub enum VsrAction {
     /// Send StartViewChange to all replicas.
-    SendStartViewChange { view: u32 },
+    SendStartViewChange { view: u32, namespace: u64 },
     /// Send DoViewChange to primary.
     SendDoViewChange {
         view: u32,
@@ -397,11 +383,22 @@ pub enum VsrAction {
         log_view: u32,
         op: u64,
         commit: u64,
+        namespace: u64,
     },
     /// Send StartView to all backups (as new primary).
-    SendStartView { view: u32, op: u64, commit: u64 },
+    SendStartView {
+        view: u32,
+        op: u64,
+        commit: u64,
+        namespace: u64,
+    },
     /// Send PrepareOK to primary.
-    SendPrepareOk { view: u32, op: u64, target: u8 },
+    SendPrepareOk {
+        view: u32,
+        op: u64,
+        target: u8,
+        namespace: u64,
+    },
 }
 
 #[allow(unused)]
@@ -414,6 +411,7 @@ where
     cluster: u128,
     replica: u8,
     replica_count: u8,
+    namespace: u64,
 
     view: Cell<u32>,
 
@@ -455,16 +453,28 @@ where
 }
 
 impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
-    pub fn new(cluster: u128, replica: u8, replica_count: u8, message_bus: B, pipeline: P) -> Self {
+    pub fn new(
+        cluster: u128,
+        replica: u8,
+        replica_count: u8,
+        namespace: u64,
+        message_bus: B,
+        pipeline: P,
+    ) -> Self {
         assert!(
             replica < replica_count,
             "replica index must be < replica_count"
         );
         assert!(replica_count >= 1, "need at least 1 replica");
+        // TODO: Verify that XOR-based seeding provides sufficient jitter diversity
+        // across groups. Consider using a proper hash (e.g., Murmur3) of
+        // (replica_id, namespace) for production.
+        let timeout_seed = replica as u128 ^ namespace as u128;
         Self {
             cluster,
             replica,
             replica_count,
+            namespace,
             view: Cell::new(0),
             log_view: Cell::new(0),
             status: Cell::new(Status::Recovering),
@@ -479,7 +489,7 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
             do_view_change_quorum: Cell::new(false),
             sent_own_start_view_change: Cell::new(false),
             sent_own_do_view_change: Cell::new(false),
-            timeouts: RefCell::new(TimeoutManager::new(replica as u128)),
+            timeouts: RefCell::new(TimeoutManager::new(timeout_seed)),
         }
     }
 
@@ -563,6 +573,10 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
         self.replica_count
     }
 
+    pub fn namespace(&self) -> u64 {
+        self.namespace
+    }
+
     pub fn log_view(&self) -> u32 {
         self.log_view.get()
     }
@@ -678,7 +692,10 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
             timeouts.start(TimeoutKind::ViewChangeStatus);
         }
 
-        vec![VsrAction::SendStartViewChange { view: new_view }]
+        vec![VsrAction::SendStartViewChange {
+            view: new_view,
+            namespace: self.namespace,
+        }]
     }
 
     /// Resend SVC message if we've started view change.
@@ -693,6 +710,7 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
 
         vec![VsrAction::SendStartViewChange {
             view: self.view.get(),
+            namespace: self.namespace,
         }]
     }
 
@@ -725,6 +743,7 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
             log_view: self.log_view.get(),
             op: current_op,
             commit: current_commit,
+            namespace: self.namespace,
         }]
     }
 
@@ -748,7 +767,10 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
             .borrow_mut()
             .reset(TimeoutKind::ViewChangeStatus);
 
-        vec![VsrAction::SendStartViewChange { view: next_view }]
+        vec![VsrAction::SendStartViewChange {
+            view: next_view,
+            namespace: self.namespace,
+        }]
     }
 
     /// Handle a received StartViewChange message.
@@ -757,6 +779,10 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
     /// from f OTHER replicas, it sends a DOVIEWCHANGE message to the node
     /// that will be the primary in the new view."
     pub fn handle_start_view_change(&self, header: &StartViewChangeHeader) -> Vec<VsrAction> {
+        debug_assert_eq!(
+            header.namespace, self.namespace,
+            "SVC routed to wrong group"
+        );
         let from_replica = header.replica;
         let msg_view = header.view;
 
@@ -786,7 +812,10 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
             }
 
             // Send our own SVC
-            actions.push(VsrAction::SendStartViewChange { view: msg_view });
+            actions.push(VsrAction::SendStartViewChange {
+                view: msg_view,
+                namespace: self.namespace,
+            });
         }
 
         // Record the SVC from sender
@@ -816,6 +845,7 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
                 log_view: self.log_view.get(),
                 op: current_op,
                 commit: current_commit,
+                namespace: self.namespace,
             });
 
             // If we are the primary candidate, record our own DVC
@@ -848,6 +878,10 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
     /// replicas (including itself), it sets its view-number to that in the messages
     /// and selects as the new log the one contained in the message with the largest v'..."
     pub fn handle_do_view_change(&self, header: &DoViewChangeHeader) -> Vec<VsrAction> {
+        debug_assert_eq!(
+            header.namespace, self.namespace,
+            "DVC routed to wrong group"
+        );
         let from_replica = header.replica;
         let msg_view = header.view;
         let msg_log_view = header.log_view;
@@ -880,7 +914,10 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
             }
 
             // Send our own SVC
-            actions.push(VsrAction::SendStartViewChange { view: msg_view });
+            actions.push(VsrAction::SendStartViewChange {
+                view: msg_view,
+                namespace: self.namespace,
+            });
         }
 
         // Only the primary candidate processes DVCs for quorum
@@ -939,6 +976,7 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
     /// in the log, set their view-number to the view number in the message, change
     /// their status to normal, and send PrepareOK for any uncommitted ops."
     pub fn handle_start_view(&self, header: &StartViewHeader) -> Vec<VsrAction> {
+        debug_assert_eq!(header.namespace, self.namespace, "SV routed to wrong group");
         let from_replica = header.replica;
         let msg_view = header.view;
         let msg_op = header.op;
@@ -984,7 +1022,8 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
             actions.push(VsrAction::SendPrepareOk {
                 view: msg_view,
                 op: op_num,
-                target: from_replica, // Send to new primary
+                target: from_replica,
+                namespace: self.namespace,
             });
         }
 
@@ -1020,6 +1059,7 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
             view: self.view.get(),
             op: new_op,
             commit: max_commit,
+            namespace: self.namespace,
         }]
     }
 
diff --git a/core/consensus/src/lib.rs b/core/consensus/src/lib.rs
index 17f2fb9ea..9ddf4994d 100644
--- a/core/consensus/src/lib.rs
+++ b/core/consensus/src/lib.rs
@@ -29,9 +29,6 @@ pub trait Pipeline {
 
     fn pop_message(&mut self) -> Option<Self::Entry>;
 
-    /// Extract and remove a message by op number.
-    fn extract_by_op(&mut self, op: u64) -> Option<Self::Entry>;
-
     fn clear(&mut self);
 
     fn message_by_op(&self, op: u64) -> Option<&Self::Entry>;
@@ -40,6 +37,8 @@ pub trait Pipeline {
 
     fn message_by_op_and_checksum(&self, op: u64, checksum: u128) -> Option<&Self::Entry>;
 
+    fn head(&self) -> Option<&Self::Entry>;
+
     fn is_full(&self) -> bool;
 
     fn is_empty(&self) -> bool;
diff --git a/core/consensus/src/plane_helpers.rs b/core/consensus/src/plane_helpers.rs
index 60ea1dedf..6906eb9f5 100644
--- a/core/consensus/src/plane_helpers.rs
+++ b/core/consensus/src/plane_helpers.rs
@@ -135,7 +135,7 @@ where
     Ok(())
 }
 
-/// Shared quorum + extraction flow for ack handling.
+/// Shared quorum tracking flow for ack handling.
 pub fn ack_quorum_reached<B, P>(consensus: &VsrConsensus<B, P>, ack: &PrepareOkHeader) -> bool
 where
     B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
@@ -149,6 +149,33 @@ where
     true
 }
 
+/// Drain and return committable prepares from the pipeline head.
+///
+/// Entries are drained only from the head and only while their op is covered
+/// by the current commit frontier.
+pub fn drain_committable_prefix<B, P>(consensus: &VsrConsensus<B, P>) -> Vec<PipelineEntry>
+where
+    B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
+    P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>,
+{
+    let commit = consensus.commit();
+    let mut drained = Vec::new();
+    let mut pipeline = consensus.pipeline().borrow_mut();
+
+    while let Some(head_op) = pipeline.head().map(|entry| entry.header.op) {
+        if head_op > commit {
+            break;
+        }
+
+        let entry = pipeline
+            .pop_message()
+            .expect("drain_committable_prefix: head exists");
+        drained.push(entry);
+    }
+
+    drained
+}
+
 /// Shared reply-message construction for committed prepare.
 pub fn build_reply_message<B, P>(
     consensus: &VsrConsensus<B, P>,
@@ -269,3 +296,107 @@ pub async fn send_prepare_ok<B, P>(
             .unwrap();
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::{Consensus, LocalPipeline};
+    use iggy_common::IggyError;
+
+    #[derive(Debug, Default)]
+    struct NoopBus;
+
+    impl MessageBus for NoopBus {
+        type Client = u128;
+        type Replica = u8;
+        type Data = Message<GenericHeader>;
+        type Sender = ();
+
+        fn add_client(&mut self, _client: Self::Client, _sender: Self::Sender) -> bool {
+            true
+        }
+
+        fn remove_client(&mut self, _client: Self::Client) -> bool {
+            true
+        }
+
+        fn add_replica(&mut self, _replica: Self::Replica) -> bool {
+            true
+        }
+
+        fn remove_replica(&mut self, _replica: Self::Replica) -> bool {
+            true
+        }
+
+        async fn send_to_client(
+            &self,
+            _client_id: Self::Client,
+            _data: Self::Data,
+        ) -> Result<(), IggyError> {
+            Ok(())
+        }
+
+        async fn send_to_replica(
+            &self,
+            _replica: Self::Replica,
+            _data: Self::Data,
+        ) -> Result<(), IggyError> {
+            Ok(())
+        }
+    }
+
+    fn prepare_message(op: u64, parent: u128, checksum: u128) -> Message<PrepareHeader> {
+        Message::<PrepareHeader>::new(std::mem::size_of::<PrepareHeader>()).transmute_header(
+            |_, new| {
+                *new = PrepareHeader {
+                    command: Command2::Prepare,
+                    op,
+                    parent,
+                    checksum,
+                    ..Default::default()
+                };
+            },
+        )
+    }
+
+    #[test]
+    fn drains_head_prefix_by_commit_frontier() {
+        let consensus = VsrConsensus::new(1, 0, 3, 0, NoopBus, LocalPipeline::new());
+        consensus.init();
+
+        consensus.pipeline_message(prepare_message(1, 0, 10));
+        consensus.pipeline_message(prepare_message(2, 10, 20));
+        consensus.pipeline_message(prepare_message(3, 20, 30));
+
+        consensus.advance_commit_number(3);
+
+        let drained = drain_committable_prefix(&consensus);
+        let drained_ops: Vec<_> = drained.into_iter().map(|entry| entry.header.op).collect();
+        assert_eq!(drained_ops, vec![1, 2, 3]);
+        assert!(consensus.pipeline().borrow().is_empty());
+    }
+
+    #[test]
+    fn drains_only_up_to_commit_frontier_even_without_quorum_flags() {
+        let consensus = VsrConsensus::new(1, 0, 3, 0, NoopBus, LocalPipeline::new());
+        consensus.init();
+
+        consensus.pipeline_message(prepare_message(5, 0, 50));
+        consensus.pipeline_message(prepare_message(6, 50, 60));
+        consensus.pipeline_message(prepare_message(7, 60, 70));
+
+        consensus.advance_commit_number(6);
+        let drained = drain_committable_prefix(&consensus);
+        let drained_ops: Vec<_> = drained.into_iter().map(|entry| entry.header.op).collect();
+
+        assert_eq!(drained_ops, vec![5, 6]);
+        assert_eq!(
+            consensus
+                .pipeline()
+                .borrow()
+                .head()
+                .map(|entry| entry.header.op),
+            Some(7)
+        );
+    }
+}
diff --git a/core/metadata/src/impls/metadata.rs b/core/metadata/src/impls/metadata.rs
index 9a81461af..c9e5fed54 100644
--- a/core/metadata/src/impls/metadata.rs
+++ b/core/metadata/src/impls/metadata.rs
@@ -18,7 +18,7 @@ use crate::stm::StateMachine;
 use crate::stm::snapshot::{FillSnapshot, MetadataSnapshot, Snapshot, SnapshotError};
 use consensus::{
     Consensus, Pipeline, PipelineEntry, Plane, Project, Sequencer, VsrConsensus, ack_preflight,
-    ack_quorum_reached, build_reply_message, fence_old_prepare_by_commit,
+    ack_quorum_reached, build_reply_message, drain_committable_prefix, fence_old_prepare_by_commit,
     panic_if_hash_chain_would_break_in_same_view, pipeline_prepare_common, replicate_preflight,
     replicate_to_next_in_chain, send_prepare_ok as send_prepare_ok_common,
 };
@@ -195,47 +195,51 @@ where
 
             debug!("on_ack: quorum received for op={}", header.op);
 
-            // Extract the header from the pipeline, fetch the full message from journal
-            // TODO: Commit from the head. ALWAYS
-            let entry = consensus.pipeline().borrow_mut().extract_by_op(header.op);
-            let Some(entry) = entry else {
-                warn!("on_ack: prepare not found in pipeline for op={}", header.op);
-                return;
-            };
-
-            let prepare_header = entry.header;
-            // TODO(hubcio): should we replace this with graceful fallback (warn + return)?
-            // When journal compaction is implemented compaction could race
-            // with this lookup if it removes entries below the commit number.
-            let prepare = journal
-                .handle()
-                .entry(&prepare_header)
-                .await
-                .unwrap_or_else(|| {
-                    panic!(
-                        "on_ack: committed prepare op={} checksum={} must be 
in journal",
-                        prepare_header.op, prepare_header.checksum
-                    )
-                });
-
-            // Apply the state (consumes prepare)
-            // TODO: Handle appending result to response
-            let _result = self.mux_stm.update(prepare);
-            debug!("on_ack: state applied for op={}", prepare_header.op);
-
-            // Send reply to client
-            let generic_reply = build_reply_message(consensus, &prepare_header).into_generic();
-            debug!(
-                "on_ack: sending reply to client={} for op={}",
-                prepare_header.client, prepare_header.op
-            );
-
-            // TODO: Error handling
-            consensus
-                .message_bus()
-                .send_to_client(prepare_header.client, generic_reply)
-                .await
-                .unwrap()
+            let drained = drain_committable_prefix(consensus);
+            if let (Some(first), Some(last)) = (drained.first(), drained.last()) {
+                debug!(
+                    "on_ack: draining committed prefix ops=[{}..={}] count={}",
+                    first.header.op,
+                    last.header.op,
+                    drained.len()
+                );
+            }
+
+            for entry in drained {
+                let prepare_header = entry.header;
+                // TODO(hubcio): should we replace this with graceful fallback (warn + return)?
+                // When journal compaction is implemented compaction could race
+                // with this lookup if it removes entries below the commit number.
+                let prepare = journal
+                    .handle()
+                    .entry(&prepare_header)
+                    .await
+                    .unwrap_or_else(|| {
+                        panic!(
+                            "on_ack: committed prepare op={} checksum={} must 
be in journal",
+                            prepare_header.op, prepare_header.checksum
+                        )
+                    });
+
+                // Apply the state (consumes prepare)
+                // TODO: Handle appending result to response
+                let _result = self.mux_stm.update(prepare);
+                debug!("on_ack: state applied for op={}", prepare_header.op);
+
+                // Send reply to client
+                let generic_reply = build_reply_message(consensus, &prepare_header).into_generic();
+                debug!(
+                    "on_ack: sending reply to client={} for op={}",
+                    prepare_header.client, prepare_header.op
+                );
+
+                // TODO: Error handling
+                consensus
+                    .message_bus()
+                    .send_to_client(prepare_header.client, generic_reply)
+                    .await
+                    .unwrap()
+            }
         }
     }
 }
diff --git a/core/partitions/src/iggy_partitions.rs b/core/partitions/src/iggy_partitions.rs
index 54891dfdd..03321a126 100644
--- a/core/partitions/src/iggy_partitions.rs
+++ b/core/partitions/src/iggy_partitions.rs
@@ -22,8 +22,9 @@ use crate::Partition;
 use crate::types::PartitionsConfig;
 use consensus::{
     Consensus, PipelineEntry, Plane, Project, Sequencer, VsrConsensus, ack_preflight,
-    ack_quorum_reached, build_reply_message, fence_old_prepare_by_commit, pipeline_prepare_common,
-    replicate_preflight, replicate_to_next_in_chain, send_prepare_ok as send_prepare_ok_common,
+    ack_quorum_reached, build_reply_message, drain_committable_prefix, fence_old_prepare_by_commit,
+    pipeline_prepare_common, replicate_preflight, replicate_to_next_in_chain,
+    send_prepare_ok as send_prepare_ok_common,
 };
 use iggy_common::{
     INDEX_SIZE, IggyByteSize, IggyIndexesMut, IggyMessagesBatchMut, PartitionStats, PooledBuffer,
@@ -58,33 +59,29 @@ pub struct IggyPartitions<C> {
     /// mutate partition state (segments, offsets, journal).
     partitions: UnsafeCell<Vec<IggyPartition>>,
     namespace_to_local: HashMap<IggyNamespace, LocalIdx>,
-    /// Some on shard0, None on other shards
-    pub consensus: Option<C>,
+    // TODO: Consider heartbeat/commit coalescing -- one batch message per node pair
+    // per tick instead of per-group. See CockroachDB heartbeat coalescing.
+    consensus_groups: HashMap<IggyNamespace, C>,
 }
 
 impl<C> IggyPartitions<C> {
-    pub fn new(shard_id: ShardId, config: PartitionsConfig, consensus: Option<C>) -> Self {
+    pub fn new(shard_id: ShardId, config: PartitionsConfig) -> Self {
         Self {
             shard_id,
             config,
             partitions: UnsafeCell::new(Vec::new()),
             namespace_to_local: HashMap::new(),
-            consensus,
+            consensus_groups: HashMap::new(),
         }
     }
 
-    pub fn with_capacity(
-        shard_id: ShardId,
-        config: PartitionsConfig,
-        consensus: Option<C>,
-        capacity: usize,
-    ) -> Self {
+    pub fn with_capacity(shard_id: ShardId, config: PartitionsConfig, capacity: usize) -> Self {
         Self {
             shard_id,
             config,
             partitions: UnsafeCell::new(Vec::with_capacity(capacity)),
             namespace_to_local: HashMap::with_capacity(capacity),
-            consensus,
+            consensus_groups: HashMap::with_capacity(capacity),
         }
     }
 
@@ -160,6 +157,7 @@ impl<C> IggyPartitions<C> {
     /// Remove a partition by namespace. Returns the removed partition if found.
     pub fn remove(&mut self, namespace: &IggyNamespace) -> Option<IggyPartition> {
         let local_idx = self.namespace_to_local.remove(namespace)?;
+        self.consensus_groups.remove(namespace);
         let idx = *local_idx;
         let partitions = self.partitions.get_mut();
 
@@ -231,17 +229,29 @@ impl<C> IggyPartitions<C> {
         &mut self.partitions_mut()[idx]
     }
 
+    pub fn consensus(&self, ns: &IggyNamespace) -> Option<&C> {
+        self.consensus_groups.get(ns)
+    }
+
     /// Initialize a new partition with in-memory storage (for testing/simulation).
     ///
-    /// This is a simplified version that doesn't create file-backed storage.
-    /// Use `init_partition()` for production use with real files.
+    /// On first call, registers both partition and consensus group atomically
+    /// so Plane dispatch cannot observe a partition without its consensus state.
+    /// Subsequent calls for the same namespace are no-ops returning the existing index.
     ///
     /// TODO: Make the log generic over its storage backend to support both
     /// in-memory (for testing) and file-backed (for production) storage without
     /// needing separate initialization methods.
-    pub fn init_partition_in_memory(&mut self, namespace: IggyNamespace) -> LocalIdx {
-        // Check if already initialized
+    pub fn init_partition_in_memory(
+        &mut self,
+        namespace: IggyNamespace,
+        consensus: Option<C>,
+    ) -> LocalIdx {
         if let Some(idx) = self.local_idx(&namespace) {
+            debug_assert!(
+                self.consensus_groups.contains_key(&namespace) || consensus.is_none(),
+                "re-init with consensus for namespace that has no consensus group"
+            );
             return idx;
         }
 
@@ -262,7 +272,14 @@ impl<C> IggyPartitions<C> {
         partition.stats.increment_segments_count(1);
 
         // Insert and return local index
-        self.insert(namespace, partition)
+        let idx = self.insert(namespace, partition);
+        // TODO: Shared WAL -- per-partition fdatasync is untenable at scale. Batch
+        // journal writes across all groups into a single WAL with one fdatasync per
+        // tick. See TiKV raft-engine crate.
+        if let Some(c) = consensus {
+            self.consensus_groups.insert(namespace, c);
+        }
+        idx
     }
 
     /// Initialize a new partition with file-backed storage.
@@ -275,10 +292,18 @@ impl<C> IggyPartitions<C> {
     /// 2. Control plane: broadcast to shards (SKIPPED in this method)
     /// 3. Data plane: INITIATE PARTITION (THIS METHOD)
     ///
-    /// Idempotent - returns existing LocalIdx if partition already exists.
-    pub async fn init_partition(&mut self, namespace: IggyNamespace) -> LocalIdx {
-        // Check if already initialized
+    /// Idempotent: first call registers partition + consensus atomically,
+    /// subsequent calls for the same namespace are no-ops.
+    pub async fn init_partition(
+        &mut self,
+        namespace: IggyNamespace,
+        consensus: Option<C>,
+    ) -> LocalIdx {
         if let Some(idx) = self.local_idx(&namespace) {
+            debug_assert!(
+                self.consensus_groups.contains_key(&namespace) || consensus.is_none(),
+                "re-init with consensus for namespace that has no consensus group"
+            );
             return idx;
         }
 
@@ -325,7 +350,11 @@ impl<C> IggyPartitions<C> {
         partition.stats.increment_segments_count(1);
 
         // Insert and return local index
-        self.insert(namespace, partition)
+        let idx = self.insert(namespace, partition);
+        if let Some(c) = consensus {
+            self.consensus_groups.insert(namespace, c);
+        }
+        idx
     }
 }
 
@@ -334,17 +363,22 @@ where
     B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
 {
     async fn on_request(&self, message: <VsrConsensus<B> as Consensus>::RequestMessage) {
-        let consensus = self.consensus.as_ref().unwrap();
+        let namespace = IggyNamespace::from_raw(message.header().namespace);
+        let consensus = self
+            .consensus(&namespace)
+            .expect("on_request: no consensus group for namespace");
 
-        debug!("handling partition request");
+        debug!(?namespace, "handling partition request");
         let prepare = message.project(consensus);
         pipeline_prepare_common(consensus, prepare, |prepare| self.on_replicate(prepare)).await;
     }
 
     async fn on_replicate(&self, message: <VsrConsensus<B> as Consensus>::ReplicateMessage) {
-        let consensus = self.consensus.as_ref().unwrap();
-
         let header = message.header();
+        let namespace = IggyNamespace::from_raw(header.namespace);
+        let consensus = self
+            .consensus(&namespace)
+            .expect("on_replicate: no consensus group for namespace");
 
         let current_op = match replicate_preflight(consensus, header) {
             Ok(current_op) => current_op,
@@ -359,7 +393,7 @@ where
 
         let is_old_prepare = fence_old_prepare_by_commit(consensus, header);
         if !is_old_prepare {
-            self.replicate(message.clone()).await;
+            self.replicate(&namespace, message.clone()).await;
         } else {
             warn!("received old prepare, not replicating");
         }
@@ -371,21 +405,21 @@ where
         // TODO: Figure out the flow of the partition operations.
         // In metadata layer we assume that when an `on_request` or `on_replicate` is called, it's called from correct shard.
         // I think we need to do the same here, which means that the code from below is unfallable, the partition should always exist by now!
-        let namespace = IggyNamespace::from_raw(header.namespace);
-        self.apply_replicated_operation(&message, &namespace).await;
+        self.apply_replicated_operation(&namespace, &message).await;
 
-        // After successful journal write, send prepare_ok to primary.
-        self.send_prepare_ok(header).await;
+        self.send_prepare_ok(&namespace, header).await;
 
-        // If follower, commit any newly committable entries.
         if consensus.is_follower() {
-            self.commit_journal();
+            self.commit_journal(&namespace);
         }
     }
 
     async fn on_ack(&self, message: <VsrConsensus<B> as Consensus>::AckMessage) {
-        let consensus = self.consensus.as_ref().unwrap();
         let header = message.header();
+        let namespace = IggyNamespace::from_raw(header.namespace);
+        let consensus = self
+            .consensus(&namespace)
+            .expect("on_ack: no consensus group for namespace");
 
         if let Err(reason) = ack_preflight(consensus) {
             warn!("on_ack: ignoring ({reason})");
@@ -394,72 +428,68 @@ where
 
         {
             let pipeline = consensus.pipeline().borrow();
-            let Some(entry) =
-                pipeline.message_by_op_and_checksum(header.op, header.prepare_checksum)
-            else {
+            if pipeline
+                .message_by_op_and_checksum(header.op, header.prepare_checksum)
+                .is_none()
+            {
                 debug!("on_ack: prepare not in pipeline op={}", header.op);
                 return;
-            };
-
-            if entry.header.checksum != header.prepare_checksum {
-                warn!("on_ack: checksum mismatch");
-                return;
             }
         }
 
         if ack_quorum_reached(consensus, header) {
             debug!("on_ack: quorum received for op={}", header.op);
 
-            // Extract the prepare message from the pipeline by op
-            // TODO: Commit from the head. ALWAYS
-            let entry = consensus.pipeline().borrow_mut().extract_by_op(header.op);
-            let Some(PipelineEntry {
+            let drained = drain_committable_prefix(consensus);
+            if let (Some(first), Some(last)) = (drained.first(), drained.last()) {
+                debug!(
+                    "on_ack: draining committed prefix ops=[{}..={}] count={}",
+                    first.header.op,
+                    last.header.op,
+                    drained.len()
+                );
+            }
+
+            for PipelineEntry {
                 header: prepare_header,
                 ..
-            }) = entry
-            else {
-                warn!("on_ack: prepare not found in pipeline for op={}", 
header.op);
-                return;
-            };
-
-            // Data was already appended to the partition journal during
-            // on_replicate. Now that quorum is reached, update the partition's
-            // current offset and check whether the journal needs flushing.
-            let namespace = IggyNamespace::from_raw(prepare_header.namespace);
-
-            match prepare_header.operation {
-                Operation::SendMessages => {
-                    self.commit_messages(&namespace).await;
-                    debug!("on_ack: messages committed for op={}", 
prepare_header.op,);
-                }
-                Operation::StoreConsumerOffset => {
-                    // TODO: Commit consumer offset update.
-                    debug!(
-                        "on_ack: consumer offset committed for op={}",
-                        prepare_header.op
-                    );
+            } in drained
+            {
+                let entry_namespace = IggyNamespace::from_raw(prepare_header.namespace);
+
+                match prepare_header.operation {
+                    Operation::SendMessages => {
+                        self.commit_messages(&entry_namespace).await;
+                        debug!("on_ack: messages committed for op={}", 
prepare_header.op,);
+                    }
+                    Operation::StoreConsumerOffset => {
+                        // TODO: Commit consumer offset update.
+                        debug!(
+                            "on_ack: consumer offset committed for op={}",
+                            prepare_header.op
+                        );
+                    }
+                    _ => {
+                        warn!(
+                            "on_ack: unexpected operation {:?} for op={}",
+                            prepare_header.operation, prepare_header.op
+                        );
+                    }
                 }
-                _ => {
-                    warn!(
-                        "on_ack: unexpected operation {:?} for op={}",
-                        prepare_header.operation, prepare_header.op
-                    );
-                }
-            }
 
-            // Send reply to client
-            let generic_reply = build_reply_message(consensus, &prepare_header).into_generic();
-            debug!(
-                "on_ack: sending reply to client={} for op={}",
-                prepare_header.client, prepare_header.op
-            );
+                let generic_reply = build_reply_message(consensus, &prepare_header).into_generic();
+                debug!(
+                    "on_ack: sending reply to client={} for op={}",
+                    prepare_header.client, prepare_header.op
+                );
 
-            // TODO: Error handling
-            consensus
-                .message_bus()
-                .send_to_client(prepare_header.client, generic_reply)
-                .await
-                .unwrap()
+                // TODO: Error handling
+                consensus
+                    .message_bus()
+                    .send_to_client(prepare_header.client, generic_reply)
+                    .await
+                    .unwrap()
+            }
         }
     }
 }
@@ -488,10 +518,12 @@ where
 
     async fn apply_replicated_operation(
         &self,
-        message: &Message<PrepareHeader>,
         namespace: &IggyNamespace,
+        message: &Message<PrepareHeader>,
     ) {
-        let consensus = self.consensus.as_ref().unwrap();
+        let consensus = self
+            .consensus(namespace)
+            .expect("apply_replicated_operation: no consensus group");
         let header = message.header();
 
         match header.operation {
@@ -552,16 +584,16 @@ where
 
     /// Replicate a prepare message to the next replica in the chain.
     ///
-    /// Chain replication pattern:
-    /// - Primary sends to first backup
-    /// - Each backup forwards to the next
-    /// - Stops when we would forward back to primary
-    async fn replicate(&self, message: Message<PrepareHeader>) {
-        let consensus = self.consensus.as_ref().unwrap();
+    /// Chain replication: primary -> first backup -> ... -> last backup.
+    /// Stops when the next replica would be the primary.
+    async fn replicate(&self, namespace: &IggyNamespace, message: Message<PrepareHeader>) {
+        let consensus = self
+            .consensus(namespace)
+            .expect("replicate: no consensus group");
         replicate_to_next_in_chain(consensus, message).await;
     }
 
-    fn commit_journal(&self) {
+    fn commit_journal(&self, _namespace: &IggyNamespace) {
         // TODO: Implement commit logic for followers.
         // Walk through journal from last committed to current commit number
         // Apply each entry to the partition state
@@ -786,8 +818,10 @@ where
         debug!(?namespace, start_offset, "rotated to new segment");
     }
 
-    async fn send_prepare_ok(&self, header: &PrepareHeader) {
-        let consensus = self.consensus.as_ref().unwrap();
+    async fn send_prepare_ok(&self, namespace: &IggyNamespace, header: &PrepareHeader) {
+        let consensus = self
+            .consensus(namespace)
+            .expect("send_prepare_ok: no consensus group");
         // TODO: Verify the prepare is persisted in the partition journal.
         // The partition journal uses MessageLookup headers, so we cannot
         // check by PrepareHeader.op directly. For now, skip this check.
diff --git a/core/simulator/src/lib.rs b/core/simulator/src/lib.rs
index 44e063f78..4228abc13 100644
--- a/core/simulator/src/lib.rs
+++ b/core/simulator/src/lib.rs
@@ -34,10 +34,10 @@ pub struct Simulator {
 }
 
 impl Simulator {
-    /// Initialize a partition on all replicas (in-memory for simulation)
+    /// Initialize a partition with its own consensus group on all replicas.
     pub fn init_partition(&mut self, namespace: iggy_common::sharding::IggyNamespace) {
         for replica in &mut self.replicas {
-            replica.partitions.init_partition_in_memory(namespace);
+            replica.init_partition(namespace);
         }
     }
 
@@ -158,3 +158,10 @@ impl Simulator {
         }
     }
 }
+
+// TODO(IGGY-66): Add acceptance test for per-partition consensus independence.
+// Setup: 3-replica simulator, two partitions (ns_a, ns_b).
+// 1. Fill ns_a's pipeline to PIPELINE_PREPARE_QUEUE_MAX without delivering acks.
+// 2. Send a request to ns_b, step until ns_b reply arrives.
+// 3. Assert ns_b committed while ns_a pipeline is still full.
+// Requires namespace-aware stepping (filter bus by namespace) or two-phase delivery.
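
A rough skeleton of that acceptance test; every helper name below
(fill_pipeline_without_acks, send_request, step_until_reply, committed_ops,
pipeline_is_full, test_namespace) is a hypothetical placeholder for whatever
the simulator ends up exposing, not an existing API:

    #[test]
    fn partitions_commit_independently() {
        let mut sim = Simulator::new(3); // hypothetical 3-replica constructor
        let (ns_a, ns_b) = (test_namespace(1), test_namespace(2));
        sim.init_partition(ns_a);
        sim.init_partition(ns_b);

        // 1. Saturate ns_a's pipeline while withholding its acks.
        sim.fill_pipeline_without_acks(ns_a);

        // 2. Drive one request through ns_b until its reply arrives.
        sim.send_request(ns_b, b"payload");
        sim.step_until_reply(ns_b);

        // 3. ns_b made progress even though ns_a's group is still saturated.
        assert!(sim.committed_ops(ns_b) >= 1);
        assert!(sim.pipeline_is_full(ns_a));
    }
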
diff --git a/core/simulator/src/replica.rs b/core/simulator/src/replica.rs
index ae4631e95..52800f961 100644
--- a/core/simulator/src/replica.rs
+++ b/core/simulator/src/replica.rs
@@ -21,7 +21,7 @@ use crate::deps::{
 };
 use consensus::{LocalPipeline, VsrConsensus};
 use iggy_common::IggyByteSize;
-use iggy_common::sharding::ShardId;
+use iggy_common::sharding::{IggyNamespace, ShardId};
 use metadata::stm::consumer_group::{ConsumerGroups, ConsumerGroupsInner};
 use metadata::stm::stream::{Streams, StreamsInner};
 use metadata::stm::user::{Users, UsersInner};
@@ -29,9 +29,13 @@ use metadata::{IggyMetadata, variadic};
 use partitions::PartitionsConfig;
 use std::sync::Arc;
 
+// TODO: Make configurable
+const CLUSTER_ID: u128 = 1;
+
 pub struct Replica {
     pub id: u8,
     pub name: String,
+    pub replica_count: u8,
     pub metadata: SimMetadata,
     pub partitions: ReplicaPartitions,
     pub bus: Arc<MemBus>,
@@ -44,48 +48,30 @@ impl Replica {
         let consumer_groups: ConsumerGroups = ConsumerGroupsInner::new().into();
         let mux = SimMuxStateMachine::new(variadic!(users, streams, consumer_groups));
 
-        let cluster_id: u128 = 1; // TODO: Make configurable
+        // Metadata uses namespace=0 (not partition-scoped)
         let metadata_consensus = VsrConsensus::new(
-            cluster_id,
+            CLUSTER_ID,
             id,
             replica_count,
+            0,
             SharedMemBus(Arc::clone(&bus)),
             LocalPipeline::new(),
         );
         metadata_consensus.init();
 
-        // Create separate consensus instance for partitions
-        let partitions_consensus = VsrConsensus::new(
-            cluster_id,
-            id,
-            replica_count,
-            SharedMemBus(Arc::clone(&bus)),
-            LocalPipeline::new(),
-        );
-        partitions_consensus.init();
-
-        // Configure partitions
         let partitions_config = PartitionsConfig {
             messages_required_to_save: 1000,
             size_of_messages_required_to_save: IggyByteSize::from(4 * 1024 * 1024),
-            enforce_fsync: false, // Disable fsync for simulation
-            segment_size: IggyByteSize::from(1024 * 1024 * 1024), // 1GB segments
+            enforce_fsync: false,
+            segment_size: IggyByteSize::from(1024 * 1024 * 1024),
         };
 
-        // Only replica 0 gets consensus (primary shard for now)
-        let partitions = if id == 0 {
-            ReplicaPartitions::new(
-                ShardId::new(id as u16),
-                partitions_config,
-                Some(partitions_consensus),
-            )
-        } else {
-            ReplicaPartitions::new(ShardId::new(id as u16), partitions_config, None)
-        };
+        let partitions = ReplicaPartitions::new(ShardId::new(id as u16), partitions_config);
 
         Self {
             id,
             name,
+            replica_count,
             metadata: IggyMetadata {
                 consensus: Some(metadata_consensus),
                 journal: Some(SimJournal::<MemStorage>::default()),
@@ -96,4 +82,36 @@ impl Replica {
             bus,
         }
     }
+
+    pub fn init_partition(&mut self, namespace: IggyNamespace) {
+        if self.partitions.local_idx(&namespace).is_some() {
+            return;
+        }
+        let consensus = Self::create_partition_consensus(
+            self.id,
+            self.replica_count,
+            &self.bus,
+            namespace.inner(),
+        );
+        self.partitions
+            .init_partition_in_memory(namespace, Some(consensus));
+    }
+
+    fn create_partition_consensus(
+        id: u8,
+        replica_count: u8,
+        bus: &Arc<MemBus>,
+        namespace: u64,
+    ) -> VsrConsensus<SharedMemBus> {
+        let consensus = VsrConsensus::new(
+            CLUSTER_ID,
+            id,
+            replica_count,
+            namespace,
+            SharedMemBus(Arc::clone(bus)),
+            LocalPipeline::new(),
+        );
+        consensus.init();
+        consensus
+    }
 }

