This is an automated email from the ASF dual-hosted git repository.
hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new be23a351a feat(consensus): add loopback queue for primary self-addressed messages (#2825)
be23a351a is described below
commit be23a351abbb77b323278370803f4d1bde75cf21
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Tue Mar 3 10:56:23 2026 +0100
feat(consensus): add loopback queue for primary self-addressed messages (#2825)
---
Cargo.lock | 1 +
core/consensus/Cargo.toml | 3 +
core/consensus/src/impls.rs | 76 ++++++++---
core/consensus/src/plane_helpers.rs | 244 +++++++++++++++++++++++++++++++++---
core/shard/src/lib.rs | 51 ++++++++
core/simulator/src/lib.rs | 8 ++
6 files changed, 350 insertions(+), 33 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index e020afd0f..bbd78bdf8 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2165,6 +2165,7 @@ name = "consensus"
version = "0.1.0"
dependencies = [
"bit-set",
+ "futures",
"iggy_common",
"message_bus",
"rand 0.10.0",
diff --git a/core/consensus/Cargo.toml b/core/consensus/Cargo.toml
index 8c2dd9923..7b433bfb2 100644
--- a/core/consensus/Cargo.toml
+++ b/core/consensus/Cargo.toml
@@ -33,3 +33,6 @@ iggy_common = { workspace = true }
message_bus = { workspace = true }
rand = { workspace = true }
rand_xoshiro = { workspace = true }
+
+[dev-dependencies]
+futures = { workspace = true }
diff --git a/core/consensus/src/impls.rs b/core/consensus/src/impls.rs
index 500a522c7..e1e992d8a 100644
--- a/core/consensus/src/impls.rs
+++ b/core/consensus/src/impls.rs
@@ -22,8 +22,8 @@ use crate::{
};
use bit_set::BitSet;
use iggy_common::header::{
- Command2, ConsensusHeader, DoViewChangeHeader, PrepareHeader,
PrepareOkHeader, RequestHeader,
- StartViewChangeHeader, StartViewHeader,
+ Command2, ConsensusHeader, DoViewChangeHeader, GenericHeader,
PrepareHeader, PrepareOkHeader,
+ RequestHeader, StartViewChangeHeader, StartViewHeader,
};
use iggy_common::message::Message;
use message_bus::IggyMessageBus;
@@ -435,7 +435,7 @@ where
pipeline: RefCell<P>,
message_bus: B,
- // TODO: Add loopback_queue for messages to self
+ loopback_queue: RefCell<VecDeque<Message<GenericHeader>>>,
/// Tracks start view change messages received from all replicas (including self)
start_view_change_from_all_replicas: RefCell<BitSet<u32>>,
@@ -484,6 +484,7 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
last_prepare_checksum: Cell::new(0),
pipeline: RefCell::new(pipeline),
message_bus,
+ loopback_queue:
RefCell::new(VecDeque::with_capacity(PIPELINE_PREPARE_QUEUE_MAX)),
start_view_change_from_all_replicas:
RefCell::new(BitSet::with_capacity(REPLICAS_MAX)),
do_view_change_from_all_replicas:
RefCell::new(dvc_quorum_array_empty()),
do_view_change_quorum: Cell::new(false),
@@ -621,12 +622,18 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
self.do_view_change_quorum.set(false);
}
- /// Reset all view change state for a new view.
- fn reset_view_change_state(&self) {
+ /// Reset all view change state when transitioning to a new view.
+ ///
+ /// Clears the loopback queue: stale PrepareOks from the old view
+ /// reference pipeline entries that no longer exist, so processing
+ /// them would be a no-op (handle_prepare_ok ignores unknown ops).
+ /// The primary does not require its own self-ack for quorum.
+ pub(crate) fn reset_view_change_state(&self) {
self.reset_svc_quorum();
self.reset_dvc_quorum();
self.sent_own_start_view_change.set(false);
self.sent_own_do_view_change.set(false);
+ self.loopback_queue.borrow_mut().clear();
}
/// Process one tick. Call this periodically (e.g., every 10ms).
@@ -1061,6 +1068,11 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
// Stale pipeline entries from the old view are invalid in the new view.
// Log reconciliation replays from the journal, not the pipeline.
self.pipeline.borrow_mut().clear();
+ // Stale PrepareOk messages from the old view must not leak into the new view.
+ // `reset_view_change_state` handles this for view-number advances (SVC/DVC/SV),
+ // but this path fires within the current view after DVC quorum -- so we clear
+ // the loopback queue directly.
+ self.loopback_queue.borrow_mut().clear();
// Update timeouts for normal primary operation
{
@@ -1079,12 +1091,10 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
}]
}
- /// Handle a prepare_ok message from a follower.
- /// Called on the primary when a follower acknowledges a prepare.
+ /// Handle a PrepareOk message from a replica.
///
- /// Returns true if quorum was just reached for this op.
- /// Handle a PrepareOk message. Returns true if quorum was reached.
- /// Note: Caller (on_ack) should validate is_primary and status before calling.
+ /// Returns `true` if quorum was just reached for this op.
+ /// Caller (`on_ack`) should validate `is_primary` and status before calling.
pub fn handle_prepare_ok(&self, header: &PrepareOkHeader) -> bool {
assert_eq!(header.command, Command2::PrepareOk);
assert!(
@@ -1139,6 +1149,42 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
false
}
+ /// Enqueue a self-addressed message for processing in the next loopback drain.
+ ///
+ /// Currently only PrepareOk messages are routed here (via `send_or_loopback`).
+ // TODO: Route SVC/DVC self-messages through loopback once VsrAction dispatch is implemented.
+ pub(crate) fn push_loopback(&self, message: Message<GenericHeader>) {
+ assert!(
+ self.loopback_queue.borrow().len() < PIPELINE_PREPARE_QUEUE_MAX,
+ "loopback queue overflow: {} items",
+ self.loopback_queue.borrow().len()
+ );
+ self.loopback_queue.borrow_mut().push_back(message);
+ }
+
+ /// Drain all pending loopback messages into `buf`, leaving the queue empty.
+ ///
+ /// The caller must dispatch each drained message to the appropriate handler.
+ pub fn drain_loopback_into(&self, buf: &mut Vec<Message<GenericHeader>>) {
+ buf.extend(self.loopback_queue.borrow_mut().drain(..));
+ }
+
+ /// Send a message to `target`, routing self-addressed messages through the loopback queue.
+ pub(crate) async fn send_or_loopback(&self, target: u8, message:
Message<GenericHeader>)
+ where
+ B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client =
u128>,
+ {
+ if target == self.replica {
+ self.push_loopback(message);
+ } else {
+ // TODO: Propagate send errors instead of panicking; requires bus error design.
+ self.message_bus
+ .send_to_replica(target, message)
+ .await
+ .unwrap();
+ }
+ }
+
pub fn message_bus(&self) -> &B {
&self.message_bus
}
@@ -1222,12 +1268,10 @@ where
type Sequencer = LocalSequencer;
type Pipeline = P;
- // TODO(hubcio): maybe we could record the primary's own ack here
- // (entry.add_ack(self.replica)) instead of round-tripping through
- // the message bus via send_prepare_ok.
- // This avoids serialization/queuing overhead and would also allow
- // reordering to WAL-first (on_replicate before pipeline_message)
- // without risking lost self-acks from dispatch timing.
+ // The primary's self-ack is delivered via the loopback queue
+ // (push_loopback / drain_loopback_into) rather than inline here,
+ // so that WAL persistence can happen between pipeline insertion
+ // and ack recording.
fn pipeline_message(&self, message: Self::Message<Self::ReplicateHeader>) {
assert!(self.is_primary(), "only primary can pipeline messages");
diff --git a/core/consensus/src/plane_helpers.rs b/core/consensus/src/plane_helpers.rs
index cc4ec3e25..8f8ea1275 100644
--- a/core/consensus/src/plane_helpers.rs
+++ b/core/consensus/src/plane_helpers.rs
@@ -296,25 +296,11 @@ pub async fn send_prepare_ok<B, P>(
let message: Message<PrepareOkHeader> =
Message::<PrepareOkHeader>::new(std::mem::size_of::<PrepareOkHeader>())
.transmute_header(|_, new| *new = prepare_ok_header);
- let generic_message = message.into_generic();
let primary = consensus.primary_index(consensus.view());
- // TODO: Propagate send errors instead of panicking; requires bus error design.
- if primary == consensus.replica() {
- // TODO: Queue for self-processing or call handle_prepare_ok directly.
- // TODO: This is temporal, to test simulator, but we should send message to ourselves properly.
- consensus
- .message_bus()
- .send_to_replica(primary, generic_message)
- .await
- .unwrap();
- } else {
- consensus
- .message_bus()
- .send_to_replica(primary, generic_message)
- .await
- .unwrap();
- }
+ consensus
+ .send_or_loopback(primary, message.into_generic())
+ .await;
}
#[cfg(test)]
@@ -379,6 +365,230 @@ mod tests {
)
}
+ #[test]
+ fn loopback_push_and_drain() {
+ let consensus = VsrConsensus::new(1, 0, 3, 0, NoopBus,
LocalPipeline::new());
+ consensus.init();
+
+ let mut buf = Vec::new();
+ consensus.drain_loopback_into(&mut buf);
+ assert!(buf.is_empty());
+
+ let msg =
Message::<PrepareOkHeader>::new(std::mem::size_of::<PrepareOkHeader>());
+ consensus.push_loopback(msg.into_generic());
+ consensus.drain_loopback_into(&mut buf);
+ assert_eq!(buf.len(), 1);
+ buf.clear();
+ consensus.drain_loopback_into(&mut buf);
+ assert!(buf.is_empty());
+ }
+
+ #[test]
+ fn loopback_cleared_on_view_change_reset() {
+ let consensus = VsrConsensus::new(1, 0, 3, 0, NoopBus,
LocalPipeline::new());
+ consensus.init();
+
+ let msg =
Message::<PrepareOkHeader>::new(std::mem::size_of::<PrepareOkHeader>());
+ consensus.push_loopback(msg.into_generic());
+ consensus.reset_view_change_state();
+ let mut buf = Vec::new();
+ consensus.drain_loopback_into(&mut buf);
+ assert!(buf.is_empty());
+ }
+
+ #[test]
+ fn send_prepare_ok_pushes_to_loopback_when_primary() {
+ let consensus = VsrConsensus::new(1, 0, 3, 0, NoopBus,
LocalPipeline::new());
+ consensus.init();
+
+ let prepare_header = PrepareHeader {
+ command: Command2::Prepare,
+ cluster: 1,
+ view: 0,
+ op: 0,
+ checksum: 42,
+ ..Default::default()
+ };
+
+ futures::executor::block_on(send_prepare_ok(&consensus,
&prepare_header, Some(true)));
+
+ let mut buf = Vec::new();
+ consensus.drain_loopback_into(&mut buf);
+ assert_eq!(buf.len(), 1);
+ assert_eq!(buf[0].header().command, Command2::PrepareOk);
+
+ let typed: Message<PrepareOkHeader> = buf
+ .remove(0)
+ .try_into_typed()
+ .expect("loopback message must be PrepareOk");
+ assert_eq!(typed.header().command, Command2::PrepareOk);
+ }
+
+ #[test]
+ fn loopback_cleared_on_complete_view_change_as_primary() {
+ use iggy_common::header::{DoViewChangeHeader, StartViewChangeHeader};
+
+ // 3 replicas, replica 0 is primary for view 0 (and view 3: 3 % 3 = 0).
+ let consensus = VsrConsensus::new(1, 0, 3, 0, NoopBus,
LocalPipeline::new());
+ consensus.init();
+
+ // SVC from replica 1 for view 3.
+ // Replica 0 advances to view 3 (reset_view_change_state clears loopback),
+ // records own SVC + DVC, and records replica 1's SVC. DVC quorum needs 2, have 1.
+ let svc = StartViewChangeHeader {
+ checksum: 0,
+ checksum_body: 0,
+ cluster: 0,
+ size: 0,
+ view: 3,
+ release: 0,
+ command: Command2::StartViewChange,
+ replica: 1,
+ reserved_frame: [0; 66],
+ namespace: 0,
+ reserved: [0; 120],
+ };
+ let _ = consensus.handle_start_view_change(&svc);
+
+ // Simulate an in-flight loopback message queued between SVC and DVC quorum.
+ let stale_msg =
Message::<PrepareOkHeader>::new(std::mem::size_of::<PrepareOkHeader>());
+ consensus.push_loopback(stale_msg.into_generic());
+
+ // DVC from replica 2 for view 3 -- quorum reached, complete_view_change_as_primary fires.
+ let dvc = DoViewChangeHeader {
+ checksum: 0,
+ checksum_body: 0,
+ cluster: 0,
+ size: 0,
+ view: 3,
+ release: 0,
+ command: Command2::DoViewChange,
+ replica: 2,
+ reserved_frame: [0; 66],
+ op: 0,
+ commit: 0,
+ namespace: 0,
+ log_view: 0,
+ reserved: [0; 100],
+ };
+ let actions = consensus.handle_do_view_change(&dvc);
+
+ // View change completed: should have SendStartView action.
+ assert!(
+ actions
+ .iter()
+ .any(|a| matches!(a, crate::VsrAction::SendStartView { .. })),
+ "expected SendStartView action after DVC quorum"
+ );
+
+ // The stale loopback message must have been cleared.
+ let mut buf = Vec::new();
+ consensus.drain_loopback_into(&mut buf);
+ assert!(
+ buf.is_empty(),
+ "loopback queue must be empty after view change completion"
+ );
+ }
+
+ #[test]
+ fn send_prepare_ok_sends_to_bus_when_not_primary() {
+ let consensus = VsrConsensus::new(1, 1, 3, 0, NoopBus,
LocalPipeline::new());
+ consensus.init();
+
+ let prepare_header = PrepareHeader {
+ command: Command2::Prepare,
+ cluster: 1,
+ view: 0,
+ op: 0,
+ checksum: 42,
+ ..Default::default()
+ };
+
+ futures::executor::block_on(send_prepare_ok(&consensus,
&prepare_header, Some(true)));
+
+ let mut buf = Vec::new();
+ consensus.drain_loopback_into(&mut buf);
+ assert!(buf.is_empty());
+ }
+
+ struct SpyBus {
+ sent: std::cell::RefCell<Vec<(u8, Message<GenericHeader>)>>,
+ }
+
+ impl SpyBus {
+ fn new() -> Self {
+ Self {
+ sent: std::cell::RefCell::new(Vec::new()),
+ }
+ }
+ }
+
+ impl MessageBus for SpyBus {
+ type Client = u128;
+ type Replica = u8;
+ type Data = Message<GenericHeader>;
+ type Sender = ();
+
+ fn add_client(&mut self, _client: Self::Client, _sender: Self::Sender)
-> bool {
+ true
+ }
+ fn remove_client(&mut self, _client: Self::Client) -> bool {
+ true
+ }
+ fn add_replica(&mut self, _replica: Self::Replica) -> bool {
+ true
+ }
+ fn remove_replica(&mut self, _replica: Self::Replica) -> bool {
+ true
+ }
+ async fn send_to_client(
+ &self,
+ _client_id: Self::Client,
+ _data: Self::Data,
+ ) -> Result<(), IggyError> {
+ Ok(())
+ }
+ async fn send_to_replica(
+ &self,
+ replica: Self::Replica,
+ data: Self::Data,
+ ) -> Result<(), IggyError> {
+ self.sent.borrow_mut().push((replica, data));
+ Ok(())
+ }
+ }
+
+ #[test]
+ fn send_or_loopback_routes_self_to_queue() {
+ let consensus = VsrConsensus::new(1, 0, 3, 0, SpyBus::new(),
LocalPipeline::new());
+ consensus.init();
+
+ let msg =
Message::<PrepareOkHeader>::new(std::mem::size_of::<PrepareOkHeader>());
+ futures::executor::block_on(consensus.send_or_loopback(0,
msg.into_generic()));
+
+ let mut buf = Vec::new();
+ consensus.drain_loopback_into(&mut buf);
+ assert_eq!(buf.len(), 1);
+ assert!(consensus.message_bus().sent.borrow().is_empty());
+ }
+
+ #[test]
+ fn send_or_loopback_routes_other_to_bus() {
+ let consensus = VsrConsensus::new(1, 0, 3, 0, SpyBus::new(),
LocalPipeline::new());
+ consensus.init();
+
+ let msg =
Message::<PrepareOkHeader>::new(std::mem::size_of::<PrepareOkHeader>());
+ futures::executor::block_on(consensus.send_or_loopback(1,
msg.into_generic()));
+
+ let mut buf = Vec::new();
+ consensus.drain_loopback_into(&mut buf);
+ assert!(buf.is_empty());
+
+ let sent = consensus.message_bus().sent.borrow();
+ assert_eq!(sent.len(), 1);
+ assert_eq!(sent[0].0, 1);
+ }
+
#[test]
fn drains_head_prefix_by_commit_frontier() {
let consensus = VsrConsensus::new(1, 0, 3, 0, NoopBus,
LocalPipeline::new());
diff --git a/core/shard/src/lib.rs b/core/shard/src/lib.rs
index 0b41b1d84..b51286d3f 100644
--- a/core/shard/src/lib.rs
+++ b/core/shard/src/lib.rs
@@ -137,6 +137,57 @@ where
}
}
+ /// Drain and dispatch loopback messages for each consensus plane.
+ ///
+ /// Each plane's loopback is dispatched directly to that plane's `on_ack`,
+ /// avoiding a flat merge that would require re-routing through `on_message`.
+ ///
+ /// Invariant: planes do not produce loopback messages for each other.
+ /// `on_ack` commits and applies but never calls `push_loopback`, so
+ /// draining metadata before partitions is order-independent.
+ pub async fn process_loopback(&self, buf: &mut
Vec<Message<GenericHeader>>) -> usize
+ where
+ B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client =
u128>,
+ J: JournalHandle,
+ <J as JournalHandle>::Target: Journal<
+ <J as JournalHandle>::Storage,
+ Entry = Message<PrepareHeader>,
+ Header = PrepareHeader,
+ >,
+ M: StateMachine<Input = Message<PrepareHeader>>,
+ {
+ debug_assert!(buf.is_empty(), "buf must be empty on entry");
+
+ let mut total = 0;
+ let planes = self.plane.inner();
+
+ if let Some(ref consensus) = planes.0.consensus {
+ consensus.drain_loopback_into(buf);
+ let count = buf.len();
+ total += count;
+ for msg in buf.drain(..) {
+ let typed: Message<PrepareOkHeader> = msg
+ .try_into_typed()
+ .expect("loopback queue must only contain PrepareOk
messages");
+ planes.0.on_ack(typed).await;
+ }
+ }
+
+ if let Some(consensus) = planes.1.0.consensus() {
+ consensus.drain_loopback_into(buf);
+ let count = buf.len();
+ total += count;
+ for msg in buf.drain(..) {
+ let typed: Message<PrepareOkHeader> = msg
+ .try_into_typed()
+ .expect("loopback queue must only contain PrepareOk
messages");
+ planes.1.0.on_ack(typed).await;
+ }
+ }
+
+ total
+ }
+
pub fn init_partition(&mut self, namespace: IggyNamespace)
where
B: MessageBus<
diff --git a/core/simulator/src/lib.rs b/core/simulator/src/lib.rs
index de27135f3..260b76488 100644
--- a/core/simulator/src/lib.rs
+++ b/core/simulator/src/lib.rs
@@ -122,6 +122,14 @@ impl Simulator {
message: Message<iggy_common::header::GenericHeader>,
) {
replica.on_message(message).await;
+
+ let mut buf = Vec::new();
+ replica.process_loopback(&mut buf).await;
+ debug_assert_eq!(
+ replica.process_loopback(&mut buf).await,
+ 0,
+ "on_ack must not re-enqueue loopback messages"
+ );
}
}