This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch message_bus_final in repository https://gitbox.apache.org/repos/asf/iggy.git
commit d301efbf0e19667c8d141cdd4cd52bd013ae8b80 Author: numminex <[email protected]> AuthorDate: Thu Dec 4 20:57:06 2025 +0100 working better --- core/message_bus/src/lib.rs | 138 +++++++++++++++++++++++++------------------- 1 file changed, 78 insertions(+), 60 deletions(-) diff --git a/core/message_bus/src/lib.rs b/core/message_bus/src/lib.rs index 7a60c7cb8..a3774c849 100644 --- a/core/message_bus/src/lib.rs +++ b/core/message_bus/src/lib.rs @@ -23,61 +23,78 @@ use std::{ const MAX_CONNECTIONS_PER_REPLICA: usize = 8; -/// Trait for changesets - the Cache type is a GAT defined by the changeset -pub trait ChangeSet { - type Cache; +/// A cache that can apply changesets +pub trait Cache { + type ChangeSet; - fn apply(self, cache: &mut Self::Cache); + fn apply(&mut self, changes: Self::ChangeSet); } -/// Allocation strategy that produces changesets -pub trait AllocationStrategy { - /// The changeset type - defines its own cache type via GAT - type ChangeSet: ChangeSet; - /// The resource identifier type (e.g., replica id) +/// Allocation strategy that produces changesets for a specific cache type +/// Generic over C allows the same strategy impl to work with different caches +/// (if they share the same ChangeSet type) +pub trait AllocationStrategy<C> +where C: Cache +{ type Resource; - fn allocate(&self, resource: Self::Resource) -> Option<Self::ChangeSet>; - fn deallocate(&self, resource: Self::Resource) -> Option<Self::ChangeSet>; + fn allocate(&self, resource: Self::Resource) -> Option<C::ChangeSet>; + fn deallocate(&self, resource: Self::Resource) -> Option<C::ChangeSet>; } -/// Coordinator that ties a strategy to sharded caches -/// Cache type is derived from the strategy's changeset -pub struct Coordinator<S: AllocationStrategy> { +/// Coordinator that wraps a strategy for a specific cache type +pub struct Coordinator<S, C> +where + C: Cache, + S: AllocationStrategy<C>, +{ strategy: S, + _cache: std::marker::PhantomData<C>, } -impl<S: AllocationStrategy> Coordinator<S> { +impl<S, C> Coordinator<S, C> +where + C: Cache, + S: AllocationStrategy<C>, +{ pub fn new(strategy: S) -> Self { - Self { strategy } + Self { + strategy, + _cache: std::marker::PhantomData, + } } - pub fn allocate(&self, resource: S::Resource) -> Option<S::ChangeSet> { + pub fn allocate(&self, resource: S::Resource) -> Option<C::ChangeSet> { self.strategy.allocate(resource) } - pub fn deallocate(&self, resource: S::Resource) -> Option<S::ChangeSet> { + pub fn deallocate(&self, resource: S::Resource) -> Option<C::ChangeSet> { self.strategy.deallocate(resource) } } -/// Binds a coordinator with its matching cache -pub struct ShardedAllocation<S: AllocationStrategy> { - pub coordinator: Coordinator<S>, - pub cache: <S::ChangeSet as ChangeSet>::Cache, +/// Binds a coordinator with a matching cache +pub struct ShardedAllocation<S, C> +where + C: Cache, + S: AllocationStrategy<C>, +{ + pub coordinator: Coordinator<S, C>, + pub cache: C, } -impl<S: AllocationStrategy> ShardedAllocation<S> { - pub fn new( - coordinator: Coordinator<S>, - cache: <S::ChangeSet as ChangeSet>::Cache, - ) -> Self { +impl<S, C> ShardedAllocation<S, C> +where + C: Cache, + S: AllocationStrategy<C>, +{ + pub fn new(coordinator: Coordinator<S, C>, cache: C) -> Self { Self { coordinator, cache } } pub fn allocate(&mut self, resource: S::Resource) -> bool { if let Some(changes) = self.coordinator.allocate(resource) { - changes.apply(&mut self.cache); + self.cache.apply(changes); true } else { false @@ -86,7 +103,7 @@ impl<S: AllocationStrategy> ShardedAllocation<S> { pub fn deallocate(&mut self, resource: S::Resource) -> bool { if let Some(changes) = self.coordinator.deallocate(resource) { - changes.apply(&mut self.cache); + self.cache.apply(changes); true } else { false @@ -94,9 +111,10 @@ impl<S: AllocationStrategy> ShardedAllocation<S> { } } -/// Message bus parameterized by allocation strategy +/// Message bus parameterized by allocation strategy and cache pub trait MessageBus { - type Strategy: AllocationStrategy; + type Cache: Cache; + type Strategy: AllocationStrategy<Self::Cache>; type ClientId; fn send_to_client(&self, client_id: Self::ClientId, data: Vec<u8>) -> Result<(), String>; @@ -121,6 +139,19 @@ pub struct ShardAssignment { pub conn_shard: u16, } +/// Changeset for connection-based allocation +#[derive(Debug, Clone)] +pub enum ConnectionChanges { + Allocate { + connections: Vec<ConnectionAssignment>, + mappings: Vec<ShardAssignment>, + }, + Deallocate { + connections: Vec<ConnectionAssignment>, + mappings: Vec<ConnectionAssignment>, + }, +} + /// Cache for connection state per shard pub struct ConnectionCache { shard_id: u16, @@ -146,39 +177,26 @@ impl ConnectionCache { } } -/// Changeset for connection-based allocation -#[derive(Debug, Clone)] -pub enum ConnectionChanges { - Allocate { - connections: Vec<ConnectionAssignment>, - mappings: Vec<ShardAssignment>, - }, - Deallocate { - connections: Vec<ConnectionAssignment>, - mappings: Vec<ConnectionAssignment>, - }, -} - -impl ChangeSet for ConnectionChanges { - type Cache = ConnectionCache; +impl Cache for ConnectionCache { + type ChangeSet = ConnectionChanges; - fn apply(self, cache: &mut Self::Cache) { - let shard_id = cache.shard_id; - match self { + fn apply(&mut self, changes: Self::ChangeSet) { + let shard_id = self.shard_id; + match changes { ConnectionChanges::Allocate { connections, mappings } => { for conn in connections.iter().filter(|c| c.shard == shard_id) { - cache.connections.insert(conn.replica, None); + self.connections.insert(conn.replica, None); } - for mapping in mappings.iter().filter(|m| m.shard == shard_id) { - cache.connection_map.insert(mapping.replica, mapping.conn_shard); + for mapping in &mappings { + self.connection_map.insert(mapping.replica, mapping.conn_shard); } } ConnectionChanges::Deallocate { connections, mappings } => { for conn in connections.iter().filter(|c| c.shard == shard_id) { - cache.connections.remove(&conn.replica); + self.connections.remove(&conn.replica); } - for mapping in mappings.iter().filter(|m| m.shard == shard_id) { - cache.connection_map.remove(&mapping.replica); + for mapping in &mappings { + self.connection_map.remove(&mapping.replica); } } } @@ -237,11 +255,10 @@ impl LeastLoadedStrategy { } } -impl AllocationStrategy for LeastLoadedStrategy { - type ChangeSet = ConnectionChanges; +impl AllocationStrategy<ConnectionCache> for LeastLoadedStrategy { type Resource = u8; - fn allocate(&self, replica: Self::Resource) -> Option<Self::ChangeSet> { + fn allocate(&self, replica: Self::Resource) -> Option<ConnectionChanges> { if self.replica_to_shards.borrow().contains_key(&replica) { return None; } @@ -278,7 +295,7 @@ impl AllocationStrategy for LeastLoadedStrategy { Some(ConnectionChanges::Allocate { connections, mappings }) } - fn deallocate(&self, replica: Self::Resource) -> Option<Self::ChangeSet> { + fn deallocate(&self, replica: Self::Resource) -> Option<ConnectionChanges> { let conn_shards = self.replica_to_shards.borrow_mut().remove(&replica)?; let mut connections = Vec::new(); @@ -310,7 +327,7 @@ impl AllocationStrategy for LeastLoadedStrategy { pub struct IggyMessageBus { clients: HashMap<u128, SenderKind>, - replicas: ShardedAllocation<LeastLoadedStrategy>, + replicas: ShardedAllocation<LeastLoadedStrategy, ConnectionCache>, shard_id: u16, } @@ -345,6 +362,7 @@ impl IggyMessageBus { } impl MessageBus for IggyMessageBus { + type Cache = ConnectionCache; type Strategy = LeastLoadedStrategy; type ClientId = u128;
