This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new 871b18397 feat(metadata): impl states for streams,users, consumer_groups in metadata module (#2582)
871b18397 is described below

commit 871b1839723e028b163556847d33eb6531b45ef1
Author: Grzegorz Koszyk <[email protected]>
AuthorDate: Fri Jan 23 13:04:54 2026 +0100

    feat(metadata): impl states for streams,users, consumer_groups in metadata module (#2582)
---
 Cargo.lock                                         |   3 +-
 Cargo.toml                                         |   1 +
 .../consumer_groups/create_consumer_group.rs       |   2 +-
 .../consumer_groups/delete_consumer_group.rs       |   2 +-
 .../src/commands/partitions/create_partitions.rs   |   2 +-
 .../src/commands/partitions/delete_partitions.rs   |   2 +-
 .../create_personal_access_token.rs                |   2 +-
 .../delete_personal_access_token.rs                |   2 +-
 .../src/commands/segments/delete_segments.rs       |   2 +-
 core/common/src/commands/streams/create_stream.rs  |   2 +-
 core/common/src/commands/streams/delete_stream.rs  |   2 +-
 core/common/src/commands/streams/purge_stream.rs   |   2 +-
 core/common/src/commands/streams/update_stream.rs  |   2 +-
 core/common/src/commands/topics/create_topic.rs    |   2 +-
 core/common/src/commands/topics/delete_topic.rs    |   2 +-
 core/common/src/commands/topics/purge_topic.rs     |   2 +-
 core/common/src/commands/topics/update_topic.rs    |   2 +-
 core/common/src/commands/users/change_password.rs  |   2 +-
 core/common/src/commands/users/create_user.rs      |   2 +-
 core/common/src/commands/users/delete_user.rs      |   2 +-
 .../src/commands/users/update_permissions.rs       |   2 +-
 core/common/src/commands/users/update_user.rs      |   2 +-
 core/common/src/types/consensus/message.rs         |  16 +
 core/metadata/Cargo.toml                           |   3 +-
 core/metadata/src/impls/metadata.rs                | 220 ++++---
 core/metadata/src/stm/consumer_group.rs            | 321 +++++-----
 core/metadata/src/stm/mod.rs                       | 304 +++++++---
 core/metadata/src/stm/mux.rs                       |  12 +-
 core/metadata/src/stm/stream.rs                    | 672 +++++++++------------
 core/metadata/src/stm/user.rs                      | 401 +++++-------
 core/server/Cargo.toml                             |   2 +-
 31 files changed, 1020 insertions(+), 975 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index f88aaf4a6..5f7fceb1a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5676,11 +5676,12 @@ name = "metadata"
 version = "0.1.0"
 dependencies = [
  "ahash 0.8.12",
- "bytes",
  "consensus",
  "iggy_common",
  "journal",
+ "left-right",
  "message_bus",
+ "paste",
  "slab",
  "tracing",
 ]
diff --git a/Cargo.toml b/Cargo.toml
index dbe9f6a98..802dde2d8 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -142,6 +142,7 @@ iggy_connector_sdk = { path = "core/connectors/sdk", version = "0.1.1-edge.1" }
 integration = { path = "core/integration" }
 keyring = { version = "3.6.3", features = ["sync-secret-service", "vendored"] }
 lazy_static = "1.5.0"
+left-right = "0.11"
 log = "0.4.29"
 metadata = { path = "core/metadata" }
 mimalloc = "0.1"
diff --git a/core/common/src/commands/consumer_groups/create_consumer_group.rs b/core/common/src/commands/consumer_groups/create_consumer_group.rs
index c5bd3e912..c03d39964 100644
--- a/core/common/src/commands/consumer_groups/create_consumer_group.rs
+++ b/core/common/src/commands/consumer_groups/create_consumer_group.rs
@@ -33,7 +33,7 @@ use std::str::from_utf8;
 /// - `stream_id` - unique stream ID (numeric or name).
 /// - `topic_id` - unique topic ID (numeric or name).
 /// - `name` - unique consumer group name, max length is 255 characters.
-#[derive(Debug, Serialize, Deserialize, PartialEq)]
+#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
 pub struct CreateConsumerGroup {
     /// Unique stream ID (numeric or name).
     #[serde(skip)]
diff --git a/core/common/src/commands/consumer_groups/delete_consumer_group.rs b/core/common/src/commands/consumer_groups/delete_consumer_group.rs
index 3eee5cc1c..859ded6f9 100644
--- a/core/common/src/commands/consumer_groups/delete_consumer_group.rs
+++ b/core/common/src/commands/consumer_groups/delete_consumer_group.rs
@@ -31,7 +31,7 @@ use std::fmt::Display;
 /// - `stream_id` - unique stream ID (numeric or name).
 /// - `topic_id` - unique topic ID (numeric or name).
 /// - `group_id` - unique consumer group ID (numeric or name).
-#[derive(Debug, Serialize, Deserialize, PartialEq, Default)]
+#[derive(Debug, Serialize, Deserialize, PartialEq, Default, Clone)]
 pub struct DeleteConsumerGroup {
     /// Unique stream ID (numeric or name).
     #[serde(skip)]
diff --git a/core/common/src/commands/partitions/create_partitions.rs b/core/common/src/commands/partitions/create_partitions.rs
index 9aa862d5c..a01d3b26d 100644
--- a/core/common/src/commands/partitions/create_partitions.rs
+++ b/core/common/src/commands/partitions/create_partitions.rs
@@ -32,7 +32,7 @@ use std::fmt::Display;
 /// - `stream_id` - unique stream ID (numeric or name).
 /// - `topic_id` - unique topic ID (numeric or name).
 /// - `partitions_count` - number of partitions in the topic to create, max value is 1000.
-#[derive(Debug, Serialize, Deserialize, PartialEq)]
+#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
 pub struct CreatePartitions {
     /// Unique stream ID (numeric or name).
     #[serde(skip)]
diff --git a/core/common/src/commands/partitions/delete_partitions.rs b/core/common/src/commands/partitions/delete_partitions.rs
index 068f502ce..12be4448a 100644
--- a/core/common/src/commands/partitions/delete_partitions.rs
+++ b/core/common/src/commands/partitions/delete_partitions.rs
@@ -32,7 +32,7 @@ use std::fmt::Display;
 /// - `stream_id` - unique stream ID (numeric or name).
 /// - `topic_id` - unique topic ID (numeric or name).
 /// - `partitions_count` - number of partitions in the topic to delete, max value is 1000.
-#[derive(Debug, Serialize, Deserialize, PartialEq)]
+#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
 pub struct DeletePartitions {
     /// Unique stream ID (numeric or name).
     #[serde(skip)]
diff --git a/core/common/src/commands/personal_access_tokens/create_personal_access_token.rs b/core/common/src/commands/personal_access_tokens/create_personal_access_token.rs
index 6c0fbc54d..fa457a33e 100644
--- a/core/common/src/commands/personal_access_tokens/create_personal_access_token.rs
+++ b/core/common/src/commands/personal_access_tokens/create_personal_access_token.rs
@@ -31,7 +31,7 @@ use std::str::from_utf8;
 /// It has additional payload:
 /// - `name` - unique name of the token, must be between 3 and 30 characters long.
 /// - `expiry` - expiry of the token.
-#[derive(Debug, Serialize, Deserialize, PartialEq)]
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
 pub struct CreatePersonalAccessToken {
     /// Unique name of the token, must be between 3 and 30 characters long.
     pub name: String,
diff --git a/core/common/src/commands/personal_access_tokens/delete_personal_access_token.rs b/core/common/src/commands/personal_access_tokens/delete_personal_access_token.rs
index f54803fa9..064f78c48 100644
--- a/core/common/src/commands/personal_access_tokens/delete_personal_access_token.rs
+++ b/core/common/src/commands/personal_access_tokens/delete_personal_access_token.rs
@@ -29,7 +29,7 @@ use std::str::from_utf8;
 /// `DeletePersonalAccessToken` command is used to delete a personal access token for the authenticated user.
 /// It has additional payload:
 /// - `name` - unique name of the token, must be between 3 and 30 characters long.
-#[derive(Debug, Serialize, Deserialize, PartialEq)]
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
 pub struct DeletePersonalAccessToken {
     /// Unique name of the token, must be between 3 and 30 characters long.
     pub name: String,
diff --git a/core/common/src/commands/segments/delete_segments.rs b/core/common/src/commands/segments/delete_segments.rs
index 221e5c667..841a84ac7 100644
--- a/core/common/src/commands/segments/delete_segments.rs
+++ b/core/common/src/commands/segments/delete_segments.rs
@@ -31,7 +31,7 @@ use std::fmt::Display;
 /// - `topic_id` - unique topic ID (numeric or name).
 /// - `partition_id` - unique partition ID (numeric or name).
 /// - `segments_count` - number of segments in the partition to delete.
-#[derive(Debug, Serialize, Deserialize, PartialEq)]
+#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
 pub struct DeleteSegments {
     /// Unique stream ID (numeric or name).
     #[serde(skip)]
diff --git a/core/common/src/commands/streams/create_stream.rs b/core/common/src/commands/streams/create_stream.rs
index 292cfcfc2..9ff2fec5a 100644
--- a/core/common/src/commands/streams/create_stream.rs
+++ b/core/common/src/commands/streams/create_stream.rs
@@ -29,7 +29,7 @@ use std::str::from_utf8;
 /// `CreateStream` command is used to create a new stream.
 /// It has additional payload:
 /// - `name` - unique stream name (string), max length is 255 characters.
-#[derive(Debug, Serialize, Deserialize, PartialEq)]
+#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
 pub struct CreateStream {
     /// Unique stream name (string), max length is 255 characters.
     pub name: String,
diff --git a/core/common/src/commands/streams/delete_stream.rs b/core/common/src/commands/streams/delete_stream.rs
index ac8aa57f9..f44110340 100644
--- a/core/common/src/commands/streams/delete_stream.rs
+++ b/core/common/src/commands/streams/delete_stream.rs
@@ -28,7 +28,7 @@ use std::fmt::Display;
 /// `DeleteStream` command is used to delete an existing stream.
 /// It has additional payload:
 /// - `stream_id` - unique stream ID (numeric or name).
-#[derive(Debug, Serialize, Deserialize, PartialEq, Default)]
+#[derive(Debug, Serialize, Deserialize, PartialEq, Default, Clone)]
 pub struct DeleteStream {
     /// Unique stream ID (numeric or name).
     #[serde(skip)]
diff --git a/core/common/src/commands/streams/purge_stream.rs b/core/common/src/commands/streams/purge_stream.rs
index 805e7c944..036930ff8 100644
--- a/core/common/src/commands/streams/purge_stream.rs
+++ b/core/common/src/commands/streams/purge_stream.rs
@@ -28,7 +28,7 @@ use std::fmt::Display;
 /// `PurgeStream` command is used to purge stream data (all the messages from its topics).
 /// It has additional payload:
 /// - `stream_id` - unique stream ID (numeric or name).
-#[derive(Debug, Serialize, Deserialize, PartialEq, Default)]
+#[derive(Debug, Serialize, Deserialize, PartialEq, Default, Clone)]
 pub struct PurgeStream {
     /// Unique stream ID (numeric or name).
     #[serde(skip)]
diff --git a/core/common/src/commands/streams/update_stream.rs b/core/common/src/commands/streams/update_stream.rs
index ced247e71..ce3600f88 100644
--- a/core/common/src/commands/streams/update_stream.rs
+++ b/core/common/src/commands/streams/update_stream.rs
@@ -32,7 +32,7 @@ use std::str::from_utf8;
 /// It has additional payload:
 /// - `stream_id` - unique stream ID (numeric or name).
 /// - `name` - unique stream name (string), max length is 255 characters.
-#[derive(Debug, Serialize, Deserialize, PartialEq)]
+#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
 pub struct UpdateStream {
     /// Unique stream ID (numeric or name).
     #[serde(skip)]
diff --git a/core/common/src/commands/topics/create_topic.rs b/core/common/src/commands/topics/create_topic.rs
index ba90084e7..48cd3d525 100644
--- a/core/common/src/commands/topics/create_topic.rs
+++ b/core/common/src/commands/topics/create_topic.rs
@@ -40,7 +40,7 @@ use std::str::from_utf8;
 ///   Can't be lower than segment size in the config.
 /// - `replication_factor` - replication factor for the topic.
 /// - `name` - unique topic name, max length is 255 characters.
-#[derive(Debug, Serialize, Deserialize, PartialEq)]
+#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
 pub struct CreateTopic {
     /// Unique stream ID (numeric or name).
     #[serde(skip)]
diff --git a/core/common/src/commands/topics/delete_topic.rs b/core/common/src/commands/topics/delete_topic.rs
index a41c0de02..45680311d 100644
--- a/core/common/src/commands/topics/delete_topic.rs
+++ b/core/common/src/commands/topics/delete_topic.rs
@@ -30,7 +30,7 @@ use std::fmt::Display;
 /// It has additional payload:
 /// - `stream_id` - unique stream ID (numeric or name).
 /// - `topic_id` - unique topic ID (numeric or name).
-#[derive(Debug, Serialize, Deserialize, PartialEq, Default)]
+#[derive(Debug, Serialize, Deserialize, PartialEq, Default, Clone)]
 pub struct DeleteTopic {
     /// Unique stream ID (numeric or name).
     #[serde(skip)]
diff --git a/core/common/src/commands/topics/purge_topic.rs b/core/common/src/commands/topics/purge_topic.rs
index 061e374fa..9473b38c7 100644
--- a/core/common/src/commands/topics/purge_topic.rs
+++ b/core/common/src/commands/topics/purge_topic.rs
@@ -30,7 +30,7 @@ use std::fmt::Display;
 /// It has additional payload:
 /// - `stream_id` - unique stream ID (numeric or name).
 /// - `topic_id` - unique topic ID (numeric or name).
-#[derive(Debug, Serialize, Deserialize, PartialEq, Default)]
+#[derive(Debug, Serialize, Deserialize, PartialEq, Default, Clone)]
 pub struct PurgeTopic {
     /// Unique stream ID (numeric or name).
     #[serde(skip)]
diff --git a/core/common/src/commands/topics/update_topic.rs b/core/common/src/commands/topics/update_topic.rs
index 32a2ba7a5..83dadb69c 100644
--- a/core/common/src/commands/topics/update_topic.rs
+++ b/core/common/src/commands/topics/update_topic.rs
@@ -40,7 +40,7 @@ use std::str::from_utf8;
 ///   Can't be lower than segment size in the config.
 /// - `replication_factor` - replication factor for the topic.
 /// - `name` - unique topic name, max length is 255 characters.
-#[derive(Debug, Serialize, Deserialize, PartialEq)]
+#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
 pub struct UpdateTopic {
     /// Unique stream ID (numeric or name).
     #[serde(skip)]
diff --git a/core/common/src/commands/users/change_password.rs b/core/common/src/commands/users/change_password.rs
index 27f1ea6b8..4a5663778 100644
--- a/core/common/src/commands/users/change_password.rs
+++ b/core/common/src/commands/users/change_password.rs
@@ -33,7 +33,7 @@ use std::str::from_utf8;
 /// - `user_id` - unique user ID (numeric or name).
 /// - `current_password` - current password, must be between 3 and 100 characters long.
 /// - `new_password` - new password, must be between 3 and 100 characters long.
-#[derive(Debug, Serialize, Deserialize, PartialEq)]
+#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
 pub struct ChangePassword {
     /// Unique user ID (numeric or name).
     #[serde(skip)]
diff --git a/core/common/src/commands/users/create_user.rs b/core/common/src/commands/users/create_user.rs
index 03e0cb167..7d23c54ee 100644
--- a/core/common/src/commands/users/create_user.rs
+++ b/core/common/src/commands/users/create_user.rs
@@ -34,7 +34,7 @@ use std::str::from_utf8;
 /// - `password` - password of the user, must be between 3 and 100 characters long.
 /// - `status` - status of the user, can be either `active` or `inactive`.
 /// - `permissions` - optional permissions of the user. If not provided, user will have no permissions.
-#[derive(Debug, Serialize, Deserialize, PartialEq)]
+#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
 pub struct CreateUser {
     /// Unique name of the user, must be between 3 and 50 characters long.
     pub username: String,
diff --git a/core/common/src/commands/users/delete_user.rs b/core/common/src/commands/users/delete_user.rs
index c41a967ff..3cc5682a6 100644
--- a/core/common/src/commands/users/delete_user.rs
+++ b/core/common/src/commands/users/delete_user.rs
@@ -28,7 +28,7 @@ use std::fmt::Display;
 /// `DeleteUser` command is used to delete a user by unique ID.
 /// It has additional payload:
 /// - `user_id` - unique user ID (numeric or name).
-#[derive(Debug, Serialize, Deserialize, PartialEq, Default)]
+#[derive(Debug, Serialize, Deserialize, PartialEq, Default, Clone)]
 pub struct DeleteUser {
     /// Unique user ID (numeric or name).
     #[serde(skip)]
diff --git a/core/common/src/commands/users/update_permissions.rs b/core/common/src/commands/users/update_permissions.rs
index ab8e525f0..ff96927dd 100644
--- a/core/common/src/commands/users/update_permissions.rs
+++ b/core/common/src/commands/users/update_permissions.rs
@@ -31,7 +31,7 @@ use std::fmt::Display;
 /// It has additional payload:
 /// - `user_id` - unique user ID (numeric or name).
 /// - `permissions` - new permissions (optional)
-#[derive(Debug, Serialize, Deserialize, PartialEq, Default)]
+#[derive(Debug, Serialize, Deserialize, PartialEq, Default, Clone)]
 pub struct UpdatePermissions {
     /// Unique user ID (numeric or name).
     #[serde(skip)]
diff --git a/core/common/src/commands/users/update_user.rs b/core/common/src/commands/users/update_user.rs
index 28e2504d2..3e2811915 100644
--- a/core/common/src/commands/users/update_user.rs
+++ b/core/common/src/commands/users/update_user.rs
@@ -34,7 +34,7 @@ use std::str::from_utf8;
 /// - `user_id` - unique user ID (numeric or name).
 /// - `username` - new username (optional), if provided, must be between 3 and 50 characters long.
 /// - `status` - new status (optional)
-#[derive(Debug, Serialize, Deserialize, PartialEq, Default)]
+#[derive(Debug, Serialize, Deserialize, PartialEq, Default, Clone)]
 pub struct UpdateUser {
     #[serde(skip)]
     pub user_id: Identifier,
diff --git a/core/common/src/types/consensus/message.rs b/core/common/src/types/consensus/message.rs
index 0e94820f8..ac32cc3c8 100644
--- a/core/common/src/types/consensus/message.rs
+++ b/core/common/src/types/consensus/message.rs
@@ -141,6 +141,22 @@ where
         }
     }
 
+    /// Get the message body as zero-copy `Bytes`.
+    ///
+    /// Returns an empty `Bytes` if there is no body.
+    #[inline]
+    #[allow(unused)]
+    pub fn body_bytes(&self) -> Bytes {
+        let header_size = size_of::<H>();
+        let total_size = self.header().size() as usize;
+
+        if total_size > header_size {
+            self.buffer.slice(header_size..total_size)
+        } else {
+            Bytes::new()
+        }
+    }
+
     /// Get the complete message as bytes (header + body).
     #[inline]
     #[allow(unused)]
diff --git a/core/metadata/Cargo.toml b/core/metadata/Cargo.toml
index ee886e7c8..76a2f64ea 100644
--- a/core/metadata/Cargo.toml
+++ b/core/metadata/Cargo.toml
@@ -29,10 +29,11 @@ readme = "../../../README.md"
 
 [dependencies]
 ahash = { workspace = true }
-bytes = { workspace = true }
 consensus = { path = "../consensus" }
 iggy_common = { path = "../common" }
 journal = { path = "../journal" }
+left-right = { workspace = true }
 message_bus = { path = "../message_bus" }
+paste = "1.0"
 slab = "0.4.11"
 tracing = { workspace = true }
diff --git a/core/metadata/src/impls/metadata.rs b/core/metadata/src/impls/metadata.rs
index 33ae1da34..e1bee5f09 100644
--- a/core/metadata/src/impls/metadata.rs
+++ b/core/metadata/src/impls/metadata.rs
@@ -14,6 +14,8 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
+use std::marker::PhantomData;
+
 use consensus::{Consensus, Project, Sequencer, Status, VsrConsensus};
 use iggy_common::{
     header::{Command2, PrepareHeader, PrepareOkHeader},
@@ -26,42 +28,74 @@ use tracing::{debug, warn};
 // TODO: Define a trait (probably in some external crate)
 #[expect(unused)]
 trait Metadata {
-    type Consensus: Consensus;
-    type Journal: Journal<Entry = <Self::Consensus as Consensus>::ReplicateMessage>;
-
     /// Handle a replicate message (Prepare in VSR).
-    fn on_request(&self, message: <Self::Consensus as Consensus>::RequestMessage);
+    fn on_request(&self, message: <VsrConsensus as Consensus>::RequestMessage);
 
     /// Handle an ack message (PrepareOk in VSR).
     fn on_replicate(
         &self,
-        message: <Self::Consensus as Consensus>::ReplicateMessage,
+        message: <VsrConsensus as Consensus>::ReplicateMessage,
     ) -> impl Future<Output = ()>;
-    fn on_ack(&self, message: <Self::Consensus as Consensus>::AckMessage);
+    fn on_ack(&self, message: <VsrConsensus as Consensus>::AckMessage);
 }
 
-#[expect(unused)]
-struct IggyMetadata<M, J, S> {
-    consensus: VsrConsensus,
-    mux_stm: M,
-    journal: J,
-    snapshot: S,
+pub trait MetadataHandle {
+    type Consensus;
+    type Journal;
+    type Snapshot;
+    type StateMachine;
 }
 
-impl<M, J, S> Metadata for IggyMetadata<M, J, S>
+/// Concrete implementation of `MetadataHandle` for Iggy.
+/// This is a marker struct that only holds type information.
+pub struct IggyMetadataHandle<J, S, M> {
+    _marker: PhantomData<(J, S, M)>,
+}
+
+impl<J, S, M> MetadataHandle for IggyMetadataHandle<J, S, M>
 where
     J: Journal<Entry = <VsrConsensus as Consensus>::ReplicateMessage, Header = PrepareHeader>,
 {
     type Consensus = VsrConsensus;
     type Journal = J;
-    fn on_request(&self, message: <Self::Consensus as Consensus>::RequestMessage) {
+    type Snapshot = S;
+    type StateMachine = M;
+}
+
+// =============================================================================
+// IggyMetadata
+// =============================================================================
+
+#[expect(unused)]
+struct IggyMetadata<H: MetadataHandle> {
+    /// Some on shard0, None on other shards
+    consensus: Option<H::Consensus>,
+    /// Some on shard0, None on other shards
+    journal: Option<H::Journal>,
+    /// Some on shard0, None on other shards
+    snapshot: Option<H::Snapshot>,
+    /// State machine - lives on all shards
+    mux_stm: H::StateMachine,
+}
+
+// TODO: Handle the `routing` of messages to shard0, on the callsite.
+impl<J, S, M> Metadata for IggyMetadata<IggyMetadataHandle<J, S, M>>
+where
+    J: Journal<Entry = <VsrConsensus as Consensus>::ReplicateMessage, Header = PrepareHeader>,
+{
+    fn on_request(&self, message: <VsrConsensus as Consensus>::RequestMessage) {
+        let consensus = self.consensus.as_ref().unwrap();
+
         // TODO: Bunch of asserts.
         debug!("handling metadata request");
-        let prepare = message.project(&self.consensus);
+        let prepare = message.project(consensus);
         self.pipeline_prepare(prepare);
     }
 
-    async fn on_replicate(&self, message: <Self::Consensus as Consensus>::ReplicateMessage) {
+    async fn on_replicate(&self, message: <VsrConsensus as Consensus>::ReplicateMessage) {
+        let consensus = self.consensus.as_ref().unwrap();
+        let journal = self.journal.as_ref().unwrap();
+
         let header = message.header();
 
         assert_eq!(header.command, Command2::Prepare);
@@ -73,24 +107,24 @@ where
         }
 
         // If syncing, ignore the replicate message.
-        if self.consensus.is_syncing() {
+        if consensus.is_syncing() {
             warn!(
-                replica = self.consensus.replica(),
+                replica = consensus.replica(),
                 "on_replicate: ignoring (sync)"
             );
             return;
         }
 
-        let current_op = self.consensus.sequencer().current_sequence();
+        let current_op = consensus.sequencer().current_sequence();
 
         // Old message (handle as repair). Not replicating.
-        if header.view < self.consensus.view()
-            || (self.consensus.status() == Status::Normal
-                && header.view == self.consensus.view()
+        if header.view < consensus.view()
+            || (consensus.status() == Status::Normal
+                && header.view == consensus.view()
                 && header.op <= current_op)
         {
             debug!(
-                replica = self.consensus.replica(),
+                replica = consensus.replica(),
                 "on_replicate: ignoring (repair)"
             );
             self.on_repair(message);
@@ -98,18 +132,18 @@ where
         }
 
         // If status is not normal, ignore the replicate.
-        if self.consensus.status() != Status::Normal {
+        if consensus.status() != Status::Normal {
             warn!(
-                replica = self.consensus.replica(),
+                replica = consensus.replica(),
                 "on_replicate: ignoring (not normal state)"
             );
             return;
         }
 
         //if message from future view, we ignore the replicate.
-        if header.view > self.consensus.view() {
+        if header.view > consensus.view() {
             warn!(
-                replica = self.consensus.replica(),
+                replica = consensus.replica(),
                 "on_replicate: ignoring (newer view)"
             );
             return;
@@ -118,9 +152,8 @@ where
         // TODO add assertions for valid state here.
 
         // If we are a follower, we advance the commit number.
-        if self.consensus.is_follower() {
-            self.consensus
-                .advance_commit_number(message.header().commit);
+        if consensus.is_follower() {
+            consensus.advance_commit_number(message.header().commit);
         }
 
         // TODO verify that the current prepare fits in the WAL.
@@ -128,46 +161,43 @@ where
         // TODO handle gap in ops.
 
         // Verify hash chain integrity.
-        if let Some(previous) = self.journal.previous_entry(header) {
+        if let Some(previous) = journal.previous_entry(header) {
             self.panic_if_hash_chain_would_break_in_same_view(&previous, header);
         }
 
         assert_eq!(header.op, current_op + 1);
 
-        self.consensus.sequencer().set_sequence(header.op);
-        self.journal.set_header_as_dirty(header);
+        consensus.sequencer().set_sequence(header.op);
+        journal.set_header_as_dirty(header);
 
         // Append to journal.
-        self.journal.append(message.clone()).await;
+        journal.append(message.clone()).await;
 
         // After successful journal write, send prepare_ok to primary.
         self.send_prepare_ok(header).await;
 
         // If follower, commit any newly committable entries.
-        if self.consensus.is_follower() {
+        if consensus.is_follower() {
             self.commit_journal();
         }
     }
 
-    fn on_ack(&self, message: <Self::Consensus as Consensus>::AckMessage) {
+    fn on_ack(&self, message: <VsrConsensus as Consensus>::AckMessage) {
+        let consensus = self.consensus.as_ref().unwrap();
         let header = message.header();
 
-        if !self.consensus.is_primary() {
+        if !consensus.is_primary() {
             warn!("on_ack: ignoring (not primary)");
             return;
         }
 
-        if self.consensus.status() != Status::Normal {
+        if consensus.status() != Status::Normal {
             warn!("on_ack: ignoring (not normal)");
             return;
         }
 
         // Find the prepare in pipeline
-        let Some(mut pipeline) = self.consensus.pipeline().try_borrow_mut().ok() else {
-            warn!("on_ack: could not borrow pipeline (already mutably borrowed)");
-            return;
-        };
-
+        let mut pipeline = consensus.pipeline().borrow_mut();
         let Some(entry) = pipeline.message_by_op_and_checksum(header.op, header.prepare_checksum)
         else {
             debug!("on_ack: prepare not in pipeline op={}", header.op);
@@ -184,34 +214,40 @@ where
         let count = entry.add_ack(header.replica);
 
         // Check quorum
-        if count >= self.consensus.quorum() && !entry.ok_quorum_received {
+        if count >= consensus.quorum() && !entry.ok_quorum_received {
             entry.ok_quorum_received = true;
             debug!("on_ack: quorum received for op={}", header.op);
 
             // Advance commit number and trigger commit journal
-            self.consensus.advance_commit_number(header.op);
+            consensus.advance_commit_number(header.op);
             self.commit_journal();
         }
     }
 }
 
-impl<M, J, S> IggyMetadata<M, J, S>
+impl<J, S, M> IggyMetadata<IggyMetadataHandle<J, S, M>>
 where
     J: Journal<Entry = <VsrConsensus as Consensus>::ReplicateMessage, Header = PrepareHeader>,
 {
     #[expect(unused)]
     fn pipeline_prepare(&self, prepare: Message<PrepareHeader>) {
+        let consensus = self.consensus.as_ref().unwrap();
+
         debug!("inserting prepare into metadata pipeline");
-        self.consensus.verify_pipeline();
-        self.consensus.pipeline_message(prepare.clone());
+        consensus.verify_pipeline();
+        consensus.pipeline_message(prepare.clone());
 
         self.on_replicate(prepare.clone());
-        self.consensus.post_replicate_verify(&prepare);
+        consensus.post_replicate_verify(&prepare);
     }
 
     fn fence_old_prepare(&self, prepare: &Message<PrepareHeader>) -> bool {
+        let (Some(consensus), Some(journal)) = (&self.consensus, &self.journal) else {
+            todo!("dispatch fence_old_prepare to shard0");
+        };
+
         let header = prepare.header();
-        header.op <= self.consensus.commit() || self.journal.has_prepare(header)
+        header.op <= consensus.commit() || journal.has_prepare(header)
     }
 
     /// Replicate a prepare message to the next replica in the chain.
@@ -221,38 +257,41 @@ where
     /// - Each backup forwards to the next
     /// - Stops when we would forward back to primary
     async fn replicate(&self, message: Message<PrepareHeader>) {
+        let consensus = self.consensus.as_ref().unwrap();
+        let journal = self.journal.as_ref().unwrap();
+
         let header = message.header();
 
         assert_eq!(header.command, Command2::Prepare);
         assert!(
-            !self.journal.has_prepare(header),
+            !journal.has_prepare(header),
             "replicate: must not already have prepare"
         );
-        assert!(header.op > self.consensus.commit());
+        assert!(header.op > consensus.commit());
 
-        let next = (self.consensus.replica() + 1) % self.consensus.replica_count();
+        let next = (consensus.replica() + 1) % consensus.replica_count();
 
-        let primary = self.consensus.primary_index(header.view);
+        let primary = consensus.primary_index(header.view);
         if next == primary {
             debug!(
-                replica = self.consensus.replica(),
+                replica = consensus.replica(),
                 op = header.op,
                 "replicate: not replicating (ring complete)"
             );
             return;
         }
 
-        assert_ne!(next, self.consensus.replica());
+        assert_ne!(next, consensus.replica());
 
         debug!(
-            replica = self.consensus.replica(),
+            replica = consensus.replica(),
             to = next,
             op = header.op,
             "replicate: forwarding"
         );
 
         let message = message.into_generic();
-        self.consensus
+        consensus
             .message_bus()
             .send_to_replica(next, message)
             .await
@@ -292,29 +331,32 @@ where
     /// Send a prepare_ok message to the primary.
     /// Called after successfully writing a prepare to the journal.
     async fn send_prepare_ok(&self, header: &PrepareHeader) {
+        let consensus = self.consensus.as_ref().unwrap();
+        let journal = self.journal.as_ref().unwrap();
+
         assert_eq!(header.command, Command2::Prepare);
 
-        if self.consensus.status() != Status::Normal {
+        if consensus.status() != Status::Normal {
             debug!(
-                replica = self.consensus.replica(),
-                status = ?self.consensus.status(),
+                replica = consensus.replica(),
+                status = ?consensus.status(),
                 "send_prepare_ok: not sending (not normal)"
             );
             return;
         }
 
-        if self.consensus.is_syncing() {
+        if consensus.is_syncing() {
             debug!(
-                replica = self.consensus.replica(),
+                replica = consensus.replica(),
                 "send_prepare_ok: not sending (syncing)"
             );
             return;
         }
 
         // Verify we have the prepare and it's persisted (not dirty).
-        if !self.journal.has_prepare(header) {
+        if !journal.has_prepare(header) {
             debug!(
-                replica = self.consensus.replica(),
+                replica = consensus.replica(),
                 op = header.op,
                 "send_prepare_ok: not sending (not persisted or missing)"
             );
@@ -322,24 +364,24 @@ where
         }
 
         assert!(
-            header.view <= self.consensus.view(),
+            header.view <= consensus.view(),
             "send_prepare_ok: prepare view {} > our view {}",
             header.view,
-            self.consensus.view()
+            consensus.view()
         );
 
-        if header.op > self.consensus.sequencer().current_sequence() {
+        if header.op > consensus.sequencer().current_sequence() {
             debug!(
-                replica = self.consensus.replica(),
+                replica = consensus.replica(),
                 op = header.op,
-                our_op = self.consensus.sequencer().current_sequence(),
+                our_op = consensus.sequencer().current_sequence(),
                 "send_prepare_ok: not sending (op ahead)"
             );
             return;
         }
 
         debug!(
-            replica = self.consensus.replica(),
+            replica = consensus.replica(),
             op = header.op,
             checksum = header.checksum,
             "send_prepare_ok: sending"
@@ -348,12 +390,12 @@ where
         // Use current view, not the prepare's view.
         let prepare_ok_header = PrepareOkHeader {
             command: Command2::PrepareOk,
-            cluster: self.consensus.cluster(),
-            replica: self.consensus.replica(),
-            view: self.consensus.view(),
+            cluster: consensus.cluster(),
+            replica: consensus.replica(),
+            view: consensus.view(),
             epoch: header.epoch,
             op: header.op,
-            commit: self.consensus.commit(),
+            commit: consensus.commit(),
             timestamp: header.timestamp,
             parent: header.parent,
             prepare_checksum: header.checksum,
@@ -367,23 +409,23 @@ where
             
Message::<PrepareOkHeader>::new(std::mem::size_of::<PrepareOkHeader>())
                 .transmute_header(|_, new| *new = prepare_ok_header);
         let generic_message = message.into_generic();
-        let primary = self.consensus.primary_index(self.consensus.view());
+        let primary = consensus.primary_index(consensus.view());
 
-        if primary == self.consensus.replica() {
+        if primary == consensus.replica() {
             debug!(
-                replica = self.consensus.replica(),
+                replica = consensus.replica(),
                 "send_prepare_ok: loopback to self"
             );
             // TODO: Queue for self-processing or call handle_prepare_ok 
directly
         } else {
             debug!(
-                replica = self.consensus.replica(),
+                replica = consensus.replica(),
                 to = primary,
                 op = header.op,
                 "send_prepare_ok: sending to primary"
             );
 
-            self.consensus
+            consensus
                 .message_bus()
                 .send_to_replica(primary, generic_message)
                 .await
@@ -391,21 +433,3 @@ where
         }
     }
 }
-
-// TODO: Hide with associated types all of those generics, so they are not 
leaking to the upper layer, or maybe even make of the `Metadata` trait itself.
-// Something like this:
-// pub trait MetadataHandle {
-//     type Consensus: Consensus<Self::Clock>;
-//     type Clock: Clock;
-//     type MuxStm;
-//     type Journal;
-//     type Snapshot;
-// }
-
-// pub trait Metadata<H: MetadataHandle> {
-//     fn on_request(&self, message: <H::Consensus as 
Consensus<H::Clock>>::RequestMessage); // Create type aliases for those long 
associated types
-//     fn on_replicate(&self, message: <H::Consensus as 
Consensus<H::Clock>>::ReplicateMessage);
-//     fn on_ack(&self, message: <H::Consensus as 
Consensus<H::Clock>>::AckMessage);
-// }
-
-// The error messages can get ugly from those associated types, but I think 
it's worth the fact that it hides a lot of the generics and their bounds.
diff --git a/core/metadata/src/stm/consumer_group.rs 
b/core/metadata/src/stm/consumer_group.rs
index 944627ad7..3a3e947fc 100644
--- a/core/metadata/src/stm/consumer_group.rs
+++ b/core/metadata/src/stm/consumer_group.rs
@@ -15,215 +15,212 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::stm::{ApplyState, StateCommand};
+use crate::stm::Handler;
+use crate::{define_state, impl_absorb};
 use ahash::AHashMap;
-use bytes::Bytes;
 use iggy_common::create_consumer_group::CreateConsumerGroup;
 use iggy_common::delete_consumer_group::DeleteConsumerGroup;
-use iggy_common::{
-    BytesSerializable, IggyTimestamp,
-    header::{Operation, PrepareHeader},
-    message::Message,
-};
+use iggy_common::{IdKind, Identifier};
 use slab::Slab;
-use std::cell::RefCell;
-
-// ============================================================================
-// ConsumerGroupMember - Individual member of a consumer group
-// ============================================================================
+use std::sync::Arc;
+use std::sync::atomic::AtomicUsize;
 
 #[derive(Debug, Clone)]
 pub struct ConsumerGroupMember {
-    pub id: u32,
-    pub joined_at: IggyTimestamp,
+    pub id: usize,
+    pub client_id: u32,
+    pub partitions: Vec<usize>,
+    pub partition_index: Arc<AtomicUsize>,
 }
 
 impl ConsumerGroupMember {
-    pub fn new(id: u32, joined_at: IggyTimestamp) -> Self {
-        Self { id, joined_at }
+    pub fn new(id: usize, client_id: u32) -> Self {
+        Self {
+            id,
+            client_id,
+            partitions: Vec::new(),
+            partition_index: Arc::new(AtomicUsize::new(0)),
+        }
     }
 }
 
-// ============================================================================
-// ConsumerGroup - A group of consumers
-// ============================================================================
-
 #[derive(Debug, Clone)]
 pub struct ConsumerGroup {
     pub id: usize,
-    pub stream_id: usize,
-    pub topic_id: usize,
-    pub name: String,
-    pub created_at: IggyTimestamp,
-    pub members: Vec<ConsumerGroupMember>,
+    pub name: Arc<str>,
+    pub partitions: Vec<usize>,
+    pub members: Slab<ConsumerGroupMember>,
 }
 
 impl ConsumerGroup {
-    pub fn new(stream_id: usize, topic_id: usize, name: String, created_at: 
IggyTimestamp) -> Self {
+    pub fn new(name: Arc<str>) -> Self {
         Self {
             id: 0,
-            stream_id,
-            topic_id,
             name,
-            created_at,
-            members: Vec::new(),
+            partitions: Vec::new(),
+            members: Slab::new(),
         }
     }
 
-    pub fn add_member(&mut self, member: ConsumerGroupMember) {
-        self.members.push(member);
-    }
+    pub fn rebalance_members(&mut self) {
+        let partition_count = self.partitions.len();
+        let member_count = self.members.len();
 
-    pub fn remove_member(&mut self, member_id: u32) -> 
Option<ConsumerGroupMember> {
-        if let Some(pos) = self.members.iter().position(|m| m.id == member_id) 
{
-            Some(self.members.remove(pos))
-        } else {
-            None
+        if member_count == 0 || partition_count == 0 {
+            return;
         }
-    }
-
-    pub fn members_count(&self) -> usize {
-        self.members.len()
-    }
-}
-
-// ============================================================================
-// ConsumerGroups Collection
-// ============================================================================
 
-#[derive(Debug, Clone, Default)]
-pub struct ConsumerGroups {
-    // Global index for all consumer groups across all streams/topics
-    index: RefCell<AHashMap<(usize, usize, String), usize>>, // (stream_id, 
topic_id, name) -> id
-    items: RefCell<Slab<ConsumerGroup>>,
-}
-
-impl ConsumerGroups {
-    pub fn new() -> Self {
-        Self {
-            index: RefCell::new(AHashMap::with_capacity(256)),
-            items: RefCell::new(Slab::with_capacity(256)),
+        let member_ids: Vec<usize> = self.members.iter().map(|(id, _)| 
id).collect();
+        for &member_id in &member_ids {
+            if let Some(member) = self.members.get_mut(member_id) {
+                member.partitions.clear();
+            }
         }
-    }
-
-    /// Insert a consumer group and return the assigned ID
-    pub fn insert(&self, group: ConsumerGroup) -> usize {
-        let mut items = self.items.borrow_mut();
-        let mut index = self.index.borrow_mut();
 
-        let key = (group.stream_id, group.topic_id, group.name.clone());
-        let id = items.insert(group);
-        items[id].id = id;
-        index.insert(key, id);
-        id
+        for (i, &partition_id) in self.partitions.iter().enumerate() {
+            let member_idx = i % member_count;
+            if let Some(&member_id) = member_ids.get(member_idx)
+                && let Some(member) = self.members.get_mut(member_id)
+            {
+                member.partitions.push(partition_id);
+            }
+        }
     }
+}
 
-    /// Get consumer group by ID
-    pub fn get(&self, id: usize) -> Option<ConsumerGroup> {
-        self.items.borrow().get(id).cloned()
-    }
+define_state! {
+    ConsumerGroups {
+        name_index: AHashMap<Arc<str>, usize>,
+        topic_index: AHashMap<(usize, usize), Vec<usize>>,
+        topic_name_index: AHashMap<(Arc<str>, Arc<str>), Vec<usize>>,
+        items: Slab<ConsumerGroup>,
+    },
+    [CreateConsumerGroup, DeleteConsumerGroup]
+}
+impl_absorb!(ConsumerGroupsInner, ConsumerGroupsCommand);
 
-    /// Get consumer group by stream_id, topic_id, and name
-    pub fn get_by_location(
+impl ConsumerGroupsInner {
+    fn resolve_consumer_group_id_by_identifiers(
         &self,
-        stream_id: usize,
-        topic_id: usize,
-        name: &str,
-    ) -> Option<ConsumerGroup> {
-        let index = self.index.borrow();
-        let key = (stream_id, topic_id, name.to_string());
-        if let Some(&id) = index.get(&key) {
-            self.items.borrow().get(id).cloned()
-        } else {
-            None
+        stream_id: &Identifier,
+        topic_id: &Identifier,
+        group_id: &Identifier,
+    ) -> Option<usize> {
+        // Resolve by numeric IDs
+        if let (Ok(s), Ok(t)) = (stream_id.get_u32_value(), 
topic_id.get_u32_value()) {
+            let groups_in_topic = self.topic_index.get(&(s as usize, t as 
usize))?;
+
+            return match group_id.kind {
+                IdKind::Numeric => {
+                    let g_id = group_id.get_u32_value().ok()? as usize;
+                    groups_in_topic.contains(&g_id).then_some(g_id)
+                }
+                IdKind::String => {
+                    let g_name = group_id.get_string_value().ok()?;
+                    groups_in_topic
+                        .iter()
+                        .find(|&&id| {
+                            self.items
+                                .get(id)
+                                .is_some_and(|g| g.name.as_ref() == g_name)
+                        })
+                        .copied()
+                }
+            };
         }
-    }
 
-    /// Remove consumer group by ID
-    pub fn remove(&self, id: usize) -> Option<ConsumerGroup> {
-        let mut items = self.items.borrow_mut();
-        let mut index = self.index.borrow_mut();
+        // Resolve by string names
+        if let (Ok(s), Ok(t)) = (stream_id.get_string_value(), 
topic_id.get_string_value()) {
+            let key = (Arc::from(s.as_str()), Arc::from(t.as_str()));
+            let groups_in_topic = self.topic_name_index.get(&key)?;
 
-        if !items.contains(id) {
-            return None;
+            return match group_id.kind {
+                IdKind::Numeric => {
+                    let g_id = group_id.get_u32_value().ok()? as usize;
+                    groups_in_topic.contains(&g_id).then_some(g_id)
+                }
+                IdKind::String => {
+                    let g_name = group_id.get_string_value().ok()?;
+                    groups_in_topic
+                        .iter()
+                        .find(|&&id| {
+                            self.items
+                                .get(id)
+                                .is_some_and(|g| g.name.as_ref() == g_name)
+                        })
+                        .copied()
+                }
+            };
         }
 
-        let group = items.remove(id);
-        let key = (group.stream_id, group.topic_id, group.name.clone());
-        index.remove(&key);
-        Some(group)
+        None
     }
+}
 
-    /// Get all consumer groups for a specific topic
-    pub fn get_by_topic(&self, stream_id: usize, topic_id: usize) -> 
Vec<ConsumerGroup> {
-        self.items
-            .borrow()
-            .iter()
-            .filter_map(|(_, g)| {
-                if g.stream_id == stream_id && g.topic_id == topic_id {
-                    Some(g.clone())
-                } else {
-                    None
+impl Handler for ConsumerGroupsInner {
+    fn handle(&mut self, cmd: &Self::Cmd) {
+        // TODO: This is all a hack; we need to figure out how to do this in 
a way where `Identifier` does not reach
+        // this stage of execution.
+        match cmd {
+            ConsumerGroupsCommand::CreateConsumerGroup(payload) => {
+                let name: Arc<str> = Arc::from(payload.name.as_str());
+                if self.name_index.contains_key(&name) {
+                    return;
                 }
-            })
-            .collect()
-    }
-
-    /// Get number of consumer groups
-    pub fn len(&self) -> usize {
-        self.items.borrow().len()
-    }
 
-    /// Check if empty
-    pub fn is_empty(&self) -> bool {
-        self.items.borrow().is_empty()
-    }
+                let group = ConsumerGroup::new(name.clone());
+                let id = self.items.insert(group);
+                self.items[id].id = id;
 
-    /// Get all consumer groups
-    pub fn values(&self) -> Vec<ConsumerGroup> {
-        self.items
-            .borrow()
-            .iter()
-            .map(|(_, g): (usize, &ConsumerGroup)| g.clone())
-            .collect()
-    }
-}
+                self.name_index.insert(name.clone(), id);
 
-#[derive(Debug)]
-pub enum ConsumerGroupsCommand {
-    Create(CreateConsumerGroup),
-    Delete(DeleteConsumerGroup),
-}
-
-impl StateCommand for ConsumerGroups {
-    type Command = ConsumerGroupsCommand;
-    type Input = Message<PrepareHeader>;
-
-    fn into_command(input: &Self::Input) -> Option<Self::Command> {
-        // TODO: rework this thing, so we don't copy the bytes on each request
-        let body = Bytes::copy_from_slice(input.body());
-        match input.header().operation {
-            Operation::CreateConsumerGroup => 
Some(ConsumerGroupsCommand::Create(
-                CreateConsumerGroup::from_bytes(body.clone()).unwrap(),
-            )),
-            Operation::DeleteConsumerGroup => 
Some(ConsumerGroupsCommand::Delete(
-                DeleteConsumerGroup::from_bytes(body.clone()).unwrap(),
-            )),
-            _ => None,
-        }
-    }
-}
-
-impl ApplyState for ConsumerGroups {
-    type Output = ();
+                if let (Ok(s), Ok(t)) = (
+                    payload.stream_id.get_u32_value(),
+                    payload.topic_id.get_u32_value(),
+                ) {
+                    self.topic_index
+                        .entry((s as usize, t as usize))
+                        .or_default()
+                        .push(id);
+                }
 
-    fn do_apply(&self, cmd: Self::Command) -> Self::Output {
-        match cmd {
-            ConsumerGroupsCommand::Create(payload) => {
-                todo!("Handle Create consumer group with {:?}", payload)
+                if let (Ok(s), Ok(t)) = (
+                    payload.stream_id.get_string_value(),
+                    payload.topic_id.get_string_value(),
+                ) {
+                    let key = (Arc::from(s.as_str()), Arc::from(t.as_str()));
+                    self.topic_name_index.entry(key).or_default().push(id);
+                }
             }
-            ConsumerGroupsCommand::Delete(payload) => {
-                todo!("Handle Delete consumer group with {:?}", payload)
+
+            ConsumerGroupsCommand::DeleteConsumerGroup(payload) => {
+                if let Some(id) = 
self.resolve_consumer_group_id_by_identifiers(
+                    &payload.stream_id,
+                    &payload.topic_id,
+                    &payload.group_id,
+                ) {
+                    let group = self.items.remove(id);
+
+                    self.name_index.remove(&group.name);
+
+                    if let (Ok(s), Ok(t)) = (
+                        payload.stream_id.get_u32_value(),
+                        payload.topic_id.get_u32_value(),
+                    ) && let Some(vec) = self.topic_index.get_mut(&(s as 
usize, t as usize))
+                    {
+                        vec.retain(|&x| x != id);
+                    }
+
+                    if let (Ok(s), Ok(t)) = (
+                        payload.stream_id.get_string_value(),
+                        payload.topic_id.get_string_value(),
+                    ) {
+                        let key = (Arc::from(s.as_str()), 
Arc::from(t.as_str()));
+                        if let Some(vec) = self.topic_name_index.get_mut(&key) 
{
+                            vec.retain(|&x| x != id);
+                        }
+                    }
+                }
             }
         }
     }
diff --git a/core/metadata/src/stm/mod.rs b/core/metadata/src/stm/mod.rs
index e6c13684b..7f3baa0c8 100644
--- a/core/metadata/src/stm/mod.rs
+++ b/core/metadata/src/stm/mod.rs
@@ -1,4 +1,3 @@
-// Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
 // regarding copyright ownership.  The ASF licenses this file
@@ -20,109 +19,254 @@ pub mod mux;
 pub mod stream;
 pub mod user;
 
-/// Macro to generate a `{State}Command` enum and implement `StateCommand` 
trait.
-///
-/// # Arguments
-/// * `$state_type` - The type that implements `ApplyState` trait
-/// * `$command_enum` - The name of the command enum to generate (e.g., 
StreamsCommand)
-/// * `$operations` - Array of Operation enum variants (also used as payload 
type names)
-///
-/// # Example
-/// ```ignore
-/// define_state_command! {
-///     Streams,
-///     StreamsCommand,
-///     [CreateStream, UpdateStream, DeleteStream, PurgeStream]
-/// }
-/// ```
-#[macro_export]
-macro_rules! define_state_command {
-    (
-        $state_type:ty,
-        $command_enum:ident,
-        [$($operation:ident),* $(,)?]
-    ) => {
-        #[derive(Debug)]
-        pub enum $command_enum {
-            $(
-                $operation($operation),
-            )*
-        }
+use left_right::*;
+use std::cell::UnsafeCell;
+use std::sync::Arc;
 
-        impl $crate::stm::StateCommand for $state_type {
-            type Command = $command_enum;
-            type Input = 
::iggy_common::message::Message<::iggy_common::header::PrepareHeader>;
-
-            fn into_command(input: &Self::Input) -> Option<Self::Command> {
-                use ::iggy_common::BytesSerializable;
-                use ::bytes::Bytes;
-                use ::iggy_common::header::Operation;
-
-                // TODO: rework this thing, so we don't copy the bytes on each 
request
-                let body = Bytes::copy_from_slice(input.body());
-                match input.header().operation {
-                    $(
-                        Operation::$operation => {
-                            Some($command_enum::$operation(
-                                $operation::from_bytes(body.clone()).unwrap()
-                            ))
-                        },
-                    )*
-                    _ => None,
-                }
-            }
+pub struct WriteCell<T, O>
+where
+    T: Absorb<O>,
+{
+    inner: UnsafeCell<WriteHandle<T, O>>,
+}
+
+impl<T, O> WriteCell<T, O>
+where
+    T: Absorb<O>,
+{
+    pub fn new(write: WriteHandle<T, O>) -> Self {
+        Self {
+            inner: UnsafeCell::new(write),
         }
+    }
 
-        // Compile-time check that the type implements ApplyState
-        const _: () = {
-            const fn assert_impl_apply_state<T: $crate::stm::ApplyState>() {}
-            assert_impl_apply_state::<$state_type>();
+    pub fn apply(&self, cmd: O) {
+        let hdl = unsafe {
+            self.inner
+                .get()
+                .as_mut()
+                .expect("[apply]: called on uninit writer, for cmd: {cmd}")
         };
-    };
+        hdl.append(cmd).publish();
+    }
 }
 
-// This is public interface to state, therefore it will be imported from 
different crate, for now during development I am leaving it there.
-pub trait State
+/// Parses type-erased input into a command. Macro-generated.
+pub trait Command {
+    type Cmd;
+    type Input;
+
+    fn parse(input: &Self::Input) -> Option<Self::Cmd>;
+}
+
+/// Handles commands. User-implemented business logic.
+pub trait Handler: Command {
+    fn handle(&mut self, cmd: &Self::Cmd);
+}
+
+pub struct LeftRight<T, C>
+where
+    T: Absorb<C>,
+{
+    write: Option<WriteCell<T, C>>,
+    #[allow(unused)]
+    read: Arc<ReadHandle<T>>,
+}
+
+impl<T> From<T> for LeftRight<T, <T as Command>::Cmd>
 where
-    Self: Sized,
+    T: Absorb<<T as Command>::Cmd> + Clone + Command,
 {
+    fn from(inner: T) -> Self {
+        let (write, read) = {
+            let (w, r) = left_right::new_from_empty(inner);
+            (WriteCell::new(w).into(), r.into())
+        };
+        Self { write, read }
+    }
+}
+
+impl<T> LeftRight<T, <T as Command>::Cmd>
+where
+    T: Absorb<<T as Command>::Cmd> + Clone + Handler,
+{
+    pub fn do_apply(&self, cmd: <T as Command>::Cmd) {
+        self.write
+            .as_ref()
+            .expect("no write handle - not the owner shard")
+            .apply(cmd);
+    }
+}
+
+/// Public interface for state machines.
+pub trait State {
     type Output;
     type Input;
 
-    // Apply the state machine logic and return an optional output.
-    // The output is optional, as we model the `StateMachine`, as an variadic 
list,
-    // where not all state machines will produce an output for every input 
event.
     fn apply(&self, input: &Self::Input) -> Option<Self::Output>;
 }
 
-// TODO: This interface should be private to the stm module.
 pub trait StateMachine {
     type Input;
     type Output;
     fn update(&self, input: &Self::Input, output: &mut Vec<Self::Output>);
 }
 
-pub trait StateCommand {
-    type Command;
-    type Input;
+/// Generates a state machine with convention-based storage.
+///
+/// # Generated items
+/// - `{$state}Inner` struct with the specified fields (the data)
+/// - `{$state}Command` enum with variants for each operation
+/// - `$state` wrapper struct (non-generic, contains LeftRight storage)
+/// - `Command` impl for `{$state}Inner` (parsing)
+/// - `State` impl for `$state`
+/// - `From<LeftRight<...>>` impl for `$state`
+///
+/// # User must implement
+/// - `Handler` for `{$state}Inner` (business logic)
+/// - `impl_absorb!` for `{$state}Inner` and `{$state}Command`
+///
+/// # Example
+/// ```ignore
+/// define_state! {
+///     Streams {
+///         index: AHashMap<String, usize>,
+///         items: Slab<Stream>,
+///     },
+///     [CreateStream, UpdateStream, DeleteStream]
+/// }
+///
+/// // User implements Handler manually:
+/// impl Handler for StreamsInner {
+///     fn handle(&mut self, cmd: &StreamsCommand) {
+///         match cmd {
+///             StreamsCommand::CreateStream(payload) => { /* ... */ }
+///             // ...
+///         }
+///     }
+/// }
+///
+/// // User implements Absorb via macro:
+/// impl_absorb!(StreamsInner, StreamsCommand);
+/// ```
+// TODO: The `operation` argument can be removed once we create a trait for 
mapping.
+#[macro_export]
+macro_rules! define_state {
+    (
+        $state:ident {
+            $($field_name:ident : $field_type:ty),* $(,)?
+        },
+        [$($operation:ident),* $(,)?]
+    ) => {
+        paste::paste! {
+            #[derive(Debug, Clone, Default)]
+            pub struct [<$state Inner>] {
+                $(
+                    pub $field_name: $field_type,
+                )*
+            }
 
-    fn into_command(input: &Self::Input) -> Option<Self::Command>;
-}
+            impl [<$state Inner>] {
+                pub fn new() -> Self {
+                    Self::default()
+                }
+            }
 
-pub trait ApplyState: StateCommand {
-    type Output;
+            #[derive(Debug, Clone)]
+            pub enum [<$state Command>] {
+                $(
+                    $operation($operation),
+                )*
+            }
+
+            pub struct $state {
+                inner: $crate::stm::LeftRight<[<$state Inner>], [<$state 
Command>]>,
+            }
+
+            impl From<$crate::stm::LeftRight<[<$state Inner>], [<$state 
Command>]>> for $state {
+                fn from(inner: $crate::stm::LeftRight<[<$state Inner>], 
[<$state Command>]>) -> Self {
+                    Self { inner }
+                }
+            }
+
+            impl From<[<$state Inner>]> for $state {
+                fn from(inner: [<$state Inner>]) -> Self {
+                    let left_right: $crate::stm::LeftRight<[<$state Inner>], 
[<$state Command>]> = inner.into();
+                    left_right.into()
+                }
+            }
+
+            impl $crate::stm::State for $state {
+                type Input = <[<$state Inner>] as $crate::stm::Command>::Input;
+                type Output = ();
 
-    fn do_apply(&self, cmd: Self::Command) -> Self::Output;
+                fn apply(&self, input: &Self::Input) -> Option<Self::Output> {
+                    <[<$state Inner>] as $crate::stm::Command>::parse(input)
+                        .map(|cmd| self.inner.do_apply(cmd))
+                }
+            }
+
+            impl $crate::stm::Command for [<$state Inner>] {
+                type Cmd = [<$state Command>];
+                type Input = 
::iggy_common::message::Message<::iggy_common::header::PrepareHeader>;
+
+                fn parse(input: &Self::Input) -> Option<Self::Cmd> {
+                    use ::iggy_common::BytesSerializable;
+                    use ::iggy_common::header::Operation;
+
+                    let body = input.body_bytes();
+                    match input.header().operation {
+                        $(
+                            Operation::$operation => {
+                                Some([<$state Command>]::$operation(
+                                    $operation::from_bytes(body).unwrap()
+                                ))
+                            },
+                        )*
+                        _ => None,
+                    }
+                }
+            }
+        }
+    };
 }
 
-impl<T> State for T
-where
-    T: ApplyState,
-{
-    type Output = T::Output;
-    type Input = T::Input;
+// This macro is really sad, but we can't use the blanket impl below, due to 
the orphan rule.
+// impl<T> Absorb<T::Cmd> for T
+// where
+//     T: Handler + Clone,
+// {
+//     fn absorb_first(&mut self, cmd: &mut T::Cmd, _other: &Self) {
+//         self.handle(cmd);
 
-    fn apply(&self, input: &Self::Input) -> Option<Self::Output> {
-        T::into_command(input).map(|cmd| self.do_apply(cmd))
-    }
+//     }
+
+//     fn absorb_second(&mut self, cmd: T::Cmd, _other: &Self) {
+//         self.handle(&cmd);
+//     }
+
+//     fn sync_with(&mut self, first: &Self) {
+//         *self = first.clone();
+//     }
+
+//     fn drop_first(self: Box<Self>) {}
+//     fn drop_second(self: Box<Self>) {}
+// }
+#[macro_export]
+macro_rules! impl_absorb {
+    ($inner:ident, $cmd:ident) => {
+        impl left_right::Absorb<$cmd> for $inner {
+            fn absorb_first(&mut self, cmd: &mut $cmd, _other: &Self) {
+                self.handle(cmd);
+            }
+
+            fn absorb_second(&mut self, cmd: $cmd, _other: &Self) {
+                self.handle(&cmd);
+            }
+
+            fn sync_with(&mut self, first: &Self) {
+                *self = first.clone();
+            }
+        }
+    };
 }
diff --git a/core/metadata/src/stm/mux.rs b/core/metadata/src/stm/mux.rs
index 3fc80d88b..979ff3ccf 100644
--- a/core/metadata/src/stm/mux.rs
+++ b/core/metadata/src/stm/mux.rs
@@ -92,14 +92,16 @@ mod tests {
 
     #[test]
     fn construct_mux_state_machine_from_states_with_same_output() {
-        use crate::stm::*;
+        use crate::stm::StateMachine;
+        use crate::stm::mux::MuxStateMachine;
+        use crate::stm::stream::{Streams, StreamsInner};
+        use crate::stm::user::{Users, UsersInner};
         use iggy_common::header::PrepareHeader;
         use iggy_common::message::Message;
 
-        let users = user::Users::new();
-        let streams = stream::Streams::new();
-        let cgs = consumer_group::ConsumerGroups::new();
-        let mux = mux::MuxStateMachine::new(variadic!(users, streams, cgs));
+        let users: Users = UsersInner::new().into();
+        let streams: Streams = StreamsInner::new().into();
+        let mux = MuxStateMachine::new(variadic!(users, streams));
 
         let input = Message::new(std::mem::size_of::<PrepareHeader>());
         let mut output = Vec::new();
diff --git a/core/metadata/src/stm/stream.rs b/core/metadata/src/stm/stream.rs
index 6f05e5aa4..5106d6122 100644
--- a/core/metadata/src/stm/stream.rs
+++ b/core/metadata/src/stm/stream.rs
@@ -15,169 +15,41 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::define_state_command;
 use crate::stats::{StreamStats, TopicStats};
-use crate::stm::ApplyState;
+use crate::stm::Handler;
+use crate::{define_state, impl_absorb};
 use ahash::AHashMap;
 use iggy_common::create_partitions::CreatePartitions;
 use iggy_common::create_stream::CreateStream;
 use iggy_common::create_topic::CreateTopic;
 use iggy_common::delete_partitions::DeletePartitions;
-use iggy_common::delete_segments::DeleteSegments;
 use iggy_common::delete_stream::DeleteStream;
 use iggy_common::delete_topic::DeleteTopic;
 use iggy_common::purge_stream::PurgeStream;
 use iggy_common::purge_topic::PurgeTopic;
 use iggy_common::update_stream::UpdateStream;
 use iggy_common::update_topic::UpdateTopic;
-use iggy_common::{
-    CompressionAlgorithm, Identifier, IggyError, IggyExpiry, IggyTimestamp, 
MaxTopicSize,
-};
+use iggy_common::{CompressionAlgorithm, IggyExpiry, IggyTimestamp, 
MaxTopicSize};
 use slab::Slab;
-use std::cell::RefCell;
 use std::sync::Arc;
+use std::sync::atomic::AtomicUsize;
 
 #[derive(Debug, Clone)]
 pub struct Partition {
     pub id: usize,
-}
-
-impl Partition {
-    pub fn new(id: usize) -> Self {
-        Self { id }
-    }
-}
-
-#[derive(Debug, Clone, Default)]
-pub struct Partitions {
-    items: RefCell<Slab<Partition>>,
-}
-
-impl Partitions {
-    pub fn new() -> Self {
-        Self {
-            items: RefCell::new(Slab::with_capacity(1024)),
-        }
-    }
-
-    pub fn insert(&self, partition: Partition) -> usize {
-        let mut items = self.items.borrow_mut();
-        let id = items.insert(partition);
-        items[id].id = id;
-        id
-    }
-
-    pub fn get(&self, id: usize) -> Option<Partition> {
-        self.items.borrow().get(id).cloned()
-    }
-
-    pub fn remove(&self, id: usize) -> Option<Partition> {
-        let mut items = self.items.borrow_mut();
-        if items.contains(id) {
-            Some(items.remove(id))
-        } else {
-            None
-        }
-    }
-
-    pub fn len(&self) -> usize {
-        self.items.borrow().len()
-    }
-
-    pub fn is_empty(&self) -> bool {
-        self.items.borrow().is_empty()
-    }
-
-    pub fn iter(&self) -> Vec<Partition> {
-        self.items
-            .borrow()
-            .iter()
-            .map(|(_, p): (usize, &Partition)| p.clone())
-            .collect()
-    }
-}
-
-#[derive(Debug, Clone)]
-pub struct ConsumerGroup {
-    pub id: usize,
-    pub name: String,
     pub created_at: IggyTimestamp,
 }
 
-impl ConsumerGroup {
-    pub fn new(name: String, created_at: IggyTimestamp) -> Self {
-        Self {
-            id: 0,
-            name,
-            created_at,
-        }
-    }
-}
-
-#[derive(Debug, Clone, Default)]
-pub struct ConsumerGroups {
-    index: RefCell<AHashMap<String, usize>>,
-    items: RefCell<Slab<ConsumerGroup>>,
-}
-
-impl ConsumerGroups {
-    pub fn new() -> Self {
-        Self {
-            index: RefCell::new(AHashMap::with_capacity(256)),
-            items: RefCell::new(Slab::with_capacity(256)),
-        }
-    }
-
-    pub fn insert(&self, group: ConsumerGroup) -> usize {
-        let mut items = self.items.borrow_mut();
-        let mut index = self.index.borrow_mut();
-
-        let name = group.name.clone();
-        let id = items.insert(group);
-        items[id].id = id;
-        index.insert(name, id);
-        id
-    }
-
-    pub fn get(&self, id: usize) -> Option<ConsumerGroup> {
-        self.items.borrow().get(id).cloned()
-    }
-
-    pub fn get_by_name(&self, name: &str) -> Option<ConsumerGroup> {
-        let index = self.index.borrow();
-        if let Some(&id) = index.get(name) {
-            self.items.borrow().get(id).cloned()
-        } else {
-            None
-        }
-    }
-
-    pub fn remove(&self, id: usize) -> Option<ConsumerGroup> {
-        let mut items = self.items.borrow_mut();
-        let mut index = self.index.borrow_mut();
-
-        if !items.contains(id) {
-            return None;
-        }
-
-        let group = items.remove(id);
-        index.remove(&group.name);
-        Some(group)
-    }
-
-    pub fn len(&self) -> usize {
-        self.items.borrow().len()
-    }
-
-    pub fn is_empty(&self) -> bool {
-        self.items.borrow().is_empty()
+impl Partition { // constructor only; both fields are public
+    pub fn new(id: usize, created_at: IggyTimestamp) -> Self {
+        Partition { id, created_at } // stores both arguments unchanged
     }
 }
 
 #[derive(Debug, Clone)]
 pub struct Topic {
     pub id: usize,
-    pub name: String,
+    pub name: Arc<str>,
     pub created_at: IggyTimestamp,
     pub replication_factor: u8,
     pub message_expiry: IggyExpiry,
@@ -185,13 +57,30 @@ pub struct Topic {
     pub max_topic_size: MaxTopicSize,
 
     pub stats: Arc<TopicStats>,
-    pub partitions: Partitions,
-    pub consumer_groups: ConsumerGroups,
+    pub partitions: Vec<Partition>,
+    pub round_robin_counter: Arc<AtomicUsize>,
+}
+
+impl Default for Topic { // placeholder topic: id 0, empty name, no partitions
+    fn default() -> Self {
+        Topic {
+            id: 0,
+            name: "".into(),
+            created_at: IggyTimestamp::default(),
+            replication_factor: 1, // replication factor defaults to 1
+            message_expiry: IggyExpiry::default(),
+            compression_algorithm: CompressionAlgorithm::default(),
+            max_topic_size: MaxTopicSize::default(),
+            stats: Arc::new(TopicStats::default()),
+            partitions: vec![],
+            round_robin_counter: Arc::new(AtomicUsize::default()), // counter starts at 0
+        }
+    }
 }
 
 impl Topic {
     pub fn new(
-        name: String,
+        name: Arc<str>,
         created_at: IggyTimestamp,
         replication_factor: u8,
         message_expiry: IggyExpiry,
@@ -208,293 +97,336 @@ impl Topic {
             compression_algorithm,
             max_topic_size,
             stats: Arc::new(TopicStats::new(stream_stats)),
-            partitions: Partitions::new(),
-            consumer_groups: ConsumerGroups::new(),
+            partitions: Vec::new(),
+            round_robin_counter: Arc::new(AtomicUsize::new(0)),
         }
     }
 }
 
-#[derive(Debug, Clone, Default)]
-pub struct Topics {
-    index: RefCell<AHashMap<String, usize>>,
-    items: RefCell<Slab<Topic>>,
+#[derive(Debug)] // `Clone` and `Default` are implemented by hand further down in this file
+pub struct Stream { // a stream's metadata together with the topics it owns
+    pub id: usize, // set to the slab key by the `CreateStream` handler after insertion
+    pub name: Arc<str>,
+    pub created_at: IggyTimestamp,
+
+    pub stats: Arc<StreamStats>, // shared handle; `CreateTopic` builds each `TopicStats` from a clone of it
+    pub topics: Slab<Topic>, // topics by slab key; the key is also written to `Topic::id`
+    pub topic_index: AHashMap<Arc<str>, usize>, // topic name -> key into `topics`
 }
 
-impl Topics {
-    pub fn new() -> Self {
+impl Default for Stream { // placeholder stream: id 0, empty name, no topics
+    fn default() -> Self {
         Self {
-            index: RefCell::new(AHashMap::with_capacity(1024)),
-            items: RefCell::new(Slab::with_capacity(1024)),
-        }
-    }
-
-    pub fn insert(&self, topic: Topic) -> usize {
-        let mut items = self.items.borrow_mut();
-        let mut index = self.index.borrow_mut();
-
-        let name = topic.name.clone();
-        let id = items.insert(topic);
-        items[id].id = id;
-        index.insert(name, id);
-        id
-    }
-
-    pub fn get(&self, id: usize) -> Option<Topic> {
-        self.items.borrow().get(id).cloned()
-    }
-
-    pub fn get_by_name(&self, name: &str) -> Option<Topic> {
-        let index = self.index.borrow();
-        if let Some(&id) = index.get(name) {
-            self.items.borrow().get(id).cloned()
-        } else {
-            None
-        }
-    }
-
-    pub fn get_by_identifier(&self, identifier: &Identifier) -> Option<Topic> {
-        match identifier.kind {
-            iggy_common::IdKind::Numeric => {
-                if let Ok(id) = identifier.get_u32_value() {
-                    self.get(id as usize)
-                } else {
-                    None
-                }
-            }
-            iggy_common::IdKind::String => {
-                if let Ok(name) = identifier.get_string_value() {
-                    self.get_by_name(&name)
-                } else {
-                    None
-                }
-            }
+            id: 0,
+            name: Arc::from(""),
+            created_at: IggyTimestamp::default(),
+            stats: Arc::new(StreamStats::default()), // a new stats object, not shared with any other stream
+            topics: Slab::new(),
+            topic_index: AHashMap::default(),
         }
     }
+}
 
-    pub fn remove(&self, id: usize) -> Option<Topic> {
-        let mut items = self.items.borrow_mut();
-        let mut index = self.index.borrow_mut();
-
-        if !items.contains(id) {
-            return None;
+impl Clone for Stream { // field-by-field copy; appears equivalent to what `#[derive(Clone)]` would generate
+    fn clone(&self) -> Self {
+        Self {
+            id: self.id,
+            name: self.name.clone(),
+            created_at: self.created_at,
+            stats: self.stats.clone(), // Arc clone: the copy points at the same `StreamStats`
+            topics: self.topics.clone(), // clones every `Topic`; their `Arc` fields remain shared with the source
+            topic_index: self.topic_index.clone(),
         }
-
-        let topic = items.remove(id);
-        index.remove(&topic.name);
-        Some(topic)
     }
-
-    pub fn len(&self) -> usize {
-        self.items.borrow().len()
-    }
-
-    pub fn is_empty(&self) -> bool {
-        self.items.borrow().is_empty()
-    }
-}
-
-#[derive(Debug, Clone)]
-pub struct Stream {
-    pub id: usize,
-    pub name: String,
-    pub created_at: IggyTimestamp,
-
-    pub stats: Arc<StreamStats>,
-    pub topics: Topics,
 }
 
 impl Stream {
-    pub fn new(name: String, created_at: IggyTimestamp) -> Self {
+    pub fn new(name: Arc<str>, created_at: IggyTimestamp) -> Self { // empty stream with its own default `StreamStats`; `id` is a 0 placeholder
         Self {
             id: 0,
             name,
             created_at,
             stats: Arc::new(StreamStats::default()),
-            topics: Topics::new(),
+            topics: Slab::new(),
+            topic_index: AHashMap::default(),
         }
     }
-}
 
-#[derive(Debug, Clone, Default)]
-pub struct Streams {
-    index: RefCell<AHashMap<String, usize>>,
-    items: RefCell<Slab<Stream>>,
-}
-
-impl Streams {
-    pub fn new() -> Self {
+    pub fn with_stats(name: Arc<str>, created_at: IggyTimestamp, stats: 
Arc<StreamStats>) -> Self { // same as `new`, except the caller supplies the stats handle
         Self {
-            index: RefCell::new(AHashMap::with_capacity(256)),
-            items: RefCell::new(Slab::with_capacity(256)),
+            id: 0,
+            name,
+            created_at,
+            stats, // stored as passed in; no new `StreamStats` is created
+            topics: Slab::new(),
+            topic_index: AHashMap::default(),
         }
     }
+}
 
-    pub fn insert(&self, stream: Stream) -> usize {
-        let mut items = self.items.borrow_mut();
-        let mut index = self.index.borrow_mut();
-
-        let name = stream.name.clone();
-        let id = items.insert(stream);
-        items[id].id = id;
-        index.insert(name, id);
-        id
-    }
-
-    pub fn get(&self, id: usize) -> Option<Stream> {
-        self.items.borrow().get(id).cloned()
-    }
-
-    pub fn get_by_name(&self, name: &str) -> Option<Stream> {
-        let index = self.index.borrow();
-        if let Some(&id) = index.get(name) {
-            self.items.borrow().get(id).cloned()
-        } else {
-            None
-        }
-    }
+define_state! { // NOTE(review): macro body is not visible here; `StreamsInner` and `StreamsCommand` used below presumably come from it
+    Streams {
+        index: AHashMap<Arc<str>, usize>, // stream name -> key into `items`
+        items: Slab<Stream>, // streams by slab key; the key is also written to `Stream::id`
+    },
+    [ // each name below is matched as `StreamsCommand::<Name>` in the `Handler` impl
+        CreateStream,
+        UpdateStream,
+        DeleteStream,
+        PurgeStream,
+        CreateTopic,
+        UpdateTopic,
+        DeleteTopic,
+        PurgeTopic,
+        CreatePartitions,
+        DeletePartitions
+    ]
+}
+impl_absorb!(StreamsInner, StreamsCommand); // `left_right::Absorb` impl that forwards each command to `handle`
 
-    pub fn get_by_identifier(&self, identifier: &Identifier) -> Option<Stream> 
{
+impl StreamsInner { // private helpers that turn an `Identifier` into a slab key
+    fn resolve_stream_id(&self, identifier: &iggy_common::Identifier) -> 
Option<usize> { // returns the key into `items`, or None when the identifier does not resolve
+        use iggy_common::IdKind;
         match identifier.kind {
-            iggy_common::IdKind::Numeric => {
-                if let Ok(id) = identifier.get_u32_value() {
-                    self.get(id as usize)
+            IdKind::Numeric => {
+                let id = identifier.get_u32_value().ok()? as usize; // a failed read returns None through `?`
+                if self.items.contains(id) { // numeric ids count only when that slab slot is occupied
+                    Some(id)
                 } else {
                     None
                 }
             }
-            iggy_common::IdKind::String => {
-                if let Ok(name) = identifier.get_string_value() {
-                    self.get_by_name(&name)
-                } else {
-                    None
-                }
+            IdKind::String => {
+                let name = identifier.get_string_value().ok()?;
+                self.index.get(name.as_str()).copied() // name lookup goes through `index` only
             }
         }
     }
 
-    pub fn remove(&self, id: usize) -> Option<Stream> {
-        let mut items = self.items.borrow_mut();
-        let mut index = self.index.borrow_mut();
+    fn resolve_topic_id(
+        &self,
+        stream_id: usize,
+        identifier: &iggy_common::Identifier,
+    ) -> Option<usize> { // same scheme as `resolve_stream_id`, scoped to one stream's topics
+        use iggy_common::IdKind;
+        let stream = self.items.get(stream_id)?; // None when `stream_id` is not an occupied slab key
 
-        if !items.contains(id) {
-            return None;
-        }
-
-        let stream = items.remove(id);
-        index.remove(&stream.name);
-        Some(stream)
-    }
-
-    pub fn update_name(&self, identifier: &Identifier, new_name: String) -> 
Result<(), IggyError> {
-        let stream = self.get_by_identifier(identifier);
-        if let Some(stream) = stream {
-            let mut items = self.items.borrow_mut();
-            let mut index = self.index.borrow_mut();
-
-            index.remove(&stream.name);
-            if let Some(s) = items.get_mut(stream.id) {
-                s.name = new_name.clone();
+        match identifier.kind {
+            IdKind::Numeric => {
+                let id = identifier.get_u32_value().ok()? as usize;
+                if stream.topics.contains(id) { // the topic slot must be occupied in this stream
+                    Some(id)
+                } else {
+                    None
+                }
+            }
+            IdKind::String => {
+                let name = identifier.get_string_value().ok()?;
+                stream.topic_index.get(name.as_str()).copied() // per-stream name -> topic key map
             }
-            index.insert(new_name, stream.id);
-            Ok(())
-        } else {
-            Err(IggyError::ResourceNotFound("Stream".to_string()))
-        }
-    }
-
-    pub fn purge(&self, id: usize) -> Result<(), IggyError> {
-        let items = self.items.borrow();
-        if let Some(_stream) = items.get(id) {
-            // TODO: Purge all topics in the stream
-            Ok(())
-        } else {
-            Err(IggyError::ResourceNotFound("Stream".to_string()))
         }
     }
-
-    pub fn len(&self) -> usize {
-        self.items.borrow().len()
-    }
-
-    pub fn is_empty(&self) -> bool {
-        self.items.borrow().is_empty()
-    }
-
-    pub fn iter(&self) -> Vec<Stream> {
-        self.items
-            .borrow()
-            .iter()
-            .map(|(_, s): (usize, &Stream)| s.clone())
-            .collect()
-    }
 }
 
-// Define StreamsCommand enum and StateCommand implementation using the macro
-define_state_command! {
-    Streams,
-    StreamsCommand,
-    [CreateStream, UpdateStream, DeleteStream, PurgeStream]
-}
-
-impl ApplyState for Streams {
-    type Output = ();
-
-    fn do_apply(&self, cmd: Self::Command) -> Self::Output {
+impl Handler for StreamsInner {
+    fn handle(&mut self, cmd: &StreamsCommand) {
         match cmd {
             StreamsCommand::CreateStream(payload) => {
-                todo!("Handle Create stream with {:?}", payload)
+                let name_arc: Arc<str> = Arc::from(payload.name.as_str());
+                if self.index.contains_key(&name_arc) {
+                    return;
+                }
+
+                let stream = Stream {
+                    id: 0,
+                    name: name_arc.clone(),
+                    created_at: iggy_common::IggyTimestamp::now(),
+                    stats: Arc::new(StreamStats::default()),
+                    topics: Slab::new(),
+                    topic_index: AHashMap::default(),
+                };
+
+                let id = self.items.insert(stream);
+                if let Some(stream) = self.items.get_mut(id) {
+                    stream.id = id;
+                }
+                self.index.insert(name_arc, id);
             }
             StreamsCommand::UpdateStream(payload) => {
-                todo!("Handle Update stream with {:?}", payload)
+                let Some(stream_id) = 
self.resolve_stream_id(&payload.stream_id) else {
+                    return;
+                };
+                let Some(stream) = self.items.get_mut(stream_id) else {
+                    return;
+                };
+
+                let new_name_arc: Arc<str> = Arc::from(payload.name.as_str());
+                if let Some(&existing_id) = self.index.get(&new_name_arc)
+                    && existing_id != stream_id
+                {
+                    return;
+                }
+
+                self.index.remove(&stream.name);
+                stream.name = new_name_arc.clone();
+                self.index.insert(new_name_arc, stream_id);
             }
             StreamsCommand::DeleteStream(payload) => {
-                todo!("Handle Delete stream with {:?}", payload)
-            }
-            StreamsCommand::PurgeStream(payload) => todo!("Handle Purge stream 
with {:?}", payload),
-        }
-    }
-}
-
-// Define TopicsCommand enum and StateCommand implementation using the macro
-define_state_command! {
-    Topics,
-    TopicsCommand,
-    [CreateTopic, UpdateTopic, DeleteTopic, PurgeTopic]
-}
+                let Some(stream_id) = 
self.resolve_stream_id(&payload.stream_id) else {
+                    return;
+                };
 
-impl ApplyState for Topics {
-    type Output = ();
+                if let Some(stream) = self.items.get(stream_id) {
+                    let name = stream.name.clone();
 
-    fn do_apply(&self, cmd: Self::Command) -> Self::Output {
-        match cmd {
-            TopicsCommand::CreateTopic(payload) => todo!("Handle Create topic 
with {:?}", payload),
-            TopicsCommand::UpdateTopic(payload) => todo!("Handle Update topic 
with {:?}", payload),
-            TopicsCommand::DeleteTopic(payload) => todo!("Handle Delete topic 
with {:?}", payload),
-            TopicsCommand::PurgeTopic(payload) => todo!("Handle Purge topic 
with {:?}", payload),
-        }
-    }
-}
+                    self.items.remove(stream_id);
+                    self.index.remove(&name);
+                }
+            }
+            StreamsCommand::PurgeStream(_payload) => {
+                // TODO
+                todo!();
+            }
+            StreamsCommand::CreateTopic(payload) => {
+                let Some(stream_id) = 
self.resolve_stream_id(&payload.stream_id) else {
+                    return;
+                };
+                let Some(stream) = self.items.get_mut(stream_id) else {
+                    return;
+                };
+
+                let name_arc: Arc<str> = Arc::from(payload.name.as_str());
+                if stream.topic_index.contains_key(&name_arc) {
+                    return;
+                }
 
-// Define PartitionsCommand enum and StateCommand implementation using the 
macro
-define_state_command! {
-    Partitions,
-    PartitionsCommand,
-    [CreatePartitions, DeletePartitions, DeleteSegments]
-}
+                let topic = Topic {
+                    id: 0, // Will be assigned by slab
+                    name: name_arc.clone(),
+                    created_at: iggy_common::IggyTimestamp::now(),
+                    replication_factor: 
payload.replication_factor.unwrap_or(1),
+                    message_expiry: payload.message_expiry,
+                    compression_algorithm: payload.compression_algorithm,
+                    max_topic_size: payload.max_topic_size,
+                    stats: Arc::new(TopicStats::new(stream.stats.clone())),
+                    partitions: Vec::new(),
+                    round_robin_counter: Arc::new(AtomicUsize::new(0)),
+                };
+
+                let topic_id = stream.topics.insert(topic);
+                if let Some(topic) = stream.topics.get_mut(topic_id) {
+                    topic.id = topic_id;
+
+                    for partition_id in 0..payload.partitions_count as usize {
+                        let partition = Partition {
+                            id: partition_id,
+                            created_at: iggy_common::IggyTimestamp::now(),
+                        };
+                        topic.partitions.push(partition);
+                    }
+                }
 
-impl ApplyState for Partitions {
-    type Output = ();
+                stream.topic_index.insert(name_arc, topic_id);
+            }
+            StreamsCommand::UpdateTopic(payload) => {
+                let Some(stream_id) = 
self.resolve_stream_id(&payload.stream_id) else {
+                    return;
+                };
+                let Some(topic_id) = self.resolve_topic_id(stream_id, 
&payload.topic_id) else {
+                    return;
+                };
+
+                let Some(stream) = self.items.get_mut(stream_id) else {
+                    return;
+                };
+                let Some(topic) = stream.topics.get_mut(topic_id) else {
+                    return;
+                };
+
+                let new_name_arc: Arc<str> = Arc::from(payload.name.as_str());
+                if let Some(&existing_id) = 
stream.topic_index.get(&new_name_arc)
+                    && existing_id != topic_id
+                {
+                    return;
+                }
 
-    fn do_apply(&self, cmd: Self::Command) -> Self::Output {
-        match cmd {
-            PartitionsCommand::CreatePartitions(payload) => {
-                todo!("Handle Create partitions with {:?}", payload)
+                stream.topic_index.remove(&topic.name);
+                topic.name = new_name_arc.clone();
+                topic.compression_algorithm = payload.compression_algorithm;
+                topic.message_expiry = payload.message_expiry;
+                topic.max_topic_size = payload.max_topic_size;
+                if let Some(rf) = payload.replication_factor {
+                    topic.replication_factor = rf;
+                }
+                stream.topic_index.insert(new_name_arc, topic_id);
             }
-            PartitionsCommand::DeletePartitions(payload) => {
-                todo!("Handle Delete partitions with {:?}", payload)
+            StreamsCommand::DeleteTopic(payload) => {
+                let Some(stream_id) = 
self.resolve_stream_id(&payload.stream_id) else {
+                    return;
+                };
+                let Some(topic_id) = self.resolve_topic_id(stream_id, 
&payload.topic_id) else {
+                    return;
+                };
+                let Some(stream) = self.items.get_mut(stream_id) else {
+                    return;
+                };
+
+                if let Some(topic) = stream.topics.get(topic_id) {
+                    let name = topic.name.clone();
+                    stream.topics.remove(topic_id);
+                    stream.topic_index.remove(&name);
+                }
+            }
+            StreamsCommand::PurgeTopic(_payload) => {
+                // TODO:
+                todo!();
             }
-            PartitionsCommand::DeleteSegments(payload) => {
-                todo!("Handle Delete segments with {:?}", payload)
+            StreamsCommand::CreatePartitions(payload) => {
+                let Some(stream_id) = 
self.resolve_stream_id(&payload.stream_id) else {
+                    return;
+                };
+                let Some(topic_id) = self.resolve_topic_id(stream_id, 
&payload.topic_id) else {
+                    return;
+                };
+
+                let Some(stream) = self.items.get_mut(stream_id) else {
+                    return;
+                };
+                let Some(topic) = stream.topics.get_mut(topic_id) else {
+                    return;
+                };
+
+                let current_partition_count = topic.partitions.len();
+                for i in 0..payload.partitions_count as usize {
+                    let partition_id = current_partition_count + i;
+                    let partition = Partition {
+                        id: partition_id,
+                        created_at: iggy_common::IggyTimestamp::now(),
+                    };
+                    topic.partitions.push(partition);
+                }
+            }
+            StreamsCommand::DeletePartitions(payload) => {
+                let Some(stream_id) = 
self.resolve_stream_id(&payload.stream_id) else {
+                    return;
+                };
+                let Some(topic_id) = self.resolve_topic_id(stream_id, 
&payload.topic_id) else {
+                    return;
+                };
+
+                let Some(stream) = self.items.get_mut(stream_id) else {
+                    return;
+                };
+                let Some(topic) = stream.topics.get_mut(topic_id) else {
+                    return;
+                };
+
+                let count_to_delete = payload.partitions_count as usize;
+                if count_to_delete > 0 && count_to_delete <= 
topic.partitions.len() {
+                    topic
+                        .partitions
+                        .truncate(topic.partitions.len() - count_to_delete);
+                }
             }
         }
     }
diff --git a/core/metadata/src/stm/user.rs b/core/metadata/src/stm/user.rs
index 635277965..81ef0b018 100644
--- a/core/metadata/src/stm/user.rs
+++ b/core/metadata/src/stm/user.rs
@@ -15,295 +15,222 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::{
-    permissioner::Permissioner,
-    stm::{ApplyState, StateCommand},
-};
+use crate::permissioner::Permissioner;
+use crate::stm::Handler;
+use crate::{define_state, impl_absorb};
 use ahash::AHashMap;
-use bytes::Bytes;
 use iggy_common::change_password::ChangePassword;
+use iggy_common::create_personal_access_token::CreatePersonalAccessToken;
 use iggy_common::create_user::CreateUser;
+use iggy_common::delete_personal_access_token::DeletePersonalAccessToken;
 use iggy_common::delete_user::DeleteUser;
 use iggy_common::update_permissions::UpdatePermissions;
 use iggy_common::update_user::UpdateUser;
-use iggy_common::{
-    BytesSerializable, Identifier, IggyError, IggyTimestamp, Permissions, 
PersonalAccessToken,
-    UserId, UserStatus,
-    header::{Operation, PrepareHeader},
-    message::Message,
-};
+use iggy_common::{IggyTimestamp, Permissions, PersonalAccessToken, UserId, 
UserStatus};
 use slab::Slab;
-use std::cell::RefCell;
+use std::sync::Arc;
+
+// ============================================================================
+// User Entity
+// ============================================================================
 
 #[derive(Debug, Clone)]
 pub struct User {
     pub id: UserId,
-    pub username: String,
-    pub password: String,
+    pub username: Arc<str>, // also used as the key of the `Users` state's `index` map
+    pub password_hash: Arc<str>, // NOTE(review): `CreateUser` copies `payload.password` here verbatim; confirm it is hashed upstream
     pub status: UserStatus,
     pub created_at: IggyTimestamp,
-    pub permissions: Option<Permissions>,
-    pub personal_access_tokens: AHashMap<String, PersonalAccessToken>,
+    pub permissions: Option<Arc<Permissions>>, // tokens are no longer stored on the user; the `Users` state holds them per user id
+}
+
+impl Default for User { // placeholder user: id 0, empty name and hash, no permissions
+    fn default() -> Self {
+        User {
+            id: 0,
+            username: "".into(),
+            password_hash: "".into(),
+            status: UserStatus::default(),
+            created_at: IggyTimestamp::default(),
+            permissions: Option::None,
+        }
+    }
 }
 
 impl User {
     pub fn new(
-        username: String,
-        password: String,
+        username: Arc<str>,
+        password_hash: Arc<str>, // stored as given; this constructor does no hashing
         status: UserStatus,
         created_at: IggyTimestamp,
-        permissions: Option<Permissions>,
+        permissions: Option<Arc<Permissions>>,
     ) -> Self {
         Self {
             id: 0,
             username,
-            password,
+            password_hash, // `id` above is left at 0 by this constructor
             status,
             created_at,
             permissions,
-            personal_access_tokens: AHashMap::new(),
         }
     }
 }
 
-#[derive(Debug, Clone, Default)]
-pub struct Users {
-    index: RefCell<AHashMap<String, usize>>,
-    items: RefCell<Slab<User>>,
-    permissioner: RefCell<Permissioner>,
+define_state! { // NOTE(review): macro body is not visible here; `UsersInner` and `UsersCommand` used below presumably come from it
+    Users {
+        index: AHashMap<Arc<str>, UserId>, // username -> user id
+        items: Slab<User>, // users by slab key; `CreateUser` writes the key into `User::id`
+        personal_access_tokens: AHashMap<UserId, AHashMap<Arc<str>, 
PersonalAccessToken>>, // one inner map per user, inserted empty by `CreateUser`; inner key meaning not shown here — TODO confirm
+        permissioner: Permissioner,
+    },
+    [ // each name below is matched as `UsersCommand::<Name>` in the `Handler` impl
+        CreateUser,
+        UpdateUser,
+        DeleteUser,
+        ChangePassword,
+        UpdatePermissions,
+        CreatePersonalAccessToken,
+        DeletePersonalAccessToken
+    ]
 }
+impl_absorb!(UsersInner, UsersCommand); // `left_right::Absorb` impl that forwards each command to `handle`
 
-impl Users {
-    pub fn new() -> Self {
-        Self {
-            index: RefCell::new(AHashMap::with_capacity(1024)),
-            items: RefCell::new(Slab::with_capacity(1024)),
-            permissioner: RefCell::new(Permissioner::new()),
-        }
-    }
-
-    /// Insert a user and return the assigned ID
-    pub fn insert(&self, user: User) -> usize {
-        let mut items = self.items.borrow_mut();
-        let mut index = self.index.borrow_mut();
-
-        let username = user.username.clone();
-        let id = items.insert(user);
-        items[id].id = id as u32;
-        index.insert(username, id);
-        id
-    }
-
-    /// Get user by ID
-    pub fn get(&self, id: usize) -> Option<User> {
-        self.items.borrow().get(id).cloned()
-    }
-
-    /// Get user by username or ID (via Identifier enum)
-    pub fn get_by_identifier(&self, identifier: &Identifier) -> 
Result<Option<User>, IggyError> {
+impl UsersInner { // private helper that turns an `Identifier` into a key of `items`
+    fn resolve_user_id(&self, identifier: &iggy_common::Identifier) -> 
Option<usize> { // None when the identifier cannot be read or does not resolve
+        use iggy_common::IdKind;
         match identifier.kind {
-            iggy_common::IdKind::Numeric => {
-                let id = identifier.get_u32_value()? as usize;
-                Ok(self.items.borrow().get(id).cloned())
-            }
-            iggy_common::IdKind::String => {
-                let username = identifier.get_string_value()?;
-                let index = self.index.borrow();
-                if let Some(&id) = index.get(&username) {
-                    Ok(self.items.borrow().get(id).cloned())
+            IdKind::Numeric => {
+                let id = identifier.get_u32_value().ok()? as usize; // a failed read returns None through `?`
+                if self.items.contains(id) { // numeric ids count only when that slab slot is occupied
+                    Some(id)
                 } else {
-                    Ok(None)
+                    None
                 }
             }
+            IdKind::String => {
+                let username = identifier.get_string_value().ok()?;
+                self.index.get(username.as_str()).map(|&id| id as usize) // `index` stores `UserId`; cast to the slab key type
+            }
         }
     }
+}
 
-    /// Remove user by ID
-    pub fn remove(&self, id: usize) -> Option<User> {
-        let mut items = self.items.borrow_mut();
-        let mut index = self.index.borrow_mut();
+impl Handler for UsersInner {
+    fn handle(&mut self, cmd: &UsersCommand) {
+        match cmd {
+            UsersCommand::CreateUser(payload) => {
+                let username_arc: Arc<str> = 
Arc::from(payload.username.as_str());
+                if self.index.contains_key(&username_arc) {
+                    return;
+                }
 
-        if !items.contains(id) {
-            return None;
-        }
+                let user = User {
+                    id: 0,
+                    username: username_arc.clone(),
+                    password_hash: Arc::from(payload.password.as_str()),
+                    status: payload.status,
+                    created_at: iggy_common::IggyTimestamp::now(),
+                    permissions: payload.permissions.as_ref().map(|p| 
Arc::new(p.clone())),
+                };
+
+                let id = self.items.insert(user);
+                if let Some(user) = self.items.get_mut(id) {
+                    user.id = id as UserId;
+                }
 
-        let user = items.remove(id);
-        index.remove(&user.username);
-        Some(user)
-    }
+                self.index.insert(username_arc, id as UserId);
+                self.personal_access_tokens
+                    .insert(id as UserId, AHashMap::default());
+            }
+            UsersCommand::UpdateUser(payload) => {
+                let Some(user_id) = self.resolve_user_id(&payload.user_id) 
else {
+                    return;
+                };
+
+                let Some(user) = self.items.get_mut(user_id) else {
+                    return;
+                };
+
+                if let Some(new_username) = &payload.username {
+                    let new_username_arc: Arc<str> = 
Arc::from(new_username.as_str());
+                    if let Some(&existing_id) = 
self.index.get(&new_username_arc)
+                        && existing_id != user_id as UserId
+                    {
+                        return;
+                    }
+
+                    self.index.remove(&user.username);
+                    user.username = new_username_arc.clone();
+                    self.index.insert(new_username_arc, user_id as UserId);
+                }
 
-    /// Check if user exists
-    pub fn contains(&self, identifier: &Identifier) -> bool {
-        match identifier.kind {
-            iggy_common::IdKind::Numeric => {
-                if let Ok(id) = identifier.get_u32_value() {
-                    self.items.borrow().contains(id as usize)
-                } else {
-                    false
+                if let Some(new_status) = payload.status {
+                    user.status = new_status;
                 }
             }
-            iggy_common::IdKind::String => {
-                if let Ok(username) = identifier.get_string_value() {
-                    self.index.borrow().contains_key(&username)
-                } else {
-                    false
+            UsersCommand::DeleteUser(payload) => {
+                let Some(user_id) = self.resolve_user_id(&payload.user_id) 
else {
+                    return;
+                };
+
+                if let Some(user) = self.items.get(user_id) {
+                    let username = user.username.clone();
+                    self.items.remove(user_id);
+                    self.index.remove(&username);
+                    self.personal_access_tokens.remove(&(user_id as UserId));
                 }
             }
-        }
-    }
-
-    /// Get all users as a Vec
-    pub fn values(&self) -> Vec<User> {
-        self.items
-            .borrow()
-            .iter()
-            .map(|(_, u): (usize, &User)| u.clone())
-            .collect()
-    }
-
-    /// Get number of users
-    pub fn len(&self) -> usize {
-        self.items.borrow().len()
-    }
-
-    /// Check if empty
-    pub fn is_empty(&self) -> bool {
-        self.items.borrow().is_empty()
-    }
-
-    /// Check if username already exists
-    pub fn username_exists(&self, username: &str) -> bool {
-        self.index.borrow().contains_key(username)
-    }
-
-    /// Get ID by username
-    pub fn get_id_by_username(&self, username: &str) -> Option<usize> {
-        self.index.borrow().get(username).copied()
-    }
-
-    /// Initialize permissions for a user
-    pub fn init_permissions(&self, user_id: UserId, permissions: 
Option<Permissions>) {
-        self.permissioner
-            .borrow_mut()
-            .init_permissions_for_user(user_id, permissions);
-    }
-
-    /// Update permissions for a user
-    pub fn update_permissions(&self, user_id: UserId, permissions: 
Option<Permissions>) {
-        self.permissioner
-            .borrow_mut()
-            .update_permissions_for_user(user_id, permissions);
-    }
-
-    /// Delete permissions for a user
-    pub fn delete_permissions(&self, user_id: UserId) {
-        self.permissioner
-            .borrow_mut()
-            .delete_permissions_for_user(user_id);
-    }
+            UsersCommand::ChangePassword(payload) => {
+                let Some(user_id) = self.resolve_user_id(&payload.user_id) 
else {
+                    return;
+                };
 
-    /// Update username
-    pub fn update_username(
-        &self,
-        identifier: &Identifier,
-        new_username: String,
-    ) -> Result<(), IggyError> {
-        let id = match identifier.kind {
-            iggy_common::IdKind::Numeric => identifier.get_u32_value()? as 
usize,
-            iggy_common::IdKind::String => {
-                let username = identifier.get_string_value()?;
-                let index = self.index.borrow();
-                *index
-                    .get(&username)
-                    .ok_or_else(|| 
IggyError::ResourceNotFound(username.to_string()))?
+                if let Some(user) = self.items.get_mut(user_id) {
+                    user.password_hash = 
Arc::from(payload.new_password.as_str());
+                }
             }
-        };
-
-        let old_username = {
-            let items = self.items.borrow();
-            let user = items
-                .get(id)
-                .ok_or_else(|| 
IggyError::ResourceNotFound(identifier.to_string()))?;
-            user.username.clone()
-        };
-
-        if old_username == new_username {
-            return Ok(());
-        }
-
-        tracing::trace!(
-            "Updating username: '{}' → '{}' for user ID: {}",
-            old_username,
-            new_username,
-            id
-        );
-
-        {
-            let mut items = self.items.borrow_mut();
-            let user = items
-                .get_mut(id)
-                .ok_or_else(|| 
IggyError::ResourceNotFound(identifier.to_string()))?;
-            user.username = new_username.clone();
-        }
-
-        let mut index = self.index.borrow_mut();
-        index.remove(&old_username);
-        index.insert(new_username, id);
-
-        Ok(())
-    }
-}
-
-#[derive(Debug)]
-pub enum UsersCommand {
-    Create(CreateUser),
-    Update(UpdateUser),
-    Delete(DeleteUser),
-    ChangePassword(ChangePassword),
-    UpdatePermissions(UpdatePermissions),
-}
-
-impl StateCommand for Users {
-    type Command = UsersCommand;
-    type Input = Message<PrepareHeader>;
+            UsersCommand::UpdatePermissions(payload) => {
+                let Some(user_id) = self.resolve_user_id(&payload.user_id) 
else {
+                    return;
+                };
 
-    fn into_command(input: &Self::Input) -> Option<Self::Command> {
-        // TODO: rework this thing, so we don't copy the bytes on each request
-        let body = Bytes::copy_from_slice(input.body());
-        match input.header().operation {
-            Operation::CreateUser => Some(UsersCommand::Create(
-                CreateUser::from_bytes(body.clone()).unwrap(),
-            )),
-            Operation::UpdateUser => Some(UsersCommand::Update(
-                UpdateUser::from_bytes(body.clone()).unwrap(),
-            )),
-            Operation::DeleteUser => Some(UsersCommand::Delete(
-                DeleteUser::from_bytes(body.clone()).unwrap(),
-            )),
-            Operation::ChangePassword => Some(UsersCommand::ChangePassword(
-                ChangePassword::from_bytes(body.clone()).unwrap(),
-            )),
-            Operation::UpdatePermissions => 
Some(UsersCommand::UpdatePermissions(
-                UpdatePermissions::from_bytes(body.clone()).unwrap(),
-            )),
-            _ => None,
-        }
-    }
-}
+                if let Some(user) = self.items.get_mut(user_id) {
+                    user.permissions = payload.permissions.as_ref().map(|p| 
Arc::new(p.clone()));
+                }
+            }
+            UsersCommand::CreatePersonalAccessToken(payload) => {
+                // TODO: Stub until protocol gets adjusted.
+                let user_id = 0;
+                let user_tokens = 
self.personal_access_tokens.entry(user_id).or_default();
+                let name_arc: Arc<str> = Arc::from(payload.name.as_str());
+                if user_tokens.contains_key(&name_arc) {
+                    return;
+                }
 
-impl ApplyState for Users {
-    type Output = ();
+                let expiry_at =
+                    
PersonalAccessToken::calculate_expiry_at(IggyTimestamp::now(), payload.expiry);
+                if let Some(expiry_at) = expiry_at
+                    && expiry_at.as_micros() <= 
IggyTimestamp::now().as_micros()
+                {
+                    return;
+                }
 
-    fn do_apply(&self, cmd: Self::Command) -> Self::Output {
-        match cmd {
-            UsersCommand::Create(payload) => todo!("Handle Create user with 
{:?}", payload),
-            UsersCommand::Update(payload) => todo!("Handle Update user with 
{:?}", payload),
-            UsersCommand::Delete(payload) => todo!("Handle Delete user with 
{:?}", payload),
-            UsersCommand::ChangePassword(payload) => {
-                todo!("Handle Change password with {:?}", payload)
+                let (pat, _) = PersonalAccessToken::new(
+                    user_id,
+                    payload.name.as_ref(),
+                    IggyTimestamp::now(),
+                    payload.expiry,
+                );
+                user_tokens.insert(name_arc, pat);
             }
-            UsersCommand::UpdatePermissions(payload) => {
-                todo!("Handle Update permissions with {:?}", payload)
+            UsersCommand::DeletePersonalAccessToken(payload) => {
+                // TODO: Stub until protocol gets adjusted.
+                let user_id = 0;
+
+                if let Some(user_tokens) = 
self.personal_access_tokens.get_mut(&user_id) {
+                    let name_arc: Arc<str> = Arc::from(payload.name.as_str());
+                    user_tokens.remove(&name_arc);
+                }
             }
         }
     }
diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml
index 743f14f18..3ba2cecb5 100644
--- a/core/server/Cargo.toml
+++ b/core/server/Cargo.toml
@@ -71,7 +71,7 @@ hash32 = "1.0.0"
 human-repr = { workspace = true }
 iggy_common = { workspace = true }
 jsonwebtoken = { version = "10.2.0", features = ["rust_crypto"] }
-left-right = "0.11"
+left-right = { workspace = true }
 lending-iterator = "0.1.7"
 metadata = { workspace = true }
 mimalloc = { workspace = true, optional = true }

Reply via email to