This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch metadata_dev
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 2bb03d17db6e5107996cd28740a760962c98d3aa
Author: numinex <[email protected]>
AuthorDate: Mon Jan 5 15:33:48 2026 +0100

    squash
---
 Cargo.lock                                         |   4 +
 core/common/Cargo.toml                             |   2 +
 core/common/src/lib.rs                             |   2 +
 core/common/src/types/consensus/message.rs         |  24 ++
 core/common/src/types/mod.rs                       |   1 +
 .../src/types/personal_access_tokens/mod.rs}       |  14 +-
 .../src/streaming => common/src}/utils/hash.rs     |   0
 core/common/src/utils/mod.rs                       |   1 +
 core/metadata/Cargo.toml                           |   2 +
 core/metadata/src/lib.rs                           |   3 +
 core/metadata/src/permissioner/mod.rs              |   2 +
 core/metadata/src/permissioner/permissioner.rs     |  96 +++++
 .../permissioner_rules/consumer_groups.rs          |  76 ++++
 .../permissioner_rules/consumer_offsets.rs}        |  41 +-
 .../permissioner/permissioner_rules/messages.rs    | 135 +++++++
 .../src/permissioner/permissioner_rules}/mod.rs    |  10 +-
 .../permissioner/permissioner_rules/partitions.rs} |  34 +-
 .../permissioner/permissioner_rules/segments.rs}   |  14 +-
 .../src/permissioner/permissioner_rules/streams.rs |  86 +++++
 .../src/permissioner/permissioner_rules/system.rs} |  36 +-
 .../src/permissioner/permissioner_rules/topics.rs  | 153 ++++++++
 .../src/permissioner/permissioner_rules/users.rs   |  70 ++++
 core/metadata/src/stats/mod.rs                     | 349 +++++++++++++++++
 core/metadata/src/stm/consumer_group.rs            | 185 ++++++++-
 core/metadata/src/stm/mod.rs                       |  37 +-
 core/metadata/src/stm/mux.rs                       |  25 +-
 core/metadata/src/stm/stream.rs                    | 429 ++++++++++++++++++++-
 core/metadata/src/stm/user.rs                      | 245 +++++++++++-
 core/server/src/binary/mapper.rs                   |   5 +-
 core/server/src/bootstrap.rs                       |   3 +-
 core/server/src/http/http_shard_wrapper.rs         |   2 +-
 core/server/src/http/mapper.rs                     |   2 +-
 core/server/src/http/personal_access_tokens.rs     |   2 +-
 .../src/shard/system/personal_access_tokens.rs     |   2 +-
 core/server/src/shard/transmission/event.rs        |   5 +-
 core/server/src/state/system.rs                    |   2 +-
 .../server/src/streaming/clients/client_manager.rs |   6 +-
 core/server/src/streaming/mod.rs                   |   1 -
 core/server/src/streaming/polling_consumer.rs      |   5 +-
 core/server/src/streaming/topics/helpers.rs        |   5 +-
 core/server/src/streaming/users/user.rs            |   2 +-
 core/server/src/streaming/utils/mod.rs             |   1 -
 42 files changed, 2012 insertions(+), 107 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index b37ab1d22..5704427ad 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4656,6 +4656,7 @@ dependencies = [
  "aes-gcm",
  "ahash 0.8.12",
  "base64 0.22.1",
+ "blake3",
  "bon",
  "byte-unit",
  "bytemuck",
@@ -4677,6 +4678,7 @@ dependencies = [
  "nix",
  "once_cell",
  "rcgen",
+ "ring",
  "rustls",
  "serde",
  "serde_json",
@@ -5656,10 +5658,12 @@ dependencies = [
 name = "metadata"
 version = "0.1.0"
 dependencies = [
+ "ahash 0.8.12",
  "consensus",
  "iggy_common",
  "journal",
  "message_bus",
+ "slab",
  "tracing",
 ]
 
diff --git a/core/common/Cargo.toml b/core/common/Cargo.toml
index e935435ef..a529c1ab3 100644
--- a/core/common/Cargo.toml
+++ b/core/common/Cargo.toml
@@ -36,6 +36,7 @@ fast_async_lock = ["dep:fast-async-mutex"]
 aes-gcm = { workspace = true }
 ahash = { workspace = true }
 base64 = { workspace = true }
+blake3 = { workspace = true }
 bon = { workspace = true }
 byte-unit = { workspace = true }
 bytemuck = { workspace = true }
@@ -57,6 +58,7 @@ humantime = { workspace = true }
 nix = { workspace = true }
 once_cell = { workspace = true }
 rcgen = "0.14.6"
+ring = "0.17.14"
 rustls = { workspace = true }
 serde = { workspace = true }
 serde_json = { workspace = true }
diff --git a/core/common/src/lib.rs b/core/common/src/lib.rs
index af04602ce..9171c4097 100644
--- a/core/common/src/lib.rs
+++ b/core/common/src/lib.rs
@@ -88,6 +88,7 @@ pub use types::message::*;
 pub use types::partition::*;
 pub use types::permissions::permissions_global::*;
 pub use types::permissions::personal_access_token::*;
+pub use types::personal_access_tokens::*;
 pub use types::snapshot::*;
 pub use types::stats::*;
 pub use types::stream::*;
@@ -100,6 +101,7 @@ pub use utils::checksum::*;
 pub use utils::crypto::*;
 pub use utils::duration::{IggyDuration, SEC_IN_MICRO};
 pub use utils::expiry::IggyExpiry;
+pub use utils::hash::*;
 pub use utils::personal_access_token_expiry::PersonalAccessTokenExpiry;
 pub use utils::text;
 pub use utils::timestamp::*;
diff --git a/core/common/src/types/consensus/message.rs 
b/core/common/src/types/consensus/message.rs
index 977b06e32..0f9f78f3a 100644
--- a/core/common/src/types/consensus/message.rs
+++ b/core/common/src/types/consensus/message.rs
@@ -89,6 +89,30 @@ where
         }
     }
 
+    pub fn transmute_header<T: ConsensusHeader>(self, f: impl FnOnce(H, &mut 
T)) -> Message<T> {
+        assert_eq!(size_of::<H>(), size_of::<T>());
+
+        // Copy old header to stack to avoid UB.
+        let old_header  = *self.header();
+
+        // Safety: We ensured that size_of::<H>() == size_of::<T>()
+        // On top of that, there is going to be only one reference to buffer 
during this function call
+        // so no other references can observe the mutation.
+        // In the future we can replace the `Bytes` buffer with something that 
does not allow sharing between different threads.
+        let buffer = self.into_inner();
+        unsafe {
+            let ptr = buffer.as_ptr() as *mut u8;
+            let slice = std::slice::from_raw_parts_mut(ptr, size_of::<T>());
+            let new_header: &mut T = bytemuck::from_bytes_mut(slice);
+            f(old_header, new_header);
+        }
+
+        Message {
+            buffer,
+            _marker: PhantomData,
+        }
+    }
+
     /// Replace the header with a new one, using the provided function to 
generate it.
     pub fn replace_header<T: ConsensusHeader>(self, f: impl FnOnce(&H) -> T) 
-> Message<T> {
         assert_eq!(size_of::<H>(), size_of::<T>());
diff --git a/core/common/src/types/mod.rs b/core/common/src/types/mod.rs
index f0d70cf9c..065a48c1e 100644
--- a/core/common/src/types/mod.rs
+++ b/core/common/src/types/mod.rs
@@ -29,6 +29,7 @@ pub(crate) mod identifier;
 pub(crate) mod message;
 pub(crate) mod partition;
 pub(crate) mod permissions;
+pub(crate) mod personal_access_tokens;
 pub(crate) mod snapshot;
 pub(crate) mod stats;
 pub(crate) mod stream;
diff --git 
a/core/server/src/streaming/personal_access_tokens/personal_access_token.rs 
b/core/common/src/types/personal_access_tokens/mod.rs
similarity index 94%
rename from 
core/server/src/streaming/personal_access_tokens/personal_access_token.rs
rename to core/common/src/types/personal_access_tokens/mod.rs
index 8fc0c8535..e28a585de 100644
--- a/core/server/src/streaming/personal_access_tokens/personal_access_token.rs
+++ b/core/common/src/types/personal_access_tokens/mod.rs
@@ -15,11 +15,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-use crate::streaming::utils::hash;
-use iggy_common::IggyExpiry;
-use iggy_common::IggyTimestamp;
-use iggy_common::UserId;
-use iggy_common::text::as_base64;
+use crate::IggyExpiry;
+use crate::IggyTimestamp;
+use crate::UserId;
+use crate::text::as_base64;
+use crate::utils::hash;
 use ring::rand::SecureRandom;
 use std::sync::Arc;
 
@@ -96,8 +96,8 @@ impl PersonalAccessToken {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use iggy_common::IggyDuration;
-    use iggy_common::IggyTimestamp;
+    use crate::IggyDuration;
+    use crate::IggyTimestamp;
 
     #[test]
     fn 
personal_access_token_should_be_created_with_random_secure_value_and_hashed_successfully()
 {
diff --git a/core/server/src/streaming/utils/hash.rs 
b/core/common/src/utils/hash.rs
similarity index 100%
copy from core/server/src/streaming/utils/hash.rs
copy to core/common/src/utils/hash.rs
diff --git a/core/common/src/utils/mod.rs b/core/common/src/utils/mod.rs
index 35656a15c..aa8714daa 100644
--- a/core/common/src/utils/mod.rs
+++ b/core/common/src/utils/mod.rs
@@ -20,6 +20,7 @@ pub(crate) mod checksum;
 pub(crate) mod crypto;
 pub(crate) mod duration;
 pub(crate) mod expiry;
+pub(crate) mod hash;
 pub(crate) mod personal_access_token_expiry;
 pub mod text;
 pub(crate) mod timestamp;
diff --git a/core/metadata/Cargo.toml b/core/metadata/Cargo.toml
index a6ba9a43f..335808e6b 100644
--- a/core/metadata/Cargo.toml
+++ b/core/metadata/Cargo.toml
@@ -33,3 +33,5 @@ iggy_common = { path = "../common" }
 journal = { path = "../journal" }
 message_bus = { path = "../message_bus" }
 tracing = { workspace = true }
+ahash = { workspace = true }
+slab = "0.4.11"
diff --git a/core/metadata/src/lib.rs b/core/metadata/src/lib.rs
index c86b13993..9b22a6211 100644
--- a/core/metadata/src/lib.rs
+++ b/core/metadata/src/lib.rs
@@ -18,4 +18,7 @@
 //! Iggy metadata module
 
 mod impls;
+mod permissioner;
 pub mod stm;
+
+mod stats;
diff --git a/core/metadata/src/permissioner/mod.rs 
b/core/metadata/src/permissioner/mod.rs
new file mode 100644
index 000000000..da31e18bf
--- /dev/null
+++ b/core/metadata/src/permissioner/mod.rs
@@ -0,0 +1,2 @@
+pub mod permissioner;
+pub mod permissioner_rules;
diff --git a/core/metadata/src/permissioner/permissioner.rs 
b/core/metadata/src/permissioner/permissioner.rs
new file mode 100644
index 000000000..79adca03f
--- /dev/null
+++ b/core/metadata/src/permissioner/permissioner.rs
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use ahash::{AHashMap, AHashSet};
+use iggy_common::{GlobalPermissions, Permissions, StreamPermissions, UserId};
+
+#[derive(Debug, Default, Clone)]
+pub struct Permissioner {
+    pub users_permissions: AHashMap<UserId, GlobalPermissions>,
+    pub users_streams_permissions: AHashMap<(UserId, usize), 
StreamPermissions>,
+    pub users_that_can_poll_messages_from_all_streams: AHashSet<UserId>,
+    pub users_that_can_send_messages_to_all_streams: AHashSet<UserId>,
+    pub users_that_can_poll_messages_from_specific_streams: AHashSet<(UserId, 
usize)>,
+    pub users_that_can_send_messages_to_specific_streams: AHashSet<(UserId, 
usize)>,
+}
+
+impl Permissioner {
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    pub fn init_permissions(&mut self, user_id: UserId, permissions: 
Option<Permissions>) {
+        if permissions.is_none() {
+            return;
+        }
+
+        let permissions = permissions.unwrap();
+        if permissions.global.poll_messages {
+            self.users_that_can_poll_messages_from_all_streams
+                .insert(user_id);
+        }
+
+        if permissions.global.send_messages {
+            self.users_that_can_send_messages_to_all_streams
+                .insert(user_id);
+        }
+
+        self.users_permissions.insert(user_id, permissions.global);
+        if permissions.streams.is_none() {
+            return;
+        }
+
+        let streams = permissions.streams.unwrap();
+        for (stream_id, stream) in streams {
+            if stream.poll_messages {
+                self.users_that_can_poll_messages_from_specific_streams
+                    .insert((user_id, stream_id));
+            }
+
+            if stream.send_messages {
+                self.users_that_can_send_messages_to_specific_streams
+                    .insert((user_id, stream_id));
+            }
+
+            self.users_streams_permissions
+                .insert((user_id, stream_id), stream);
+        }
+    }
+
+    pub fn update_permissions_for_user(
+        &mut self,
+        user_id: UserId,
+        permissions: Option<Permissions>,
+    ) {
+        self.delete_permissions(user_id);
+        self.init_permissions(user_id, permissions);
+    }
+
+    pub fn delete_permissions(&mut self, user_id: UserId) {
+        self.users_permissions.remove(&user_id);
+        self.users_that_can_poll_messages_from_all_streams
+            .remove(&user_id);
+        self.users_that_can_send_messages_to_all_streams
+            .remove(&user_id);
+        self.users_streams_permissions
+            .retain(|(id, _), _| *id != user_id);
+        self.users_that_can_poll_messages_from_specific_streams
+            .retain(|(id, _)| *id != user_id);
+        self.users_that_can_send_messages_to_specific_streams
+            .retain(|(id, _)| *id != user_id);
+    }
+}
diff --git 
a/core/metadata/src/permissioner/permissioner_rules/consumer_groups.rs 
b/core/metadata/src/permissioner/permissioner_rules/consumer_groups.rs
new file mode 100644
index 000000000..85111fdd0
--- /dev/null
+++ b/core/metadata/src/permissioner/permissioner_rules/consumer_groups.rs
@@ -0,0 +1,76 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::permissioner::permissioner::Permissioner;
+use iggy_common::IggyError;
+
+impl Permissioner {
+    pub fn create_consumer_group(
+        &self,
+        user_id: u32,
+        stream_id: usize,
+        topic_id: usize,
+    ) -> Result<(), IggyError> {
+        self.get_topic(user_id, stream_id, topic_id)
+    }
+
+    pub fn delete_consumer_group(
+        &self,
+        user_id: u32,
+        stream_id: usize,
+        topic_id: usize,
+    ) -> Result<(), IggyError> {
+        self.get_topic(user_id, stream_id, topic_id)
+    }
+
+    pub fn get_consumer_group(
+        &self,
+        user_id: u32,
+        stream_id: usize,
+        topic_id: usize,
+    ) -> Result<(), IggyError> {
+        self.get_topic(user_id, stream_id, topic_id)
+    }
+
+    pub fn get_consumer_groups(
+        &self,
+        user_id: u32,
+        stream_id: usize,
+        topic_id: usize,
+    ) -> Result<(), IggyError> {
+        self.get_topic(user_id, stream_id, topic_id)
+    }
+
+    pub fn join_consumer_group(
+        &self,
+        user_id: u32,
+        stream_id: usize,
+        topic_id: usize,
+    ) -> Result<(), IggyError> {
+        self.get_topic(user_id, stream_id, topic_id)
+    }
+
+    pub fn leave_consumer_group(
+        &self,
+        user_id: u32,
+        stream_id: usize,
+        topic_id: usize,
+    ) -> Result<(), IggyError> {
+        self.get_topic(user_id, stream_id, topic_id)
+    }
+}
diff --git a/core/server/src/streaming/utils/hash.rs 
b/core/metadata/src/permissioner/permissioner_rules/consumer_offsets.rs
similarity index 52%
copy from core/server/src/streaming/utils/hash.rs
copy to core/metadata/src/permissioner/permissioner_rules/consumer_offsets.rs
index f2829a6be..7d9a7340a 100644
--- a/core/server/src/streaming/utils/hash.rs
+++ b/core/metadata/src/permissioner/permissioner_rules/consumer_offsets.rs
@@ -16,23 +16,34 @@
  * under the License.
  */
 
-use twox_hash::XxHash32;
+use crate::permissioner::permissioner::Permissioner;
+use iggy_common::IggyError;
 
-pub fn calculate_32(data: &[u8]) -> u32 {
-    XxHash32::oneshot(0, data)
-}
+impl Permissioner {
+    pub fn get_consumer_offset(
+        &self,
+        user_id: u32,
+        stream_id: usize,
+        topic_id: usize,
+    ) -> Result<(), IggyError> {
+        self.poll_messages(user_id, stream_id, topic_id)
+    }
 
-pub fn calculate_256(data: &[u8]) -> String {
-    blake3::hash(data).to_hex().to_string()
-}
+    pub fn store_consumer_offset(
+        &self,
+        user_id: u32,
+        stream_id: usize,
+        topic_id: usize,
+    ) -> Result<(), IggyError> {
+        self.poll_messages(user_id, stream_id, topic_id)
+    }
 
-#[cfg(test)]
-mod tests {
-    #[test]
-    fn given_same_input_calculate_should_produce_same_output() {
-        let input = "hello world".as_bytes();
-        let output1 = super::calculate_32(input);
-        let output2 = super::calculate_32(input);
-        assert_eq!(output1, output2);
+    pub fn delete_consumer_offset(
+        &self,
+        user_id: u32,
+        stream_id: usize,
+        topic_id: usize,
+    ) -> Result<(), IggyError> {
+        self.poll_messages(user_id, stream_id, topic_id)
     }
 }
diff --git a/core/metadata/src/permissioner/permissioner_rules/messages.rs 
b/core/metadata/src/permissioner/permissioner_rules/messages.rs
new file mode 100644
index 000000000..29982faf6
--- /dev/null
+++ b/core/metadata/src/permissioner/permissioner_rules/messages.rs
@@ -0,0 +1,135 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::permissioner::permissioner::Permissioner;
+use iggy_common::IggyError;
+
+impl Permissioner {
+    pub fn poll_messages(
+        &self,
+        user_id: u32,
+        stream_id: usize,
+        topic_id: usize,
+    ) -> Result<(), IggyError> {
+        if self
+            .users_that_can_poll_messages_from_all_streams
+            .contains(&user_id)
+        {
+            return Ok(());
+        }
+
+        if self
+            .users_that_can_poll_messages_from_specific_streams
+            .contains(&(user_id, stream_id))
+        {
+            return Ok(());
+        }
+
+        let stream_permissions = self.users_streams_permissions.get(&(user_id, 
stream_id));
+        if stream_permissions.is_none() {
+            return Err(IggyError::Unauthorized);
+        }
+
+        let stream_permissions = stream_permissions.unwrap();
+        if stream_permissions.read_stream {
+            return Ok(());
+        }
+
+        if stream_permissions.manage_topics {
+            return Ok(());
+        }
+
+        if stream_permissions.read_topics {
+            return Ok(());
+        }
+
+        if stream_permissions.poll_messages {
+            return Ok(());
+        }
+
+        if stream_permissions.topics.is_none() {
+            return Err(IggyError::Unauthorized);
+        }
+
+        let topic_permissions = stream_permissions.topics.as_ref().unwrap();
+        if let Some(topic_permissions) = topic_permissions.get(&topic_id) {
+            return match topic_permissions.poll_messages
+                | topic_permissions.read_topic
+                | topic_permissions.manage_topic
+            {
+                true => Ok(()),
+                false => Err(IggyError::Unauthorized),
+            };
+        }
+
+        Err(IggyError::Unauthorized)
+    }
+
+    pub fn append_messages(
+        &self,
+        user_id: u32,
+        stream_id: usize,
+        topic_id: usize,
+    ) -> Result<(), IggyError> {
+        if self
+            .users_that_can_send_messages_to_all_streams
+            .contains(&user_id)
+        {
+            return Ok(());
+        }
+
+        if self
+            .users_that_can_send_messages_to_specific_streams
+            .contains(&(user_id, stream_id))
+        {
+            return Ok(());
+        }
+
+        let stream_permissions = self.users_streams_permissions.get(&(user_id, 
stream_id));
+        if stream_permissions.is_none() {
+            return Err(IggyError::Unauthorized);
+        }
+
+        let stream_permissions = stream_permissions.unwrap();
+        if stream_permissions.manage_stream {
+            return Ok(());
+        }
+
+        if stream_permissions.manage_topics {
+            return Ok(());
+        }
+
+        if stream_permissions.send_messages {
+            return Ok(());
+        }
+
+        if stream_permissions.topics.is_none() {
+            return Err(IggyError::Unauthorized);
+        }
+
+        let topic_permissions = stream_permissions.topics.as_ref().unwrap();
+        if let Some(topic_permissions) = topic_permissions.get(&topic_id) {
+            return match topic_permissions.send_messages | 
topic_permissions.manage_topic {
+                true => Ok(()),
+                false => Err(IggyError::Unauthorized),
+            };
+        }
+
+        Err(IggyError::Unauthorized)
+    }
+}
diff --git a/core/server/src/streaming/personal_access_tokens/mod.rs 
b/core/metadata/src/permissioner/permissioner_rules/mod.rs
similarity index 85%
copy from core/server/src/streaming/personal_access_tokens/mod.rs
copy to core/metadata/src/permissioner/permissioner_rules/mod.rs
index d404dbe5b..0d08a7a1c 100644
--- a/core/server/src/streaming/personal_access_tokens/mod.rs
+++ b/core/metadata/src/permissioner/permissioner_rules/mod.rs
@@ -16,4 +16,12 @@
  * under the License.
  */
 
-pub mod personal_access_token;
+mod consumer_groups;
+pub mod consumer_offsets;
+mod messages;
+mod partitions;
+mod segments;
+mod streams;
+mod system;
+mod topics;
+mod users;
diff --git a/core/server/src/streaming/utils/hash.rs 
b/core/metadata/src/permissioner/permissioner_rules/partitions.rs
similarity index 61%
copy from core/server/src/streaming/utils/hash.rs
copy to core/metadata/src/permissioner/permissioner_rules/partitions.rs
index f2829a6be..cd480ccd0 100644
--- a/core/server/src/streaming/utils/hash.rs
+++ b/core/metadata/src/permissioner/permissioner_rules/partitions.rs
@@ -16,23 +16,25 @@
  * under the License.
  */
 
-use twox_hash::XxHash32;
+use crate::permissioner::permissioner::Permissioner;
+use iggy_common::IggyError;
 
-pub fn calculate_32(data: &[u8]) -> u32 {
-    XxHash32::oneshot(0, data)
-}
-
-pub fn calculate_256(data: &[u8]) -> String {
-    blake3::hash(data).to_hex().to_string()
-}
+impl Permissioner {
+    pub fn create_partitions(
+        &self,
+        user_id: u32,
+        stream_id: usize,
+        topic_id: usize,
+    ) -> Result<(), IggyError> {
+        self.update_topic(user_id, stream_id, topic_id)
+    }
 
-#[cfg(test)]
-mod tests {
-    #[test]
-    fn given_same_input_calculate_should_produce_same_output() {
-        let input = "hello world".as_bytes();
-        let output1 = super::calculate_32(input);
-        let output2 = super::calculate_32(input);
-        assert_eq!(output1, output2);
+    pub fn delete_partitions(
+        &self,
+        user_id: u32,
+        stream_id: usize,
+        topic_id: usize,
+    ) -> Result<(), IggyError> {
+        self.update_topic(user_id, stream_id, topic_id)
     }
 }
diff --git a/core/server/src/streaming/personal_access_tokens/mod.rs 
b/core/metadata/src/permissioner/permissioner_rules/segments.rs
similarity index 71%
rename from core/server/src/streaming/personal_access_tokens/mod.rs
rename to core/metadata/src/permissioner/permissioner_rules/segments.rs
index d404dbe5b..64493a9b8 100644
--- a/core/server/src/streaming/personal_access_tokens/mod.rs
+++ b/core/metadata/src/permissioner/permissioner_rules/segments.rs
@@ -16,4 +16,16 @@
  * under the License.
  */
 
-pub mod personal_access_token;
+use crate::permissioner::permissioner::Permissioner;
+use iggy_common::IggyError;
+
+impl Permissioner {
+    pub fn delete_segments(
+        &self,
+        user_id: u32,
+        stream_id: usize,
+        topic_id: usize,
+    ) -> Result<(), IggyError> {
+        self.update_topic(user_id, stream_id, topic_id)
+    }
+}
diff --git a/core/metadata/src/permissioner/permissioner_rules/streams.rs 
b/core/metadata/src/permissioner/permissioner_rules/streams.rs
new file mode 100644
index 000000000..50b7fd44a
--- /dev/null
+++ b/core/metadata/src/permissioner/permissioner_rules/streams.rs
@@ -0,0 +1,86 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::permissioner::permissioner::Permissioner;
+use iggy_common::IggyError;
+
+impl Permissioner {
+    pub fn get_stream(&self, user_id: u32, stream_id: usize) -> Result<(), 
IggyError> {
+        if let Some(global_permissions) = self.users_permissions.get(&user_id)
+            && (global_permissions.manage_streams || 
global_permissions.read_streams)
+        {
+            return Ok(());
+        }
+
+        if let Some(stream_permissions) = 
self.users_streams_permissions.get(&(user_id, stream_id))
+            && (stream_permissions.manage_stream || 
stream_permissions.read_stream)
+        {
+            return Ok(());
+        }
+
+        Err(IggyError::Unauthorized)
+    }
+
+    pub fn get_streams(&self, user_id: u32) -> Result<(), IggyError> {
+        if let Some(global_permissions) = self.users_permissions.get(&user_id)
+            && (global_permissions.manage_streams || 
global_permissions.read_streams)
+        {
+            return Ok(());
+        }
+
+        Err(IggyError::Unauthorized)
+    }
+
+    pub fn create_stream(&self, user_id: u32) -> Result<(), IggyError> {
+        if let Some(global_permissions) = self.users_permissions.get(&user_id)
+            && global_permissions.manage_streams
+        {
+            return Ok(());
+        }
+
+        Err(IggyError::Unauthorized)
+    }
+
+    pub fn update_stream(&self, user_id: u32, stream_id: usize) -> Result<(), 
IggyError> {
+        self.manage_stream(user_id, stream_id)
+    }
+
+    pub fn delete_stream(&self, user_id: u32, stream_id: usize) -> Result<(), 
IggyError> {
+        self.manage_stream(user_id, stream_id)
+    }
+
+    pub fn purge_stream(&self, user_id: u32, stream_id: usize) -> Result<(), 
IggyError> {
+        self.manage_stream(user_id, stream_id)
+    }
+
+    fn manage_stream(&self, user_id: u32, stream_id: usize) -> Result<(), 
IggyError> {
+        if let Some(global_permissions) = self.users_permissions.get(&user_id)
+            && global_permissions.manage_streams
+        {
+            return Ok(());
+        }
+
+        if let Some(stream_permissions) = 
self.users_streams_permissions.get(&(user_id, stream_id))
+            && stream_permissions.manage_stream
+        {
+            return Ok(());
+        }
+
+        Err(IggyError::Unauthorized)
+    }
+}
diff --git a/core/server/src/streaming/utils/hash.rs 
b/core/metadata/src/permissioner/permissioner_rules/system.rs
similarity index 51%
rename from core/server/src/streaming/utils/hash.rs
rename to core/metadata/src/permissioner/permissioner_rules/system.rs
index f2829a6be..2ce1fdb29 100644
--- a/core/server/src/streaming/utils/hash.rs
+++ b/core/metadata/src/permissioner/permissioner_rules/system.rs
@@ -16,23 +16,29 @@
  * under the License.
  */
 
-use twox_hash::XxHash32;
+use crate::permissioner::permissioner::Permissioner;
+use iggy_common::IggyError;
 
-pub fn calculate_32(data: &[u8]) -> u32 {
-    XxHash32::oneshot(0, data)
-}
+impl Permissioner {
+    pub fn get_stats(&self, user_id: u32) -> Result<(), IggyError> {
+        self.get_server_info(user_id)
+    }
 
-pub fn calculate_256(data: &[u8]) -> String {
-    blake3::hash(data).to_hex().to_string()
-}
+    pub fn get_clients(&self, user_id: u32) -> Result<(), IggyError> {
+        self.get_server_info(user_id)
+    }
+
+    pub fn get_client(&self, user_id: u32) -> Result<(), IggyError> {
+        self.get_server_info(user_id)
+    }
+
+    fn get_server_info(&self, user_id: u32) -> Result<(), IggyError> {
+        if let Some(global_permissions) = self.users_permissions.get(&user_id)
+            && (global_permissions.manage_servers || 
global_permissions.read_servers)
+        {
+            return Ok(());
+        }
 
-#[cfg(test)]
-mod tests {
-    #[test]
-    fn given_same_input_calculate_should_produce_same_output() {
-        let input = "hello world".as_bytes();
-        let output1 = super::calculate_32(input);
-        let output2 = super::calculate_32(input);
-        assert_eq!(output1, output2);
+        Err(IggyError::Unauthorized)
     }
 }
diff --git a/core/metadata/src/permissioner/permissioner_rules/topics.rs 
b/core/metadata/src/permissioner/permissioner_rules/topics.rs
new file mode 100644
index 000000000..fcf7ae5c1
--- /dev/null
+++ b/core/metadata/src/permissioner/permissioner_rules/topics.rs
@@ -0,0 +1,153 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::permissioner::permissioner::Permissioner;
+use iggy_common::IggyError;
+
+impl Permissioner {
+    pub fn get_topic(
+        &self,
+        user_id: u32,
+        stream_id: usize,
+        topic_id: usize,
+    ) -> Result<(), IggyError> {
+        if let Some(global_permissions) = self.users_permissions.get(&user_id)
+            && (global_permissions.read_streams
+                || global_permissions.manage_streams
+                || global_permissions.manage_topics
+                || global_permissions.read_topics)
+        {
+            return Ok(());
+        }
+
+        if let Some(stream_permissions) = 
self.users_streams_permissions.get(&(user_id, stream_id))
+        {
+            if stream_permissions.manage_topics || 
stream_permissions.read_topics {
+                return Ok(());
+            }
+
+            if let Some(topic_permissions) =
+                stream_permissions.topics.as_ref().unwrap().get(&topic_id)
+                && (topic_permissions.manage_topic || 
topic_permissions.read_topic)
+            {
+                return Ok(());
+            }
+        }
+
+        Err(IggyError::Unauthorized)
+    }
+
+    pub fn get_topics(&self, user_id: u32, stream_id: usize) -> Result<(), 
IggyError> {
+        if let Some(global_permissions) = self.users_permissions.get(&user_id)
+            && (global_permissions.read_streams
+                || global_permissions.manage_streams
+                || global_permissions.manage_topics
+                || global_permissions.read_topics)
+        {
+            return Ok(());
+        }
+
+        if let Some(stream_permissions) = 
self.users_streams_permissions.get(&(user_id, stream_id))
+        {
+            if stream_permissions.manage_topics || 
stream_permissions.read_topics {
+                return Ok(());
+            }
+
+            if let Some(topic_permissions) =
+                stream_permissions.topics.as_ref().unwrap().get(&stream_id)
+                && (topic_permissions.manage_topic || 
topic_permissions.read_topic)
+            {
+                return Ok(());
+            }
+        }
+
+        Err(IggyError::Unauthorized)
+    }
+
+    pub fn create_topic(&self, user_id: u32, stream_id: usize) -> Result<(), 
IggyError> {
+        if let Some(global_permissions) = self.users_permissions.get(&user_id)
+            && (global_permissions.manage_streams || 
global_permissions.manage_topics)
+        {
+            return Ok(());
+        }
+
+        if let Some(stream_permissions) = 
self.users_streams_permissions.get(&(user_id, stream_id))
+            && stream_permissions.manage_topics
+        {
+            return Ok(());
+        }
+
+        Err(IggyError::Unauthorized)
+    }
+
+    pub fn update_topic(
+        &self,
+        user_id: u32,
+        stream_id: usize,
+        topic_id: usize,
+    ) -> Result<(), IggyError> {
+        self.manage_topic(user_id, stream_id, topic_id)
+    }
+
+    pub fn delete_topic(
+        &self,
+        user_id: u32,
+        stream_id: usize,
+        topic_id: usize,
+    ) -> Result<(), IggyError> {
+        self.manage_topic(user_id, stream_id, topic_id)
+    }
+
+    pub fn purge_topic(
+        &self,
+        user_id: u32,
+        stream_id: usize,
+        topic_id: usize,
+    ) -> Result<(), IggyError> {
+        self.manage_topic(user_id, stream_id, topic_id)
+    }
+
+    fn manage_topic(
+        &self,
+        user_id: u32,
+        stream_id: usize,
+        topic_id: usize,
+    ) -> Result<(), IggyError> {
+        if let Some(global_permissions) = self.users_permissions.get(&user_id)
+            && (global_permissions.manage_streams || 
global_permissions.manage_topics)
+        {
+            return Ok(());
+        }
+
+        if let Some(stream_permissions) = 
self.users_streams_permissions.get(&(user_id, stream_id))
+        {
+            if stream_permissions.manage_topics {
+                return Ok(());
+            }
+
+            if let Some(topic_permissions) =
+                stream_permissions.topics.as_ref().unwrap().get(&topic_id)
+                && topic_permissions.manage_topic
+            {
+                return Ok(());
+            }
+        }
+
+        Err(IggyError::Unauthorized)
+    }
+}
diff --git a/core/metadata/src/permissioner/permissioner_rules/users.rs 
b/core/metadata/src/permissioner/permissioner_rules/users.rs
new file mode 100644
index 000000000..b5c016a33
--- /dev/null
+++ b/core/metadata/src/permissioner/permissioner_rules/users.rs
@@ -0,0 +1,70 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::permissioner::permissioner::Permissioner;
+use iggy_common::IggyError;
+
+impl Permissioner {
+    pub fn get_user(&self, user_id: u32) -> Result<(), IggyError> {
+        self.read_users(user_id)
+    }
+
+    pub fn get_users(&self, user_id: u32) -> Result<(), IggyError> {
+        self.read_users(user_id)
+    }
+
+    pub fn create_user(&self, user_id: u32) -> Result<(), IggyError> {
+        self.manager_users(user_id)
+    }
+
+    pub fn delete_user(&self, user_id: u32) -> Result<(), IggyError> {
+        self.manager_users(user_id)
+    }
+
+    pub fn update_user(&self, user_id: u32) -> Result<(), IggyError> {
+        self.manager_users(user_id)
+    }
+
+    pub fn update_permissions(&self, user_id: u32) -> Result<(), IggyError> {
+        self.manager_users(user_id)
+    }
+
+    pub fn change_password(&self, user_id: u32) -> Result<(), IggyError> {
+        self.manager_users(user_id)
+    }
+
+    fn manager_users(&self, user_id: u32) -> Result<(), IggyError> {
+        if let Some(global_permissions) = self.users_permissions.get(&user_id)
+            && global_permissions.manage_users
+        {
+            return Ok(());
+        }
+
+        Err(IggyError::Unauthorized)
+    }
+
+    fn read_users(&self, user_id: u32) -> Result<(), IggyError> {
+        if let Some(global_permissions) = self.users_permissions.get(&user_id)
+            && (global_permissions.manage_users || 
global_permissions.read_users)
+        {
+            return Ok(());
+        }
+
+        Err(IggyError::Unauthorized)
+    }
+}
diff --git a/core/metadata/src/stats/mod.rs b/core/metadata/src/stats/mod.rs
new file mode 100644
index 000000000..89d594665
--- /dev/null
+++ b/core/metadata/src/stats/mod.rs
@@ -0,0 +1,349 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::{
+    Arc,
+    atomic::{AtomicU32, AtomicU64, Ordering},
+};
+
+#[derive(Default, Debug)]
+pub struct StreamStats {
+    size_bytes: AtomicU64,
+    messages_count: AtomicU64,
+    segments_count: AtomicU32,
+}
+
+impl StreamStats {
+    pub fn increment_size_bytes(&self, size_bytes: u64) {
+        self.size_bytes.fetch_add(size_bytes, Ordering::AcqRel);
+    }
+
+    pub fn increment_messages_count(&self, messages_count: u64) {
+        self.messages_count
+            .fetch_add(messages_count, Ordering::AcqRel);
+    }
+
+    pub fn increment_segments_count(&self, segments_count: u32) {
+        self.segments_count
+            .fetch_add(segments_count, Ordering::AcqRel);
+    }
+
+    pub fn decrement_size_bytes(&self, size_bytes: u64) {
+        self.size_bytes.fetch_sub(size_bytes, Ordering::AcqRel);
+    }
+
+    pub fn decrement_messages_count(&self, messages_count: u64) {
+        self.messages_count
+            .fetch_sub(messages_count, Ordering::AcqRel);
+    }
+
+    pub fn decrement_segments_count(&self, segments_count: u32) {
+        self.segments_count
+            .fetch_sub(segments_count, Ordering::AcqRel);
+    }
+
+    pub fn size_bytes_inconsistent(&self) -> u64 {
+        self.size_bytes.load(Ordering::Relaxed)
+    }
+
+    pub fn messages_count_inconsistent(&self) -> u64 {
+        self.messages_count.load(Ordering::Relaxed)
+    }
+
+    pub fn segments_count_inconsistent(&self) -> u32 {
+        self.segments_count.load(Ordering::Relaxed)
+    }
+
+    pub fn zero_out_size_bytes(&self) {
+        self.size_bytes.store(0, Ordering::Relaxed);
+    }
+
+    pub fn zero_out_messages_count(&self) {
+        self.messages_count.store(0, Ordering::Relaxed);
+    }
+
+    pub fn zero_out_segments_count(&self) {
+        self.segments_count.store(0, Ordering::Relaxed);
+    }
+
+    pub fn zero_out_all(&self) {
+        self.zero_out_size_bytes();
+        self.zero_out_messages_count();
+        self.zero_out_segments_count();
+    }
+}
+
+#[derive(Default, Debug)]
+pub struct TopicStats {
+    parent: Arc<StreamStats>,
+    size_bytes: AtomicU64,
+    messages_count: AtomicU64,
+    segments_count: AtomicU32,
+}
+
+impl TopicStats {
+    pub fn new(parent: Arc<StreamStats>) -> Self {
+        Self {
+            parent,
+            size_bytes: AtomicU64::new(0),
+            messages_count: AtomicU64::new(0),
+            segments_count: AtomicU32::new(0),
+        }
+    }
+
+    pub fn parent(&self) -> Arc<StreamStats> {
+        self.parent.clone()
+    }
+
+    pub fn increment_parent_size_bytes(&self, size_bytes: u64) {
+        self.parent.increment_size_bytes(size_bytes);
+    }
+
+    pub fn increment_parent_messages_count(&self, messages_count: u64) {
+        self.parent.increment_messages_count(messages_count);
+    }
+
+    pub fn increment_parent_segments_count(&self, segments_count: u32) {
+        self.parent.increment_segments_count(segments_count);
+    }
+
+    pub fn increment_size_bytes(&self, size_bytes: u64) {
+        self.size_bytes.fetch_add(size_bytes, Ordering::AcqRel);
+        self.increment_parent_size_bytes(size_bytes);
+    }
+
+    pub fn increment_messages_count(&self, messages_count: u64) {
+        self.messages_count
+            .fetch_add(messages_count, Ordering::AcqRel);
+        self.increment_parent_messages_count(messages_count);
+    }
+
+    pub fn increment_segments_count(&self, segments_count: u32) {
+        self.segments_count
+            .fetch_add(segments_count, Ordering::AcqRel);
+        self.increment_parent_segments_count(segments_count);
+    }
+
+    pub fn decrement_parent_size_bytes(&self, size_bytes: u64) {
+        self.parent.decrement_size_bytes(size_bytes);
+    }
+
+    pub fn decrement_parent_messages_count(&self, messages_count: u64) {
+        self.parent.decrement_messages_count(messages_count);
+    }
+
+    pub fn decrement_parent_segments_count(&self, segments_count: u32) {
+        self.parent.decrement_segments_count(segments_count);
+    }
+
+    pub fn decrement_size_bytes(&self, size_bytes: u64) {
+        self.size_bytes.fetch_sub(size_bytes, Ordering::AcqRel);
+        self.decrement_parent_size_bytes(size_bytes);
+    }
+
+    pub fn decrement_messages_count(&self, messages_count: u64) {
+        self.messages_count
+            .fetch_sub(messages_count, Ordering::AcqRel);
+        self.decrement_parent_messages_count(messages_count);
+    }
+
+    pub fn decrement_segments_count(&self, segments_count: u32) {
+        self.segments_count
+            .fetch_sub(segments_count, Ordering::AcqRel);
+        self.decrement_parent_segments_count(segments_count);
+    }
+
+    pub fn size_bytes_inconsistent(&self) -> u64 {
+        self.size_bytes.load(Ordering::Relaxed)
+    }
+
+    pub fn messages_count_inconsistent(&self) -> u64 {
+        self.messages_count.load(Ordering::Relaxed)
+    }
+
+    pub fn segments_count_inconsistent(&self) -> u32 {
+        self.segments_count.load(Ordering::Relaxed)
+    }
+
+    pub fn zero_out_parent_size_bytes(&self) {
+        self.parent.zero_out_size_bytes();
+    }
+
+    pub fn zero_out_parent_messages_count(&self) {
+        self.parent.zero_out_messages_count();
+    }
+
+    pub fn zero_out_parent_segments_count(&self) {
+        self.parent.zero_out_segments_count();
+    }
+
+    pub fn zero_out_parent_all(&self) {
+        self.parent.zero_out_all();
+    }
+
+    pub fn zero_out_size_bytes(&self) {
+        self.size_bytes.store(0, Ordering::Relaxed);
+        self.zero_out_parent_size_bytes();
+    }
+
+    pub fn zero_out_messages_count(&self) {
+        self.messages_count.store(0, Ordering::Relaxed);
+        self.zero_out_parent_messages_count();
+    }
+
+    pub fn zero_out_segments_count(&self) {
+        self.segments_count.store(0, Ordering::Relaxed);
+        self.zero_out_parent_segments_count();
+    }
+
+    pub fn zero_out_all(&self) {
+        self.zero_out_size_bytes();
+        self.zero_out_messages_count();
+        self.zero_out_segments_count();
+    }
+}
+
+#[derive(Default, Debug)]
+pub struct PartitionStats {
+    parent: Arc<TopicStats>,
+    messages_count: AtomicU64,
+    size_bytes: AtomicU64,
+    segments_count: AtomicU32,
+}
+
+impl PartitionStats {
+    pub fn new(parent_stats: Arc<TopicStats>) -> Self {
+        Self {
+            parent: parent_stats,
+            messages_count: AtomicU64::new(0),
+            size_bytes: AtomicU64::new(0),
+            segments_count: AtomicU32::new(0),
+        }
+    }
+
+    pub fn parent(&self) -> Arc<TopicStats> {
+        self.parent.clone()
+    }
+
+    pub fn increment_size_bytes(&self, size_bytes: u64) {
+        self.size_bytes.fetch_add(size_bytes, Ordering::AcqRel);
+        self.increment_parent_size_bytes(size_bytes);
+    }
+
+    pub fn increment_messages_count(&self, messages_count: u64) {
+        self.messages_count
+            .fetch_add(messages_count, Ordering::AcqRel);
+        self.increment_parent_messages_count(messages_count);
+    }
+
+    pub fn increment_segments_count(&self, segments_count: u32) {
+        self.segments_count
+            .fetch_add(segments_count, Ordering::AcqRel);
+        self.increment_parent_segments_count(segments_count);
+    }
+
+    pub fn increment_parent_size_bytes(&self, size_bytes: u64) {
+        self.parent.increment_size_bytes(size_bytes);
+    }
+
+    pub fn increment_parent_messages_count(&self, messages_count: u64) {
+        self.parent.increment_messages_count(messages_count);
+    }
+
+    pub fn increment_parent_segments_count(&self, segments_count: u32) {
+        self.parent.increment_segments_count(segments_count);
+    }
+
+    pub fn decrement_size_bytes(&self, size_bytes: u64) {
+        self.size_bytes.fetch_sub(size_bytes, Ordering::AcqRel);
+        self.decrement_parent_size_bytes(size_bytes);
+    }
+
+    pub fn decrement_messages_count(&self, messages_count: u64) {
+        self.messages_count
+            .fetch_sub(messages_count, Ordering::AcqRel);
+        self.decrement_parent_messages_count(messages_count);
+    }
+
+    pub fn decrement_segments_count(&self, segments_count: u32) {
+        self.segments_count
+            .fetch_sub(segments_count, Ordering::AcqRel);
+        self.decrement_parent_segments_count(segments_count);
+    }
+
+    pub fn decrement_parent_size_bytes(&self, size_bytes: u64) {
+        self.parent.decrement_size_bytes(size_bytes);
+    }
+
+    pub fn decrement_parent_messages_count(&self, messages_count: u64) {
+        self.parent.decrement_messages_count(messages_count);
+    }
+
+    pub fn decrement_parent_segments_count(&self, segments_count: u32) {
+        self.parent.decrement_segments_count(segments_count);
+    }
+
+    pub fn size_bytes_inconsistent(&self) -> u64 {
+        self.size_bytes.load(Ordering::Relaxed)
+    }
+
+    pub fn messages_count_inconsistent(&self) -> u64 {
+        self.messages_count.load(Ordering::Relaxed)
+    }
+
+    pub fn segments_count_inconsistent(&self) -> u32 {
+        self.segments_count.load(Ordering::Relaxed)
+    }
+
+    pub fn zero_out_parent_size_bytes(&self) {
+        self.parent.zero_out_size_bytes();
+    }
+
+    pub fn zero_out_parent_messages_count(&self) {
+        self.parent.zero_out_messages_count();
+    }
+
+    pub fn zero_out_parent_segments_count(&self) {
+        self.parent.zero_out_segments_count();
+    }
+
+    pub fn zero_out_parent_all(&self) {
+        self.parent.zero_out_all();
+    }
+
+    pub fn zero_out_size_bytes(&self) {
+        self.size_bytes.store(0, Ordering::Relaxed);
+        self.zero_out_parent_size_bytes();
+    }
+
+    pub fn zero_out_messages_count(&self) {
+        self.messages_count.store(0, Ordering::Relaxed);
+        self.zero_out_parent_messages_count();
+    }
+
+    pub fn zero_out_segments_count(&self) {
+        self.segments_count.store(0, Ordering::Relaxed);
+        self.zero_out_parent_segments_count();
+    }
+
+    pub fn zero_out_all(&self) {
+        self.zero_out_size_bytes();
+        self.zero_out_messages_count();
+        self.zero_out_segments_count();
+        self.zero_out_parent_all();
+    }
+}
diff --git a/core/metadata/src/stm/consumer_group.rs 
b/core/metadata/src/stm/consumer_group.rs
index 4aed76aed..a78da082b 100644
--- a/core/metadata/src/stm/consumer_group.rs
+++ b/core/metadata/src/stm/consumer_group.rs
@@ -15,14 +15,189 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::stm::State;
+use crate::stm::ApplyState;
+use ahash::AHashMap;
+use iggy_common::{
+    IggyTimestamp,
+    header::{Operation, PrepareHeader},
+    message::Message,
+};
+use slab::Slab;
+use std::cell::RefCell;
 
-pub struct ConsumerGroups {}
+// ============================================================================
+// ConsumerGroupMember - Individual member of a consumer group
+// ============================================================================
 
-impl State for ConsumerGroups {
+#[derive(Debug, Clone)]
+pub struct ConsumerGroupMember {
+    pub id: u32,
+    pub joined_at: IggyTimestamp,
+}
+
+impl ConsumerGroupMember {
+    pub fn new(id: u32, joined_at: IggyTimestamp) -> Self {
+        Self { id, joined_at }
+    }
+}
+
+// ============================================================================
+// ConsumerGroup - A group of consumers
+// ============================================================================
+
+#[derive(Debug, Clone)]
+pub struct ConsumerGroup {
+    pub id: usize,
+    pub stream_id: usize,
+    pub topic_id: usize,
+    pub name: String,
+    pub created_at: IggyTimestamp,
+    pub members: Vec<ConsumerGroupMember>,
+}
+
+impl ConsumerGroup {
+    pub fn new(stream_id: usize, topic_id: usize, name: String, created_at: 
IggyTimestamp) -> Self {
+        Self {
+            id: 0,
+            stream_id,
+            topic_id,
+            name,
+            created_at,
+            members: Vec::new(),
+        }
+    }
+
+    pub fn add_member(&mut self, member: ConsumerGroupMember) {
+        self.members.push(member);
+    }
+
+    pub fn remove_member(&mut self, member_id: u32) -> 
Option<ConsumerGroupMember> {
+        if let Some(pos) = self.members.iter().position(|m| m.id == member_id) 
{
+            Some(self.members.remove(pos))
+        } else {
+            None
+        }
+    }
+
+    pub fn members_count(&self) -> usize {
+        self.members.len()
+    }
+}
+
+// ============================================================================
+// ConsumerGroups Collection
+// ============================================================================
+
+#[derive(Debug, Clone, Default)]
+pub struct ConsumerGroups {
+    // Global index for all consumer groups across all streams/topics
+    index: RefCell<AHashMap<(usize, usize, String), usize>>, // (stream_id, 
topic_id, name) -> id
+    items: RefCell<Slab<ConsumerGroup>>,
+}
+
+impl ConsumerGroups {
+    pub fn new() -> Self {
+        Self {
+            index: RefCell::new(AHashMap::with_capacity(256)),
+            items: RefCell::new(Slab::with_capacity(256)),
+        }
+    }
+
+    /// Insert a consumer group and return the assigned ID
+    pub fn insert(&self, group: ConsumerGroup) -> usize {
+        let mut items = self.items.borrow_mut();
+        let mut index = self.index.borrow_mut();
+
+        let key = (group.stream_id, group.topic_id, group.name.clone());
+        let id = items.insert(group);
+        items[id].id = id;
+        index.insert(key, id);
+        id
+    }
+
+    /// Get consumer group by ID
+    pub fn get(&self, id: usize) -> Option<ConsumerGroup> {
+        self.items.borrow().get(id).cloned()
+    }
+
+    /// Get consumer group by stream_id, topic_id, and name
+    pub fn get_by_location(
+        &self,
+        stream_id: usize,
+        topic_id: usize,
+        name: &str,
+    ) -> Option<ConsumerGroup> {
+        let index = self.index.borrow();
+        let key = (stream_id, topic_id, name.to_string());
+        if let Some(&id) = index.get(&key) {
+            self.items.borrow().get(id).cloned()
+        } else {
+            None
+        }
+    }
+
+    /// Remove consumer group by ID
+    pub fn remove(&self, id: usize) -> Option<ConsumerGroup> {
+        let mut items = self.items.borrow_mut();
+        let mut index = self.index.borrow_mut();
+
+        if !items.contains(id) {
+            return None;
+        }
+
+        let group = items.remove(id);
+        let key = (group.stream_id, group.topic_id, group.name.clone());
+        index.remove(&key);
+        Some(group)
+    }
+
+    /// Get all consumer groups for a specific topic
+    pub fn get_by_topic(&self, stream_id: usize, topic_id: usize) -> 
Vec<ConsumerGroup> {
+        self.items
+            .borrow()
+            .iter()
+            .filter_map(|(_, g)| {
+                if g.stream_id == stream_id && g.topic_id == topic_id {
+                    Some(g.clone())
+                } else {
+                    None
+                }
+            })
+            .collect()
+    }
+
+    /// Get number of consumer groups
+    pub fn len(&self) -> usize {
+        self.items.borrow().len()
+    }
+
+    /// Check if empty
+    pub fn is_empty(&self) -> bool {
+        self.items.borrow().is_empty()
+    }
+
+    /// Get all consumer groups
+    pub fn values(&self) -> Vec<ConsumerGroup> {
+        self.items
+            .borrow()
+            .iter()
+            .map(|(_, g): (usize, &ConsumerGroup)| g.clone())
+            .collect()
+    }
+}
+
+impl ApplyState for ConsumerGroups {
     type Output = ();
+    type Input = Message<PrepareHeader>;
+
+    fn do_apply(&self, _input: &Self::Input) -> Self::Output {
+        todo!()
+    }
 
-    fn apply(&self) -> Option<Self::Output> {
-        Some(())
+    fn is_applicable(input: &Self::Input) -> bool {
+        matches!(
+            input.header().operation,
+            Operation::CreateConsumerGroup | Operation::DeleteConsumerGroup
+        )
     }
 }
diff --git a/core/metadata/src/stm/mod.rs b/core/metadata/src/stm/mod.rs
index 80646a559..09abe12d4 100644
--- a/core/metadata/src/stm/mod.rs
+++ b/core/metadata/src/stm/mod.rs
@@ -16,22 +16,49 @@
 // under the License.
 
 pub mod consumer_group;
+pub mod mux;
 pub mod stream;
 pub mod user;
 
-pub mod mux;
-// TODO: Add more state machines.
-
-pub trait State {
+// This is public interface to state, therefore it will be imported from 
different crate, for now during development I am leaving it there.
+pub trait State
+where
+    Self: Sized,
+{
     type Output;
+    type Input;
+
     // Apply the state machine logic and return an optional output.
     // The output is optional, as we model the `StateMachine`, as an variadic 
list,
     // where not all state machines will produce an output for every input 
event.
-    fn apply(&self) -> Option<Self::Output>;
+    fn apply(&self, input: &Self::Input) -> Option<Self::Output>;
 }
 
+// TODO: This interface should be private to the stm module.
 pub trait StateMachine {
     type Input;
     type Output;
     fn update(&self, input: &Self::Input, output: &mut Vec<Self::Output>);
 }
+
+// This will be an internal trait used to define the application logic of a 
state machine.
+pub trait ApplyState {
+    type Output;
+    type Input;
+
+    fn do_apply(&self, input: &Self::Input) -> Self::Output;
+    fn is_applicable(input: &Self::Input) -> bool;
+}
+
+// Blanket implementation of State for all types that implement ApplyState
+impl<T> State for T
+where
+    T: ApplyState,
+{
+    type Output = T::Output;
+    type Input = T::Input;
+
+    fn apply(&self, input: &Self::Input) -> Option<Self::Output> {
+        T::is_applicable(input).then(|| self.do_apply(input))
+    }
+}
diff --git a/core/metadata/src/stm/mux.rs b/core/metadata/src/stm/mux.rs
index 22601bdc0..ca5e5ea57 100644
--- a/core/metadata/src/stm/mux.rs
+++ b/core/metadata/src/stm/mux.rs
@@ -15,9 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::stm::{State, StateMachine};
 use iggy_common::{header::PrepareHeader, message::Message};
 
+use crate::stm::{State, StateMachine};
+
 // MuxStateMachine that proxies to an tuple of variadic state machines
 pub struct MuxStateMachine<T>
 where
@@ -59,6 +60,7 @@ macro_rules! variadic {
     ($a:expr,  $( $b:tt )+) => ( ($a, variadic!( $( $b )* )) );
 }
 
+// TODO: Figure out how to get around the fact that we need to hardcode the 
Input/Output type for base case.
 // Base case of the recursive resolution.
 impl StateMachine for () {
     type Input = Message<PrepareHeader>;
@@ -72,13 +74,13 @@ impl StateMachine for () {
 impl<O, S, Rest> StateMachine for variadic!(S, ...Rest)
 where
     S: State<Output = O>,
-    Rest: StateMachine<Input = Message<PrepareHeader>, Output = O>,
+    Rest: StateMachine<Input = S::Input, Output = O>,
 {
-    type Input = Message<PrepareHeader>;
+    type Input = Rest::Input;
     type Output = O;
 
     fn update(&self, input: &Self::Input, output: &mut Vec<Self::Output>) {
-        if let Some(result) = self.0.apply() {
+        if let Some(result) = self.0.apply(input) {
             output.push(result);
         }
         self.1.update(input, output)
@@ -90,10 +92,17 @@ mod tests {
     #[test]
     fn construct_mux_state_machine_from_states_with_same_output() {
         use crate::stm::*;
+        use iggy_common::header::PrepareHeader;
+        use iggy_common::message::Message;
+
+        let users = user::Users::new();
+        let streams = stream::Streams::new();
+        let cgs = consumer_group::ConsumerGroups::new();
+        let mux = mux::MuxStateMachine::new(variadic!(users, streams, cgs));
+
+        let input = Message::new(std::mem::size_of::<PrepareHeader>());
+        let mut output = Vec::new();
 
-        let users = user::Users {};
-        let streams = stream::Streams {};
-        let cgs = consumer_group::ConsumerGroups {};
-        let _mux = mux::MuxStateMachine::new(variadic!(users, streams, cgs));
+        mux.update(&input, &mut output);
     }
 }
diff --git a/core/metadata/src/stm/stream.rs b/core/metadata/src/stm/stream.rs
index 737e21e94..7b341eb22 100644
--- a/core/metadata/src/stm/stream.rs
+++ b/core/metadata/src/stm/stream.rs
@@ -15,14 +15,433 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::stm::State;
+use crate::stats::{StreamStats, TopicStats};
+use crate::stm::ApplyState;
+use ahash::AHashMap;
+use iggy_common::header::Operation;
+use iggy_common::{
+    CompressionAlgorithm, Identifier, IggyExpiry, IggyTimestamp, MaxTopicSize,
+    header::PrepareHeader, message::Message,
+};
+use slab::Slab;
+use std::cell::RefCell;
+use std::sync::Arc;
 
-pub struct Streams {}
+#[derive(Debug, Clone)]
+pub struct Partition {
+    pub id: usize,
+}
+
+impl Partition {
+    pub fn new(id: usize) -> Self {
+        Self { id }
+    }
+}
+
+#[derive(Debug, Clone, Default)]
+pub struct Partitions {
+    items: RefCell<Slab<Partition>>,
+}
+
+impl Partitions {
+    pub fn new() -> Self {
+        Self {
+            items: RefCell::new(Slab::with_capacity(1024)),
+        }
+    }
+
+    pub fn insert(&self, partition: Partition) -> usize {
+        let mut items = self.items.borrow_mut();
+        let id = items.insert(partition);
+        items[id].id = id;
+        id
+    }
+
+    pub fn get(&self, id: usize) -> Option<Partition> {
+        self.items.borrow().get(id).cloned()
+    }
+
+    pub fn remove(&self, id: usize) -> Option<Partition> {
+        let mut items = self.items.borrow_mut();
+        if items.contains(id) {
+            Some(items.remove(id))
+        } else {
+            None
+        }
+    }
+
+    pub fn len(&self) -> usize {
+        self.items.borrow().len()
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.items.borrow().is_empty()
+    }
+
+    pub fn iter(&self) -> Vec<Partition> {
+        self.items
+            .borrow()
+            .iter()
+            .map(|(_, p): (usize, &Partition)| p.clone())
+            .collect()
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct ConsumerGroup {
+    pub id: usize,
+    pub name: String,
+    pub created_at: IggyTimestamp,
+}
+
+impl ConsumerGroup {
+    pub fn new(name: String, created_at: IggyTimestamp) -> Self {
+        Self {
+            id: 0,
+            name,
+            created_at,
+        }
+    }
+}
+
+#[derive(Debug, Clone, Default)]
+pub struct ConsumerGroups {
+    index: RefCell<AHashMap<String, usize>>,
+    items: RefCell<Slab<ConsumerGroup>>,
+}
+
+impl ConsumerGroups {
+    pub fn new() -> Self {
+        Self {
+            index: RefCell::new(AHashMap::with_capacity(256)),
+            items: RefCell::new(Slab::with_capacity(256)),
+        }
+    }
+
+    pub fn insert(&self, group: ConsumerGroup) -> usize {
+        let mut items = self.items.borrow_mut();
+        let mut index = self.index.borrow_mut();
+
+        let name = group.name.clone();
+        let id = items.insert(group);
+        items[id].id = id;
+        index.insert(name, id);
+        id
+    }
+
+    pub fn get(&self, id: usize) -> Option<ConsumerGroup> {
+        self.items.borrow().get(id).cloned()
+    }
+
+    pub fn get_by_name(&self, name: &str) -> Option<ConsumerGroup> {
+        let index = self.index.borrow();
+        if let Some(&id) = index.get(name) {
+            self.items.borrow().get(id).cloned()
+        } else {
+            None
+        }
+    }
+
+    pub fn remove(&self, id: usize) -> Option<ConsumerGroup> {
+        let mut items = self.items.borrow_mut();
+        let mut index = self.index.borrow_mut();
+
+        if !items.contains(id) {
+            return None;
+        }
+
+        let group = items.remove(id);
+        index.remove(&group.name);
+        Some(group)
+    }
+
+    pub fn len(&self) -> usize {
+        self.items.borrow().len()
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.items.borrow().is_empty()
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct Topic {
+    pub id: usize,
+    pub name: String,
+    pub created_at: IggyTimestamp,
+    pub replication_factor: u8,
+    pub message_expiry: IggyExpiry,
+    pub compression_algorithm: CompressionAlgorithm,
+    pub max_topic_size: MaxTopicSize,
+
+    pub stats: Arc<TopicStats>,
+    pub partitions: Partitions,
+    pub consumer_groups: ConsumerGroups,
+}
+
+impl Topic {
+    pub fn new(
+        name: String,
+        created_at: IggyTimestamp,
+        replication_factor: u8,
+        message_expiry: IggyExpiry,
+        compression_algorithm: CompressionAlgorithm,
+        max_topic_size: MaxTopicSize,
+        stream_stats: Arc<StreamStats>,
+    ) -> Self {
+        Self {
+            id: 0,
+            name,
+            created_at,
+            replication_factor,
+            message_expiry,
+            compression_algorithm,
+            max_topic_size,
+            stats: Arc::new(TopicStats::new(stream_stats)),
+            partitions: Partitions::new(),
+            consumer_groups: ConsumerGroups::new(),
+        }
+    }
+}
+
+#[derive(Debug, Clone, Default)]
+pub struct Topics {
+    index: RefCell<AHashMap<String, usize>>,
+    items: RefCell<Slab<Topic>>,
+}
+
+impl Topics {
+    pub fn new() -> Self {
+        Self {
+            index: RefCell::new(AHashMap::with_capacity(1024)),
+            items: RefCell::new(Slab::with_capacity(1024)),
+        }
+    }
+
+    pub fn insert(&self, topic: Topic) -> usize {
+        let mut items = self.items.borrow_mut();
+        let mut index = self.index.borrow_mut();
+
+        let name = topic.name.clone();
+        let id = items.insert(topic);
+        items[id].id = id;
+        index.insert(name, id);
+        id
+    }
+
+    pub fn get(&self, id: usize) -> Option<Topic> {
+        self.items.borrow().get(id).cloned()
+    }
+
+    pub fn get_by_name(&self, name: &str) -> Option<Topic> {
+        let index = self.index.borrow();
+        if let Some(&id) = index.get(name) {
+            self.items.borrow().get(id).cloned()
+        } else {
+            None
+        }
+    }
+
+    pub fn get_by_identifier(&self, identifier: &Identifier) -> Option<Topic> {
+        match identifier.kind {
+            iggy_common::IdKind::Numeric => {
+                if let Ok(id) = identifier.get_u32_value() {
+                    self.get(id as usize)
+                } else {
+                    None
+                }
+            }
+            iggy_common::IdKind::String => {
+                if let Ok(name) = identifier.get_string_value() {
+                    self.get_by_name(&name)
+                } else {
+                    None
+                }
+            }
+        }
+    }
+
+    pub fn remove(&self, id: usize) -> Option<Topic> {
+        let mut items = self.items.borrow_mut();
+        let mut index = self.index.borrow_mut();
+
+        if !items.contains(id) {
+            return None;
+        }
+
+        let topic = items.remove(id);
+        index.remove(&topic.name);
+        Some(topic)
+    }
+
+    pub fn len(&self) -> usize {
+        self.items.borrow().len()
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.items.borrow().is_empty()
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct Stream {
+    pub id: usize,
+    pub name: String,
+    pub created_at: IggyTimestamp,
+
+    pub stats: Arc<StreamStats>,
+    pub topics: Topics,
+}
+
+impl Stream {
+    pub fn new(name: String, created_at: IggyTimestamp) -> Self {
+        Self {
+            id: 0,
+            name,
+            created_at,
+            stats: Arc::new(StreamStats::default()),
+            topics: Topics::new(),
+        }
+    }
+}
+
+#[derive(Debug, Clone, Default)]
+pub struct Streams {
+    index: RefCell<AHashMap<String, usize>>,
+    items: RefCell<Slab<Stream>>,
+}
+
+impl Streams {
+    pub fn new() -> Self {
+        Self {
+            index: RefCell::new(AHashMap::with_capacity(256)),
+            items: RefCell::new(Slab::with_capacity(256)),
+        }
+    }
+
+    pub fn insert(&self, stream: Stream) -> usize {
+        let mut items = self.items.borrow_mut();
+        let mut index = self.index.borrow_mut();
+
+        let name = stream.name.clone();
+        let id = items.insert(stream);
+        items[id].id = id;
+        index.insert(name, id);
+        id
+    }
+
+    pub fn get(&self, id: usize) -> Option<Stream> {
+        self.items.borrow().get(id).cloned()
+    }
+
+    pub fn get_by_name(&self, name: &str) -> Option<Stream> {
+        let index = self.index.borrow();
+        if let Some(&id) = index.get(name) {
+            self.items.borrow().get(id).cloned()
+        } else {
+            None
+        }
+    }
+
+    pub fn get_by_identifier(&self, identifier: &Identifier) -> Option<Stream> 
{
+        match identifier.kind {
+            iggy_common::IdKind::Numeric => {
+                if let Ok(id) = identifier.get_u32_value() {
+                    self.get(id as usize)
+                } else {
+                    None
+                }
+            }
+            iggy_common::IdKind::String => {
+                if let Ok(name) = identifier.get_string_value() {
+                    self.get_by_name(&name)
+                } else {
+                    None
+                }
+            }
+        }
+    }
+
+    pub fn remove(&self, id: usize) -> Option<Stream> {
+        let mut items = self.items.borrow_mut();
+        let mut index = self.index.borrow_mut();
+
+        if !items.contains(id) {
+            return None;
+        }
+
+        let stream = items.remove(id);
+        index.remove(&stream.name);
+        Some(stream)
+    }
+
+    pub fn len(&self) -> usize {
+        self.items.borrow().len()
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.items.borrow().is_empty()
+    }
+
+    pub fn iter(&self) -> Vec<Stream> {
+        self.items
+            .borrow()
+            .iter()
+            .map(|(_, s): (usize, &Stream)| s.clone())
+            .collect()
+    }
+}
+
+impl ApplyState for Streams {
+    type Output = ();
+    type Input = Message<PrepareHeader>;
+
+    fn do_apply(&self, _input: &Self::Input) -> Self::Output {
+        ()
+    }
+
+    fn is_applicable(input: &Self::Input) -> bool {
+        matches!(
+            input.header().operation,
+            Operation::CreateStream
+                | Operation::UpdateStream
+                | Operation::DeleteStream
+                | Operation::PurgeStream
+        )
+    }
+}
 
-impl State for Streams {
+impl ApplyState for Topics {
     type Output = ();
+    type Input = Message<PrepareHeader>;
+
+    fn do_apply(&self, _input: &Self::Input) -> Self::Output {
+        ()
+    }
+
+    fn is_applicable(input: &Self::Input) -> bool {
+        matches!(
+            input.header().operation,
+            Operation::CreateTopic
+                | Operation::UpdateTopic
+                | Operation::DeleteTopic
+                | Operation::PurgeTopic
+        )
+    }
+}
+
+impl ApplyState for Partitions {
+    type Output = ();
+    type Input = Message<PrepareHeader>;
+
+    fn do_apply(&self, _input: &Self::Input) -> Self::Output {
+        ()
+    }
 
-    fn apply(&self) -> Option<Self::Output> {
-        Some(())
+    fn is_applicable(input: &Self::Input) -> bool {
+        matches!(
+            input.header().operation,
+            Operation::CreatePartitions | Operation::DeletePartitions | 
Operation::DeleteSegments
+        )
     }
 }
diff --git a/core/metadata/src/stm/user.rs b/core/metadata/src/stm/user.rs
index d5f0fac96..409c33030 100644
--- a/core/metadata/src/stm/user.rs
+++ b/core/metadata/src/stm/user.rs
@@ -15,14 +15,249 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::stm::State;
+use crate::{permissioner::permissioner::Permissioner, stm::ApplyState};
+use ahash::AHashMap;
+use iggy_common::{
+    Identifier, IggyError, IggyTimestamp, Permissions, PersonalAccessToken, 
UserId, UserStatus,
+    header::{Operation, PrepareHeader},
+    message::Message,
+};
+use slab::Slab;
+use std::cell::RefCell;
 
-pub struct Users {}
+#[derive(Debug, Clone)]
+pub struct User {
+    pub id: UserId,
+    pub username: String,
+    pub password: String,
+    pub status: UserStatus,
+    pub created_at: IggyTimestamp,
+    pub permissions: Option<Permissions>,
+    pub personal_access_tokens: AHashMap<String, PersonalAccessToken>,
+}
+
+impl User {
+    pub fn new(
+        username: String,
+        password: String,
+        status: UserStatus,
+        created_at: IggyTimestamp,
+        permissions: Option<Permissions>,
+    ) -> Self {
+        Self {
+            id: 0,
+            username,
+            password,
+            status,
+            created_at,
+            permissions,
+            personal_access_tokens: AHashMap::new(),
+        }
+    }
+}
+
+#[derive(Debug, Clone, Default)]
+pub struct Users {
+    index: RefCell<AHashMap<String, usize>>,
+    items: RefCell<Slab<User>>,
+    permissioner: RefCell<Permissioner>,
+}
+
+impl Users {
+    pub fn new() -> Self {
+        Self {
+            index: RefCell::new(AHashMap::with_capacity(1024)),
+            items: RefCell::new(Slab::with_capacity(1024)),
+            permissioner: RefCell::new(Permissioner::new()),
+        }
+    }
+
+    /// Insert a user and return the assigned ID
+    pub fn insert(&self, user: User) -> usize {
+        let mut items = self.items.borrow_mut();
+        let mut index = self.index.borrow_mut();
+
+        let username = user.username.clone();
+        let id = items.insert(user);
+        items[id].id = id as u32;
+        index.insert(username, id);
+        id
+    }
+
+    /// Get user by ID
+    pub fn get(&self, id: usize) -> Option<User> {
+        self.items.borrow().get(id).cloned()
+    }
+
+    /// Get user by username or ID (via Identifier enum)
+    pub fn get_by_identifier(&self, identifier: &Identifier) -> 
Result<Option<User>, IggyError> {
+        match identifier.kind {
+            iggy_common::IdKind::Numeric => {
+                let id = identifier.get_u32_value()? as usize;
+                Ok(self.items.borrow().get(id).cloned())
+            }
+            iggy_common::IdKind::String => {
+                let username = identifier.get_string_value()?;
+                let index = self.index.borrow();
+                if let Some(&id) = index.get(&username) {
+                    Ok(self.items.borrow().get(id).cloned())
+                } else {
+                    Ok(None)
+                }
+            }
+        }
+    }
+
+    /// Remove user by ID
+    pub fn remove(&self, id: usize) -> Option<User> {
+        let mut items = self.items.borrow_mut();
+        let mut index = self.index.borrow_mut();
+
+        if !items.contains(id) {
+            return None;
+        }
+
+        let user = items.remove(id);
+        index.remove(&user.username);
+        Some(user)
+    }
+
+    /// Check if user exists
+    pub fn contains(&self, identifier: &Identifier) -> bool {
+        match identifier.kind {
+            iggy_common::IdKind::Numeric => {
+                if let Ok(id) = identifier.get_u32_value() {
+                    self.items.borrow().contains(id as usize)
+                } else {
+                    false
+                }
+            }
+            iggy_common::IdKind::String => {
+                if let Ok(username) = identifier.get_string_value() {
+                    self.index.borrow().contains_key(&username)
+                } else {
+                    false
+                }
+            }
+        }
+    }
+
+    /// Get all users as a Vec
+    pub fn values(&self) -> Vec<User> {
+        self.items
+            .borrow()
+            .iter()
+            .map(|(_, u): (usize, &User)| u.clone())
+            .collect()
+    }
+
+    /// Get number of users
+    pub fn len(&self) -> usize {
+        self.items.borrow().len()
+    }
+
+    /// Check if empty
+    pub fn is_empty(&self) -> bool {
+        self.items.borrow().is_empty()
+    }
+
+    /// Check if username already exists
+    pub fn username_exists(&self, username: &str) -> bool {
+        self.index.borrow().contains_key(username)
+    }
+
+    /// Get ID by username
+    pub fn get_id_by_username(&self, username: &str) -> Option<usize> {
+        self.index.borrow().get(username).copied()
+    }
+
+    /// Initialize permissions for a user
+    pub fn init_permissions(&self, user_id: UserId, permissions: 
Option<Permissions>) {
+        self.permissioner
+            .borrow_mut()
+            .init_permissions(user_id, permissions);
+    }
+
+    /// Update permissions for a user
+    pub fn update_permissions(&self, user_id: UserId, permissions: 
Option<Permissions>) {
+        self.permissioner
+            .borrow_mut()
+            .update_permissions_for_user(user_id, permissions);
+    }
+
+    /// Delete permissions for a user
+    pub fn delete_permissions(&self, user_id: UserId) {
+        self.permissioner.borrow_mut().delete_permissions(user_id);
+    }
+
+    /// Update username
+    pub fn update_username(
+        &self,
+        identifier: &Identifier,
+        new_username: String,
+    ) -> Result<(), IggyError> {
+        let id = match identifier.kind {
+            iggy_common::IdKind::Numeric => identifier.get_u32_value()? as 
usize,
+            iggy_common::IdKind::String => {
+                let username = identifier.get_string_value()?;
+                let index = self.index.borrow();
+                *index
+                    .get(&username)
+                    .ok_or_else(|| 
IggyError::ResourceNotFound(username.to_string()))?
+            }
+        };
+
+        let old_username = {
+            let items = self.items.borrow();
+            let user = items
+                .get(id)
+                .ok_or_else(|| 
IggyError::ResourceNotFound(identifier.to_string()))?;
+            user.username.clone()
+        };
+
+        if old_username == new_username {
+            return Ok(());
+        }
+
+        tracing::trace!(
+            "Updating username: '{}' → '{}' for user ID: {}",
+            old_username,
+            new_username,
+            id
+        );
+
+        {
+            let mut items = self.items.borrow_mut();
+            let user = items
+                .get_mut(id)
+                .ok_or_else(|| 
IggyError::ResourceNotFound(identifier.to_string()))?;
+            user.username = new_username.clone();
+        }
 
-impl State for Users {
+        let mut index = self.index.borrow_mut();
+        index.remove(&old_username);
+        index.insert(new_username, id);
+
+        Ok(())
+    }
+}
+
+impl ApplyState for Users {
     type Output = ();
+    type Input = Message<PrepareHeader>;
+
+    fn do_apply(&self, _input: &Self::Input) -> Self::Output {
+        todo!()
+    }
 
-    fn apply(&self) -> Option<Self::Output> {
-        Some(())
+    fn is_applicable(input: &Self::Input) -> bool {
+        matches!(
+            input.header().operation,
+            Operation::CreateUser
+                | Operation::UpdateUser
+                | Operation::DeleteUser
+                | Operation::ChangePassword
+                | Operation::UpdatePermissions
+        )
     }
 }
diff --git a/core/server/src/binary/mapper.rs b/core/server/src/binary/mapper.rs
index 7b17c038e..6e98081d6 100644
--- a/core/server/src/binary/mapper.rs
+++ b/core/server/src/binary/mapper.rs
@@ -22,7 +22,6 @@ use crate::slab::Keyed;
 use crate::slab::traits_ext::{EntityComponentSystem, IntoComponents};
 use crate::streaming::clients::client_manager::Client;
 use crate::streaming::partitions::partition::PartitionRoot;
-use 
crate::streaming::personal_access_tokens::personal_access_token::PersonalAccessToken;
 use crate::streaming::stats::{PartitionStats, StreamStats, TopicStats};
 use crate::streaming::streams::stream;
 use crate::streaming::topics::consumer_group::{ConsumerGroupMembers, 
ConsumerGroupRoot, Member};
@@ -30,7 +29,9 @@ use crate::streaming::topics::topic::{self, TopicRoot};
 use crate::streaming::users::user::User;
 use arcshift::SharedGetGuard;
 use bytes::{BufMut, Bytes, BytesMut};
-use iggy_common::{BytesSerializable, ConsumerOffsetInfo, Stats, 
TransportProtocol, UserId};
+use iggy_common::{
+    BytesSerializable, ConsumerOffsetInfo, PersonalAccessToken, Stats, 
TransportProtocol, UserId,
+};
 use slab::Slab;
 
 pub fn map_stats(stats: &Stats) -> Bytes {
diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs
index 67c832231..f461cd472 100644
--- a/core/server/src/bootstrap.rs
+++ b/core/server/src/bootstrap.rs
@@ -53,7 +53,6 @@ use crate::{
             storage::{load_consumer_group_offsets, load_consumer_offsets},
         },
         persistence::persister::{FilePersister, FileWithSyncPersister, 
PersisterKind},
-        personal_access_tokens::personal_access_token::PersonalAccessToken,
         polling_consumer::ConsumerGroupId,
         segments::{Segment, storage::Storage},
         stats::{PartitionStats, StreamStats, TopicStats},
@@ -69,7 +68,7 @@ use ahash::HashMap;
 use compio::{fs::create_dir_all, runtime::Runtime};
 use err_trail::ErrContext;
 use iggy_common::{
-    IggyByteSize, IggyError,
+    IggyByteSize, IggyError, PersonalAccessToken,
     defaults::{
         DEFAULT_ROOT_USERNAME, MAX_PASSWORD_LENGTH, MAX_USERNAME_LENGTH, 
MIN_PASSWORD_LENGTH,
         MIN_USERNAME_LENGTH,
diff --git a/core/server/src/http/http_shard_wrapper.rs 
b/core/server/src/http/http_shard_wrapper.rs
index f279b51be..7bcfdb65b 100644
--- a/core/server/src/http/http_shard_wrapper.rs
+++ b/core/server/src/http/http_shard_wrapper.rs
@@ -26,11 +26,11 @@ use send_wrapper::SendWrapper;
 use crate::binary::handlers::messages::poll_messages_handler::IggyPollMetadata;
 use crate::shard::system::messages::PollingArgs;
 use crate::state::command::EntryCommand;
-use 
crate::streaming::personal_access_tokens::personal_access_token::PersonalAccessToken;
 use crate::streaming::segments::{IggyMessagesBatchMut, IggyMessagesBatchSet};
 use crate::streaming::topics;
 use crate::streaming::users::user::User;
 use crate::{shard::IggyShard, streaming::session::Session};
+use iggy_common::PersonalAccessToken;
 
 /// A wrapper around IggyShard that is safe to use in HTTP handlers.
 ///
diff --git a/core/server/src/http/mapper.rs b/core/server/src/http/mapper.rs
index d2910ae1e..afe243529 100644
--- a/core/server/src/http/mapper.rs
+++ b/core/server/src/http/mapper.rs
@@ -20,11 +20,11 @@ use crate::http::jwt::json_web_token::GeneratedToken;
 use crate::slab::Keyed;
 use crate::slab::traits_ext::{EntityComponentSystem, IntoComponents};
 use crate::streaming::clients::client_manager::Client;
-use 
crate::streaming::personal_access_tokens::personal_access_token::PersonalAccessToken;
 use crate::streaming::stats::TopicStats;
 use crate::streaming::topics::consumer_group::{ConsumerGroupMembers, 
ConsumerGroupRoot};
 use crate::streaming::topics::topic::TopicRoot;
 use crate::streaming::users::user::User;
+use iggy_common::PersonalAccessToken;
 use iggy_common::{ConsumerGroupDetails, ConsumerGroupInfo, 
ConsumerGroupMember, IggyByteSize};
 use iggy_common::{IdentityInfo, PersonalAccessTokenInfo, TokenInfo, 
TopicDetails};
 use iggy_common::{UserInfo, UserInfoDetails};
diff --git a/core/server/src/http/personal_access_tokens.rs 
b/core/server/src/http/personal_access_tokens.rs
index 66cb50882..75382fb76 100644
--- a/core/server/src/http/personal_access_tokens.rs
+++ b/core/server/src/http/personal_access_tokens.rs
@@ -24,7 +24,6 @@ use 
crate::http::mapper::map_generated_access_token_to_identity_info;
 use crate::http::shared::AppState;
 use crate::state::command::EntryCommand;
 use crate::state::models::CreatePersonalAccessTokenWithHash;
-use 
crate::streaming::personal_access_tokens::personal_access_token::PersonalAccessToken;
 use crate::streaming::session::Session;
 use axum::extract::{Path, State};
 use axum::http::StatusCode;
@@ -32,6 +31,7 @@ use axum::routing::{delete, get, post};
 use axum::{Extension, Json, Router, debug_handler};
 use err_trail::ErrContext;
 use iggy_common::IdentityInfo;
+use iggy_common::PersonalAccessToken;
 use iggy_common::Validatable;
 use iggy_common::create_personal_access_token::CreatePersonalAccessToken;
 use iggy_common::delete_personal_access_token::DeletePersonalAccessToken;
diff --git a/core/server/src/shard/system/personal_access_tokens.rs 
b/core/server/src/shard/system/personal_access_tokens.rs
index 189abf507..7b26aff1e 100644
--- a/core/server/src/shard/system/personal_access_tokens.rs
+++ b/core/server/src/shard/system/personal_access_tokens.rs
@@ -18,13 +18,13 @@
 
 use super::COMPONENT;
 use crate::shard::IggyShard;
-use 
crate::streaming::personal_access_tokens::personal_access_token::PersonalAccessToken;
 use crate::streaming::session::Session;
 use crate::streaming::users::user::User;
 use err_trail::ErrContext;
 use iggy_common::IggyError;
 use iggy_common::IggyExpiry;
 use iggy_common::IggyTimestamp;
+use iggy_common::PersonalAccessToken;
 use tracing::{error, info};
 
 impl IggyShard {
diff --git a/core/server/src/shard/transmission/event.rs 
b/core/server/src/shard/transmission/event.rs
index eef75d958..1180d06f9 100644
--- a/core/server/src/shard/transmission/event.rs
+++ b/core/server/src/shard/transmission/event.rs
@@ -17,7 +17,6 @@
 
 use crate::streaming::{
     partitions::partition,
-    personal_access_tokens::personal_access_token::PersonalAccessToken,
     streams::stream,
     topics::{
         consumer_group::{self},
@@ -25,8 +24,8 @@ use crate::streaming::{
     },
 };
 use iggy_common::{
-    CompressionAlgorithm, Identifier, IggyExpiry, MaxTopicSize, Permissions, 
TransportProtocol,
-    UserStatus,
+    CompressionAlgorithm, Identifier, IggyExpiry, MaxTopicSize, Permissions, 
PersonalAccessToken,
+    TransportProtocol, UserStatus,
 };
 use std::net::SocketAddr;
 use strum::Display;
diff --git a/core/server/src/state/system.rs b/core/server/src/state/system.rs
index 454439a3d..35a560164 100644
--- a/core/server/src/state/system.rs
+++ b/core/server/src/state/system.rs
@@ -20,7 +20,6 @@ use crate::bootstrap::create_root_user;
 use crate::state::file::FileState;
 use crate::state::models::CreateUserWithId;
 use crate::state::{COMPONENT, EntryCommand, StateEntry};
-use 
crate::streaming::personal_access_tokens::personal_access_token::PersonalAccessToken;
 use ahash::AHashMap;
 use err_trail::ErrContext;
 use iggy_common::CompressionAlgorithm;
@@ -28,6 +27,7 @@ use iggy_common::IggyError;
 use iggy_common::IggyExpiry;
 use iggy_common::IggyTimestamp;
 use iggy_common::MaxTopicSize;
+use iggy_common::PersonalAccessToken;
 use iggy_common::create_user::CreateUser;
 use iggy_common::defaults::DEFAULT_ROOT_USER_ID;
 use iggy_common::{IdKind, Identifier, Permissions, UserStatus};
diff --git a/core/server/src/streaming/clients/client_manager.rs 
b/core/server/src/streaming/clients/client_manager.rs
index 349980b55..d1cfe0887 100644
--- a/core/server/src/streaming/clients/client_manager.rs
+++ b/core/server/src/streaming/clients/client_manager.rs
@@ -17,12 +17,12 @@
  */
 
 use crate::streaming::session::Session;
-use crate::streaming::utils::{hash, ptr::EternalPtr};
+use crate::streaming::utils::ptr::EternalPtr;
 use dashmap::DashMap;
-use iggy_common::IggyError;
 use iggy_common::IggyTimestamp;
 use iggy_common::TransportProtocol;
 use iggy_common::UserId;
+use iggy_common::{IggyError, calculate_32};
 use std::net::SocketAddr;
 
 pub struct ClientManager {
@@ -61,7 +61,7 @@ pub struct ConsumerGroup {
 
 impl ClientManager {
     pub fn add_client(&self, address: &SocketAddr, transport: 
TransportProtocol) -> Session {
-        let client_id = hash::calculate_32(address.to_string().as_bytes());
+        let client_id = calculate_32(address.to_string().as_bytes());
         let session = Session::from_client_id(client_id, *address);
         let client = Client {
             user_id: None,
diff --git a/core/server/src/streaming/mod.rs b/core/server/src/streaming/mod.rs
index fe1eb0611..a6141bd8b 100644
--- a/core/server/src/streaming/mod.rs
+++ b/core/server/src/streaming/mod.rs
@@ -21,7 +21,6 @@ pub mod deduplication;
 pub mod diagnostics;
 pub mod partitions;
 pub mod persistence;
-pub mod personal_access_tokens;
 pub mod polling_consumer;
 pub mod segments;
 pub mod session;
diff --git a/core/server/src/streaming/polling_consumer.rs 
b/core/server/src/streaming/polling_consumer.rs
index dcc5f0f4a..42a2ceb23 100644
--- a/core/server/src/streaming/polling_consumer.rs
+++ b/core/server/src/streaming/polling_consumer.rs
@@ -16,8 +16,7 @@
  * under the License.
  */
 
-use crate::streaming::utils::hash;
-use iggy_common::{IdKind, Identifier};
+use iggy_common::{IdKind, Identifier, calculate_32};
 use std::fmt::{Display, Formatter};
 
 #[derive(Debug, PartialEq, Eq, Hash, Copy, Clone)]
@@ -56,7 +55,7 @@ impl PollingConsumer {
     pub fn resolve_consumer_id(identifier: &Identifier) -> usize {
         match identifier.kind {
             IdKind::Numeric => identifier.get_u32_value().unwrap() as usize,
-            IdKind::String => hash::calculate_32(&identifier.value) as usize,
+            IdKind::String => calculate_32(&identifier.value) as usize,
         }
     }
 }
diff --git a/core/server/src/streaming/topics/helpers.rs 
b/core/server/src/streaming/topics/helpers.rs
index 0972596b8..d5f77de9d 100644
--- a/core/server/src/streaming/topics/helpers.rs
+++ b/core/server/src/streaming/topics/helpers.rs
@@ -35,10 +35,9 @@ use crate::{
             },
             topic::{Topic, TopicRef, TopicRefMut, TopicRoot},
         },
-        utils::hash,
     },
 };
-use iggy_common::{CompressionAlgorithm, Identifier, IggyExpiry, MaxTopicSize};
+use iggy_common::{CompressionAlgorithm, Identifier, IggyExpiry, MaxTopicSize, 
calculate_32};
 use slab::Slab;
 
 pub fn rename_index(
@@ -100,7 +99,7 @@ pub fn calculate_partition_id_by_messages_key_hash(
     upperbound: usize,
     messages_key: &[u8],
 ) -> usize {
-    let messages_key_hash = hash::calculate_32(messages_key) as usize;
+    let messages_key_hash = calculate_32(messages_key) as usize;
     let partition_id = messages_key_hash % upperbound;
     tracing::trace!(
         "Calculated partition ID: {} for messages key: {:?}, hash: {}",
diff --git a/core/server/src/streaming/users/user.rs 
b/core/server/src/streaming/users/user.rs
index 437ce955b..672662d3e 100644
--- a/core/server/src/streaming/users/user.rs
+++ b/core/server/src/streaming/users/user.rs
@@ -15,10 +15,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-use 
crate::streaming::personal_access_tokens::personal_access_token::PersonalAccessToken;
 use crate::streaming::utils::crypto;
 use dashmap::DashMap;
 use iggy_common::IggyTimestamp;
+use iggy_common::PersonalAccessToken;
 use iggy_common::UserStatus;
 use iggy_common::defaults::*;
 use iggy_common::{Permissions, UserId};
diff --git a/core/server/src/streaming/utils/mod.rs 
b/core/server/src/streaming/utils/mod.rs
index 6acd0abc0..00e9f65f3 100644
--- a/core/server/src/streaming/utils/mod.rs
+++ b/core/server/src/streaming/utils/mod.rs
@@ -19,6 +19,5 @@
 pub mod address;
 pub mod crypto;
 pub mod file;
-pub mod hash;
 pub mod ptr;
 pub mod random_id;


Reply via email to