This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch experiment in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 35cd19f5f443a85aefdcaa2f7d783b7af03b8ded Author: numinex <[email protected]> AuthorDate: Mon Jan 19 12:50:54 2026 +0100 advance --- Cargo.lock | 1 + core/metadata/Cargo.toml | 1 + core/metadata/src/stm/consumer_group.rs | 41 ++++---- core/metadata/src/stm/mod.rs | 168 ++++++++++++++------------------ core/metadata/src/stm/stream.rs | 97 +----------------- core/metadata/src/stm/user.rs | 13 +-- 6 files changed, 102 insertions(+), 219 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c7a1a3d7b..c67ddb2bf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5704,6 +5704,7 @@ dependencies = [ "journal", "left-right", "message_bus", + "paste", "slab", "tracing", ] diff --git a/core/metadata/Cargo.toml b/core/metadata/Cargo.toml index d7fcecfc6..16886a3fc 100644 --- a/core/metadata/Cargo.toml +++ b/core/metadata/Cargo.toml @@ -35,5 +35,6 @@ iggy_common = { path = "../common" } journal = { path = "../journal" } left-right = { workspace = true } message_bus = { path = "../message_bus" } +paste = "1.0" slab = "0.4.11" tracing = { workspace = true } diff --git a/core/metadata/src/stm/consumer_group.rs b/core/metadata/src/stm/consumer_group.rs index 7eb5d08cb..08d909235 100644 --- a/core/metadata/src/stm/consumer_group.rs +++ b/core/metadata/src/stm/consumer_group.rs @@ -15,14 +15,14 @@ // specific language governing permissions and limitations // under the License. +use crate::stm::Handler; +use crate::{define_state, impl_absorb}; use ahash::AHashMap; use iggy_common::IggyTimestamp; +use iggy_common::create_consumer_group::CreateConsumerGroup; +use iggy_common::delete_consumer_group::DeleteConsumerGroup; use slab::Slab; -// ============================================================================ -// ConsumerGroupMember - Individual member of a consumer group -// ============================================================================ - #[derive(Debug, Clone, Default)] pub struct ConsumerGroupMember { pub id: u32, @@ -35,10 +35,6 @@ impl ConsumerGroupMember { } } -// ============================================================================ -// ConsumerGroup - A group of consumers -// ============================================================================ - #[derive(Debug, Clone, Default)] pub struct ConsumerGroup { pub id: usize, @@ -78,18 +74,25 @@ impl ConsumerGroup { } } -// ============================================================================ -// ConsumerGroups Collection -// ============================================================================ - -#[derive(Debug, Clone, Default)] -pub struct ConsumerGroups { - pub index: AHashMap<(usize, usize, String), usize>, - pub items: Slab<ConsumerGroup>, +define_state! { + ConsumerGroups { + ns_index: AHashMap<(usize, usize), Vec<usize>>, + name_index: AHashMap<String, usize>, + items: Slab<ConsumerGroup>, + }, + [CreateConsumerGroup, DeleteConsumerGroup] } +impl_absorb!(ConsumerGroupsInner, ConsumerGroupsCommand); -impl ConsumerGroups { - pub fn new() -> Self { - Self::default() +impl Handler for ConsumerGroupsInner { + fn handle(&mut self, cmd: &ConsumerGroupsCommand) { + match cmd { + ConsumerGroupsCommand::CreateConsumerGroup(_payload) => { + // Actual mutation logic will be implemented later + } + ConsumerGroupsCommand::DeleteConsumerGroup(_payload) => { + // Actual mutation logic will be implemented later + } + } } } diff --git a/core/metadata/src/stm/mod.rs b/core/metadata/src/stm/mod.rs index 518582bbd..c4c5b1562 100644 --- a/core/metadata/src/stm/mod.rs +++ b/core/metadata/src/stm/mod.rs @@ -64,14 +64,6 @@ pub trait Handler: Command { fn handle(&mut self, cmd: &Self::Cmd); } -/// Storage abstraction: applies commands to inner state. -pub trait ApplyState { - type Inner: Handler; - type Output; - - fn do_apply(&self, cmd: <Self::Inner as Command>::Cmd) -> Self::Output; -} - pub struct LeftRight<T, C> where T: Absorb<C>, @@ -93,14 +85,11 @@ where } } -impl<T> ApplyState for LeftRight<T, <T as Command>::Cmd> +impl<T> LeftRight<T, <T as Command>::Cmd> where T: Absorb<<T as Command>::Cmd> + Clone + Handler, { - type Inner = T; - type Output = (); - - fn do_apply(&self, cmd: <Self::Inner as Command>::Cmd) -> Self::Output { + pub fn do_apply(&self, cmd: <T as Command>::Cmd) { self.write .as_ref() .expect("no write handle - not the owner shard") @@ -122,29 +111,27 @@ pub trait StateMachine { fn update(&self, input: &Self::Input, output: &mut Vec<Self::Output>); } -/// Generates a state machine with pluggable storage. +/// Generates a state machine with convention-based storage. /// /// # Generated items -/// - `$inner` struct with the specified fields (the data) -/// - `$command` enum with variants for each operation -/// - `$state<S: ApplyState<Inner = $inner>>` wrapper struct (storage-agnostic) -/// - `Command` impl for `$inner` (parsing) -/// - `Absorb` impl for `$inner` (delegates to `Handler::handle`) -/// - `State` impl for `$state<S>` -/// - `From<S>` impl for `$state<S>` +/// - `{$state}Inner` struct with the specified fields (the data) +/// - `{$state}Command` enum with variants for each operation +/// - `$state` wrapper struct (non-generic, contains LeftRight storage) +/// - `Command` impl for `{$state}Inner` (parsing) +/// - `State` impl for `$state` +/// - `From<LeftRight<...>>` impl for `$state` /// /// # User must implement -/// - `Handler` for `$inner` (business logic) +/// - `Handler` for `{$state}Inner` (business logic) +/// - `impl_absorb!` for `{$state}Inner` and `{$state}Command` /// /// # Example /// ```ignore /// define_state! { -/// Streams, -/// StreamsInner { +/// Streams { /// index: AHashMap<String, usize>, /// items: Slab<Stream>, /// }, -/// StreamsCommand, /// [CreateStream, UpdateStream, DeleteStream] /// } /// @@ -157,97 +144,89 @@ pub trait StateMachine { /// } /// } /// } +/// +/// // User implements Absorb via macro: +/// impl_absorb!(StreamsInner, StreamsCommand); /// ``` +// TODO: The `operation` argument can be removed, once we create an trait for mapping. #[macro_export] macro_rules! define_state { ( - $state:ident, - $inner:ident { + $state:ident { $($field_name:ident : $field_type:ty),* $(,)? }, - $command:ident, [$($operation:ident),* $(,)?] ) => { - #[derive(Debug, Clone, Default)] - pub struct $inner { - $( - pub $field_name: $field_type, - )* - } - - #[derive(Debug, Clone)] - pub enum $command { - $( - $operation($operation), - )* - } - - pub struct $state<S: $crate::stm::ApplyState<Inner = $inner>> { - inner: S, - } + paste::paste! { + #[derive(Debug, Clone, Default)] + pub struct [<$state Inner>] { + $( + pub $field_name: $field_type, + )* + } - impl<S: $crate::stm::ApplyState<Inner = $inner>> From<S> for $state<S> { - fn from(storage: S) -> Self { - Self { inner: storage } + impl [<$state Inner>] { + pub fn new() -> Self { + Self::default() + } } - } - impl<S> $crate::stm::State for $state<S> - where - S: $crate::stm::ApplyState<Inner = $inner>, - { - type Input = <$inner as $crate::stm::Command>::Input; - type Output = S::Output; + #[derive(Debug, Clone)] + pub enum [<$state Command>] { + $( + $operation($operation), + )* + } - fn apply(&self, input: &Self::Input) -> Option<Self::Output> { - <$inner as $crate::stm::Command>::parse(input) - .map(|cmd| self.inner.do_apply(cmd)) + pub struct $state { + inner: $crate::stm::LeftRight<[<$state Inner>], [<$state Command>]>, } - } - impl $crate::stm::Command for $inner { - type Cmd = $command; - type Input = ::iggy_common::message::Message<::iggy_common::header::PrepareHeader>; - - fn parse(input: &Self::Input) -> Option<Self::Cmd> { - use ::iggy_common::BytesSerializable; - use ::iggy_common::header::Operation; - - let body = input.body_bytes(); - match input.header().operation { - $( - Operation::$operation => { - Some($command::$operation( - $operation::from_bytes(body).unwrap() - )) - }, - )* - _ => None, + impl From<$crate::stm::LeftRight<[<$state Inner>], [<$state Command>]>> for $state { + fn from(storage: $crate::stm::LeftRight<[<$state Inner>], [<$state Command>]>) -> Self { + Self { inner: storage } } } - } - /* - impl ::left_right::Absorb<$command> for $inner - where - $inner: $crate::stm::Handler, - { - fn absorb_first(&mut self, cmd: &mut $command, _other: &Self) { - <Self as $crate::stm::Handler>::handle(self, cmd); + impl From<[<$state Inner>]> for $state { + fn from(inner: [<$state Inner>]) -> Self { + let left_right: $crate::stm::LeftRight<[<$state Inner>], [<$state Command>]> = inner.into(); + left_right.into() + } } - fn absorb_second(&mut self, cmd: $command, _other: &Self) { - <Self as $crate::stm::Handler>::handle(self, &cmd); - } + impl $crate::stm::State for $state { + type Input = <[<$state Inner>] as $crate::stm::Command>::Input; + type Output = (); - fn sync_with(&mut self, first: &Self) { - *self = first.clone(); + fn apply(&self, input: &Self::Input) -> Option<Self::Output> { + <[<$state Inner>] as $crate::stm::Command>::parse(input) + .map(|cmd| self.inner.do_apply(cmd)) + } } - fn drop_first(self: Box<Self>) {} - fn drop_second(self: Box<Self>) {} + impl $crate::stm::Command for [<$state Inner>] { + type Cmd = [<$state Command>]; + type Input = ::iggy_common::message::Message<::iggy_common::header::PrepareHeader>; + + fn parse(input: &Self::Input) -> Option<Self::Cmd> { + use ::iggy_common::BytesSerializable; + use ::iggy_common::header::Operation; + + let body = input.body_bytes(); + match input.header().operation { + $( + Operation::$operation => { + Some([<$state Command>]::$operation( + $operation::from_bytes(body).unwrap() + )) + }, + )* + _ => None, + } + } + } } - */ }; } @@ -290,6 +269,3 @@ macro_rules! impl_absorb { } }; } - -/* -*/ diff --git a/core/metadata/src/stm/stream.rs b/core/metadata/src/stm/stream.rs index f301b511a..4e21a70c4 100644 --- a/core/metadata/src/stm/stream.rs +++ b/core/metadata/src/stm/stream.rs @@ -16,7 +16,7 @@ // under the License. use crate::stats::{StreamStats, TopicStats}; -use crate::stm::{Handler, LeftRight}; +use crate::stm::Handler; use crate::{define_state, impl_absorb}; use ahash::AHashMap; use iggy_common::create_stream::CreateStream; @@ -24,14 +24,9 @@ use iggy_common::delete_stream::DeleteStream; use iggy_common::purge_stream::PurgeStream; use iggy_common::update_stream::UpdateStream; use iggy_common::{CompressionAlgorithm, IggyExpiry, IggyTimestamp, MaxTopicSize}; -use left_right::Absorb; use slab::Slab; use std::sync::Arc; -// ============================================================================ -// Partition Entity -// ============================================================================ - #[derive(Debug, Clone, Default)] pub struct Partition { pub id: usize, @@ -43,10 +38,6 @@ impl Partition { } } -// ============================================================================ -// Partitions Collection -// ============================================================================ - #[derive(Debug, Clone, Default)] pub struct Partitions { pub items: Slab<Partition>, @@ -58,67 +49,6 @@ impl Partitions { } } -// ============================================================================ -// ConsumerGroup (local to Topic, not a state machine) -// ============================================================================ - -#[derive(Debug, Clone, Default)] -pub struct ConsumerGroup { - pub id: usize, - pub name: String, - pub created_at: IggyTimestamp, -} - -impl ConsumerGroup { - pub fn new(name: String, created_at: IggyTimestamp) -> Self { - Self { - id: 0, - name, - created_at, - } - } -} - -// ============================================================================ -// ConsumerGroups (local to Topic, simple collection - no left_right) -// ============================================================================ - -#[derive(Debug, Clone, Default)] -pub struct ConsumerGroups { - index: AHashMap<String, usize>, - items: Slab<ConsumerGroup>, -} - -impl ConsumerGroups { - pub fn new() -> Self { - Self::default() - } - - pub fn insert(&self, _group: ConsumerGroup) -> usize { - 0 - } - - pub fn get(&self, _id: usize) -> Option<ConsumerGroup> { - None - } - - pub fn get_by_name(&self, _name: &str) -> Option<ConsumerGroup> { - None - } - - pub fn remove(&self, _id: usize) -> Option<ConsumerGroup> { - None - } - - pub fn len(&self) -> usize { - 0 - } - - pub fn is_empty(&self) -> bool { - true - } -} - // ============================================================================ // Topic Entity // ============================================================================ @@ -135,7 +65,6 @@ pub struct Topic { pub stats: Arc<TopicStats>, pub partitions: Partitions, - pub consumer_groups: ConsumerGroups, } impl Default for Topic { @@ -150,7 +79,6 @@ impl Default for Topic { max_topic_size: MaxTopicSize::default(), stats: Arc::new(TopicStats::default()), partitions: Partitions::new(), - consumer_groups: ConsumerGroups::new(), } } } @@ -175,15 +103,10 @@ impl Topic { max_topic_size, stats: Arc::new(TopicStats::new(stream_stats)), partitions: Partitions::new(), - consumer_groups: ConsumerGroups::new(), } } } -// ============================================================================ -// Topics Collection -// ============================================================================ - #[derive(Debug, Clone, Default)] pub struct Topics { pub index: AHashMap<String, usize>, @@ -196,10 +119,6 @@ impl Topics { } } -// ============================================================================ -// Stream Entity -// ============================================================================ - #[derive(Debug)] pub struct Stream { pub id: usize, @@ -246,23 +165,11 @@ impl Stream { } } -fn foo() { - let streams_inner = StreamsInner { - index: AHashMap::new(), - items: Slab::new(), - }; - - let streams: LeftRight<StreamsInner, StreamsCommand> = streams_inner.into(); - let streams_2: Streams<LeftRight<StreamsInner, StreamsCommand>> = streams.into(); -} - define_state! { - Streams, - StreamsInner { + Streams { index: AHashMap<String, usize>, items: Slab<Stream>, }, - StreamsCommand, [CreateStream, UpdateStream, DeleteStream, PurgeStream] } impl_absorb!(StreamsInner, StreamsCommand); diff --git a/core/metadata/src/stm/user.rs b/core/metadata/src/stm/user.rs index 64285b22d..0df979a59 100644 --- a/core/metadata/src/stm/user.rs +++ b/core/metadata/src/stm/user.rs @@ -15,9 +15,9 @@ // specific language governing permissions and limitations // under the License. -use crate::define_state; use crate::permissioner::Permissioner; -use crate::stm::{Handle, Handler}; +use crate::stm::Handler; +use crate::{define_state, impl_absorb}; use ahash::AHashMap; use iggy_common::change_password::ChangePassword; use iggy_common::create_user::CreateUser; @@ -27,10 +27,6 @@ use iggy_common::update_user::UpdateUser; use iggy_common::{IggyTimestamp, Permissions, PersonalAccessToken, UserId, UserStatus}; use slab::Slab; -// ============================================================================ -// User Entity -// ============================================================================ - #[derive(Debug, Clone, Default)] pub struct User { pub id: UserId, @@ -63,15 +59,14 @@ impl User { } define_state! { - Users, - UsersInner { + Users { index: AHashMap<String, usize>, items: Slab<User>, permissioner: Permissioner, }, - UsersCommand, [CreateUser, UpdateUser, DeleteUser, ChangePassword, UpdatePermissions] } +impl_absorb!(UsersInner, UsersCommand); impl Handler for UsersInner { fn handle(&mut self, cmd: &UsersCommand) {
