This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch experiment in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 1627ce940b714507e649b0c321823d8babe297ca Author: numinex <[email protected]> AuthorDate: Mon Jan 19 13:11:25 2026 +0100 advance v2 --- core/metadata/src/stm/consumer_group.rs | 84 ++++++++++++++++++++++---------- core/metadata/src/stm/stream.rs | 85 ++++++++++++++++++--------------- core/metadata/src/stm/user.rs | 43 ++++++++++++----- 3 files changed, 137 insertions(+), 75 deletions(-) diff --git a/core/metadata/src/stm/consumer_group.rs b/core/metadata/src/stm/consumer_group.rs index 08d909235..edfd778e7 100644 --- a/core/metadata/src/stm/consumer_group.rs +++ b/core/metadata/src/stm/consumer_group.rs @@ -18,66 +18,98 @@ use crate::stm::Handler; use crate::{define_state, impl_absorb}; use ahash::AHashMap; -use iggy_common::IggyTimestamp; use iggy_common::create_consumer_group::CreateConsumerGroup; use iggy_common::delete_consumer_group::DeleteConsumerGroup; use slab::Slab; +use std::sync::Arc; +use std::sync::atomic::AtomicUsize; -#[derive(Debug, Clone, Default)] +// ============================================================================ +// ConsumerGroupMember Entity +// ============================================================================ + +#[derive(Debug, Clone)] pub struct ConsumerGroupMember { - pub id: u32, - pub joined_at: IggyTimestamp, + pub id: usize, + pub client_id: u32, + pub partitions: Vec<usize>, + pub partition_index: Arc<AtomicUsize>, } impl ConsumerGroupMember { - pub fn new(id: u32, joined_at: IggyTimestamp) -> Self { - Self { id, joined_at } + pub fn new(id: usize, client_id: u32) -> Self { + Self { + id, + client_id, + partitions: Vec::new(), + partition_index: Arc::new(AtomicUsize::new(0)), + } } } -#[derive(Debug, Clone, Default)] +// ============================================================================ +// ConsumerGroup Entity +// ============================================================================ + +#[derive(Debug, Clone)] pub struct ConsumerGroup { pub id: usize, pub stream_id: usize, pub topic_id: usize, - pub name: String, - pub created_at: IggyTimestamp, - pub members: Vec<ConsumerGroupMember>, + pub name: Arc<str>, + pub partitions: Vec<usize>, + pub members: Slab<ConsumerGroupMember>, } impl ConsumerGroup { - pub fn new(stream_id: usize, topic_id: usize, name: String, created_at: IggyTimestamp) -> Self { + pub fn new(stream_id: usize, topic_id: usize, name: Arc<str>) -> Self { Self { id: 0, stream_id, topic_id, name, - created_at, - members: Vec::new(), + partitions: Vec::new(), + members: Slab::new(), } } - pub fn add_member(&mut self, member: ConsumerGroupMember) { - self.members.push(member); - } + /// Rebalance partition assignments among members (round-robin). + pub fn rebalance_members(&mut self) { + let partition_count = self.partitions.len(); + let member_count = self.members.len(); - pub fn remove_member(&mut self, member_id: u32) -> Option<ConsumerGroupMember> { - if let Some(pos) = self.members.iter().position(|m| m.id == member_id) { - Some(self.members.remove(pos)) - } else { - None + if member_count == 0 || partition_count == 0 { + return; } - } - pub fn members_count(&self) -> usize { - self.members.len() + // Clear all member partitions + let member_ids: Vec<usize> = self.members.iter().map(|(id, _)| id).collect(); + for &member_id in &member_ids { + if let Some(member) = self.members.get_mut(member_id) { + member.partitions.clear(); + } + } + + // Rebuild assignments (round-robin) + for (i, &partition_id) in self.partitions.iter().enumerate() { + let member_idx = i % member_count; + if let Some(&member_id) = member_ids.get(member_idx) + && let Some(member) = self.members.get_mut(member_id) + { + member.partitions.push(partition_id); + } + } } } +// ============================================================================ +// ConsumerGroups State Machine +// ============================================================================ + define_state! { ConsumerGroups { - ns_index: AHashMap<(usize, usize), Vec<usize>>, - name_index: AHashMap<String, usize>, + name_index: AHashMap<Arc<str>, usize>, + topic_index: AHashMap<(usize, usize), Vec<usize>>, items: Slab<ConsumerGroup>, }, [CreateConsumerGroup, DeleteConsumerGroup] diff --git a/core/metadata/src/stm/stream.rs b/core/metadata/src/stm/stream.rs index 4e21a70c4..f81555ee6 100644 --- a/core/metadata/src/stm/stream.rs +++ b/core/metadata/src/stm/stream.rs @@ -26,26 +26,21 @@ use iggy_common::update_stream::UpdateStream; use iggy_common::{CompressionAlgorithm, IggyExpiry, IggyTimestamp, MaxTopicSize}; use slab::Slab; use std::sync::Arc; +use std::sync::atomic::AtomicUsize; -#[derive(Debug, Clone, Default)] +// ============================================================================ +// Partition Entity +// ============================================================================ + +#[derive(Debug, Clone)] pub struct Partition { pub id: usize, + pub created_at: IggyTimestamp, } impl Partition { - pub fn new(id: usize) -> Self { - Self { id } - } -} - -#[derive(Debug, Clone, Default)] -pub struct Partitions { - pub items: Slab<Partition>, -} - -impl Partitions { - pub fn new() -> Self { - Self::default() + pub fn new(id: usize, created_at: IggyTimestamp) -> Self { + Self { id, created_at } } } @@ -56,7 +51,7 @@ impl Partitions { #[derive(Debug, Clone)] pub struct Topic { pub id: usize, - pub name: String, + pub name: Arc<str>, pub created_at: IggyTimestamp, pub replication_factor: u8, pub message_expiry: IggyExpiry, @@ -64,28 +59,30 @@ pub struct Topic { pub max_topic_size: MaxTopicSize, pub stats: Arc<TopicStats>, - pub partitions: Partitions, + pub partitions: Vec<Partition>, + pub round_robin_counter: Arc<AtomicUsize>, } impl Default for Topic { fn default() -> Self { Self { id: 0, - name: String::new(), + name: Arc::from(""), created_at: IggyTimestamp::default(), replication_factor: 1, message_expiry: IggyExpiry::default(), compression_algorithm: CompressionAlgorithm::default(), max_topic_size: MaxTopicSize::default(), stats: Arc::new(TopicStats::default()), - partitions: Partitions::new(), + partitions: Vec::new(), + round_robin_counter: Arc::new(AtomicUsize::new(0)), } } } impl Topic { pub fn new( - name: String, + name: Arc<str>, created_at: IggyTimestamp, replication_factor: u8, message_expiry: IggyExpiry, @@ -102,41 +99,36 @@ impl Topic { compression_algorithm, max_topic_size, stats: Arc::new(TopicStats::new(stream_stats)), - partitions: Partitions::new(), + partitions: Vec::new(), + round_robin_counter: Arc::new(AtomicUsize::new(0)), } } } -#[derive(Debug, Clone, Default)] -pub struct Topics { - pub index: AHashMap<String, usize>, - pub items: Slab<Topic>, -} - -impl Topics { - pub fn new() -> Self { - Self::default() - } -} +// ============================================================================ +// Stream Entity +// ============================================================================ #[derive(Debug)] pub struct Stream { pub id: usize, - pub name: String, + pub name: Arc<str>, pub created_at: IggyTimestamp, pub stats: Arc<StreamStats>, - pub topics: Topics, + pub topics: Slab<Topic>, + pub topic_index: AHashMap<Arc<str>, usize>, } impl Default for Stream { fn default() -> Self { Self { id: 0, - name: String::new(), + name: Arc::from(""), created_at: IggyTimestamp::default(), stats: Arc::new(StreamStats::default()), - topics: Topics::new(), + topics: Slab::new(), + topic_index: AHashMap::default(), } } } @@ -149,25 +141,42 @@ impl Clone for Stream { created_at: self.created_at, stats: self.stats.clone(), topics: self.topics.clone(), + topic_index: self.topic_index.clone(), } } } impl Stream { - pub fn new(name: String, created_at: IggyTimestamp) -> Self { + pub fn new(name: Arc<str>, created_at: IggyTimestamp) -> Self { Self { id: 0, name, created_at, stats: Arc::new(StreamStats::default()), - topics: Topics::new(), + topics: Slab::new(), + topic_index: AHashMap::default(), + } + } + + pub fn with_stats(name: Arc<str>, created_at: IggyTimestamp, stats: Arc<StreamStats>) -> Self { + Self { + id: 0, + name, + created_at, + stats, + topics: Slab::new(), + topic_index: AHashMap::default(), } } } +// ============================================================================ +// Streams State Machine +// ============================================================================ + define_state! { Streams { - index: AHashMap<String, usize>, + index: AHashMap<Arc<str>, usize>, items: Slab<Stream>, }, [CreateStream, UpdateStream, DeleteStream, PurgeStream] diff --git a/core/metadata/src/stm/user.rs b/core/metadata/src/stm/user.rs index 0df979a59..c3db009da 100644 --- a/core/metadata/src/stm/user.rs +++ b/core/metadata/src/stm/user.rs @@ -26,42 +26,63 @@ use iggy_common::update_permissions::UpdatePermissions; use iggy_common::update_user::UpdateUser; use iggy_common::{IggyTimestamp, Permissions, PersonalAccessToken, UserId, UserStatus}; use slab::Slab; +use std::sync::Arc; -#[derive(Debug, Clone, Default)] +// ============================================================================ +// User Entity +// ============================================================================ + +#[derive(Debug, Clone)] pub struct User { pub id: UserId, - pub username: String, - pub password: String, + pub username: Arc<str>, + pub password_hash: Arc<str>, pub status: UserStatus, pub created_at: IggyTimestamp, - pub permissions: Option<Permissions>, - pub personal_access_tokens: AHashMap<String, PersonalAccessToken>, + pub permissions: Option<Arc<Permissions>>, +} + +impl Default for User { + fn default() -> Self { + Self { + id: 0, + username: Arc::from(""), + password_hash: Arc::from(""), + status: UserStatus::default(), + created_at: IggyTimestamp::default(), + permissions: None, + } + } } impl User { pub fn new( - username: String, - password: String, + username: Arc<str>, + password_hash: Arc<str>, status: UserStatus, created_at: IggyTimestamp, - permissions: Option<Permissions>, + permissions: Option<Arc<Permissions>>, ) -> Self { Self { id: 0, username, - password, + password_hash, status, created_at, permissions, - personal_access_tokens: AHashMap::new(), } } } +// ============================================================================ +// Users State Machine +// ============================================================================ + define_state! { Users { - index: AHashMap<String, usize>, + index: AHashMap<Arc<str>, UserId>, items: Slab<User>, + personal_access_tokens: AHashMap<UserId, AHashMap<Arc<str>, PersonalAccessToken>>, permissioner: Permissioner, }, [CreateUser, UpdateUser, DeleteUser, ChangePassword, UpdatePermissions]
