This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch message_bus_final
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit d301efbf0e19667c8d141cdd4cd52bd013ae8b80
Author: numminex <[email protected]>
AuthorDate: Thu Dec 4 20:57:06 2025 +0100

    working better
---
 core/message_bus/src/lib.rs | 138 +++++++++++++++++++++++++-------------------
 1 file changed, 78 insertions(+), 60 deletions(-)

diff --git a/core/message_bus/src/lib.rs b/core/message_bus/src/lib.rs
index 7a60c7cb8..a3774c849 100644
--- a/core/message_bus/src/lib.rs
+++ b/core/message_bus/src/lib.rs
@@ -23,61 +23,78 @@ use std::{
 
 const MAX_CONNECTIONS_PER_REPLICA: usize = 8;
 
-/// Trait for changesets - the Cache type is a GAT defined by the changeset
-pub trait ChangeSet {
-    type Cache;
+/// A cache that can apply changesets
+pub trait Cache {
+    type ChangeSet;
     
-    fn apply(self, cache: &mut Self::Cache);
+    fn apply(&mut self, changes: Self::ChangeSet);
 }
 
-/// Allocation strategy that produces changesets
-pub trait AllocationStrategy {
-    /// The changeset type - defines its own cache type via GAT
-    type ChangeSet: ChangeSet;
-    /// The resource identifier type (e.g., replica id)
+/// Allocation strategy that produces changesets for a specific cache type
+/// Generic over C allows the same strategy impl to work with different caches
+/// (if they share the same ChangeSet type)
+pub trait AllocationStrategy<C> 
+where C: Cache
+{
     type Resource;
 
-    fn allocate(&self, resource: Self::Resource) -> Option<Self::ChangeSet>;
-    fn deallocate(&self, resource: Self::Resource) -> Option<Self::ChangeSet>;
+    fn allocate(&self, resource: Self::Resource) -> Option<C::ChangeSet>;
+    fn deallocate(&self, resource: Self::Resource) -> Option<C::ChangeSet>;
 }
 
-/// Coordinator that ties a strategy to sharded caches
-/// Cache type is derived from the strategy's changeset
-pub struct Coordinator<S: AllocationStrategy> {
+/// Coordinator that wraps a strategy for a specific cache type
+pub struct Coordinator<S, C>
+where
+    C: Cache,
+    S: AllocationStrategy<C>,
+{
     strategy: S,
+    _cache: std::marker::PhantomData<C>,
 }
 
-impl<S: AllocationStrategy> Coordinator<S> {
+impl<S, C> Coordinator<S, C>
+where
+    C: Cache,
+    S: AllocationStrategy<C>,
+{
     pub fn new(strategy: S) -> Self {
-        Self { strategy }
+        Self {
+            strategy,
+            _cache: std::marker::PhantomData,
+        }
     }
 
-    pub fn allocate(&self, resource: S::Resource) -> Option<S::ChangeSet> {
+    pub fn allocate(&self, resource: S::Resource) -> Option<C::ChangeSet> {
         self.strategy.allocate(resource)
     }
 
-    pub fn deallocate(&self, resource: S::Resource) -> Option<S::ChangeSet> {
+    pub fn deallocate(&self, resource: S::Resource) -> Option<C::ChangeSet> {
         self.strategy.deallocate(resource)
     }
 }
 
-/// Binds a coordinator with its matching cache
-pub struct ShardedAllocation<S: AllocationStrategy> {
-    pub coordinator: Coordinator<S>,
-    pub cache: <S::ChangeSet as ChangeSet>::Cache,
+/// Binds a coordinator with a matching cache
+pub struct ShardedAllocation<S, C>
+where
+    C: Cache,
+    S: AllocationStrategy<C>,
+{
+    pub coordinator: Coordinator<S, C>,
+    pub cache: C,
 }
 
-impl<S: AllocationStrategy> ShardedAllocation<S> {
-    pub fn new(
-        coordinator: Coordinator<S>,
-        cache: <S::ChangeSet as ChangeSet>::Cache,
-    ) -> Self {
+impl<S, C> ShardedAllocation<S, C>
+where
+    C: Cache,
+    S: AllocationStrategy<C>,
+{
+    pub fn new(coordinator: Coordinator<S, C>, cache: C) -> Self {
         Self { coordinator, cache }
     }
 
     pub fn allocate(&mut self, resource: S::Resource) -> bool {
         if let Some(changes) = self.coordinator.allocate(resource) {
-            changes.apply(&mut self.cache);
+            self.cache.apply(changes);
             true
         } else {
             false
@@ -86,7 +103,7 @@ impl<S: AllocationStrategy> ShardedAllocation<S> {
 
     pub fn deallocate(&mut self, resource: S::Resource) -> bool {
         if let Some(changes) = self.coordinator.deallocate(resource) {
-            changes.apply(&mut self.cache);
+            self.cache.apply(changes);
             true
         } else {
             false
@@ -94,9 +111,10 @@ impl<S: AllocationStrategy> ShardedAllocation<S> {
     }
 }
 
-/// Message bus parameterized by allocation strategy
+/// Message bus parameterized by allocation strategy and cache
 pub trait MessageBus {
-    type Strategy: AllocationStrategy;
+    type Cache: Cache;
+    type Strategy: AllocationStrategy<Self::Cache>;
     type ClientId;
 
     fn send_to_client(&self, client_id: Self::ClientId, data: Vec<u8>) -> Result<(), String>;
@@ -121,6 +139,19 @@ pub struct ShardAssignment {
     pub conn_shard: u16,
 }
 
+/// Changeset for connection-based allocation
+#[derive(Debug, Clone)]
+pub enum ConnectionChanges {
+    Allocate {
+        connections: Vec<ConnectionAssignment>,
+        mappings: Vec<ShardAssignment>,
+    },
+    Deallocate {
+        connections: Vec<ConnectionAssignment>,
+        mappings: Vec<ConnectionAssignment>,
+    },
+}
+
 /// Cache for connection state per shard
 pub struct ConnectionCache {
     shard_id: u16,
@@ -146,39 +177,26 @@ impl ConnectionCache {
     }
 }
 
-/// Changeset for connection-based allocation
-#[derive(Debug, Clone)]
-pub enum ConnectionChanges {
-    Allocate {
-        connections: Vec<ConnectionAssignment>,
-        mappings: Vec<ShardAssignment>,
-    },
-    Deallocate {
-        connections: Vec<ConnectionAssignment>,
-        mappings: Vec<ConnectionAssignment>,
-    },
-}
-
-impl ChangeSet for ConnectionChanges {
-    type Cache = ConnectionCache;
+impl Cache for ConnectionCache {
+    type ChangeSet = ConnectionChanges;
 
-    fn apply(self, cache: &mut Self::Cache) {
-        let shard_id = cache.shard_id;
-        match self {
+    fn apply(&mut self, changes: Self::ChangeSet) {
+        let shard_id = self.shard_id;
+        match changes {
             ConnectionChanges::Allocate { connections, mappings } => {
                 for conn in connections.iter().filter(|c| c.shard == shard_id) {
-                    cache.connections.insert(conn.replica, None);
+                    self.connections.insert(conn.replica, None);
                 }
-                for mapping in mappings.iter().filter(|m| m.shard == shard_id) {
-                    cache.connection_map.insert(mapping.replica, mapping.conn_shard);
+                for mapping in &mappings {
+                    self.connection_map.insert(mapping.replica, mapping.conn_shard);
                 }
             }
             ConnectionChanges::Deallocate { connections, mappings } => {
                 for conn in connections.iter().filter(|c| c.shard == shard_id) {
-                    cache.connections.remove(&conn.replica);
+                    self.connections.remove(&conn.replica);
                 }
-                for mapping in mappings.iter().filter(|m| m.shard == shard_id) {
-                    cache.connection_map.remove(&mapping.replica);
+                for mapping in &mappings {
+                    self.connection_map.remove(&mapping.replica);
                 }
             }
         }
@@ -237,11 +255,10 @@ impl LeastLoadedStrategy {
     }
 }
 
-impl AllocationStrategy for LeastLoadedStrategy {
-    type ChangeSet = ConnectionChanges;
+impl AllocationStrategy<ConnectionCache> for LeastLoadedStrategy {
     type Resource = u8;
 
-    fn allocate(&self, replica: Self::Resource) -> Option<Self::ChangeSet> {
+    fn allocate(&self, replica: Self::Resource) -> Option<ConnectionChanges> {
         if self.replica_to_shards.borrow().contains_key(&replica) {
             return None;
         }
@@ -278,7 +295,7 @@ impl AllocationStrategy for LeastLoadedStrategy {
         Some(ConnectionChanges::Allocate { connections, mappings })
     }
 
-    fn deallocate(&self, replica: Self::Resource) -> Option<Self::ChangeSet> {
+    fn deallocate(&self, replica: Self::Resource) -> Option<ConnectionChanges> {
         let conn_shards = self.replica_to_shards.borrow_mut().remove(&replica)?;
 
         let mut connections = Vec::new();
@@ -310,7 +327,7 @@ impl AllocationStrategy for LeastLoadedStrategy {
 
 pub struct IggyMessageBus {
     clients: HashMap<u128, SenderKind>,
-    replicas: ShardedAllocation<LeastLoadedStrategy>,
+    replicas: ShardedAllocation<LeastLoadedStrategy, ConnectionCache>,
     shard_id: u16,
 }
 
@@ -345,6 +362,7 @@ impl IggyMessageBus {
 }
 
 impl MessageBus for IggyMessageBus {
+    type Cache = ConnectionCache;
     type Strategy = LeastLoadedStrategy;
     type ClientId = u128;
 

Reply via email to