This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch invert-state-handler-dep
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 3e8c0984f2de5129a9c622ff16f6ab6ac17aa85f
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Mon Feb 9 13:46:15 2026 +0100

    refactor(metadata): invert state handler dependency with per-command 
dispatch
    
    Split define_state! into define_state! (struct + wrapper) and
    collect_handlers! (enum, Command, dispatch, Absorb). Each command
    type now implements StateHandler<S> for the state it mutates,
    moving handler logic from a central match block onto the command.
    
    Removes Handler trait and impl_absorb! macro. All 19 handlers
    migrated (10 stream, 7 user, 2 consumer_group).
---
 core/metadata/src/stm/consumer_group.rs | 136 +++++------
 core/metadata/src/stm/mod.rs            | 158 ++++++-------
 core/metadata/src/stm/stream.rs         | 402 +++++++++++++++++---------------
 core/metadata/src/stm/user.rs           | 246 ++++++++++---------
 4 files changed, 484 insertions(+), 458 deletions(-)

diff --git a/core/metadata/src/stm/consumer_group.rs 
b/core/metadata/src/stm/consumer_group.rs
index 3a3e947fc..c4713e58a 100644
--- a/core/metadata/src/stm/consumer_group.rs
+++ b/core/metadata/src/stm/consumer_group.rs
@@ -15,8 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::stm::Handler;
-use crate::{define_state, impl_absorb};
+use crate::stm::StateHandler;
+use crate::{collect_handlers, define_state};
 use ahash::AHashMap;
 use iggy_common::create_consumer_group::CreateConsumerGroup;
 use iggy_common::delete_consumer_group::DeleteConsumerGroup;
@@ -94,10 +94,15 @@ define_state! {
         topic_index: AHashMap<(usize, usize), Vec<usize>>,
         topic_name_index: AHashMap<(Arc<str>, Arc<str>), Vec<usize>>,
         items: Slab<ConsumerGroup>,
-    },
-    [CreateConsumerGroup, DeleteConsumerGroup]
+    }
+}
+
+collect_handlers! {
+    ConsumerGroups {
+        CreateConsumerGroup,
+        DeleteConsumerGroup,
+    }
 }
-impl_absorb!(ConsumerGroupsInner, ConsumerGroupsCommand);
 
 impl ConsumerGroupsInner {
     fn resolve_consumer_group_id_by_identifiers(
@@ -106,7 +111,6 @@ impl ConsumerGroupsInner {
         topic_id: &Identifier,
         group_id: &Identifier,
     ) -> Option<usize> {
-        // Resolve by numeric IDs
         if let (Ok(s), Ok(t)) = (stream_id.get_u32_value(), 
topic_id.get_u32_value()) {
             let groups_in_topic = self.topic_index.get(&(s as usize, t as 
usize))?;
 
@@ -129,7 +133,6 @@ impl ConsumerGroupsInner {
             };
         }
 
-        // Resolve by string names
         if let (Ok(s), Ok(t)) = (stream_id.get_string_value(), 
topic_id.get_string_value()) {
             let key = (Arc::from(s.as_str()), Arc::from(t.as_str()));
             let groups_in_topic = self.topic_name_index.get(&key)?;
@@ -157,70 +160,71 @@ impl ConsumerGroupsInner {
     }
 }
 
-impl Handler for ConsumerGroupsInner {
-    fn handle(&mut self, cmd: &Self::Cmd) {
-        // TODO: This is all an hack, we need to figure out how to do this, in 
a way where `Identifier` does not reach
-        // this stage of execution.
-        match cmd {
-            ConsumerGroupsCommand::CreateConsumerGroup(payload) => {
-                let name: Arc<str> = Arc::from(payload.name.as_str());
-                if self.name_index.contains_key(&name) {
-                    return;
-                }
+// TODO: This is all a hack, we need to figure out how to do this in a way 
where `Identifier`
+// does not reach this stage of execution.
 
-                let group = ConsumerGroup::new(name.clone());
-                let id = self.items.insert(group);
-                self.items[id].id = id;
+impl StateHandler<ConsumerGroupsInner> for CreateConsumerGroup {
+    fn apply(&self, state: &mut ConsumerGroupsInner) {
+        let name: Arc<str> = Arc::from(self.name.as_str());
+        if state.name_index.contains_key(&name) {
+            return;
+        }
 
-                self.name_index.insert(name.clone(), id);
+        let group = ConsumerGroup::new(name.clone());
+        let id = state.items.insert(group);
+        state.items[id].id = id;
+
+        state.name_index.insert(name.clone(), id);
+
+        if let (Ok(s), Ok(t)) = (
+            self.stream_id.get_u32_value(),
+            self.topic_id.get_u32_value(),
+        ) {
+            state
+                .topic_index
+                .entry((s as usize, t as usize))
+                .or_default()
+                .push(id);
+        }
 
-                if let (Ok(s), Ok(t)) = (
-                    payload.stream_id.get_u32_value(),
-                    payload.topic_id.get_u32_value(),
-                ) {
-                    self.topic_index
-                        .entry((s as usize, t as usize))
-                        .or_default()
-                        .push(id);
-                }
+        if let (Ok(s), Ok(t)) = (
+            self.stream_id.get_string_value(),
+            self.topic_id.get_string_value(),
+        ) {
+            let key = (Arc::from(s.as_str()), Arc::from(t.as_str()));
+            state.topic_name_index.entry(key).or_default().push(id);
+        }
+    }
+}
 
-                if let (Ok(s), Ok(t)) = (
-                    payload.stream_id.get_string_value(),
-                    payload.topic_id.get_string_value(),
-                ) {
-                    let key = (Arc::from(s.as_str()), Arc::from(t.as_str()));
-                    self.topic_name_index.entry(key).or_default().push(id);
-                }
-            }
+impl StateHandler<ConsumerGroupsInner> for DeleteConsumerGroup {
+    fn apply(&self, state: &mut ConsumerGroupsInner) {
+        let Some(id) = state.resolve_consumer_group_id_by_identifiers(
+            &self.stream_id,
+            &self.topic_id,
+            &self.group_id,
+        ) else {
+            return;
+        };
 
-            ConsumerGroupsCommand::DeleteConsumerGroup(payload) => {
-                if let Some(id) = 
self.resolve_consumer_group_id_by_identifiers(
-                    &payload.stream_id,
-                    &payload.topic_id,
-                    &payload.group_id,
-                ) {
-                    let group = self.items.remove(id);
-
-                    self.name_index.remove(&group.name);
-
-                    if let (Ok(s), Ok(t)) = (
-                        payload.stream_id.get_u32_value(),
-                        payload.topic_id.get_u32_value(),
-                    ) && let Some(vec) = self.topic_index.get_mut(&(s as 
usize, t as usize))
-                    {
-                        vec.retain(|&x| x != id);
-                    }
-
-                    if let (Ok(s), Ok(t)) = (
-                        payload.stream_id.get_string_value(),
-                        payload.topic_id.get_string_value(),
-                    ) {
-                        let key = (Arc::from(s.as_str()), 
Arc::from(t.as_str()));
-                        if let Some(vec) = self.topic_name_index.get_mut(&key) 
{
-                            vec.retain(|&x| x != id);
-                        }
-                    }
-                }
+        let group = state.items.remove(id);
+        state.name_index.remove(&group.name);
+
+        if let (Ok(s), Ok(t)) = (
+            self.stream_id.get_u32_value(),
+            self.topic_id.get_u32_value(),
+        ) && let Some(vec) = state.topic_index.get_mut(&(s as usize, t as 
usize))
+        {
+            vec.retain(|&x| x != id);
+        }
+
+        if let (Ok(s), Ok(t)) = (
+            self.stream_id.get_string_value(),
+            self.topic_id.get_string_value(),
+        ) {
+            let key = (Arc::from(s.as_str()), Arc::from(t.as_str()));
+            if let Some(vec) = state.topic_name_index.get_mut(&key) {
+                vec.retain(|&x| x != id);
             }
         }
     }
diff --git a/core/metadata/src/stm/mod.rs b/core/metadata/src/stm/mod.rs
index c92b0ca10..802da6de5 100644
--- a/core/metadata/src/stm/mod.rs
+++ b/core/metadata/src/stm/mod.rs
@@ -70,9 +70,11 @@ pub trait Command {
     fn parse(input: Self::Input) -> Result<Self::Cmd, Self::Input>;
 }
 
-/// Handles commands. User-implemented business logic.
-pub trait Handler: Command {
-    fn handle(&mut self, cmd: &Self::Cmd);
+/// Per-command handler for a given state type.
+/// Each command struct implements this for the state it mutates,
+/// replacing the monolithic `Handler::handle()` match block.
+pub trait StateHandler<S> {
+    fn apply(&self, state: &mut S);
 }
 
 #[derive(Debug)]
@@ -100,7 +102,7 @@ where
 
 impl<T> LeftRight<T, <T as Command>::Cmd>
 where
-    T: Absorb<<T as Command>::Cmd> + Clone + Handler,
+    T: Absorb<<T as Command>::Cmd> + Clone + Command,
 {
     pub fn do_apply(&self, cmd: <T as Command>::Cmd) {
         self.write
@@ -125,51 +127,23 @@ pub trait StateMachine {
     fn update(&self, input: Self::Input) -> Self::Output;
 }
 
-/// Generates a state machine with convention-based storage.
+/// Generates the state's inner struct and wrapper type.
 ///
 /// # Generated items
 /// - `{$state}Inner` struct with the specified fields (the data)
-/// - `{$state}Command` enum with variants for each operation
-/// - `$state` wrapper struct (non-generic, contains LeftRight storage)
-/// - `Command` impl for `{$state}Inner` (parsing)
-/// - `State` impl for `$state`
+/// - `$state` wrapper struct (contains LeftRight storage)
 /// - `From<LeftRight<...>>` impl for `$state`
+/// - `From<{$state}Inner>` impl for `$state`
 ///
-/// # User must implement
-/// - `Handler` for `{$state}Inner` (business logic)
-/// - `impl_absorb!` for `{$state}Inner` and `{$state}Command`
-///
-/// # Example
-/// ```ignore
-/// define_state! {
-///     Streams {
-///         index: AHashMap<String, usize>,
-///         items: Slab<Stream>,
-///     },
-///     [CreateStream, UpdateStream, DeleteStream]
-/// }
-///
-/// // User implements Handler manually:
-/// impl Handler for StreamsInner {
-///     fn handle(&mut self, cmd: &StreamsCommand) {
-///         match cmd {
-///             StreamsCommand::CreateStream(payload) => { /* ... */ }
-///             // ...
-///         }
-///     }
-/// }
-///
-/// // User implements Absorb via macro:
-/// impl_absorb!(StreamsInner, StreamsCommand);
-/// ```
-// TODO: The `operation` argument can be removed, once we create an trait for 
mapping.
+/// The command enum, parsing, dispatch, and Absorb impl are generated
+/// by `collect_handlers!` separately, keeping state definition decoupled
+/// from the set of operations.
 #[macro_export]
 macro_rules! define_state {
     (
         $state:ident {
             $($field_name:ident : $field_type:ty),* $(,)?
-        },
-        [$($operation:ident),* $(,)?]
+        }
     ) => {
         paste::paste! {
             #[derive(Debug, Clone, Default)]
@@ -185,13 +159,6 @@ macro_rules! define_state {
                 }
             }
 
-            #[derive(Debug, Clone)]
-            pub enum [<$state Command>] {
-                $(
-                    $operation($operation),
-                )*
-            }
-
             #[derive(Debug)]
             pub struct $state {
                 inner: $crate::stm::LeftRight<[<$state Inner>], [<$state 
Command>]>,
@@ -209,16 +176,34 @@ macro_rules! define_state {
                     left_right.into()
                 }
             }
+        }
+    };
+}
 
-            impl $crate::stm::State for $state {
-                type Input = <[<$state Inner>] as $crate::stm::Command>::Input;
-                type Output = ();
-
-                fn apply(&self, input: Self::Input) -> Result<Self::Output, 
Self::Input> {
-                    let cmd = <[<$state Inner>] as 
$crate::stm::Command>::parse(input)?;
-                    self.inner.do_apply(cmd);
-                    Ok(())
-                }
+/// Generates the command enum, parsing, dispatch, State, and Absorb for a 
state type.
+///
+/// # Generated items
+/// - `{$state}Command` enum with one variant per operation
+/// - `Command` impl for `{$state}Inner` (parses `Message<PrepareHeader>`)
+/// - `{$state}Inner::dispatch()` method (routes each variant to 
`StateHandler::apply()`)
+/// - `State` impl for `$state` wrapper
+/// - `Absorb<{$state}Command>` impl for `{$state}Inner`
+///
+/// # Requirements
+/// Each listed operation type must implement `StateHandler<{$state}Inner>`.
+#[macro_export]
+macro_rules! collect_handlers {
+    (
+        $state:ident {
+            $($operation:ident),* $(,)?
+        }
+    ) => {
+        paste::paste! {
+            #[derive(Debug, Clone)]
+            pub enum [<$state Command>] {
+                $(
+                    $operation($operation),
+                )*
             }
 
             impl $crate::stm::Command for [<$state Inner>] {
@@ -242,45 +227,42 @@ macro_rules! define_state {
                     }
                 }
             }
-        }
-    };
-}
-
-// This macro is really sad, but we can't do blanket impl from below, due to 
orphan rule.
-// impl<T> Absorb<T::Cmd> for T
-// where
-//     T: Handler + Clone,
-// {
-//     fn absorb_first(&mut self, cmd: &mut T::Cmd, _other: &Self) {
-//         self.handle(cmd);
 
-//     }
-
-//     fn absorb_second(&mut self, cmd: T::Cmd, _other: &Self) {
-//         self.handle(&cmd);
-//     }
+            impl [<$state Inner>] {
+                fn dispatch(&mut self, cmd: &[<$state Command>]) {
+                    match cmd {
+                        $(
+                            [<$state Command>]::$operation(payload) => {
+                                $crate::stm::StateHandler::apply(payload, 
self);
+                            },
+                        )*
+                    }
+                }
+            }
 
-//     fn sync_with(&mut self, first: &Self) {
-//         *self = first.clone();
-//     }
+            impl $crate::stm::State for $state {
+                type Input = <[<$state Inner>] as $crate::stm::Command>::Input;
+                type Output = ();
 
-//     fn drop_first(self: Box<Self>) {}
-//     fn drop_second(self: Box<Self>) {}
-// }
-#[macro_export]
-macro_rules! impl_absorb {
-    ($inner:ident, $cmd:ident) => {
-        impl left_right::Absorb<$cmd> for $inner {
-            fn absorb_first(&mut self, cmd: &mut $cmd, _other: &Self) {
-                self.handle(cmd);
+                fn apply(&self, input: Self::Input) -> Result<Self::Output, 
Self::Input> {
+                    let cmd = <[<$state Inner>] as 
$crate::stm::Command>::parse(input)?;
+                    self.inner.do_apply(cmd);
+                    Ok(())
+                }
             }
 
-            fn absorb_second(&mut self, cmd: $cmd, _other: &Self) {
-                self.handle(&cmd);
-            }
+            impl left_right::Absorb<[<$state Command>]> for [<$state Inner>] {
+                fn absorb_first(&mut self, cmd: &mut [<$state Command>], 
_other: &Self) {
+                    self.dispatch(cmd);
+                }
+
+                fn absorb_second(&mut self, cmd: [<$state Command>], _other: 
&Self) {
+                    self.dispatch(&cmd);
+                }
 
-            fn sync_with(&mut self, first: &Self) {
-                *self = first.clone();
+                fn sync_with(&mut self, first: &Self) {
+                    *self = first.clone();
+                }
             }
         }
     };
diff --git a/core/metadata/src/stm/stream.rs b/core/metadata/src/stm/stream.rs
index d1a3da212..d78728228 100644
--- a/core/metadata/src/stm/stream.rs
+++ b/core/metadata/src/stm/stream.rs
@@ -16,8 +16,8 @@
 // under the License.
 
 use crate::stats::{StreamStats, TopicStats};
-use crate::stm::Handler;
-use crate::{define_state, impl_absorb};
+use crate::stm::StateHandler;
+use crate::{collect_handlers, define_state};
 use ahash::AHashMap;
 use iggy_common::create_partitions::CreatePartitions;
 use iggy_common::create_stream::CreateStream;
@@ -168,8 +168,11 @@ define_state! {
     Streams {
         index: AHashMap<Arc<str>, usize>,
         items: Slab<Stream>,
-    },
-    [
+    }
+}
+
+collect_handlers! {
+    Streams {
         CreateStream,
         UpdateStream,
         DeleteStream,
@@ -179,12 +182,10 @@ define_state! {
         DeleteTopic,
         PurgeTopic,
         CreatePartitions,
-        DeletePartitions
-    ]
+        DeletePartitions,
+    }
 }
 
-impl_absorb!(StreamsInner, StreamsCommand);
-
 impl StreamsInner {
     fn resolve_stream_id(&self, identifier: &iggy_common::Identifier) -> 
Option<usize> {
         use iggy_common::IdKind;
@@ -229,206 +230,229 @@ impl StreamsInner {
     }
 }
 
-impl Handler for StreamsInner {
-    fn handle(&mut self, cmd: &StreamsCommand) {
-        match cmd {
-            StreamsCommand::CreateStream(payload) => {
-                let name_arc: Arc<str> = Arc::from(payload.name.as_str());
-                if self.index.contains_key(&name_arc) {
-                    return;
-                }
+impl StateHandler<StreamsInner> for CreateStream {
+    fn apply(&self, state: &mut StreamsInner) {
+        let name_arc: Arc<str> = Arc::from(self.name.as_str());
+        if state.index.contains_key(&name_arc) {
+            return;
+        }
 
-                let stream = Stream {
-                    id: 0,
-                    name: name_arc.clone(),
-                    created_at: iggy_common::IggyTimestamp::now(),
-                    stats: Arc::new(StreamStats::default()),
-                    topics: Slab::new(),
-                    topic_index: AHashMap::default(),
-                };
+        let stream = Stream {
+            id: 0,
+            name: name_arc.clone(),
+            created_at: iggy_common::IggyTimestamp::now(),
+            stats: Arc::new(StreamStats::default()),
+            topics: Slab::new(),
+            topic_index: AHashMap::default(),
+        };
 
-                let id = self.items.insert(stream);
-                if let Some(stream) = self.items.get_mut(id) {
-                    stream.id = id;
-                }
-                self.index.insert(name_arc, id);
-            }
-            StreamsCommand::UpdateStream(payload) => {
-                let Some(stream_id) = 
self.resolve_stream_id(&payload.stream_id) else {
-                    return;
-                };
-                let Some(stream) = self.items.get_mut(stream_id) else {
-                    return;
-                };
+        let id = state.items.insert(stream);
+        if let Some(stream) = state.items.get_mut(id) {
+            stream.id = id;
+        }
+        state.index.insert(name_arc, id);
+    }
+}
 
-                let new_name_arc: Arc<str> = Arc::from(payload.name.as_str());
-                if let Some(&existing_id) = self.index.get(&new_name_arc)
-                    && existing_id != stream_id
-                {
-                    return;
-                }
+impl StateHandler<StreamsInner> for UpdateStream {
+    fn apply(&self, state: &mut StreamsInner) {
+        let Some(stream_id) = state.resolve_stream_id(&self.stream_id) else {
+            return;
+        };
+        let Some(stream) = state.items.get_mut(stream_id) else {
+            return;
+        };
+
+        let new_name_arc: Arc<str> = Arc::from(self.name.as_str());
+        if let Some(&existing_id) = state.index.get(&new_name_arc)
+            && existing_id != stream_id
+        {
+            return;
+        }
 
-                self.index.remove(&stream.name);
-                stream.name = new_name_arc.clone();
-                self.index.insert(new_name_arc, stream_id);
-            }
-            StreamsCommand::DeleteStream(payload) => {
-                let Some(stream_id) = 
self.resolve_stream_id(&payload.stream_id) else {
-                    return;
-                };
+        state.index.remove(&stream.name);
+        stream.name = new_name_arc.clone();
+        state.index.insert(new_name_arc, stream_id);
+    }
+}
 
-                if let Some(stream) = self.items.get(stream_id) {
-                    let name = stream.name.clone();
+impl StateHandler<StreamsInner> for DeleteStream {
+    fn apply(&self, state: &mut StreamsInner) {
+        let Some(stream_id) = state.resolve_stream_id(&self.stream_id) else {
+            return;
+        };
 
-                    self.items.remove(stream_id);
-                    self.index.remove(&name);
-                }
-            }
-            StreamsCommand::PurgeStream(_payload) => {
-                // TODO
-                todo!();
-            }
-            StreamsCommand::CreateTopic(payload) => {
-                let Some(stream_id) = 
self.resolve_stream_id(&payload.stream_id) else {
-                    return;
-                };
-                let Some(stream) = self.items.get_mut(stream_id) else {
-                    return;
-                };
+        if let Some(stream) = state.items.get(stream_id) {
+            let name = stream.name.clone();
 
-                let name_arc: Arc<str> = Arc::from(payload.name.as_str());
-                if stream.topic_index.contains_key(&name_arc) {
-                    return;
-                }
+            state.items.remove(stream_id);
+            state.index.remove(&name);
+        }
+    }
+}
 
-                let topic = Topic {
-                    id: 0, // Will be assigned by slab
-                    name: name_arc.clone(),
-                    created_at: iggy_common::IggyTimestamp::now(),
-                    replication_factor: 
payload.replication_factor.unwrap_or(1),
-                    message_expiry: payload.message_expiry,
-                    compression_algorithm: payload.compression_algorithm,
-                    max_topic_size: payload.max_topic_size,
-                    stats: Arc::new(TopicStats::new(stream.stats.clone())),
-                    partitions: Vec::new(),
-                    round_robin_counter: Arc::new(AtomicUsize::new(0)),
-                };
+impl StateHandler<StreamsInner> for PurgeStream {
+    fn apply(&self, _state: &mut StreamsInner) {
+        // TODO
+        todo!();
+    }
+}
 
-                let topic_id = stream.topics.insert(topic);
-                if let Some(topic) = stream.topics.get_mut(topic_id) {
-                    topic.id = topic_id;
-
-                    for partition_id in 0..payload.partitions_count as usize {
-                        let partition = Partition {
-                            id: partition_id,
-                            created_at: iggy_common::IggyTimestamp::now(),
-                        };
-                        topic.partitions.push(partition);
-                    }
-                }
+impl StateHandler<StreamsInner> for CreateTopic {
+    fn apply(&self, state: &mut StreamsInner) {
+        let Some(stream_id) = state.resolve_stream_id(&self.stream_id) else {
+            return;
+        };
+        let Some(stream) = state.items.get_mut(stream_id) else {
+            return;
+        };
+
+        let name_arc: Arc<str> = Arc::from(self.name.as_str());
+        if stream.topic_index.contains_key(&name_arc) {
+            return;
+        }
 
-                stream.topic_index.insert(name_arc, topic_id);
-            }
-            StreamsCommand::UpdateTopic(payload) => {
-                let Some(stream_id) = 
self.resolve_stream_id(&payload.stream_id) else {
-                    return;
-                };
-                let Some(topic_id) = self.resolve_topic_id(stream_id, 
&payload.topic_id) else {
-                    return;
-                };
+        let topic = Topic {
+            id: 0,
+            name: name_arc.clone(),
+            created_at: iggy_common::IggyTimestamp::now(),
+            replication_factor: self.replication_factor.unwrap_or(1),
+            message_expiry: self.message_expiry,
+            compression_algorithm: self.compression_algorithm,
+            max_topic_size: self.max_topic_size,
+            stats: Arc::new(TopicStats::new(stream.stats.clone())),
+            partitions: Vec::new(),
+            round_robin_counter: Arc::new(AtomicUsize::new(0)),
+        };
 
-                let Some(stream) = self.items.get_mut(stream_id) else {
-                    return;
-                };
-                let Some(topic) = stream.topics.get_mut(topic_id) else {
-                    return;
+        let topic_id = stream.topics.insert(topic);
+        if let Some(topic) = stream.topics.get_mut(topic_id) {
+            topic.id = topic_id;
+
+            for partition_id in 0..self.partitions_count as usize {
+                let partition = Partition {
+                    id: partition_id,
+                    created_at: iggy_common::IggyTimestamp::now(),
                 };
+                topic.partitions.push(partition);
+            }
+        }
 
-                let new_name_arc: Arc<str> = Arc::from(payload.name.as_str());
-                if let Some(&existing_id) = 
stream.topic_index.get(&new_name_arc)
-                    && existing_id != topic_id
-                {
-                    return;
-                }
+        stream.topic_index.insert(name_arc, topic_id);
+    }
+}
 
-                stream.topic_index.remove(&topic.name);
-                topic.name = new_name_arc.clone();
-                topic.compression_algorithm = payload.compression_algorithm;
-                topic.message_expiry = payload.message_expiry;
-                topic.max_topic_size = payload.max_topic_size;
-                if let Some(rf) = payload.replication_factor {
-                    topic.replication_factor = rf;
-                }
-                stream.topic_index.insert(new_name_arc, topic_id);
-            }
-            StreamsCommand::DeleteTopic(payload) => {
-                let Some(stream_id) = 
self.resolve_stream_id(&payload.stream_id) else {
-                    return;
-                };
-                let Some(topic_id) = self.resolve_topic_id(stream_id, 
&payload.topic_id) else {
-                    return;
-                };
-                let Some(stream) = self.items.get_mut(stream_id) else {
-                    return;
-                };
+impl StateHandler<StreamsInner> for UpdateTopic {
+    fn apply(&self, state: &mut StreamsInner) {
+        let Some(stream_id) = state.resolve_stream_id(&self.stream_id) else {
+            return;
+        };
+        let Some(topic_id) = state.resolve_topic_id(stream_id, &self.topic_id) 
else {
+            return;
+        };
+
+        let Some(stream) = state.items.get_mut(stream_id) else {
+            return;
+        };
+        let Some(topic) = stream.topics.get_mut(topic_id) else {
+            return;
+        };
+
+        let new_name_arc: Arc<str> = Arc::from(self.name.as_str());
+        if let Some(&existing_id) = stream.topic_index.get(&new_name_arc)
+            && existing_id != topic_id
+        {
+            return;
+        }
 
-                if let Some(topic) = stream.topics.get(topic_id) {
-                    let name = topic.name.clone();
-                    stream.topics.remove(topic_id);
-                    stream.topic_index.remove(&name);
-                }
-            }
-            StreamsCommand::PurgeTopic(_payload) => {
-                // TODO:
-                todo!();
-            }
-            StreamsCommand::CreatePartitions(payload) => {
-                let Some(stream_id) = 
self.resolve_stream_id(&payload.stream_id) else {
-                    return;
-                };
-                let Some(topic_id) = self.resolve_topic_id(stream_id, 
&payload.topic_id) else {
-                    return;
-                };
+        stream.topic_index.remove(&topic.name);
+        topic.name = new_name_arc.clone();
+        topic.compression_algorithm = self.compression_algorithm;
+        topic.message_expiry = self.message_expiry;
+        topic.max_topic_size = self.max_topic_size;
+        if let Some(rf) = self.replication_factor {
+            topic.replication_factor = rf;
+        }
+        stream.topic_index.insert(new_name_arc, topic_id);
+    }
+}
 
-                let Some(stream) = self.items.get_mut(stream_id) else {
-                    return;
-                };
-                let Some(topic) = stream.topics.get_mut(topic_id) else {
-                    return;
-                };
+impl StateHandler<StreamsInner> for DeleteTopic {
+    fn apply(&self, state: &mut StreamsInner) {
+        let Some(stream_id) = state.resolve_stream_id(&self.stream_id) else {
+            return;
+        };
+        let Some(topic_id) = state.resolve_topic_id(stream_id, &self.topic_id) 
else {
+            return;
+        };
+        let Some(stream) = state.items.get_mut(stream_id) else {
+            return;
+        };
+
+        if let Some(topic) = stream.topics.get(topic_id) {
+            let name = topic.name.clone();
+            stream.topics.remove(topic_id);
+            stream.topic_index.remove(&name);
+        }
+    }
+}
 
-                let current_partition_count = topic.partitions.len();
-                for i in 0..payload.partitions_count as usize {
-                    let partition_id = current_partition_count + i;
-                    let partition = Partition {
-                        id: partition_id,
-                        created_at: iggy_common::IggyTimestamp::now(),
-                    };
-                    topic.partitions.push(partition);
-                }
-            }
-            StreamsCommand::DeletePartitions(payload) => {
-                let Some(stream_id) = 
self.resolve_stream_id(&payload.stream_id) else {
-                    return;
-                };
-                let Some(topic_id) = self.resolve_topic_id(stream_id, 
&payload.topic_id) else {
-                    return;
-                };
+impl StateHandler<StreamsInner> for PurgeTopic {
+    fn apply(&self, _state: &mut StreamsInner) {
+        // TODO
+        todo!();
+    }
+}
 
-                let Some(stream) = self.items.get_mut(stream_id) else {
-                    return;
-                };
-                let Some(topic) = stream.topics.get_mut(topic_id) else {
-                    return;
-                };
+impl StateHandler<StreamsInner> for CreatePartitions {
+    fn apply(&self, state: &mut StreamsInner) {
+        let Some(stream_id) = state.resolve_stream_id(&self.stream_id) else {
+            return;
+        };
+        let Some(topic_id) = state.resolve_topic_id(stream_id, &self.topic_id) 
else {
+            return;
+        };
+
+        let Some(stream) = state.items.get_mut(stream_id) else {
+            return;
+        };
+        let Some(topic) = stream.topics.get_mut(topic_id) else {
+            return;
+        };
+
+        let current_partition_count = topic.partitions.len();
+        for i in 0..self.partitions_count as usize {
+            let partition_id = current_partition_count + i;
+            let partition = Partition {
+                id: partition_id,
+                created_at: iggy_common::IggyTimestamp::now(),
+            };
+            topic.partitions.push(partition);
+        }
+    }
+}
 
-                let count_to_delete = payload.partitions_count as usize;
-                if count_to_delete > 0 && count_to_delete <= 
topic.partitions.len() {
-                    topic
-                        .partitions
-                        .truncate(topic.partitions.len() - count_to_delete);
-                }
-            }
+impl StateHandler<StreamsInner> for DeletePartitions {
+    fn apply(&self, state: &mut StreamsInner) {
+        let Some(stream_id) = state.resolve_stream_id(&self.stream_id) else {
+            return;
+        };
+        let Some(topic_id) = state.resolve_topic_id(stream_id, &self.topic_id) 
else {
+            return;
+        };
+
+        let Some(stream) = state.items.get_mut(stream_id) else {
+            return;
+        };
+        let Some(topic) = stream.topics.get_mut(topic_id) else {
+            return;
+        };
+
+        let count_to_delete = self.partitions_count as usize;
+        if count_to_delete > 0 && count_to_delete <= topic.partitions.len() {
+            topic
+                .partitions
+                .truncate(topic.partitions.len() - count_to_delete);
         }
     }
 }
diff --git a/core/metadata/src/stm/user.rs b/core/metadata/src/stm/user.rs
index 81ef0b018..909fae432 100644
--- a/core/metadata/src/stm/user.rs
+++ b/core/metadata/src/stm/user.rs
@@ -16,8 +16,8 @@
 // under the License.
 
 use crate::permissioner::Permissioner;
-use crate::stm::Handler;
-use crate::{define_state, impl_absorb};
+use crate::stm::StateHandler;
+use crate::{collect_handlers, define_state};
 use ahash::AHashMap;
 use iggy_common::change_password::ChangePassword;
 use iggy_common::create_personal_access_token::CreatePersonalAccessToken;
@@ -82,18 +82,20 @@ define_state! {
         items: Slab<User>,
         personal_access_tokens: AHashMap<UserId, AHashMap<Arc<str>, 
PersonalAccessToken>>,
         permissioner: Permissioner,
-    },
-    [
+    }
+}
+
+collect_handlers! {
+    Users {
         CreateUser,
         UpdateUser,
         DeleteUser,
         ChangePassword,
         UpdatePermissions,
         CreatePersonalAccessToken,
-        DeletePersonalAccessToken
-    ]
+        DeletePersonalAccessToken,
+    }
 }
-impl_absorb!(UsersInner, UsersCommand);
 
 impl UsersInner {
     fn resolve_user_id(&self, identifier: &iggy_common::Identifier) -> 
Option<usize> {
@@ -115,123 +117,137 @@ impl UsersInner {
     }
 }
 
-impl Handler for UsersInner {
-    fn handle(&mut self, cmd: &UsersCommand) {
-        match cmd {
-            UsersCommand::CreateUser(payload) => {
-                let username_arc: Arc<str> = 
Arc::from(payload.username.as_str());
-                if self.index.contains_key(&username_arc) {
-                    return;
-                }
+impl StateHandler<UsersInner> for CreateUser {
+    fn apply(&self, state: &mut UsersInner) {
+        let username_arc: Arc<str> = Arc::from(self.username.as_str());
+        if state.index.contains_key(&username_arc) {
+            return;
+        }
 
-                let user = User {
-                    id: 0,
-                    username: username_arc.clone(),
-                    password_hash: Arc::from(payload.password.as_str()),
-                    status: payload.status,
-                    created_at: iggy_common::IggyTimestamp::now(),
-                    permissions: payload.permissions.as_ref().map(|p| 
Arc::new(p.clone())),
-                };
-
-                let id = self.items.insert(user);
-                if let Some(user) = self.items.get_mut(id) {
-                    user.id = id as UserId;
-                }
+        let user = User {
+            id: 0,
+            username: username_arc.clone(),
+            password_hash: Arc::from(self.password.as_str()),
+            status: self.status,
+            created_at: iggy_common::IggyTimestamp::now(),
+            permissions: self.permissions.as_ref().map(|p| 
Arc::new(p.clone())),
+        };
+
+        let id = state.items.insert(user);
+        if let Some(user) = state.items.get_mut(id) {
+            user.id = id as UserId;
+        }
 
-                self.index.insert(username_arc, id as UserId);
-                self.personal_access_tokens
-                    .insert(id as UserId, AHashMap::default());
-            }
-            UsersCommand::UpdateUser(payload) => {
-                let Some(user_id) = self.resolve_user_id(&payload.user_id) 
else {
-                    return;
-                };
-
-                let Some(user) = self.items.get_mut(user_id) else {
-                    return;
-                };
-
-                if let Some(new_username) = &payload.username {
-                    let new_username_arc: Arc<str> = 
Arc::from(new_username.as_str());
-                    if let Some(&existing_id) = 
self.index.get(&new_username_arc)
-                        && existing_id != user_id as UserId
-                    {
-                        return;
-                    }
-
-                    self.index.remove(&user.username);
-                    user.username = new_username_arc.clone();
-                    self.index.insert(new_username_arc, user_id as UserId);
-                }
+        state.index.insert(username_arc, id as UserId);
+        state
+            .personal_access_tokens
+            .insert(id as UserId, AHashMap::default());
+    }
+}
 
-                if let Some(new_status) = payload.status {
-                    user.status = new_status;
-                }
-            }
-            UsersCommand::DeleteUser(payload) => {
-                let Some(user_id) = self.resolve_user_id(&payload.user_id) 
else {
-                    return;
-                };
-
-                if let Some(user) = self.items.get(user_id) {
-                    let username = user.username.clone();
-                    self.items.remove(user_id);
-                    self.index.remove(&username);
-                    self.personal_access_tokens.remove(&(user_id as UserId));
-                }
+impl StateHandler<UsersInner> for UpdateUser {
+    fn apply(&self, state: &mut UsersInner) {
+        let Some(user_id) = state.resolve_user_id(&self.user_id) else {
+            return;
+        };
+
+        let Some(user) = state.items.get_mut(user_id) else {
+            return;
+        };
+
+        if let Some(new_username) = &self.username {
+            let new_username_arc: Arc<str> = Arc::from(new_username.as_str());
+            if let Some(&existing_id) = state.index.get(&new_username_arc)
+                && existing_id != user_id as UserId
+            {
+                return;
             }
-            UsersCommand::ChangePassword(payload) => {
-                let Some(user_id) = self.resolve_user_id(&payload.user_id) 
else {
-                    return;
-                };
 
-                if let Some(user) = self.items.get_mut(user_id) {
-                    user.password_hash = 
Arc::from(payload.new_password.as_str());
-                }
-            }
-            UsersCommand::UpdatePermissions(payload) => {
-                let Some(user_id) = self.resolve_user_id(&payload.user_id) 
else {
-                    return;
-                };
+            state.index.remove(&user.username);
+            user.username = new_username_arc.clone();
+            state.index.insert(new_username_arc, user_id as UserId);
+        }
 
-                if let Some(user) = self.items.get_mut(user_id) {
-                    user.permissions = payload.permissions.as_ref().map(|p| 
Arc::new(p.clone()));
-                }
-            }
-            UsersCommand::CreatePersonalAccessToken(payload) => {
-                // TODO: Stub untill protocol gets adjusted.
-                let user_id = 0;
-                let user_tokens = 
self.personal_access_tokens.entry(user_id).or_default();
-                let name_arc: Arc<str> = Arc::from(payload.name.as_str());
-                if user_tokens.contains_key(&name_arc) {
-                    return;
-                }
+        if let Some(new_status) = self.status {
+            user.status = new_status;
+        }
+    }
+}
 
-                let expiry_at =
-                    
PersonalAccessToken::calculate_expiry_at(IggyTimestamp::now(), payload.expiry);
-                if let Some(expiry_at) = expiry_at
-                    && expiry_at.as_micros() <= 
IggyTimestamp::now().as_micros()
-                {
-                    return;
-                }
+impl StateHandler<UsersInner> for DeleteUser {
+    fn apply(&self, state: &mut UsersInner) {
+        let Some(user_id) = state.resolve_user_id(&self.user_id) else {
+            return;
+        };
+
+        if let Some(user) = state.items.get(user_id) {
+            let username = user.username.clone();
+            state.items.remove(user_id);
+            state.index.remove(&username);
+            state.personal_access_tokens.remove(&(user_id as UserId));
+        }
+    }
+}
 
-                let (pat, _) = PersonalAccessToken::new(
-                    user_id,
-                    payload.name.as_ref(),
-                    IggyTimestamp::now(),
-                    payload.expiry,
-                );
-                user_tokens.insert(name_arc, pat);
-            }
-            UsersCommand::DeletePersonalAccessToken(payload) => {
-                // TODO: Stub untill protocol gets adjusted.
-                let user_id = 0;
+impl StateHandler<UsersInner> for ChangePassword {
+    fn apply(&self, state: &mut UsersInner) {
+        let Some(user_id) = state.resolve_user_id(&self.user_id) else {
+            return;
+        };
 
-                if let Some(user_tokens) = 
self.personal_access_tokens.get_mut(&user_id) {
-                    let name_arc: Arc<str> = Arc::from(payload.name.as_str());
-                    user_tokens.remove(&name_arc);
-                }
-            }
+        if let Some(user) = state.items.get_mut(user_id) {
+            user.password_hash = Arc::from(self.new_password.as_str());
+        }
+    }
+}
+
+impl StateHandler<UsersInner> for UpdatePermissions {
+    fn apply(&self, state: &mut UsersInner) {
+        let Some(user_id) = state.resolve_user_id(&self.user_id) else {
+            return;
+        };
+
+        if let Some(user) = state.items.get_mut(user_id) {
+            user.permissions = self.permissions.as_ref().map(|p| 
Arc::new(p.clone()));
+        }
+    }
+}
+
+impl StateHandler<UsersInner> for CreatePersonalAccessToken {
+    fn apply(&self, state: &mut UsersInner) {
+        // TODO: Stub until protocol gets adjusted.
+        let user_id = 0;
+        let user_tokens = 
state.personal_access_tokens.entry(user_id).or_default();
+        let name_arc: Arc<str> = Arc::from(self.name.as_str());
+        if user_tokens.contains_key(&name_arc) {
+            return;
+        }
+
+        let expiry_at = 
PersonalAccessToken::calculate_expiry_at(IggyTimestamp::now(), self.expiry);
+        if let Some(expiry_at) = expiry_at
+            && expiry_at.as_micros() <= IggyTimestamp::now().as_micros()
+        {
+            return;
+        }
+
+        let (pat, _) = PersonalAccessToken::new(
+            user_id,
+            self.name.as_ref(),
+            IggyTimestamp::now(),
+            self.expiry,
+        );
+        user_tokens.insert(name_arc, pat);
+    }
+}
+
+impl StateHandler<UsersInner> for DeletePersonalAccessToken {
+    fn apply(&self, state: &mut UsersInner) {
+        // TODO: Stub until protocol gets adjusted.
+        let user_id = 0;
+
+        if let Some(user_tokens) = 
state.personal_access_tokens.get_mut(&user_id) {
+            let name_arc: Arc<str> = Arc::from(self.name.as_str());
+            user_tokens.remove(&name_arc);
         }
     }
 }


Reply via email to