This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch message_bus_final in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 3f5a55316988a73b2cb8d204f1040fb6f3b4b3e8 Author: numminex <[email protected]> AuthorDate: Fri Dec 5 11:07:20 2025 +0100 finito --- .../src/{lib.rs => cache/connection.rs} | 366 ++++++++----------- core/message_bus/src/cache/mod.rs | 13 + core/message_bus/src/lib.rs | 397 ++++----------------- 3 files changed, 242 insertions(+), 534 deletions(-) diff --git a/core/message_bus/src/lib.rs b/core/message_bus/src/cache/connection.rs similarity index 55% copy from core/message_bus/src/lib.rs copy to core/message_bus/src/cache/connection.rs index a3774c849..d07ac66ec 100644 --- a/core/message_bus/src/lib.rs +++ b/core/message_bus/src/cache/connection.rs @@ -14,193 +14,23 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. -use iggy_common::{SenderKind, TcpSender}; +use crate::cache::AllocationStrategy; +use iggy_common::TcpSender; use rand::{SeedableRng, rngs::StdRng, seq::SliceRandom}; use std::{ cell::RefCell, - collections::{HashMap, HashSet}, rc::Rc, + collections::{HashMap, HashSet}, + rc::Rc, }; const MAX_CONNECTIONS_PER_REPLICA: usize = 8; -/// A cache that can apply changesets -pub trait Cache { - type ChangeSet; - - fn apply(&mut self, changes: Self::ChangeSet); -} - -/// Allocation strategy that produces changesets for a specific cache type -/// Generic over C allows the same strategy impl to work with different caches -/// (if they share the same ChangeSet type) -pub trait AllocationStrategy<C> -where C: Cache -{ - type Resource; - - fn allocate(&self, resource: Self::Resource) -> Option<C::ChangeSet>; - fn deallocate(&self, resource: Self::Resource) -> Option<C::ChangeSet>; -} - -/// Coordinator that wraps a strategy for a specific cache type -pub struct Coordinator<S, C> -where - C: Cache, - S: AllocationStrategy<C>, -{ - strategy: S, - _cache: std::marker::PhantomData<C>, -} +// TODO: Move to some common trait location. 
+pub trait ShardedState { + type Entry; + type Delta; -impl<S, C> Coordinator<S, C> -where - C: Cache, - S: AllocationStrategy<C>, -{ - pub fn new(strategy: S) -> Self { - Self { - strategy, - _cache: std::marker::PhantomData, - } - } - - pub fn allocate(&self, resource: S::Resource) -> Option<C::ChangeSet> { - self.strategy.allocate(resource) - } - - pub fn deallocate(&self, resource: S::Resource) -> Option<C::ChangeSet> { - self.strategy.deallocate(resource) - } -} - -/// Binds a coordinator with a matching cache -pub struct ShardedAllocation<S, C> -where - C: Cache, - S: AllocationStrategy<C>, -{ - pub coordinator: Coordinator<S, C>, - pub cache: C, -} - -impl<S, C> ShardedAllocation<S, C> -where - C: Cache, - S: AllocationStrategy<C>, -{ - pub fn new(coordinator: Coordinator<S, C>, cache: C) -> Self { - Self { coordinator, cache } - } - - pub fn allocate(&mut self, resource: S::Resource) -> bool { - if let Some(changes) = self.coordinator.allocate(resource) { - self.cache.apply(changes); - true - } else { - false - } - } - - pub fn deallocate(&mut self, resource: S::Resource) -> bool { - if let Some(changes) = self.coordinator.deallocate(resource) { - self.cache.apply(changes); - true - } else { - false - } - } -} - -/// Message bus parameterized by allocation strategy and cache -pub trait MessageBus { - type Cache: Cache; - type Strategy: AllocationStrategy<Self::Cache>; - type ClientId; - - fn send_to_client(&self, client_id: Self::ClientId, data: Vec<u8>) -> Result<(), String>; -} - -// ============================================================================ -// Concrete Implementation: Connection-based allocation -// ============================================================================ - -/// Identifies a connection on a specific shard -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub struct ConnectionAssignment { - pub replica: u8, - pub shard: u16, -} - -/// Maps a source shard to the shard that owns the connection -#[derive(Debug, Clone, 
Copy, PartialEq, Eq)] -pub struct ShardAssignment { - pub replica: u8, - pub shard: u16, - pub conn_shard: u16, -} - -/// Changeset for connection-based allocation -#[derive(Debug, Clone)] -pub enum ConnectionChanges { - Allocate { - connections: Vec<ConnectionAssignment>, - mappings: Vec<ShardAssignment>, - }, - Deallocate { - connections: Vec<ConnectionAssignment>, - mappings: Vec<ConnectionAssignment>, - }, -} - -/// Cache for connection state per shard -pub struct ConnectionCache { - shard_id: u16, - connections: HashMap<u8, Option<Rc<TcpSender>>>, - connection_map: HashMap<u8, u16>, -} - -impl ConnectionCache { - pub fn new(shard_id: u16) -> Self { - Self { - shard_id, - connections: HashMap::new(), - connection_map: HashMap::new(), - } - } - - pub fn get_connection(&self, replica: u8) -> Option<Rc<TcpSender>> { - self.connections.get(&replica).and_then(|opt| opt.clone()) - } - - pub fn get_mapped_shard(&self, replica: u8) -> Option<u16> { - self.connection_map.get(&replica).copied() - } -} - -impl Cache for ConnectionCache { - type ChangeSet = ConnectionChanges; - - fn apply(&mut self, changes: Self::ChangeSet) { - let shard_id = self.shard_id; - match changes { - ConnectionChanges::Allocate { connections, mappings } => { - for conn in connections.iter().filter(|c| c.shard == shard_id) { - self.connections.insert(conn.replica, None); - } - for mapping in &mappings { - self.connection_map.insert(mapping.replica, mapping.conn_shard); - } - } - ConnectionChanges::Deallocate { connections, mappings } => { - for conn in connections.iter().filter(|c| c.shard == shard_id) { - self.connections.remove(&conn.replica); - } - for mapping in &mappings { - self.connection_map.remove(&mapping.replica); - } - } - } - } + fn apply(&mut self, delta: Self::Delta); } /// Least-loaded allocation strategy for connections @@ -255,10 +85,37 @@ impl LeastLoadedStrategy { } } -impl AllocationStrategy<ConnectionCache> for LeastLoadedStrategy { - type Resource = u8; +/// Identifies a 
connection on a specific shard +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct ConnectionAssignment { + replica: u8, + shard: u16, +} + +/// Maps a source shard to the shard that owns the connection +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct ShardAssignment { + replica: u8, + shard: u16, + conn_shard: u16, +} + +/// Changeset for connection-based allocation +#[derive(Debug, Clone)] +pub enum ConnectionChanges { + Allocate { + connections: Vec<ConnectionAssignment>, + mappings: Vec<ShardAssignment>, + }, + Deallocate { + connections: Vec<ConnectionAssignment>, + mappings: Vec<ConnectionAssignment>, + }, +} - fn allocate(&self, replica: Self::Resource) -> Option<ConnectionChanges> { +type Delta = <ConnectionCache as ShardedState>::Delta; +impl AllocationStrategy<ConnectionCache> for LeastLoadedStrategy { + fn allocate(&self, replica: u8) -> Option<Delta> { if self.replica_to_shards.borrow().contains_key(&replica) { return None; } @@ -290,12 +147,19 @@ impl AllocationStrategy<ConnectionCache> for LeastLoadedStrategy { .borrow_mut() .insert(replica, assigned_shards.clone()); - self.create_shard_mappings(&mut mappings, replica, assigned_shards.into_iter().collect()); + self.create_shard_mappings( + &mut mappings, + replica, + assigned_shards.into_iter().collect(), + ); - Some(ConnectionChanges::Allocate { connections, mappings }) + Some(Delta::Allocate { + connections, + mappings, + }) } - fn deallocate(&self, replica: Self::Resource) -> Option<ConnectionChanges> { + fn deallocate(&self, replica: u8) -> Option<Delta> { let conn_shards = self.replica_to_shards.borrow_mut().remove(&replica)?; let mut connections = Vec::new(); @@ -321,53 +185,127 @@ impl AllocationStrategy<ConnectionCache> for LeastLoadedStrategy { mappings.push(ConnectionAssignment { replica, shard }); } - Some(ConnectionChanges::Deallocate { connections, mappings }) + Some(Delta::Deallocate { + connections, + mappings, + }) } } -pub struct IggyMessageBus { - clients: 
HashMap<u128, SenderKind>, - replicas: ShardedAllocation<LeastLoadedStrategy, ConnectionCache>, - shard_id: u16, +/// Coordinator that wraps a strategy for a specific sharded state type +pub struct Coordinator<A, SS> +where + SS: ShardedState, + A: AllocationStrategy<SS>, +{ + strategy: A, + _ss: std::marker::PhantomData<SS>, } -impl IggyMessageBus { - pub fn new(total_shards: usize, shard_id: u16, seed: u64) -> Self { +impl<A, SS> Coordinator<A, SS> +where + SS: ShardedState, + A: AllocationStrategy<SS>, +{ + pub fn new(strategy: A) -> Self { Self { - clients: HashMap::new(), - replicas: ShardedAllocation::new( - Coordinator::new(LeastLoadedStrategy::new(total_shards, seed)), - ConnectionCache::new(shard_id), - ), - shard_id, + strategy, + _ss: std::marker::PhantomData, } } - pub fn add_replica(&mut self, replica: u8) -> bool { - self.replicas.allocate(replica) + pub fn allocate(&self, entry: SS::Entry) -> Option<SS::Delta> { + self.strategy.allocate(entry) } - pub fn remove_replica(&mut self, replica: u8) -> bool { - self.replicas.deallocate(replica) + pub fn deallocate(&self, entry: SS::Entry) -> Option<SS::Delta> { + self.strategy.deallocate(entry) + } +} + +pub struct ShardedConnections<A, SS> +where + SS: ShardedState, + A: AllocationStrategy<SS>, +{ + pub coordinator: Coordinator<A, SS>, + pub state: SS, +} + +impl<A, SS> ShardedConnections<A, SS> +where + SS: ShardedState, + A: AllocationStrategy<SS>, +{ + pub fn allocate(&mut self, entry: SS::Entry) -> bool { + if let Some(delta) = self.coordinator.allocate(entry) { + // TODO: broadcast to other shards. 
+ self.state.apply(delta); + true + } else { + false + } } - pub fn get_replica_connection(&self, replica: u8) -> Option<Rc<TcpSender>> { - let mapped_shard = self.replicas.cache.get_mapped_shard(replica)?; - if mapped_shard == self.shard_id { - self.replicas.cache.get_connection(replica) + pub fn deallocate(&mut self, entry: SS::Entry) -> bool { + if let Some(delta) = self.coordinator.deallocate(entry) { + // TODO: broadcast to other shards. + self.state.apply(delta); + true } else { - None + false } } } -impl MessageBus for IggyMessageBus { - type Cache = ConnectionCache; - type Strategy = LeastLoadedStrategy; - type ClientId = u128; +/// Cache for connection state per shard +#[derive(Default)] +pub struct ConnectionCache { + pub shard_id: u16, + pub connections: HashMap<u8, Option<Rc<TcpSender>>>, + pub connection_map: HashMap<u8, u16>, +} + +impl ConnectionCache { + pub fn get_connection(&self, replica: u8) -> Option<Rc<TcpSender>> { + self.connections.get(&replica).and_then(|opt| opt.clone()) + } + + pub fn get_mapped_shard(&self, replica: u8) -> Option<u16> { + self.connection_map.get(&replica).copied() + } +} + +impl ShardedState for ConnectionCache { + type Entry = u8; // replica id + type Delta = ConnectionChanges; - fn send_to_client(&self, _client_id: Self::ClientId, _data: Vec<u8>) -> Result<(), String> { - // TODO: Implementation - Ok(()) + fn apply(&mut self, delta: Self::Delta) { + let shard_id = self.shard_id; + match delta { + ConnectionChanges::Allocate { + connections, + mappings, + } => { + for conn in connections.iter().filter(|c| c.shard == shard_id) { + self.connections.insert(conn.replica, None); + } + for mapping in &mappings { + self.connection_map + .insert(mapping.replica, mapping.conn_shard); + } + } + ConnectionChanges::Deallocate { + connections, + mappings, + } => { + for conn in connections.iter().filter(|c| c.shard == shard_id) { + self.connections.remove(&conn.replica); + } + for mapping in &mappings { + 
self.connection_map.remove(&mapping.replica); + } + } + } } } diff --git a/core/message_bus/src/cache/mod.rs b/core/message_bus/src/cache/mod.rs new file mode 100644 index 000000000..eab86256b --- /dev/null +++ b/core/message_bus/src/cache/mod.rs @@ -0,0 +1,13 @@ +use crate::cache::connection::ShardedState; + +// TODO: Move to some common trait location. +/// Allocation strategy that produces deltas for a specific sharded state type +pub trait AllocationStrategy<SS> +where + SS: ShardedState, +{ + fn allocate(&self, entry: SS::Entry) -> Option<SS::Delta>; + fn deallocate(&self, entry: SS::Entry) -> Option<SS::Delta>; +} + +pub(crate) mod connection; diff --git a/core/message_bus/src/lib.rs b/core/message_bus/src/lib.rs index a3774c849..c578a1fb0 100644 --- a/core/message_bus/src/lib.rs +++ b/core/message_bus/src/lib.rs @@ -14,360 +14,117 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. -use iggy_common::{SenderKind, TcpSender}; -use rand::{SeedableRng, rngs::StdRng, seq::SliceRandom}; -use std::{ - cell::RefCell, - collections::{HashMap, HashSet}, rc::Rc, -}; - -const MAX_CONNECTIONS_PER_REPLICA: usize = 8; - -/// A cache that can apply changesets -pub trait Cache { - type ChangeSet; - - fn apply(&mut self, changes: Self::ChangeSet); -} - -/// Allocation strategy that produces changesets for a specific cache type -/// Generic over C allows the same strategy impl to work with different caches -/// (if they share the same ChangeSet type) -pub trait AllocationStrategy<C> -where C: Cache -{ - type Resource; - - fn allocate(&self, resource: Self::Resource) -> Option<C::ChangeSet>; - fn deallocate(&self, resource: Self::Resource) -> Option<C::ChangeSet>; -} +mod cache; -/// Coordinator that wraps a strategy for a specific cache type -pub struct Coordinator<S, C> -where - C: Cache, - S: AllocationStrategy<C>, -{ - strategy: S, - _cache: std::marker::PhantomData<C>, -} - -impl<S, 
C> Coordinator<S, C> -where - C: Cache, - S: AllocationStrategy<C>, -{ - pub fn new(strategy: S) -> Self { - Self { - strategy, - _cache: std::marker::PhantomData, - } - } - - pub fn allocate(&self, resource: S::Resource) -> Option<C::ChangeSet> { - self.strategy.allocate(resource) - } - - pub fn deallocate(&self, resource: S::Resource) -> Option<C::ChangeSet> { - self.strategy.deallocate(resource) - } -} - -/// Binds a coordinator with a matching cache -pub struct ShardedAllocation<S, C> -where - C: Cache, - S: AllocationStrategy<C>, -{ - pub coordinator: Coordinator<S, C>, - pub cache: C, -} - -impl<S, C> ShardedAllocation<S, C> -where - C: Cache, - S: AllocationStrategy<C>, -{ - pub fn new(coordinator: Coordinator<S, C>, cache: C) -> Self { - Self { coordinator, cache } - } - - pub fn allocate(&mut self, resource: S::Resource) -> bool { - if let Some(changes) = self.coordinator.allocate(resource) { - self.cache.apply(changes); - true - } else { - false - } - } - - pub fn deallocate(&mut self, resource: S::Resource) -> bool { - if let Some(changes) = self.coordinator.deallocate(resource) { - self.cache.apply(changes); - true - } else { - false - } - } -} +use crate::cache::connection::{ + ConnectionCache, Coordinator, LeastLoadedStrategy, ShardedConnections, +}; +use iggy_common::{IggyError, SenderKind, TcpSender, header::GenericHeader, message::Message}; +use std::{collections::HashMap, rc::Rc}; -/// Message bus parameterized by allocation strategy and cache +/// Message bus parameterized by allocation strategy and sharded state pub trait MessageBus { - type Cache: Cache; - type Strategy: AllocationStrategy<Self::Cache>; - type ClientId; - - fn send_to_client(&self, client_id: Self::ClientId, data: Vec<u8>) -> Result<(), String>; -} - -// ============================================================================ -// Concrete Implementation: Connection-based allocation -// ============================================================================ + type 
Client; + type Replica; + type Data; -/// Identifies a connection on a specific shard -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub struct ConnectionAssignment { - pub replica: u8, - pub shard: u16, -} + fn add_client(&mut self, client: Self::Client, sender: SenderKind) -> bool; + fn remove_client(&mut self, client: Self::Client) -> bool; -/// Maps a source shard to the shard that owns the connection -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub struct ShardAssignment { - pub replica: u8, - pub shard: u16, - pub conn_shard: u16, -} + fn add_replica(&mut self, replica: Self::Replica) -> bool; + fn remove_replica(&mut self, replica: Self::Replica) -> bool; -/// Changeset for connection-based allocation -#[derive(Debug, Clone)] -pub enum ConnectionChanges { - Allocate { - connections: Vec<ConnectionAssignment>, - mappings: Vec<ShardAssignment>, - }, - Deallocate { - connections: Vec<ConnectionAssignment>, - mappings: Vec<ConnectionAssignment>, - }, + // TODO: refactor consensus headers.
+ fn send_to_client( + &self, + client_id: Self::Client, + data: Self::Data, + ) -> impl Future<Output = Result<(), IggyError>>; + fn send_to_replica( + &self, + replica: Self::Replica, + data: Self::Data, + ) -> impl Future<Output = Result<(), IggyError>>; } -/// Cache for connection state per shard -pub struct ConnectionCache { +// TODO: explore generics for Strategy +pub struct IggyMessageBus { + clients: HashMap<u128, SenderKind>, + replicas: ShardedConnections<LeastLoadedStrategy, ConnectionCache>, shard_id: u16, - connections: HashMap<u8, Option<Rc<TcpSender>>>, - connection_map: HashMap<u8, u16>, } -impl ConnectionCache { - pub fn new(shard_id: u16) -> Self { +impl IggyMessageBus { + pub fn new(total_shards: usize, shard_id: u16, seed: u64) -> Self { Self { + clients: HashMap::new(), + replicas: ShardedConnections { + coordinator: Coordinator::new(LeastLoadedStrategy::new(total_shards, seed)), + state: ConnectionCache { + shard_id, + ..Default::default() + }, + }, shard_id, - connections: HashMap::new(), - connection_map: HashMap::new(), } } - pub fn get_connection(&self, replica: u8) -> Option<Rc<TcpSender>> { - self.connections.get(&replica).and_then(|opt| opt.clone()) - } - - pub fn get_mapped_shard(&self, replica: u8) -> Option<u16> { - self.connection_map.get(&replica).copied() - } -} - -impl Cache for ConnectionCache { - type ChangeSet = ConnectionChanges; - - fn apply(&mut self, changes: Self::ChangeSet) { - let shard_id = self.shard_id; - match changes { - ConnectionChanges::Allocate { connections, mappings } => { - for conn in connections.iter().filter(|c| c.shard == shard_id) { - self.connections.insert(conn.replica, None); - } - for mapping in &mappings { - self.connection_map.insert(mapping.replica, mapping.conn_shard); - } - } - ConnectionChanges::Deallocate { connections, mappings } => { - for conn in connections.iter().filter(|c| c.shard == shard_id) { - self.connections.remove(&conn.replica); - } - for mapping in &mappings { - 
self.connection_map.remove(&mapping.replica); - } - } - } - } -} - -/// Least-loaded allocation strategy for connections -pub struct LeastLoadedStrategy { - total_shards: usize, - connections_per_shard: RefCell<Vec<(u16, usize)>>, - replica_to_shards: RefCell<HashMap<u8, HashSet<u16>>>, - rng_seed: u64, -} - -impl LeastLoadedStrategy { - pub fn new(total_shards: usize, seed: u64) -> Self { - Self { - total_shards, - connections_per_shard: RefCell::new((0..total_shards).map(|s| (s as u16, 0)).collect()), - replica_to_shards: RefCell::new(HashMap::new()), - rng_seed: seed, - } - } - - fn create_shard_mappings( - &self, - mappings: &mut Vec<ShardAssignment>, - replica: u8, - mut conn_shards: Vec<u16>, - ) { - for shard in &conn_shards { - mappings.push(ShardAssignment { - replica, - shard: *shard, - conn_shard: *shard, - }); - } - - let mut rng = StdRng::seed_from_u64(self.rng_seed); - conn_shards.shuffle(&mut rng); - - let mut j = 0; - for shard in 0..self.total_shards { - let shard = shard as u16; - if conn_shards.contains(&shard) { - continue; - } - let conn_idx = j % conn_shards.len(); - mappings.push(ShardAssignment { - replica, - shard, - conn_shard: conn_shards[conn_idx], - }); - j += 1; + pub fn get_replica_connection(&self, replica: u8) -> Option<Rc<TcpSender>> { + let mapped_shard = self.replicas.state.get_mapped_shard(replica)?; + if mapped_shard == self.shard_id { + self.replicas.state.get_connection(replica) + } else { + None } } } -impl AllocationStrategy<ConnectionCache> for LeastLoadedStrategy { - type Resource = u8; - - fn allocate(&self, replica: Self::Resource) -> Option<ConnectionChanges> { - if self.replica_to_shards.borrow().contains_key(&replica) { - return None; - } - - let mut connections = Vec::new(); - let mut mappings = Vec::new(); - let connections_needed = self.total_shards.min(MAX_CONNECTIONS_PER_REPLICA); - - let mut rng = StdRng::seed_from_u64(self.rng_seed); - self.connections_per_shard.borrow_mut().shuffle(&mut rng); - 
self.connections_per_shard - .borrow_mut() - .sort_by_key(|(_, count)| *count); - - let mut assigned_shards = HashSet::with_capacity(connections_needed); - - for i in 0..connections_needed { - let mut connections_per_shard = self.connections_per_shard.borrow_mut(); - let (shard, count) = connections_per_shard.get_mut(i).unwrap(); - connections.push(ConnectionAssignment { - replica, - shard: *shard, - }); - *count += 1; - assigned_shards.insert(*shard); - } - - self.replica_to_shards - .borrow_mut() - .insert(replica, assigned_shards.clone()); - - self.create_shard_mappings(&mut mappings, replica, assigned_shards.into_iter().collect()); - - Some(ConnectionChanges::Allocate { connections, mappings }) - } - - fn deallocate(&self, replica: Self::Resource) -> Option<ConnectionChanges> { - let conn_shards = self.replica_to_shards.borrow_mut().remove(&replica)?; - - let mut connections = Vec::new(); - let mut mappings = Vec::new(); - - for shard in &conn_shards { - if let Some((_, count)) = self - .connections_per_shard - .borrow_mut() - .iter_mut() - .find(|(s, _)| s == shard) - { - *count = count.saturating_sub(1); - } - connections.push(ConnectionAssignment { - replica, - shard: *shard, - }); - } +impl MessageBus for IggyMessageBus { + type Client = u128; + type Replica = u8; + type Data = Message<GenericHeader>; - for shard in 0..self.total_shards { - let shard = shard as u16; - mappings.push(ConnectionAssignment { replica, shard }); + fn add_client(&mut self, client: Self::Client, sender: SenderKind) -> bool { + if self.clients.contains_key(&client) { + return false; } - - Some(ConnectionChanges::Deallocate { connections, mappings }) + self.clients.insert(client, sender); + true } -} -pub struct IggyMessageBus { - clients: HashMap<u128, SenderKind>, - replicas: ShardedAllocation<LeastLoadedStrategy, ConnectionCache>, - shard_id: u16, -} - -impl IggyMessageBus { - pub fn new(total_shards: usize, shard_id: u16, seed: u64) -> Self { - Self { - clients: HashMap::new(), - 
replicas: ShardedAllocation::new( - Coordinator::new(LeastLoadedStrategy::new(total_shards, seed)), - ConnectionCache::new(shard_id), - ), - shard_id, - } + fn remove_client(&mut self, client: Self::Client) -> bool { + self.clients.remove(&client).is_some() } - pub fn add_replica(&mut self, replica: u8) -> bool { + fn add_replica(&mut self, replica: Self::Replica) -> bool { self.replicas.allocate(replica) } - pub fn remove_replica(&mut self, replica: u8) -> bool { + fn remove_replica(&mut self, replica: Self::Replica) -> bool { self.replicas.deallocate(replica) } - pub fn get_replica_connection(&self, replica: u8) -> Option<Rc<TcpSender>> { - let mapped_shard = self.replicas.cache.get_mapped_shard(replica)?; - if mapped_shard == self.shard_id { - self.replicas.cache.get_connection(replica) - } else { - None - } + async fn send_to_client( + &self, + client_id: Self::Client, + _message: Self::Data, + ) -> Result<(), IggyError> { + let _sender = self + .clients + .get(&client_id) + .ok_or(IggyError::ClientNotFound(client_id as u32))?; + Ok(()) } -} -impl MessageBus for IggyMessageBus { - type Cache = ConnectionCache; - type Strategy = LeastLoadedStrategy; - type ClientId = u128; - - fn send_to_client(&self, _client_id: Self::ClientId, _data: Vec<u8>) -> Result<(), String> { - // TODO: Implementation + async fn send_to_replica( + &self, + replica: Self::Replica, + _message: Self::Data, + ) -> Result<(), IggyError> { + // TODO: Handle lazily creating the connection. + let _connection = self + .get_replica_connection(replica) + .ok_or(IggyError::ResourceNotFound(format!("Replica {}", replica)))?; Ok(()) } }
