This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch iggy-67-62-pipeline
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 8528945235e15346a2efe84c88f958ce337597f0
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Wed Feb 11 13:46:07 2026 +0100

    refactor(consensus): decouple pipeline from VsrConsensus internals
    
    VsrConsensus hardcoded LocalPipeline, and PipelineEntry stored
    the full Message<PrepareHeader> (header + body) even though
    only the header is needed for quorum tracking. This coupled
    the consensus pipeline to a specific implementation and wasted
    memory on message bodies that already live in the journal.
    
    Parameterize VsrConsensus<B, P = LocalPipeline> over the
    Pipeline trait so implementations can be swapped (IGGY-66).
    Slim PipelineEntry to store only PrepareHeader; at commit
    time, on_ack fetches the full message from the journal.
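
    For context, a minimal sketch of the trait shape the new
    P: Pipeline bound implies, reconstructed from the bounds
    visible in the diff below (the actual trait definition lives
    elsewhere in core/consensus and may carry more methods):

        // Assumed shape, inferred from the bound
        // P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>.
        pub trait Pipeline {
            type Message;
            type Entry;

            /// Append a replicated message at the tail (primary side).
            fn push_message(&mut self, message: Self::Message);

            /// Remove and return the entry for `op` once it commits.
            fn extract_by_op(&mut self, op: u64) -> Option<Self::Entry>;
        }

    Any implementation satisfying these associated types can be
    swapped in; LocalPipeline remains the default type parameter.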
---
 core/consensus/src/impls.rs         | 87 ++++++++++++++++++++++---------------
 core/metadata/src/impls/metadata.rs | 59 +++++++++++++++----------
 core/simulator/src/replica.rs       |  3 +-
 3 files changed, 89 insertions(+), 60 deletions(-)

diff --git a/core/consensus/src/impls.rs b/core/consensus/src/impls.rs
index 36b595163..989f7f334 100644
--- a/core/consensus/src/impls.rs
+++ b/core/consensus/src/impls.rs
@@ -86,7 +86,7 @@ pub const REPLICAS_MAX: usize = 32;
 
 #[derive(Debug)]
 pub struct PipelineEntry {
-    pub message: Message<PrepareHeader>,
+    pub header: PrepareHeader,
     /// Bitmap of replicas that have acknowledged this prepare.
     pub ok_from_replicas: BitSet<u32>,
     /// Whether we've received a quorum of prepare_ok messages.
@@ -94,9 +94,9 @@ pub struct PipelineEntry {
 }
 
 impl PipelineEntry {
-    pub fn new(message: Message<PrepareHeader>) -> Self {
+    pub fn new(header: PrepareHeader) -> Self {
         Self {
-            message,
+            header,
             ok_from_replicas: BitSet::with_capacity(REPLICAS_MAX),
             ok_quorum_received: false,
         }
@@ -180,11 +180,10 @@ impl LocalPipeline {
     pub fn push_message(&mut self, message: Message<PrepareHeader>) {
         assert!(!self.prepare_queue_full(), "prepare queue is full");
 
-        let header = message.header();
+        let header = *message.header();
 
-        // Verify hash chain if there's a previous entry
         if let Some(tail) = self.prepare_queue.back() {
-            let tail_header = tail.message.header();
+            let tail_header = &tail.header;
             assert_eq!(
                 header.op,
                 tail_header.op + 1,
@@ -199,7 +198,7 @@ impl LocalPipeline {
             assert!(header.view >= tail_header.view, "view cannot go backwards");
         }
 
-        self.prepare_queue.push_back(PipelineEntry::new(message));
+        self.prepare_queue.push_back(PipelineEntry::new(header));
     }
 
     /// Pop the oldest message (after it's been committed).
@@ -224,10 +223,9 @@ impl LocalPipeline {
 
     /// Find a message by op number and checksum (immutable).
     pub fn message_by_op_and_checksum(&self, op: u64, checksum: u128) -> Option<&PipelineEntry> {
-        let head_op = self.prepare_queue.front()?.message.header().op;
-        let tail_op = self.prepare_queue.back()?.message.header().op;
+        let head_op = self.prepare_queue.front()?.header.op;
+        let tail_op = self.prepare_queue.back()?.header.op;
 
-        // Verify consecutive ops invariant
         debug_assert_eq!(
             tail_op,
             head_op + self.prepare_queue.len() as u64 - 1,
@@ -241,9 +239,9 @@ impl LocalPipeline {
         let index = (op - head_op) as usize;
         let entry = self.prepare_queue.get(index)?;
 
-        debug_assert_eq!(entry.message.header().op, op);
+        debug_assert_eq!(entry.header.op, op);
 
-        if entry.message.header().checksum == checksum {
+        if entry.header.checksum == checksum {
             Some(entry)
         } else {
             None
@@ -252,7 +250,7 @@ impl LocalPipeline {
 
     /// Find a message by op number only.
     pub fn message_by_op(&self, op: u64) -> Option<&PipelineEntry> {
-        let head_op = self.prepare_queue.front()?.message.header().op;
+        let head_op = self.prepare_queue.front()?.header.op;
 
         if op < head_op {
             return None;
@@ -265,7 +263,7 @@ impl LocalPipeline {
     /// Get mutable reference to a message entry by op number.
     /// Returns None if op is not in the pipeline.
     pub fn message_by_op_mut(&mut self, op: u64) -> Option<&mut PipelineEntry> {
-        let head_op = self.prepare_queue.front()?.message.header().op;
+        let head_op = self.prepare_queue.front()?.header.op;
         if op < head_op {
             return None;
         }
@@ -286,9 +284,7 @@ impl LocalPipeline {
     /// If there are multiple messages (possible in prepare_queue after view change),
     /// returns the latest one.
     pub fn has_message_from_client(&self, client: u128) -> bool {
-        self.prepare_queue
-            .iter()
-            .any(|p| p.message.header().client == client)
+        self.prepare_queue.iter().any(|p| p.header.client == client)
     }
 
     /// Verify pipeline invariants.
@@ -301,11 +297,11 @@ impl LocalPipeline {
 
         // Verify prepare queue hash chain
         if let Some(head) = self.prepare_queue.front() {
-            let mut expected_op = head.message.header().op;
-            let mut expected_parent = head.message.header().parent;
+            let mut expected_op = head.header.op;
+            let mut expected_parent = head.header.parent;
 
             for entry in &self.prepare_queue {
-                let header = entry.message.header();
+                let header = &entry.header;
 
                 assert_eq!(header.op, expected_op, "ops must be sequential");
                 assert_eq!(header.parent, expected_parent, "must be hash-chained");
@@ -324,7 +320,7 @@ impl LocalPipeline {
     /// Extract and remove a message by op number.
     /// Returns None if op is not in the pipeline.
     pub fn extract_by_op(&mut self, op: u64) -> Option<PipelineEntry> {
-        let head_op = self.prepare_queue.front()?.message.header().op;
+        let head_op = self.prepare_queue.front()?.header.op;
         if op < head_op {
             return None;
         }
@@ -409,9 +405,10 @@ pub enum VsrAction {
 
 #[allow(unused)]
 #[derive(Debug)]
-pub struct VsrConsensus<B = IggyMessageBus>
+pub struct VsrConsensus<B = IggyMessageBus, P = LocalPipeline>
 where
     B: MessageBus,
+    P: Pipeline,
 {
     cluster: u128,
     replica: u8,
@@ -436,7 +433,7 @@ where
     last_timestamp: Cell<u64>,
     last_prepare_checksum: Cell<u128>,
 
-    pipeline: RefCell<LocalPipeline>,
+    pipeline: RefCell<P>,
 
     message_bus: B,
     // TODO: Add loopback_queue for messages to self
@@ -456,8 +453,8 @@ where
     timeouts: RefCell<TimeoutManager>,
 }
 
-impl<B: MessageBus> VsrConsensus<B> {
-    pub fn new(cluster: u128, replica: u8, replica_count: u8, message_bus: B) -> Self {
+impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
+    pub fn new(cluster: u128, replica: u8, replica_count: u8, message_bus: B, pipeline: P) -> Self {
         assert!(
             replica < replica_count,
             "replica index must be < replica_count"
@@ -474,7 +471,7 @@ impl<B: MessageBus> VsrConsensus<B> {
             commit: Cell::new(0),
             last_timestamp: Cell::new(0),
             last_prepare_checksum: Cell::new(0),
-            pipeline: RefCell::new(LocalPipeline::new()),
+            pipeline: RefCell::new(pipeline),
             message_bus,
             start_view_change_from_all_replicas: RefCell::new(BitSet::with_capacity(REPLICAS_MAX)),
             do_view_change_from_all_replicas: RefCell::new(dvc_quorum_array_empty()),
@@ -546,11 +543,11 @@ impl<B: MessageBus> VsrConsensus<B> {
         self.status.get()
     }
 
-    pub fn pipeline(&self) -> &RefCell<LocalPipeline> {
+    pub fn pipeline(&self) -> &RefCell<P> {
         &self.pipeline
     }
 
-    pub fn pipeline_mut(&mut self) -> &mut RefCell<LocalPipeline> {
+    pub fn pipeline_mut(&mut self) -> &mut RefCell<P> {
         &mut self.pipeline
     }
 
@@ -1060,7 +1057,7 @@ impl<B: MessageBus> VsrConsensus<B> {
         };
 
         // Verify checksum matches
-        if entry.message.header().checksum != header.prepare_checksum {
+        if entry.header.checksum != header.prepare_checksum {
             return false;
         }
 
@@ -1087,8 +1084,12 @@ impl<B: MessageBus> VsrConsensus<B> {
     }
 }
 
-impl<B: MessageBus> Project<Message<PrepareHeader>, VsrConsensus<B>> for Message<RequestHeader> {
-    fn project(self, consensus: &VsrConsensus<B>) -> Message<PrepareHeader> {
+impl<B, P> Project<Message<PrepareHeader>, VsrConsensus<B, P>> for Message<RequestHeader>
+where
+    B: MessageBus,
+    P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>,
+{
+    fn project(self, consensus: &VsrConsensus<B, P>) -> Message<PrepareHeader> {
         let op = consensus.sequencer.current_sequence() + 1;
 
         self.transmute_header(|old, new| {
@@ -1114,8 +1115,12 @@ impl<B: MessageBus> Project<Message<PrepareHeader>, VsrConsensus<B>> for Message
     }
 }
 
-impl<B: MessageBus> Project<Message<PrepareOkHeader>, VsrConsensus<B>> for Message<PrepareHeader> {
-    fn project(self, consensus: &VsrConsensus<B>) -> Message<PrepareOkHeader> {
+impl<B, P> Project<Message<PrepareOkHeader>, VsrConsensus<B, P>> for Message<PrepareHeader>
+where
+    B: MessageBus,
+    P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>,
+{
+    fn project(self, consensus: &VsrConsensus<B, P>) -> Message<PrepareOkHeader> {
         self.transmute_header(|old, new| {
             *new = PrepareOkHeader {
                 command: Command2::PrepareOk,
@@ -1138,15 +1143,25 @@ impl<B: MessageBus> Project<Message<PrepareOkHeader>, VsrConsensus<B>> for Messa
     }
 }
 
-impl<B: MessageBus> Consensus for VsrConsensus<B> {
+impl<B, P> Consensus for VsrConsensus<B, P>
+where
+    B: MessageBus,
+    P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>,
+{
     type MessageBus = B;
 
     type RequestMessage = Message<RequestHeader>;
     type ReplicateMessage = Message<PrepareHeader>;
     type AckMessage = Message<PrepareOkHeader>;
     type Sequencer = LocalSequencer;
-    type Pipeline = LocalPipeline;
-
+    type Pipeline = P;
+
+    // TODO(hubcio): maybe we could record the primary's own ack here
+    // (entry.add_ack(self.replica)) instead of round-tripping through
+    // the message bus via send_prepare_ok.
+    // This would avoid serialization/queuing overhead and also allow
+    // reordering to WAL-first (on_replicate before pipeline_message)
+    // without risking lost self-acks from dispatch timing.
     fn pipeline_message(&self, message: Self::ReplicateMessage) {
         assert!(self.is_primary(), "only primary can pipeline messages");
 
diff --git a/core/metadata/src/impls/metadata.rs b/core/metadata/src/impls/metadata.rs
index c4e4b3da3..9e61fa57d 100644
--- a/core/metadata/src/impls/metadata.rs
+++ b/core/metadata/src/impls/metadata.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 use crate::stm::StateMachine;
-use consensus::{Consensus, Project, Sequencer, Status, VsrConsensus};
+use consensus::{Consensus, Pipeline, PipelineEntry, Project, Sequencer, Status, VsrConsensus};
 use iggy_common::{
     header::{Command2, GenericHeader, PrepareHeader, PrepareOkHeader, ReplyHeader},
     message::Message,
@@ -50,18 +50,19 @@ pub struct IggyMetadata<C, J, S, M> {
     pub mux_stm: M,
 }
 
-impl<B, J, S, M> Metadata<VsrConsensus<B>> for IggyMetadata<VsrConsensus<B>, J, S, M>
+impl<B, P, J, S, M> Metadata<VsrConsensus<B, P>> for IggyMetadata<VsrConsensus<B, P>, J, S, M>
 where
     B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
+    P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>,
     J: JournalHandle,
     J::Target: Journal<
             J::Storage,
-            Entry = <VsrConsensus<B> as Consensus>::ReplicateMessage,
+            Entry = <VsrConsensus<B, P> as Consensus>::ReplicateMessage,
             Header = PrepareHeader,
         >,
     M: StateMachine<Input = Message<PrepareHeader>>,
 {
-    async fn on_request(&self, message: <VsrConsensus<B> as Consensus>::RequestMessage) {
+    async fn on_request(&self, message: <VsrConsensus<B, P> as Consensus>::RequestMessage) {
         let consensus = self.consensus.as_ref().unwrap();
 
         // TODO: Bunch of asserts.
@@ -70,7 +71,7 @@ where
         self.pipeline_prepare(prepare).await;
     }
 
-    async fn on_replicate(&self, message: <VsrConsensus<B> as Consensus>::ReplicateMessage) {
+    async fn on_replicate(&self, message: <VsrConsensus<B, P> as Consensus>::ReplicateMessage) {
         let consensus = self.consensus.as_ref().unwrap();
         let journal = self.journal.as_ref().unwrap();
 
@@ -145,7 +146,7 @@ where
         }
     }
 
-    async fn on_ack(&self, message: <VsrConsensus<B> as Consensus>::AckMessage) {
+    async fn on_ack(&self, message: <VsrConsensus<B, P> as Consensus>::AckMessage) {
         let consensus = self.consensus.as_ref().unwrap();
         let header = message.header();
 
@@ -159,28 +160,25 @@ where
             return;
         }
 
-        // Verify checksum by checking pipeline entry exists
         {
             let pipeline = consensus.pipeline().borrow();
-            let Some(entry) =
-                pipeline.message_by_op_and_checksum(header.op, header.prepare_checksum)
-            else {
+            if pipeline
+                .message_by_op_and_checksum(header.op, header.prepare_checksum)
+                .is_none()
+            {
                 debug!("on_ack: prepare not in pipeline op={}", header.op);
                 return;
-            };
-
-            if entry.message.header().checksum != header.prepare_checksum {
-                warn!("on_ack: checksum mismatch");
-                return;
             }
         }
 
         // Let consensus handle the ack increment and quorum check
         if consensus.handle_prepare_ok(header) {
+            let journal = self.journal.as_ref().unwrap();
+
             debug!("on_ack: quorum received for op={}", header.op);
             consensus.advance_commit_number(header.op);
 
-            // Extract the prepare message from the pipeline by op
+            // Extract the header from the pipeline, fetch the full message from journal
             // TODO: Commit from the head. ALWAYS
             let entry = consensus.pipeline().borrow_mut().extract_by_op(header.op);
             let Some(entry) = entry else {
@@ -188,11 +186,21 @@ where
                 return;
             };
 
-            let prepare = entry.message;
-            let prepare_header = *prepare.header();
+            let prepare_header = entry.header;
+            // TODO(hubcio): should we replace this with a graceful fallback (warn + return)?
+            // Once journal compaction is implemented, compaction could race
+            // with this lookup if it removes entries below the commit number.
+            let prepare = journal
+                .handle()
+                .entry(&prepare_header)
+                .await
+                .unwrap_or_else(|| {
+                    panic!(
+                        "on_ack: committed prepare op={} checksum={} must be 
in journal",
+                        prepare_header.op, prepare_header.checksum
+                    )
+                });
 
-            // Apply the state (consumes prepare)
-            // TODO: Handle appending result to response
             let _result = self.mux_stm.update(prepare);
             debug!("on_ack: state applied for op={}", prepare_header.op);
 
@@ -241,13 +249,14 @@ where
     }
 }
 
-impl<B, J, S, M> IggyMetadata<VsrConsensus<B>, J, S, M>
+impl<B, P, J, S, M> IggyMetadata<VsrConsensus<B, P>, J, S, M>
 where
     B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
+    P: Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry>,
     J: JournalHandle,
     J::Target: Journal<
             J::Storage,
-            Entry = <VsrConsensus<B> as Consensus>::ReplicateMessage,
+            Entry = <VsrConsensus<B, P> as Consensus>::ReplicateMessage,
             Header = PrepareHeader,
         >,
     M: StateMachine<Input = Message<PrepareHeader>>,
@@ -257,9 +266,13 @@ where
 
         debug!("inserting prepare into metadata pipeline");
         consensus.verify_pipeline();
+        // Pipeline-first ordering is safe only because message
+        // processing is cooperative (single-task, RefCell-based).
+        // If on_replicate ever early-returns (syncing, status change),
+        // the entry would be left in the pipeline without journal backing.
         consensus.pipeline_message(prepare.clone());
-
         self.on_replicate(prepare.clone()).await;
+
         consensus.post_replicate_verify(&prepare);
     }
 
diff --git a/core/simulator/src/replica.rs b/core/simulator/src/replica.rs
index 74c26b72f..36a19efb9 100644
--- a/core/simulator/src/replica.rs
+++ b/core/simulator/src/replica.rs
@@ -19,7 +19,7 @@ use crate::bus::{MemBus, SharedMemBus};
 use crate::deps::{
     MemStorage, ReplicaPartitions, SimJournal, SimMetadata, SimMuxStateMachine, SimSnapshot,
 };
-use consensus::VsrConsensus;
+use consensus::{LocalPipeline, VsrConsensus};
 use metadata::stm::consumer_group::{ConsumerGroups, ConsumerGroupsInner};
 use metadata::stm::stream::{Streams, StreamsInner};
 use metadata::stm::user::{Users, UsersInner};
@@ -48,6 +48,7 @@ impl Replica {
             id,
             replica_count,
             SharedMemBus(Arc::clone(&bus)),
+            LocalPipeline::new(),
         );
         consensus.init();
 
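
With the pipeline now injected at construction, swapping implementations is
a call-site change. A hedged sketch of what that could look like here
(MetricsPipeline and the cluster_id variable name are hypothetical, shown
only to illustrate the IGGY-66 extension point; any type satisfying
Pipeline<Message = Message<PrepareHeader>, Entry = PipelineEntry> works):

    // Hypothetical alternative to LocalPipeline::new() in the hunk above.
    let consensus = VsrConsensus::new(
        cluster_id, // u128 cluster id, per the new constructor signature
        id,
        replica_count,
        SharedMemBus(Arc::clone(&bus)),
        MetricsPipeline::new(),
    );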
