This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch message_bus_final
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit c0e69adbf21ddac9ab2f7fe4845766945f11e9b0
Author: numminex <[email protected]>
AuthorDate: Thu Dec 4 20:24:02 2025 +0100

    temp
---
 Cargo.lock                                         |   4 +
 core/common/src/types/alloc/buffer.rs              |   0
 core/common/src/types/alloc/memory_pool.rs         |   0
 core/common/src/types/alloc/mod.rs                 |   0
 core/common/src/types/sender/mod.rs                |   0
 core/common/src/types/sender/quic_sender.rs        |   0
 core/common/src/types/sender/tcp_sender.rs         |   0
 core/common/src/types/sender/tcp_tls_sender.rs     |   0
 core/common/src/types/sender/websocket_sender.rs   |   0
 .../src/types/sender/websocket_tls_sender.rs       |   0
 core/message_bus/Cargo.toml                        |   2 +
 core/message_bus/src/lib.rs                        | 339 ++++++++++++++++++++-
 12 files changed, 342 insertions(+), 3 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 59712c563..5946717b0 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5784,6 +5784,10 @@ checksum = "f52b00d39961fc5b2736ea853c9cc86238e165017a493d1d5c8eac6bdc4cc273"
 [[package]]
 name = "message_bus"
 version = "0.1.0"
+dependencies = [
+ "iggy_common",
+ "rand 0.9.2",
+]
 
 [[package]]
 name = "metadata"
diff --git a/core/common/src/types/alloc/buffer.rs b/core/common/src/types/alloc/buffer.rs
new file mode 100644
index 000000000..e69de29bb
diff --git a/core/common/src/types/alloc/memory_pool.rs b/core/common/src/types/alloc/memory_pool.rs
new file mode 100644
index 000000000..e69de29bb
diff --git a/core/common/src/types/alloc/mod.rs b/core/common/src/types/alloc/mod.rs
new file mode 100644
index 000000000..e69de29bb
diff --git a/core/common/src/types/sender/mod.rs b/core/common/src/types/sender/mod.rs
new file mode 100644
index 000000000..e69de29bb
diff --git a/core/common/src/types/sender/quic_sender.rs b/core/common/src/types/sender/quic_sender.rs
new file mode 100644
index 000000000..e69de29bb
diff --git a/core/common/src/types/sender/tcp_sender.rs b/core/common/src/types/sender/tcp_sender.rs
new file mode 100644
index 000000000..e69de29bb
diff --git a/core/common/src/types/sender/tcp_tls_sender.rs b/core/common/src/types/sender/tcp_tls_sender.rs
new file mode 100644
index 000000000..e69de29bb
diff --git a/core/common/src/types/sender/websocket_sender.rs b/core/common/src/types/sender/websocket_sender.rs
new file mode 100644
index 000000000..e69de29bb
diff --git a/core/common/src/types/sender/websocket_tls_sender.rs b/core/common/src/types/sender/websocket_tls_sender.rs
new file mode 100644
index 000000000..e69de29bb
diff --git a/core/message_bus/Cargo.toml b/core/message_bus/Cargo.toml
index 713b8fec9..4286a3a8e 100644
--- a/core/message_bus/Cargo.toml
+++ b/core/message_bus/Cargo.toml
@@ -28,5 +28,7 @@ repository = "https://github.com/apache/iggy";
 readme = "../../../README.md"
 
 [dependencies]
+iggy_common = { workspace = true }
+rand = { workspace = true }
 
 
diff --git a/core/message_bus/src/lib.rs b/core/message_bus/src/lib.rs
index 974061d6c..7a60c7cb8 100644
--- a/core/message_bus/src/lib.rs
+++ b/core/message_bus/src/lib.rs
@@ -14,9 +14,342 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
+use iggy_common::{SenderKind, TcpSender};
+use rand::{SeedableRng, rngs::StdRng, seq::SliceRandom};
+use std::{
+    cell::RefCell,
+    collections::{HashMap, HashSet}, rc::Rc,
+};
 
-pub trait MessageBus {}
+const MAX_CONNECTIONS_PER_REPLICA: usize = 8;
 
-pub struct IggyMessageBus {}
+/// Trait for changesets - the Cache type is an associated type defined by the changeset
+pub trait ChangeSet {
+    type Cache;
+    
+    fn apply(self, cache: &mut Self::Cache);
+}
 
-impl MessageBus for IggyMessageBus {}
+/// Allocation strategy that produces changesets
+pub trait AllocationStrategy {
+    /// The changeset type - defines its own cache type via an associated type
+    type ChangeSet: ChangeSet;
+    /// The resource identifier type (e.g., replica id)
+    type Resource;
+
+    fn allocate(&self, resource: Self::Resource) -> Option<Self::ChangeSet>;
+    fn deallocate(&self, resource: Self::Resource) -> Option<Self::ChangeSet>;
+}
+
+/// Coordinator that ties a strategy to sharded caches
+/// Cache type is derived from the strategy's changeset
+pub struct Coordinator<S: AllocationStrategy> {
+    strategy: S,
+}
+
+impl<S: AllocationStrategy> Coordinator<S> {
+    pub fn new(strategy: S) -> Self {
+        Self { strategy }
+    }
+
+    pub fn allocate(&self, resource: S::Resource) -> Option<S::ChangeSet> {
+        self.strategy.allocate(resource)
+    }
+
+    pub fn deallocate(&self, resource: S::Resource) -> Option<S::ChangeSet> {
+        self.strategy.deallocate(resource)
+    }
+}
+
+/// Binds a coordinator with its matching cache
+pub struct ShardedAllocation<S: AllocationStrategy> {
+    pub coordinator: Coordinator<S>,
+    pub cache: <S::ChangeSet as ChangeSet>::Cache,
+}
+
+impl<S: AllocationStrategy> ShardedAllocation<S> {
+    pub fn new(
+        coordinator: Coordinator<S>,
+        cache: <S::ChangeSet as ChangeSet>::Cache,
+    ) -> Self {
+        Self { coordinator, cache }
+    }
+
+    pub fn allocate(&mut self, resource: S::Resource) -> bool {
+        if let Some(changes) = self.coordinator.allocate(resource) {
+            changes.apply(&mut self.cache);
+            true
+        } else {
+            false
+        }
+    }
+
+    pub fn deallocate(&mut self, resource: S::Resource) -> bool {
+        if let Some(changes) = self.coordinator.deallocate(resource) {
+            changes.apply(&mut self.cache);
+            true
+        } else {
+            false
+        }
+    }
+}
+
+/// Message bus parameterized by allocation strategy
+pub trait MessageBus {
+    type Strategy: AllocationStrategy;
+    type ClientId;
+
+    fn send_to_client(&self, client_id: Self::ClientId, data: Vec<u8>) -> Result<(), String>;
+}
+
+// ============================================================================
+// Concrete Implementation: Connection-based allocation
+// ============================================================================
+
+/// Identifies a connection on a specific shard
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub struct ConnectionAssignment {
+    pub replica: u8,
+    pub shard: u16,
+}
+
+/// Maps a source shard to the shard that owns the connection
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub struct ShardAssignment {
+    pub replica: u8,
+    pub shard: u16,
+    pub conn_shard: u16,
+}
+
+/// Cache for connection state per shard
+pub struct ConnectionCache {
+    shard_id: u16,
+    connections: HashMap<u8, Option<Rc<TcpSender>>>,
+    connection_map: HashMap<u8, u16>,
+}
+
+impl ConnectionCache {
+    pub fn new(shard_id: u16) -> Self {
+        Self {
+            shard_id,
+            connections: HashMap::new(),
+            connection_map: HashMap::new(),
+        }
+    }
+
+    pub fn get_connection(&self, replica: u8) -> Option<Rc<TcpSender>> {
+        self.connections.get(&replica).and_then(|opt| opt.clone())
+    }
+
+    pub fn get_mapped_shard(&self, replica: u8) -> Option<u16> {
+        self.connection_map.get(&replica).copied()
+    }
+}
+
+/// Changeset for connection-based allocation
+#[derive(Debug, Clone)]
+pub enum ConnectionChanges {
+    Allocate {
+        connections: Vec<ConnectionAssignment>,
+        mappings: Vec<ShardAssignment>,
+    },
+    Deallocate {
+        connections: Vec<ConnectionAssignment>,
+        mappings: Vec<ConnectionAssignment>,
+    },
+}
+
+impl ChangeSet for ConnectionChanges {
+    type Cache = ConnectionCache;
+
+    fn apply(self, cache: &mut Self::Cache) {
+        let shard_id = cache.shard_id;
+        match self {
+            ConnectionChanges::Allocate { connections, mappings } => {
+                for conn in connections.iter().filter(|c| c.shard == shard_id) {
+                    cache.connections.insert(conn.replica, None);
+                }
+                for mapping in mappings.iter().filter(|m| m.shard == shard_id) {
+                    cache.connection_map.insert(mapping.replica, mapping.conn_shard);
+                }
+            }
+            ConnectionChanges::Deallocate { connections, mappings } => {
+                for conn in connections.iter().filter(|c| c.shard == shard_id) {
+                    cache.connections.remove(&conn.replica);
+                }
+                for mapping in mappings.iter().filter(|m| m.shard == shard_id) {
+                    cache.connection_map.remove(&mapping.replica);
+                }
+            }
+        }
+    }
+}
+
+/// Least-loaded allocation strategy for connections
+pub struct LeastLoadedStrategy {
+    total_shards: usize,
+    connections_per_shard: RefCell<Vec<(u16, usize)>>,
+    replica_to_shards: RefCell<HashMap<u8, HashSet<u16>>>,
+    rng_seed: u64,
+}
+
+impl LeastLoadedStrategy {
+    pub fn new(total_shards: usize, seed: u64) -> Self {
+        Self {
+            total_shards,
+            connections_per_shard: RefCell::new((0..total_shards).map(|s| (s as u16, 0)).collect()),
+            replica_to_shards: RefCell::new(HashMap::new()),
+            rng_seed: seed,
+        }
+    }
+
+    fn create_shard_mappings(
+        &self,
+        mappings: &mut Vec<ShardAssignment>,
+        replica: u8,
+        mut conn_shards: Vec<u16>,
+    ) {
+        for shard in &conn_shards {
+            mappings.push(ShardAssignment {
+                replica,
+                shard: *shard,
+                conn_shard: *shard,
+            });
+        }
+
+        let mut rng = StdRng::seed_from_u64(self.rng_seed);
+        conn_shards.shuffle(&mut rng);
+
+        let mut j = 0;
+        for shard in 0..self.total_shards {
+            let shard = shard as u16;
+            if conn_shards.contains(&shard) {
+                continue;
+            }
+            let conn_idx = j % conn_shards.len();
+            mappings.push(ShardAssignment {
+                replica,
+                shard,
+                conn_shard: conn_shards[conn_idx],
+            });
+            j += 1;
+        }
+    }
+}
+
+impl AllocationStrategy for LeastLoadedStrategy {
+    type ChangeSet = ConnectionChanges;
+    type Resource = u8;
+
+    fn allocate(&self, replica: Self::Resource) -> Option<Self::ChangeSet> {
+        if self.replica_to_shards.borrow().contains_key(&replica) {
+            return None;
+        }
+
+        let mut connections = Vec::new();
+        let mut mappings = Vec::new();
+        let connections_needed = self.total_shards.min(MAX_CONNECTIONS_PER_REPLICA);
+
+        let mut rng = StdRng::seed_from_u64(self.rng_seed);
+        self.connections_per_shard.borrow_mut().shuffle(&mut rng);
+        self.connections_per_shard
+            .borrow_mut()
+            .sort_by_key(|(_, count)| *count);
+
+        let mut assigned_shards = HashSet::with_capacity(connections_needed);
+
+        for i in 0..connections_needed {
+            let mut connections_per_shard = self.connections_per_shard.borrow_mut();
+            let (shard, count) = connections_per_shard.get_mut(i).unwrap();
+            connections.push(ConnectionAssignment {
+                replica,
+                shard: *shard,
+            });
+            *count += 1;
+            assigned_shards.insert(*shard);
+        }
+
+        self.replica_to_shards
+            .borrow_mut()
+            .insert(replica, assigned_shards.clone());
+
+        self.create_shard_mappings(&mut mappings, replica, assigned_shards.into_iter().collect());
+
+        Some(ConnectionChanges::Allocate { connections, mappings })
+    }
+
+    fn deallocate(&self, replica: Self::Resource) -> Option<Self::ChangeSet> {
+        let conn_shards = self.replica_to_shards.borrow_mut().remove(&replica)?;
+
+        let mut connections = Vec::new();
+        let mut mappings = Vec::new();
+
+        for shard in &conn_shards {
+            if let Some((_, count)) = self
+                .connections_per_shard
+                .borrow_mut()
+                .iter_mut()
+                .find(|(s, _)| s == shard)
+            {
+                *count = count.saturating_sub(1);
+            }
+            connections.push(ConnectionAssignment {
+                replica,
+                shard: *shard,
+            });
+        }
+
+        for shard in 0..self.total_shards {
+            let shard = shard as u16;
+            mappings.push(ConnectionAssignment { replica, shard });
+        }
+
+        Some(ConnectionChanges::Deallocate { connections, mappings })
+    }
+}
+
+pub struct IggyMessageBus {
+    clients: HashMap<u128, SenderKind>,
+    replicas: ShardedAllocation<LeastLoadedStrategy>,
+    shard_id: u16,
+}
+
+impl IggyMessageBus {
+    pub fn new(total_shards: usize, shard_id: u16, seed: u64) -> Self {
+        Self {
+            clients: HashMap::new(),
+            replicas: ShardedAllocation::new(
+                Coordinator::new(LeastLoadedStrategy::new(total_shards, seed)),
+                ConnectionCache::new(shard_id),
+            ),
+            shard_id,
+        }
+    }
+
+    pub fn add_replica(&mut self, replica: u8) -> bool {
+        self.replicas.allocate(replica)
+    }
+
+    pub fn remove_replica(&mut self, replica: u8) -> bool {
+        self.replicas.deallocate(replica)
+    }
+
+    pub fn get_replica_connection(&self, replica: u8) -> Option<Rc<TcpSender>> {
+        let mapped_shard = self.replicas.cache.get_mapped_shard(replica)?;
+        if mapped_shard == self.shard_id {
+            self.replicas.cache.get_connection(replica)
+        } else {
+            None
+        }
+    }
+}
+
+impl MessageBus for IggyMessageBus {
+    type Strategy = LeastLoadedStrategy;
+    type ClientId = u128;
+
+    fn send_to_client(&self, _client_id: Self::ClientId, _data: Vec<u8>) -> Result<(), String> {
+        // TODO: Implementation
+        Ok(())
+    }
+}

Reply via email to