This is an automated email from the ASF dual-hosted git repository.
hgruszecki pushed a commit to branch drain-bus
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/drain-bus by this push:
new 04d8255e7 feat(message_bus): add drain method to collect buffered outbound messages
04d8255e7 is described below
commit 04d8255e71318b5e86fc9c252472774d9c775ea5
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Mon Mar 9 18:53:14 2026 +0100
feat(message_bus): add drain method to collect buffered outbound messages
MessageBus only had send_to_client/send_to_replica but no
batched retrieval for actual delivery. Add drain(&self, buf)
following the drain_loopback_into pattern. IggyMessageBus
buffers in a RefCell<VecDeque> outbox; MemBus drains from
its pending_messages queue.
---
core/consensus/src/plane_helpers.rs | 4 ++
core/message_bus/src/lib.rs | 50 ++++++++++++++-
core/simulator/src/bus.rs | 125 +++++++++++++++++++++++++++++++++++-
3 files changed, 175 insertions(+), 4 deletions(-)
diff --git a/core/consensus/src/plane_helpers.rs b/core/consensus/src/plane_helpers.rs
index 55d21534a..3e0018371 100644
--- a/core/consensus/src/plane_helpers.rs
+++ b/core/consensus/src/plane_helpers.rs
@@ -399,6 +399,8 @@ mod tests {
) -> Result<(), IggyError> {
Ok(())
}
+
+ fn drain(&self, _buf: &mut Vec<Self::Data>) {}
}
fn prepare_message(op: u64, parent: u128, checksum: u128) -> Message<PrepareHeader> {
@@ -607,6 +609,8 @@ mod tests {
self.sent.borrow_mut().push((replica, data));
Ok(())
}
+
+ fn drain(&self, _buf: &mut Vec<Self::Data>) {}
}
#[test]
diff --git a/core/message_bus/src/lib.rs b/core/message_bus/src/lib.rs
index 876061bc4..83356d6e6 100644
--- a/core/message_bus/src/lib.rs
+++ b/core/message_bus/src/lib.rs
@@ -20,7 +20,9 @@ use crate::cache::connection::{
ConnectionCache, Coordinator, LeastLoadedStrategy, ShardedConnections,
};
use iggy_common::{IggyError, SenderKind, TcpSender, header::GenericHeader, message::Message};
-use std::{collections::HashMap, rc::Rc};
+use std::cell::RefCell;
+use std::collections::{HashMap, VecDeque};
+use std::rc::Rc;
/// Message bus parameterized by allocation strategy and sharded state
pub trait MessageBus {
@@ -46,6 +48,12 @@ pub trait MessageBus {
replica: Self::Replica,
data: Self::Data,
) -> impl Future<Output = Result<(), IggyError>>;
+
+ /// Drain all buffered outbound messages into `buf`, leaving the internal queue empty.
+ ///
+ /// Messages are enqueued by [`MessageBus::send_to_client`] / [`MessageBus::send_to_replica`].
+ /// The caller is responsible for dispatching them to their targets.
+ fn drain(&self, buf: &mut Vec<Self::Data>);
}
// TODO: explore generics for Strategy
@@ -54,6 +62,7 @@ pub struct IggyMessageBus {
clients: HashMap<u128, SenderKind>,
replicas: ShardedConnections<LeastLoadedStrategy, ConnectionCache>,
shard_id: u16,
+ outbox: RefCell<VecDeque<Message<GenericHeader>>>,
}
impl IggyMessageBus {
@@ -69,6 +78,7 @@ impl IggyMessageBus {
},
},
shard_id,
+ outbox: RefCell::new(VecDeque::new()),
}
}
@@ -112,25 +122,59 @@ impl MessageBus for IggyMessageBus {
async fn send_to_client(
&self,
client_id: Self::Client,
- _message: Self::Data,
+ message: Self::Data,
) -> Result<(), IggyError> {
#[allow(clippy::cast_possible_truncation)] // IggyError::ClientNotFound takes u32
let _sender = self
.clients
.get(&client_id)
.ok_or(IggyError::ClientNotFound(client_id as u32))?;
+ self.outbox.borrow_mut().push_back(message);
Ok(())
}
async fn send_to_replica(
&self,
replica: Self::Replica,
- _message: Self::Data,
+ message: Self::Data,
) -> Result<(), IggyError> {
// TODO: Handle lazily creating the connection.
let _connection = self
.get_replica_connection(replica)
.ok_or(IggyError::ResourceNotFound(format!("Replica {replica}")))?;
+ self.outbox.borrow_mut().push_back(message);
Ok(())
}
+
+ fn drain(&self, buf: &mut Vec<Self::Data>) {
+ buf.extend(self.outbox.borrow_mut().drain(..));
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn drain_empty_yields_nothing() {
+ let bus = IggyMessageBus::new(1, 0, 42);
+ let mut buf = Vec::new();
+ bus.drain(&mut buf);
+ assert!(buf.is_empty());
+ }
+
+ #[test]
+ fn double_drain_second_empty() {
+ let bus = IggyMessageBus::new(1, 0, 42);
+ let msg = Message::<GenericHeader>::new(std::mem::size_of::<GenericHeader>());
+ bus.outbox.borrow_mut().push_back(msg);
+
+ let mut buf = Vec::new();
+ bus.drain(&mut buf);
+ assert_eq!(buf.len(), 1);
+
+ buf.clear();
+ bus.drain(&mut buf);
+ assert!(buf.is_empty());
+ }
}
diff --git a/core/simulator/src/bus.rs b/core/simulator/src/bus.rs
index 371258aef..a0ae7cda5 100644
--- a/core/simulator/src/bus.rs
+++ b/core/simulator/src/bus.rs
@@ -51,13 +51,21 @@ impl MemBus {
}
}
- /// Get the next pending message from the bus
+ /// Get the next pending message from the bus.
///
/// # Panics
/// Panics if the internal mutex is poisoned.
pub fn receive(&self) -> Option<Envelope> {
self.pending_messages.lock().unwrap().pop_front()
}
+
+ /// Drain all pending envelopes into `buf`, preserving routing metadata.
+ ///
+ /// # Panics
+ /// Panics if the internal mutex is poisoned.
+ pub fn drain_envelopes(&self, buf: &mut Vec<Envelope>) {
+ buf.extend(self.pending_messages.lock().unwrap().drain(..));
+ }
}
impl MessageBus for MemBus {
@@ -128,6 +136,16 @@ impl MessageBus for MemBus {
Ok(())
}
+
+ fn drain(&self, buf: &mut Vec<Self::Data>) {
+ buf.extend(
+ self.pending_messages
+ .lock()
+ .unwrap()
+ .drain(..)
+ .map(|envelope| envelope.message),
+ );
+ }
}
/// Newtype wrapper for shared [`MemBus`] that implements [`MessageBus`]
@@ -178,4 +196,109 @@ impl MessageBus for SharedMemBus {
) -> Result<(), IggyError> {
self.0.send_to_replica(replica, message).await
}
+
+ fn drain(&self, buf: &mut Vec<Self::Data>) {
+ self.0.drain(buf);
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use iggy_common::header::GenericHeader;
+
+ fn make_message() -> Message<GenericHeader> {
+ Message::<GenericHeader>::new(std::mem::size_of::<GenericHeader>())
+ }
+
+ #[test]
+ fn drain_empty_yields_nothing() {
+ let bus = MemBus::new();
+ let mut buf = Vec::new();
+ bus.drain(&mut buf);
+ assert!(buf.is_empty());
+ }
+
+ #[test]
+ fn send_to_replica_then_drain() {
+ let mut bus = MemBus::new();
+ bus.add_replica(0);
+
+ futures::executor::block_on(bus.send_to_replica(0, make_message())).unwrap();
+ futures::executor::block_on(bus.send_to_replica(0, make_message())).unwrap();
+
+ let mut buf = Vec::new();
+ bus.drain(&mut buf);
+ assert_eq!(buf.len(), 2);
+ }
+
+ #[test]
+ fn send_to_client_then_drain() {
+ let mut bus = MemBus::new();
+ bus.add_client(42, ());
+
+ futures::executor::block_on(bus.send_to_client(42, make_message())).unwrap();
+
+ let mut buf = Vec::new();
+ bus.drain(&mut buf);
+ assert_eq!(buf.len(), 1);
+ }
+
+ #[test]
+ fn double_drain_second_empty() {
+ let mut bus = MemBus::new();
+ bus.add_replica(0);
+
+ futures::executor::block_on(bus.send_to_replica(0, make_message())).unwrap();
+
+ let mut buf = Vec::new();
+ bus.drain(&mut buf);
+ assert_eq!(buf.len(), 1);
+
+ buf.clear();
+ bus.drain(&mut buf);
+ assert!(buf.is_empty());
+ }
+
+ #[test]
+ fn send_to_unknown_replica_errors_and_drain_empty() {
+ let bus = MemBus::new();
+ let result = futures::executor::block_on(bus.send_to_replica(99, make_message()));
+ assert!(result.is_err());
+
+ let mut buf = Vec::new();
+ bus.drain(&mut buf);
+ assert!(buf.is_empty());
+ }
+
+ #[test]
+ fn drain_envelopes_preserves_routing() {
+ let mut bus = MemBus::new();
+ bus.add_replica(1);
+ bus.add_client(42, ());
+
+ futures::executor::block_on(bus.send_to_replica(1, make_message())).unwrap();
+ futures::executor::block_on(bus.send_to_client(42, make_message())).unwrap();
+
+ let mut envelopes = Vec::new();
+ bus.drain_envelopes(&mut envelopes);
+ assert_eq!(envelopes.len(), 2);
+ assert_eq!(envelopes[0].to_replica, Some(1));
+ assert_eq!(envelopes[0].to_client, None);
+ assert_eq!(envelopes[1].to_client, Some(42));
+ assert_eq!(envelopes[1].to_replica, None);
+ }
+
+ #[test]
+ fn shared_mem_bus_drain_delegates() {
+ let bus = Arc::new(MemBus::new());
+ let mut shared = SharedMemBus(bus);
+ shared.add_replica(0);
+
+ futures::executor::block_on(shared.send_to_replica(0, make_message())).unwrap();
+
+ let mut buf = Vec::new();
+ shared.drain(&mut buf);
+ assert_eq!(buf.len(), 1);
+ }
}