This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch nonblocking-consensus
in repository https://gitbox.apache.org/repos/asf/iggy.git
commit ed55789edee7b9c33af3aeaa3105120d8c1738fb
Author:     Hubert Gruszecki <[email protected]>
AuthorDate: Wed Feb 18 11:02:31 2026 +0100

    feat(consensus): implement per-partition consensus

    A single pipeline shared across all partitions caps throughput at 8
    in-flight ops. Each partition now gets its own VsrConsensus with an
    independent pipeline and sequencing. IggyPartitions stores
    consensus_groups keyed by namespace, and the Plane impl routes by the
    namespace carried in message headers. View-change headers (SVC/DVC/SV)
    gain a namespace field and align their frame layout with GenericHeader.
---
 core/common/src/types/consensus/header.rs |  24 ++--
 core/consensus/src/impls.rs               |  78 +++++++++--
 core/consensus/src/lib.rs                 |   2 +
 core/consensus/src/plane_helpers.rs       | 133 ++++++++++++++++++-
 core/metadata/src/impls/metadata.rs       |  88 +++++++------
 core/partitions/src/iggy_partitions.rs    | 209 +++++++++++++++++-------------
 core/simulator/src/lib.rs                 |  11 +-
 core/simulator/src/replica.rs             |  70 ++++++----
 8 files changed, 428 insertions(+), 187 deletions(-)

diff --git a/core/common/src/types/consensus/header.rs b/core/common/src/types/consensus/header.rs
index c8d311fa7..066b9b681 100644
--- a/core/common/src/types/consensus/header.rs
+++ b/core/common/src/types/consensus/header.rs
@@ -135,8 +135,7 @@ pub struct GenericHeader {
     pub replica: u8,
     pub reserved_frame: [u8; 66],
 
-    pub namespace: u64,
-    pub reserved_command: [u8; 120],
+    pub reserved_command: [u8; 128],
 }
 
 unsafe impl Pod for GenericHeader {}
@@ -489,18 +488,17 @@ impl Default for ReplyHeader {
 #[repr(C)]
 pub struct StartViewChangeHeader {
     pub checksum: u128,
-    pub checksum_padding: u128,
     pub checksum_body: u128,
-    pub checksum_body_padding: u128,
     pub cluster: u128,
     pub size: u32,
     pub view: u32,
     pub release: u32,
     pub command: Command2,
     pub replica: u8,
-    pub reserved_frame: [u8; 42],
+    pub reserved_frame: [u8; 66],
 
-    pub reserved: [u8; 128],
+    pub namespace: u64,
+    pub reserved: [u8; 120],
 }
 
 unsafe impl Pod for StartViewChangeHeader {}
@@ -536,16 +534,14 @@ impl ConsensusHeader for StartViewChangeHeader {
 #[repr(C)]
 pub struct DoViewChangeHeader {
     pub checksum: u128,
-    pub checksum_padding: u128,
     pub checksum_body: u128,
-    pub checksum_body_padding: u128,
     pub cluster: u128,
     pub size: u32,
     pub view: u32,
     pub release: u32,
     pub command: Command2,
     pub replica: u8,
-    pub reserved_frame: [u8; 42],
+    pub reserved_frame: [u8; 66],
 
     /// The highest op-number in this replica's log.
     /// Used to select the most complete log when log_view values are equal.
@@ -553,11 +549,12 @@ pub struct DoViewChangeHeader {
     /// The replica's commit number (highest committed op).
    /// The new primary sets its commit to max(commit) across all DVCs.
     pub commit: u64,
+    pub namespace: u64,
     /// The view number when this replica's status was last normal.
     /// This is the key field for log selection: the replica with the
     /// highest log_view has the most authoritative log.
     pub log_view: u32,
-    pub reserved: [u8; 108],
+    pub reserved: [u8; 100],
 }
 
 unsafe impl Pod for DoViewChangeHeader {}
@@ -609,16 +606,14 @@ impl ConsensusHeader for DoViewChangeHeader {
 #[repr(C)]
 pub struct StartViewHeader {
     pub checksum: u128,
-    pub checksum_padding: u128,
     pub checksum_body: u128,
-    pub checksum_body_padding: u128,
     pub cluster: u128,
     pub size: u32,
     pub view: u32,
     pub release: u32,
     pub command: Command2,
     pub replica: u8,
-    pub reserved_frame: [u8; 42],
+    pub reserved_frame: [u8; 66],
 
     /// The op-number of the highest entry in the new primary's log.
     /// Backups set their op to this value.
@@ -627,7 +622,8 @@ pub struct StartViewHeader {
     /// This is max(commit) from all DVCs received by the primary.
     /// Backups set their commit to this value.
     pub commit: u64,
-    pub reserved: [u8; 112],
+    pub namespace: u64,
+    pub reserved: [u8; 104],
 }
 
 unsafe impl Pod for StartViewHeader {}
diff --git a/core/consensus/src/impls.rs b/core/consensus/src/impls.rs
index 4b1e620c8..e226ae97f 100644
--- a/core/consensus/src/impls.rs
+++ b/core/consensus/src/impls.rs
@@ -365,6 +365,10 @@ impl Pipeline for LocalPipeline {
         LocalPipeline::message_by_op_and_checksum(self, op, checksum)
     }
 
+    fn head(&self) -> Option<&Self::Entry> {
+        LocalPipeline::head(self)
+    }
+
     fn is_full(&self) -> bool {
         LocalPipeline::is_full(self)
     }
@@ -389,7 +393,7 @@ pub enum Status {
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub enum VsrAction {
     /// Send StartViewChange to all replicas.
-    SendStartViewChange { view: u32 },
+    SendStartViewChange { view: u32, namespace: u64 },
     /// Send DoViewChange to primary.
     SendDoViewChange {
         view: u32,
@@ -397,11 +401,22 @@ pub enum VsrAction {
         log_view: u32,
         op: u64,
         commit: u64,
+        namespace: u64,
     },
     /// Send StartView to all backups (as new primary).
-    SendStartView { view: u32, op: u64, commit: u64 },
+    SendStartView {
+        view: u32,
+        op: u64,
+        commit: u64,
+        namespace: u64,
+    },
     /// Send PrepareOK to primary.
-    SendPrepareOk { view: u32, op: u64, target: u8 },
+    SendPrepareOk {
+        view: u32,
+        op: u64,
+        target: u8,
+        namespace: u64,
+    },
 }
 
 #[allow(unused)]
@@ -414,6 +429,7 @@ where
     cluster: u128,
     replica: u8,
     replica_count: u8,
+    namespace: u64,
 
     view: Cell<u32>,
 
@@ -455,16 +471,28 @@ where
 }
 
 impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
-    pub fn new(cluster: u128, replica: u8, replica_count: u8, message_bus: B, pipeline: P) -> Self {
+    pub fn new(
+        cluster: u128,
+        replica: u8,
+        replica_count: u8,
+        message_bus: B,
+        pipeline: P,
+        namespace: u64,
+    ) -> Self {
         assert!(
             replica < replica_count,
             "replica index must be < replica_count"
         );
         assert!(replica_count >= 1, "need at least 1 replica");
+        // TODO: Verify that XOR-based seeding provides sufficient jitter diversity
+        // across groups. Consider using a proper hash (e.g., Murmur3) of
+        // (replica_id, namespace) for production.
+        let timeout_seed = replica as u128 ^ namespace as u128;
         Self {
             cluster,
             replica,
             replica_count,
+            namespace,
             view: Cell::new(0),
             log_view: Cell::new(0),
             status: Cell::new(Status::Recovering),
@@ -479,7 +507,7 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
             do_view_change_quorum: Cell::new(false),
             sent_own_start_view_change: Cell::new(false),
             sent_own_do_view_change: Cell::new(false),
-            timeouts: RefCell::new(TimeoutManager::new(replica as u128)),
+            timeouts: RefCell::new(TimeoutManager::new(timeout_seed)),
         }
     }
 
@@ -563,6 +591,10 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
         self.replica_count
     }
 
+    pub fn namespace(&self) -> u64 {
+        self.namespace
+    }
+
     pub fn log_view(&self) -> u32 {
         self.log_view.get()
     }
@@ -678,7 +710,7 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
             timeouts.start(TimeoutKind::ViewChangeStatus);
         }
 
-        vec![VsrAction::SendStartViewChange { view: new_view }]
+        vec![VsrAction::SendStartViewChange {
+            view: new_view,
+            namespace: self.namespace,
+        }]
     }
 
     /// Resend SVC message if we've started view change.
@@ -693,6 +728,7 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
 
         vec![VsrAction::SendStartViewChange {
             view: self.view.get(),
+            namespace: self.namespace,
         }]
     }
 
@@ -725,6 +761,7 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
             log_view: self.log_view.get(),
             op: current_op,
             commit: current_commit,
+            namespace: self.namespace,
         }]
     }
 
@@ -748,7 +785,10 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
             .borrow_mut()
             .reset(TimeoutKind::ViewChangeStatus);
 
-        vec![VsrAction::SendStartViewChange { view: next_view }]
+        vec![VsrAction::SendStartViewChange {
+            view: next_view,
+            namespace: self.namespace,
+        }]
     }
 
     /// Handle a received StartViewChange message.
@@ -757,6 +797,10 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
     /// from f OTHER replicas, it sends a DOVIEWCHANGE message to the node
     /// that will be the primary in the new view."
     pub fn handle_start_view_change(&self, header: &StartViewChangeHeader) -> Vec<VsrAction> {
+        debug_assert_eq!(
+            header.namespace, self.namespace,
+            "SVC routed to wrong group"
+        );
         let from_replica = header.replica;
         let msg_view = header.view;
 
@@ -786,7 +830,10 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
             }
 
             // Send our own SVC
-            actions.push(VsrAction::SendStartViewChange { view: msg_view });
+            actions.push(VsrAction::SendStartViewChange {
+                view: msg_view,
+                namespace: self.namespace,
+            });
         }
 
         // Record the SVC from sender
@@ -816,6 +863,7 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
             log_view: self.log_view.get(),
             op: current_op,
             commit: current_commit,
+            namespace: self.namespace,
         });
 
         // If we are the primary candidate, record our own DVC
@@ -848,6 +896,10 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
     /// replicas (including itself), it sets its view-number to that in the messages
     /// and selects as the new log the one contained in the message with the largest v'..."
     pub fn handle_do_view_change(&self, header: &DoViewChangeHeader) -> Vec<VsrAction> {
+        debug_assert_eq!(
+            header.namespace, self.namespace,
+            "DVC routed to wrong group"
+        );
         let from_replica = header.replica;
         let msg_view = header.view;
         let msg_log_view = header.log_view;
@@ -880,7 +932,10 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
             }
 
             // Send our own SVC
-            actions.push(VsrAction::SendStartViewChange { view: msg_view });
+            actions.push(VsrAction::SendStartViewChange {
+                view: msg_view,
+                namespace: self.namespace,
+            });
         }
 
         // Only the primary candidate processes DVCs for quorum
@@ -939,6 +994,7 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
     /// in the log, set their view-number to the view number in the message, change
     /// their status to normal, and send PrepareOK for any uncommitted ops."
     pub fn handle_start_view(&self, header: &StartViewHeader) -> Vec<VsrAction> {
+        debug_assert_eq!(header.namespace, self.namespace, "SV routed to wrong group");
         let from_replica = header.replica;
         let msg_view = header.view;
         let msg_op = header.op;
@@ -984,7 +1040,8 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
             actions.push(VsrAction::SendPrepareOk {
                 view: msg_view,
                 op: op_num,
-                target: from_replica, // Send to new primary
+                target: from_replica,
+                namespace: self.namespace,
             });
         }
 
@@ -1020,6 +1077,7 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
             view: self.view.get(),
             op: new_op,
             commit: max_commit,
+            namespace: self.namespace,
         }]
     }
 
diff --git a/core/consensus/src/lib.rs b/core/consensus/src/lib.rs
index 17f2fb9ea..b6a1bf17c 100644
--- a/core/consensus/src/lib.rs
+++ b/core/consensus/src/lib.rs
@@ -40,6 +40,8 @@ pub trait Pipeline {
 
     fn message_by_op_and_checksum(&self, op: u64, checksum: u128) -> Option<&Self::Entry>;
 
+    fn head(&self) -> Option<&Self::Entry>;
+
     fn is_full(&self) -> bool;
 
     fn is_empty(&self) -> bool;
diff --git a/core/consensus/src/plane_helpers.rs b/core/consensus/src/plane_helpers.rs
index 60ea1dedf..35303b85b 100644
--- a/core/consensus/src/plane_helpers.rs
+++ b/core/consensus/src/plane_helpers.rs
@@ -135,7 +135,7 @@ where
     Ok(())
 }
 
-/// Shared quorum + extraction flow for ack handling.
+/// Shared quorum tracking flow for ack handling.
 pub fn ack_quorum_reached<B, P>(consensus: &VsrConsensus<B, P>, ack: &PrepareOkHeader) -> bool
 where
     B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
@@ -149,6 +149,33 @@ where
     true
 }
 
+/// Drain and return committable prepares from the pipeline head.
+///
+/// Entries are drained only from the head and only while their op is covered
+/// by the current commit frontier.
+pub fn drain_committable_prefix<B, P>(consensus: &VsrConsensus<B, P>) -> Vec<PipelineEntry>
+where
+    B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
+    P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>,
+{
+    let commit = consensus.commit();
+    let mut drained = Vec::new();
+    let mut pipeline = consensus.pipeline().borrow_mut();
+
+    while let Some(head_op) = pipeline.head().map(|entry| entry.header.op) {
+        if head_op > commit {
+            break;
+        }
+
+        let entry = pipeline
+            .pop_message()
+            .expect("drain_committable_prefix: head exists");
+        drained.push(entry);
+    }
+
+    drained
+}
+
 /// Shared reply-message construction for committed prepare.
 pub fn build_reply_message<B, P>(
     consensus: &VsrConsensus<B, P>,
@@ -269,3 +296,107 @@ pub async fn send_prepare_ok<B, P>(
             .unwrap();
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::{Consensus, LocalPipeline};
+    use iggy_common::IggyError;
+
+    #[derive(Debug, Default)]
+    struct NoopBus;
+
+    impl MessageBus for NoopBus {
+        type Client = u128;
+        type Replica = u8;
+        type Data = Message<GenericHeader>;
+        type Sender = ();
+
+        fn add_client(&mut self, _client: Self::Client, _sender: Self::Sender) -> bool {
+            true
+        }
+
+        fn remove_client(&mut self, _client: Self::Client) -> bool {
+            true
+        }
+
+        fn add_replica(&mut self, _replica: Self::Replica) -> bool {
+            true
+        }
+
+        fn remove_replica(&mut self, _replica: Self::Replica) -> bool {
+            true
+        }
+
+        async fn send_to_client(
+            &self,
+            _client_id: Self::Client,
+            _data: Self::Data,
+        ) -> Result<(), IggyError> {
+            Ok(())
+        }
+
+        async fn send_to_replica(
+            &self,
+            _replica: Self::Replica,
+            _data: Self::Data,
+        ) -> Result<(), IggyError> {
+            Ok(())
+        }
+    }
+
+    fn prepare_message(op: u64, parent: u128, checksum: u128) -> Message<PrepareHeader> {
+        Message::<PrepareHeader>::new(std::mem::size_of::<PrepareHeader>()).transmute_header(
+            |_, new| {
+                *new = PrepareHeader {
+                    command: Command2::Prepare,
+                    op,
+                    parent,
+                    checksum,
+                    ..Default::default()
+                };
+            },
+        )
+    }
+
+    #[test]
+    fn drains_head_prefix_by_commit_frontier() {
+        let consensus = VsrConsensus::new(1, 0, 3, NoopBus, LocalPipeline::new(), 0);
+        consensus.init();
+
+        consensus.pipeline_message(prepare_message(1, 0, 10));
+        consensus.pipeline_message(prepare_message(2, 10, 20));
+        consensus.pipeline_message(prepare_message(3, 20, 30));
+
+        consensus.advance_commit_number(3);
+
+        let drained = drain_committable_prefix(&consensus);
+        let drained_ops: Vec<_> = drained.into_iter().map(|entry| entry.header.op).collect();
+        assert_eq!(drained_ops, vec![1, 2, 3]);
+        assert!(consensus.pipeline().borrow().is_empty());
+    }
+
+    #[test]
+    fn drains_only_up_to_commit_frontier_even_without_quorum_flags() {
+        let consensus = VsrConsensus::new(1, 0, 3, NoopBus, LocalPipeline::new(), 0);
+        consensus.init();
+
+        consensus.pipeline_message(prepare_message(5, 0, 50));
+        consensus.pipeline_message(prepare_message(6, 50, 60));
+        consensus.pipeline_message(prepare_message(7, 60, 70));
+
+        consensus.advance_commit_number(6);
+        let drained = drain_committable_prefix(&consensus);
+        let drained_ops: Vec<_> = drained.into_iter().map(|entry| entry.header.op).collect();
+
+        assert_eq!(drained_ops, vec![5, 6]);
+        assert_eq!(
+            consensus
+                .pipeline()
+                .borrow()
+                .head()
+                .map(|entry| entry.header.op),
+            Some(7)
+        );
+    }
+}
diff --git a/core/metadata/src/impls/metadata.rs b/core/metadata/src/impls/metadata.rs
index 9a81461af..c9e5fed54 100644
--- a/core/metadata/src/impls/metadata.rs
+++ b/core/metadata/src/impls/metadata.rs
@@ -18,7 +18,7 @@ use crate::stm::StateMachine;
 use crate::stm::snapshot::{FillSnapshot, MetadataSnapshot, Snapshot, SnapshotError};
 use consensus::{
     Consensus, Pipeline, PipelineEntry, Plane, Project, Sequencer, VsrConsensus, ack_preflight,
-    ack_quorum_reached, build_reply_message, fence_old_prepare_by_commit,
+    ack_quorum_reached, build_reply_message, drain_committable_prefix, fence_old_prepare_by_commit,
     panic_if_hash_chain_would_break_in_same_view, pipeline_prepare_common, replicate_preflight,
     replicate_to_next_in_chain, send_prepare_ok as send_prepare_ok_common,
 };
@@ -195,47 +195,51 @@ where
             debug!("on_ack: quorum received for op={}", header.op);
 
-            // Extract the header from the pipeline, fetch the full message from journal
-            // TODO: Commit from the head. ALWAYS
-            let entry = consensus.pipeline().borrow_mut().extract_by_op(header.op);
-            let Some(entry) = entry else {
-                warn!("on_ack: prepare not found in pipeline for op={}", header.op);
-                return;
-            };
-
-            let prepare_header = entry.header;
-            // TODO(hubcio): should we replace this with graceful fallback (warn + return)?
-            // When journal compaction is implemented compaction could race
-            // with this lookup if it removes entries below the commit number.
-            let prepare = journal
-                .handle()
-                .entry(&prepare_header)
-                .await
-                .unwrap_or_else(|| {
-                    panic!(
-                        "on_ack: committed prepare op={} checksum={} must be in journal",
-                        prepare_header.op, prepare_header.checksum
-                    )
-                });
-
-            // Apply the state (consumes prepare)
-            // TODO: Handle appending result to response
-            let _result = self.mux_stm.update(prepare);
-            debug!("on_ack: state applied for op={}", prepare_header.op);
-
-            // Send reply to client
-            let generic_reply = build_reply_message(consensus, &prepare_header).into_generic();
-            debug!(
-                "on_ack: sending reply to client={} for op={}",
-                prepare_header.client, prepare_header.op
-            );
-
-            // TODO: Error handling
-            consensus
-                .message_bus()
-                .send_to_client(prepare_header.client, generic_reply)
-                .await
-                .unwrap()
+            let drained = drain_committable_prefix(consensus);
+            if let (Some(first), Some(last)) = (drained.first(), drained.last()) {
+                debug!(
+                    "on_ack: draining committed prefix ops=[{}..={}] count={}",
+                    first.header.op,
+                    last.header.op,
+                    drained.len()
+                );
+            }
+
+            for entry in drained {
+                let prepare_header = entry.header;
+                // TODO(hubcio): should we replace this with graceful fallback (warn + return)?
+                // When journal compaction is implemented compaction could race
+                // with this lookup if it removes entries below the commit number.
+                let prepare = journal
+                    .handle()
+                    .entry(&prepare_header)
+                    .await
+                    .unwrap_or_else(|| {
+                        panic!(
+                            "on_ack: committed prepare op={} checksum={} must be in journal",
+                            prepare_header.op, prepare_header.checksum
+                        )
+                    });
+
+                // Apply the state (consumes prepare)
+                // TODO: Handle appending result to response
+                let _result = self.mux_stm.update(prepare);
+                debug!("on_ack: state applied for op={}", prepare_header.op);
+
+                // Send reply to client
+                let generic_reply = build_reply_message(consensus, &prepare_header).into_generic();
+                debug!(
+                    "on_ack: sending reply to client={} for op={}",
+                    prepare_header.client, prepare_header.op
+                );
+
+                // TODO: Error handling
+                consensus
+                    .message_bus()
+                    .send_to_client(prepare_header.client, generic_reply)
+                    .await
+                    .unwrap()
+            }
         }
     }
 }
diff --git a/core/partitions/src/iggy_partitions.rs b/core/partitions/src/iggy_partitions.rs
index 54891dfdd..2d571f19f 100644
--- a/core/partitions/src/iggy_partitions.rs
+++ b/core/partitions/src/iggy_partitions.rs
@@ -22,8 +22,9 @@ use crate::Partition;
 use crate::types::PartitionsConfig;
 use consensus::{
     Consensus, PipelineEntry, Plane, Project, Sequencer, VsrConsensus, ack_preflight,
-    ack_quorum_reached, build_reply_message, fence_old_prepare_by_commit, pipeline_prepare_common,
-    replicate_preflight, replicate_to_next_in_chain, send_prepare_ok as send_prepare_ok_common,
+    ack_quorum_reached, build_reply_message, drain_committable_prefix, fence_old_prepare_by_commit,
+    pipeline_prepare_common, replicate_preflight, replicate_to_next_in_chain,
+    send_prepare_ok as send_prepare_ok_common,
 };
 use iggy_common::{
     INDEX_SIZE, IggyByteSize, IggyIndexesMut, IggyMessagesBatchMut, PartitionStats, PooledBuffer,
@@ -58,33 +59,29 @@ pub struct IggyPartitions<C> {
     /// mutate partition state (segments, offsets, journal).
     partitions: UnsafeCell<Vec<IggyPartition>>,
     namespace_to_local: HashMap<IggyNamespace, LocalIdx>,
-    /// Some on shard0, None on other shards
-    pub consensus: Option<C>,
+    // TODO: Consider heartbeat/commit coalescing -- one batch message per node pair
+    // per tick instead of per-group. See CockroachDB heartbeat coalescing.
+    consensus_groups: HashMap<IggyNamespace, C>,
 }
 
 impl<C> IggyPartitions<C> {
-    pub fn new(shard_id: ShardId, config: PartitionsConfig, consensus: Option<C>) -> Self {
+    pub fn new(shard_id: ShardId, config: PartitionsConfig) -> Self {
         Self {
             shard_id,
             config,
             partitions: UnsafeCell::new(Vec::new()),
             namespace_to_local: HashMap::new(),
-            consensus,
+            consensus_groups: HashMap::new(),
         }
     }
 
-    pub fn with_capacity(
-        shard_id: ShardId,
-        config: PartitionsConfig,
-        consensus: Option<C>,
-        capacity: usize,
-    ) -> Self {
+    pub fn with_capacity(shard_id: ShardId, config: PartitionsConfig, capacity: usize) -> Self {
         Self {
             shard_id,
             config,
             partitions: UnsafeCell::new(Vec::with_capacity(capacity)),
             namespace_to_local: HashMap::with_capacity(capacity),
-            consensus,
+            consensus_groups: HashMap::with_capacity(capacity),
         }
     }
 
@@ -160,6 +157,7 @@ impl<C> IggyPartitions<C> {
 
     /// Remove a partition by namespace. Returns the removed partition if found.
     pub fn remove(&mut self, namespace: &IggyNamespace) -> Option<IggyPartition> {
         let local_idx = self.namespace_to_local.remove(namespace)?;
+        self.consensus_groups.remove(namespace);
         let idx = *local_idx;
         let partitions = self.partitions.get_mut();
@@ -231,26 +229,31 @@ impl<C> IggyPartitions<C> {
         &mut self.partitions_mut()[idx]
     }
 
+    pub fn consensus(&self, ns: &IggyNamespace) -> Option<&C> {
+        self.consensus_groups.get(ns)
+    }
+
     /// Initialize a new partition with in-memory storage (for testing/simulation).
     ///
-    /// This is a simplified version that doesn't create file-backed storage.
-    /// Use `init_partition()` for production use with real files.
+    /// Registers the consensus group atomically with the partition so Plane
+    /// dispatch cannot observe a partition without its consensus state.
     ///
     /// TODO: Make the log generic over its storage backend to support both
     /// in-memory (for testing) and file-backed (for production) storage without
     /// needing separate initialization methods.
-    pub fn init_partition_in_memory(&mut self, namespace: IggyNamespace) -> LocalIdx {
-        // Check if already initialized
+    pub fn init_partition_in_memory(
+        &mut self,
+        namespace: IggyNamespace,
+        consensus: Option<C>,
+    ) -> LocalIdx {
         if let Some(idx) = self.local_idx(&namespace) {
             return idx;
         }
 
-        // Create initial segment with default (in-memory) storage
         let start_offset = 0;
         let segment = Segment::new(start_offset, self.config.segment_size);
         let storage = SegmentStorage::default();
 
-        // Create partition with initialized log
         let stats = Arc::new(PartitionStats::default());
         let mut partition = IggyPartition::new(stats.clone());
         partition.log.add_persisted_segment(segment, storage);
@@ -261,8 +264,14 @@ impl<C> IggyPartitions<C> {
         partition.should_increment_offset = false;
         partition.stats.increment_segments_count(1);
 
-        // Insert and return local index
-        self.insert(namespace, partition)
+        let idx = self.insert(namespace, partition);
+        // TODO: Shared WAL -- per-partition fdatasync is untenable at scale. Batch
+        // journal writes across all groups into a single WAL with one fdatasync per
+        // tick. See TiKV raft-engine crate.
+        if let Some(c) = consensus {
+            self.consensus_groups.insert(namespace, c);
+        }
+        idx
     }
 
     /// Initialize a new partition with file-backed storage.
@@ -276,8 +285,11 @@ impl<C> IggyPartitions<C> {
     /// 3. Data plane: INITIATE PARTITION (THIS METHOD)
     ///
     /// Idempotent - returns existing LocalIdx if partition already exists.
-    pub async fn init_partition(&mut self, namespace: IggyNamespace) -> LocalIdx {
-        // Check if already initialized
+    pub async fn init_partition(
+        &mut self,
+        namespace: IggyNamespace,
+        consensus: Option<C>,
+    ) -> LocalIdx {
         if let Some(idx) = self.local_idx(&namespace) {
             return idx;
         }
@@ -325,7 +337,11 @@ impl<C> IggyPartitions<C> {
         partition.stats.increment_segments_count(1);
 
         // Insert and return local index
-        self.insert(namespace, partition)
+        let idx = self.insert(namespace, partition);
+        if let Some(c) = consensus {
+            self.consensus_groups.insert(namespace, c);
+        }
+        idx
     }
 }
 
@@ -334,17 +350,22 @@ where
     B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
 {
     async fn on_request(&self, message: <VsrConsensus<B> as Consensus>::RequestMessage) {
-        let consensus = self.consensus.as_ref().unwrap();
+        let namespace = IggyNamespace::from_raw(message.header().namespace);
+        let consensus = self
+            .consensus(&namespace)
+            .expect("on_request: no consensus group for namespace");
 
-        debug!("handling partition request");
+        debug!(?namespace, "handling partition request");
 
         let prepare = message.project(consensus);
         pipeline_prepare_common(consensus, prepare, |prepare| self.on_replicate(prepare)).await;
     }
 
     async fn on_replicate(&self, message: <VsrConsensus<B> as Consensus>::ReplicateMessage) {
-        let consensus = self.consensus.as_ref().unwrap();
         let header = message.header();
+        let namespace = IggyNamespace::from_raw(header.namespace);
+        let consensus = self
+            .consensus(&namespace)
+            .expect("on_replicate: no consensus group for namespace");
 
         let current_op = match replicate_preflight(consensus, header) {
             Ok(current_op) => current_op,
@@ -359,7 +380,7 @@ where
         let is_old_prepare = fence_old_prepare_by_commit(consensus, header);
 
         if !is_old_prepare {
-            self.replicate(message.clone()).await;
+            self.replicate(&namespace, message.clone()).await;
         } else {
            warn!("received old prepare, not replicating");
         }
@@ -371,21 +392,21 @@ where
         // TODO: Figure out the flow of the partition operations.
         // In metadata layer we assume that when an `on_request` or `on_replicate` is called, it's called from correct shard.
         // I think we need to do the same here, which means that the code from below is unfallable, the partition should always exist by now!
-        let namespace = IggyNamespace::from_raw(header.namespace);
-        self.apply_replicated_operation(&message, &namespace).await;
+        self.apply_replicated_operation(&namespace, &message).await;
 
-        // After successful journal write, send prepare_ok to primary.
-        self.send_prepare_ok(header).await;
+        self.send_prepare_ok(&namespace, header).await;
 
-        // If follower, commit any newly committable entries.
         if consensus.is_follower() {
-            self.commit_journal();
+            self.commit_journal(&namespace);
         }
     }
 
     async fn on_ack(&self, message: <VsrConsensus<B> as Consensus>::AckMessage) {
-        let consensus = self.consensus.as_ref().unwrap();
         let header = message.header();
+        let namespace = IggyNamespace::from_raw(header.namespace);
+        let consensus = self
+            .consensus(&namespace)
+            .expect("on_ack: no consensus group for namespace");
 
         if let Err(reason) = ack_preflight(consensus) {
             warn!("on_ack: ignoring ({reason})");
@@ -410,56 +431,56 @@ where
         if ack_quorum_reached(consensus, header) {
             debug!("on_ack: quorum received for op={}", header.op);
 
-            // Extract the prepare message from the pipeline by op
-            // TODO: Commit from the head. ALWAYS
-            let entry = consensus.pipeline().borrow_mut().extract_by_op(header.op);
-            let Some(PipelineEntry {
+            let drained = drain_committable_prefix(consensus);
+            if let (Some(first), Some(last)) = (drained.first(), drained.last()) {
+                debug!(
+                    "on_ack: draining committed prefix ops=[{}..={}] count={}",
+                    first.header.op,
+                    last.header.op,
+                    drained.len()
+                );
+            }
+
+            for PipelineEntry {
                 header: prepare_header, ..
-            }) = entry
-            else {
-                warn!("on_ack: prepare not found in pipeline for op={}", header.op);
-                return;
-            };
+            } in drained
+            {
+                let entry_namespace = IggyNamespace::from_raw(prepare_header.namespace);
+
+                match prepare_header.operation {
+                    Operation::SendMessages => {
+                        self.commit_messages(&entry_namespace).await;
+                        debug!("on_ack: messages committed for op={}", prepare_header.op,);
+                    }
+                    Operation::StoreConsumerOffset => {
+                        // TODO: Commit consumer offset update.
+                        debug!(
+                            "on_ack: consumer offset committed for op={}",
+                            prepare_header.op
+                        );
+                    }
+                    _ => {
+                        warn!(
+                            "on_ack: unexpected operation {:?} for op={}",
+                            prepare_header.operation, prepare_header.op
+                        );
+                    }
+                }
 
-            // Data was already appended to the partition journal during
-            // on_replicate. Now that quorum is reached, update the partition's
-            // current offset and check whether the journal needs flushing.
-            let namespace = IggyNamespace::from_raw(prepare_header.namespace);
+                let generic_reply = build_reply_message(consensus, &prepare_header).into_generic();
+                debug!(
+                    "on_ack: sending reply to client={} for op={}",
+                    prepare_header.client, prepare_header.op
+                );
 
-            match prepare_header.operation {
-                Operation::SendMessages => {
-                    self.commit_messages(&namespace).await;
-                    debug!("on_ack: messages committed for op={}", prepare_header.op,);
-                }
-                Operation::StoreConsumerOffset => {
-                    // TODO: Commit consumer offset update.
-                    debug!(
-                        "on_ack: consumer offset committed for op={}",
-                        prepare_header.op
-                    );
-                }
-                _ => {
-                    warn!(
-                        "on_ack: unexpected operation {:?} for op={}",
-                        prepare_header.operation, prepare_header.op
-                    );
-                }
+                // TODO: Error handling
+                consensus
+                    .message_bus()
+                    .send_to_client(prepare_header.client, generic_reply)
+                    .await
+                    .unwrap()
             }
-
-            // Send reply to client
-            let generic_reply = build_reply_message(consensus, &prepare_header).into_generic();
-            debug!(
-                "on_ack: sending reply to client={} for op={}",
-                prepare_header.client, prepare_header.op
-            );
-
-            // TODO: Error handling
-            consensus
-                .message_bus()
-                .send_to_client(prepare_header.client, generic_reply)
-                .await
-                .unwrap()
         }
     }
 }
@@ -488,10 +509,12 @@ where
 
     async fn apply_replicated_operation(
         &self,
-        message: &Message<PrepareHeader>,
         namespace: &IggyNamespace,
+        message: &Message<PrepareHeader>,
     ) {
-        let consensus = self.consensus.as_ref().unwrap();
+        let consensus = self
+            .consensus(namespace)
+            .expect("apply_replicated_operation: no consensus group");
         let header = message.header();
 
         match header.operation {
@@ -552,16 +575,16 @@ where
     /// Replicate a prepare message to the next replica in the chain.
     ///
-    /// Chain replication pattern:
-    /// - Primary sends to first backup
-    /// - Each backup forwards to the next
-    /// - Stops when we would forward back to primary
-    async fn replicate(&self, message: Message<PrepareHeader>) {
-        let consensus = self.consensus.as_ref().unwrap();
+    /// Chain replication: primary -> first backup -> ... -> last backup.
+    /// Stops when the next replica would be the primary.
+    async fn replicate(&self, namespace: &IggyNamespace, message: Message<PrepareHeader>) {
+        let consensus = self
+            .consensus(namespace)
+            .expect("replicate: no consensus group");
         replicate_to_next_in_chain(consensus, message).await;
     }
 
-    fn commit_journal(&self) {
+    fn commit_journal(&self, _namespace: &IggyNamespace) {
         // TODO: Implement commit logic for followers.
         // Walk through journal from last committed to current commit number
         // Apply each entry to the partition state
@@ -786,8 +809,10 @@ where
         debug!(?namespace, start_offset, "rotated to new segment");
     }
 
-    async fn send_prepare_ok(&self, header: &PrepareHeader) {
-        let consensus = self.consensus.as_ref().unwrap();
+    async fn send_prepare_ok(&self, namespace: &IggyNamespace, header: &PrepareHeader) {
+        let consensus = self
+            .consensus(namespace)
+            .expect("send_prepare_ok: no consensus group");
         // TODO: Verify the prepare is persisted in the partition journal.
         // The partition journal uses MessageLookup headers, so we cannot
         // check by PrepareHeader.op directly. For now, skip this check.
diff --git a/core/simulator/src/lib.rs b/core/simulator/src/lib.rs
index 44e063f78..4228abc13 100644
--- a/core/simulator/src/lib.rs
+++ b/core/simulator/src/lib.rs
@@ -34,10 +34,10 @@ pub struct Simulator {
 }
 
 impl Simulator {
-    /// Initialize a partition on all replicas (in-memory for simulation)
+    /// Initialize a partition with its own consensus group on all replicas.
     pub fn init_partition(&mut self, namespace: iggy_common::sharding::IggyNamespace) {
         for replica in &mut self.replicas {
-            replica.partitions.init_partition_in_memory(namespace);
+            replica.init_partition(namespace);
         }
     }
 
@@ -158,3 +158,10 @@ impl Simulator {
         }
     }
 }
+
+// TODO(IGGY-66): Add acceptance test for per-partition consensus independence.
+// Setup: 3-replica simulator, two partitions (ns_a, ns_b).
+// 1. Fill ns_a's pipeline to PIPELINE_PREPARE_QUEUE_MAX without delivering acks.
+// 2. Send a request to ns_b, step until ns_b reply arrives.
+// 3. Assert ns_b committed while ns_a pipeline is still full.
+// Requires namespace-aware stepping (filter bus by namespace) or two-phase delivery.
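The TODO above only describes the IGGY-66 acceptance test in prose. As a rough, self-contained illustration of the property it would assert — per-namespace groups have independent pipelines, so saturating one cannot stall another — here is a toy std-only model. The `Group`, `PIPELINE_MAX`, `try_prepare`, and `ack_all` names are invented for this sketch and are not the simulator's or the consensus crate's API.

```rust
use std::collections::{HashMap, VecDeque};

/// Toy stand-in for one per-partition consensus group: an in-flight window
/// capped at PIPELINE_MAX, mirroring the 8-op pipeline limit mentioned in the
/// commit message. Not the real VsrConsensus type.
const PIPELINE_MAX: usize = 8;

#[derive(Default)]
struct Group {
    in_flight: VecDeque<u64>, // ops prepared but not yet acked
    committed: u64,
}

impl Group {
    fn try_prepare(&mut self, op: u64) -> bool {
        if self.in_flight.len() >= PIPELINE_MAX {
            return false; // pipeline full, further requests would stall
        }
        self.in_flight.push_back(op);
        true
    }

    fn ack_all(&mut self) {
        while let Some(op) = self.in_flight.pop_front() {
            self.committed = op;
        }
    }
}

fn main() {
    // One group per namespace, like consensus_groups in IggyPartitions.
    let mut groups: HashMap<u64, Group> = HashMap::new();
    let (ns_a, ns_b) = (1_u64, 2_u64);
    groups.insert(ns_a, Group::default());
    groups.insert(ns_b, Group::default());

    // 1. Saturate ns_a's pipeline without delivering acks.
    for op in 1..=PIPELINE_MAX as u64 {
        assert!(groups.get_mut(&ns_a).unwrap().try_prepare(op));
    }
    assert!(!groups.get_mut(&ns_a).unwrap().try_prepare(9)); // ns_a is stuck

    // 2. A request routed to ns_b still makes progress.
    let b = groups.get_mut(&ns_b).unwrap();
    assert!(b.try_prepare(1));
    b.ack_all();

    // 3. ns_b committed while ns_a's pipeline is still full.
    assert_eq!(groups[&ns_b].committed, 1);
    assert_eq!(groups[&ns_a].in_flight.len(), PIPELINE_MAX);
    println!("per-namespace groups progress independently");
}
```

The real test would drive this through the simulator's message bus rather than direct method calls, which is why the TODO calls out namespace-aware stepping.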
diff --git a/core/simulator/src/replica.rs b/core/simulator/src/replica.rs
index ae4631e95..aead93f78 100644
--- a/core/simulator/src/replica.rs
+++ b/core/simulator/src/replica.rs
@@ -21,7 +21,7 @@ use crate::deps::{
 };
 use consensus::{LocalPipeline, VsrConsensus};
 use iggy_common::IggyByteSize;
-use iggy_common::sharding::ShardId;
+use iggy_common::sharding::{IggyNamespace, ShardId};
 use metadata::stm::consumer_group::{ConsumerGroups, ConsumerGroupsInner};
 use metadata::stm::stream::{Streams, StreamsInner};
 use metadata::stm::user::{Users, UsersInner};
@@ -29,9 +29,13 @@ use metadata::{IggyMetadata, variadic};
 use partitions::PartitionsConfig;
 use std::sync::Arc;
 
+// TODO: Make configurable
+const CLUSTER_ID: u128 = 1;
+
 pub struct Replica {
     pub id: u8,
     pub name: String,
+    pub replica_count: u8,
     pub metadata: SimMetadata,
     pub partitions: ReplicaPartitions,
     pub bus: Arc<MemBus>,
@@ -44,48 +48,30 @@ impl Replica {
         let consumer_groups: ConsumerGroups = ConsumerGroupsInner::new().into();
         let mux = SimMuxStateMachine::new(variadic!(users, streams, consumer_groups));
 
-        let cluster_id: u128 = 1; // TODO: Make configurable
+        // Metadata uses namespace=0 (not partition-scoped)
         let metadata_consensus = VsrConsensus::new(
-            cluster_id,
+            CLUSTER_ID,
             id,
             replica_count,
             SharedMemBus(Arc::clone(&bus)),
             LocalPipeline::new(),
+            0,
         );
         metadata_consensus.init();
 
-        // Create separate consensus instance for partitions
-        let partitions_consensus = VsrConsensus::new(
-            cluster_id,
-            id,
-            replica_count,
-            SharedMemBus(Arc::clone(&bus)),
-            LocalPipeline::new(),
-        );
-        partitions_consensus.init();
-
-        // Configure partitions
         let partitions_config = PartitionsConfig {
             messages_required_to_save: 1000,
             size_of_messages_required_to_save: IggyByteSize::from(4 * 1024 * 1024),
-            enforce_fsync: false, // Disable fsync for simulation
-            segment_size: IggyByteSize::from(1024 * 1024 * 1024), // 1GB segments
+            enforce_fsync: false,
+            segment_size: IggyByteSize::from(1024 * 1024 * 1024),
         };
 
-        // Only replica 0 gets consensus (primary shard for now)
-        let partitions = if id == 0 {
-            ReplicaPartitions::new(
-                ShardId::new(id as u16),
-                partitions_config,
-                Some(partitions_consensus),
-            )
-        } else {
-            ReplicaPartitions::new(ShardId::new(id as u16), partitions_config, None)
-        };
+        let partitions = ReplicaPartitions::new(ShardId::new(id as u16), partitions_config);
 
         Self {
             id,
             name,
+            replica_count,
            metadata: IggyMetadata {
                 consensus: Some(metadata_consensus),
                 journal: Some(SimJournal::<MemStorage>::default()),
@@ -96,4 +82,36 @@ impl Replica {
             bus,
         }
     }
+
+    pub fn init_partition(&mut self, namespace: IggyNamespace) {
+        if self.partitions.local_idx(&namespace).is_some() {
+            return;
+        }
+        let consensus = Self::create_partition_consensus(
+            self.id,
+            self.replica_count,
+            &self.bus,
+            namespace.inner(),
+        );
+        self.partitions
+            .init_partition_in_memory(namespace, Some(consensus));
+    }
+
+    fn create_partition_consensus(
+        id: u8,
+        replica_count: u8,
+        bus: &Arc<MemBus>,
+        namespace: u64,
+    ) -> VsrConsensus<SharedMemBus> {
+        let consensus = VsrConsensus::new(
+            CLUSTER_ID,
+            id,
+            replica_count,
+            SharedMemBus(Arc::clone(bus)),
+            LocalPipeline::new(),
+            namespace,
+        );
+        consensus.init();
+        consensus
+    }
 }
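For readers skimming the patch: the commit-path change in plane_helpers.rs replaces extract-by-op with draining a committed prefix from the pipeline head. A minimal std-only model of that frontier rule follows — the `VecDeque<u64>` of ops and the free function here are illustrative stand-ins, not the crate's PipelineEntry/VsrConsensus types.

```rust
use std::collections::VecDeque;

/// Minimal model of drain_committable_prefix: entries leave the pipeline
/// strictly in op order, and only while their op is at or below the commit
/// frontier established by ack quorums.
fn drain_committable_prefix(pipeline: &mut VecDeque<u64>, commit: u64) -> Vec<u64> {
    let mut drained = Vec::new();
    while let Some(&head_op) = pipeline.front() {
        if head_op > commit {
            break; // head not yet covered by the commit frontier
        }
        drained.push(pipeline.pop_front().expect("head exists"));
    }
    drained
}

fn main() {
    let mut pipeline: VecDeque<u64> = VecDeque::from([5, 6, 7]);
    // A quorum advanced the commit number to 6: ops 5 and 6 are committable.
    assert_eq!(drain_committable_prefix(&mut pipeline, 6), vec![5, 6]);
    // Op 7 stays queued until a later ack moves the frontier past it.
    assert_eq!(pipeline.front(), Some(&7));
}
```

This mirrors the behavior exercised by the new tests in plane_helpers.rs (ops 5, 6, 7 with commit 6) and explains why on_ack in both the metadata and partitions planes now loops over a drained batch instead of handling a single op.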
