This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch experiment
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 35cd19f5f443a85aefdcaa2f7d783b7af03b8ded
Author: numinex <[email protected]>
AuthorDate: Mon Jan 19 12:50:54 2026 +0100

    advance
---
 Cargo.lock                              |   1 +
 core/metadata/Cargo.toml                |   1 +
 core/metadata/src/stm/consumer_group.rs |  41 ++++----
 core/metadata/src/stm/mod.rs            | 168 ++++++++++++++------------------
 core/metadata/src/stm/stream.rs         |  97 +-----------------
 core/metadata/src/stm/user.rs           |  13 +--
 6 files changed, 102 insertions(+), 219 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index c7a1a3d7b..c67ddb2bf 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5704,6 +5704,7 @@ dependencies = [
  "journal",
  "left-right",
  "message_bus",
+ "paste",
  "slab",
  "tracing",
 ]
diff --git a/core/metadata/Cargo.toml b/core/metadata/Cargo.toml
index d7fcecfc6..16886a3fc 100644
--- a/core/metadata/Cargo.toml
+++ b/core/metadata/Cargo.toml
@@ -35,5 +35,6 @@ iggy_common = { path = "../common" }
 journal = { path = "../journal" }
 left-right = { workspace = true }
 message_bus = { path = "../message_bus" }
+paste = "1.0"
 slab = "0.4.11"
 tracing = { workspace = true }
diff --git a/core/metadata/src/stm/consumer_group.rs 
b/core/metadata/src/stm/consumer_group.rs
index 7eb5d08cb..08d909235 100644
--- a/core/metadata/src/stm/consumer_group.rs
+++ b/core/metadata/src/stm/consumer_group.rs
@@ -15,14 +15,14 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::stm::Handler;
+use crate::{define_state, impl_absorb};
 use ahash::AHashMap;
 use iggy_common::IggyTimestamp;
+use iggy_common::create_consumer_group::CreateConsumerGroup;
+use iggy_common::delete_consumer_group::DeleteConsumerGroup;
 use slab::Slab;
 
-// ============================================================================
-// ConsumerGroupMember - Individual member of a consumer group
-// ============================================================================
-
 #[derive(Debug, Clone, Default)]
 pub struct ConsumerGroupMember {
     pub id: u32,
@@ -35,10 +35,6 @@ impl ConsumerGroupMember {
     }
 }
 
-// ============================================================================
-// ConsumerGroup - A group of consumers
-// ============================================================================
-
 #[derive(Debug, Clone, Default)]
 pub struct ConsumerGroup {
     pub id: usize,
@@ -78,18 +74,25 @@ impl ConsumerGroup {
     }
 }
 
-// ============================================================================
-// ConsumerGroups Collection
-// ============================================================================
-
-#[derive(Debug, Clone, Default)]
-pub struct ConsumerGroups {
-    pub index: AHashMap<(usize, usize, String), usize>,
-    pub items: Slab<ConsumerGroup>,
+define_state! {
+    ConsumerGroups {
+        ns_index: AHashMap<(usize, usize), Vec<usize>>,
+        name_index: AHashMap<String, usize>,
+        items: Slab<ConsumerGroup>,
+    },
+    [CreateConsumerGroup, DeleteConsumerGroup]
 }
+impl_absorb!(ConsumerGroupsInner, ConsumerGroupsCommand);
 
-impl ConsumerGroups {
-    pub fn new() -> Self {
-        Self::default()
+impl Handler for ConsumerGroupsInner {
+    fn handle(&mut self, cmd: &ConsumerGroupsCommand) {
+        match cmd {
+            ConsumerGroupsCommand::CreateConsumerGroup(_payload) => {
+                // Actual mutation logic will be implemented later
+            }
+            ConsumerGroupsCommand::DeleteConsumerGroup(_payload) => {
+                // Actual mutation logic will be implemented later
+            }
+        }
     }
 }
diff --git a/core/metadata/src/stm/mod.rs b/core/metadata/src/stm/mod.rs
index 518582bbd..c4c5b1562 100644
--- a/core/metadata/src/stm/mod.rs
+++ b/core/metadata/src/stm/mod.rs
@@ -64,14 +64,6 @@ pub trait Handler: Command {
     fn handle(&mut self, cmd: &Self::Cmd);
 }
 
-/// Storage abstraction: applies commands to inner state.
-pub trait ApplyState {
-    type Inner: Handler;
-    type Output;
-
-    fn do_apply(&self, cmd: <Self::Inner as Command>::Cmd) -> Self::Output;
-}
-
 pub struct LeftRight<T, C>
 where
     T: Absorb<C>,
@@ -93,14 +85,11 @@ where
     }
 }
 
-impl<T> ApplyState for LeftRight<T, <T as Command>::Cmd>
+impl<T> LeftRight<T, <T as Command>::Cmd>
 where
     T: Absorb<<T as Command>::Cmd> + Clone + Handler,
 {
-    type Inner = T;
-    type Output = ();
-
-    fn do_apply(&self, cmd: <Self::Inner as Command>::Cmd) -> Self::Output {
+    pub fn do_apply(&self, cmd: <T as Command>::Cmd) {
         self.write
             .as_ref()
             .expect("no write handle - not the owner shard")
@@ -122,29 +111,27 @@ pub trait StateMachine {
     fn update(&self, input: &Self::Input, output: &mut Vec<Self::Output>);
 }
 
-/// Generates a state machine with pluggable storage.
+/// Generates a state machine with convention-based storage.
 ///
 /// # Generated items
-/// - `$inner` struct with the specified fields (the data)
-/// - `$command` enum with variants for each operation
-/// - `$state<S: ApplyState<Inner = $inner>>` wrapper struct (storage-agnostic)
-/// - `Command` impl for `$inner` (parsing)
-/// - `Absorb` impl for `$inner` (delegates to `Handler::handle`)
-/// - `State` impl for `$state<S>`
-/// - `From<S>` impl for `$state<S>`
+/// - `{$state}Inner` struct with the specified fields (the data)
+/// - `{$state}Command` enum with variants for each operation
+/// - `$state` wrapper struct (non-generic, contains LeftRight storage)
+/// - `Command` impl for `{$state}Inner` (parsing)
+/// - `State` impl for `$state`
+/// - `From<LeftRight<...>>` impl for `$state`
 ///
 /// # User must implement
-/// - `Handler` for `$inner` (business logic)
+/// - `Handler` for `{$state}Inner` (business logic)
+/// - `impl_absorb!` for `{$state}Inner` and `{$state}Command`
 ///
 /// # Example
 /// ```ignore
 /// define_state! {
-///     Streams,
-///     StreamsInner {
+///     Streams {
 ///         index: AHashMap<String, usize>,
 ///         items: Slab<Stream>,
 ///     },
-///     StreamsCommand,
 ///     [CreateStream, UpdateStream, DeleteStream]
 /// }
 ///
@@ -157,97 +144,89 @@ pub trait StateMachine {
 ///         }
 ///     }
 /// }
+///
+/// // User implements Absorb via macro:
+/// impl_absorb!(StreamsInner, StreamsCommand);
 /// ```
+// TODO: The `operation` argument can be removed once we create a trait for 
mapping.
 #[macro_export]
 macro_rules! define_state {
     (
-        $state:ident,
-        $inner:ident {
+        $state:ident {
             $($field_name:ident : $field_type:ty),* $(,)?
         },
-        $command:ident,
         [$($operation:ident),* $(,)?]
     ) => {
-        #[derive(Debug, Clone, Default)]
-        pub struct $inner {
-            $(
-                pub $field_name: $field_type,
-            )*
-        }
-
-        #[derive(Debug, Clone)]
-        pub enum $command {
-            $(
-                $operation($operation),
-            )*
-        }
-
-        pub struct $state<S: $crate::stm::ApplyState<Inner = $inner>> {
-            inner: S,
-        }
+        paste::paste! {
+            #[derive(Debug, Clone, Default)]
+            pub struct [<$state Inner>] {
+                $(
+                    pub $field_name: $field_type,
+                )*
+            }
 
-        impl<S: $crate::stm::ApplyState<Inner = $inner>> From<S> for $state<S> 
{
-            fn from(storage: S) -> Self {
-                Self { inner: storage }
+            impl [<$state Inner>] {
+                pub fn new() -> Self {
+                    Self::default()
+                }
             }
-        }
 
-        impl<S> $crate::stm::State for $state<S>
-        where
-            S: $crate::stm::ApplyState<Inner = $inner>,
-        {
-            type Input = <$inner as $crate::stm::Command>::Input;
-            type Output = S::Output;
+            #[derive(Debug, Clone)]
+            pub enum [<$state Command>] {
+                $(
+                    $operation($operation),
+                )*
+            }
 
-            fn apply(&self, input: &Self::Input) -> Option<Self::Output> {
-                <$inner as $crate::stm::Command>::parse(input)
-                    .map(|cmd| self.inner.do_apply(cmd))
+            pub struct $state {
+                inner: $crate::stm::LeftRight<[<$state Inner>], [<$state 
Command>]>,
             }
-        }
 
-        impl $crate::stm::Command for $inner {
-            type Cmd = $command;
-            type Input = 
::iggy_common::message::Message<::iggy_common::header::PrepareHeader>;
-
-            fn parse(input: &Self::Input) -> Option<Self::Cmd> {
-                use ::iggy_common::BytesSerializable;
-                use ::iggy_common::header::Operation;
-
-                let body = input.body_bytes();
-                match input.header().operation {
-                    $(
-                        Operation::$operation => {
-                            Some($command::$operation(
-                                $operation::from_bytes(body).unwrap()
-                            ))
-                        },
-                    )*
-                    _ => None,
+            impl From<$crate::stm::LeftRight<[<$state Inner>], [<$state 
Command>]>> for $state {
+                fn from(storage: $crate::stm::LeftRight<[<$state Inner>], 
[<$state Command>]>) -> Self {
+                    Self { inner: storage }
                 }
             }
-        }
 
-        /*
-        impl ::left_right::Absorb<$command> for $inner
-        where
-            $inner: $crate::stm::Handler,
-        {
-            fn absorb_first(&mut self, cmd: &mut $command, _other: &Self) {
-                <Self as $crate::stm::Handler>::handle(self, cmd);
+            impl From<[<$state Inner>]> for $state {
+                fn from(inner: [<$state Inner>]) -> Self {
+                    let left_right: $crate::stm::LeftRight<[<$state Inner>], 
[<$state Command>]> = inner.into();
+                    left_right.into()
+                }
             }
 
-            fn absorb_second(&mut self, cmd: $command, _other: &Self) {
-                <Self as $crate::stm::Handler>::handle(self, &cmd);
-            }
+            impl $crate::stm::State for $state {
+                type Input = <[<$state Inner>] as $crate::stm::Command>::Input;
+                type Output = ();
 
-            fn sync_with(&mut self, first: &Self) {
-                *self = first.clone();
+                fn apply(&self, input: &Self::Input) -> Option<Self::Output> {
+                    <[<$state Inner>] as $crate::stm::Command>::parse(input)
+                        .map(|cmd| self.inner.do_apply(cmd))
+                }
             }
 
-            fn drop_first(self: Box<Self>) {}
-            fn drop_second(self: Box<Self>) {}
+            impl $crate::stm::Command for [<$state Inner>] {
+                type Cmd = [<$state Command>];
+                type Input = 
::iggy_common::message::Message<::iggy_common::header::PrepareHeader>;
+
+                fn parse(input: &Self::Input) -> Option<Self::Cmd> {
+                    use ::iggy_common::BytesSerializable;
+                    use ::iggy_common::header::Operation;
+
+                    let body = input.body_bytes();
+                    match input.header().operation {
+                        $(
+                            Operation::$operation => {
+                                Some([<$state Command>]::$operation(
+                                    $operation::from_bytes(body).unwrap()
+                                ))
+                            },
+                        )*
+                        _ => None,
+                    }
+                }
+            }
         }
-        */
     };
 }
 
@@ -290,6 +269,3 @@ macro_rules! impl_absorb {
         }
     };
 }
-
-/*
-*/
diff --git a/core/metadata/src/stm/stream.rs b/core/metadata/src/stm/stream.rs
index f301b511a..4e21a70c4 100644
--- a/core/metadata/src/stm/stream.rs
+++ b/core/metadata/src/stm/stream.rs
@@ -16,7 +16,7 @@
 // under the License.
 
 use crate::stats::{StreamStats, TopicStats};
-use crate::stm::{Handler, LeftRight};
+use crate::stm::Handler;
 use crate::{define_state, impl_absorb};
 use ahash::AHashMap;
 use iggy_common::create_stream::CreateStream;
@@ -24,14 +24,9 @@ use iggy_common::delete_stream::DeleteStream;
 use iggy_common::purge_stream::PurgeStream;
 use iggy_common::update_stream::UpdateStream;
 use iggy_common::{CompressionAlgorithm, IggyExpiry, IggyTimestamp, 
MaxTopicSize};
-use left_right::Absorb;
 use slab::Slab;
 use std::sync::Arc;
 
-// ============================================================================
-// Partition Entity
-// ============================================================================
-
 #[derive(Debug, Clone, Default)]
 pub struct Partition {
     pub id: usize,
@@ -43,10 +38,6 @@ impl Partition {
     }
 }
 
-// ============================================================================
-// Partitions Collection
-// ============================================================================
-
 #[derive(Debug, Clone, Default)]
 pub struct Partitions {
     pub items: Slab<Partition>,
@@ -58,67 +49,6 @@ impl Partitions {
     }
 }
 
-// ============================================================================
-// ConsumerGroup (local to Topic, not a state machine)
-// ============================================================================
-
-#[derive(Debug, Clone, Default)]
-pub struct ConsumerGroup {
-    pub id: usize,
-    pub name: String,
-    pub created_at: IggyTimestamp,
-}
-
-impl ConsumerGroup {
-    pub fn new(name: String, created_at: IggyTimestamp) -> Self {
-        Self {
-            id: 0,
-            name,
-            created_at,
-        }
-    }
-}
-
-// ============================================================================
-// ConsumerGroups (local to Topic, simple collection - no left_right)
-// ============================================================================
-
-#[derive(Debug, Clone, Default)]
-pub struct ConsumerGroups {
-    index: AHashMap<String, usize>,
-    items: Slab<ConsumerGroup>,
-}
-
-impl ConsumerGroups {
-    pub fn new() -> Self {
-        Self::default()
-    }
-
-    pub fn insert(&self, _group: ConsumerGroup) -> usize {
-        0
-    }
-
-    pub fn get(&self, _id: usize) -> Option<ConsumerGroup> {
-        None
-    }
-
-    pub fn get_by_name(&self, _name: &str) -> Option<ConsumerGroup> {
-        None
-    }
-
-    pub fn remove(&self, _id: usize) -> Option<ConsumerGroup> {
-        None
-    }
-
-    pub fn len(&self) -> usize {
-        0
-    }
-
-    pub fn is_empty(&self) -> bool {
-        true
-    }
-}
-
 // ============================================================================
 // Topic Entity
 // ============================================================================
@@ -135,7 +65,6 @@ pub struct Topic {
 
     pub stats: Arc<TopicStats>,
     pub partitions: Partitions,
-    pub consumer_groups: ConsumerGroups,
 }
 
 impl Default for Topic {
@@ -150,7 +79,6 @@ impl Default for Topic {
             max_topic_size: MaxTopicSize::default(),
             stats: Arc::new(TopicStats::default()),
             partitions: Partitions::new(),
-            consumer_groups: ConsumerGroups::new(),
         }
     }
 }
@@ -175,15 +103,10 @@ impl Topic {
             max_topic_size,
             stats: Arc::new(TopicStats::new(stream_stats)),
             partitions: Partitions::new(),
-            consumer_groups: ConsumerGroups::new(),
         }
     }
 }
 
-// ============================================================================
-// Topics Collection
-// ============================================================================
-
 #[derive(Debug, Clone, Default)]
 pub struct Topics {
     pub index: AHashMap<String, usize>,
@@ -196,10 +119,6 @@ impl Topics {
     }
 }
 
-// ============================================================================
-// Stream Entity
-// ============================================================================
-
 #[derive(Debug)]
 pub struct Stream {
     pub id: usize,
@@ -246,23 +165,11 @@ impl Stream {
     }
 }
 
-fn foo() {
-    let streams_inner = StreamsInner {
-        index: AHashMap::new(),
-        items: Slab::new(),
-    };
-
-    let streams: LeftRight<StreamsInner, StreamsCommand> = 
streams_inner.into();
-    let streams_2: Streams<LeftRight<StreamsInner, StreamsCommand>> = 
streams.into();
-}
-
 define_state! {
-    Streams,
-    StreamsInner {
+    Streams {
         index: AHashMap<String, usize>,
         items: Slab<Stream>,
     },
-    StreamsCommand,
     [CreateStream, UpdateStream, DeleteStream, PurgeStream]
 }
 impl_absorb!(StreamsInner, StreamsCommand);
diff --git a/core/metadata/src/stm/user.rs b/core/metadata/src/stm/user.rs
index 64285b22d..0df979a59 100644
--- a/core/metadata/src/stm/user.rs
+++ b/core/metadata/src/stm/user.rs
@@ -15,9 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::define_state;
 use crate::permissioner::Permissioner;
-use crate::stm::{Handle, Handler};
+use crate::stm::Handler;
+use crate::{define_state, impl_absorb};
 use ahash::AHashMap;
 use iggy_common::change_password::ChangePassword;
 use iggy_common::create_user::CreateUser;
@@ -27,10 +27,6 @@ use iggy_common::update_user::UpdateUser;
 use iggy_common::{IggyTimestamp, Permissions, PersonalAccessToken, UserId, 
UserStatus};
 use slab::Slab;
 
-// ============================================================================
-// User Entity
-// ============================================================================
-
 #[derive(Debug, Clone, Default)]
 pub struct User {
     pub id: UserId,
@@ -63,15 +59,14 @@ impl User {
 }
 
 define_state! {
-    Users,
-    UsersInner {
+    Users {
         index: AHashMap<String, usize>,
         items: Slab<User>,
         permissioner: Permissioner,
     },
-    UsersCommand,
     [CreateUser, UpdateUser, DeleteUser, ChangePassword, UpdatePermissions]
 }
+impl_absorb!(UsersInner, UsersCommand);
 
 impl Handler for UsersInner {
     fn handle(&mut self, cmd: &UsersCommand) {

Reply via email to