This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch experiment
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 1627ce940b714507e649b0c321823d8babe297ca
Author: numinex <[email protected]>
AuthorDate: Mon Jan 19 13:11:25 2026 +0100

    advance v2
---
 core/metadata/src/stm/consumer_group.rs | 84 ++++++++++++++++++++++----------
 core/metadata/src/stm/stream.rs         | 85 ++++++++++++++++++---------------
 core/metadata/src/stm/user.rs           | 43 ++++++++++++-----
 3 files changed, 137 insertions(+), 75 deletions(-)

diff --git a/core/metadata/src/stm/consumer_group.rs 
b/core/metadata/src/stm/consumer_group.rs
index 08d909235..edfd778e7 100644
--- a/core/metadata/src/stm/consumer_group.rs
+++ b/core/metadata/src/stm/consumer_group.rs
@@ -18,66 +18,98 @@
 use crate::stm::Handler;
 use crate::{define_state, impl_absorb};
 use ahash::AHashMap;
-use iggy_common::IggyTimestamp;
 use iggy_common::create_consumer_group::CreateConsumerGroup;
 use iggy_common::delete_consumer_group::DeleteConsumerGroup;
 use slab::Slab;
+use std::sync::Arc;
+use std::sync::atomic::AtomicUsize;
 
-#[derive(Debug, Clone, Default)]
+// ============================================================================
+// ConsumerGroupMember Entity
+// ============================================================================
+
+#[derive(Debug, Clone)]
 pub struct ConsumerGroupMember {
-    pub id: u32,
-    pub joined_at: IggyTimestamp,
+    pub id: usize,
+    pub client_id: u32,
+    pub partitions: Vec<usize>,
+    pub partition_index: Arc<AtomicUsize>,
 }
 
 impl ConsumerGroupMember {
-    pub fn new(id: u32, joined_at: IggyTimestamp) -> Self {
-        Self { id, joined_at }
+    pub fn new(id: usize, client_id: u32) -> Self {
+        Self {
+            id,
+            client_id,
+            partitions: Vec::new(),
+            partition_index: Arc::new(AtomicUsize::new(0)),
+        }
     }
 }
 
-#[derive(Debug, Clone, Default)]
+// ============================================================================
+// ConsumerGroup Entity
+// ============================================================================
+
+#[derive(Debug, Clone)]
 pub struct ConsumerGroup {
     pub id: usize,
     pub stream_id: usize,
     pub topic_id: usize,
-    pub name: String,
-    pub created_at: IggyTimestamp,
-    pub members: Vec<ConsumerGroupMember>,
+    pub name: Arc<str>,
+    pub partitions: Vec<usize>,
+    pub members: Slab<ConsumerGroupMember>,
 }
 
 impl ConsumerGroup {
-    pub fn new(stream_id: usize, topic_id: usize, name: String, created_at: 
IggyTimestamp) -> Self {
+    pub fn new(stream_id: usize, topic_id: usize, name: Arc<str>) -> Self {
         Self {
             id: 0,
             stream_id,
             topic_id,
             name,
-            created_at,
-            members: Vec::new(),
+            partitions: Vec::new(),
+            members: Slab::new(),
         }
     }
 
-    pub fn add_member(&mut self, member: ConsumerGroupMember) {
-        self.members.push(member);
-    }
+    /// Rebalance partition assignments among members (round-robin).
+    pub fn rebalance_members(&mut self) {
+        let partition_count = self.partitions.len();
+        let member_count = self.members.len();
 
-    pub fn remove_member(&mut self, member_id: u32) -> 
Option<ConsumerGroupMember> {
-        if let Some(pos) = self.members.iter().position(|m| m.id == member_id) 
{
-            Some(self.members.remove(pos))
-        } else {
-            None
+        if member_count == 0 || partition_count == 0 {
+            return;
         }
-    }
 
-    pub fn members_count(&self) -> usize {
-        self.members.len()
+        // Clear all member partitions
+        let member_ids: Vec<usize> = self.members.iter().map(|(id, _)| 
id).collect();
+        for &member_id in &member_ids {
+            if let Some(member) = self.members.get_mut(member_id) {
+                member.partitions.clear();
+            }
+        }
+
+        // Rebuild assignments (round-robin)
+        for (i, &partition_id) in self.partitions.iter().enumerate() {
+            let member_idx = i % member_count;
+            if let Some(&member_id) = member_ids.get(member_idx)
+                && let Some(member) = self.members.get_mut(member_id)
+            {
+                member.partitions.push(partition_id);
+            }
+        }
     }
 }
 
+// ============================================================================
+// ConsumerGroups State Machine
+// ============================================================================
+
 define_state! {
     ConsumerGroups {
-        ns_index: AHashMap<(usize, usize), Vec<usize>>,
-        name_index: AHashMap<String, usize>,
+        name_index: AHashMap<Arc<str>, usize>,
+        topic_index: AHashMap<(usize, usize), Vec<usize>>,
         items: Slab<ConsumerGroup>,
     },
     [CreateConsumerGroup, DeleteConsumerGroup]
diff --git a/core/metadata/src/stm/stream.rs b/core/metadata/src/stm/stream.rs
index 4e21a70c4..f81555ee6 100644
--- a/core/metadata/src/stm/stream.rs
+++ b/core/metadata/src/stm/stream.rs
@@ -26,26 +26,21 @@ use iggy_common::update_stream::UpdateStream;
 use iggy_common::{CompressionAlgorithm, IggyExpiry, IggyTimestamp, 
MaxTopicSize};
 use slab::Slab;
 use std::sync::Arc;
+use std::sync::atomic::AtomicUsize;
 
-#[derive(Debug, Clone, Default)]
+// ============================================================================
+// Partition Entity
+// ============================================================================
+
+#[derive(Debug, Clone)]
 pub struct Partition {
     pub id: usize,
+    pub created_at: IggyTimestamp,
 }
 
 impl Partition {
-    pub fn new(id: usize) -> Self {
-        Self { id }
-    }
-}
-
-#[derive(Debug, Clone, Default)]
-pub struct Partitions {
-    pub items: Slab<Partition>,
-}
-
-impl Partitions {
-    pub fn new() -> Self {
-        Self::default()
+    pub fn new(id: usize, created_at: IggyTimestamp) -> Self {
+        Self { id, created_at }
     }
 }
 
@@ -56,7 +51,7 @@ impl Partitions {
 #[derive(Debug, Clone)]
 pub struct Topic {
     pub id: usize,
-    pub name: String,
+    pub name: Arc<str>,
     pub created_at: IggyTimestamp,
     pub replication_factor: u8,
     pub message_expiry: IggyExpiry,
@@ -64,28 +59,30 @@ pub struct Topic {
     pub max_topic_size: MaxTopicSize,
 
     pub stats: Arc<TopicStats>,
-    pub partitions: Partitions,
+    pub partitions: Vec<Partition>,
+    pub round_robin_counter: Arc<AtomicUsize>,
 }
 
 impl Default for Topic {
     fn default() -> Self {
         Self {
             id: 0,
-            name: String::new(),
+            name: Arc::from(""),
             created_at: IggyTimestamp::default(),
             replication_factor: 1,
             message_expiry: IggyExpiry::default(),
             compression_algorithm: CompressionAlgorithm::default(),
             max_topic_size: MaxTopicSize::default(),
             stats: Arc::new(TopicStats::default()),
-            partitions: Partitions::new(),
+            partitions: Vec::new(),
+            round_robin_counter: Arc::new(AtomicUsize::new(0)),
         }
     }
 }
 
 impl Topic {
     pub fn new(
-        name: String,
+        name: Arc<str>,
         created_at: IggyTimestamp,
         replication_factor: u8,
         message_expiry: IggyExpiry,
@@ -102,41 +99,36 @@ impl Topic {
             compression_algorithm,
             max_topic_size,
             stats: Arc::new(TopicStats::new(stream_stats)),
-            partitions: Partitions::new(),
+            partitions: Vec::new(),
+            round_robin_counter: Arc::new(AtomicUsize::new(0)),
         }
     }
 }
 
-#[derive(Debug, Clone, Default)]
-pub struct Topics {
-    pub index: AHashMap<String, usize>,
-    pub items: Slab<Topic>,
-}
-
-impl Topics {
-    pub fn new() -> Self {
-        Self::default()
-    }
-}
+// ============================================================================
+// Stream Entity
+// ============================================================================
 
 #[derive(Debug)]
 pub struct Stream {
     pub id: usize,
-    pub name: String,
+    pub name: Arc<str>,
     pub created_at: IggyTimestamp,
 
     pub stats: Arc<StreamStats>,
-    pub topics: Topics,
+    pub topics: Slab<Topic>,
+    pub topic_index: AHashMap<Arc<str>, usize>,
 }
 
 impl Default for Stream {
     fn default() -> Self {
         Self {
             id: 0,
-            name: String::new(),
+            name: Arc::from(""),
             created_at: IggyTimestamp::default(),
             stats: Arc::new(StreamStats::default()),
-            topics: Topics::new(),
+            topics: Slab::new(),
+            topic_index: AHashMap::default(),
         }
     }
 }
@@ -149,25 +141,42 @@ impl Clone for Stream {
             created_at: self.created_at,
             stats: self.stats.clone(),
             topics: self.topics.clone(),
+            topic_index: self.topic_index.clone(),
         }
     }
 }
 
 impl Stream {
-    pub fn new(name: String, created_at: IggyTimestamp) -> Self {
+    pub fn new(name: Arc<str>, created_at: IggyTimestamp) -> Self {
         Self {
             id: 0,
             name,
             created_at,
             stats: Arc::new(StreamStats::default()),
-            topics: Topics::new(),
+            topics: Slab::new(),
+            topic_index: AHashMap::default(),
+        }
+    }
+
+    pub fn with_stats(name: Arc<str>, created_at: IggyTimestamp, stats: 
Arc<StreamStats>) -> Self {
+        Self {
+            id: 0,
+            name,
+            created_at,
+            stats,
+            topics: Slab::new(),
+            topic_index: AHashMap::default(),
         }
     }
 }
 
+// ============================================================================
+// Streams State Machine
+// ============================================================================
+
 define_state! {
     Streams {
-        index: AHashMap<String, usize>,
+        index: AHashMap<Arc<str>, usize>,
         items: Slab<Stream>,
     },
     [CreateStream, UpdateStream, DeleteStream, PurgeStream]
diff --git a/core/metadata/src/stm/user.rs b/core/metadata/src/stm/user.rs
index 0df979a59..c3db009da 100644
--- a/core/metadata/src/stm/user.rs
+++ b/core/metadata/src/stm/user.rs
@@ -26,42 +26,63 @@ use iggy_common::update_permissions::UpdatePermissions;
 use iggy_common::update_user::UpdateUser;
 use iggy_common::{IggyTimestamp, Permissions, PersonalAccessToken, UserId, 
UserStatus};
 use slab::Slab;
+use std::sync::Arc;
 
-#[derive(Debug, Clone, Default)]
+// ============================================================================
+// User Entity
+// ============================================================================
+
+#[derive(Debug, Clone)]
 pub struct User {
     pub id: UserId,
-    pub username: String,
-    pub password: String,
+    pub username: Arc<str>,
+    pub password_hash: Arc<str>,
     pub status: UserStatus,
     pub created_at: IggyTimestamp,
-    pub permissions: Option<Permissions>,
-    pub personal_access_tokens: AHashMap<String, PersonalAccessToken>,
+    pub permissions: Option<Arc<Permissions>>,
+}
+
+impl Default for User {
+    fn default() -> Self {
+        Self {
+            id: 0,
+            username: Arc::from(""),
+            password_hash: Arc::from(""),
+            status: UserStatus::default(),
+            created_at: IggyTimestamp::default(),
+            permissions: None,
+        }
+    }
 }
 
 impl User {
     pub fn new(
-        username: String,
-        password: String,
+        username: Arc<str>,
+        password_hash: Arc<str>,
         status: UserStatus,
         created_at: IggyTimestamp,
-        permissions: Option<Permissions>,
+        permissions: Option<Arc<Permissions>>,
     ) -> Self {
         Self {
             id: 0,
             username,
-            password,
+            password_hash,
             status,
             created_at,
             permissions,
-            personal_access_tokens: AHashMap::new(),
         }
     }
 }
 
+// ============================================================================
+// Users State Machine
+// ============================================================================
+
 define_state! {
     Users {
-        index: AHashMap<String, usize>,
+        index: AHashMap<Arc<str>, UserId>,
         items: Slab<User>,
+        personal_access_tokens: AHashMap<UserId, AHashMap<Arc<str>, 
PersonalAccessToken>>,
         permissioner: Permissioner,
     },
     [CreateUser, UpdateUser, DeleteUser, ChangePassword, UpdatePermissions]

Reply via email to