This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch iggy-67-62-pipeline
in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 728c9230c512ab8a1919d4abe120fcf9f7078f24
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Wed Feb 11 13:46:07 2026 +0100

    refactor(consensus): decouple pipeline from VsrConsensus internals

    VsrConsensus hardcoded LocalPipeline, and PipelineEntry stored the
    full Message<PrepareHeader> (header + body) even though only the
    header is needed for quorum tracking. This coupled the consensus
    pipeline to a specific implementation and wasted memory on message
    bodies that already live in the journal.

    Parameterize VsrConsensus<B, P = LocalPipeline> over the Pipeline
    trait so implementations can be swapped (IGGY-66). Slim PipelineEntry
    to store only PrepareHeader; at commit time, on_ack fetches the full
    message from the journal. (A sketch of the trait shape these bounds
    imply follows the diff.)
---
 core/consensus/src/impls.rs         | 89 ++++++++++++++++++++++---------------
 core/metadata/src/impls/metadata.rs | 59 ++++++++++++++----------
 core/simulator/src/deps.rs          |  3 ++
 core/simulator/src/replica.rs       |  3 +-
 4 files changed, 95 insertions(+), 59 deletions(-)

diff --git a/core/consensus/src/impls.rs b/core/consensus/src/impls.rs
index 36b595163..6d7d21208 100644
--- a/core/consensus/src/impls.rs
+++ b/core/consensus/src/impls.rs
@@ -86,7 +86,7 @@ pub const REPLICAS_MAX: usize = 32;
 
 #[derive(Debug)]
 pub struct PipelineEntry {
-    pub message: Message<PrepareHeader>,
+    pub header: PrepareHeader,
     /// Bitmap of replicas that have acknowledged this prepare.
     pub ok_from_replicas: BitSet<u32>,
     /// Whether we've received a quorum of prepare_ok messages.
@@ -94,9 +94,9 @@ pub struct PipelineEntry {
 }
 
 impl PipelineEntry {
-    pub fn new(message: Message<PrepareHeader>) -> Self {
+    pub fn new(header: PrepareHeader) -> Self {
         Self {
-            message,
+            header,
             ok_from_replicas: BitSet::with_capacity(REPLICAS_MAX),
             ok_quorum_received: false,
         }
@@ -180,11 +180,10 @@ impl LocalPipeline {
     pub fn push_message(&mut self, message: Message<PrepareHeader>) {
         assert!(!self.prepare_queue_full(), "prepare queue is full");
 
-        let header = message.header();
+        let header = *message.header();
 
-        // Verify hash chain if there's a previous entry
         if let Some(tail) = self.prepare_queue.back() {
-            let tail_header = tail.message.header();
+            let tail_header = &tail.header;
             assert_eq!(
                 header.op,
                 tail_header.op + 1,
@@ -199,7 +198,7 @@ impl LocalPipeline {
             assert!(header.view >= tail_header.view, "view cannot go backwards");
         }
 
-        self.prepare_queue.push_back(PipelineEntry::new(message));
+        self.prepare_queue.push_back(PipelineEntry::new(header));
     }
 
     /// Pop the oldest message (after it's been committed).
@@ -224,8 +223,8 @@ impl LocalPipeline {
 
     /// Find a message by op number and checksum (immutable).
     pub fn message_by_op_and_checksum(&self, op: u64, checksum: u128) -> Option<&PipelineEntry> {
-        let head_op = self.prepare_queue.front()?.message.header().op;
-        let tail_op = self.prepare_queue.back()?.message.header().op;
+        let head_op = self.prepare_queue.front()?.header.op;
+        let tail_op = self.prepare_queue.back()?.header.op;
 
         // Verify consecutive ops invariant
         debug_assert_eq!(
@@ -241,9 +240,9 @@ impl LocalPipeline {
         let index = (op - head_op) as usize;
         let entry = self.prepare_queue.get(index)?;
 
-        debug_assert_eq!(entry.message.header().op, op);
+        debug_assert_eq!(entry.header.op, op);
 
-        if entry.message.header().checksum == checksum {
+        if entry.header.checksum == checksum {
             Some(entry)
         } else {
             None
@@ -252,7 +251,7 @@ impl LocalPipeline {
 
     /// Find a message by op number only.
     pub fn message_by_op(&self, op: u64) -> Option<&PipelineEntry> {
-        let head_op = self.prepare_queue.front()?.message.header().op;
+        let head_op = self.prepare_queue.front()?.header.op;
 
         if op < head_op {
             return None;
         }
@@ -265,7 +264,7 @@ impl LocalPipeline {
     /// Get mutable reference to a message entry by op number.
     /// Returns None if op is not in the pipeline.
     pub fn message_by_op_mut(&mut self, op: u64) -> Option<&mut PipelineEntry> {
-        let head_op = self.prepare_queue.front()?.message.header().op;
+        let head_op = self.prepare_queue.front()?.header.op;
         if op < head_op {
             return None;
         }
@@ -286,9 +285,7 @@ impl LocalPipeline {
     /// If there are multiple messages (possible in prepare_queue after view change),
     /// returns the latest one.
     pub fn has_message_from_client(&self, client: u128) -> bool {
-        self.prepare_queue
-            .iter()
-            .any(|p| p.message.header().client == client)
+        self.prepare_queue.iter().any(|p| p.header.client == client)
     }
 
     /// Verify pipeline invariants.
@@ -301,11 +298,11 @@ impl LocalPipeline {
         // Verify prepare queue hash chain
         if let Some(head) = self.prepare_queue.front() {
-            let mut expected_op = head.message.header().op;
-            let mut expected_parent = head.message.header().parent;
+            let mut expected_op = head.header.op;
+            let mut expected_parent = head.header.parent;
 
             for entry in &self.prepare_queue {
-                let header = entry.message.header();
+                let header = &entry.header;
                 assert_eq!(header.op, expected_op, "ops must be sequential");
                 assert_eq!(header.parent, expected_parent, "must be hash-chained");
@@ -324,7 +321,7 @@ impl LocalPipeline {
     /// Extract and remove a message by op number.
     /// Returns None if op is not in the pipeline.
     pub fn extract_by_op(&mut self, op: u64) -> Option<PipelineEntry> {
-        let head_op = self.prepare_queue.front()?.message.header().op;
+        let head_op = self.prepare_queue.front()?.header.op;
         if op < head_op {
             return None;
         }
@@ -409,9 +406,10 @@ pub enum VsrAction {
 
 #[allow(unused)]
 #[derive(Debug)]
-pub struct VsrConsensus<B = IggyMessageBus>
+pub struct VsrConsensus<B = IggyMessageBus, P = LocalPipeline>
 where
     B: MessageBus,
+    P: Pipeline,
 {
     cluster: u128,
     replica: u8,
@@ -436,7 +434,7 @@ where
     last_timestamp: Cell<u64>,
     last_prepare_checksum: Cell<u128>,
 
-    pipeline: RefCell<LocalPipeline>,
+    pipeline: RefCell<P>,
     message_bus: B,
 
     // TODO: Add loopback_queue for messages to self
@@ -456,8 +454,8 @@ where
     timeouts: RefCell<TimeoutManager>,
 }
 
-impl<B: MessageBus> VsrConsensus<B> {
-    pub fn new(cluster: u128, replica: u8, replica_count: u8, message_bus: B) -> Self {
+impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
+    pub fn new(cluster: u128, replica: u8, replica_count: u8, message_bus: B, pipeline: P) -> Self {
         assert!(
             replica < replica_count,
             "replica index must be < replica_count"
         );
@@ -474,7 +472,7 @@ impl<B: MessageBus> VsrConsensus<B> {
             commit: Cell::new(0),
             last_timestamp: Cell::new(0),
             last_prepare_checksum: Cell::new(0),
-            pipeline: RefCell::new(LocalPipeline::new()),
+            pipeline: RefCell::new(pipeline),
             message_bus,
             start_view_change_from_all_replicas: RefCell::new(BitSet::with_capacity(REPLICAS_MAX)),
             do_view_change_from_all_replicas: RefCell::new(dvc_quorum_array_empty()),
@@ -546,11 +544,14 @@ impl<B: MessageBus> VsrConsensus<B> {
         self.status.get()
     }
 
-    pub fn pipeline(&self) -> &RefCell<LocalPipeline> {
+    // TODO(hubcio): returning &RefCell<P> leaks interior mutability - callers
+    // could hold a Ref/RefMut across an .await and cause a runtime panic.
+    // We had this problem with slab + ECS.
+    pub fn pipeline(&self) -> &RefCell<P> {
         &self.pipeline
     }
 
-    pub fn pipeline_mut(&mut self) -> &mut RefCell<LocalPipeline> {
+    pub fn pipeline_mut(&mut self) -> &mut RefCell<P> {
         &mut self.pipeline
     }
@@ -1060,7 +1061,7 @@
         };
 
         // Verify checksum matches
-        if entry.message.header().checksum != header.prepare_checksum {
+        if entry.header.checksum != header.prepare_checksum {
             return false;
         }
@@ -1087,8 +1088,12 @@
     }
 }
 
-impl<B: MessageBus> Project<Message<PrepareHeader>, VsrConsensus<B>> for Message<RequestHeader> {
-    fn project(self, consensus: &VsrConsensus<B>) -> Message<PrepareHeader> {
+impl<B, P> Project<Message<PrepareHeader>, VsrConsensus<B, P>> for Message<RequestHeader>
+where
+    B: MessageBus,
+    P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>,
+{
+    fn project(self, consensus: &VsrConsensus<B, P>) -> Message<PrepareHeader> {
         let op = consensus.sequencer.current_sequence() + 1;
 
         self.transmute_header(|old, new| {
@@ -1114,8 +1119,12 @@ impl<B: MessageBus> Project<Message<PrepareHeader>, VsrConsensus<B>> for Message
     }
 }
 
-impl<B: MessageBus> Project<Message<PrepareOkHeader>, VsrConsensus<B>> for Message<PrepareHeader> {
-    fn project(self, consensus: &VsrConsensus<B>) -> Message<PrepareOkHeader> {
+impl<B, P> Project<Message<PrepareOkHeader>, VsrConsensus<B, P>> for Message<PrepareHeader>
+where
+    B: MessageBus,
+    P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>,
+{
+    fn project(self, consensus: &VsrConsensus<B, P>) -> Message<PrepareOkHeader> {
         self.transmute_header(|old, new| {
             *new = PrepareOkHeader {
                 command: Command2::PrepareOk,
@@ -1138,15 +1147,25 @@ impl<B: MessageBus> Project<Message<PrepareOkHeader>, VsrConsensus<B>> for Messa
     }
 }
 
-impl<B: MessageBus> Consensus for VsrConsensus<B> {
+impl<B, P> Consensus for VsrConsensus<B, P>
+where
+    B: MessageBus,
+    P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>,
+{
     type MessageBus = B;
     type RequestMessage = Message<RequestHeader>;
     type ReplicateMessage = Message<PrepareHeader>;
     type AckMessage = Message<PrepareOkHeader>;
     type Sequencer = LocalSequencer;
-    type Pipeline = LocalPipeline;
-
+    type Pipeline = P;
+
+    // TODO(hubcio): maybe we could record the primary's own ack here
+    // (entry.add_ack(self.replica)) instead of round-tripping through
+    // the message bus via send_prepare_ok. That would avoid
+    // serialization/queuing overhead and would also allow reordering
+    // to WAL-first (on_replicate before pipeline_message) without
+    // risking lost self-acks from dispatch timing.
     fn pipeline_message(&self, message: Self::ReplicateMessage) {
         assert!(self.is_primary(), "only primary can pipeline messages");
diff --git a/core/metadata/src/impls/metadata.rs b/core/metadata/src/impls/metadata.rs
index c4e4b3da3..9e61fa57d 100644
--- a/core/metadata/src/impls/metadata.rs
+++ b/core/metadata/src/impls/metadata.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 use crate::stm::StateMachine;
-use consensus::{Consensus, Project, Sequencer, Status, VsrConsensus};
+use consensus::{Consensus, Pipeline, PipelineEntry, Project, Sequencer, Status, VsrConsensus};
 use iggy_common::{
     header::{Command2, GenericHeader, PrepareHeader, PrepareOkHeader, ReplyHeader},
     message::Message,
@@ -50,18 +50,19 @@ pub struct IggyMetadata<C, J, S, M> {
     pub mux_stm: M,
 }
 
-impl<B, J, S, M> Metadata<VsrConsensus<B>> for IggyMetadata<VsrConsensus<B>, J, S, M>
+impl<B, P, J, S, M> Metadata<VsrConsensus<B, P>> for IggyMetadata<VsrConsensus<B, P>, J, S, M>
 where
     B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
+    P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>,
     J: JournalHandle,
     J::Target: Journal<
         J::Storage,
-        Entry = <VsrConsensus<B> as Consensus>::ReplicateMessage,
+        Entry = <VsrConsensus<B, P> as Consensus>::ReplicateMessage,
         Header = PrepareHeader,
     >,
     M: StateMachine<Input = Message<PrepareHeader>>,
 {
-    async fn on_request(&self, message: <VsrConsensus<B> as Consensus>::RequestMessage) {
+    async fn on_request(&self, message: <VsrConsensus<B, P> as Consensus>::RequestMessage) {
         let consensus = self.consensus.as_ref().unwrap();
 
         // TODO: Bunch of asserts.
@@ -70,7 +71,7 @@ where
         self.pipeline_prepare(prepare).await;
     }
 
-    async fn on_replicate(&self, message: <VsrConsensus<B> as Consensus>::ReplicateMessage) {
+    async fn on_replicate(&self, message: <VsrConsensus<B, P> as Consensus>::ReplicateMessage) {
         let consensus = self.consensus.as_ref().unwrap();
         let journal = self.journal.as_ref().unwrap();
@@ -145,7 +146,7 @@ where
         }
     }
 
-    async fn on_ack(&self, message: <VsrConsensus<B> as Consensus>::AckMessage) {
+    async fn on_ack(&self, message: <VsrConsensus<B, P> as Consensus>::AckMessage) {
         let consensus = self.consensus.as_ref().unwrap();
         let header = message.header();
@@ -159,28 +160,25 @@ where
             return;
         }
 
-        // Verify checksum by checking pipeline entry exists
         {
             let pipeline = consensus.pipeline().borrow();
-            let Some(entry) =
-                pipeline.message_by_op_and_checksum(header.op, header.prepare_checksum)
-            else {
+            if pipeline
+                .message_by_op_and_checksum(header.op, header.prepare_checksum)
+                .is_none()
+            {
                 debug!("on_ack: prepare not in pipeline op={}", header.op);
                 return;
-            };
-
-            if entry.message.header().checksum != header.prepare_checksum {
-                warn!("on_ack: checksum mismatch");
-                return;
             }
         }
 
         // Let consensus handle the ack increment and quorum check
         if consensus.handle_prepare_ok(header) {
+            let journal = self.journal.as_ref().unwrap();
+
             debug!("on_ack: quorum received for op={}", header.op);
             consensus.advance_commit_number(header.op);
 
-            // Extract the prepare message from the pipeline by op
+            // Extract the header from the pipeline, fetch the full message from journal
            // TODO: Commit from the head. ALWAYS
             let entry = consensus.pipeline().borrow_mut().extract_by_op(header.op);
             let Some(entry) = entry else {
                 ...
                 return;
             };
 
-            let prepare = entry.message;
-            let prepare_header = *prepare.header();
+            let prepare_header = entry.header;
+            // TODO(hubcio): should we replace this with a graceful fallback (warn + return)?
+            // When journal compaction is implemented, it could race with this
+            // lookup if it removes entries below the commit number.
+            let prepare = journal
+                .handle()
+                .entry(&prepare_header)
+                .await
+                .unwrap_or_else(|| {
+                    panic!(
+                        "on_ack: committed prepare op={} checksum={} must be in journal",
+                        prepare_header.op, prepare_header.checksum
+                    )
+                });
 
-            // Apply the state (consumes prepare)
-            // TODO: Handle appending result to response
             let _result = self.mux_stm.update(prepare);
             debug!("on_ack: state applied for op={}", prepare_header.op);
@@ -241,13 +249,14 @@ where
     }
 }
 
-impl<B, J, S, M> IggyMetadata<VsrConsensus<B>, J, S, M>
+impl<B, P, J, S, M> IggyMetadata<VsrConsensus<B, P>, J, S, M>
 where
     B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
+    P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>,
     J: JournalHandle,
     J::Target: Journal<
         J::Storage,
-        Entry = <VsrConsensus<B> as Consensus>::ReplicateMessage,
+        Entry = <VsrConsensus<B, P> as Consensus>::ReplicateMessage,
         Header = PrepareHeader,
     >,
     M: StateMachine<Input = Message<PrepareHeader>>,
@@ -257,9 +266,13 @@ where
         debug!("inserting prepare into metadata pipeline");
         consensus.verify_pipeline();
 
+        // Pipeline-first ordering is safe only because message
+        // processing is cooperative (single-task, RefCell-based).
+        // If on_replicate ever early-returns (syncing, status change),
+        // the entry would be in the pipeline without journal backing.
         consensus.pipeline_message(prepare.clone());
-        self.on_replicate(prepare.clone()).await;
+        consensus.post_replicate_verify(&prepare);
     }
diff --git a/core/simulator/src/deps.rs b/core/simulator/src/deps.rs
index 3dbdac54f..2b0c1bb31 100644
--- a/core/simulator/src/deps.rs
+++ b/core/simulator/src/deps.rs
@@ -87,6 +87,9 @@ impl<S: Storage<Buffer = Vec<u8>>> Journal<S> for SimJournal<S> {
     type Header = PrepareHeader;
     type Entry = Message<PrepareHeader>;
 
+    // TODO(hubcio): validate that the caller's checksum matches the stored
+    // header; currently this looks up by op only and ignores the checksum.
+    // A real journal implementation must reject mismatches.
     async fn entry(&self, header: &Self::Header) -> Option<Self::Entry> {
         let headers = unsafe { &*self.headers.get() };
         let offsets = unsafe { &*self.offsets.get() };
diff --git a/core/simulator/src/replica.rs b/core/simulator/src/replica.rs
index 74c26b72f..36a19efb9 100644
--- a/core/simulator/src/replica.rs
+++ b/core/simulator/src/replica.rs
@@ -19,7 +19,7 @@ use crate::bus::{MemBus, SharedMemBus};
 use crate::deps::{
     MemStorage, ReplicaPartitions, SimJournal, SimMetadata, SimMuxStateMachine, SimSnapshot,
 };
-use consensus::VsrConsensus;
+use consensus::{LocalPipeline, VsrConsensus};
 use metadata::stm::consumer_group::{ConsumerGroups, ConsumerGroupsInner};
 use metadata::stm::stream::{Streams, StreamsInner};
 use metadata::stm::user::{Users, UsersInner};
@@ -48,6 +48,7 @@ impl Replica {
             id,
             replica_count,
             SharedMemBus(Arc::clone(&bus)),
+            LocalPipeline::new(),
         );
         consensus.init();
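
The Pipeline trait definition itself is not part of this diff; only its
bounds are visible (P: Pipeline<Message = Message<PrepareHeader>, Entry =
PipelineEntry>). As a minimal sketch, assuming the trait simply lifts the
LocalPipeline methods that VsrConsensus and on_ack call above, it would look
roughly like the following; the committed definition in core/consensus may
differ:

    // Sketch inferred from the bounds and call sites in this diff, not
    // copied from core/consensus. The associated types keep VsrConsensus
    // generic over both the wire message and the queued entry type.
    pub trait Pipeline {
        type Message;
        type Entry;

        /// Queue a prepare at the tail; the primary calls this via
        /// pipeline_message before replication.
        fn push_message(&mut self, message: Self::Message);

        /// Look up an in-flight entry when verifying a prepare_ok.
        fn message_by_op_and_checksum(&self, op: u64, checksum: u128)
            -> Option<&Self::Entry>;

        /// Remove the entry once its op commits; on_ack then fetches the
        /// full message from the journal using the slimmed entry.header.
        fn extract_by_op(&mut self, op: u64) -> Option<Self::Entry>;
    }

With that shape, swapping in another implementation (IGGY-66) only changes
the value passed to VsrConsensus::new, as replica.rs now does with
LocalPipeline::new().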
