This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch experiment
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit bedae0cca5b71e9fbbd59e639aec14d6821e0fe6
Author: numinex <[email protected]>
AuthorDate: Thu Jan 15 10:17:32 2026 +0100

    test
---
 Cargo.lock                                         |   1 +
 Cargo.toml                                         |   1 +
 .../consumer_groups/create_consumer_group.rs       |   2 +-
 .../consumer_groups/delete_consumer_group.rs       |   2 +-
 .../src/commands/partitions/create_partitions.rs   |   2 +-
 .../src/commands/partitions/delete_partitions.rs   |   2 +-
 .../src/commands/segments/delete_segments.rs       |   2 +-
 core/common/src/commands/streams/create_stream.rs  |   2 +-
 core/common/src/commands/streams/delete_stream.rs  |   2 +-
 core/common/src/commands/streams/purge_stream.rs   |   2 +-
 core/common/src/commands/streams/update_stream.rs  |   2 +-
 core/common/src/commands/topics/create_topic.rs    |   2 +-
 core/common/src/commands/topics/delete_topic.rs    |   2 +-
 core/common/src/commands/topics/purge_topic.rs     |   2 +-
 core/common/src/commands/topics/update_topic.rs    |   2 +-
 core/common/src/commands/users/change_password.rs  |   2 +-
 core/common/src/commands/users/create_user.rs      |   2 +-
 core/common/src/commands/users/delete_user.rs      |   2 +-
 .../src/commands/users/update_permissions.rs       |   2 +-
 core/common/src/commands/users/update_user.rs      |   2 +-
 core/common/src/types/consensus/message.rs         |  16 +
 core/metadata/Cargo.toml                           |   1 +
 core/metadata/src/stm/consumer_group.rs            | 147 +------
 core/metadata/src/stm/mod.rs                       | 277 +++++++++----
 core/metadata/src/stm/mux.rs                       |  12 +-
 core/metadata/src/stm/stream.rs                    | 437 +++++----------------
 core/metadata/src/stm/user.rs                      | 279 ++-----------
 core/server/Cargo.toml                             |   2 +-
 28 files changed, 401 insertions(+), 808 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 88953e444..c7a1a3d7b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5702,6 +5702,7 @@ dependencies = [
  "consensus",
  "iggy_common",
  "journal",
+ "left-right",
  "message_bus",
  "slab",
  "tracing",
diff --git a/Cargo.toml b/Cargo.toml
index 3b68471ce..32dd683f3 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -142,6 +142,7 @@ iggy_connector_sdk = { path = "core/connectors/sdk", 
version = "0.1.1-edge.1" }
 integration = { path = "core/integration" }
 keyring = { version = "3.6.3", features = ["sync-secret-service", "vendored"] }
 lazy_static = "1.5.0"
+left-right = "0.11"
 log = "0.4.29"
 mimalloc = "0.1"
 mockall = "0.14.0"
diff --git a/core/common/src/commands/consumer_groups/create_consumer_group.rs 
b/core/common/src/commands/consumer_groups/create_consumer_group.rs
index c5bd3e912..c03d39964 100644
--- a/core/common/src/commands/consumer_groups/create_consumer_group.rs
+++ b/core/common/src/commands/consumer_groups/create_consumer_group.rs
@@ -33,7 +33,7 @@ use std::str::from_utf8;
 /// - `stream_id` - unique stream ID (numeric or name).
 /// - `topic_id` - unique topic ID (numeric or name).
 /// - `name` - unique consumer group name, max length is 255 characters.
-#[derive(Debug, Serialize, Deserialize, PartialEq)]
+#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
 pub struct CreateConsumerGroup {
     /// Unique stream ID (numeric or name).
     #[serde(skip)]
diff --git a/core/common/src/commands/consumer_groups/delete_consumer_group.rs 
b/core/common/src/commands/consumer_groups/delete_consumer_group.rs
index 3eee5cc1c..859ded6f9 100644
--- a/core/common/src/commands/consumer_groups/delete_consumer_group.rs
+++ b/core/common/src/commands/consumer_groups/delete_consumer_group.rs
@@ -31,7 +31,7 @@ use std::fmt::Display;
 /// - `stream_id` - unique stream ID (numeric or name).
 /// - `topic_id` - unique topic ID (numeric or name).
 /// - `group_id` - unique consumer group ID (numeric or name).
-#[derive(Debug, Serialize, Deserialize, PartialEq, Default)]
+#[derive(Debug, Serialize, Deserialize, PartialEq, Default, Clone)]
 pub struct DeleteConsumerGroup {
     /// Unique stream ID (numeric or name).
     #[serde(skip)]
diff --git a/core/common/src/commands/partitions/create_partitions.rs 
b/core/common/src/commands/partitions/create_partitions.rs
index 9aa862d5c..a01d3b26d 100644
--- a/core/common/src/commands/partitions/create_partitions.rs
+++ b/core/common/src/commands/partitions/create_partitions.rs
@@ -32,7 +32,7 @@ use std::fmt::Display;
 /// - `stream_id` - unique stream ID (numeric or name).
 /// - `topic_id` - unique topic ID (numeric or name).
 /// - `partitions_count` - number of partitions in the topic to create, max 
value is 1000.
-#[derive(Debug, Serialize, Deserialize, PartialEq)]
+#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
 pub struct CreatePartitions {
     /// Unique stream ID (numeric or name).
     #[serde(skip)]
diff --git a/core/common/src/commands/partitions/delete_partitions.rs 
b/core/common/src/commands/partitions/delete_partitions.rs
index 068f502ce..12be4448a 100644
--- a/core/common/src/commands/partitions/delete_partitions.rs
+++ b/core/common/src/commands/partitions/delete_partitions.rs
@@ -32,7 +32,7 @@ use std::fmt::Display;
 /// - `stream_id` - unique stream ID (numeric or name).
 /// - `topic_id` - unique topic ID (numeric or name).
 /// - `partitions_count` - number of partitions in the topic to delete, max 
value is 1000.
-#[derive(Debug, Serialize, Deserialize, PartialEq)]
+#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
 pub struct DeletePartitions {
     /// Unique stream ID (numeric or name).
     #[serde(skip)]
diff --git a/core/common/src/commands/segments/delete_segments.rs 
b/core/common/src/commands/segments/delete_segments.rs
index 221e5c667..841a84ac7 100644
--- a/core/common/src/commands/segments/delete_segments.rs
+++ b/core/common/src/commands/segments/delete_segments.rs
@@ -31,7 +31,7 @@ use std::fmt::Display;
 /// - `topic_id` - unique topic ID (numeric or name).
 /// - `partition_id` - unique partition ID (numeric or name).
 /// - `segments_count` - number of segments in the partition to delete.
-#[derive(Debug, Serialize, Deserialize, PartialEq)]
+#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
 pub struct DeleteSegments {
     /// Unique stream ID (numeric or name).
     #[serde(skip)]
diff --git a/core/common/src/commands/streams/create_stream.rs 
b/core/common/src/commands/streams/create_stream.rs
index 292cfcfc2..9ff2fec5a 100644
--- a/core/common/src/commands/streams/create_stream.rs
+++ b/core/common/src/commands/streams/create_stream.rs
@@ -29,7 +29,7 @@ use std::str::from_utf8;
 /// `CreateStream` command is used to create a new stream.
 /// It has additional payload:
 /// - `name` - unique stream name (string), max length is 255 characters.
-#[derive(Debug, Serialize, Deserialize, PartialEq)]
+#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
 pub struct CreateStream {
     /// Unique stream name (string), max length is 255 characters.
     pub name: String,
diff --git a/core/common/src/commands/streams/delete_stream.rs 
b/core/common/src/commands/streams/delete_stream.rs
index ac8aa57f9..f44110340 100644
--- a/core/common/src/commands/streams/delete_stream.rs
+++ b/core/common/src/commands/streams/delete_stream.rs
@@ -28,7 +28,7 @@ use std::fmt::Display;
 /// `DeleteStream` command is used to delete an existing stream.
 /// It has additional payload:
 /// - `stream_id` - unique stream ID (numeric or name).
-#[derive(Debug, Serialize, Deserialize, PartialEq, Default)]
+#[derive(Debug, Serialize, Deserialize, PartialEq, Default, Clone)]
 pub struct DeleteStream {
     /// Unique stream ID (numeric or name).
     #[serde(skip)]
diff --git a/core/common/src/commands/streams/purge_stream.rs 
b/core/common/src/commands/streams/purge_stream.rs
index 805e7c944..036930ff8 100644
--- a/core/common/src/commands/streams/purge_stream.rs
+++ b/core/common/src/commands/streams/purge_stream.rs
@@ -28,7 +28,7 @@ use std::fmt::Display;
 /// `PurgeStream` command is used to purge stream data (all the messages from 
its topics).
 /// It has additional payload:
 /// - `stream_id` - unique stream ID (numeric or name).
-#[derive(Debug, Serialize, Deserialize, PartialEq, Default)]
+#[derive(Debug, Serialize, Deserialize, PartialEq, Default, Clone)]
 pub struct PurgeStream {
     /// Unique stream ID (numeric or name).
     #[serde(skip)]
diff --git a/core/common/src/commands/streams/update_stream.rs 
b/core/common/src/commands/streams/update_stream.rs
index ced247e71..ce3600f88 100644
--- a/core/common/src/commands/streams/update_stream.rs
+++ b/core/common/src/commands/streams/update_stream.rs
@@ -32,7 +32,7 @@ use std::str::from_utf8;
 /// It has additional payload:
 /// - `stream_id` - unique stream ID (numeric or name).
 /// - `name` - unique stream name (string), max length is 255 characters.
-#[derive(Debug, Serialize, Deserialize, PartialEq)]
+#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
 pub struct UpdateStream {
     /// Unique stream ID (numeric or name).
     #[serde(skip)]
diff --git a/core/common/src/commands/topics/create_topic.rs 
b/core/common/src/commands/topics/create_topic.rs
index ba90084e7..48cd3d525 100644
--- a/core/common/src/commands/topics/create_topic.rs
+++ b/core/common/src/commands/topics/create_topic.rs
@@ -40,7 +40,7 @@ use std::str::from_utf8;
 ///   Can't be lower than segment size in the config.
 /// - `replication_factor` - replication factor for the topic.
 /// - `name` - unique topic name, max length is 255 characters.
-#[derive(Debug, Serialize, Deserialize, PartialEq)]
+#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
 pub struct CreateTopic {
     /// Unique stream ID (numeric or name).
     #[serde(skip)]
diff --git a/core/common/src/commands/topics/delete_topic.rs 
b/core/common/src/commands/topics/delete_topic.rs
index a41c0de02..45680311d 100644
--- a/core/common/src/commands/topics/delete_topic.rs
+++ b/core/common/src/commands/topics/delete_topic.rs
@@ -30,7 +30,7 @@ use std::fmt::Display;
 /// It has additional payload:
 /// - `stream_id` - unique stream ID (numeric or name).
 /// - `topic_id` - unique topic ID (numeric or name).
-#[derive(Debug, Serialize, Deserialize, PartialEq, Default)]
+#[derive(Debug, Serialize, Deserialize, PartialEq, Default, Clone)]
 pub struct DeleteTopic {
     /// Unique stream ID (numeric or name).
     #[serde(skip)]
diff --git a/core/common/src/commands/topics/purge_topic.rs 
b/core/common/src/commands/topics/purge_topic.rs
index 061e374fa..9473b38c7 100644
--- a/core/common/src/commands/topics/purge_topic.rs
+++ b/core/common/src/commands/topics/purge_topic.rs
@@ -30,7 +30,7 @@ use std::fmt::Display;
 /// It has additional payload:
 /// - `stream_id` - unique stream ID (numeric or name).
 /// - `topic_id` - unique topic ID (numeric or name).
-#[derive(Debug, Serialize, Deserialize, PartialEq, Default)]
+#[derive(Debug, Serialize, Deserialize, PartialEq, Default, Clone)]
 pub struct PurgeTopic {
     /// Unique stream ID (numeric or name).
     #[serde(skip)]
diff --git a/core/common/src/commands/topics/update_topic.rs 
b/core/common/src/commands/topics/update_topic.rs
index 32a2ba7a5..83dadb69c 100644
--- a/core/common/src/commands/topics/update_topic.rs
+++ b/core/common/src/commands/topics/update_topic.rs
@@ -40,7 +40,7 @@ use std::str::from_utf8;
 ///   Can't be lower than segment size in the config.
 /// - `replication_factor` - replication factor for the topic.
 /// - `name` - unique topic name, max length is 255 characters.
-#[derive(Debug, Serialize, Deserialize, PartialEq)]
+#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
 pub struct UpdateTopic {
     /// Unique stream ID (numeric or name).
     #[serde(skip)]
diff --git a/core/common/src/commands/users/change_password.rs 
b/core/common/src/commands/users/change_password.rs
index 27f1ea6b8..4a5663778 100644
--- a/core/common/src/commands/users/change_password.rs
+++ b/core/common/src/commands/users/change_password.rs
@@ -33,7 +33,7 @@ use std::str::from_utf8;
 /// - `user_id` - unique user ID (numeric or name).
 /// - `current_password` - current password, must be between 3 and 100 
characters long.
 /// - `new_password` - new password, must be between 3 and 100 characters long.
-#[derive(Debug, Serialize, Deserialize, PartialEq)]
+#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
 pub struct ChangePassword {
     /// Unique user ID (numeric or name).
     #[serde(skip)]
diff --git a/core/common/src/commands/users/create_user.rs 
b/core/common/src/commands/users/create_user.rs
index 03e0cb167..7d23c54ee 100644
--- a/core/common/src/commands/users/create_user.rs
+++ b/core/common/src/commands/users/create_user.rs
@@ -34,7 +34,7 @@ use std::str::from_utf8;
 /// - `password` - password of the user, must be between 3 and 100 characters 
long.
 /// - `status` - status of the user, can be either `active` or `inactive`.
 /// - `permissions` - optional permissions of the user. If not provided, user 
will have no permissions.
-#[derive(Debug, Serialize, Deserialize, PartialEq)]
+#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
 pub struct CreateUser {
     /// Unique name of the user, must be between 3 and 50 characters long.
     pub username: String,
diff --git a/core/common/src/commands/users/delete_user.rs 
b/core/common/src/commands/users/delete_user.rs
index c41a967ff..3cc5682a6 100644
--- a/core/common/src/commands/users/delete_user.rs
+++ b/core/common/src/commands/users/delete_user.rs
@@ -28,7 +28,7 @@ use std::fmt::Display;
 /// `DeleteUser` command is used to delete a user by unique ID.
 /// It has additional payload:
 /// - `user_id` - unique user ID (numeric or name).
-#[derive(Debug, Serialize, Deserialize, PartialEq, Default)]
+#[derive(Debug, Serialize, Deserialize, PartialEq, Default, Clone)]
 pub struct DeleteUser {
     /// Unique user ID (numeric or name).
     #[serde(skip)]
diff --git a/core/common/src/commands/users/update_permissions.rs 
b/core/common/src/commands/users/update_permissions.rs
index ab8e525f0..ff96927dd 100644
--- a/core/common/src/commands/users/update_permissions.rs
+++ b/core/common/src/commands/users/update_permissions.rs
@@ -31,7 +31,7 @@ use std::fmt::Display;
 /// It has additional payload:
 /// - `user_id` - unique user ID (numeric or name).
 /// - `permissions` - new permissions (optional)
-#[derive(Debug, Serialize, Deserialize, PartialEq, Default)]
+#[derive(Debug, Serialize, Deserialize, PartialEq, Default, Clone)]
 pub struct UpdatePermissions {
     /// Unique user ID (numeric or name).
     #[serde(skip)]
diff --git a/core/common/src/commands/users/update_user.rs 
b/core/common/src/commands/users/update_user.rs
index 28e2504d2..3e2811915 100644
--- a/core/common/src/commands/users/update_user.rs
+++ b/core/common/src/commands/users/update_user.rs
@@ -34,7 +34,7 @@ use std::str::from_utf8;
 /// - `user_id` - unique user ID (numeric or name).
 /// - `username` - new username (optional), if provided, must be between 3 and 
50 characters long.
 /// - `status` - new status (optional)
-#[derive(Debug, Serialize, Deserialize, PartialEq, Default)]
+#[derive(Debug, Serialize, Deserialize, PartialEq, Default, Clone)]
 pub struct UpdateUser {
     #[serde(skip)]
     pub user_id: Identifier,
diff --git a/core/common/src/types/consensus/message.rs 
b/core/common/src/types/consensus/message.rs
index 0e94820f8..ac32cc3c8 100644
--- a/core/common/src/types/consensus/message.rs
+++ b/core/common/src/types/consensus/message.rs
@@ -141,6 +141,22 @@ where
         }
     }
 
+    /// Get the message body as zero-copy `Bytes`.
+    ///
+    /// Returns an empty `Bytes` if there is no body.
+    #[inline]
+    #[allow(unused)]
+    pub fn body_bytes(&self) -> Bytes {
+        let header_size = size_of::<H>();
+        let total_size = self.header().size() as usize;
+
+        if total_size > header_size {
+            self.buffer.slice(header_size..total_size)
+        } else {
+            Bytes::new()
+        }
+    }
+
     /// Get the complete message as bytes (header + body).
     #[inline]
     #[allow(unused)]
diff --git a/core/metadata/Cargo.toml b/core/metadata/Cargo.toml
index ee886e7c8..d7fcecfc6 100644
--- a/core/metadata/Cargo.toml
+++ b/core/metadata/Cargo.toml
@@ -33,6 +33,7 @@ bytes = { workspace = true }
 consensus = { path = "../consensus" }
 iggy_common = { path = "../common" }
 journal = { path = "../journal" }
+left-right = { workspace = true }
 message_bus = { path = "../message_bus" }
 slab = "0.4.11"
 tracing = { workspace = true }
diff --git a/core/metadata/src/stm/consumer_group.rs 
b/core/metadata/src/stm/consumer_group.rs
index 944627ad7..7eb5d08cb 100644
--- a/core/metadata/src/stm/consumer_group.rs
+++ b/core/metadata/src/stm/consumer_group.rs
@@ -15,24 +15,15 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::stm::{ApplyState, StateCommand};
 use ahash::AHashMap;
-use bytes::Bytes;
-use iggy_common::create_consumer_group::CreateConsumerGroup;
-use iggy_common::delete_consumer_group::DeleteConsumerGroup;
-use iggy_common::{
-    BytesSerializable, IggyTimestamp,
-    header::{Operation, PrepareHeader},
-    message::Message,
-};
+use iggy_common::IggyTimestamp;
 use slab::Slab;
-use std::cell::RefCell;
 
 // ============================================================================
 // ConsumerGroupMember - Individual member of a consumer group
 // ============================================================================
 
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, Default)]
 pub struct ConsumerGroupMember {
     pub id: u32,
     pub joined_at: IggyTimestamp,
@@ -48,7 +39,7 @@ impl ConsumerGroupMember {
 // ConsumerGroup - A group of consumers
 // ============================================================================
 
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, Default)]
 pub struct ConsumerGroup {
     pub id: usize,
     pub stream_id: usize,
@@ -93,138 +84,12 @@ impl ConsumerGroup {
 
 #[derive(Debug, Clone, Default)]
 pub struct ConsumerGroups {
-    // Global index for all consumer groups across all streams/topics
-    index: RefCell<AHashMap<(usize, usize, String), usize>>, // (stream_id, 
topic_id, name) -> id
-    items: RefCell<Slab<ConsumerGroup>>,
+    pub index: AHashMap<(usize, usize, String), usize>,
+    pub items: Slab<ConsumerGroup>,
 }
 
 impl ConsumerGroups {
     pub fn new() -> Self {
-        Self {
-            index: RefCell::new(AHashMap::with_capacity(256)),
-            items: RefCell::new(Slab::with_capacity(256)),
-        }
-    }
-
-    /// Insert a consumer group and return the assigned ID
-    pub fn insert(&self, group: ConsumerGroup) -> usize {
-        let mut items = self.items.borrow_mut();
-        let mut index = self.index.borrow_mut();
-
-        let key = (group.stream_id, group.topic_id, group.name.clone());
-        let id = items.insert(group);
-        items[id].id = id;
-        index.insert(key, id);
-        id
-    }
-
-    /// Get consumer group by ID
-    pub fn get(&self, id: usize) -> Option<ConsumerGroup> {
-        self.items.borrow().get(id).cloned()
-    }
-
-    /// Get consumer group by stream_id, topic_id, and name
-    pub fn get_by_location(
-        &self,
-        stream_id: usize,
-        topic_id: usize,
-        name: &str,
-    ) -> Option<ConsumerGroup> {
-        let index = self.index.borrow();
-        let key = (stream_id, topic_id, name.to_string());
-        if let Some(&id) = index.get(&key) {
-            self.items.borrow().get(id).cloned()
-        } else {
-            None
-        }
-    }
-
-    /// Remove consumer group by ID
-    pub fn remove(&self, id: usize) -> Option<ConsumerGroup> {
-        let mut items = self.items.borrow_mut();
-        let mut index = self.index.borrow_mut();
-
-        if !items.contains(id) {
-            return None;
-        }
-
-        let group = items.remove(id);
-        let key = (group.stream_id, group.topic_id, group.name.clone());
-        index.remove(&key);
-        Some(group)
-    }
-
-    /// Get all consumer groups for a specific topic
-    pub fn get_by_topic(&self, stream_id: usize, topic_id: usize) -> 
Vec<ConsumerGroup> {
-        self.items
-            .borrow()
-            .iter()
-            .filter_map(|(_, g)| {
-                if g.stream_id == stream_id && g.topic_id == topic_id {
-                    Some(g.clone())
-                } else {
-                    None
-                }
-            })
-            .collect()
-    }
-
-    /// Get number of consumer groups
-    pub fn len(&self) -> usize {
-        self.items.borrow().len()
-    }
-
-    /// Check if empty
-    pub fn is_empty(&self) -> bool {
-        self.items.borrow().is_empty()
-    }
-
-    /// Get all consumer groups
-    pub fn values(&self) -> Vec<ConsumerGroup> {
-        self.items
-            .borrow()
-            .iter()
-            .map(|(_, g): (usize, &ConsumerGroup)| g.clone())
-            .collect()
-    }
-}
-
-#[derive(Debug)]
-pub enum ConsumerGroupsCommand {
-    Create(CreateConsumerGroup),
-    Delete(DeleteConsumerGroup),
-}
-
-impl StateCommand for ConsumerGroups {
-    type Command = ConsumerGroupsCommand;
-    type Input = Message<PrepareHeader>;
-
-    fn into_command(input: &Self::Input) -> Option<Self::Command> {
-        // TODO: rework this thing, so we don't copy the bytes on each request
-        let body = Bytes::copy_from_slice(input.body());
-        match input.header().operation {
-            Operation::CreateConsumerGroup => 
Some(ConsumerGroupsCommand::Create(
-                CreateConsumerGroup::from_bytes(body.clone()).unwrap(),
-            )),
-            Operation::DeleteConsumerGroup => 
Some(ConsumerGroupsCommand::Delete(
-                DeleteConsumerGroup::from_bytes(body.clone()).unwrap(),
-            )),
-            _ => None,
-        }
-    }
-}
-
-impl ApplyState for ConsumerGroups {
-    type Output = ();
-
-    fn do_apply(&self, cmd: Self::Command) -> Self::Output {
-        match cmd {
-            ConsumerGroupsCommand::Create(payload) => {
-                todo!("Handle Create consumer group with {:?}", payload)
-            }
-            ConsumerGroupsCommand::Delete(payload) => {
-                todo!("Handle Delete consumer group with {:?}", payload)
-            }
-        }
+        Self::default()
     }
 }
diff --git a/core/metadata/src/stm/mod.rs b/core/metadata/src/stm/mod.rs
index e6c13684b..5baeeb421 100644
--- a/core/metadata/src/stm/mod.rs
+++ b/core/metadata/src/stm/mod.rs
@@ -20,51 +20,194 @@ pub mod mux;
 pub mod stream;
 pub mod user;
 
-/// Macro to generate a `{State}Command` enum and implement `StateCommand` 
trait.
-///
-/// # Arguments
-/// * `$state_type` - The type that implements `ApplyState` trait
-/// * `$command_enum` - The name of the command enum to generate (e.g., 
StreamsCommand)
-/// * `$operations` - Array of Operation enum variants (also used as payload 
type names)
-///
-/// # Example
-/// ```ignore
-/// define_state_command! {
-///     Streams,
-///     StreamsCommand,
-///     [CreateStream, UpdateStream, DeleteStream, PurgeStream]
-/// }
-/// ```
+use std::cell::UnsafeCell;
+
+// ============================================================================
+// WriteCell - Interior mutability wrapper for WriteHandle
+// ============================================================================
+
+pub struct WriteCell<T: left_right::Absorb<O>, O> {
+    inner: UnsafeCell<left_right::WriteHandle<T, O>>,
+}
+
+impl<T: left_right::Absorb<O>, O> WriteCell<T, O> {
+    pub fn new(write: left_right::WriteHandle<T, O>) -> Self {
+        Self {
+            inner: UnsafeCell::new(write),
+        }
+    }
+
+    pub fn with<F, R>(&self, f: F) -> R
+    where
+        F: FnOnce(&mut left_right::WriteHandle<T, O>) -> R,
+    {
+        // SAFETY: WriteCell is !Sync (via UnsafeCell), so only one thread can 
access.
+        // The caller ensures exclusive access through the &self borrow.
+        unsafe { f(&mut *self.inner.get()) }
+    }
+}
+
+// ============================================================================
+// Traits - All use &self, implementations use interior mutability
+// ============================================================================
+
+/// Parses input into a command.
+pub trait Command {
+    type Cmd;
+    type Input;
+
+    fn into_command(input: &Self::Input) -> Option<Self::Cmd>;
+}
+
+/// Dispatches a command to mutate state.
+/// Takes `&mut self` - implementations use interior mutability.
+pub trait Dispatch {
+    type Cmd;
+    type Output;
+
+    fn dispatch(&self, cmd: &Self::Cmd) -> Self::Output;
+}
+
+/// Applies a command through a state wrapper.
+/// Takes `&self` - implementations use interior mutability.
+pub trait ApplyState {
+    type Cmd;
+    type Output;
+    type Inner: Command<Cmd = Self::Cmd>;
+
+    fn do_apply(&self, cmd: Self::Cmd) -> Option<Self::Output>;
+}
+
+/// Public interface for state machines.
+/// Takes `&self` - implementations use interior mutability.
+pub trait State {
+    type Output;
+    type Input;
+
+    fn apply(&self, input: &Self::Input) -> Option<Self::Output>;
+}
+
+impl<T> State for T
+where
+    T: ApplyState,
+{
+    type Output = T::Output;
+    type Input = <T::Inner as Command>::Input;
+
+    fn apply(&self, input: &Self::Input) -> Option<Self::Output> {
+        let cmd = T::Inner::into_command(input)?;
+        self.do_apply(cmd)
+    }
+}
+
+pub(crate) trait StateMachine {
+    type Input;
+    type Output;
+    fn update(&self, input: &Self::Input, output: &mut Vec<Self::Output>);
+}
+
 #[macro_export]
-macro_rules! define_state_command {
+macro_rules! define_state {
     (
-        $state_type:ty,
-        $command_enum:ident,
+        $state:ident,
+        $inner:ident {
+            $($field_name:ident : $field_type:ty),* $(,)?
+        },
+        $command:ident,
         [$($operation:ident),* $(,)?]
     ) => {
+        #[derive(Debug, Clone, Default)]
+        pub struct $inner {
+            $(
+                pub $field_name: $field_type,
+            )*
+        }
+
         #[derive(Debug)]
-        pub enum $command_enum {
+        pub enum $command {
             $(
                 $operation($operation),
             )*
         }
 
-        impl $crate::stm::StateCommand for $state_type {
-            type Command = $command_enum;
+        impl Clone for $command
+        where
+            $($operation: Clone,)*
+        {
+            fn clone(&self) -> Self {
+                match self {
+                    $(
+                        $command::$operation(payload) => 
$command::$operation(payload.clone()),
+                    )*
+                }
+            }
+        }
+
+        /// State wrapper with interior mutability for write access.
+        pub struct $state {
+            write: Option<$crate::stm::WriteCell<$inner, $command>>,
+            read: ::std::sync::Arc<::left_right::ReadHandle<$inner>>,
+        }
+
+        impl $state {
+            /// Get a clone of the read handle.
+            pub fn read_handle(&self) -> 
::std::sync::Arc<::left_right::ReadHandle<$inner>> {
+                self.read.clone()
+            }
+
+            /// Get read access to the inner state.
+            pub fn read(&self) -> Option<::left_right::ReadGuard<'_, $inner>> {
+                self.read.enter()
+            }
+
+            /// Check if this instance has write capability.
+            pub fn has_write(&self) -> bool {
+                self.write.is_some()
+            }
+        }
+
+        impl From<$inner> for $state {
+            fn from(inner: $inner) -> Self {
+                let (write, read) = ::left_right::new_from_empty(inner);
+                let write = Some($crate::stm::WriteCell::new(write));
+                let read = ::std::sync::Arc::new(read);
+                Self { write, read }
+            }
+        }
+
+        impl From<::std::sync::Arc<::left_right::ReadHandle<$inner>>> for 
$state {
+            /// Create a read-only instance from a read handle.
+            fn from(read: ::std::sync::Arc<::left_right::ReadHandle<$inner>>) 
-> Self {
+                Self { write: None, read }
+            }
+        }
+
+        impl ::std::fmt::Debug for $state {
+            fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> 
::std::fmt::Result {
+                match self.read.enter() {
+                    Some(guard) => f.debug_struct(stringify!($state))
+                        .field("has_write", &self.write.is_some())
+                        .field("inner", &*guard)
+                        .finish(),
+                    None => 
f.debug_struct(stringify!($state)).finish_non_exhaustive(),
+                }
+            }
+        }
+
+        impl $crate::stm::Command for $inner {
+            type Cmd = $command;
             type Input = 
::iggy_common::message::Message<::iggy_common::header::PrepareHeader>;
 
-            fn into_command(input: &Self::Input) -> Option<Self::Command> {
+            fn into_command(input: &Self::Input) -> Option<Self::Cmd> {
                 use ::iggy_common::BytesSerializable;
-                use ::bytes::Bytes;
                 use ::iggy_common::header::Operation;
 
-                // TODO: rework this thing, so we don't copy the bytes on each 
request
-                let body = Bytes::copy_from_slice(input.body());
+                let body = input.body_bytes();
                 match input.header().operation {
                     $(
                         Operation::$operation => {
-                            Some($command_enum::$operation(
-                                $operation::from_bytes(body.clone()).unwrap()
+                            Some($command::$operation(
+                                $operation::from_bytes(body).unwrap()
                             ))
                         },
                     )*
@@ -73,56 +216,44 @@ macro_rules! define_state_command {
             }
         }
 
-        // Compile-time check that the type implements ApplyState
-        const _: () = {
-            const fn assert_impl_apply_state<T: $crate::stm::ApplyState>() {}
-            assert_impl_apply_state::<$state_type>();
-        };
-    };
-}
-
-// This is public interface to state, therefore it will be imported from 
different crate, for now during development I am leaving it there.
-pub trait State
-where
-    Self: Sized,
-{
-    type Output;
-    type Input;
-
-    // Apply the state machine logic and return an optional output.
-    // The output is optional, as we model the `StateMachine`, as an variadic 
list,
-    // where not all state machines will produce an output for every input 
event.
-    fn apply(&self, input: &Self::Input) -> Option<Self::Output>;
-}
-
-// TODO: This interface should be private to the stm module.
-pub trait StateMachine {
-    type Input;
-    type Output;
-    fn update(&self, input: &Self::Input, output: &mut Vec<Self::Output>);
-}
+        impl ::left_right::Absorb<$command> for $inner
+        where
+            $inner: $crate::stm::Dispatch<Cmd = $command>,
+        {
+            fn absorb_first(&mut self, cmd: &mut $command, _other: &Self) {
+                use $crate::stm::Dispatch;
+                self.dispatch(cmd);
+            }
 
-pub trait StateCommand {
-    type Command;
-    type Input;
+            fn absorb_second(&mut self, cmd: $command, _other: &Self) {
+                use $crate::stm::Dispatch;
+                self.dispatch(&cmd);
+            }
 
-    fn into_command(input: &Self::Input) -> Option<Self::Command>;
-}
+            fn sync_with(&mut self, first: &Self) {
+                *self = first.clone();
+            }
 
-pub trait ApplyState: StateCommand {
-    type Output;
+            fn drop_first(self: Box<Self>) {}
 
-    fn do_apply(&self, cmd: Self::Command) -> Self::Output;
-}
+            fn drop_second(self: Box<Self>) {}
+        }
 
-impl<T> State for T
-where
-    T: ApplyState,
-{
-    type Output = T::Output;
-    type Input = T::Input;
+        impl $crate::stm::ApplyState for $state
+        where
+            $inner: $crate::stm::Dispatch<Cmd = $command>,
+        {
+            type Cmd = $command;
+            type Output = ();
+            type Inner = $inner;
 
-    fn apply(&self, input: &Self::Input) -> Option<Self::Output> {
-        T::into_command(input).map(|cmd| self.do_apply(cmd))
-    }
+            fn do_apply(&self, cmd: Self::Cmd) -> Option<Self::Output> {
+                let write_cell = self.write.as_ref()?;
+                write_cell.with(|write| {
+                    write.append(cmd).publish();
+                });
+                Some(())
+            }
+        }
+    };
 }
diff --git a/core/metadata/src/stm/mux.rs b/core/metadata/src/stm/mux.rs
index 3fc80d88b..979ff3ccf 100644
--- a/core/metadata/src/stm/mux.rs
+++ b/core/metadata/src/stm/mux.rs
@@ -92,14 +92,16 @@ mod tests {
 
     #[test]
     fn construct_mux_state_machine_from_states_with_same_output() {
-        use crate::stm::*;
+        use crate::stm::StateMachine;
+        use crate::stm::mux::MuxStateMachine;
+        use crate::stm::stream::{Streams, StreamsInner};
+        use crate::stm::user::{Users, UsersInner};
         use iggy_common::header::PrepareHeader;
         use iggy_common::message::Message;
 
-        let users = user::Users::new();
-        let streams = stream::Streams::new();
-        let cgs = consumer_group::ConsumerGroups::new();
-        let mux = mux::MuxStateMachine::new(variadic!(users, streams, cgs));
+        let users: Users = UsersInner::new().into();
+        let streams: Streams = StreamsInner::new().into();
+        let mux = MuxStateMachine::new(variadic!(users, streams));
 
         let input = Message::new(std::mem::size_of::<PrepareHeader>());
         let mut output = Vec::new();
diff --git a/core/metadata/src/stm/stream.rs b/core/metadata/src/stm/stream.rs
index 6f05e5aa4..844a30565 100644
--- a/core/metadata/src/stm/stream.rs
+++ b/core/metadata/src/stm/stream.rs
@@ -15,29 +15,23 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::define_state_command;
+use crate::define_state;
 use crate::stats::{StreamStats, TopicStats};
-use crate::stm::ApplyState;
+use crate::stm::Dispatch;
 use ahash::AHashMap;
-use iggy_common::create_partitions::CreatePartitions;
 use iggy_common::create_stream::CreateStream;
-use iggy_common::create_topic::CreateTopic;
-use iggy_common::delete_partitions::DeletePartitions;
-use iggy_common::delete_segments::DeleteSegments;
 use iggy_common::delete_stream::DeleteStream;
-use iggy_common::delete_topic::DeleteTopic;
 use iggy_common::purge_stream::PurgeStream;
-use iggy_common::purge_topic::PurgeTopic;
 use iggy_common::update_stream::UpdateStream;
-use iggy_common::update_topic::UpdateTopic;
-use iggy_common::{
-    CompressionAlgorithm, Identifier, IggyError, IggyExpiry, IggyTimestamp, 
MaxTopicSize,
-};
+use iggy_common::{CompressionAlgorithm, IggyExpiry, IggyTimestamp, 
MaxTopicSize};
 use slab::Slab;
-use std::cell::RefCell;
 use std::sync::Arc;
 
-#[derive(Debug, Clone)]
+// ============================================================================
+// Partition Entity
+// ============================================================================
+
+#[derive(Debug, Clone, Default)]
 pub struct Partition {
     pub id: usize,
 }
@@ -48,56 +42,26 @@ impl Partition {
     }
 }
 
+// ============================================================================
+// Partitions Collection
+// ============================================================================
+
 #[derive(Debug, Clone, Default)]
 pub struct Partitions {
-    items: RefCell<Slab<Partition>>,
+    pub items: Slab<Partition>,
 }
 
 impl Partitions {
     pub fn new() -> Self {
-        Self {
-            items: RefCell::new(Slab::with_capacity(1024)),
-        }
-    }
-
-    pub fn insert(&self, partition: Partition) -> usize {
-        let mut items = self.items.borrow_mut();
-        let id = items.insert(partition);
-        items[id].id = id;
-        id
-    }
-
-    pub fn get(&self, id: usize) -> Option<Partition> {
-        self.items.borrow().get(id).cloned()
-    }
-
-    pub fn remove(&self, id: usize) -> Option<Partition> {
-        let mut items = self.items.borrow_mut();
-        if items.contains(id) {
-            Some(items.remove(id))
-        } else {
-            None
-        }
-    }
-
-    pub fn len(&self) -> usize {
-        self.items.borrow().len()
-    }
-
-    pub fn is_empty(&self) -> bool {
-        self.items.borrow().is_empty()
-    }
-
-    pub fn iter(&self) -> Vec<Partition> {
-        self.items
-            .borrow()
-            .iter()
-            .map(|(_, p): (usize, &Partition)| p.clone())
-            .collect()
+        Self::default()
     }
 }
 
-#[derive(Debug, Clone)]
+// ============================================================================
+// ConsumerGroup (local to Topic, not a state machine)
+// ============================================================================
+
+#[derive(Debug, Clone, Default)]
 pub struct ConsumerGroup {
     pub id: usize,
     pub name: String,
@@ -114,66 +78,50 @@ impl ConsumerGroup {
     }
 }
 
+// ============================================================================
+// ConsumerGroups (local to Topic, simple collection - no left_right)
+// ============================================================================
+
 #[derive(Debug, Clone, Default)]
 pub struct ConsumerGroups {
-    index: RefCell<AHashMap<String, usize>>,
-    items: RefCell<Slab<ConsumerGroup>>,
+    index: AHashMap<String, usize>,
+    items: Slab<ConsumerGroup>,
 }
 
 impl ConsumerGroups {
     pub fn new() -> Self {
-        Self {
-            index: RefCell::new(AHashMap::with_capacity(256)),
-            items: RefCell::new(Slab::with_capacity(256)),
-        }
+        Self::default()
     }
 
-    pub fn insert(&self, group: ConsumerGroup) -> usize {
-        let mut items = self.items.borrow_mut();
-        let mut index = self.index.borrow_mut();
-
-        let name = group.name.clone();
-        let id = items.insert(group);
-        items[id].id = id;
-        index.insert(name, id);
-        id
+    pub fn insert(&self, _group: ConsumerGroup) -> usize {
+        0
     }
 
-    pub fn get(&self, id: usize) -> Option<ConsumerGroup> {
-        self.items.borrow().get(id).cloned()
+    pub fn get(&self, _id: usize) -> Option<ConsumerGroup> {
+        None
     }
 
-    pub fn get_by_name(&self, name: &str) -> Option<ConsumerGroup> {
-        let index = self.index.borrow();
-        if let Some(&id) = index.get(name) {
-            self.items.borrow().get(id).cloned()
-        } else {
-            None
-        }
+    pub fn get_by_name(&self, _name: &str) -> Option<ConsumerGroup> {
+        None
     }
 
-    pub fn remove(&self, id: usize) -> Option<ConsumerGroup> {
-        let mut items = self.items.borrow_mut();
-        let mut index = self.index.borrow_mut();
-
-        if !items.contains(id) {
-            return None;
-        }
-
-        let group = items.remove(id);
-        index.remove(&group.name);
-        Some(group)
+    pub fn remove(&self, _id: usize) -> Option<ConsumerGroup> {
+        None
     }
 
     pub fn len(&self) -> usize {
-        self.items.borrow().len()
+        0
     }
 
     pub fn is_empty(&self) -> bool {
-        self.items.borrow().is_empty()
+        true
     }
 }
 
+// ============================================================================
+// Topic Entity
+// ============================================================================
+
 #[derive(Debug, Clone)]
 pub struct Topic {
     pub id: usize,
@@ -189,6 +137,23 @@ pub struct Topic {
     pub consumer_groups: ConsumerGroups,
 }
 
+impl Default for Topic {
+    fn default() -> Self {
+        Self {
+            id: 0,
+            name: String::new(),
+            created_at: IggyTimestamp::default(),
+            replication_factor: 1,
+            message_expiry: IggyExpiry::default(),
+            compression_algorithm: CompressionAlgorithm::default(),
+            max_topic_size: MaxTopicSize::default(),
+            stats: Arc::new(TopicStats::default()),
+            partitions: Partitions::new(),
+            consumer_groups: ConsumerGroups::new(),
+        }
+    }
+}
+
 impl Topic {
     pub fn new(
         name: String,
@@ -214,86 +179,27 @@ impl Topic {
     }
 }
 
+// ============================================================================
+// Topics Collection
+// ============================================================================
+
 #[derive(Debug, Clone, Default)]
 pub struct Topics {
-    index: RefCell<AHashMap<String, usize>>,
-    items: RefCell<Slab<Topic>>,
+    pub index: AHashMap<String, usize>,
+    pub items: Slab<Topic>,
 }
 
 impl Topics {
     pub fn new() -> Self {
-        Self {
-            index: RefCell::new(AHashMap::with_capacity(1024)),
-            items: RefCell::new(Slab::with_capacity(1024)),
-        }
-    }
-
-    pub fn insert(&self, topic: Topic) -> usize {
-        let mut items = self.items.borrow_mut();
-        let mut index = self.index.borrow_mut();
-
-        let name = topic.name.clone();
-        let id = items.insert(topic);
-        items[id].id = id;
-        index.insert(name, id);
-        id
-    }
-
-    pub fn get(&self, id: usize) -> Option<Topic> {
-        self.items.borrow().get(id).cloned()
-    }
-
-    pub fn get_by_name(&self, name: &str) -> Option<Topic> {
-        let index = self.index.borrow();
-        if let Some(&id) = index.get(name) {
-            self.items.borrow().get(id).cloned()
-        } else {
-            None
-        }
-    }
-
-    pub fn get_by_identifier(&self, identifier: &Identifier) -> Option<Topic> {
-        match identifier.kind {
-            iggy_common::IdKind::Numeric => {
-                if let Ok(id) = identifier.get_u32_value() {
-                    self.get(id as usize)
-                } else {
-                    None
-                }
-            }
-            iggy_common::IdKind::String => {
-                if let Ok(name) = identifier.get_string_value() {
-                    self.get_by_name(&name)
-                } else {
-                    None
-                }
-            }
-        }
-    }
-
-    pub fn remove(&self, id: usize) -> Option<Topic> {
-        let mut items = self.items.borrow_mut();
-        let mut index = self.index.borrow_mut();
-
-        if !items.contains(id) {
-            return None;
-        }
-
-        let topic = items.remove(id);
-        index.remove(&topic.name);
-        Some(topic)
-    }
-
-    pub fn len(&self) -> usize {
-        self.items.borrow().len()
-    }
-
-    pub fn is_empty(&self) -> bool {
-        self.items.borrow().is_empty()
+        Self::default()
     }
 }
 
-#[derive(Debug, Clone)]
+// ============================================================================
+// Stream Entity
+// ============================================================================
+
+#[derive(Debug)]
 pub struct Stream {
     pub id: usize,
     pub name: String,
@@ -303,198 +209,73 @@ pub struct Stream {
     pub topics: Topics,
 }
 
-impl Stream {
-    pub fn new(name: String, created_at: IggyTimestamp) -> Self {
+impl Default for Stream {
+    fn default() -> Self {
         Self {
             id: 0,
-            name,
-            created_at,
+            name: String::new(),
+            created_at: IggyTimestamp::default(),
             stats: Arc::new(StreamStats::default()),
             topics: Topics::new(),
         }
     }
 }
 
-#[derive(Debug, Clone, Default)]
-pub struct Streams {
-    index: RefCell<AHashMap<String, usize>>,
-    items: RefCell<Slab<Stream>>,
-}
-
-impl Streams {
-    pub fn new() -> Self {
+impl Clone for Stream {
+    fn clone(&self) -> Self {
         Self {
-            index: RefCell::new(AHashMap::with_capacity(256)),
-            items: RefCell::new(Slab::with_capacity(256)),
-        }
-    }
-
-    pub fn insert(&self, stream: Stream) -> usize {
-        let mut items = self.items.borrow_mut();
-        let mut index = self.index.borrow_mut();
-
-        let name = stream.name.clone();
-        let id = items.insert(stream);
-        items[id].id = id;
-        index.insert(name, id);
-        id
-    }
-
-    pub fn get(&self, id: usize) -> Option<Stream> {
-        self.items.borrow().get(id).cloned()
-    }
-
-    pub fn get_by_name(&self, name: &str) -> Option<Stream> {
-        let index = self.index.borrow();
-        if let Some(&id) = index.get(name) {
-            self.items.borrow().get(id).cloned()
-        } else {
-            None
-        }
-    }
-
-    pub fn get_by_identifier(&self, identifier: &Identifier) -> Option<Stream> 
{
-        match identifier.kind {
-            iggy_common::IdKind::Numeric => {
-                if let Ok(id) = identifier.get_u32_value() {
-                    self.get(id as usize)
-                } else {
-                    None
-                }
-            }
-            iggy_common::IdKind::String => {
-                if let Ok(name) = identifier.get_string_value() {
-                    self.get_by_name(&name)
-                } else {
-                    None
-                }
-            }
-        }
-    }
-
-    pub fn remove(&self, id: usize) -> Option<Stream> {
-        let mut items = self.items.borrow_mut();
-        let mut index = self.index.borrow_mut();
-
-        if !items.contains(id) {
-            return None;
-        }
-
-        let stream = items.remove(id);
-        index.remove(&stream.name);
-        Some(stream)
-    }
-
-    pub fn update_name(&self, identifier: &Identifier, new_name: String) -> 
Result<(), IggyError> {
-        let stream = self.get_by_identifier(identifier);
-        if let Some(stream) = stream {
-            let mut items = self.items.borrow_mut();
-            let mut index = self.index.borrow_mut();
-
-            index.remove(&stream.name);
-            if let Some(s) = items.get_mut(stream.id) {
-                s.name = new_name.clone();
-            }
-            index.insert(new_name, stream.id);
-            Ok(())
-        } else {
-            Err(IggyError::ResourceNotFound("Stream".to_string()))
+            id: self.id,
+            name: self.name.clone(),
+            created_at: self.created_at,
+            stats: self.stats.clone(),
+            topics: self.topics.clone(),
         }
     }
+}
 
-    pub fn purge(&self, id: usize) -> Result<(), IggyError> {
-        let items = self.items.borrow();
-        if let Some(_stream) = items.get(id) {
-            // TODO: Purge all topics in the stream
-            Ok(())
-        } else {
-            Err(IggyError::ResourceNotFound("Stream".to_string()))
+impl Stream {
+    pub fn new(name: String, created_at: IggyTimestamp) -> Self {
+        Self {
+            id: 0,
+            name,
+            created_at,
+            stats: Arc::new(StreamStats::default()),
+            topics: Topics::new(),
         }
     }
-
-    pub fn len(&self) -> usize {
-        self.items.borrow().len()
-    }
-
-    pub fn is_empty(&self) -> bool {
-        self.items.borrow().is_empty()
-    }
-
-    pub fn iter(&self) -> Vec<Stream> {
-        self.items
-            .borrow()
-            .iter()
-            .map(|(_, s): (usize, &Stream)| s.clone())
-            .collect()
-    }
 }
 
-// Define StreamsCommand enum and StateCommand implementation using the macro
-define_state_command! {
+// ============================================================================
+// Streams State Machine
+// ============================================================================
+
+define_state! {
     Streams,
+    StreamsInner {
+        index: AHashMap<String, usize>,
+        items: Slab<Stream>,
+    },
     StreamsCommand,
     [CreateStream, UpdateStream, DeleteStream, PurgeStream]
 }
 
-impl ApplyState for Streams {
+impl Dispatch for StreamsInner {
+    type Cmd = StreamsCommand;
     type Output = ();
 
-    fn do_apply(&self, cmd: Self::Command) -> Self::Output {
+    fn dispatch(&self, cmd: &Self::Cmd) -> Self::Output {
         match cmd {
-            StreamsCommand::CreateStream(payload) => {
-                todo!("Handle Create stream with {:?}", payload)
-            }
-            StreamsCommand::UpdateStream(payload) => {
-                todo!("Handle Update stream with {:?}", payload)
+            StreamsCommand::CreateStream(_payload) => {
+                // Actual mutation logic will be implemented later
             }
-            StreamsCommand::DeleteStream(payload) => {
-                todo!("Handle Delete stream with {:?}", payload)
-            }
-            StreamsCommand::PurgeStream(payload) => todo!("Handle Purge stream 
with {:?}", payload),
-        }
-    }
-}
-
-// Define TopicsCommand enum and StateCommand implementation using the macro
-define_state_command! {
-    Topics,
-    TopicsCommand,
-    [CreateTopic, UpdateTopic, DeleteTopic, PurgeTopic]
-}
-
-impl ApplyState for Topics {
-    type Output = ();
-
-    fn do_apply(&self, cmd: Self::Command) -> Self::Output {
-        match cmd {
-            TopicsCommand::CreateTopic(payload) => todo!("Handle Create topic 
with {:?}", payload),
-            TopicsCommand::UpdateTopic(payload) => todo!("Handle Update topic 
with {:?}", payload),
-            TopicsCommand::DeleteTopic(payload) => todo!("Handle Delete topic 
with {:?}", payload),
-            TopicsCommand::PurgeTopic(payload) => todo!("Handle Purge topic 
with {:?}", payload),
-        }
-    }
-}
-
-// Define PartitionsCommand enum and StateCommand implementation using the 
macro
-define_state_command! {
-    Partitions,
-    PartitionsCommand,
-    [CreatePartitions, DeletePartitions, DeleteSegments]
-}
-
-impl ApplyState for Partitions {
-    type Output = ();
-
-    fn do_apply(&self, cmd: Self::Command) -> Self::Output {
-        match cmd {
-            PartitionsCommand::CreatePartitions(payload) => {
-                todo!("Handle Create partitions with {:?}", payload)
+            StreamsCommand::UpdateStream(_payload) => {
+                // Actual mutation logic will be implemented later
             }
-            PartitionsCommand::DeletePartitions(payload) => {
-                todo!("Handle Delete partitions with {:?}", payload)
+            StreamsCommand::DeleteStream(_payload) => {
+                // Actual mutation logic will be implemented later
             }
-            PartitionsCommand::DeleteSegments(payload) => {
-                todo!("Handle Delete segments with {:?}", payload)
+            StreamsCommand::PurgeStream(_payload) => {
+                // Actual mutation logic will be implemented later
             }
         }
     }
diff --git a/core/metadata/src/stm/user.rs b/core/metadata/src/stm/user.rs
index b31bdc764..043ec0bd2 100644
--- a/core/metadata/src/stm/user.rs
+++ b/core/metadata/src/stm/user.rs
@@ -15,27 +15,23 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::{
-    permissioner::Permissioner,
-    stm::{ApplyState, StateCommand},
-};
+use crate::define_state;
+use crate::permissioner::Permissioner;
+use crate::stm::Dispatch;
 use ahash::AHashMap;
-use bytes::Bytes;
 use iggy_common::change_password::ChangePassword;
 use iggy_common::create_user::CreateUser;
 use iggy_common::delete_user::DeleteUser;
 use iggy_common::update_permissions::UpdatePermissions;
 use iggy_common::update_user::UpdateUser;
-use iggy_common::{
-    BytesSerializable, Identifier, IggyError, IggyTimestamp, Permissions, 
PersonalAccessToken,
-    UserId, UserStatus,
-    header::{Operation, PrepareHeader},
-    message::Message,
-};
+use iggy_common::{IggyTimestamp, Permissions, PersonalAccessToken, UserId, 
UserStatus};
 use slab::Slab;
-use std::cell::RefCell;
 
-#[derive(Debug, Clone)]
+// ============================================================================
+// User Entity
+// ============================================================================
+
+#[derive(Debug, Clone, Default)]
 pub struct User {
     pub id: UserId,
     pub username: String,
@@ -66,242 +62,41 @@ impl User {
     }
 }
 
-#[derive(Debug, Clone, Default)]
-pub struct Users {
-    index: RefCell<AHashMap<String, usize>>,
-    items: RefCell<Slab<User>>,
-    permissioner: RefCell<Permissioner>,
+// ============================================================================
+// Users State Machine
+// ============================================================================
+
+define_state! {
+    Users,
+    UsersInner {
+        index: AHashMap<String, usize>,
+        items: Slab<User>,
+        permissioner: Permissioner,
+    },
+    UsersCommand,
+    [CreateUser, UpdateUser, DeleteUser, ChangePassword, UpdatePermissions]
 }
 
-impl Users {
-    pub fn new() -> Self {
-        Self {
-            index: RefCell::new(AHashMap::with_capacity(1024)),
-            items: RefCell::new(Slab::with_capacity(1024)),
-            permissioner: RefCell::new(Permissioner::new()),
-        }
-    }
-
-    /// Insert a user and return the assigned ID
-    pub fn insert(&self, user: User) -> usize {
-        let mut items = self.items.borrow_mut();
-        let mut index = self.index.borrow_mut();
-
-        let username = user.username.clone();
-        let id = items.insert(user);
-        items[id].id = id as u32;
-        index.insert(username, id);
-        id
-    }
-
-    /// Get user by ID
-    pub fn get(&self, id: usize) -> Option<User> {
-        self.items.borrow().get(id).cloned()
-    }
+impl Dispatch for UsersInner {
+    type Cmd = UsersCommand;
+    type Output = ();
 
-    /// Get user by username or ID (via Identifier enum)
-    pub fn get_by_identifier(&self, identifier: &Identifier) -> 
Result<Option<User>, IggyError> {
-        match identifier.kind {
-            iggy_common::IdKind::Numeric => {
-                let id = identifier.get_u32_value()? as usize;
-                Ok(self.items.borrow().get(id).cloned())
-            }
-            iggy_common::IdKind::String => {
-                let username = identifier.get_string_value()?;
-                let index = self.index.borrow();
-                if let Some(&id) = index.get(&username) {
-                    Ok(self.items.borrow().get(id).cloned())
-                } else {
-                    Ok(None)
-                }
+    fn dispatch(&self, cmd: &Self::Cmd) -> Self::Output {
+        match cmd {
+            UsersCommand::CreateUser(_payload) => {
+                // Actual mutation logic will be implemented later
             }
-        }
-    }
-
-    /// Remove user by ID
-    pub fn remove(&self, id: usize) -> Option<User> {
-        let mut items = self.items.borrow_mut();
-        let mut index = self.index.borrow_mut();
-
-        if !items.contains(id) {
-            return None;
-        }
-
-        let user = items.remove(id);
-        index.remove(&user.username);
-        Some(user)
-    }
-
-    /// Check if user exists
-    pub fn contains(&self, identifier: &Identifier) -> bool {
-        match identifier.kind {
-            iggy_common::IdKind::Numeric => {
-                if let Ok(id) = identifier.get_u32_value() {
-                    self.items.borrow().contains(id as usize)
-                } else {
-                    false
-                }
+            UsersCommand::UpdateUser(_payload) => {
+                // Actual mutation logic will be implemented later
             }
-            iggy_common::IdKind::String => {
-                if let Ok(username) = identifier.get_string_value() {
-                    self.index.borrow().contains_key(&username)
-                } else {
-                    false
-                }
+            UsersCommand::DeleteUser(_payload) => {
+                // Actual mutation logic will be implemented later
             }
-        }
-    }
-
-    /// Get all users as a Vec
-    pub fn values(&self) -> Vec<User> {
-        self.items
-            .borrow()
-            .iter()
-            .map(|(_, u): (usize, &User)| u.clone())
-            .collect()
-    }
-
-    /// Get number of users
-    pub fn len(&self) -> usize {
-        self.items.borrow().len()
-    }
-
-    /// Check if empty
-    pub fn is_empty(&self) -> bool {
-        self.items.borrow().is_empty()
-    }
-
-    /// Check if username already exists
-    pub fn username_exists(&self, username: &str) -> bool {
-        self.index.borrow().contains_key(username)
-    }
-
-    /// Get ID by username
-    pub fn get_id_by_username(&self, username: &str) -> Option<usize> {
-        self.index.borrow().get(username).copied()
-    }
-
-    /// Initialize permissions for a user
-    pub fn init_permissions(&self, user_id: UserId, permissions: 
Option<Permissions>) {
-        self.permissioner
-            .borrow_mut()
-            .init_permissions(user_id, permissions);
-    }
-
-    /// Update permissions for a user
-    pub fn update_permissions(&self, user_id: UserId, permissions: 
Option<Permissions>) {
-        self.permissioner
-            .borrow_mut()
-            .update_permissions_for_user(user_id, permissions);
-    }
-
-    /// Delete permissions for a user
-    pub fn delete_permissions(&self, user_id: UserId) {
-        self.permissioner.borrow_mut().delete_permissions(user_id);
-    }
-
-    /// Update username
-    pub fn update_username(
-        &self,
-        identifier: &Identifier,
-        new_username: String,
-    ) -> Result<(), IggyError> {
-        let id = match identifier.kind {
-            iggy_common::IdKind::Numeric => identifier.get_u32_value()? as 
usize,
-            iggy_common::IdKind::String => {
-                let username = identifier.get_string_value()?;
-                let index = self.index.borrow();
-                *index
-                    .get(&username)
-                    .ok_or_else(|| 
IggyError::ResourceNotFound(username.to_string()))?
-            }
-        };
-
-        let old_username = {
-            let items = self.items.borrow();
-            let user = items
-                .get(id)
-                .ok_or_else(|| 
IggyError::ResourceNotFound(identifier.to_string()))?;
-            user.username.clone()
-        };
-
-        if old_username == new_username {
-            return Ok(());
-        }
-
-        tracing::trace!(
-            "Updating username: '{}' → '{}' for user ID: {}",
-            old_username,
-            new_username,
-            id
-        );
-
-        {
-            let mut items = self.items.borrow_mut();
-            let user = items
-                .get_mut(id)
-                .ok_or_else(|| 
IggyError::ResourceNotFound(identifier.to_string()))?;
-            user.username = new_username.clone();
-        }
-
-        let mut index = self.index.borrow_mut();
-        index.remove(&old_username);
-        index.insert(new_username, id);
-
-        Ok(())
-    }
-}
-
-#[derive(Debug)]
-pub enum UsersCommand {
-    Create(CreateUser),
-    Update(UpdateUser),
-    Delete(DeleteUser),
-    ChangePassword(ChangePassword),
-    UpdatePermissions(UpdatePermissions),
-}
-
-impl StateCommand for Users {
-    type Command = UsersCommand;
-    type Input = Message<PrepareHeader>;
-
-    fn into_command(input: &Self::Input) -> Option<Self::Command> {
-        // TODO: rework this thing, so we don't copy the bytes on each request
-        let body = Bytes::copy_from_slice(input.body());
-        match input.header().operation {
-            Operation::CreateUser => Some(UsersCommand::Create(
-                CreateUser::from_bytes(body.clone()).unwrap(),
-            )),
-            Operation::UpdateUser => Some(UsersCommand::Update(
-                UpdateUser::from_bytes(body.clone()).unwrap(),
-            )),
-            Operation::DeleteUser => Some(UsersCommand::Delete(
-                DeleteUser::from_bytes(body.clone()).unwrap(),
-            )),
-            Operation::ChangePassword => Some(UsersCommand::ChangePassword(
-                ChangePassword::from_bytes(body.clone()).unwrap(),
-            )),
-            Operation::UpdatePermissions => 
Some(UsersCommand::UpdatePermissions(
-                UpdatePermissions::from_bytes(body.clone()).unwrap(),
-            )),
-            _ => None,
-        }
-    }
-}
-
-impl ApplyState for Users {
-    type Output = ();
-
-    fn do_apply(&self, cmd: Self::Command) -> Self::Output {
-        match cmd {
-            UsersCommand::Create(payload) => todo!("Handle Create user with 
{:?}", payload),
-            UsersCommand::Update(payload) => todo!("Handle Update user with 
{:?}", payload),
-            UsersCommand::Delete(payload) => todo!("Handle Delete user with 
{:?}", payload),
-            UsersCommand::ChangePassword(payload) => {
-                todo!("Handle Change password with {:?}", payload)
+            UsersCommand::ChangePassword(_payload) => {
+                // Actual mutation logic will be implemented later
             }
-            UsersCommand::UpdatePermissions(payload) => {
-                todo!("Handle Update permissions with {:?}", payload)
+            UsersCommand::UpdatePermissions(_payload) => {
+                // Actual mutation logic will be implemented later
             }
         }
     }
diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml
index 3e3bb5dc4..c63e801a2 100644
--- a/core/server/Cargo.toml
+++ b/core/server/Cargo.toml
@@ -71,7 +71,7 @@ hash32 = "1.0.0"
 human-repr = { workspace = true }
 iggy_common = { workspace = true }
 jsonwebtoken = { version = "10.2.0", features = ["rust_crypto"] }
-left-right = "0.11"
+left-right = { workspace = true }
 lending-iterator = "0.1.7"
 mimalloc = { workspace = true, optional = true }
 mime_guess = { version = "2.0", optional = true }

Reply via email to