This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new be23a351a feat(consensus): add loopback queue for primary self-addressed messages (#2825)
be23a351a is described below

commit be23a351abbb77b323278370803f4d1bde75cf21
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Tue Mar 3 10:56:23 2026 +0100

    feat(consensus): add loopback queue for primary self-addressed messages (#2825)
---
 Cargo.lock                          |   1 +
 core/consensus/Cargo.toml           |   3 +
 core/consensus/src/impls.rs         |  76 ++++++++---
 core/consensus/src/plane_helpers.rs | 244 +++++++++++++++++++++++++++++++++---
 core/shard/src/lib.rs               |  51 ++++++++
 core/simulator/src/lib.rs           |   8 ++
 6 files changed, 350 insertions(+), 33 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index e020afd0f..bbd78bdf8 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2165,6 +2165,7 @@ name = "consensus"
 version = "0.1.0"
 dependencies = [
  "bit-set",
+ "futures",
  "iggy_common",
  "message_bus",
  "rand 0.10.0",
diff --git a/core/consensus/Cargo.toml b/core/consensus/Cargo.toml
index 8c2dd9923..7b433bfb2 100644
--- a/core/consensus/Cargo.toml
+++ b/core/consensus/Cargo.toml
@@ -33,3 +33,6 @@ iggy_common = { workspace = true }
 message_bus = { workspace = true }
 rand = { workspace = true }
 rand_xoshiro = { workspace = true }
+
+[dev-dependencies]
+futures = { workspace = true }
diff --git a/core/consensus/src/impls.rs b/core/consensus/src/impls.rs
index 500a522c7..e1e992d8a 100644
--- a/core/consensus/src/impls.rs
+++ b/core/consensus/src/impls.rs
@@ -22,8 +22,8 @@ use crate::{
 };
 use bit_set::BitSet;
 use iggy_common::header::{
-    Command2, ConsensusHeader, DoViewChangeHeader, PrepareHeader, PrepareOkHeader, RequestHeader,
-    StartViewChangeHeader, StartViewHeader,
+    Command2, ConsensusHeader, DoViewChangeHeader, GenericHeader, PrepareHeader, PrepareOkHeader,
+    RequestHeader, StartViewChangeHeader, StartViewHeader,
 };
 use iggy_common::message::Message;
 use message_bus::IggyMessageBus;
@@ -435,7 +435,7 @@ where
     pipeline: RefCell<P>,
 
     message_bus: B,
-    // TODO: Add loopback_queue for messages to self
+    loopback_queue: RefCell<VecDeque<Message<GenericHeader>>>,
     /// Tracks start view change messages received from all replicas (including self)
     start_view_change_from_all_replicas: RefCell<BitSet<u32>>,
 
@@ -484,6 +484,7 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
             last_prepare_checksum: Cell::new(0),
             pipeline: RefCell::new(pipeline),
             message_bus,
+            loopback_queue: RefCell::new(VecDeque::with_capacity(PIPELINE_PREPARE_QUEUE_MAX)),
             start_view_change_from_all_replicas: RefCell::new(BitSet::with_capacity(REPLICAS_MAX)),
             do_view_change_from_all_replicas: RefCell::new(dvc_quorum_array_empty()),
             do_view_change_quorum: Cell::new(false),
@@ -621,12 +622,18 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
         self.do_view_change_quorum.set(false);
     }
 
-    /// Reset all view change state for a new view.
-    fn reset_view_change_state(&self) {
+    /// Reset all view change state when transitioning to a new view.
+    ///
+    /// Clears the loopback queue: stale PrepareOks from the old view
+    /// reference pipeline entries that no longer exist, so processing
+    /// them would be a no-op (handle_prepare_ok ignores unknown ops).
+    /// The primary does not require its own self-ack for quorum.
+    pub(crate) fn reset_view_change_state(&self) {
         self.reset_svc_quorum();
         self.reset_dvc_quorum();
         self.sent_own_start_view_change.set(false);
         self.sent_own_do_view_change.set(false);
+        self.loopback_queue.borrow_mut().clear();
     }
 
     /// Process one tick. Call this periodically (e.g., every 10ms).
@@ -1061,6 +1068,11 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
         // Stale pipeline entries from the old view are invalid in the new view.
         // Log reconciliation replays from the journal, not the pipeline.
         self.pipeline.borrow_mut().clear();
+        // Stale PrepareOk messages from the old view must not leak into the new view.
+        // `reset_view_change_state` handles this for view-number advances (SVC/DVC/SV),
+        // but this path fires within the current view after DVC quorum -- so we clear
+        // the loopback queue directly.
+        self.loopback_queue.borrow_mut().clear();
 
         // Update timeouts for normal primary operation
         {
@@ -1079,12 +1091,10 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
         }]
     }
 
-    /// Handle a prepare_ok message from a follower.
-    /// Called on the primary when a follower acknowledges a prepare.
+    /// Handle a PrepareOk message from a replica.
     ///
-    /// Returns true if quorum was just reached for this op.
-    /// Handle a PrepareOk message. Returns true if quorum was reached.
-    /// Note: Caller (on_ack) should validate is_primary and status before calling.
+    /// Returns `true` if quorum was just reached for this op.
+    /// Caller (`on_ack`) should validate `is_primary` and status before calling.
     pub fn handle_prepare_ok(&self, header: &PrepareOkHeader) -> bool {
         assert_eq!(header.command, Command2::PrepareOk);
         assert!(
@@ -1139,6 +1149,42 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
         false
     }
 
+    /// Enqueue a self-addressed message for processing in the next loopback drain.
+    ///
+    /// Currently only PrepareOk messages are routed here (via `send_or_loopback`).
+    // TODO: Route SVC/DVC self-messages through loopback once VsrAction dispatch is implemented.
+    pub(crate) fn push_loopback(&self, message: Message<GenericHeader>) {
+        assert!(
+            self.loopback_queue.borrow().len() < PIPELINE_PREPARE_QUEUE_MAX,
+            "loopback queue overflow: {} items",
+            self.loopback_queue.borrow().len()
+        );
+        self.loopback_queue.borrow_mut().push_back(message);
+    }
+
+    /// Drain all pending loopback messages into `buf`, leaving the queue empty.
+    ///
+    /// The caller must dispatch each drained message to the appropriate handler.
+    pub fn drain_loopback_into(&self, buf: &mut Vec<Message<GenericHeader>>) {
+        buf.extend(self.loopback_queue.borrow_mut().drain(..));
+    }
+
+    /// Send a message to `target`, routing self-addressed messages through the loopback queue.
+    pub(crate) async fn send_or_loopback(&self, target: u8, message: Message<GenericHeader>)
+    where
+        B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
+    {
+        if target == self.replica {
+            self.push_loopback(message);
+        } else {
+            // TODO: Propagate send errors instead of panicking; requires bus error design.
+            self.message_bus
+                .send_to_replica(target, message)
+                .await
+                .unwrap();
+        }
+    }
+
     pub fn message_bus(&self) -> &B {
         &self.message_bus
     }
@@ -1222,12 +1268,10 @@ where
     type Sequencer = LocalSequencer;
     type Pipeline = P;
 
-    // TODO(hubcio): maybe we could record the primary's own ack here
-    // (entry.add_ack(self.replica)) instead of round-tripping through
-    // the message bus via send_prepare_ok.
-    // This avoids serialization/queuing overhead and would also allow
-    // reordering to WAL-first (on_replicate before pipeline_message)
-    // without risking lost self-acks from dispatch timing.
+    // The primary's self-ack is delivered via the loopback queue
+    // (push_loopback / drain_loopback_into) rather than inline here,
+    // so that WAL persistence can happen between pipeline insertion
+    // and ack recording.
     fn pipeline_message(&self, message: Self::Message<Self::ReplicateHeader>) {
         assert!(self.is_primary(), "only primary can pipeline messages");
 
diff --git a/core/consensus/src/plane_helpers.rs b/core/consensus/src/plane_helpers.rs
index cc4ec3e25..8f8ea1275 100644
--- a/core/consensus/src/plane_helpers.rs
+++ b/core/consensus/src/plane_helpers.rs
@@ -296,25 +296,11 @@ pub async fn send_prepare_ok<B, P>(
     let message: Message<PrepareOkHeader> =
         Message::<PrepareOkHeader>::new(std::mem::size_of::<PrepareOkHeader>())
             .transmute_header(|_, new| *new = prepare_ok_header);
-    let generic_message = message.into_generic();
     let primary = consensus.primary_index(consensus.view());
 
-    // TODO: Propagate send errors instead of panicking; requires bus error design.
-    if primary == consensus.replica() {
-        // TODO: Queue for self-processing or call handle_prepare_ok directly.
-        // TODO: This is temporal, to test simulator, but we should send message to ourselves properly.
-        consensus
-            .message_bus()
-            .send_to_replica(primary, generic_message)
-            .await
-            .unwrap();
-    } else {
-        consensus
-            .message_bus()
-            .send_to_replica(primary, generic_message)
-            .await
-            .unwrap();
-    }
+    consensus
+        .send_or_loopback(primary, message.into_generic())
+        .await;
 }
 
 #[cfg(test)]
@@ -379,6 +365,230 @@ mod tests {
         )
     }
 
+    #[test]
+    fn loopback_push_and_drain() {
+        let consensus = VsrConsensus::new(1, 0, 3, 0, NoopBus, LocalPipeline::new());
+        consensus.init();
+
+        let mut buf = Vec::new();
+        consensus.drain_loopback_into(&mut buf);
+        assert!(buf.is_empty());
+
+        let msg = Message::<PrepareOkHeader>::new(std::mem::size_of::<PrepareOkHeader>());
+        consensus.push_loopback(msg.into_generic());
+        consensus.drain_loopback_into(&mut buf);
+        assert_eq!(buf.len(), 1);
+        buf.clear();
+        consensus.drain_loopback_into(&mut buf);
+        assert!(buf.is_empty());
+    }
+
+    #[test]
+    fn loopback_cleared_on_view_change_reset() {
+        let consensus = VsrConsensus::new(1, 0, 3, 0, NoopBus, LocalPipeline::new());
+        consensus.init();
+
+        let msg = Message::<PrepareOkHeader>::new(std::mem::size_of::<PrepareOkHeader>());
+        consensus.push_loopback(msg.into_generic());
+        consensus.reset_view_change_state();
+        let mut buf = Vec::new();
+        consensus.drain_loopback_into(&mut buf);
+        assert!(buf.is_empty());
+    }
+
+    #[test]
+    fn send_prepare_ok_pushes_to_loopback_when_primary() {
+        let consensus = VsrConsensus::new(1, 0, 3, 0, NoopBus, LocalPipeline::new());
+        consensus.init();
+
+        let prepare_header = PrepareHeader {
+            command: Command2::Prepare,
+            cluster: 1,
+            view: 0,
+            op: 0,
+            checksum: 42,
+            ..Default::default()
+        };
+
+        futures::executor::block_on(send_prepare_ok(&consensus, &prepare_header, Some(true)));
+
+        let mut buf = Vec::new();
+        consensus.drain_loopback_into(&mut buf);
+        assert_eq!(buf.len(), 1);
+        assert_eq!(buf[0].header().command, Command2::PrepareOk);
+
+        let typed: Message<PrepareOkHeader> = buf
+            .remove(0)
+            .try_into_typed()
+            .expect("loopback message must be PrepareOk");
+        assert_eq!(typed.header().command, Command2::PrepareOk);
+    }
+
+    #[test]
+    fn loopback_cleared_on_complete_view_change_as_primary() {
+        use iggy_common::header::{DoViewChangeHeader, StartViewChangeHeader};
+
+        // 3 replicas, replica 0 is primary for view 0 (and view 3: 3 % 3 = 0).
+        let consensus = VsrConsensus::new(1, 0, 3, 0, NoopBus, LocalPipeline::new());
+        consensus.init();
+
+        // SVC from replica 1 for view 3.
+        // Replica 0 advances to view 3 (reset_view_change_state clears loopback),
+        // records own SVC + DVC, and records replica 1's SVC. DVC quorum needs 2, have 1.
+        let svc = StartViewChangeHeader {
+            checksum: 0,
+            checksum_body: 0,
+            cluster: 0,
+            size: 0,
+            view: 3,
+            release: 0,
+            command: Command2::StartViewChange,
+            replica: 1,
+            reserved_frame: [0; 66],
+            namespace: 0,
+            reserved: [0; 120],
+        };
+        let _ = consensus.handle_start_view_change(&svc);
+
+        // Simulate an in-flight loopback message queued between SVC and DVC quorum.
+        let stale_msg = Message::<PrepareOkHeader>::new(std::mem::size_of::<PrepareOkHeader>());
+        consensus.push_loopback(stale_msg.into_generic());
+
+        // DVC from replica 2 for view 3 -- quorum reached, complete_view_change_as_primary fires.
+        let dvc = DoViewChangeHeader {
+            checksum: 0,
+            checksum_body: 0,
+            cluster: 0,
+            size: 0,
+            view: 3,
+            release: 0,
+            command: Command2::DoViewChange,
+            replica: 2,
+            reserved_frame: [0; 66],
+            op: 0,
+            commit: 0,
+            namespace: 0,
+            log_view: 0,
+            reserved: [0; 100],
+        };
+        let actions = consensus.handle_do_view_change(&dvc);
+
+        // View change completed: should have SendStartView action.
+        assert!(
+            actions
+                .iter()
+                .any(|a| matches!(a, crate::VsrAction::SendStartView { .. })),
+            "expected SendStartView action after DVC quorum"
+        );
+
+        // The stale loopback message must have been cleared.
+        let mut buf = Vec::new();
+        consensus.drain_loopback_into(&mut buf);
+        assert!(
+            buf.is_empty(),
+            "loopback queue must be empty after view change completion"
+        );
+    }
+
+    #[test]
+    fn send_prepare_ok_sends_to_bus_when_not_primary() {
+        let consensus = VsrConsensus::new(1, 1, 3, 0, NoopBus, LocalPipeline::new());
+        consensus.init();
+
+        let prepare_header = PrepareHeader {
+            command: Command2::Prepare,
+            cluster: 1,
+            view: 0,
+            op: 0,
+            checksum: 42,
+            ..Default::default()
+        };
+
+        futures::executor::block_on(send_prepare_ok(&consensus, &prepare_header, Some(true)));
+
+        let mut buf = Vec::new();
+        consensus.drain_loopback_into(&mut buf);
+        assert!(buf.is_empty());
+    }
+
+    struct SpyBus {
+        sent: std::cell::RefCell<Vec<(u8, Message<GenericHeader>)>>,
+    }
+
+    impl SpyBus {
+        fn new() -> Self {
+            Self {
+                sent: std::cell::RefCell::new(Vec::new()),
+            }
+        }
+    }
+
+    impl MessageBus for SpyBus {
+        type Client = u128;
+        type Replica = u8;
+        type Data = Message<GenericHeader>;
+        type Sender = ();
+
+        fn add_client(&mut self, _client: Self::Client, _sender: Self::Sender) -> bool {
+            true
+        }
+        fn remove_client(&mut self, _client: Self::Client) -> bool {
+            true
+        }
+        fn add_replica(&mut self, _replica: Self::Replica) -> bool {
+            true
+        }
+        fn remove_replica(&mut self, _replica: Self::Replica) -> bool {
+            true
+        }
+        async fn send_to_client(
+            &self,
+            _client_id: Self::Client,
+            _data: Self::Data,
+        ) -> Result<(), IggyError> {
+            Ok(())
+        }
+        async fn send_to_replica(
+            &self,
+            replica: Self::Replica,
+            data: Self::Data,
+        ) -> Result<(), IggyError> {
+            self.sent.borrow_mut().push((replica, data));
+            Ok(())
+        }
+    }
+
+    #[test]
+    fn send_or_loopback_routes_self_to_queue() {
+        let consensus = VsrConsensus::new(1, 0, 3, 0, SpyBus::new(), LocalPipeline::new());
+        consensus.init();
+
+        let msg = Message::<PrepareOkHeader>::new(std::mem::size_of::<PrepareOkHeader>());
+        futures::executor::block_on(consensus.send_or_loopback(0, msg.into_generic()));
+
+        let mut buf = Vec::new();
+        consensus.drain_loopback_into(&mut buf);
+        assert_eq!(buf.len(), 1);
+        assert!(consensus.message_bus().sent.borrow().is_empty());
+    }
+
+    #[test]
+    fn send_or_loopback_routes_other_to_bus() {
+        let consensus = VsrConsensus::new(1, 0, 3, 0, SpyBus::new(), LocalPipeline::new());
+        consensus.init();
+
+        let msg = Message::<PrepareOkHeader>::new(std::mem::size_of::<PrepareOkHeader>());
+        futures::executor::block_on(consensus.send_or_loopback(1, msg.into_generic()));
+
+        let mut buf = Vec::new();
+        consensus.drain_loopback_into(&mut buf);
+        assert!(buf.is_empty());
+
+        let sent = consensus.message_bus().sent.borrow();
+        assert_eq!(sent.len(), 1);
+        assert_eq!(sent[0].0, 1);
+    }
+
     #[test]
     fn drains_head_prefix_by_commit_frontier() {
         let consensus = VsrConsensus::new(1, 0, 3, 0, NoopBus, LocalPipeline::new());
diff --git a/core/shard/src/lib.rs b/core/shard/src/lib.rs
index 0b41b1d84..b51286d3f 100644
--- a/core/shard/src/lib.rs
+++ b/core/shard/src/lib.rs
@@ -137,6 +137,57 @@ where
         }
     }
 
+    /// Drain and dispatch loopback messages for each consensus plane.
+    ///
+    /// Each plane's loopback is dispatched directly to that plane's `on_ack`,
+    /// avoiding a flat merge that would require re-routing through `on_message`.
+    ///
+    /// Invariant: planes do not produce loopback messages for each other.
+    /// `on_ack` commits and applies but never calls `push_loopback`, so
+    /// draining metadata before partitions is order-independent.
+    pub async fn process_loopback(&self, buf: &mut Vec<Message<GenericHeader>>) -> usize
+    where
+        B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
+        J: JournalHandle,
+        <J as JournalHandle>::Target: Journal<
+                <J as JournalHandle>::Storage,
+                Entry = Message<PrepareHeader>,
+                Header = PrepareHeader,
+            >,
+        M: StateMachine<Input = Message<PrepareHeader>>,
+    {
+        debug_assert!(buf.is_empty(), "buf must be empty on entry");
+
+        let mut total = 0;
+        let planes = self.plane.inner();
+
+        if let Some(ref consensus) = planes.0.consensus {
+            consensus.drain_loopback_into(buf);
+            let count = buf.len();
+            total += count;
+            for msg in buf.drain(..) {
+                let typed: Message<PrepareOkHeader> = msg
+                    .try_into_typed()
+                    .expect("loopback queue must only contain PrepareOk messages");
+                planes.0.on_ack(typed).await;
+            }
+        }
+
+        if let Some(consensus) = planes.1.0.consensus() {
+            consensus.drain_loopback_into(buf);
+            let count = buf.len();
+            total += count;
+            for msg in buf.drain(..) {
+                let typed: Message<PrepareOkHeader> = msg
+                    .try_into_typed()
+                    .expect("loopback queue must only contain PrepareOk messages");
+                planes.1.0.on_ack(typed).await;
+            }
+        }
+
+        total
+    }
+
     pub fn init_partition(&mut self, namespace: IggyNamespace)
     where
         B: MessageBus<
diff --git a/core/simulator/src/lib.rs b/core/simulator/src/lib.rs
index de27135f3..260b76488 100644
--- a/core/simulator/src/lib.rs
+++ b/core/simulator/src/lib.rs
@@ -122,6 +122,14 @@ impl Simulator {
         message: Message<iggy_common::header::GenericHeader>,
     ) {
         replica.on_message(message).await;
+
+        let mut buf = Vec::new();
+        replica.process_loopback(&mut buf).await;
+        debug_assert_eq!(
+            replica.process_loopback(&mut buf).await,
+            0,
+            "on_ack must not re-enqueue loopback messages"
+        );
     }
 }
 

Reply via email to