This is an automated email from the ASF dual-hosted git repository. hgruszecki pushed a commit to branch nonblocking-consensus in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 7dec4e9e620eef6e87344f3ea157d4a5e1c7c20a Author: Hubert Gruszecki <[email protected]> AuthorDate: Wed Feb 18 11:02:31 2026 +0100 feat(consensus): implement per-partition consensus Shared pipeline across all partitions caps throughput at 8 in-flight ops. Each partition now gets its own VsrConsensus with independent pipeline and sequencing. IggyPartitions stores consensus_groups by namespace. Plane impl routes by namespace from message headers. View change headers (SVC/DVC/SV) gain namespace and align their frame layout with GenericHeader. --- core/common/src/types/consensus/header.rs | 24 ++-- core/consensus/src/impls.rs | 96 +++++++++---- core/consensus/src/lib.rs | 5 +- core/consensus/src/plane_helpers.rs | 133 ++++++++++++++++- core/metadata/src/impls/metadata.rs | 88 ++++++------ core/partitions/src/iggy_partitions.rs | 230 +++++++++++++++++------------- core/simulator/src/lib.rs | 11 +- core/simulator/src/replica.rs | 70 +++++---- 8 files changed, 443 insertions(+), 214 deletions(-) diff --git a/core/common/src/types/consensus/header.rs b/core/common/src/types/consensus/header.rs index c8d311fa7..066b9b681 100644 --- a/core/common/src/types/consensus/header.rs +++ b/core/common/src/types/consensus/header.rs @@ -135,8 +135,7 @@ pub struct GenericHeader { pub replica: u8, pub reserved_frame: [u8; 66], - pub namespace: u64, - pub reserved_command: [u8; 120], + pub reserved_command: [u8; 128], } unsafe impl Pod for GenericHeader {} @@ -489,18 +488,17 @@ impl Default for ReplyHeader { #[repr(C)] pub struct StartViewChangeHeader { pub checksum: u128, - pub checksum_padding: u128, pub checksum_body: u128, - pub checksum_body_padding: u128, pub cluster: u128, pub size: u32, pub view: u32, pub release: u32, pub command: Command2, pub replica: u8, - pub reserved_frame: [u8; 42], + pub reserved_frame: [u8; 66], - pub reserved: [u8; 128], + pub namespace: u64, + pub reserved: [u8; 120], } unsafe impl Pod for StartViewChangeHeader {} @@ -536,16 +534,14 @@ impl ConsensusHeader for StartViewChangeHeader { #[repr(C)] pub struct DoViewChangeHeader { pub checksum: u128, - pub checksum_padding: u128, pub checksum_body: u128, - pub checksum_body_padding: u128, pub cluster: u128, pub size: u32, pub view: u32, pub release: u32, pub command: Command2, pub replica: u8, - pub reserved_frame: [u8; 42], + pub reserved_frame: [u8; 66], /// The highest op-number in this replica's log. /// Used to select the most complete log when log_view values are equal. @@ -553,11 +549,12 @@ pub struct DoViewChangeHeader { /// The replica's commit number (highest committed op). /// The new primary sets its commit to max(commit) across all DVCs. pub commit: u64, + pub namespace: u64, /// The view number when this replica's status was last normal. /// This is the key field for log selection: the replica with the /// highest log_view has the most authoritative log. pub log_view: u32, - pub reserved: [u8; 108], + pub reserved: [u8; 100], } unsafe impl Pod for DoViewChangeHeader {} @@ -609,16 +606,14 @@ impl ConsensusHeader for DoViewChangeHeader { #[repr(C)] pub struct StartViewHeader { pub checksum: u128, - pub checksum_padding: u128, pub checksum_body: u128, - pub checksum_body_padding: u128, pub cluster: u128, pub size: u32, pub view: u32, pub release: u32, pub command: Command2, pub replica: u8, - pub reserved_frame: [u8; 42], + pub reserved_frame: [u8; 66], /// The op-number of the highest entry in the new primary's log. /// Backups set their op to this value. 
@@ -627,7 +622,8 @@ pub struct StartViewHeader { /// This is max(commit) from all DVCs received by the primary. /// Backups set their commit to this value. pub commit: u64, - pub reserved: [u8; 112], + pub namespace: u64, + pub reserved: [u8; 104], } unsafe impl Pod for StartViewHeader {} diff --git a/core/consensus/src/impls.rs b/core/consensus/src/impls.rs index 4b1e620c8..4bb5e7d02 100644 --- a/core/consensus/src/impls.rs +++ b/core/consensus/src/impls.rs @@ -317,20 +317,6 @@ impl LocalPipeline { pub fn clear(&mut self) { self.prepare_queue.clear(); } - - /// Extract and remove a message by op number. - /// Returns None if op is not in the pipeline. - pub fn extract_by_op(&mut self, op: u64) -> Option<PipelineEntry> { - let head_op = self.prepare_queue.front()?.header.op; - if op < head_op { - return None; - } - let index = (op - head_op) as usize; - if index >= self.prepare_queue.len() { - return None; - } - self.prepare_queue.remove(index) - } } impl Pipeline for LocalPipeline { @@ -345,10 +331,6 @@ impl Pipeline for LocalPipeline { LocalPipeline::pop_message(self) } - fn extract_by_op(&mut self, op: u64) -> Option<Self::Entry> { - LocalPipeline::extract_by_op(self, op) - } - fn clear(&mut self) { LocalPipeline::clear(self) } @@ -365,6 +347,10 @@ impl Pipeline for LocalPipeline { LocalPipeline::message_by_op_and_checksum(self, op, checksum) } + fn head(&self) -> Option<&Self::Entry> { + LocalPipeline::head(self) + } + fn is_full(&self) -> bool { LocalPipeline::is_full(self) } @@ -389,7 +375,7 @@ pub enum Status { #[derive(Debug, Clone, PartialEq, Eq)] pub enum VsrAction { /// Send StartViewChange to all replicas. - SendStartViewChange { view: u32 }, + SendStartViewChange { view: u32, namespace: u64 }, /// Send DoViewChange to primary. SendDoViewChange { view: u32, @@ -397,11 +383,22 @@ pub enum VsrAction { log_view: u32, op: u64, commit: u64, + namespace: u64, }, /// Send StartView to all backups (as new primary). - SendStartView { view: u32, op: u64, commit: u64 }, + SendStartView { + view: u32, + op: u64, + commit: u64, + namespace: u64, + }, /// Send PrepareOK to primary. - SendPrepareOk { view: u32, op: u64, target: u8 }, + SendPrepareOk { + view: u32, + op: u64, + target: u8, + namespace: u64, + }, } #[allow(unused)] @@ -414,6 +411,7 @@ where cluster: u128, replica: u8, replica_count: u8, + namespace: u64, view: Cell<u32>, @@ -455,16 +453,28 @@ where } impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> { - pub fn new(cluster: u128, replica: u8, replica_count: u8, message_bus: B, pipeline: P) -> Self { + pub fn new( + cluster: u128, + replica: u8, + replica_count: u8, + namespace: u64, + message_bus: B, + pipeline: P, + ) -> Self { assert!( replica < replica_count, "replica index must be < replica_count" ); assert!(replica_count >= 1, "need at least 1 replica"); + // TODO: Verify that XOR-based seeding provides sufficient jitter diversity + // across groups. Consider using a proper hash (e.g., Murmur3) of + // (replica_id, namespace) for production. 
+ let timeout_seed = replica as u128 ^ namespace as u128; Self { cluster, replica, replica_count, + namespace, view: Cell::new(0), log_view: Cell::new(0), status: Cell::new(Status::Recovering), @@ -479,7 +489,7 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> { do_view_change_quorum: Cell::new(false), sent_own_start_view_change: Cell::new(false), sent_own_do_view_change: Cell::new(false), - timeouts: RefCell::new(TimeoutManager::new(replica as u128)), + timeouts: RefCell::new(TimeoutManager::new(timeout_seed)), } } @@ -563,6 +573,10 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> { self.replica_count } + pub fn namespace(&self) -> u64 { + self.namespace + } + pub fn log_view(&self) -> u32 { self.log_view.get() } @@ -678,7 +692,10 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> { timeouts.start(TimeoutKind::ViewChangeStatus); } - vec![VsrAction::SendStartViewChange { view: new_view }] + vec![VsrAction::SendStartViewChange { + view: new_view, + namespace: self.namespace, + }] } /// Resend SVC message if we've started view change. @@ -693,6 +710,7 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> { vec![VsrAction::SendStartViewChange { view: self.view.get(), + namespace: self.namespace, }] } @@ -725,6 +743,7 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> { log_view: self.log_view.get(), op: current_op, commit: current_commit, + namespace: self.namespace, }] } @@ -748,7 +767,10 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> { .borrow_mut() .reset(TimeoutKind::ViewChangeStatus); - vec![VsrAction::SendStartViewChange { view: next_view }] + vec![VsrAction::SendStartViewChange { + view: next_view, + namespace: self.namespace, + }] } /// Handle a received StartViewChange message. @@ -757,6 +779,10 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> { /// from f OTHER replicas, it sends a DOVIEWCHANGE message to the node /// that will be the primary in the new view." pub fn handle_start_view_change(&self, header: &StartViewChangeHeader) -> Vec<VsrAction> { + debug_assert_eq!( + header.namespace, self.namespace, + "SVC routed to wrong group" + ); let from_replica = header.replica; let msg_view = header.view; @@ -786,7 +812,10 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> { } // Send our own SVC - actions.push(VsrAction::SendStartViewChange { view: msg_view }); + actions.push(VsrAction::SendStartViewChange { + view: msg_view, + namespace: self.namespace, + }); } // Record the SVC from sender @@ -816,6 +845,7 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> { log_view: self.log_view.get(), op: current_op, commit: current_commit, + namespace: self.namespace, }); // If we are the primary candidate, record our own DVC @@ -848,6 +878,10 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> { /// replicas (including itself), it sets its view-number to that in the messages /// and selects as the new log the one contained in the message with the largest v'..." 
pub fn handle_do_view_change(&self, header: &DoViewChangeHeader) -> Vec<VsrAction> { + debug_assert_eq!( + header.namespace, self.namespace, + "DVC routed to wrong group" + ); let from_replica = header.replica; let msg_view = header.view; let msg_log_view = header.log_view; @@ -880,7 +914,10 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> { } // Send our own SVC - actions.push(VsrAction::SendStartViewChange { view: msg_view }); + actions.push(VsrAction::SendStartViewChange { + view: msg_view, + namespace: self.namespace, + }); } // Only the primary candidate processes DVCs for quorum @@ -939,6 +976,7 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> { /// in the log, set their view-number to the view number in the message, change /// their status to normal, and send PrepareOK for any uncommitted ops." pub fn handle_start_view(&self, header: &StartViewHeader) -> Vec<VsrAction> { + debug_assert_eq!(header.namespace, self.namespace, "SV routed to wrong group"); let from_replica = header.replica; let msg_view = header.view; let msg_op = header.op; @@ -984,7 +1022,8 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> { actions.push(VsrAction::SendPrepareOk { view: msg_view, op: op_num, - target: from_replica, // Send to new primary + target: from_replica, + namespace: self.namespace, }); } @@ -1020,6 +1059,7 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> { view: self.view.get(), op: new_op, commit: max_commit, + namespace: self.namespace, }] } diff --git a/core/consensus/src/lib.rs b/core/consensus/src/lib.rs index 17f2fb9ea..9ddf4994d 100644 --- a/core/consensus/src/lib.rs +++ b/core/consensus/src/lib.rs @@ -29,9 +29,6 @@ pub trait Pipeline { fn pop_message(&mut self) -> Option<Self::Entry>; - /// Extract and remove a message by op number. - fn extract_by_op(&mut self, op: u64) -> Option<Self::Entry>; - fn clear(&mut self); fn message_by_op(&self, op: u64) -> Option<&Self::Entry>; @@ -40,6 +37,8 @@ pub trait Pipeline { fn message_by_op_and_checksum(&self, op: u64, checksum: u128) -> Option<&Self::Entry>; + fn head(&self) -> Option<&Self::Entry>; + fn is_full(&self) -> bool; fn is_empty(&self) -> bool; diff --git a/core/consensus/src/plane_helpers.rs b/core/consensus/src/plane_helpers.rs index 60ea1dedf..6906eb9f5 100644 --- a/core/consensus/src/plane_helpers.rs +++ b/core/consensus/src/plane_helpers.rs @@ -135,7 +135,7 @@ where Ok(()) } -/// Shared quorum + extraction flow for ack handling. +/// Shared quorum tracking flow for ack handling. pub fn ack_quorum_reached<B, P>(consensus: &VsrConsensus<B, P>, ack: &PrepareOkHeader) -> bool where B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>, @@ -149,6 +149,33 @@ where true } +/// Drain and return committable prepares from the pipeline head. +/// +/// Entries are drained only from the head and only while their op is covered +/// by the current commit frontier. 
+pub fn drain_committable_prefix<B, P>(consensus: &VsrConsensus<B, P>) -> Vec<PipelineEntry> +where + B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>, + P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>, +{ + let commit = consensus.commit(); + let mut drained = Vec::new(); + let mut pipeline = consensus.pipeline().borrow_mut(); + + while let Some(head_op) = pipeline.head().map(|entry| entry.header.op) { + if head_op > commit { + break; + } + + let entry = pipeline + .pop_message() + .expect("drain_committable_prefix: head exists"); + drained.push(entry); + } + + drained +} + /// Shared reply-message construction for committed prepare. pub fn build_reply_message<B, P>( consensus: &VsrConsensus<B, P>, @@ -269,3 +296,107 @@ pub async fn send_prepare_ok<B, P>( .unwrap(); } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::{Consensus, LocalPipeline}; + use iggy_common::IggyError; + + #[derive(Debug, Default)] + struct NoopBus; + + impl MessageBus for NoopBus { + type Client = u128; + type Replica = u8; + type Data = Message<GenericHeader>; + type Sender = (); + + fn add_client(&mut self, _client: Self::Client, _sender: Self::Sender) -> bool { + true + } + + fn remove_client(&mut self, _client: Self::Client) -> bool { + true + } + + fn add_replica(&mut self, _replica: Self::Replica) -> bool { + true + } + + fn remove_replica(&mut self, _replica: Self::Replica) -> bool { + true + } + + async fn send_to_client( + &self, + _client_id: Self::Client, + _data: Self::Data, + ) -> Result<(), IggyError> { + Ok(()) + } + + async fn send_to_replica( + &self, + _replica: Self::Replica, + _data: Self::Data, + ) -> Result<(), IggyError> { + Ok(()) + } + } + + fn prepare_message(op: u64, parent: u128, checksum: u128) -> Message<PrepareHeader> { + Message::<PrepareHeader>::new(std::mem::size_of::<PrepareHeader>()).transmute_header( + |_, new| { + *new = PrepareHeader { + command: Command2::Prepare, + op, + parent, + checksum, + ..Default::default() + }; + }, + ) + } + + #[test] + fn drains_head_prefix_by_commit_frontier() { + let consensus = VsrConsensus::new(1, 0, 3, 0, NoopBus, LocalPipeline::new()); + consensus.init(); + + consensus.pipeline_message(prepare_message(1, 0, 10)); + consensus.pipeline_message(prepare_message(2, 10, 20)); + consensus.pipeline_message(prepare_message(3, 20, 30)); + + consensus.advance_commit_number(3); + + let drained = drain_committable_prefix(&consensus); + let drained_ops: Vec<_> = drained.into_iter().map(|entry| entry.header.op).collect(); + assert_eq!(drained_ops, vec![1, 2, 3]); + assert!(consensus.pipeline().borrow().is_empty()); + } + + #[test] + fn drains_only_up_to_commit_frontier_even_without_quorum_flags() { + let consensus = VsrConsensus::new(1, 0, 3, 0, NoopBus, LocalPipeline::new()); + consensus.init(); + + consensus.pipeline_message(prepare_message(5, 0, 50)); + consensus.pipeline_message(prepare_message(6, 50, 60)); + consensus.pipeline_message(prepare_message(7, 60, 70)); + + consensus.advance_commit_number(6); + let drained = drain_committable_prefix(&consensus); + let drained_ops: Vec<_> = drained.into_iter().map(|entry| entry.header.op).collect(); + + assert_eq!(drained_ops, vec![5, 6]); + assert_eq!( + consensus + .pipeline() + .borrow() + .head() + .map(|entry| entry.header.op), + Some(7) + ); + } +} diff --git a/core/metadata/src/impls/metadata.rs b/core/metadata/src/impls/metadata.rs index 9a81461af..c9e5fed54 100644 --- a/core/metadata/src/impls/metadata.rs +++ b/core/metadata/src/impls/metadata.rs @@ 
-18,7 +18,7 @@ use crate::stm::StateMachine; use crate::stm::snapshot::{FillSnapshot, MetadataSnapshot, Snapshot, SnapshotError}; use consensus::{ Consensus, Pipeline, PipelineEntry, Plane, Project, Sequencer, VsrConsensus, ack_preflight, - ack_quorum_reached, build_reply_message, fence_old_prepare_by_commit, + ack_quorum_reached, build_reply_message, drain_committable_prefix, fence_old_prepare_by_commit, panic_if_hash_chain_would_break_in_same_view, pipeline_prepare_common, replicate_preflight, replicate_to_next_in_chain, send_prepare_ok as send_prepare_ok_common, }; @@ -195,47 +195,51 @@ where debug!("on_ack: quorum received for op={}", header.op); - // Extract the header from the pipeline, fetch the full message from journal - // TODO: Commit from the head. ALWAYS - let entry = consensus.pipeline().borrow_mut().extract_by_op(header.op); - let Some(entry) = entry else { - warn!("on_ack: prepare not found in pipeline for op={}", header.op); - return; - }; - - let prepare_header = entry.header; - // TODO(hubcio): should we replace this with graceful fallback (warn + return)? - // When journal compaction is implemented compaction could race - // with this lookup if it removes entries below the commit number. - let prepare = journal - .handle() - .entry(&prepare_header) - .await - .unwrap_or_else(|| { - panic!( - "on_ack: committed prepare op={} checksum={} must be in journal", - prepare_header.op, prepare_header.checksum - ) - }); - - // Apply the state (consumes prepare) - // TODO: Handle appending result to response - let _result = self.mux_stm.update(prepare); - debug!("on_ack: state applied for op={}", prepare_header.op); - - // Send reply to client - let generic_reply = build_reply_message(consensus, &prepare_header).into_generic(); - debug!( - "on_ack: sending reply to client={} for op={}", - prepare_header.client, prepare_header.op - ); - - // TODO: Error handling - consensus - .message_bus() - .send_to_client(prepare_header.client, generic_reply) - .await - .unwrap() + let drained = drain_committable_prefix(consensus); + if let (Some(first), Some(last)) = (drained.first(), drained.last()) { + debug!( + "on_ack: draining committed prefix ops=[{}..={}] count={}", + first.header.op, + last.header.op, + drained.len() + ); + } + + for entry in drained { + let prepare_header = entry.header; + // TODO(hubcio): should we replace this with graceful fallback (warn + return)? + // When journal compaction is implemented compaction could race + // with this lookup if it removes entries below the commit number. 
+ let prepare = journal + .handle() + .entry(&prepare_header) + .await + .unwrap_or_else(|| { + panic!( + "on_ack: committed prepare op={} checksum={} must be in journal", + prepare_header.op, prepare_header.checksum + ) + }); + + // Apply the state (consumes prepare) + // TODO: Handle appending result to response + let _result = self.mux_stm.update(prepare); + debug!("on_ack: state applied for op={}", prepare_header.op); + + // Send reply to client + let generic_reply = build_reply_message(consensus, &prepare_header).into_generic(); + debug!( + "on_ack: sending reply to client={} for op={}", + prepare_header.client, prepare_header.op + ); + + // TODO: Error handling + consensus + .message_bus() + .send_to_client(prepare_header.client, generic_reply) + .await + .unwrap() + } } } } diff --git a/core/partitions/src/iggy_partitions.rs b/core/partitions/src/iggy_partitions.rs index 54891dfdd..03321a126 100644 --- a/core/partitions/src/iggy_partitions.rs +++ b/core/partitions/src/iggy_partitions.rs @@ -22,8 +22,9 @@ use crate::Partition; use crate::types::PartitionsConfig; use consensus::{ Consensus, PipelineEntry, Plane, Project, Sequencer, VsrConsensus, ack_preflight, - ack_quorum_reached, build_reply_message, fence_old_prepare_by_commit, pipeline_prepare_common, - replicate_preflight, replicate_to_next_in_chain, send_prepare_ok as send_prepare_ok_common, + ack_quorum_reached, build_reply_message, drain_committable_prefix, fence_old_prepare_by_commit, + pipeline_prepare_common, replicate_preflight, replicate_to_next_in_chain, + send_prepare_ok as send_prepare_ok_common, }; use iggy_common::{ INDEX_SIZE, IggyByteSize, IggyIndexesMut, IggyMessagesBatchMut, PartitionStats, PooledBuffer, @@ -58,33 +59,29 @@ pub struct IggyPartitions<C> { /// mutate partition state (segments, offsets, journal). partitions: UnsafeCell<Vec<IggyPartition>>, namespace_to_local: HashMap<IggyNamespace, LocalIdx>, - /// Some on shard0, None on other shards - pub consensus: Option<C>, + // TODO: Consider heartbeat/commit coalescing -- one batch message per node pair + // per tick instead of per-group. See CockroachDB heartbeat coalescing. + consensus_groups: HashMap<IggyNamespace, C>, } impl<C> IggyPartitions<C> { - pub fn new(shard_id: ShardId, config: PartitionsConfig, consensus: Option<C>) -> Self { + pub fn new(shard_id: ShardId, config: PartitionsConfig) -> Self { Self { shard_id, config, partitions: UnsafeCell::new(Vec::new()), namespace_to_local: HashMap::new(), - consensus, + consensus_groups: HashMap::new(), } } - pub fn with_capacity( - shard_id: ShardId, - config: PartitionsConfig, - consensus: Option<C>, - capacity: usize, - ) -> Self { + pub fn with_capacity(shard_id: ShardId, config: PartitionsConfig, capacity: usize) -> Self { Self { shard_id, config, partitions: UnsafeCell::new(Vec::with_capacity(capacity)), namespace_to_local: HashMap::with_capacity(capacity), - consensus, + consensus_groups: HashMap::with_capacity(capacity), } } @@ -160,6 +157,7 @@ impl<C> IggyPartitions<C> { /// Remove a partition by namespace. Returns the removed partition if found. 
pub fn remove(&mut self, namespace: &IggyNamespace) -> Option<IggyPartition> { let local_idx = self.namespace_to_local.remove(namespace)?; + self.consensus_groups.remove(namespace); let idx = *local_idx; let partitions = self.partitions.get_mut(); @@ -231,17 +229,29 @@ impl<C> IggyPartitions<C> { &mut self.partitions_mut()[idx] } + pub fn consensus(&self, ns: &IggyNamespace) -> Option<&C> { + self.consensus_groups.get(ns) + } + /// Initialize a new partition with in-memory storage (for testing/simulation). /// - /// This is a simplified version that doesn't create file-backed storage. - /// Use `init_partition()` for production use with real files. + /// On first call, registers both partition and consensus group atomically + /// so Plane dispatch cannot observe a partition without its consensus state. + /// Subsequent calls for the same namespace are no-ops returning the existing index. /// /// TODO: Make the log generic over its storage backend to support both /// in-memory (for testing) and file-backed (for production) storage without /// needing separate initialization methods. - pub fn init_partition_in_memory(&mut self, namespace: IggyNamespace) -> LocalIdx { - // Check if already initialized + pub fn init_partition_in_memory( + &mut self, + namespace: IggyNamespace, + consensus: Option<C>, + ) -> LocalIdx { if let Some(idx) = self.local_idx(&namespace) { + debug_assert!( + self.consensus_groups.contains_key(&namespace) || consensus.is_none(), + "re-init with consensus for namespace that has no consensus group" + ); return idx; } @@ -262,7 +272,14 @@ impl<C> IggyPartitions<C> { partition.stats.increment_segments_count(1); // Insert and return local index - self.insert(namespace, partition) + let idx = self.insert(namespace, partition); + // TODO: Shared WAL -- per-partition fdatasync is untenable at scale. Batch + // journal writes across all groups into a single WAL with one fdatasync per + // tick. See TiKV raft-engine crate. + if let Some(c) = consensus { + self.consensus_groups.insert(namespace, c); + } + idx } /// Initialize a new partition with file-backed storage. @@ -275,10 +292,18 @@ impl<C> IggyPartitions<C> { /// 2. Control plane: broadcast to shards (SKIPPED in this method) /// 3. Data plane: INITIATE PARTITION (THIS METHOD) /// - /// Idempotent - returns existing LocalIdx if partition already exists. - pub async fn init_partition(&mut self, namespace: IggyNamespace) -> LocalIdx { - // Check if already initialized + /// Idempotent: first call registers partition + consensus atomically, + /// subsequent calls for the same namespace are no-ops. 
+ pub async fn init_partition( + &mut self, + namespace: IggyNamespace, + consensus: Option<C>, + ) -> LocalIdx { if let Some(idx) = self.local_idx(&namespace) { + debug_assert!( + self.consensus_groups.contains_key(&namespace) || consensus.is_none(), + "re-init with consensus for namespace that has no consensus group" + ); return idx; } @@ -325,7 +350,11 @@ impl<C> IggyPartitions<C> { partition.stats.increment_segments_count(1); // Insert and return local index - self.insert(namespace, partition) + let idx = self.insert(namespace, partition); + if let Some(c) = consensus { + self.consensus_groups.insert(namespace, c); + } + idx } } @@ -334,17 +363,22 @@ where B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>, { async fn on_request(&self, message: <VsrConsensus<B> as Consensus>::RequestMessage) { - let consensus = self.consensus.as_ref().unwrap(); + let namespace = IggyNamespace::from_raw(message.header().namespace); + let consensus = self + .consensus(&namespace) + .expect("on_request: no consensus group for namespace"); - debug!("handling partition request"); + debug!(?namespace, "handling partition request"); let prepare = message.project(consensus); pipeline_prepare_common(consensus, prepare, |prepare| self.on_replicate(prepare)).await; } async fn on_replicate(&self, message: <VsrConsensus<B> as Consensus>::ReplicateMessage) { - let consensus = self.consensus.as_ref().unwrap(); let header = message.header(); + let namespace = IggyNamespace::from_raw(header.namespace); + let consensus = self + .consensus(&namespace) + .expect("on_replicate: no consensus group for namespace"); let current_op = match replicate_preflight(consensus, header) { Ok(current_op) => current_op, @@ -359,7 +393,7 @@ where let is_old_prepare = fence_old_prepare_by_commit(consensus, header); if !is_old_prepare { - self.replicate(message.clone()).await; + self.replicate(&namespace, message.clone()).await; } else { warn!("received old prepare, not replicating"); } @@ -371,21 +405,21 @@ where // TODO: Figure out the flow of the partition operations. // In metadata layer we assume that when an `on_request` or `on_replicate` is called, it's called from correct shard. // I think we need to do the same here, which means that the code from below is unfallable, the partition should always exist by now! - let namespace = IggyNamespace::from_raw(header.namespace); - self.apply_replicated_operation(&message, &namespace).await; + self.apply_replicated_operation(&namespace, &message).await; - // After successful journal write, send prepare_ok to primary. - self.send_prepare_ok(header).await; + self.send_prepare_ok(&namespace, header).await; - // If follower, commit any newly committable entries.
if consensus.is_follower() { - self.commit_journal(); + self.commit_journal(&namespace); } } async fn on_ack(&self, message: <VsrConsensus<B> as Consensus>::AckMessage) { - let consensus = self.consensus.as_ref().unwrap(); let header = message.header(); + let namespace = IggyNamespace::from_raw(header.namespace); + let consensus = self + .consensus(&namespace) + .expect("on_ack: no consensus group for namespace"); if let Err(reason) = ack_preflight(consensus) { warn!("on_ack: ignoring ({reason})"); @@ -394,72 +428,68 @@ where { let pipeline = consensus.pipeline().borrow(); - let Some(entry) = - pipeline.message_by_op_and_checksum(header.op, header.prepare_checksum) - else { + if pipeline + .message_by_op_and_checksum(header.op, header.prepare_checksum) + .is_none() + { debug!("on_ack: prepare not in pipeline op={}", header.op); return; - }; - - if entry.header.checksum != header.prepare_checksum { - warn!("on_ack: checksum mismatch"); - return; } } if ack_quorum_reached(consensus, header) { debug!("on_ack: quorum received for op={}", header.op); - // Extract the prepare message from the pipeline by op - // TODO: Commit from the head. ALWAYS - let entry = consensus.pipeline().borrow_mut().extract_by_op(header.op); - let Some(PipelineEntry { + let drained = drain_committable_prefix(consensus); + if let (Some(first), Some(last)) = (drained.first(), drained.last()) { + debug!( + "on_ack: draining committed prefix ops=[{}..={}] count={}", + first.header.op, + last.header.op, + drained.len() + ); + } + + for PipelineEntry { header: prepare_header, .. - }) = entry - else { - warn!("on_ack: prepare not found in pipeline for op={}", header.op); - return; - }; - - // Data was already appended to the partition journal during - // on_replicate. Now that quorum is reached, update the partition's - // current offset and check whether the journal needs flushing. - let namespace = IggyNamespace::from_raw(prepare_header.namespace); - - match prepare_header.operation { - Operation::SendMessages => { - self.commit_messages(&namespace).await; - debug!("on_ack: messages committed for op={}", prepare_header.op,); - } - Operation::StoreConsumerOffset => { - // TODO: Commit consumer offset update. - debug!( - "on_ack: consumer offset committed for op={}", - prepare_header.op - ); + } in drained + { + let entry_namespace = IggyNamespace::from_raw(prepare_header.namespace); + + match prepare_header.operation { + Operation::SendMessages => { + self.commit_messages(&entry_namespace).await; + debug!("on_ack: messages committed for op={}", prepare_header.op,); + } + Operation::StoreConsumerOffset => { + // TODO: Commit consumer offset update. 
+ debug!( + "on_ack: consumer offset committed for op={}", + prepare_header.op + ); + } + _ => { + warn!( + "on_ack: unexpected operation {:?} for op={}", + prepare_header.operation, prepare_header.op + ); + } } - _ => { - warn!( - "on_ack: unexpected operation {:?} for op={}", - prepare_header.operation, prepare_header.op - ); - } - } - // Send reply to client - let generic_reply = build_reply_message(consensus, &prepare_header).into_generic(); - debug!( - "on_ack: sending reply to client={} for op={}", - prepare_header.client, prepare_header.op - ); + let generic_reply = build_reply_message(consensus, &prepare_header).into_generic(); + debug!( + "on_ack: sending reply to client={} for op={}", + prepare_header.client, prepare_header.op + ); - // TODO: Error handling - consensus - .message_bus() - .send_to_client(prepare_header.client, generic_reply) - .await - .unwrap() + // TODO: Error handling + consensus + .message_bus() + .send_to_client(prepare_header.client, generic_reply) + .await + .unwrap() + } } } } @@ -488,10 +518,12 @@ where async fn apply_replicated_operation( &self, - message: &Message<PrepareHeader>, namespace: &IggyNamespace, + message: &Message<PrepareHeader>, ) { - let consensus = self.consensus.as_ref().unwrap(); + let consensus = self + .consensus(namespace) + .expect("apply_replicated_operation: no consensus group"); let header = message.header(); match header.operation { @@ -552,16 +584,16 @@ where /// Replicate a prepare message to the next replica in the chain. /// - /// Chain replication pattern: - /// - Primary sends to first backup - /// - Each backup forwards to the next - /// - Stops when we would forward back to primary - async fn replicate(&self, message: Message<PrepareHeader>) { - let consensus = self.consensus.as_ref().unwrap(); + /// Chain replication: primary -> first backup -> ... -> last backup. + /// Stops when the next replica would be the primary. + async fn replicate(&self, namespace: &IggyNamespace, message: Message<PrepareHeader>) { + let consensus = self + .consensus(namespace) + .expect("replicate: no consensus group"); replicate_to_next_in_chain(consensus, message).await; } - fn commit_journal(&self) { + fn commit_journal(&self, _namespace: &IggyNamespace) { // TODO: Implement commit logic for followers. // Walk through journal from last committed to current commit number // Apply each entry to the partition state @@ -786,8 +818,10 @@ where debug!(?namespace, start_offset, "rotated to new segment"); } - async fn send_prepare_ok(&self, header: &PrepareHeader) { - let consensus = self.consensus.as_ref().unwrap(); + async fn send_prepare_ok(&self, namespace: &IggyNamespace, header: &PrepareHeader) { + let consensus = self + .consensus(namespace) + .expect("send_prepare_ok: no consensus group"); // TODO: Verify the prepare is persisted in the partition journal. // The partition journal uses MessageLookup headers, so we cannot // check by PrepareHeader.op directly. For now, skip this check. diff --git a/core/simulator/src/lib.rs b/core/simulator/src/lib.rs index 44e063f78..4228abc13 100644 --- a/core/simulator/src/lib.rs +++ b/core/simulator/src/lib.rs @@ -34,10 +34,10 @@ pub struct Simulator { } impl Simulator { - /// Initialize a partition on all replicas (in-memory for simulation) + /// Initialize a partition with its own consensus group on all replicas. 
pub fn init_partition(&mut self, namespace: iggy_common::sharding::IggyNamespace) { for replica in &mut self.replicas { - replica.partitions.init_partition_in_memory(namespace); + replica.init_partition(namespace); } } @@ -158,3 +158,10 @@ impl Simulator { } } } + +// TODO(IGGY-66): Add acceptance test for per-partition consensus independence. +// Setup: 3-replica simulator, two partitions (ns_a, ns_b). +// 1. Fill ns_a's pipeline to PIPELINE_PREPARE_QUEUE_MAX without delivering acks. +// 2. Send a request to ns_b, step until ns_b reply arrives. +// 3. Assert ns_b committed while ns_a pipeline is still full. +// Requires namespace-aware stepping (filter bus by namespace) or two-phase delivery. diff --git a/core/simulator/src/replica.rs b/core/simulator/src/replica.rs index ae4631e95..52800f961 100644 --- a/core/simulator/src/replica.rs +++ b/core/simulator/src/replica.rs @@ -21,7 +21,7 @@ use crate::deps::{ }; use consensus::{LocalPipeline, VsrConsensus}; use iggy_common::IggyByteSize; -use iggy_common::sharding::ShardId; +use iggy_common::sharding::{IggyNamespace, ShardId}; use metadata::stm::consumer_group::{ConsumerGroups, ConsumerGroupsInner}; use metadata::stm::stream::{Streams, StreamsInner}; use metadata::stm::user::{Users, UsersInner}; @@ -29,9 +29,13 @@ use metadata::{IggyMetadata, variadic}; use partitions::PartitionsConfig; use std::sync::Arc; +// TODO: Make configurable +const CLUSTER_ID: u128 = 1; + pub struct Replica { pub id: u8, pub name: String, + pub replica_count: u8, pub metadata: SimMetadata, pub partitions: ReplicaPartitions, pub bus: Arc<MemBus>, @@ -44,48 +48,30 @@ impl Replica { let consumer_groups: ConsumerGroups = ConsumerGroupsInner::new().into(); let mux = SimMuxStateMachine::new(variadic!(users, streams, consumer_groups)); - let cluster_id: u128 = 1; // TODO: Make configurable + // Metadata uses namespace=0 (not partition-scoped) let metadata_consensus = VsrConsensus::new( - cluster_id, + CLUSTER_ID, id, replica_count, + 0, SharedMemBus(Arc::clone(&bus)), LocalPipeline::new(), ); metadata_consensus.init(); - // Create separate consensus instance for partitions - let partitions_consensus = VsrConsensus::new( - cluster_id, - id, - replica_count, - SharedMemBus(Arc::clone(&bus)), - LocalPipeline::new(), - ); - partitions_consensus.init(); - - // Configure partitions let partitions_config = PartitionsConfig { messages_required_to_save: 1000, size_of_messages_required_to_save: IggyByteSize::from(4 * 1024 * 1024), - enforce_fsync: false, // Disable fsync for simulation - segment_size: IggyByteSize::from(1024 * 1024 * 1024), // 1GB segments + enforce_fsync: false, + segment_size: IggyByteSize::from(1024 * 1024 * 1024), }; - // Only replica 0 gets consensus (primary shard for now) - let partitions = if id == 0 { - ReplicaPartitions::new( - ShardId::new(id as u16), - partitions_config, - Some(partitions_consensus), - ) - } else { - ReplicaPartitions::new(ShardId::new(id as u16), partitions_config, None) - }; + let partitions = ReplicaPartitions::new(ShardId::new(id as u16), partitions_config); Self { id, name, + replica_count, metadata: IggyMetadata { consensus: Some(metadata_consensus), journal: Some(SimJournal::<MemStorage>::default()), @@ -96,4 +82,36 @@ impl Replica { bus, } } + + pub fn init_partition(&mut self, namespace: IggyNamespace) { + if self.partitions.local_idx(&namespace).is_some() { + return; + } + let consensus = Self::create_partition_consensus( + self.id, + self.replica_count, + &self.bus, + namespace.inner(), + ); + self.partitions + 
.init_partition_in_memory(namespace, Some(consensus)); + } + + fn create_partition_consensus( + id: u8, + replica_count: u8, + bus: &Arc<MemBus>, + namespace: u64, + ) -> VsrConsensus<SharedMemBus> { + let consensus = VsrConsensus::new( + CLUSTER_ID, + id, + replica_count, + namespace, + SharedMemBus(Arc::clone(bus)), + LocalPipeline::new(), + ); + consensus.init(); + consensus + } }
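
A condensed usage sketch of the new wiring, assuming the simulator's Replica type and the accessors added in this diff (Replica::init_partition, IggyPartitions::consensus); the free function below is illustrative only and not part of the commit:

use iggy_common::sharding::IggyNamespace;

// Illustrative: each namespace gets its own VsrConsensus (timeout-seeded with
// replica_id ^ namespace) and its own pipeline, registered inside
// IggyPartitions on first init.
fn init_two_independent_partitions(replica: &mut Replica) {
    let ns_a = IggyNamespace::from_raw(1);
    let ns_b = IggyNamespace::from_raw(2);

    replica.init_partition(ns_a);
    replica.init_partition(ns_b);

    // Separate consensus groups: a full pipeline on ns_a no longer caps ns_b.
    assert!(replica.partitions.consensus(&ns_a).is_some());
    assert!(replica.partitions.consensus(&ns_b).is_some());
}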
