This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new 3e27ebc8d refactor(server): add metadata module foundation (#2551)
3e27ebc8d is described below

commit 3e27ebc8dd5dbf257b816993908dc0747c4f8849
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Wed Jan 14 10:06:30 2026 +0100

    refactor(server): add metadata module foundation (#2551)
    
    Add new metadata module with LeftRight wrapping slab collections.
    Refactor PersonalAccessToken to use Arc<str> instead of Arc<String>.
    Module compiles but is not yet wired into the system.
---
 Cargo.lock                                         |  43 +-
 DEPENDENCIES.md                                    |   3 +
 core/common/Cargo.toml                             |   1 -
 core/common/src/collections/segmented_slab.rs      | 504 -----------------
 core/common/src/lib.rs                             |   1 -
 .../common/src/types/personal_access_tokens/mod.rs |  18 +-
 core/server/Cargo.toml                             |   2 +-
 core/server/src/bootstrap.rs                       |   2 +-
 core/server/src/http/mapper.rs                     |   2 +-
 core/server/src/lib.rs                             |   1 +
 core/server/src/metadata/absorb.rs                 | 348 ++++++++++++
 core/server/src/metadata/consumer_group.rs         |  59 ++
 .../src/metadata/consumer_group_member.rs}         |  23 +-
 .../mod.rs => server/src/metadata/inner.rs}        |  28 +-
 core/server/src/metadata/mod.rs                    |  68 +++
 core/server/src/metadata/ops.rs                    | 129 +++++
 core/server/src/metadata/partition.rs              |  36 ++
 core/server/src/metadata/reader.rs                 | 485 +++++++++++++++++
 core/server/src/metadata/stream.rs                 |  64 +++
 core/server/src/metadata/topic.rs                  |  72 +++
 .../mod.rs => server/src/metadata/user.rs}         |  14 +-
 core/server/src/metadata/writer.rs                 | 597 +++++++++++++++++++++
 .../src/shard/system/personal_access_tokens.rs     |   8 +-
 .../periodic/personal_access_token_cleaner.rs      |   2 +-
 core/server/src/streaming/users/user.rs            |   2 +-
 25 files changed, 1981 insertions(+), 531 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index b5761f4f3..88953e444 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3505,6 +3505,21 @@ dependencies = [
  "slab",
 ]
 
+[[package]]
+name = "generator"
+version = "0.8.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "52f04ae4152da20c76fe800fa48659201d5cf627c5149ca0b707b69d7eef6cf9"
+dependencies = [
+ "cc",
+ "cfg-if",
+ "libc",
+ "log",
+ "rustversion",
+ "windows-link 0.2.1",
+ "windows-result 0.4.1",
+]
+
 [[package]]
 name = "generic-array"
 version = "0.14.7"
@@ -4684,7 +4699,6 @@ dependencies = [
  "serde_json",
  "serde_with",
  "serial_test",
- "slab",
  "strum 0.27.2",
  "thiserror 2.0.17",
  "tokio",
@@ -5267,6 +5281,17 @@ dependencies = [
  "spin",
 ]
 
+[[package]]
+name = "left-right"
+version = "0.11.7"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "0f0c21e4c8ff95f487fb34e6f9182875f42c84cef966d29216bf115d9bba835a"
+dependencies = [
+ "crossbeam-utils",
+ "loom",
+ "slab",
+]
+
 [[package]]
 name = "lending-iterator"
 version = "0.1.7"
@@ -5575,6 +5600,19 @@ dependencies = [
  "logos-codegen",
 ]
 
+[[package]]
+name = "loom"
+version = "0.7.2"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "419e0dc8046cb947daa77eb95ae174acfbddb7673b4151f56d1eed8e93fbfaca"
+dependencies = [
+ "cfg-if",
+ "generator",
+ "scoped-tls",
+ "tracing",
+ "tracing-subscriber",
+]
+
 [[package]]
 name = "lru-slab"
 version = "0.1.2"
@@ -8288,6 +8326,7 @@ dependencies = [
  "hwlocality",
  "iggy_common",
  "jsonwebtoken",
+ "left-right",
  "lending-iterator",
  "mimalloc",
  "mime_guess",
@@ -9633,9 +9672,11 @@ dependencies = [
  "once_cell",
  "regex-automata",
  "sharded-slab",
+ "smallvec",
  "thread_local",
  "tracing",
  "tracing-core",
+ "tracing-log",
 ]
 
 [[package]]
diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md
index 8423e83d3..31143eead 100644
--- a/DEPENDENCIES.md
+++ b/DEPENDENCIES.md
@@ -317,6 +317,7 @@ futures-sink: 0.3.31, "Apache-2.0 OR MIT",
 futures-task: 0.3.31, "Apache-2.0 OR MIT",
 futures-timer: 3.0.3, "Apache-2.0 OR MIT",
 futures-util: 0.3.31, "Apache-2.0 OR MIT",
+generator: 0.8.8, "Apache-2.0 OR MIT",
 generic-array: 0.14.7, "MIT",
 getrandom: 0.2.16, "Apache-2.0 OR MIT",
 getrandom: 0.3.4, "Apache-2.0 OR MIT",
@@ -446,6 +447,7 @@ kqueue: 1.1.1, "MIT",
 kqueue-sys: 1.0.4, "MIT",
 language-tags: 0.3.2, "Apache-2.0 OR MIT",
 lazy_static: 1.5.0, "Apache-2.0 OR MIT",
+left-right: 0.11.7, "Apache-2.0 OR MIT",
 lending-iterator: 0.1.7, "Apache-2.0 OR MIT OR Zlib",
 lending-iterator-proc_macros: 0.1.7, "Apache-2.0 OR MIT OR Zlib",
 lexical-core: 1.0.6, "Apache-2.0 OR MIT",
@@ -479,6 +481,7 @@ log: 0.4.29, "Apache-2.0 OR MIT",
 logos: 0.15.1, "Apache-2.0 OR MIT",
 logos-codegen: 0.15.1, "Apache-2.0 OR MIT",
 logos-derive: 0.15.1, "Apache-2.0 OR MIT",
+loom: 0.7.2, "MIT",
 lru-slab: 0.1.2, "Apache-2.0 OR MIT OR Zlib",
 lz4_flex: 0.11.5, "MIT",
 lzma-rust2: 0.15.4, "Apache-2.0",
diff --git a/core/common/Cargo.toml b/core/common/Cargo.toml
index 65124b631..a529c1ab3 100644
--- a/core/common/Cargo.toml
+++ b/core/common/Cargo.toml
@@ -63,7 +63,6 @@ rustls = { workspace = true }
 serde = { workspace = true }
 serde_json = { workspace = true }
 serde_with = { workspace = true, features = ["base64"] }
-slab = "0.4.11"
 strum = { workspace = true }
 thiserror = { workspace = true }
 tokio = { workspace = true }
diff --git a/core/common/src/collections/segmented_slab.rs b/core/common/src/collections/segmented_slab.rs
deleted file mode 100644
index fc75239a4..000000000
--- a/core/common/src/collections/segmented_slab.rs
+++ /dev/null
@@ -1,504 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Concurrent segmented slab with structural sharing for RCU patterns.
-//!
-//! # Design Goals
-//!
-//! - O(1) access time
-//! - Structural sharing via Arc-wrapped segments (cheap clones)
-//! - Lock-free reads with copy-on-write modifications
-//! - Slab-assigned keys with slot reuse via free list
-//!
-//! # Architecture
-//!
-//! Data is divided into fixed-size segments, each backed by a `Slab<T>`.
-//! Keys are encoded as: `global_key = (segment_idx << SEGMENT_BITS) | local_key`
-//!
-//! Slab handles ID assignment internally:
-//! - `insert(value)` returns the assigned key
-//! - Removed slots are reused via Slab's free list
-//! - No external ID generation needed
-//!
-//! # Structural Sharing
-//!
-//! Each segment is wrapped in `Arc`. On modification:
-//! - `Arc::make_mut` clones only if segment is shared
-//! - Unmodified segments remain shared across snapshots
-//! - Clone cost is O(num_segments), not O(num_entries)
-
-use slab::Slab;
-use std::sync::Arc;
-
-/// Concurrent segmented slab with structural sharing.
-///
-/// # Performance Characteristics
-///
-/// | Operation | Time Complexity | Notes |
-/// |-----------|-----------------|-------|
-/// | get       | O(1)            | Direct segment + slab indexing |
-/// | insert    | O(1) amortized  | May clone segment if shared |
-/// | remove    | O(1) amortized  | May clone segment if shared |
-/// | clone     | O(num_segments) | Just Arc clones, not data copies |
-///
-/// # Key Encoding
-///
-/// Keys are `usize` where:
-/// - High bits `(key >> SEGMENT_BITS)` = segment index
-/// - Low bits `(key & SEGMENT_MASK)` = local slab key
-///
-/// # Type Parameter
-///
-/// `SEGMENT_CAPACITY` must be a power of 2. This is enforced at compile time.
-#[derive(Clone)]
-pub struct SegmentedSlab<T, const SEGMENT_CAPACITY: usize> {
-    segments: Vec<Arc<Slab<T>>>,
-    len: usize,
-}
-
-impl<T: std::fmt::Debug, const SEGMENT_CAPACITY: usize> std::fmt::Debug
-    for SegmentedSlab<T, SEGMENT_CAPACITY>
-{
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        f.debug_struct("SegmentedSlab")
-            .field("len", &self.len)
-            .field("segments", &self.segments.len())
-            .field("segment_capacity", &SEGMENT_CAPACITY)
-            .finish()
-    }
-}
-
-impl<T, const SEGMENT_CAPACITY: usize> Default for SegmentedSlab<T, SEGMENT_CAPACITY> {
-    fn default() -> Self {
-        Self::new()
-    }
-}
-
-impl<T, const SEGMENT_CAPACITY: usize> SegmentedSlab<T, SEGMENT_CAPACITY> {
-    const SEGMENT_BITS: usize = {
-        assert!(SEGMENT_CAPACITY > 0, "SEGMENT_CAPACITY must be positive");
-        assert!(
-            SEGMENT_CAPACITY.is_power_of_two(),
-            "SEGMENT_CAPACITY must be a power of 2"
-        );
-        SEGMENT_CAPACITY.trailing_zeros() as usize
-    };
-    const SEGMENT_MASK: usize = SEGMENT_CAPACITY - 1;
-
-    pub fn new() -> Self {
-        // Force evaluation to trigger compile-time assertions
-        let _ = Self::SEGMENT_BITS;
-        Self {
-            segments: Vec::new(),
-            len: 0,
-        }
-    }
-
-    /// Create from key-value pairs (for bootstrap/recovery).
-    ///
-    /// Keys determine segment placement. Uses slab's `FromIterator<(usize, T)>`
-    /// to place entries at specific keys within each segment.
-    ///
-    /// # Panics
-    ///
-    /// Panics if duplicate keys are provided.
-    pub fn from_entries(entries: impl IntoIterator<Item = (usize, T)>) -> Self {
-        use std::collections::HashSet;
-
-        let mut segment_entries: Vec<Vec<(usize, T)>> = Vec::new();
-        let mut seen_keys: HashSet<usize> = HashSet::new();
-        let mut len = 0;
-
-        for (key, value) in entries {
-            assert!(seen_keys.insert(key), "duplicate key {key} in from_entries");
-
-            let (seg_idx, local_key) = Self::decode_key(key);
-
-            while segment_entries.len() <= seg_idx {
-                segment_entries.push(Vec::new());
-            }
-
-            segment_entries[seg_idx].push((local_key, value));
-            len += 1;
-        }
-
-        let segments = segment_entries
-            .into_iter()
-            .map(|entries| Arc::new(entries.into_iter().collect::<Slab<T>>()))
-            .collect();
-
-        Self { segments, len }
-    }
-
-    /// Decodes global key into (segment_index, local_key).
-    #[inline]
-    const fn decode_key(key: usize) -> (usize, usize) {
-        (key >> Self::SEGMENT_BITS, key & Self::SEGMENT_MASK)
-    }
-
-    /// Encodes segment and local indices into global key.
-    #[inline]
-    const fn encode_key(segment_idx: usize, local_key: usize) -> usize {
-        (segment_idx << Self::SEGMENT_BITS) | local_key
-    }
-
-    #[inline]
-    pub fn len(&self) -> usize {
-        self.len
-    }
-
-    #[inline]
-    pub fn is_empty(&self) -> bool {
-        self.len == 0
-    }
-
-    /// Returns reference to value at key, or None if not present.
-    #[inline]
-    pub fn get(&self, key: usize) -> Option<&T> {
-        let (seg_idx, local_key) = Self::decode_key(key);
-        self.segments.get(seg_idx)?.get(local_key)
-    }
-
-    /// Returns true if key is present.
-    #[inline]
-    pub fn contains_key(&self, key: usize) -> bool {
-        let (seg_idx, local_key) = Self::decode_key(key);
-        self.segments
-            .get(seg_idx)
-            .is_some_and(|slab| slab.contains(local_key))
-    }
-
-    /// Returns iterator over (key, &value) pairs.
-    pub fn iter(&self) -> impl Iterator<Item = (usize, &T)> + '_ {
-        self.segments
-            .iter()
-            .enumerate()
-            .flat_map(|(seg_idx, slab)| {
-                slab.iter()
-                    .map(move |(local_key, value)| (Self::encode_key(seg_idx, local_key), value))
-            })
-    }
-
-    /// Returns iterator over all keys.
-    pub fn keys(&self) -> impl Iterator<Item = usize> + '_ {
-        self.iter().map(|(k, _)| k)
-    }
-
-    /// Returns iterator over all values.
-    pub fn values(&self) -> impl Iterator<Item = &T> + '_ {
-        self.iter().map(|(_, v)| v)
-    }
-}
-
-impl<T: Clone, const SEGMENT_CAPACITY: usize> SegmentedSlab<T, SEGMENT_CAPACITY> {
-    /// Insert value, returning (new_slab, assigned_key).
-    ///
-    /// Slab assigns the key, reusing freed slots when available.
-    /// Only the affected segment is cloned if shared.
-    pub fn insert(mut self, value: T) -> (Self, usize) {
-        let seg_idx = self
-            .segments
-            .iter()
-            .position(|slab| slab.vacant_key() < SEGMENT_CAPACITY)
-            .unwrap_or(self.segments.len());
-
-        if seg_idx >= self.segments.len() {
-            self.segments.push(Arc::new(Slab::new()));
-        }
-
-        let slab = Arc::make_mut(&mut self.segments[seg_idx]);
-        let local_key = slab.insert(value);
-        let global_key = Self::encode_key(seg_idx, local_key);
-
-        self.len += 1;
-        (self, global_key)
-    }
-
-    /// Update value at existing key, returning (new_slab, success).
-    ///
-    /// Returns (self, false) if key doesn't exist (no-op).
-    pub fn update(mut self, key: usize, value: T) -> (Self, bool) {
-        let (seg_idx, local_key) = Self::decode_key(key);
-
-        let Some(slab_arc) = self.segments.get_mut(seg_idx) else {
-            return (self, false);
-        };
-
-        if !slab_arc.contains(local_key) {
-            return (self, false);
-        }
-
-        let slab = Arc::make_mut(slab_arc);
-        slab[local_key] = value;
-        (self, true)
-    }
-
-    /// Remove entry at key, returning (new_slab, removed_value).
-    ///
-    /// Freed slot will be reused by future inserts.
-    pub fn remove(mut self, key: usize) -> (Self, Option<T>) {
-        let (seg_idx, local_key) = Self::decode_key(key);
-
-        let Some(slab_arc) = self.segments.get_mut(seg_idx) else {
-            return (self, None);
-        };
-
-        if !slab_arc.contains(local_key) {
-            return (self, None);
-        }
-
-        let slab = Arc::make_mut(slab_arc);
-        let value = slab.remove(local_key);
-        self.len -= 1;
-
-        (self, Some(value))
-    }
-
-    /// Remove entry at key, returning new slab. Ignores removed value.
-    #[inline]
-    pub fn without(self, key: usize) -> Self {
-        self.remove(key).0
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    const TEST_CAPACITY: usize = 1024;
-    type TestSlab<T> = SegmentedSlab<T, TEST_CAPACITY>;
-
-    #[test]
-    fn test_key_encoding() {
-        assert_eq!(TestSlab::<()>::decode_key(0), (0, 0));
-        assert_eq!(TestSlab::<()>::decode_key(42), (0, 42));
-        assert_eq!(TestSlab::<()>::decode_key(1024), (1, 0));
-        assert_eq!(TestSlab::<()>::decode_key(1025), (1, 1));
-        assert_eq!(TestSlab::<()>::decode_key(2048), (2, 0));
-
-        assert_eq!(TestSlab::<()>::encode_key(0, 0), 0);
-        assert_eq!(TestSlab::<()>::encode_key(0, 42), 42);
-        assert_eq!(TestSlab::<()>::encode_key(1, 0), 1024);
-        assert_eq!(TestSlab::<()>::encode_key(1, 1), 1025);
-    }
-
-    #[test]
-    fn test_insert_assigns_sequential_keys() {
-        let slab = TestSlab::new();
-
-        let (slab, key0) = slab.insert("a");
-        let (slab, key1) = slab.insert("b");
-        let (slab, key2) = slab.insert("c");
-
-        assert_eq!(key0, 0);
-        assert_eq!(key1, 1);
-        assert_eq!(key2, 2);
-        assert_eq!(slab.len(), 3);
-    }
-
-    #[test]
-    fn test_get() {
-        let slab = TestSlab::new();
-        let (slab, key) = slab.insert("hello");
-
-        assert_eq!(slab.get(key), Some(&"hello"));
-        assert_eq!(slab.get(999), None);
-    }
-
-    #[test]
-    fn test_update() {
-        let slab = TestSlab::new();
-        let (slab, key) = slab.insert("original");
-
-        let (slab, success) = slab.update(key, "updated");
-        assert!(success);
-        assert_eq!(slab.get(key), Some(&"updated"));
-        assert_eq!(slab.len(), 1);
-
-        // Update non-existent key returns false
-        let (_, success) = slab.update(999, "nope");
-        assert!(!success);
-    }
-
-    #[test]
-    fn test_remove_and_reuse() {
-        let slab = TestSlab::new();
-        let (slab, key0) = slab.insert("a");
-        let (slab, key1) = slab.insert("b");
-        let (slab, key2) = slab.insert("c");
-
-        assert_eq!(key0, 0);
-        assert_eq!(key1, 1);
-        assert_eq!(key2, 2);
-
-        // Remove middle entry
-        let (slab, removed) = slab.remove(key1);
-        assert_eq!(removed, Some("b"));
-        assert_eq!(slab.len(), 2);
-        assert_eq!(slab.get(key1), None);
-
-        // New insert reuses freed slot
-        let (slab, key3) = slab.insert("d");
-        assert_eq!(key3, 1); // Reused!
-        assert_eq!(slab.get(key3), Some(&"d"));
-        assert_eq!(slab.len(), 3);
-    }
-
-    #[test]
-    fn test_structural_sharing() {
-        let slab = TestSlab::new();
-        let (slab, _) = slab.insert("value");
-
-        // Clone shares segment via Arc
-        let snapshot = slab.clone();
-
-        // Modify original
-        let (slab, key1) = slab.insert("new");
-
-        // Snapshot still sees original state
-        assert_eq!(snapshot.len(), 1);
-        assert_eq!(snapshot.get(key1), None);
-
-        // Modified slab has both
-        assert_eq!(slab.len(), 2);
-    }
-
-    #[test]
-    fn test_arc_make_mut_no_clone_when_unique() {
-        let slab = TestSlab::new();
-        let (slab, _) = slab.insert("a");
-
-        let ptr_before = Arc::as_ptr(&slab.segments[0]);
-        let (slab, _) = slab.insert("b");
-        let ptr_after = Arc::as_ptr(&slab.segments[0]);
-
-        // Same pointer - no clone occurred
-        assert_eq!(ptr_before, ptr_after);
-    }
-
-    #[test]
-    fn test_arc_make_mut_clones_when_shared() {
-        let slab = TestSlab::new();
-        let (slab, _) = slab.insert("a");
-
-        let _snapshot = slab.clone(); // Creates shared reference
-        let ptr_before = Arc::as_ptr(&slab.segments[0]);
-
-        let (slab, _) = slab.insert("b");
-        let ptr_after = Arc::as_ptr(&slab.segments[0]);
-
-        // Different pointer - clone occurred
-        assert_ne!(ptr_before, ptr_after);
-    }
-
-    #[test]
-    fn test_cross_segment() {
-        let mut slab = TestSlab::new();
-
-        // Fill first segment
-        for i in 0..TEST_CAPACITY {
-            let (new_slab, key) = slab.insert(i);
-            slab = new_slab;
-            assert_eq!(key, i);
-        }
-
-        // Next insert should go to second segment
-        let (slab, key) = slab.insert(TEST_CAPACITY);
-        assert_eq!(key, TEST_CAPACITY);
-        assert_eq!(slab.segments.len(), 2);
-    }
-
-    #[test]
-    fn test_from_entries() {
-        let entries = vec![(0, "zero"), (5, "five"), (1024, "segment2")];
-
-        let slab = TestSlab::from_entries(entries);
-
-        assert_eq!(slab.len(), 3);
-        assert_eq!(slab.get(0), Some(&"zero"));
-        assert_eq!(slab.get(5), Some(&"five"));
-        assert_eq!(slab.get(1024), Some(&"segment2"));
-        assert_eq!(slab.segments.len(), 2);
-    }
-
-    #[test]
-    fn test_iter() {
-        let slab = TestSlab::new();
-        let (slab, _) = slab.insert("a");
-        let (slab, _) = slab.insert("b");
-        let (slab, _) = slab.insert("c");
-
-        let items: Vec<_> = slab.iter().collect();
-        assert_eq!(items, vec![(0, &"a"), (1, &"b"), (2, &"c")]);
-    }
-
-    #[test]
-    fn test_keys_and_values() {
-        let slab = TestSlab::new();
-        let (slab, _) = slab.insert(10);
-        let (slab, _) = slab.insert(20);
-
-        let keys: Vec<_> = slab.keys().collect();
-        assert_eq!(keys, vec![0, 1]);
-
-        let values: Vec<_> = slab.values().collect();
-        assert_eq!(values, vec![&10, &20]);
-    }
-
-    #[test]
-    fn test_contains_key() {
-        let slab = TestSlab::new();
-        let (slab, key) = slab.insert("x");
-
-        assert!(slab.contains_key(key));
-        assert!(!slab.contains_key(999));
-        assert!(!slab.contains_key(2048)); // Non-existent segment
-    }
-
-    #[test]
-    fn test_without() {
-        let slab = TestSlab::new();
-        let (slab, key) = slab.insert("value");
-
-        // without() removes and ignores result
-        let slab = slab.without(key);
-        assert_eq!(slab.get(key), None);
-        assert_eq!(slab.len(), 0);
-
-        // without() on non-existent key is no-op
-        let slab = slab.without(999);
-        assert_eq!(slab.len(), 0);
-    }
-
-    #[test]
-    fn test_empty_slab() {
-        let slab: TestSlab<i32> = TestSlab::new();
-
-        assert!(slab.is_empty());
-        assert_eq!(slab.len(), 0);
-        assert_eq!(slab.get(0), None);
-        assert!(!slab.contains_key(0));
-        assert_eq!(slab.iter().count(), 0);
-    }
-
-    #[test]
-    #[should_panic(expected = "duplicate key 0 in from_entries")]
-    fn test_from_entries_panics_on_duplicate_keys() {
-        let entries = vec![(0, "first"), (0, "second")];
-        let _ = TestSlab::from_entries(entries);
-    }
-}
diff --git a/core/common/src/lib.rs b/core/common/src/lib.rs
index 9eaf65531..9171c4097 100644
--- a/core/common/src/lib.rs
+++ b/core/common/src/lib.rs
@@ -17,7 +17,6 @@
 
 mod alloc;
 mod certificates;
-pub mod collections;
 mod commands;
 mod configs;
 mod error;
diff --git a/core/common/src/types/personal_access_tokens/mod.rs b/core/common/src/types/personal_access_tokens/mod.rs
index e28a585de..0e1e33391 100644
--- a/core/common/src/types/personal_access_tokens/mod.rs
+++ b/core/common/src/types/personal_access_tokens/mod.rs
@@ -28,8 +28,8 @@ const SIZE: usize = 50;
 #[derive(Clone, Debug)]
 pub struct PersonalAccessToken {
     pub user_id: UserId,
-    pub name: Arc<String>,
-    pub token: Arc<String>,
+    pub name: Arc<str>,
+    pub token: Arc<str>,
     pub expiry_at: Option<IggyTimestamp>,
 }
 
@@ -49,8 +49,8 @@ impl PersonalAccessToken {
         (
             Self {
                 user_id,
-                name: Arc::new(name.to_string()),
-                token: Arc::new(token_hash),
+                name: Arc::from(name),
+                token: Arc::from(token_hash),
                 expiry_at: Self::calculate_expiry_at(now, expiry),
             },
             token,
@@ -65,8 +65,8 @@ impl PersonalAccessToken {
     ) -> Self {
         Self {
             user_id,
-            name: Arc::new(name.into()),
-            token: Arc::new(token_hash.into()),
+            name: Arc::from(name),
+            token: Arc::from(token_hash),
             expiry_at,
         }
     }
@@ -106,12 +106,12 @@ mod tests {
         let name = "test_token";
         let (personal_access_token, raw_token) =
             PersonalAccessToken::new(user_id, name, now, IggyExpiry::NeverExpire);
-        assert_eq!(personal_access_token.name.as_str(), name);
+        assert_eq!(&*personal_access_token.name, name);
         assert!(!personal_access_token.token.is_empty());
         assert!(!raw_token.is_empty());
-        assert_ne!(personal_access_token.token.as_str(), raw_token);
+        assert_ne!(&*personal_access_token.token, raw_token);
         assert_eq!(
-            personal_access_token.token.as_str(),
+            &*personal_access_token.token,
             PersonalAccessToken::hash_token(&raw_token)
         );
     }
diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml
index ff38a114b..3e3bb5dc4 100644
--- a/core/server/Cargo.toml
+++ b/core/server/Cargo.toml
@@ -69,9 +69,9 @@ flume = { workspace = true }
 futures = { workspace = true }
 hash32 = "1.0.0"
 human-repr = { workspace = true }
-
 iggy_common = { workspace = true }
 jsonwebtoken = { version = "10.2.0", features = ["rust_crypto"] }
+left-right = "0.11"
 lending-iterator = "0.1.7"
 mimalloc = { workspace = true, optional = true }
 mime_guess = { version = "2.0", optional = true }
diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs
index 216e7c93a..13690a813 100644
--- a/core/server/src/bootstrap.rs
+++ b/core/server/src/bootstrap.rs
@@ -212,7 +212,7 @@ pub fn load_users(state: impl IntoIterator<Item = UserState>) -> Users {
             .into_values()
             .map(|token| {
                 (
-                    Arc::new(token.token_hash.clone()),
+                    Arc::from(token.token_hash.as_str()),
+                    PersonalAccessToken::raw(id, &token.name, &token.token_hash, token.expiry_at),
                 )
             })
diff --git a/core/server/src/http/mapper.rs b/core/server/src/http/mapper.rs
index afe243529..d7ec542f2 100644
--- a/core/server/src/http/mapper.rs
+++ b/core/server/src/http/mapper.rs
@@ -138,7 +138,7 @@ pub fn map_personal_access_tokens(
     let mut personal_access_tokens_data = Vec::with_capacity(personal_access_tokens.len());
     for personal_access_token in personal_access_tokens {
         let personal_access_token = PersonalAccessTokenInfo {
-            name: personal_access_token.name.as_str().to_owned(),
+            name: (*personal_access_token.name).to_owned(),
             expiry_at: personal_access_token.expiry_at,
         };
         personal_access_tokens_data.push(personal_access_token);
diff --git a/core/server/src/lib.rs b/core/server/src/lib.rs
index 2919214b6..41669486f 100644
--- a/core/server/src/lib.rs
+++ b/core/server/src/lib.rs
@@ -37,6 +37,7 @@ pub mod diagnostics;
 pub mod http;
 pub mod io;
 pub mod log;
+pub mod metadata;
 pub mod quic;
 pub mod server_error;
 pub mod shard;
diff --git a/core/server/src/metadata/absorb.rs b/core/server/src/metadata/absorb.rs
new file mode 100644
index 000000000..d36186a64
--- /dev/null
+++ b/core/server/src/metadata/absorb.rs
@@ -0,0 +1,348 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::metadata::ConsumerGroupMemberMeta;
+use crate::metadata::inner::InnerMetadata;
+use crate::metadata::ops::MetadataOp;
+use left_right::Absorb;
+use std::sync::atomic::Ordering;
+
impl Absorb<MetadataOp> for InnerMetadata {
    /// Applies `op` to the first (write-side) copy of the data.
    ///
    /// `populate_ids = true`: slab-assigned ids are stored into the
    /// caller-visible atomics from this application only, so each id is
    /// published exactly once per operation.
    fn absorb_first(&mut self, op: &mut MetadataOp, _other: &Self) {
        apply_op(self, op, true);
    }

    /// Replays `op` against the second copy after the pointer swap.
    ///
    /// Ids were already published by `absorb_first`, so `populate_ids =
    /// false` suppresses the atomic stores on this pass.
    fn absorb_second(&mut self, op: MetadataOp, _other: &Self) {
        apply_op(self, &op, false);
    }

    /// Brings a lagging copy up to date by cloning the other copy wholesale.
    fn sync_with(&mut self, first: &Self) {
        *self = first.clone();
    }
}
+
/// Applies one `MetadataOp` to a single copy of the metadata.
///
/// The `Absorb` impl calls this twice per op (once for each left-right
/// copy). `populate_ids` is true only on the first application, so each
/// slab-assigned id is stored into its caller-visible atomic exactly once.
/// Every arm must be deterministic so that both applications leave the two
/// copies in identical states.
fn apply_op(metadata: &mut InnerMetadata, op: &MetadataOp, populate_ids: bool) {
    match op {
        // Wholesale snapshot replacement (bootstrap/restore path).
        MetadataOp::Initialize(initial) => {
            *metadata = (**initial).clone();
        }

        MetadataOp::AddStream { meta, assigned_id } => {
            let entry = metadata.streams.vacant_entry();
            let id = entry.key();
            if populate_ids {
                assigned_id.store(id, Ordering::Release);
            }
            // Overwrite the placeholder id with the slab-assigned one.
            let mut meta = meta.clone();
            meta.id = id;
            let name = meta.name.clone();
            entry.insert(meta);
            metadata.stream_index.insert(name, id);
        }

        MetadataOp::UpdateStream { id, new_name } => {
            if let Some(stream) = metadata.streams.get_mut(*id) {
                // Keep the name -> id index in sync with the rename.
                let old_name = stream.name.clone();
                stream.name = new_name.clone();
                metadata.stream_index.remove(&old_name);
                metadata.stream_index.insert(new_name.clone(), *id);
            }
        }

        MetadataOp::DeleteStream { id } => {
            if metadata.streams.contains(*id) {
                let stream = metadata.streams.remove(*id);
                metadata.stream_index.remove(&stream.name);
            }
        }

        MetadataOp::AddTopic {
            stream_id,
            meta,
            assigned_id,
        } => {
            if let Some(stream) = metadata.streams.get_mut(*stream_id) {
                let entry = stream.topics.vacant_entry();
                let id = entry.key();
                if populate_ids {
                    assigned_id.store(id, Ordering::Release);
                }
                let mut meta = meta.clone();
                meta.id = id;
                let name = meta.name.clone();
                entry.insert(meta);
                stream.topic_index.insert(name, id);
            }
        }

        MetadataOp::UpdateTopic {
            stream_id,
            topic_id,
            new_name,
            message_expiry,
            compression_algorithm,
            max_topic_size,
            replication_factor,
        } => {
            if let Some(stream) = metadata.streams.get_mut(*stream_id)
                && let Some(topic) = stream.topics.get_mut(*topic_id)
            {
                let old_name = topic.name.clone();

                topic.name = new_name.clone();
                topic.message_expiry = *message_expiry;
                topic.compression_algorithm = *compression_algorithm;
                topic.max_topic_size = *max_topic_size;
                topic.replication_factor = *replication_factor;

                // Only re-key the index when the name actually changed.
                if old_name != *new_name {
                    stream.topic_index.remove(&old_name);
                    stream.topic_index.insert(new_name.clone(), *topic_id);
                }
            }
        }

        MetadataOp::DeleteTopic {
            stream_id,
            topic_id,
        } => {
            if let Some(stream) = metadata.streams.get_mut(*stream_id)
                && stream.topics.contains(*topic_id)
            {
                let topic = stream.topics.remove(*topic_id);
                stream.topic_index.remove(&topic.name);
            }
        }

        MetadataOp::AddPartitions {
            stream_id,
            topic_id,
            partitions,
            revision_id,
        } => {
            if partitions.is_empty() {
                return;
            }
            if let Some(stream) = metadata.streams.get_mut(*stream_id)
                && let Some(topic) = stream.topics.get_mut(*topic_id)
            {
                for meta in partitions {
                    // Partition ids are positional: the next id is the
                    // current length of the partitions vector.
                    let mut meta = meta.clone();
                    meta.id = topic.partitions.len();
                    meta.revision_id = *revision_id;
                    topic.partitions.push(meta);
                }
            }
        }

        MetadataOp::DeletePartitions {
            stream_id,
            topic_id,
            count,
        } => {
            if *count == 0 {
                return;
            }
            if let Some(stream) = metadata.streams.get_mut(*stream_id)
                && let Some(topic) = stream.topics.get_mut(*topic_id)
            {
                // Partitions are removed from the tail; saturate so an
                // oversized count empties the vector instead of panicking.
                let new_len = topic.partitions.len().saturating_sub(*count as usize);
                topic.partitions.truncate(new_len);
            }
        }

        MetadataOp::SetPartitionOffsets {
            stream_id,
            topic_id,
            partition_id,
            consumer_offsets,
            consumer_group_offsets,
        } => {
            if let Some(stream) = metadata.streams.get_mut(*stream_id)
                && let Some(topic) = stream.topics.get_mut(*topic_id)
                && let Some(partition) = topic.partitions.get_mut(*partition_id)
            {
                partition.consumer_offsets = Some(consumer_offsets.clone());
                partition.consumer_group_offsets = Some(consumer_group_offsets.clone());
            }
        }

        MetadataOp::AddUser { meta, assigned_id } => {
            let entry = metadata.users.vacant_entry();
            let id = entry.key();
            if populate_ids {
                assigned_id.store(id, Ordering::Release);
            }
            // Slab keys are usize; UserId is u32, hence the casts below.
            let mut meta = meta.clone();
            meta.id = id as u32;
            let username = meta.username.clone();
            entry.insert(meta);
            metadata.user_index.insert(username, id as u32);
        }

        MetadataOp::UpdateUserMeta { id, meta } => {
            let user_id = *id as usize;
            // Re-key the username index only when the username changed.
            if let Some(old_user) = metadata.users.get(user_id)
                && old_user.username != meta.username
            {
                metadata.user_index.remove(&old_user.username);
                metadata.user_index.insert(meta.username.clone(), *id);
            }
            if metadata.users.contains(user_id) {
                metadata.users[user_id] = meta.clone();
            }
        }

        MetadataOp::DeleteUser { id } => {
            let user_id = *id as usize;
            if metadata.users.contains(user_id) {
                let user = metadata.users.remove(user_id);
                metadata.user_index.remove(&user.username);
            }
            // Unconditional: tokens must never outlive the user entry,
            // even if the user was already gone.
            metadata.personal_access_tokens.remove(id);
        }

        MetadataOp::AddPersonalAccessToken { user_id, pat } => {
            // Keyed by token hash within the per-user map.
            metadata
                .personal_access_tokens
                .entry(*user_id)
                .or_default()
                .insert(pat.token.clone(), pat.clone());
        }

        MetadataOp::DeletePersonalAccessToken {
            user_id,
            token_hash,
        } => {
            if let Some(user_pats) = metadata.personal_access_tokens.get_mut(user_id) {
                user_pats.remove(token_hash);
            }
        }

        MetadataOp::AddConsumerGroup {
            stream_id,
            topic_id,
            meta,
            assigned_id,
        } => {
            if let Some(stream) = metadata.streams.get_mut(*stream_id)
                && let Some(topic) = stream.topics.get_mut(*topic_id)
            {
                let entry = topic.consumer_groups.vacant_entry();
                let id = entry.key();
                if populate_ids {
                    assigned_id.store(id, Ordering::Release);
                }
                let mut meta = meta.clone();
                meta.id = id;
                let name = meta.name.clone();
                entry.insert(meta);
                topic.consumer_group_index.insert(name, id);
            }
        }

        MetadataOp::DeleteConsumerGroup {
            stream_id,
            topic_id,
            group_id,
        } => {
            if let Some(stream) = metadata.streams.get_mut(*stream_id)
                && let Some(topic) = stream.topics.get_mut(*topic_id)
                && topic.consumer_groups.contains(*group_id)
            {
                let group = topic.consumer_groups.remove(*group_id);
                topic.consumer_group_index.remove(&group.name);
            }
        }

        MetadataOp::JoinConsumerGroup {
            stream_id,
            topic_id,
            group_id,
            client_id,
            member_id,
        } => {
            if let Some(stream) = metadata.streams.get_mut(*stream_id)
                && let Some(topic) = stream.topics.get_mut(*topic_id)
                && let Some(group) = topic.consumer_groups.get_mut(*group_id)
            {
                // Logical member id: one past the current maximum
                // (deterministic from state, so both copies agree).
                let next_id = group
                    .members
                    .iter()
                    .map(|(_, m)| m.id)
                    .max()
                    .map(|m| m + 1)
                    .unwrap_or(0);

                if populate_ids {
                    member_id.store(next_id, Ordering::Release);
                }

                // NOTE(review): `member_id` reports the logical id above,
                // while the slab assigns its own key on insert, and
                // LeaveConsumerGroup reports the slab key — confirm the
                // intended id convention is consistent for callers.
                let new_member = ConsumerGroupMemberMeta::new(next_id, *client_id);
                group.members.insert(new_member);
                group.rebalance_members();
            }
        }

        MetadataOp::LeaveConsumerGroup {
            stream_id,
            topic_id,
            group_id,
            client_id,
            removed_member_id,
        } => {
            if let Some(stream) = metadata.streams.get_mut(*stream_id)
                && let Some(topic) = stream.topics.get_mut(*topic_id)
                && let Some(group) = topic.consumer_groups.get_mut(*group_id)
            {
                // Members are located by client id; removal uses the slab key.
                let member_to_remove: Option<usize> = group
                    .members
                    .iter()
                    .find(|(_, m)| m.client_id == *client_id)
                    .map(|(id, _)| id);

                if let Some(mid) = member_to_remove {
                    if populate_ids {
                        removed_member_id.store(mid, Ordering::Release);
                    }
                    group.members.remove(mid);
                    group.rebalance_members();
                }
            }
        }

        MetadataOp::RebalanceConsumerGroupsForTopic {
            stream_id,
            topic_id,
            partitions_count,
        } => {
            if let Some(stream) = metadata.streams.get_mut(*stream_id)
                && let Some(topic) = stream.topics.get_mut(*topic_id)
            {
                // Reset every group's partition set to [0, partitions_count)
                // and redistribute those partitions among current members.
                let partition_ids: Vec<usize> = (0..*partitions_count as usize).collect();
                let group_ids: Vec<_> = topic.consumer_groups.iter().map(|(id, _)| id).collect();

                for gid in group_ids {
                    if let Some(group) = topic.consumer_groups.get_mut(gid) {
                        group.partitions = partition_ids.clone();
                        group.rebalance_members();
                    }
                }
            }
        }
    }
}
diff --git a/core/server/src/metadata/consumer_group.rs 
b/core/server/src/metadata/consumer_group.rs
new file mode 100644
index 000000000..d44dd5ecb
--- /dev/null
+++ b/core/server/src/metadata/consumer_group.rs
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::metadata::consumer_group_member::ConsumerGroupMemberMeta;
+use crate::metadata::{ConsumerGroupId, PartitionId};
+use slab::Slab;
+use std::sync::Arc;
+
/// Metadata for a consumer group within a topic.
#[derive(Clone, Debug)]
pub struct ConsumerGroupMeta {
    /// Slab-assigned id within the owning topic's consumer-group slab.
    pub id: ConsumerGroupId,
    /// Group name, unique within the topic (indexed by `consumer_group_index`).
    pub name: Arc<str>,
    /// Partition ids consumed by this group; redistributed among members
    /// by `rebalance_members`.
    pub partitions: Vec<PartitionId>,
    /// Members keyed by slab index (which may differ from the member's
    /// logical `id` field).
    pub members: Slab<ConsumerGroupMemberMeta>,
}
+
+impl ConsumerGroupMeta {
+    /// Rebalance partition assignments among members (round-robin).
+    pub fn rebalance_members(&mut self) {
+        let partition_count = self.partitions.len();
+        let member_count = self.members.len();
+
+        if member_count == 0 || partition_count == 0 {
+            return;
+        }
+
+        // Clear all member partitions
+        let member_ids: Vec<usize> = self.members.iter().map(|(id, _)| 
id).collect();
+        for &member_id in &member_ids {
+            if let Some(member) = self.members.get_mut(member_id) {
+                member.partitions.clear();
+            }
+        }
+
+        // Rebuild assignments (round-robin)
+        for (i, &partition_id) in self.partitions.iter().enumerate() {
+            let member_idx = i % member_count;
+            if let Some(&member_id) = member_ids.get(member_idx)
+                && let Some(member) = self.members.get_mut(member_id)
+            {
+                member.partitions.push(partition_id);
+            }
+        }
+    }
+}
diff --git a/core/common/src/collections/mod.rs 
b/core/server/src/metadata/consumer_group_member.rs
similarity index 56%
copy from core/common/src/collections/mod.rs
copy to core/server/src/metadata/consumer_group_member.rs
index ce21f8584..8543351cb 100644
--- a/core/common/src/collections/mod.rs
+++ b/core/server/src/metadata/consumer_group_member.rs
@@ -15,6 +15,25 @@
 // specific language governing permissions and limitations
 // under the License.
 
-mod segmented_slab;
+use crate::metadata::{ClientId, ConsumerGroupMemberId, PartitionId};
+use std::sync::Arc;
+use std::sync::atomic::AtomicUsize;
 
-pub use segmented_slab::SegmentedSlab;
/// A single member of a consumer group.
#[derive(Clone, Debug)]
pub struct ConsumerGroupMemberMeta {
    /// Logical member id (assigned as max existing id + 1 at join time).
    pub id: ConsumerGroupMemberId,
    /// Client connection this member belongs to.
    pub client_id: ClientId,
    /// Partitions currently assigned to this member (round-robin share).
    pub partitions: Vec<PartitionId>,
    /// Cursor over `partitions`; wrapped in `Arc` so every clone of this
    /// member shares one counter — presumably so both left-right copies
    /// advance the same polling position (TODO confirm).
    pub partition_index: Arc<AtomicUsize>,
}

impl ConsumerGroupMemberMeta {
    /// Creates a member with no partitions assigned and the cursor at 0.
    pub fn new(id: ConsumerGroupMemberId, client_id: ClientId) -> Self {
        Self {
            id,
            client_id,
            partitions: Vec::new(),
            partition_index: Arc::new(AtomicUsize::new(0)),
        }
    }
}
diff --git a/core/common/src/collections/mod.rs 
b/core/server/src/metadata/inner.rs
similarity index 52%
copy from core/common/src/collections/mod.rs
copy to core/server/src/metadata/inner.rs
index ce21f8584..f826d3d8e 100644
--- a/core/common/src/collections/mod.rs
+++ b/core/server/src/metadata/inner.rs
@@ -15,6 +15,30 @@
 // specific language governing permissions and limitations
 // under the License.
 
-mod segmented_slab;
+use crate::metadata::{StreamId, StreamMeta, UserId, UserMeta};
+use ahash::AHashMap;
+use iggy_common::PersonalAccessToken;
+use slab::Slab;
+use std::sync::Arc;
 
-pub use segmented_slab::SegmentedSlab;
/// The left-right data type: one of the two copies of all server metadata.
///
/// All mutations are applied via `MetadataOp` absorption (see absorb.rs);
/// `sync_with` clones this type wholesale, so it must stay cheap-ish to
/// clone and fully deterministic to rebuild.
#[derive(Clone, Default)]
pub struct InnerMetadata {
    /// Streams indexed by StreamId (slab-assigned)
    pub streams: Slab<StreamMeta>,

    /// Users indexed by UserId (slab-assigned)
    pub users: Slab<UserMeta>,

    /// Forward indexes (name → ID)
    pub stream_index: AHashMap<Arc<str>, StreamId>,
    pub user_index: AHashMap<Arc<str>, UserId>,

    /// user_id -> (token_hash -> PAT)
    pub personal_access_tokens: AHashMap<UserId, AHashMap<Arc<str>, PersonalAccessToken>>,
}

impl InnerMetadata {
    /// Creates an empty metadata snapshot (equivalent to `Default`).
    pub fn new() -> Self {
        Self::default()
    }
}
diff --git a/core/server/src/metadata/mod.rs b/core/server/src/metadata/mod.rs
new file mode 100644
index 000000000..12804095b
--- /dev/null
+++ b/core/server/src/metadata/mod.rs
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Shared metadata module providing a single source of truth for all shards.
+//!
+//! This module provides a `left-right`-based approach where all shards read
+//! through lock-free read handles, and only shard 0 can write (publish ops).
+//!
+//! # Architecture
+//!
+//! - `InnerMetadata` (inner.rs): Immutable snapshot with all metadata
+//! - `Metadata` (reader.rs): Thread-safe read handle for querying metadata
+//! - Entity types: `StreamMeta`, `TopicMeta`, `PartitionMeta`, `UserMeta`, 
`ConsumerGroupMeta`
+//! - Consumer offsets are stored in `PartitionMeta` for cross-shard visibility
+
+mod absorb;
+mod consumer_group;
+mod consumer_group_member;
+mod inner;
+pub mod ops;
+mod partition;
+mod reader;
+mod stream;
+mod topic;
+mod user;
+mod writer;
+
+pub use consumer_group::ConsumerGroupMeta;
+pub use consumer_group_member::ConsumerGroupMemberMeta;
+pub use inner::InnerMetadata;
+pub use ops::MetadataOp;
+pub use partition::PartitionMeta;
+pub use reader::Metadata;
+pub use stream::StreamMeta;
+pub use topic::TopicMeta;
+pub use user::UserMeta;
+pub use writer::MetadataWriter;
+
+pub type MetadataReadHandle = left_right::ReadHandle<InnerMetadata>;
+pub type StreamId = usize;
+pub type TopicId = usize;
+pub type PartitionId = usize;
+pub type UserId = u32;
+pub type ClientId = u32;
+pub type ConsumerGroupId = usize;
+pub type ConsumerGroupMemberId = usize;
+pub type ConsumerGroupKey = (StreamId, TopicId, ConsumerGroupId);
+
+pub fn create_metadata_handles() -> (MetadataWriter, MetadataReadHandle) {
+    let (write_handle, read_handle) = left_right::new::<InnerMetadata, 
MetadataOp>();
+    let mut writer = MetadataWriter::new(write_handle);
+    writer.publish();
+    (writer, read_handle)
+}
diff --git a/core/server/src/metadata/ops.rs b/core/server/src/metadata/ops.rs
new file mode 100644
index 000000000..4d21c0d90
--- /dev/null
+++ b/core/server/src/metadata/ops.rs
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::metadata::inner::InnerMetadata;
+use crate::metadata::{
+    ConsumerGroupId, ConsumerGroupMeta, PartitionId, PartitionMeta, StreamId, 
StreamMeta, TopicId,
+    TopicMeta, UserId, UserMeta,
+};
+use crate::streaming::partitions::partition::{ConsumerGroupOffsets, 
ConsumerOffsets};
+use iggy_common::{CompressionAlgorithm, IggyExpiry, MaxTopicSize, 
PersonalAccessToken};
+use std::sync::Arc;
+use std::sync::atomic::AtomicUsize;
+
/// Operations applied to the metadata through left-right absorption.
///
/// `Arc<AtomicUsize>` fields are out-parameters: the absorb path stores the
/// slab-assigned id into them (from the first application only) so the
/// caller can observe the id that was allocated.
#[derive(Clone)]
pub enum MetadataOp {
    /// Replace the entire snapshot (boxed to keep the enum small).
    Initialize(Box<InnerMetadata>),

    /// Add a stream; its `meta.id` is overwritten with the slab key.
    AddStream {
        meta: StreamMeta,
        /// Out-parameter: receives the slab-assigned stream id.
        assigned_id: Arc<AtomicUsize>,
    },
    /// Rename an existing stream (name index is re-keyed).
    UpdateStream {
        id: StreamId,
        new_name: Arc<str>,
    },
    /// Remove a stream and its name-index entry.
    DeleteStream {
        id: StreamId,
    },
    /// Add a topic to a stream; `meta.id` is overwritten with the slab key.
    AddTopic {
        stream_id: StreamId,
        meta: TopicMeta,
        /// Out-parameter: receives the slab-assigned topic id.
        assigned_id: Arc<AtomicUsize>,
    },
    /// Update a topic's name and configuration in place.
    UpdateTopic {
        stream_id: StreamId,
        topic_id: TopicId,
        new_name: Arc<str>,
        message_expiry: IggyExpiry,
        compression_algorithm: CompressionAlgorithm,
        max_topic_size: MaxTopicSize,
        replication_factor: u8,
    },
    /// Remove a topic and its name-index entry.
    DeleteTopic {
        stream_id: StreamId,
        topic_id: TopicId,
    },
    /// Append partitions to a topic; ids are assigned positionally and
    /// each partition is stamped with `revision_id`.
    AddPartitions {
        stream_id: StreamId,
        topic_id: TopicId,
        partitions: Vec<PartitionMeta>,
        revision_id: u64,
    },
    /// Remove `count` partitions from the tail of the topic.
    DeletePartitions {
        stream_id: StreamId,
        topic_id: TopicId,
        count: u32,
    },
    /// Attach consumer/consumer-group offset tables to one partition.
    SetPartitionOffsets {
        stream_id: StreamId,
        topic_id: TopicId,
        partition_id: PartitionId,
        consumer_offsets: Arc<ConsumerOffsets>,
        consumer_group_offsets: Arc<ConsumerGroupOffsets>,
    },
    /// Add a user; `meta.id` is overwritten with the slab key (as u32).
    AddUser {
        meta: UserMeta,
        /// Out-parameter: receives the slab-assigned user id.
        assigned_id: Arc<AtomicUsize>,
    },
    /// Replace a user's metadata wholesale (username index re-keyed).
    UpdateUserMeta {
        id: UserId,
        meta: UserMeta,
    },
    /// Remove a user, its index entry, and all of its tokens.
    DeleteUser {
        id: UserId,
    },

    /// Insert a PAT into the per-user map, keyed by token hash.
    AddPersonalAccessToken {
        user_id: UserId,
        pat: PersonalAccessToken,
    },
    /// Remove one PAT (by token hash) from the per-user map.
    DeletePersonalAccessToken {
        user_id: UserId,
        token_hash: Arc<str>,
    },
    /// Add a consumer group; `meta.id` is overwritten with the slab key.
    AddConsumerGroup {
        stream_id: StreamId,
        topic_id: TopicId,
        meta: ConsumerGroupMeta,
        /// Out-parameter: receives the slab-assigned group id.
        assigned_id: Arc<AtomicUsize>,
    },
    /// Remove a consumer group and its name-index entry.
    DeleteConsumerGroup {
        stream_id: StreamId,
        topic_id: TopicId,
        group_id: ConsumerGroupId,
    },
    /// Add a member for `client_id` and rebalance the group.
    JoinConsumerGroup {
        stream_id: StreamId,
        topic_id: TopicId,
        group_id: ConsumerGroupId,
        client_id: u32,
        /// Out-parameter: receives the new member's logical id.
        member_id: Arc<AtomicUsize>,
    },
    /// Remove the member owned by `client_id` and rebalance the group.
    LeaveConsumerGroup {
        stream_id: StreamId,
        topic_id: TopicId,
        group_id: ConsumerGroupId,
        client_id: u32,
        /// Out-parameter: receives the removed member's slab key.
        removed_member_id: Arc<AtomicUsize>,
    },
    /// Reset every group in the topic to consume partitions
    /// [0, partitions_count) and rebalance their members.
    RebalanceConsumerGroupsForTopic {
        stream_id: StreamId,
        topic_id: TopicId,
        partitions_count: u32,
    },
}
diff --git a/core/server/src/metadata/partition.rs 
b/core/server/src/metadata/partition.rs
new file mode 100644
index 000000000..b2490b96a
--- /dev/null
+++ b/core/server/src/metadata/partition.rs
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::metadata::PartitionId;
+use crate::streaming::partitions::partition::{ConsumerGroupOffsets, 
ConsumerOffsets};
+use crate::streaming::stats::PartitionStats;
+use iggy_common::IggyTimestamp;
+use std::sync::Arc;
+
/// Metadata for a single partition within a topic.
#[derive(Clone, Debug)]
pub struct PartitionMeta {
    /// Positional id: equals this partition's index in the topic's
    /// `partitions` vector at creation time.
    pub id: PartitionId,
    /// When the partition was created.
    pub created_at: IggyTimestamp,
    /// Monotonically increasing version to detect stale partition_store entries.
    /// Set to the Metadata version when the partition was created.
    pub revision_id: u64,
    /// Shared runtime statistics for this partition.
    pub stats: Arc<PartitionStats>,
    /// Consumer offsets for this partition // TODO: move to partition store
    pub consumer_offsets: Option<Arc<ConsumerOffsets>>,
    /// Consumer group offsets for this partition // TODO: move to partition store
    pub consumer_group_offsets: Option<Arc<ConsumerGroupOffsets>>,
}
diff --git a/core/server/src/metadata/reader.rs 
b/core/server/src/metadata/reader.rs
new file mode 100644
index 000000000..ea85511a6
--- /dev/null
+++ b/core/server/src/metadata/reader.rs
@@ -0,0 +1,485 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::metadata::{
+    ConsumerGroupId, ConsumerGroupMeta, InnerMetadata, MetadataReadHandle, 
PartitionId,
+    PartitionMeta, StreamId, StreamMeta, TopicId, TopicMeta, UserId, UserMeta,
+};
+use crate::streaming::partitions::partition::{ConsumerGroupOffsets, 
ConsumerOffsets};
+use crate::streaming::stats::{PartitionStats, StreamStats, TopicStats};
+use iggy_common::sharding::IggyNamespace;
+use iggy_common::{IdKind, Identifier, PersonalAccessToken};
+use left_right::ReadGuard;
+use std::sync::Arc;
+use std::sync::atomic::Ordering;
+
+/// Thread-safe wrapper for GlobalMetadata using left-right for lock-free 
reads.
+/// Uses hierarchical structure: streams contain topics, topics contain 
partitions and consumer groups.
+/// All mutations go through MetadataWriter (shard 0 only).
+///
+/// Each shard should own its own `Metadata` instance (cloned from a common 
source).
+/// The underlying data is shared via left-right's internal mechanism.
+#[derive(Clone)]
+pub struct Metadata {
+    inner: MetadataReadHandle,
+}
+
+impl Metadata {
+    pub fn new(reader: MetadataReadHandle) -> Self {
+        Self { inner: reader }
+    }
+
+    #[inline]
+    pub fn load(&self) -> ReadGuard<'_, InnerMetadata> {
+        self.inner
+            .enter()
+            .expect("metadata not initialized - writer must publish before 
reads")
+    }
+
+    pub fn get_stream_id(&self, identifier: &Identifier) -> Option<StreamId> {
+        let metadata = self.load();
+        match identifier.kind {
+            IdKind::Numeric => {
+                let stream_id = identifier.get_u32_value().ok()? as StreamId;
+                if metadata.streams.get(stream_id).is_some() {
+                    Some(stream_id)
+                } else {
+                    None
+                }
+            }
+            IdKind::String => {
+                let name = identifier.get_cow_str_value().ok()?;
+                metadata.stream_index.get(name.as_ref()).copied()
+            }
+        }
+    }
+
+    pub fn stream_name_exists(&self, name: &str) -> bool {
+        self.load().stream_index.contains_key(name)
+    }
+
+    pub fn get_topic_id(&self, stream_id: StreamId, identifier: &Identifier) 
-> Option<TopicId> {
+        let metadata = self.load();
+        let stream = metadata.streams.get(stream_id)?;
+
+        match identifier.kind {
+            IdKind::Numeric => {
+                let topic_id = identifier.get_u32_value().ok()? as TopicId;
+                if stream.topics.get(topic_id).is_some() {
+                    Some(topic_id)
+                } else {
+                    None
+                }
+            }
+            IdKind::String => {
+                let name = identifier.get_cow_str_value().ok()?;
+                stream.topic_index.get(&Arc::from(name.as_ref())).copied()
+            }
+        }
+    }
+
+    pub fn get_user_id(&self, identifier: &Identifier) -> Option<UserId> {
+        let metadata = self.load();
+        match identifier.kind {
+            IdKind::Numeric => Some(identifier.get_u32_value().ok()? as 
UserId),
+            IdKind::String => {
+                let name = identifier.get_cow_str_value().ok()?;
+                metadata.user_index.get(name.as_ref()).copied()
+            }
+        }
+    }
+
+    pub fn get_consumer_group_id(
+        &self,
+        stream_id: StreamId,
+        topic_id: TopicId,
+        identifier: &Identifier,
+    ) -> Option<ConsumerGroupId> {
+        let metadata = self.load();
+        let stream = metadata.streams.get(stream_id)?;
+        let topic = stream.topics.get(topic_id)?;
+
+        match identifier.kind {
+            IdKind::Numeric => {
+                let group_id = identifier.get_u32_value().ok()? as 
ConsumerGroupId;
+                if topic.consumer_groups.get(group_id).is_some() {
+                    Some(group_id)
+                } else {
+                    None
+                }
+            }
+            IdKind::String => {
+                let name = identifier.get_cow_str_value().ok()?;
+                topic
+                    .consumer_group_index
+                    .get(&Arc::from(name.as_ref()))
+                    .copied()
+            }
+        }
+    }
+
+    pub fn stream_exists(&self, id: StreamId) -> bool {
+        self.load().streams.get(id).is_some()
+    }
+
+    pub fn topic_exists(&self, stream_id: StreamId, topic_id: TopicId) -> bool 
{
+        self.load()
+            .streams
+            .get(stream_id)
+            .and_then(|s| s.topics.get(topic_id))
+            .is_some()
+    }
+
+    pub fn partition_exists(
+        &self,
+        stream_id: StreamId,
+        topic_id: TopicId,
+        partition_id: PartitionId,
+    ) -> bool {
+        self.load()
+            .streams
+            .get(stream_id)
+            .and_then(|s| s.topics.get(topic_id))
+            .and_then(|t| t.partitions.get(partition_id))
+            .is_some()
+    }
+
+    pub fn user_exists(&self, id: UserId) -> bool {
+        self.load().users.get(id as usize).is_some()
+    }
+
+    pub fn consumer_group_exists(
+        &self,
+        stream_id: StreamId,
+        topic_id: TopicId,
+        group_id: ConsumerGroupId,
+    ) -> bool {
+        self.load()
+            .streams
+            .get(stream_id)
+            .and_then(|s| s.topics.get(topic_id))
+            .and_then(|t| t.consumer_groups.get(group_id))
+            .is_some()
+    }
+
+    pub fn consumer_group_exists_by_name(
+        &self,
+        stream_id: StreamId,
+        topic_id: TopicId,
+        name: &str,
+    ) -> bool {
+        self.load()
+            .streams
+            .get(stream_id)
+            .and_then(|s| s.topics.get(topic_id))
+            .map(|t| t.consumer_group_index.contains_key(name))
+            .unwrap_or(false)
+    }
+
+    pub fn streams_count(&self) -> usize {
+        self.load().streams.len()
+    }
+
+    pub fn next_stream_id(&self) -> usize {
+        self.load().streams.vacant_key()
+    }
+
+    pub fn topics_count(&self, stream_id: StreamId) -> usize {
+        self.load()
+            .streams
+            .get(stream_id)
+            .map(|s| s.topics.len())
+            .unwrap_or(0)
+    }
+
+    pub fn next_topic_id(&self, stream_id: StreamId) -> Option<usize> {
+        self.load()
+            .streams
+            .get(stream_id)
+            .map(|s| s.topics.vacant_key())
+    }
+
+    pub fn partitions_count(&self, stream_id: StreamId, topic_id: TopicId) -> 
usize {
+        self.load()
+            .streams
+            .get(stream_id)
+            .and_then(|s| s.topics.get(topic_id))
+            .map(|t| t.partitions.len())
+            .unwrap_or(0)
+    }
+
+    pub fn get_partitions_count(&self, stream_id: StreamId, topic_id: TopicId) 
-> Option<usize> {
+        self.load()
+            .streams
+            .get(stream_id)
+            .and_then(|s| s.topics.get(topic_id))
+            .map(|t| t.partitions.len())
+    }
+
+    pub fn get_next_partition_id(&self, stream_id: StreamId, topic_id: 
TopicId) -> Option<usize> {
+        let metadata = self.load();
+        let topic = metadata.streams.get(stream_id)?.topics.get(topic_id)?;
+        let partitions_count = topic.partitions.len();
+
+        if partitions_count == 0 {
+            return None;
+        }
+
+        let counter = &topic.round_robin_counter;
+        let mut partition_id = counter.fetch_add(1, Ordering::AcqRel);
+        if partition_id >= partitions_count {
+            partition_id %= partitions_count;
+            counter.store(partition_id + 1, Ordering::Relaxed);
+        }
+        Some(partition_id)
+    }
+
+    pub fn get_next_member_partition_id(
+        &self,
+        stream_id: StreamId,
+        topic_id: TopicId,
+        group_id: ConsumerGroupId,
+        member_id: usize,
+        calculate: bool,
+    ) -> Option<PartitionId> {
+        let metadata = self.load();
+        let member = metadata
+            .streams
+            .get(stream_id)?
+            .topics
+            .get(topic_id)?
+            .consumer_groups
+            .get(group_id)?
+            .members
+            .get(member_id)?;
+
+        let assigned_partitions = &member.partitions;
+        if assigned_partitions.is_empty() {
+            return None;
+        }
+
+        let partitions_count = assigned_partitions.len();
+        let counter = &member.partition_index;
+
+        if calculate {
+            let current = counter
+                .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
+                    Some((current + 1) % partitions_count)
+                })
+                .unwrap();
+            Some(assigned_partitions[current % partitions_count])
+        } else {
+            let current = counter.load(Ordering::Relaxed);
+            Some(assigned_partitions[current % partitions_count])
+        }
+    }
+
+    pub fn users_count(&self) -> usize {
+        self.load().users.len()
+    }
+
+    pub fn username_exists(&self, username: &str) -> bool {
+        self.load().user_index.contains_key(username)
+    }
+
+    pub fn consumer_groups_count(&self, stream_id: StreamId, topic_id: 
TopicId) -> usize {
+        self.load()
+            .streams
+            .get(stream_id)
+            .and_then(|s| s.topics.get(topic_id))
+            .map(|t| t.consumer_groups.len())
+            .unwrap_or(0)
+    }
+
+    pub fn get_stream_stats(&self, id: StreamId) -> Option<Arc<StreamStats>> {
+        self.load().streams.get(id).map(|s| s.stats.clone())
+    }
+
+    pub fn get_topic_stats(
+        &self,
+        stream_id: StreamId,
+        topic_id: TopicId,
+    ) -> Option<Arc<TopicStats>> {
+        self.load()
+            .streams
+            .get(stream_id)
+            .and_then(|s| s.topics.get(topic_id))
+            .map(|t| t.stats.clone())
+    }
+
+    pub fn get_partition_stats(&self, ns: &IggyNamespace) -> 
Option<Arc<PartitionStats>> {
+        self.load()
+            .streams
+            .get(ns.stream_id())
+            .and_then(|s| s.topics.get(ns.topic_id()))
+            .and_then(|t| t.partitions.get(ns.partition_id()))
+            .map(|p| p.stats.clone())
+    }
+
+    pub fn get_partition_stats_by_ids(
+        &self,
+        stream_id: StreamId,
+        topic_id: TopicId,
+        partition_id: PartitionId,
+    ) -> Option<Arc<PartitionStats>> {
+        self.load()
+            .streams
+            .get(stream_id)
+            .and_then(|s| s.topics.get(topic_id))
+            .and_then(|t| t.partitions.get(partition_id))
+            .map(|p| p.stats.clone())
+    }
+
+    pub fn get_partition_consumer_offsets(
+        &self,
+        stream_id: StreamId,
+        topic_id: TopicId,
+        partition_id: PartitionId,
+    ) -> Option<Arc<ConsumerOffsets>> {
+        self.load()
+            .streams
+            .get(stream_id)
+            .and_then(|s| s.topics.get(topic_id))
+            .and_then(|t| t.partitions.get(partition_id))
+            .and_then(|p| p.consumer_offsets.clone())
+    }
+
+    pub fn get_partition_consumer_group_offsets(
+        &self,
+        stream_id: StreamId,
+        topic_id: TopicId,
+        partition_id: PartitionId,
+    ) -> Option<Arc<ConsumerGroupOffsets>> {
+        self.load()
+            .streams
+            .get(stream_id)
+            .and_then(|s| s.topics.get(topic_id))
+            .and_then(|t| t.partitions.get(partition_id))
+            .and_then(|p| p.consumer_group_offsets.clone())
+    }
+
+    pub fn get_user(&self, id: UserId) -> Option<UserMeta> {
+        self.load().users.get(id as usize).cloned()
+    }
+
+    pub fn get_all_users(&self) -> Vec<UserMeta> {
+        self.load().users.iter().map(|(_, u)| u.clone()).collect()
+    }
+
+    pub fn get_stream(&self, id: StreamId) -> Option<StreamMeta> {
+        self.load().streams.get(id).cloned()
+    }
+
+    pub fn get_topic(&self, stream_id: StreamId, topic_id: TopicId) -> 
Option<TopicMeta> {
+        self.load()
+            .streams
+            .get(stream_id)
+            .and_then(|s| s.topics.get(topic_id).cloned())
+    }
+
+    pub fn get_partition(
+        &self,
+        stream_id: StreamId,
+        topic_id: TopicId,
+        partition_id: PartitionId,
+    ) -> Option<PartitionMeta> {
+        self.load()
+            .streams
+            .get(stream_id)
+            .and_then(|s| s.topics.get(topic_id))
+            .and_then(|t| t.partitions.get(partition_id).cloned())
+    }
+
+    pub fn get_consumer_group(
+        &self,
+        stream_id: StreamId,
+        topic_id: TopicId,
+        group_id: ConsumerGroupId,
+    ) -> Option<ConsumerGroupMeta> {
+        self.load()
+            .streams
+            .get(stream_id)
+            .and_then(|s| s.topics.get(topic_id))
+            .and_then(|t| t.consumer_groups.get(group_id).cloned())
+    }
+
+    pub fn get_user_personal_access_tokens(&self, user_id: UserId) -> 
Vec<PersonalAccessToken> {
+        self.load()
+            .personal_access_tokens
+            .get(&user_id)
+            .map(|pats| pats.values().cloned().collect())
+            .unwrap_or_default()
+    }
+
+    pub fn get_personal_access_token_by_hash(
+        &self,
+        token_hash: &str,
+    ) -> Option<PersonalAccessToken> {
+        let token_hash_arc: Arc<str> = Arc::from(token_hash);
+        let metadata = self.load();
+        for (_, user_pats) in metadata.personal_access_tokens.iter() {
+            if let Some(pat) = user_pats.get(&token_hash_arc) {
+                return Some(pat.clone());
+            }
+        }
+        None
+    }
+
+    pub fn user_pat_count(&self, user_id: UserId) -> usize {
+        self.load()
+            .personal_access_tokens
+            .get(&user_id)
+            .map(|pats| pats.len())
+            .unwrap_or(0)
+    }
+
+    pub fn user_has_pat_with_name(&self, user_id: UserId, name: &str) -> bool {
+        self.load()
+            .personal_access_tokens
+            .get(&user_id)
+            .map(|pats| pats.values().any(|pat| &*pat.name == name))
+            .unwrap_or(false)
+    }
+
+    pub fn find_pat_token_hash_by_name(&self, user_id: UserId, name: &str) -> 
Option<Arc<str>> {
+        self.load()
+            .personal_access_tokens
+            .get(&user_id)
+            .and_then(|pats| {
+                pats.iter()
+                    .find(|(_, pat)| &*pat.name == name)
+                    .map(|(hash, _)| hash.clone())
+            })
+    }
+
+    pub fn is_consumer_group_member(
+        &self,
+        stream_id: StreamId,
+        topic_id: TopicId,
+        group_id: ConsumerGroupId,
+        client_id: u32,
+    ) -> bool {
+        let metadata = self.load();
+        metadata
+            .streams
+            .get(stream_id)
+            .and_then(|s| s.topics.get(topic_id))
+            .and_then(|t| t.consumer_groups.get(group_id))
+            .map(|g| g.members.iter().any(|(_, m)| m.client_id == client_id))
+            .unwrap_or(false)
+    }
+}
diff --git a/core/server/src/metadata/stream.rs 
b/core/server/src/metadata/stream.rs
new file mode 100644
index 000000000..02aee459d
--- /dev/null
+++ b/core/server/src/metadata/stream.rs
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::metadata::topic::TopicMeta;
+use crate::metadata::{StreamId, TopicId};
+use crate::streaming::stats::StreamStats;
+use ahash::AHashMap;
+use iggy_common::IggyTimestamp;
+use slab::Slab;
+use std::sync::Arc;
+
+/// Stream metadata stored in the shared snapshot.
+#[derive(Clone, Debug)]
+pub struct StreamMeta {
+    pub id: StreamId,
+    pub name: Arc<str>,
+    pub created_at: IggyTimestamp,
+    pub stats: Arc<StreamStats>,
+    pub topics: Slab<TopicMeta>,
+    pub topic_index: AHashMap<Arc<str>, TopicId>,
+}
+
+impl StreamMeta {
+    pub fn new(id: StreamId, name: Arc<str>, created_at: IggyTimestamp) -> 
Self {
+        Self {
+            id,
+            name,
+            created_at,
+            stats: Arc::new(StreamStats::default()),
+            topics: Slab::new(),
+            topic_index: AHashMap::default(),
+        }
+    }
+
+    pub fn with_stats(
+        id: StreamId,
+        name: Arc<str>,
+        created_at: IggyTimestamp,
+        stats: Arc<StreamStats>,
+    ) -> Self {
+        Self {
+            id,
+            name,
+            created_at,
+            stats,
+            topics: Slab::new(),
+            topic_index: AHashMap::default(),
+        }
+    }
+}
diff --git a/core/server/src/metadata/topic.rs 
b/core/server/src/metadata/topic.rs
new file mode 100644
index 000000000..ba0a88b3d
--- /dev/null
+++ b/core/server/src/metadata/topic.rs
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::metadata::consumer_group::ConsumerGroupMeta;
+use crate::metadata::partition::PartitionMeta;
+use crate::metadata::{ConsumerGroupId, TopicId};
+use crate::streaming::stats::TopicStats;
+use ahash::AHashMap;
+use iggy_common::{CompressionAlgorithm, IggyExpiry, IggyTimestamp, 
MaxTopicSize};
+use slab::Slab;
+use std::sync::Arc;
+use std::sync::atomic::AtomicUsize;
+
+/// Topic metadata stored in the shared snapshot.
+#[derive(Clone, Debug)]
+pub struct TopicMeta {
+    pub id: TopicId,
+    pub name: Arc<str>,
+    pub created_at: IggyTimestamp,
+    pub message_expiry: IggyExpiry,
+    pub compression_algorithm: CompressionAlgorithm,
+    pub max_topic_size: MaxTopicSize,
+    pub replication_factor: u8,
+    pub stats: Arc<TopicStats>,
+    pub partitions: Vec<PartitionMeta>,
+    pub consumer_groups: Slab<ConsumerGroupMeta>,
+    pub consumer_group_index: AHashMap<Arc<str>, ConsumerGroupId>,
+    pub round_robin_counter: Arc<AtomicUsize>,
+}
+
+impl TopicMeta {
+    #[allow(clippy::too_many_arguments)]
+    pub fn with_stats(
+        id: TopicId,
+        name: Arc<str>,
+        created_at: IggyTimestamp,
+        message_expiry: IggyExpiry,
+        compression_algorithm: CompressionAlgorithm,
+        max_topic_size: MaxTopicSize,
+        replication_factor: u8,
+        stats: Arc<TopicStats>,
+    ) -> Self {
+        Self {
+            id,
+            name,
+            created_at,
+            message_expiry,
+            compression_algorithm,
+            max_topic_size,
+            replication_factor,
+            stats,
+            partitions: Vec::new(),
+            consumer_groups: Slab::new(),
+            consumer_group_index: AHashMap::default(),
+            round_robin_counter: Arc::new(AtomicUsize::new(0)),
+        }
+    }
+}
diff --git a/core/common/src/collections/mod.rs 
b/core/server/src/metadata/user.rs
similarity index 69%
rename from core/common/src/collections/mod.rs
rename to core/server/src/metadata/user.rs
index ce21f8584..c82d49a44 100644
--- a/core/common/src/collections/mod.rs
+++ b/core/server/src/metadata/user.rs
@@ -15,6 +15,16 @@
 // specific language governing permissions and limitations
 // under the License.
 
-mod segmented_slab;
+use crate::metadata::UserId;
+use iggy_common::{IggyTimestamp, Permissions, UserStatus};
+use std::sync::Arc;
 
-pub use segmented_slab::SegmentedSlab;
+#[derive(Clone, Debug)]
+pub struct UserMeta {
+    pub id: UserId,
+    pub username: Arc<str>,
+    pub password_hash: Arc<str>,
+    pub status: UserStatus,
+    pub permissions: Option<Arc<Permissions>>,
+    pub created_at: IggyTimestamp,
+}
diff --git a/core/server/src/metadata/writer.rs 
b/core/server/src/metadata/writer.rs
new file mode 100644
index 000000000..daa1142bf
--- /dev/null
+++ b/core/server/src/metadata/writer.rs
@@ -0,0 +1,597 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::metadata::inner::InnerMetadata;
+use crate::metadata::ops::MetadataOp;
+use crate::metadata::reader::Metadata;
+use crate::metadata::{
+    ConsumerGroupId, ConsumerGroupMeta, PartitionId, PartitionMeta, StreamId, 
StreamMeta, TopicId,
+    TopicMeta, UserId, UserMeta,
+};
+use crate::streaming::partitions::partition::{ConsumerGroupOffsets, 
ConsumerOffsets};
+use crate::streaming::stats::{PartitionStats, StreamStats, TopicStats};
+use iggy_common::{
+    CompressionAlgorithm, Identifier, IggyError, IggyExpiry, IggyTimestamp, 
MaxTopicSize,
+    Permissions, PersonalAccessToken, UserStatus,
+};
+use left_right::WriteHandle;
+use slab::Slab;
+use std::sync::Arc;
+use std::sync::atomic::{AtomicUsize, Ordering};
+
+pub struct MetadataWriter {
+    inner: WriteHandle<InnerMetadata, MetadataOp>,
+    revision: u64,
+}
+
+impl MetadataWriter {
+    pub fn new(handle: WriteHandle<InnerMetadata, MetadataOp>) -> Self {
+        Self {
+            inner: handle,
+            revision: 0,
+        }
+    }
+
+    fn next_revision(&mut self) -> u64 {
+        self.revision += 1;
+        self.revision
+    }
+
+    pub fn append(&mut self, op: MetadataOp) {
+        self.inner.append(op);
+    }
+
+    pub fn publish(&mut self) {
+        self.inner.publish();
+    }
+
+    pub fn initialize(&mut self, initial: InnerMetadata) {
+        self.append(MetadataOp::Initialize(Box::new(initial)));
+        self.publish();
+    }
+
+    pub fn add_stream(&mut self, meta: StreamMeta) -> StreamId {
+        let assigned_id = Arc::new(AtomicUsize::new(0));
+        self.append(MetadataOp::AddStream {
+            meta,
+            assigned_id: assigned_id.clone(),
+        });
+        self.publish();
+        assigned_id.load(Ordering::Acquire)
+    }
+
+    pub fn update_stream(&mut self, id: StreamId, new_name: Arc<str>) {
+        self.append(MetadataOp::UpdateStream { id, new_name });
+        self.publish();
+    }
+
+    pub fn delete_stream(&mut self, id: StreamId) {
+        self.append(MetadataOp::DeleteStream { id });
+        self.publish();
+    }
+
+    pub fn add_topic(&mut self, stream_id: StreamId, meta: TopicMeta) -> 
Option<TopicId> {
+        let assigned_id = Arc::new(AtomicUsize::new(usize::MAX));
+        self.append(MetadataOp::AddTopic {
+            stream_id,
+            meta,
+            assigned_id: assigned_id.clone(),
+        });
+        self.publish();
+        let id = assigned_id.load(Ordering::Acquire);
+        if id == usize::MAX { None } else { Some(id) }
+    }
+
+    #[allow(clippy::too_many_arguments)]
+    pub fn update_topic(
+        &mut self,
+        stream_id: StreamId,
+        topic_id: TopicId,
+        new_name: Arc<str>,
+        message_expiry: IggyExpiry,
+        compression_algorithm: CompressionAlgorithm,
+        max_topic_size: MaxTopicSize,
+        replication_factor: u8,
+    ) {
+        self.append(MetadataOp::UpdateTopic {
+            stream_id,
+            topic_id,
+            new_name,
+            message_expiry,
+            compression_algorithm,
+            max_topic_size,
+            replication_factor,
+        });
+        self.publish();
+    }
+
+    pub fn delete_topic(&mut self, stream_id: StreamId, topic_id: TopicId) {
+        self.append(MetadataOp::DeleteTopic {
+            stream_id,
+            topic_id,
+        });
+        self.publish();
+    }
+
+    /// Add partitions to a topic. Returns the assigned partition IDs 
(sequential from current count).
+    pub fn add_partitions(
+        &mut self,
+        reader: &Metadata,
+        stream_id: StreamId,
+        topic_id: TopicId,
+        partitions: Vec<PartitionMeta>,
+    ) -> Vec<PartitionId> {
+        if partitions.is_empty() {
+            return Vec::new();
+        }
+
+        let count_before = reader
+            .get_partitions_count(stream_id, topic_id)
+            .expect("stream and topic must exist when adding partitions");
+        let count = partitions.len();
+
+        let revision_id = self.next_revision();
+        self.append(MetadataOp::AddPartitions {
+            stream_id,
+            topic_id,
+            partitions,
+            revision_id,
+        });
+        self.publish();
+
+        (count_before..count_before + count).collect()
+    }
+
+    /// Delete partitions from the end of a topic.
+    pub fn delete_partitions(&mut self, stream_id: StreamId, topic_id: 
TopicId, count: u32) {
+        if count == 0 {
+            return;
+        }
+        self.append(MetadataOp::DeletePartitions {
+            stream_id,
+            topic_id,
+            count,
+        });
+        self.publish();
+    }
+
+    pub fn set_partition_offsets(
+        &mut self,
+        stream_id: StreamId,
+        topic_id: TopicId,
+        partition_id: PartitionId,
+        consumer_offsets: Arc<ConsumerOffsets>,
+        consumer_group_offsets: Arc<ConsumerGroupOffsets>,
+    ) {
+        self.append(MetadataOp::SetPartitionOffsets {
+            stream_id,
+            topic_id,
+            partition_id,
+            consumer_offsets,
+            consumer_group_offsets,
+        });
+        self.publish();
+    }
+
+    pub fn add_user(&mut self, meta: UserMeta) -> UserId {
+        let assigned_id = Arc::new(AtomicUsize::new(0));
+        self.append(MetadataOp::AddUser {
+            meta,
+            assigned_id: assigned_id.clone(),
+        });
+        self.publish();
+        assigned_id.load(Ordering::Acquire) as UserId
+    }
+
+    pub fn update_user_meta(&mut self, id: UserId, meta: UserMeta) {
+        self.append(MetadataOp::UpdateUserMeta { id, meta });
+        self.publish();
+    }
+
+    pub fn delete_user(&mut self, id: UserId) {
+        self.append(MetadataOp::DeleteUser { id });
+        self.publish();
+    }
+
+    pub fn add_personal_access_token(&mut self, user_id: UserId, pat: 
PersonalAccessToken) {
+        self.append(MetadataOp::AddPersonalAccessToken { user_id, pat });
+        self.publish();
+    }
+
+    pub fn delete_personal_access_token(&mut self, user_id: UserId, 
token_hash: Arc<str>) {
+        self.append(MetadataOp::DeletePersonalAccessToken {
+            user_id,
+            token_hash,
+        });
+        self.publish();
+    }
+
+    pub fn add_consumer_group(
+        &mut self,
+        stream_id: StreamId,
+        topic_id: TopicId,
+        meta: ConsumerGroupMeta,
+    ) -> Option<ConsumerGroupId> {
+        let assigned_id = Arc::new(AtomicUsize::new(usize::MAX));
+        self.append(MetadataOp::AddConsumerGroup {
+            stream_id,
+            topic_id,
+            meta,
+            assigned_id: assigned_id.clone(),
+        });
+        self.publish();
+        let id = assigned_id.load(Ordering::Acquire);
+        if id == usize::MAX { None } else { Some(id) }
+    }
+
+    pub fn delete_consumer_group(
+        &mut self,
+        stream_id: StreamId,
+        topic_id: TopicId,
+        group_id: ConsumerGroupId,
+    ) {
+        self.append(MetadataOp::DeleteConsumerGroup {
+            stream_id,
+            topic_id,
+            group_id,
+        });
+        self.publish();
+    }
+
+    pub fn join_consumer_group(
+        &mut self,
+        stream_id: StreamId,
+        topic_id: TopicId,
+        group_id: ConsumerGroupId,
+        client_id: u32,
+    ) -> Option<usize> {
+        let member_id = Arc::new(AtomicUsize::new(usize::MAX));
+        self.append(MetadataOp::JoinConsumerGroup {
+            stream_id,
+            topic_id,
+            group_id,
+            client_id,
+            member_id: member_id.clone(),
+        });
+        self.publish();
+        let id = member_id.load(Ordering::Acquire);
+        if id == usize::MAX { None } else { Some(id) }
+    }
+
+    pub fn leave_consumer_group(
+        &mut self,
+        stream_id: StreamId,
+        topic_id: TopicId,
+        group_id: ConsumerGroupId,
+        client_id: u32,
+    ) -> Option<usize> {
+        let removed_member_id = Arc::new(AtomicUsize::new(usize::MAX));
+        self.append(MetadataOp::LeaveConsumerGroup {
+            stream_id,
+            topic_id,
+            group_id,
+            client_id,
+            removed_member_id: removed_member_id.clone(),
+        });
+        self.publish();
+        let id = removed_member_id.load(Ordering::Acquire);
+        if id == usize::MAX { None } else { Some(id) }
+    }
+
+    pub fn rebalance_consumer_groups_for_topic(
+        &mut self,
+        stream_id: StreamId,
+        topic_id: TopicId,
+        partitions_count: u32,
+    ) {
+        self.append(MetadataOp::RebalanceConsumerGroupsForTopic {
+            stream_id,
+            topic_id,
+            partitions_count,
+        });
+        self.publish();
+    }
+
+    // High-level registration methods with validation
+
+    pub fn create_stream(
+        &mut self,
+        reader: &Metadata,
+        name: Arc<str>,
+        created_at: IggyTimestamp,
+    ) -> Result<(StreamId, Arc<StreamStats>), IggyError> {
+        if reader.stream_name_exists(&name) {
+            return Err(IggyError::StreamNameAlreadyExists(name.to_string()));
+        }
+
+        let stats = Arc::new(StreamStats::default());
+        let meta = StreamMeta::with_stats(0, name, created_at, stats.clone());
+        let id = self.add_stream(meta);
+        Ok((id, stats))
+    }
+
+    pub fn try_update_stream(
+        &mut self,
+        reader: &Metadata,
+        id: StreamId,
+        new_name: Arc<str>,
+    ) -> Result<(), IggyError> {
+        let guard = reader.load();
+        let Some(stream) = guard.streams.get(id) else {
+            return Err(IggyError::StreamIdNotFound(
+                Identifier::numeric(id as u32).unwrap(),
+            ));
+        };
+
+        if stream.name == new_name {
+            return Ok(());
+        }
+
+        if let Some(&existing_id) = guard.stream_index.get(&new_name)
+            && existing_id != id
+        {
+            return 
Err(IggyError::StreamNameAlreadyExists(new_name.to_string()));
+        }
+        drop(guard);
+
+        self.update_stream(id, new_name);
+        Ok(())
+    }
+
+    #[allow(clippy::too_many_arguments)]
+    pub fn create_topic(
+        &mut self,
+        reader: &Metadata,
+        stream_id: StreamId,
+        name: Arc<str>,
+        created_at: IggyTimestamp,
+        message_expiry: IggyExpiry,
+        compression_algorithm: CompressionAlgorithm,
+        max_topic_size: MaxTopicSize,
+        replication_factor: u8,
+    ) -> Result<(TopicId, Arc<TopicStats>), IggyError> {
+        let parent_stats = reader.get_stream_stats(stream_id).ok_or_else(|| {
+            IggyError::StreamIdNotFound(Identifier::numeric(stream_id as 
u32).unwrap())
+        })?;
+
+        let guard = reader.load();
+        let Some(stream) = guard.streams.get(stream_id) else {
+            return Err(IggyError::StreamIdNotFound(
+                Identifier::numeric(stream_id as u32).unwrap(),
+            ));
+        };
+
+        if stream.topic_index.contains_key(&name) {
+            return Err(IggyError::TopicNameAlreadyExists(
+                name.to_string(),
+                Identifier::numeric(stream_id as u32).unwrap(),
+            ));
+        }
+        drop(guard);
+
+        let stats = Arc::new(TopicStats::new(parent_stats));
+        let meta = TopicMeta {
+            id: 0,
+            name,
+            created_at,
+            message_expiry,
+            compression_algorithm,
+            max_topic_size,
+            replication_factor,
+            stats: stats.clone(),
+            partitions: Vec::new(),
+            consumer_groups: Slab::new(),
+            consumer_group_index: ahash::AHashMap::default(),
+            round_robin_counter: Arc::new(AtomicUsize::new(0)),
+        };
+
+        // change to create_topic
+        let id = self.add_topic(stream_id, meta).ok_or_else(|| {
+            IggyError::StreamIdNotFound(Identifier::numeric(stream_id as 
u32).unwrap())
+        })?;
+        Ok((id, stats))
+    }
+
+    #[allow(clippy::too_many_arguments)]
+    pub fn try_update_topic(
+        &mut self,
+        reader: &Metadata,
+        stream_id: StreamId,
+        topic_id: TopicId,
+        new_name: Arc<str>,
+        message_expiry: IggyExpiry,
+        compression_algorithm: CompressionAlgorithm,
+        max_topic_size: MaxTopicSize,
+        replication_factor: u8,
+    ) -> Result<(), IggyError> {
+        let guard = reader.load();
+        let Some(stream) = guard.streams.get(stream_id) else {
+            return Err(IggyError::StreamIdNotFound(
+                Identifier::numeric(stream_id as u32).unwrap(),
+            ));
+        };
+
+        let Some(topic) = stream.topics.get(topic_id) else {
+            return Err(IggyError::TopicIdNotFound(
+                Identifier::numeric(topic_id as u32).unwrap(),
+                Identifier::numeric(stream_id as u32).unwrap(),
+            ));
+        };
+
+        if topic.name != new_name
+            && let Some(&existing_id) = stream.topic_index.get(&new_name)
+            && existing_id != topic_id
+        {
+            return Err(IggyError::TopicNameAlreadyExists(
+                new_name.to_string(),
+                Identifier::numeric(stream_id as u32).unwrap(),
+            ));
+        }
+        drop(guard);
+
+        self.update_topic(
+            stream_id,
+            topic_id,
+            new_name,
+            message_expiry,
+            compression_algorithm,
+            max_topic_size,
+            replication_factor,
+        );
+        Ok(())
+    }
+
+    pub fn register_partitions(
+        &mut self,
+        reader: &Metadata,
+        stream_id: StreamId,
+        topic_id: TopicId,
+        count: usize,
+        created_at: IggyTimestamp,
+    ) -> Vec<(PartitionId, Arc<PartitionStats>)> {
+        if count == 0 {
+            return Vec::new();
+        }
+
+        let parent_stats = reader
+            .get_topic_stats(stream_id, topic_id)
+            .expect("Parent topic stats must exist before registering 
partitions");
+
+        let mut metas = Vec::with_capacity(count);
+        let mut stats_list = Vec::with_capacity(count);
+
+        for _ in 0..count {
+            let stats = Arc::new(PartitionStats::new(parent_stats.clone()));
+            metas.push(PartitionMeta {
+                id: 0,
+                created_at,
+                revision_id: 0,
+                stats: stats.clone(),
+                consumer_offsets: None,
+                consumer_group_offsets: None,
+            });
+            stats_list.push(stats);
+        }
+
+        let ids = self.add_partitions(reader, stream_id, topic_id, metas);
+        ids.into_iter().zip(stats_list).collect()
+    }
+
+    pub fn create_user(
+        &mut self,
+        reader: &Metadata,
+        username: Arc<str>,
+        password_hash: Arc<str>,
+        status: UserStatus,
+        permissions: Option<Arc<Permissions>>,
+        max_users: usize,
+    ) -> Result<UserId, IggyError> {
+        if reader.username_exists(&username) {
+            return Err(IggyError::UserAlreadyExists);
+        }
+
+        if reader.users_count() >= max_users {
+            return Err(IggyError::UsersLimitReached);
+        }
+
+        let meta = UserMeta {
+            id: 0,
+            username,
+            password_hash,
+            status,
+            permissions,
+            created_at: IggyTimestamp::now(),
+        };
+        let id = self.add_user(meta);
+        Ok(id)
+    }
+
+    pub fn update_user(
+        &mut self,
+        reader: &Metadata,
+        id: UserId,
+        username: Option<Arc<str>>,
+        status: Option<UserStatus>,
+    ) -> Result<UserMeta, IggyError> {
+        let Some(mut meta) = reader.get_user(id) else {
+            return Err(IggyError::ResourceNotFound(format!("user:{id}")));
+        };
+
+        if let Some(new_username) = username {
+            if meta.username != new_username && 
reader.username_exists(&new_username) {
+                return Err(IggyError::UserAlreadyExists);
+            }
+            meta.username = new_username;
+        }
+
+        if let Some(new_status) = status {
+            meta.status = new_status;
+        }
+
+        let updated = meta.clone();
+        self.update_user_meta(id, meta);
+        Ok(updated)
+    }
+
+    pub fn create_consumer_group(
+        &mut self,
+        reader: &Metadata,
+        stream_id: StreamId,
+        topic_id: TopicId,
+        name: Arc<str>,
+        partitions_count: u32,
+    ) -> Result<ConsumerGroupId, IggyError> {
+        let guard = reader.load();
+        let Some(stream) = guard.streams.get(stream_id) else {
+            return Err(IggyError::StreamIdNotFound(
+                Identifier::numeric(stream_id as u32).unwrap(),
+            ));
+        };
+
+        let Some(topic) = stream.topics.get(topic_id) else {
+            return Err(IggyError::TopicIdNotFound(
+                Identifier::numeric(topic_id as u32).unwrap(),
+                Identifier::numeric(stream_id as u32).unwrap(),
+            ));
+        };
+
+        if topic.consumer_group_index.contains_key(&name) {
+            return Err(IggyError::ConsumerGroupNameAlreadyExists(
+                name.to_string(),
+                Identifier::numeric(topic_id as u32).unwrap(),
+            ));
+        }
+        drop(guard);
+
+        let meta = ConsumerGroupMeta {
+            id: 0,
+            name,
+            partitions: (0..partitions_count as usize).collect(),
+            members: Slab::new(),
+        };
+
+        let id = self
+            .add_consumer_group(stream_id, topic_id, meta)
+            .ok_or_else(|| {
+                IggyError::TopicIdNotFound(
+                    Identifier::numeric(topic_id as u32).unwrap(),
+                    Identifier::numeric(stream_id as u32).unwrap(),
+                )
+            })?;
+        Ok(id)
+    }
+}
diff --git a/core/server/src/shard/system/personal_access_tokens.rs 
b/core/server/src/shard/system/personal_access_tokens.rs
index 7b26aff1e..7fe8dbbb3 100644
--- a/core/server/src/shard/system/personal_access_tokens.rs
+++ b/core/server/src/shard/system/personal_access_tokens.rs
@@ -102,7 +102,7 @@ impl IggyShard {
                 if user
                     .personal_access_tokens
                     .iter()
-                    .any(|pat| pat.name.as_str() == name.as_str())
+                    .any(|pat| pat.name == name)
                 {
                     error!(
                         "Personal access token: {name} for user with ID: 
{user_id} already exists."
@@ -150,7 +150,7 @@ impl IggyShard {
                 let token = if let Some(pat) = user
                     .personal_access_tokens
                     .iter()
-                    .find(|pat| pat.name.as_str() == name)
+                    .find(|pat| &*pat.name == name)
                 {
                     pat.token.clone()
                 } else {
@@ -182,7 +182,7 @@ impl IggyShard {
         let users = self.users.values();
         let mut personal_access_token = None;
         for user in &users {
-            if let Some(pat) = user.personal_access_tokens.get(&token_hash) {
+            if let Some(pat) = 
user.personal_access_tokens.get(token_hash.as_str()) {
                 personal_access_token = Some(pat);
                 break;
             }
@@ -205,7 +205,7 @@ impl IggyShard {
                 personal_access_token.name, personal_access_token.user_id
             );
             return Err(IggyError::PersonalAccessTokenExpired(
-                personal_access_token.name.as_str().to_owned(),
+                (*personal_access_token.name).to_owned(),
                 personal_access_token.user_id,
             ));
         }
diff --git 
a/core/server/src/shard/tasks/periodic/personal_access_token_cleaner.rs 
b/core/server/src/shard/tasks/periodic/personal_access_token_cleaner.rs
index 3c1203a9a..83b2df434 100644
--- a/core/server/src/shard/tasks/periodic/personal_access_token_cleaner.rs
+++ b/core/server/src/shard/tasks/periodic/personal_access_token_cleaner.rs
@@ -50,7 +50,7 @@ async fn clear_personal_access_tokens(shard: Rc<IggyShard>) 
-> Result<(), IggyEr
 
     let users = shard.users.values();
     for user in &users {
-        let expired_tokens: Vec<Arc<String>> = user
+        let expired_tokens: Vec<Arc<str>> = user
             .personal_access_tokens
             .iter()
             .filter(|entry| entry.value().is_expired(now))
diff --git a/core/server/src/streaming/users/user.rs 
b/core/server/src/streaming/users/user.rs
index 672662d3e..4882d4542 100644
--- a/core/server/src/streaming/users/user.rs
+++ b/core/server/src/streaming/users/user.rs
@@ -32,7 +32,7 @@ pub struct User {
     pub password: String,
     pub created_at: IggyTimestamp,
     pub permissions: Option<Permissions>,
-    pub personal_access_tokens: DashMap<Arc<String>, PersonalAccessToken>,
+    pub personal_access_tokens: DashMap<Arc<str>, PersonalAccessToken>,
 }
 
 impl Default for User {


Reply via email to