This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch shared-metadata
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit d41ae9caadfff987d0e5621d8dcf9a3691f847cb
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Fri Jan 9 13:31:43 2026 +0100

    feat(metadata): add shared metadata module with ArcSwap-based storage
---
 Cargo.lock                                         |   37 +
 DEPENDENCIES.md                                    |    4 +
 .../common/src/types/personal_access_tokens/mod.rs |   18 +-
 core/server/Cargo.toml                             |    2 +
 core/server/src/bootstrap.rs                       |    2 +-
 core/server/src/http/mapper.rs                     |    2 +-
 core/server/src/lib.rs                             |    1 +
 core/server/src/metadata/consumer_group.rs         |   87 +
 core/server/src/metadata/consumer_offsets_store.rs |  127 ++
 core/server/src/metadata/mod.rs                    |   57 +
 core/server/src/metadata/partition.rs              |   31 +
 core/server/src/metadata/shared.rs                 | 1873 ++++++++++++++++++++
 core/server/src/metadata/snapshot.rs               |   52 +
 core/server/src/metadata/stream.rs                 |   64 +
 core/server/src/metadata/topic.rs                  |   51 +
 core/server/src/metadata/user.rs                   |   30 +
 .../src/shard/system/personal_access_tokens.rs     |    9 +-
 .../periodic/personal_access_token_cleaner.rs      |    6 +-
 core/server/src/streaming/users/user.rs            |    2 +-
 19 files changed, 2436 insertions(+), 19 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index b5761f4f3..2e738b37d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -433,6 +433,12 @@ dependencies = [
  "rustversion",
 ]
 
+[[package]]
+name = "archery"
+version = "1.2.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "70e0a5f99dfebb87bb342d0f53bb92c81842e100bbb915223e38349580e5441d"
+
 [[package]]
 name = "arcshift"
 version = "0.4.2"
@@ -1184,6 +1190,12 @@ dependencies = [
  "serde_core",
 ]
 
+[[package]]
+name = "bitmaps"
+version = "3.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a1d084b0137aaa901caf9f1e8b21daa6aa24d41cd806e111335541eff9683bd6"
+
 [[package]]
 name = "bitvec"
 version = "1.0.1"
@@ -4895,6 +4907,29 @@ dependencies = [
  "winapi-util",
 ]
 
+[[package]]
+name = "imbl"
+version = "6.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0fade8ae6828627ad1fa094a891eccfb25150b383047190a3648d66d06186501"
+dependencies = [
+ "archery",
+ "bitmaps",
+ "imbl-sized-chunks",
+ "rand_core 0.9.3",
+ "rand_xoshiro",
+ "version_check",
+]
+
+[[package]]
+name = "imbl-sized-chunks"
+version = "0.1.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8f4241005618a62f8d57b2febd02510fb96e0137304728543dfc5fd6f052c22d"
+dependencies = [
+ "bitmaps",
+]
+
 [[package]]
 name = "impl-more"
 version = "0.1.9"
@@ -8256,6 +8291,7 @@ version = "0.6.1-edge.3"
 dependencies = [
  "ahash 0.8.12",
  "anyhow",
+ "arc-swap",
  "arcshift",
  "argon2",
  "async-channel",
@@ -8287,6 +8323,7 @@ dependencies = [
  "human-repr",
  "hwlocality",
  "iggy_common",
+ "imbl",
  "jsonwebtoken",
  "lending-iterator",
  "mimalloc",
diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md
index 8423e83d3..33457d729 100644
--- a/DEPENDENCIES.md
+++ b/DEPENDENCIES.md
@@ -33,6 +33,7 @@ anyhow: 1.0.100, "Apache-2.0 OR MIT",
 apache-avro: 0.17.0, "Apache-2.0",
 arbitrary: 1.4.2, "Apache-2.0 OR MIT",
 arc-swap: 1.8.0, "Apache-2.0 OR MIT",
+archery: 1.2.2, "MIT",
 arcshift: 0.4.2, "Apache-2.0 OR MIT",
 argon2: 0.5.3, "Apache-2.0 OR MIT",
 array-init: 2.1.0, "Apache-2.0 OR MIT",
@@ -98,6 +99,7 @@ bit-set: 0.8.0, "Apache-2.0 OR MIT",
 bit-vec: 0.8.0, "Apache-2.0 OR MIT",
 bitflags: 1.3.2, "Apache-2.0 OR MIT",
 bitflags: 2.10.0, "Apache-2.0 OR MIT",
+bitmaps: 3.2.1, "MPL-2.0+",
 bitvec: 1.0.1, "MIT",
 blake2: 0.10.6, "Apache-2.0 OR MIT",
 blake3: 1.8.2, "Apache-2.0 OR Apache-2.0 WITH LLVM-exception OR CC0-1.0",
@@ -410,6 +412,8 @@ iggy_connector_sdk: 0.1.1-edge.1, "Apache-2.0",
 iggy_connector_stdout_sink: 0.1.0, "Apache-2.0",
 iggy_examples: 0.0.5, "Apache-2.0",
 ignore: 0.4.25, "MIT OR Unlicense",
+imbl: 6.1.0, "MPL-2.0+",
+imbl-sized-chunks: 0.1.3, "MPL-2.0+",
 impl-more: 0.1.9, "Apache-2.0 OR MIT",
 implicit-clone: 0.6.0, "Apache-2.0 OR MIT",
 implicit-clone-derive: 0.1.2, "Apache-2.0 OR MIT",
diff --git a/core/common/src/types/personal_access_tokens/mod.rs b/core/common/src/types/personal_access_tokens/mod.rs
index e28a585de..0e1e33391 100644
--- a/core/common/src/types/personal_access_tokens/mod.rs
+++ b/core/common/src/types/personal_access_tokens/mod.rs
@@ -28,8 +28,8 @@ const SIZE: usize = 50;
 #[derive(Clone, Debug)]
 pub struct PersonalAccessToken {
     pub user_id: UserId,
-    pub name: Arc<String>,
-    pub token: Arc<String>,
+    pub name: Arc<str>,
+    pub token: Arc<str>,
     pub expiry_at: Option<IggyTimestamp>,
 }
 
@@ -49,8 +49,8 @@ impl PersonalAccessToken {
         (
             Self {
                 user_id,
-                name: Arc::new(name.to_string()),
-                token: Arc::new(token_hash),
+                name: Arc::from(name),
+                token: Arc::from(token_hash),
                 expiry_at: Self::calculate_expiry_at(now, expiry),
             },
             token,
@@ -65,8 +65,8 @@ impl PersonalAccessToken {
     ) -> Self {
         Self {
             user_id,
-            name: Arc::new(name.into()),
-            token: Arc::new(token_hash.into()),
+            name: Arc::from(name),
+            token: Arc::from(token_hash),
             expiry_at,
         }
     }
@@ -106,12 +106,12 @@ mod tests {
         let name = "test_token";
         let (personal_access_token, raw_token) =
            PersonalAccessToken::new(user_id, name, now, IggyExpiry::NeverExpire);
-        assert_eq!(personal_access_token.name.as_str(), name);
+        assert_eq!(&*personal_access_token.name, name);
         assert!(!personal_access_token.token.is_empty());
         assert!(!raw_token.is_empty());
-        assert_ne!(personal_access_token.token.as_str(), raw_token);
+        assert_ne!(&*personal_access_token.token, raw_token);
         assert_eq!(
-            personal_access_token.token.as_str(),
+            &*personal_access_token.token,
             PersonalAccessToken::hash_token(&raw_token)
         );
     }
diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml
index ff38a114b..8d4cfc65c 100644
--- a/core/server/Cargo.toml
+++ b/core/server/Cargo.toml
@@ -40,6 +40,7 @@ iggy-web = ["dep:rust-embed", "dep:mime_guess"]
 [dependencies]
 ahash = { workspace = true }
 anyhow = { workspace = true }
+arc-swap = "1.8.0"
 arcshift = "0.4.2"
 argon2 = { workspace = true }
 async-channel = { workspace = true }
@@ -71,6 +72,7 @@ hash32 = "1.0.0"
 human-repr = { workspace = true }
 
 iggy_common = { workspace = true }
+imbl = "6.1.0"
 jsonwebtoken = { version = "10.2.0", features = ["rust_crypto"] }
 lending-iterator = "0.1.7"
 mimalloc = { workspace = true, optional = true }
diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs
index 216e7c93a..13690a813 100644
--- a/core/server/src/bootstrap.rs
+++ b/core/server/src/bootstrap.rs
@@ -212,7 +212,7 @@ pub fn load_users(state: impl IntoIterator<Item = UserState>) -> Users {
             .into_values()
             .map(|token| {
                 (
-                    Arc::new(token.token_hash.clone()),
+                    Arc::from(token.token_hash.as_str()),
                     PersonalAccessToken::raw(id, &token.name, &token.token_hash, token.expiry_at),
                 )
             })
diff --git a/core/server/src/http/mapper.rs b/core/server/src/http/mapper.rs
index afe243529..d7ec542f2 100644
--- a/core/server/src/http/mapper.rs
+++ b/core/server/src/http/mapper.rs
@@ -138,7 +138,7 @@ pub fn map_personal_access_tokens(
     let mut personal_access_tokens_data = Vec::with_capacity(personal_access_tokens.len());
     for personal_access_token in personal_access_tokens {
         let personal_access_token = PersonalAccessTokenInfo {
-            name: personal_access_token.name.as_str().to_owned(),
+            name: (*personal_access_token.name).to_owned(),
             expiry_at: personal_access_token.expiry_at,
         };
         personal_access_tokens_data.push(personal_access_token);
diff --git a/core/server/src/lib.rs b/core/server/src/lib.rs
index 2919214b6..41669486f 100644
--- a/core/server/src/lib.rs
+++ b/core/server/src/lib.rs
@@ -37,6 +37,7 @@ pub mod diagnostics;
 pub mod http;
 pub mod io;
 pub mod log;
+pub mod metadata;
 pub mod quic;
 pub mod server_error;
 pub mod shard;
diff --git a/core/server/src/metadata/consumer_group.rs b/core/server/src/metadata/consumer_group.rs
new file mode 100644
index 000000000..b7ee83c37
--- /dev/null
+++ b/core/server/src/metadata/consumer_group.rs
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::metadata::{ClientId, ConsumerGroupId, PartitionId, SLAB_SEGMENT_SIZE};
+use iggy_common::collections::SegmentedSlab;
+use std::sync::Arc;
+use std::sync::atomic::AtomicUsize;
+
+#[derive(Clone, Debug)]
+pub struct ConsumerGroupMemberMeta {
+    pub id: usize,
+    pub client_id: ClientId,
+    pub partitions: Vec<PartitionId>,
+    pub partition_index: Arc<AtomicUsize>,
+}
+
+impl ConsumerGroupMemberMeta {
+    pub fn new(id: usize, client_id: ClientId) -> Self {
+        Self {
+            id,
+            client_id,
+            partitions: Vec::new(),
+            partition_index: Arc::new(AtomicUsize::new(0)),
+        }
+    }
+}
+
+#[derive(Clone, Debug)]
+pub struct ConsumerGroupMeta {
+    pub id: ConsumerGroupId,
+    pub name: Arc<str>,
+    pub partitions: Vec<PartitionId>,
+    pub members: SegmentedSlab<ConsumerGroupMemberMeta, SLAB_SEGMENT_SIZE>,
+}
+
+impl ConsumerGroupMeta {
+    /// Rebalance partition assignments among members (round-robin).
+    pub fn rebalance_members(&mut self) {
+        let partition_count = self.partitions.len();
+        let member_count = self.members.len();
+
+        if member_count == 0 || partition_count == 0 {
+            return;
+        }
+
+        let mut members = std::mem::take(&mut self.members);
+
+        // Clear all member partitions and rebuild assignments
+        let member_ids: Vec<usize> = members.iter().map(|(id, _)| id).collect();
+        for &member_id in &member_ids {
+            if let Some(member) = members.get(member_id) {
+                let mut updated_member = member.clone();
+                updated_member.partitions.clear();
+                let (new_members, _) = members.update(member_id, updated_member);
+                members = new_members;
+            }
+        }
+
+        for (i, &partition_id) in self.partitions.iter().enumerate() {
+            let member_idx = i % member_count;
+            if let Some(&member_id) = member_ids.get(member_idx)
+                && let Some(member) = members.get(member_id)
+            {
+                let mut updated_member = member.clone();
+                updated_member.partitions.push(partition_id);
+                let (new_members, _) = members.update(member_id, updated_member);
+                members = new_members;
+            }
+        }
+
+        self.members = members;
+    }
+}
diff --git a/core/server/src/metadata/consumer_offsets_store.rs b/core/server/src/metadata/consumer_offsets_store.rs
new file mode 100644
index 000000000..85db0735b
--- /dev/null
+++ b/core/server/src/metadata/consumer_offsets_store.rs
@@ -0,0 +1,127 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::metadata::{PartitionId, StreamId, TopicId};
+use crate::streaming::partitions::partition::{ConsumerGroupOffsets, ConsumerOffsets};
+use std::sync::Arc;
+
+/// Cross-shard consumer offsets visibility using DashMap.
+///
+/// When a partition is created, the consumer offsets Arcs are registered here.
+/// Any shard can then access them without routing to the owning shard.
+#[derive(Debug, Default)]
+pub struct SharedConsumerOffsetsStore {
+    consumer_offsets: dashmap::DashMap<(StreamId, TopicId, PartitionId), Arc<ConsumerOffsets>>,
+    consumer_group_offsets:
+        dashmap::DashMap<(StreamId, TopicId, PartitionId), Arc<ConsumerGroupOffsets>>,
+}
+
+impl SharedConsumerOffsetsStore {
+    pub fn new() -> Self {
+        Self {
+            consumer_offsets: dashmap::DashMap::new(),
+            consumer_group_offsets: dashmap::DashMap::new(),
+        }
+    }
+
+    pub fn register_consumer_offsets(
+        &self,
+        stream_id: StreamId,
+        topic_id: TopicId,
+        partition_id: PartitionId,
+        offsets: Arc<ConsumerOffsets>,
+    ) {
+        self.consumer_offsets
+            .insert((stream_id, topic_id, partition_id), offsets);
+    }
+
+    pub fn get_consumer_offsets(
+        &self,
+        stream_id: StreamId,
+        topic_id: TopicId,
+        partition_id: PartitionId,
+    ) -> Option<Arc<ConsumerOffsets>> {
+        self.consumer_offsets
+            .get(&(stream_id, topic_id, partition_id))
+            .map(|r| Arc::clone(&r))
+    }
+
+    pub fn remove_consumer_offsets(
+        &self,
+        stream_id: StreamId,
+        topic_id: TopicId,
+        partition_id: PartitionId,
+    ) {
+        self.consumer_offsets
+            .remove(&(stream_id, topic_id, partition_id));
+    }
+
+    pub fn register_consumer_group_offsets(
+        &self,
+        stream_id: StreamId,
+        topic_id: TopicId,
+        partition_id: PartitionId,
+        offsets: Arc<ConsumerGroupOffsets>,
+    ) {
+        self.consumer_group_offsets
+            .insert((stream_id, topic_id, partition_id), offsets);
+    }
+
+    pub fn get_consumer_group_offsets(
+        &self,
+        stream_id: StreamId,
+        topic_id: TopicId,
+        partition_id: PartitionId,
+    ) -> Option<Arc<ConsumerGroupOffsets>> {
+        self.consumer_group_offsets
+            .get(&(stream_id, topic_id, partition_id))
+            .map(|r| Arc::clone(&r))
+    }
+
+    pub fn remove_consumer_group_offsets(
+        &self,
+        stream_id: StreamId,
+        topic_id: TopicId,
+        partition_id: PartitionId,
+    ) {
+        self.consumer_group_offsets
+            .remove(&(stream_id, topic_id, partition_id));
+    }
+
+    pub fn remove_partition_offsets(
+        &self,
+        stream_id: StreamId,
+        topic_id: TopicId,
+        partition_id: PartitionId,
+    ) {
+        self.remove_consumer_offsets(stream_id, topic_id, partition_id);
+        self.remove_consumer_group_offsets(stream_id, topic_id, partition_id);
+    }
+
+    pub fn remove_topic_offsets(&self, stream_id: StreamId, topic_id: TopicId) {
+        self.consumer_offsets
+            .retain(|&(s, t, _), _| s != stream_id || t != topic_id);
+        self.consumer_group_offsets
+            .retain(|&(s, t, _), _| s != stream_id || t != topic_id);
+    }
+
+    pub fn remove_stream_offsets(&self, stream_id: StreamId) {
+        self.consumer_offsets.retain(|&(s, _, _), _| s != stream_id);
+        self.consumer_group_offsets
+            .retain(|&(s, _, _), _| s != stream_id);
+    }
+}
diff --git a/core/server/src/metadata/mod.rs b/core/server/src/metadata/mod.rs
new file mode 100644
index 000000000..ce6543cfc
--- /dev/null
+++ b/core/server/src/metadata/mod.rs
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Shared metadata module providing a single source of truth for all shards.
+//!
+//! This module provides an `ArcSwap`-based approach where all shards read from
+//! a shared snapshot, and only shard 0 can write (swap in new snapshots).
+//!
+//! # Architecture
+//!
+//! - `GlobalMetadata` (snapshot.rs): Immutable snapshot with all metadata
+//! - `SharedMetadata` (shared.rs): Thread-safe wrapper with ArcSwap
+//! - Entity types: `StreamMeta`, `TopicMeta`, `PartitionMeta`, `UserMeta`, `ConsumerGroupMeta`
+//! - `SharedConsumerOffsetsStore`: Cross-shard consumer offsets visibility
+
+mod consumer_group;
+mod consumer_offsets_store;
+mod partition;
+mod shared;
+mod snapshot;
+mod stream;
+mod topic;
+mod user;
+
+pub use consumer_group::{ConsumerGroupMemberMeta, ConsumerGroupMeta};
+pub use consumer_offsets_store::SharedConsumerOffsetsStore;
+pub use partition::PartitionMeta;
+pub use shared::SharedMetadata;
+pub use snapshot::GlobalMetadata;
+pub use stream::StreamMeta;
+pub use topic::TopicMeta;
+pub use user::UserMeta;
+
+pub type StreamId = usize;
+pub type TopicId = usize;
+pub type PartitionId = usize;
+pub type UserId = u32;
+pub type ClientId = u32;
+pub type ConsumerGroupId = usize;
+pub type ConsumerGroupKey = (StreamId, TopicId, ConsumerGroupId);
+
+/// Segment size for SegmentedSlab (1024 entries per segment).
+pub const SLAB_SEGMENT_SIZE: usize = 1024;
diff --git a/core/server/src/metadata/partition.rs b/core/server/src/metadata/partition.rs
new file mode 100644
index 000000000..7b98fdafc
--- /dev/null
+++ b/core/server/src/metadata/partition.rs
@@ -0,0 +1,31 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::metadata::PartitionId;
+use crate::streaming::stats::PartitionStats;
+use iggy_common::IggyTimestamp;
+use std::sync::Arc;
+
+#[derive(Clone, Debug)]
+pub struct PartitionMeta {
+    pub id: PartitionId,
+    pub created_at: IggyTimestamp,
+    /// Monotonically increasing version to detect stale partition_store entries.
+    /// Set to the GlobalMetadata version when the partition was created.
+    pub creation_version: u64,
+    pub stats: Arc<PartitionStats>,
+}
diff --git a/core/server/src/metadata/shared.rs b/core/server/src/metadata/shared.rs
new file mode 100644
index 000000000..e374526b3
--- /dev/null
+++ b/core/server/src/metadata/shared.rs
@@ -0,0 +1,1873 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::metadata::{
+    ConsumerGroupId, ConsumerGroupMemberMeta, ConsumerGroupMeta, GlobalMetadata, PartitionId,
+    PartitionMeta, StreamId, StreamMeta, TopicId, TopicMeta, UserId, UserMeta,
+};
+use crate::streaming::stats::{PartitionStats, StreamStats, TopicStats};
+use arc_swap::{ArcSwap, Guard};
+use iggy_common::collections::SegmentedSlab;
+use iggy_common::{
+    CompressionAlgorithm, IdKind, Identifier, IggyError, IggyExpiry, IggyTimestamp, MaxTopicSize,
+    PersonalAccessToken, UserStatus,
+};
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
+
+/// Thread-safe wrapper for GlobalMetadata using ArcSwap for lock-free reads.
+/// Uses hierarchical structure: streams contain topics, topics contain partitions and consumer groups.
+/// IDs are assigned by SegmentedSlab::insert() at each level.
+pub struct SharedMetadata {
+    inner: ArcSwap<GlobalMetadata>,
+}
+
+impl Default for SharedMetadata {
+    fn default() -> Self {
+        Self::new(GlobalMetadata::new())
+    }
+}
+
+impl SharedMetadata {
+    pub fn new(initial: GlobalMetadata) -> Self {
+        Self {
+            inner: ArcSwap::from_pointee(initial),
+        }
+    }
+
+    #[inline]
+    pub fn load(&self) -> Guard<Arc<GlobalMetadata>> {
+        self.inner.load()
+    }
+
+    #[inline]
+    pub fn load_full(&self) -> Arc<GlobalMetadata> {
+        self.inner.load_full()
+    }
+
+    // ═══════════════════════════════════════════════════════════════
+    // Stream operations
+    // ═══════════════════════════════════════════════════════════════
+
+    /// Add a stream with a specific ID (for bootstrap/recovery).
+    pub fn add_stream_with_id(&self, id: StreamId, meta: StreamMeta) {
+        self.inner.rcu(move |current| {
+            let meta = meta.clone();
+            let mut new = (**current).clone();
+            let entries: Vec<_> = new
+                .streams
+                .iter()
+                .map(|(k, v)| (k, v.clone()))
+                .chain(std::iter::once((id, meta.clone())))
+                .collect();
+            new.streams = SegmentedSlab::from_entries(entries);
+            new.stream_index = new.stream_index.update(meta.name.clone(), id);
+            new.version += 1;
+            Arc::new(new)
+        });
+    }
+
+    /// Add a new stream with slab-assigned ID. Returns the assigned ID.
+    pub fn add_stream(&self, meta: StreamMeta) -> StreamId {
+        let assigned_id = Arc::new(AtomicUsize::new(0));
+        let assigned_id_clone = assigned_id.clone();
+
+        self.inner.rcu(move |current| {
+            let meta = meta.clone();
+            let mut new = (**current).clone();
+            let (streams, id) = new.streams.insert(meta.clone());
+            assigned_id_clone.store(id, Ordering::Release);
+            new.streams = streams;
+            new.stream_index = new.stream_index.update(meta.name.clone(), id);
+            new.version += 1;
+            Arc::new(new)
+        });
+
+        assigned_id.load(Ordering::Acquire)
+    }
+
+    /// Atomically validates name uniqueness and updates stream name.
+    /// Returns Ok(()) if update succeeded, or appropriate error.
+    pub fn try_update_stream(&self, id: StreamId, new_name: Arc<str>) -> Result<(), IggyError> {
+        let stream_not_found = Arc::new(AtomicBool::new(false));
+        let name_conflict = Arc::new(AtomicBool::new(false));
+        let unchanged = Arc::new(AtomicBool::new(false));
+
+        let stream_not_found_clone = stream_not_found.clone();
+        let name_conflict_clone = name_conflict.clone();
+        let unchanged_clone = unchanged.clone();
+        let new_name_clone = new_name.clone();
+
+        self.inner.rcu(move |current| {
+            // Check if stream exists
+            let Some(old_meta) = current.streams.get(id) else {
+                stream_not_found_clone.store(true, Ordering::Release);
+                return Arc::clone(current);
+            };
+
+            // No-op if name is unchanged
+            if old_meta.name == new_name_clone {
+                unchanged_clone.store(true, Ordering::Release);
+                return Arc::clone(current);
+            }
+
+            // Check if new name conflicts with another stream
+            if let Some(&existing_id) = current.stream_index.get(&new_name_clone)
+                && existing_id != id
+            {
+                name_conflict_clone.store(true, Ordering::Release);
+                return Arc::clone(current);
+            }
+
+            // All checks passed - perform atomic update
+            stream_not_found_clone.store(false, Ordering::Release);
+            name_conflict_clone.store(false, Ordering::Release);
+            unchanged_clone.store(false, Ordering::Release);
+
+            let mut new = (**current).clone();
+            new.stream_index = new.stream_index.without(&old_meta.name);
+
+            let mut updated = old_meta.clone();
+            updated.name = new_name_clone.clone();
+            let (streams, _) = new.streams.update(id, updated);
+            new.streams = streams;
+            new.stream_index = new.stream_index.update(new_name_clone.clone(), id);
+            new.version += 1;
+
+            Arc::new(new)
+        });
+
+        if stream_not_found.load(Ordering::Acquire) {
+            Err(IggyError::StreamIdNotFound(
+                Identifier::numeric(id as u32).unwrap(),
+            ))
+        } else if name_conflict.load(Ordering::Acquire) {
+            Err(IggyError::StreamNameAlreadyExists(new_name.to_string()))
+        } else {
+            Ok(())
+        }
+    }
+
+    /// Delete a stream and all its nested topics/partitions/consumer groups.
+    pub fn delete_stream(&self, id: StreamId) {
+        self.inner.rcu(|current| {
+            let mut new = (**current).clone();
+            if let Some(stream) = new.streams.get(id) {
+                new.stream_index = new.stream_index.without(&stream.name);
+            }
+            let (streams, _) = new.streams.remove(id);
+            new.streams = streams;
+            new.version += 1;
+            Arc::new(new)
+        });
+    }
+
+    // ═══════════════════════════════════════════════════════════════
+    // Topic operations (nested in stream)
+    // ═══════════════════════════════════════════════════════════════
+
+    /// Add a topic with a specific ID (for bootstrap/recovery).
+    pub fn add_topic_with_id(&self, stream_id: StreamId, topic_id: TopicId, meta: TopicMeta) {
+        self.inner.rcu(move |current| {
+            let meta = meta.clone();
+            let mut new = (**current).clone();
+
+            if let Some(stream) = new.streams.get(stream_id) {
+                let mut updated_stream = stream.clone();
+                let entries: Vec<_> = updated_stream
+                    .topics
+                    .iter()
+                    .map(|(k, v)| (k, v.clone()))
+                    .chain(std::iter::once((topic_id, meta.clone())))
+                    .collect();
+                updated_stream.topics = SegmentedSlab::from_entries(entries);
+                updated_stream.topic_index = updated_stream
+                    .topic_index
+                    .update(meta.name.clone(), topic_id);
+
+                let (streams, _) = new.streams.update(stream_id, updated_stream);
+                new.streams = streams;
+            }
+            new.version += 1;
+            Arc::new(new)
+        });
+    }
+
+    /// Add a new topic with slab-assigned ID. Returns the assigned ID.
+    pub fn add_topic(&self, stream_id: StreamId, meta: TopicMeta) -> Option<TopicId> {
+        let assigned_id = Arc::new(AtomicUsize::new(usize::MAX));
+        let assigned_id_clone = assigned_id.clone();
+
+        self.inner.rcu(move |current| {
+            let meta = meta.clone();
+            let mut new = (**current).clone();
+
+            if let Some(stream) = new.streams.get(stream_id) {
+                let mut updated_stream = stream.clone();
+                let (topics, id) = updated_stream.topics.insert(meta.clone());
+                assigned_id_clone.store(id, Ordering::Release);
+                updated_stream.topics = topics;
+                updated_stream.topic_index =
+                    updated_stream.topic_index.update(meta.name.clone(), id);
+
+                let (streams, _) = new.streams.update(stream_id, updated_stream);
+                new.streams = streams;
+            }
+            new.version += 1;
+            Arc::new(new)
+        });
+
+        let id = assigned_id.load(Ordering::Acquire);
+        if id == usize::MAX { None } else { Some(id) }
+    }
+
    /// Atomically validates name uniqueness and updates topic.
    /// Returns Ok(()) if update succeeded, or appropriate error.
    ///
    /// The RCU closure may execute multiple times under write contention,
    /// so every outcome flag is (re)stored on each attempt and only the
    /// final attempt's flags are read after `rcu` returns.
    #[allow(clippy::too_many_arguments)]
    pub fn try_update_topic(
        &self,
        stream_id: StreamId,
        topic_id: TopicId,
        new_name: Arc<str>,
        message_expiry: IggyExpiry,
        compression_algorithm: CompressionAlgorithm,
        max_topic_size: MaxTopicSize,
        replication_factor: u8,
    ) -> Result<(), IggyError> {
        // Out-channels: the RCU closure cannot return an error directly,
        // so validation results are smuggled out via shared atomics.
        let stream_not_found = Arc::new(AtomicBool::new(false));
        let topic_not_found = Arc::new(AtomicBool::new(false));
        let name_conflict = Arc::new(AtomicBool::new(false));

        let stream_not_found_clone = stream_not_found.clone();
        let topic_not_found_clone = topic_not_found.clone();
        let name_conflict_clone = name_conflict.clone();
        let new_name_clone = new_name.clone();

        self.inner.rcu(move |current| {
            // Check if stream exists
            let Some(stream) = current.streams.get(stream_id) else {
                stream_not_found_clone.store(true, Ordering::Release);
                return Arc::clone(current);
            };

            // Check if topic exists
            let Some(old_meta) = stream.topics.get(topic_id) else {
                topic_not_found_clone.store(true, Ordering::Release);
                return Arc::clone(current);
            };

            // Check if new name conflicts with another topic
            if old_meta.name != new_name_clone
                && let Some(&existing_id) = stream.topic_index.get(&new_name_clone)
                && existing_id != topic_id
            {
                name_conflict_clone.store(true, Ordering::Release);
                return Arc::clone(current);
            }

            // All checks passed - perform atomic update.
            // Reset flags in case a previous (retried) attempt set them.
            stream_not_found_clone.store(false, Ordering::Release);
            topic_not_found_clone.store(false, Ordering::Release);
            name_conflict_clone.store(false, Ordering::Release);

            let mut new = (**current).clone();
            let mut updated_stream = stream.clone();

            // Update topic index if name changed
            if old_meta.name != new_name_clone {
                updated_stream.topic_index = updated_stream.topic_index.without(&old_meta.name);
                updated_stream.topic_index = updated_stream
                    .topic_index
                    .update(new_name_clone.clone(), topic_id);
            }

            // Build updated topic meta preserving immutable fields
            // (created_at, stats, partitions, groups, counters carry over).
            let updated_meta = TopicMeta {
                id: topic_id,
                name: new_name_clone.clone(),
                created_at: old_meta.created_at,
                message_expiry,
                compression_algorithm,
                max_topic_size,
                replication_factor,
                stats: old_meta.stats.clone(),
                partitions: old_meta.partitions.clone(),
                consumer_groups: old_meta.consumer_groups.clone(),
                consumer_group_index: old_meta.consumer_group_index.clone(),
                partition_counter: old_meta.partition_counter.clone(),
            };

            let (topics, _) = updated_stream.topics.update(topic_id, updated_meta);
            updated_stream.topics = topics;

            let (streams, _) = new.streams.update(stream_id, updated_stream);
            new.streams = streams;
            new.version += 1;

            Arc::new(new)
        });

        // NOTE(review): `Identifier::numeric(...).unwrap()` panics if the id
        // value is rejected by Identifier (e.g. out of range) — TODO confirm
        // slab-assigned ids are always valid identifier values here.
        if stream_not_found.load(Ordering::Acquire) {
            Err(IggyError::StreamIdNotFound(
                Identifier::numeric(stream_id as u32).unwrap(),
            ))
        } else if topic_not_found.load(Ordering::Acquire) {
            Err(IggyError::TopicIdNotFound(
                Identifier::numeric(topic_id as u32).unwrap(),
                Identifier::numeric(stream_id as u32).unwrap(),
            ))
        } else if name_conflict.load(Ordering::Acquire) {
            Err(IggyError::TopicNameAlreadyExists(
                new_name.to_string(),
                Identifier::numeric(stream_id as u32).unwrap(),
            ))
        } else {
            Ok(())
        }
    }
+
    /// Delete a topic and all its nested partitions/consumer groups.
    ///
    /// Silently no-ops when the stream does not exist; an unknown topic id
    /// is likewise tolerated. The snapshot version is bumped either way.
    pub fn delete_topic(&self, stream_id: StreamId, topic_id: TopicId) {
        self.inner.rcu(|current| {
            let mut new = (**current).clone();

            if let Some(stream) = new.streams.get(stream_id) {
                let mut updated_stream = stream.clone();

                // Drop the name -> id mapping before removing the topic slot.
                if let Some(topic) = updated_stream.topics.get(topic_id) {
                    updated_stream.topic_index = updated_stream.topic_index.without(&topic.name);
                }
                let (topics, _) = updated_stream.topics.remove(topic_id);
                updated_stream.topics = topics;

                let (streams, _) = new.streams.update(stream_id, updated_stream);
                new.streams = streams;
            }
            new.version += 1;
            Arc::new(new)
        });
    }
+
+    // ═══════════════════════════════════════════════════════════════
+    // Partition operations (nested in topic)
+    // ═══════════════════════════════════════════════════════════════
+
    /// Add partitions with specific IDs (for bootstrap/recovery).
    ///
    /// Existing partitions are preserved; each new meta is stamped with the
    /// snapshot version this update will publish. No-op on empty input;
    /// silently skipped (apart from the version bump) when the stream or
    /// topic is missing.
    pub fn add_partitions_with_ids(
        &self,
        stream_id: StreamId,
        topic_id: TopicId,
        partitions: Vec<(PartitionId, PartitionMeta)>,
    ) {
        if partitions.is_empty() {
            return;
        }
        self.inner.rcu(move |current| {
            // Clone per attempt: the RCU closure may run more than once.
            let partitions = partitions.clone();
            let mut new = (**current).clone();
            let new_version = new.version + 1;

            if let Some(stream) = new.streams.get(stream_id) {
                let mut updated_stream = stream.clone();

                if let Some(topic) = updated_stream.topics.get(topic_id) {
                    let mut updated_topic = topic.clone();

                    // Rebuild the slab from old entries plus the new
                    // fixed-id entries so the explicit ids are honored.
                    let entries: Vec<_> = updated_topic
                        .partitions
                        .iter()
                        .map(|(k, v)| (k, v.clone()))
                        .chain(partitions.into_iter().map(|(id, mut meta)| {
                            meta.creation_version = new_version;
                            (id, meta)
                        }))
                        .collect();
                    updated_topic.partitions = SegmentedSlab::from_entries(entries);

                    let (topics, _) = updated_stream.topics.update(topic_id, updated_topic);
                    updated_stream.topics = topics;
                }

                let (streams, _) = new.streams.update(stream_id, updated_stream);
                new.streams = streams;
            }
            new.version = new_version;
            Arc::new(new)
        });
    }
+
    /// Add new partitions with slab-assigned IDs. Returns the assigned IDs.
    ///
    /// Returns an empty vector when `partitions` is empty or when the
    /// stream/topic does not exist (the result slot is only written on the
    /// path where the topic was found).
    pub fn add_partitions(
        &self,
        stream_id: StreamId,
        topic_id: TopicId,
        partitions: Vec<PartitionMeta>,
    ) -> Vec<PartitionId> {
        if partitions.is_empty() {
            return Vec::new();
        }

        // Mutex-wrapped out-channel: the RCU closure cannot return the ids
        // directly, and it may retry, so each attempt overwrites the vector.
        let assigned_ids = Arc::new(std::sync::Mutex::new(Vec::new()));
        let assigned_ids_clone = assigned_ids.clone();

        self.inner.rcu(move |current| {
            let partitions = partitions.clone();
            let mut new = (**current).clone();
            let new_version = new.version + 1;

            if let Some(stream) = new.streams.get(stream_id) {
                let mut updated_stream = stream.clone();

                if let Some(topic) = updated_stream.topics.get(topic_id) {
                    let mut updated_topic = topic.clone();
                    let mut ids = Vec::new();

                    for mut meta in partitions {
                        meta.creation_version = new_version;
                        // Insert first to learn the slab-assigned id, then
                        // re-store the meta with its `id` field corrected.
                        let (parts, id) = updated_topic.partitions.insert(meta.clone());
                        // Update meta's id to match the slab-assigned ID
                        meta.id = id;
                        let (parts, _) = parts.update(id, meta);
                        updated_topic.partitions = parts;
                        ids.push(id);
                    }

                    *assigned_ids_clone.lock().unwrap() = ids;

                    let (topics, _) = updated_stream.topics.update(topic_id, updated_topic);
                    updated_stream.topics = topics;
                }

                let (streams, _) = new.streams.update(stream_id, updated_stream);
                new.streams = streams;
            }
            new.version = new_version;
            Arc::new(new)
        });

        // Safe: `rcu` has returned, so the closure (and its Arc clone) is
        // dropped and this is the sole owner of the mutex.
        Arc::try_unwrap(assigned_ids).unwrap().into_inner().unwrap()
    }
+
    /// Remove the given partition ids from a topic.
    ///
    /// No-op on empty input; silently skipped (apart from the version bump)
    /// when the stream/topic is missing. Unknown partition ids are tolerated.
    pub fn delete_partitions(
        &self,
        stream_id: StreamId,
        topic_id: TopicId,
        partition_ids: &[PartitionId],
    ) {
        if partition_ids.is_empty() {
            return;
        }
        // Owned copy so the ids can move into the (possibly retried) closure.
        let partition_ids = partition_ids.to_vec();

        self.inner.rcu(move |current| {
            let mut new = (**current).clone();

            if let Some(stream) = new.streams.get(stream_id) {
                let mut updated_stream = stream.clone();

                if let Some(topic) = updated_stream.topics.get(topic_id) {
                    let mut updated_topic = topic.clone();

                    for &partition_id in &partition_ids {
                        let (parts, _) = updated_topic.partitions.remove(partition_id);
                        updated_topic.partitions = parts;
                    }

                    let (topics, _) = updated_stream.topics.update(topic_id, updated_topic);
                    updated_stream.topics = topics;
                }

                let (streams, _) = new.streams.update(stream_id, updated_stream);
                new.streams = streams;
            }
            new.version += 1;
            Arc::new(new)
        });
    }
+
+    // ═══════════════════════════════════════════════════════════════
+    // User operations
+    // ═══════════════════════════════════════════════════════════════
+
    /// Add a user with a specific ID (for bootstrap/recovery).
    ///
    /// Rebuilds the user slab so the explicit id is honored, and registers
    /// the username in the lookup index.
    pub fn add_user_with_id(&self, id: UserId, meta: UserMeta) {
        self.inner.rcu(move |current| {
            // Clone per attempt: the RCU closure may run more than once.
            let meta = meta.clone();
            let mut new = (**current).clone();
            let entries: Vec<_> = new
                .users
                .iter()
                .map(|(k, v)| (k, v.clone()))
                .chain(std::iter::once((id as usize, meta.clone())))
                .collect();
            new.users = SegmentedSlab::from_entries(entries);
            new.user_index = new.user_index.update(meta.username.clone(), id);
            new.version += 1;
            Arc::new(new)
        });
    }
+
    /// Add a new user with slab-assigned ID. Returns the assigned ID.
    ///
    /// No uniqueness validation is performed here; callers that need it
    /// should use `try_update_user`-style checks beforehand.
    pub fn add_user(&self, meta: UserMeta) -> UserId {
        // Out-channel for the id assigned inside the RCU closure.
        let assigned_id = Arc::new(AtomicUsize::new(0));
        let assigned_id_clone = assigned_id.clone();

        self.inner.rcu(move |current| {
            let meta = meta.clone();
            let mut new = (**current).clone();
            let (users, id) = new.users.insert(meta.clone());
            assigned_id_clone.store(id, Ordering::Release);
            new.users = users;
            new.user_index = new.user_index.update(meta.username.clone(), id as UserId);
            new.version += 1;
            Arc::new(new)
        });

        assigned_id.load(Ordering::Acquire) as UserId
    }
+
    /// Atomically validates username uniqueness and updates user.
    /// Returns the updated UserMeta if successful.
    ///
    /// `None` for `new_username`/`new_status` keeps the current value.
    /// The RCU closure may retry under contention; the flags and the result
    /// slot are rewritten on each attempt so only the final attempt counts.
    pub fn try_update_user(
        &self,
        id: UserId,
        new_username: Option<Arc<str>>,
        new_status: Option<UserStatus>,
    ) -> Result<UserMeta, IggyError> {
        let user_not_found = Arc::new(AtomicBool::new(false));
        let name_conflict = Arc::new(AtomicBool::new(false));
        let updated_meta: Arc<std::sync::Mutex<Option<UserMeta>>> =
            Arc::new(std::sync::Mutex::new(None));

        let user_not_found_clone = user_not_found.clone();
        let name_conflict_clone = name_conflict.clone();
        let updated_meta_clone = updated_meta.clone();
        let new_username_clone = new_username.clone();

        self.inner.rcu(move |current| {
            let Some(old_meta) = current.users.get(id as usize) else {
                user_not_found_clone.store(true, Ordering::Release);
                return Arc::clone(current);
            };

            let final_username = new_username_clone
                .clone()
                .unwrap_or_else(|| old_meta.username.clone());

            // Reject if another user already owns the requested username.
            if final_username != old_meta.username
                && let Some(&existing_id) = current.user_index.get(&final_username)
                && existing_id != id
            {
                name_conflict_clone.store(true, Ordering::Release);
                return Arc::clone(current);
            }

            // Reset flags in case a previous (retried) attempt set them.
            user_not_found_clone.store(false, Ordering::Release);
            name_conflict_clone.store(false, Ordering::Release);

            let mut new = (**current).clone();

            // Re-key the username index when the name changed.
            if final_username != old_meta.username {
                new.user_index = new.user_index.without(&old_meta.username);
                new.user_index = new.user_index.update(final_username.clone(), id);
            }

            // Only username/status are mutable; everything else carries over.
            let meta = UserMeta {
                id: old_meta.id,
                username: final_username,
                password_hash: old_meta.password_hash.clone(),
                status: new_status.unwrap_or(old_meta.status),
                permissions: old_meta.permissions.clone(),
                created_at: old_meta.created_at,
            };

            *updated_meta_clone.lock().unwrap() = Some(meta.clone());

            let (users, _) = new.users.update(id as usize, meta);
            new.users = users;
            new.version += 1;
            Arc::new(new)
        });

        if user_not_found.load(Ordering::Acquire) {
            Err(IggyError::ResourceNotFound(format!("User {}", id)))
        } else if name_conflict.load(Ordering::Acquire) {
            Err(IggyError::UserAlreadyExists)
        } else {
            // The success path above always stored Some before returning.
            Ok(updated_meta.lock().unwrap().take().unwrap())
        }
    }
+
    /// Updates user metadata directly. Use only when username is not changing
    /// or when caller has already verified username uniqueness.
    ///
    /// Unlike `try_update_user`, this performs no conflict detection and
    /// cannot fail; an unknown `id` still writes the slab slot via `update`.
    pub fn update_user_meta(&self, id: UserId, meta: UserMeta) {
        self.inner.rcu(move |current| {
            let meta = meta.clone();
            let mut new = (**current).clone();

            // Re-key the username index when the name changed.
            if let Some(old_meta) = new.users.get(id as usize)
                && old_meta.username != meta.username
            {
                new.user_index = new.user_index.without(&old_meta.username);
                new.user_index = new.user_index.update(meta.username.clone(), id);
            }

            let (users, _) = new.users.update(id as usize, meta);
            new.users = users;
            new.version += 1;
            Arc::new(new)
        });
    }
+
    /// Remove a user, its username-index entry, and all of its personal
    /// access tokens (both the per-user map and the reverse token index).
    pub fn delete_user(&self, id: UserId) {
        self.inner.rcu(|current| {
            let mut new = (**current).clone();
            if let Some(user) = new.users.get(id as usize) {
                new.user_index = new.user_index.without(&user.username);
            }
            let (users, _) = new.users.remove(id as usize);
            new.users = users;
            // Purge reverse-index entries before dropping the user's PAT map,
            // so no dangling token -> user mappings survive.
            if let Some(user_pats) = new.personal_access_tokens.get(&id) {
                for (token_hash, _) in user_pats.iter() {
                    new.token_to_user = new.token_to_user.without(token_hash);
                }
            }
            new.personal_access_tokens = new.personal_access_tokens.without(&id);
            new.version += 1;
            Arc::new(new)
        });
    }
+
+    // ═══════════════════════════════════════════════════════════════
+    // Personal access token operations
+    // ═══════════════════════════════════════════════════════════════
+
    /// Register a PAT for `user_id`, keyed by its token hash, and record
    /// the hash -> user mapping in the reverse index.
    pub fn add_personal_access_token(&self, user_id: UserId, pat: PersonalAccessToken) {
        self.inner.rcu(move |current| {
            let pat = pat.clone();
            let mut new = (**current).clone();
            // `pat.token` is used as the map key; the reverse index resolves
            // the same value in `get_personal_access_token_by_hash`.
            let token_hash = pat.token.clone();
            let user_pats = new
                .personal_access_tokens
                .get(&user_id)
                .cloned()
                .unwrap_or_default();
            new.personal_access_tokens = new
                .personal_access_tokens
                .update(user_id, user_pats.update(token_hash.clone(), pat));
            new.token_to_user = new.token_to_user.update(token_hash, user_id);
            new.version += 1;
            Arc::new(new)
        });
    }
+
    /// Remove a PAT by hash for `user_id` and drop its reverse-index entry.
    /// The reverse index is cleaned even when the per-user map had no entry.
    pub fn delete_personal_access_token(&self, user_id: UserId, token_hash: &Arc<str>) {
        let token_hash = token_hash.clone();
        self.inner.rcu(move |current| {
            let mut new = (**current).clone();
            if let Some(user_pats) = new.personal_access_tokens.get(&user_id) {
                new.personal_access_tokens = new
                    .personal_access_tokens
                    .update(user_id, user_pats.without(&token_hash));
            }
            new.token_to_user = new.token_to_user.without(&token_hash);
            new.version += 1;
            Arc::new(new)
        });
    }
+
+    pub fn get_user_personal_access_tokens(&self, user_id: UserId) -> 
Vec<PersonalAccessToken> {
+        self.load()
+            .personal_access_tokens
+            .get(&user_id)
+            .map(|pats| pats.values().cloned().collect())
+            .unwrap_or_default()
+    }
+
+    pub fn get_personal_access_token_by_hash(
+        &self,
+        token_hash: &str,
+    ) -> Option<PersonalAccessToken> {
+        let token_hash_arc: Arc<str> = Arc::from(token_hash);
+        let metadata = self.load();
+        let user_id = metadata.token_to_user.get(&token_hash_arc)?;
+        metadata
+            .personal_access_tokens
+            .get(user_id)?
+            .get(&token_hash_arc)
+            .cloned()
+    }
+
+    pub fn user_pat_count(&self, user_id: UserId) -> usize {
+        self.load()
+            .personal_access_tokens
+            .get(&user_id)
+            .map(|pats| pats.len())
+            .unwrap_or(0)
+    }
+
+    pub fn user_has_pat_with_name(&self, user_id: UserId, name: &str) -> bool {
+        self.load()
+            .personal_access_tokens
+            .get(&user_id)
+            .map(|pats| pats.values().any(|pat| &*pat.name == name))
+            .unwrap_or(false)
+    }
+
+    pub fn find_pat_token_hash_by_name(&self, user_id: UserId, name: &str) -> 
Option<Arc<str>> {
+        self.load()
+            .personal_access_tokens
+            .get(&user_id)
+            .and_then(|pats| {
+                pats.iter()
+                    .find(|(_, pat)| &*pat.name == name)
+                    .map(|(hash, _)| hash.clone())
+            })
+    }
+
+    // ═══════════════════════════════════════════════════════════════
+    // Consumer group operations (nested in topic)
+    // ═══════════════════════════════════════════════════════════════
+
    /// Add a consumer group with a specific ID (for bootstrap/recovery).
    ///
    /// Rebuilds the group slab so the explicit id is honored and registers
    /// the group name in the per-topic index. Silently no-ops (apart from
    /// the version bump) when the stream or topic is missing.
    pub fn add_consumer_group_with_id(
        &self,
        stream_id: StreamId,
        topic_id: TopicId,
        group_id: ConsumerGroupId,
        meta: ConsumerGroupMeta,
    ) {
        self.inner.rcu(move |current| {
            // Clone per attempt: the RCU closure may run more than once.
            let meta = meta.clone();
            let mut new = (**current).clone();

            if let Some(stream) = new.streams.get(stream_id) {
                let mut updated_stream = stream.clone();

                if let Some(topic) = updated_stream.topics.get(topic_id) {
                    let mut updated_topic = topic.clone();

                    let entries: Vec<_> = updated_topic
                        .consumer_groups
                        .iter()
                        .map(|(k, v)| (k, v.clone()))
                        .chain(std::iter::once((group_id, meta.clone())))
                        .collect();
                    updated_topic.consumer_groups = SegmentedSlab::from_entries(entries);
                    updated_topic.consumer_group_index = updated_topic
                        .consumer_group_index
                        .update(meta.name.clone(), group_id);

                    let (topics, _) = updated_stream.topics.update(topic_id, updated_topic);
                    updated_stream.topics = topics;
                }

                let (streams, _) = new.streams.update(stream_id, updated_stream);
                new.streams = streams;
            }
            new.version += 1;
            Arc::new(new)
        });
    }
+
    /// Add a new consumer group with slab-assigned ID. Returns the assigned ID.
    ///
    /// Returns `None` when the stream or topic does not exist (the sentinel
    /// `usize::MAX` is never overwritten in that case).
    pub fn add_consumer_group(
        &self,
        stream_id: StreamId,
        topic_id: TopicId,
        meta: ConsumerGroupMeta,
    ) -> Option<ConsumerGroupId> {
        // Out-channel for the id assigned inside the RCU closure.
        let assigned_id = Arc::new(AtomicUsize::new(usize::MAX));
        let assigned_id_clone = assigned_id.clone();

        self.inner.rcu(move |current| {
            let meta = meta.clone();
            let mut new = (**current).clone();

            if let Some(stream) = new.streams.get(stream_id) {
                let mut updated_stream = stream.clone();

                if let Some(topic) = updated_stream.topics.get(topic_id) {
                    let mut updated_topic = topic.clone();

                    let (groups, id) = updated_topic.consumer_groups.insert(meta.clone());
                    assigned_id_clone.store(id, Ordering::Release);
                    updated_topic.consumer_groups = groups;
                    updated_topic.consumer_group_index = updated_topic
                        .consumer_group_index
                        .update(meta.name.clone(), id);

                    let (topics, _) = updated_stream.topics.update(topic_id, updated_topic);
                    updated_stream.topics = topics;
                }

                let (streams, _) = new.streams.update(stream_id, updated_stream);
                new.streams = streams;
            }
            new.version += 1;
            Arc::new(new)
        });

        let id = assigned_id.load(Ordering::Acquire);
        if id == usize::MAX { None } else { Some(id) }
    }
+
    /// Remove a consumer group and its name-index entry. Silently no-ops
    /// (apart from the version bump) when the stream/topic/group is missing.
    pub fn delete_consumer_group(
        &self,
        stream_id: StreamId,
        topic_id: TopicId,
        group_id: ConsumerGroupId,
    ) {
        self.inner.rcu(|current| {
            let mut new = (**current).clone();

            if let Some(stream) = new.streams.get(stream_id) {
                let mut updated_stream = stream.clone();

                if let Some(topic) = updated_stream.topics.get(topic_id) {
                    let mut updated_topic = topic.clone();

                    // Drop the name -> id mapping before removing the slot.
                    if let Some(group) = updated_topic.consumer_groups.get(group_id) {
                        updated_topic.consumer_group_index =
                            updated_topic.consumer_group_index.without(&group.name);
                    }
                    let (groups, _) = updated_topic.consumer_groups.remove(group_id);
                    updated_topic.consumer_groups = groups;

                    let (topics, _) = updated_stream.topics.update(topic_id, updated_topic);
                    updated_stream.topics = topics;
                }

                let (streams, _) = new.streams.update(stream_id, updated_stream);
                new.streams = streams;
            }
            new.version += 1;
            Arc::new(new)
        });
    }
+
    /// Add `client_id` as a member of a consumer group and rebalance.
    ///
    /// Returns the new member's logical id, or `None` when the
    /// stream/topic/group was not found.
    ///
    /// NOTE(review): the returned id is `max(member.id) + 1`, not the slab
    /// key produced by `members.insert`, while `leave_consumer_group`
    /// returns the slab key — confirm both ids live in the same id space.
    /// NOTE(review): there is no duplicate check — a client joining twice
    /// is added twice; verify callers guard against that.
    pub fn join_consumer_group(
        &self,
        stream_id: StreamId,
        topic_id: TopicId,
        group_id: ConsumerGroupId,
        client_id: u32,
    ) -> Option<usize> {
        // Sentinel usize::MAX means "group not found / nothing joined".
        let member_id = Arc::new(AtomicUsize::new(usize::MAX));
        let member_id_clone = member_id.clone();

        self.inner.rcu(|current| {
            let mut new = (**current).clone();

            if let Some(stream) = new.streams.get(stream_id) {
                let mut updated_stream = stream.clone();

                if let Some(topic) = updated_stream.topics.get(topic_id) {
                    let mut updated_topic = topic.clone();

                    if let Some(group) = updated_topic.consumer_groups.get(group_id) {
                        let mut updated_group = group.clone();

                        // Next logical member id: one past the current max.
                        let next_id = updated_group
                            .members
                            .iter()
                            .map(|(_, m)| m.id)
                            .max()
                            .map(|m| m + 1)
                            .unwrap_or(0);

                        let new_member = ConsumerGroupMemberMeta::new(next_id, client_id);
                        let (members, _) = updated_group.members.insert(new_member);
                        updated_group.members = members;
                        updated_group.rebalance_members();

                        member_id_clone.store(next_id, Ordering::Release);

                        let (groups, _) = updated_topic
                            .consumer_groups
                            .update(group_id, updated_group);
                        updated_topic.consumer_groups = groups;
                    }

                    let (topics, _) = updated_stream.topics.update(topic_id, updated_topic);
                    updated_stream.topics = topics;
                }

                let (streams, _) = new.streams.update(stream_id, updated_stream);
                new.streams = streams;
            }
            new.version += 1;
            Arc::new(new)
        });

        let id = member_id.load(Ordering::Acquire);
        if id == usize::MAX { None } else { Some(id) }
    }
+
    /// Remove the member with `client_id` from a consumer group and
    /// rebalance the remaining members.
    ///
    /// Returns the removed member's slab key, or `None` when the
    /// stream/topic/group was missing or no member had that client id.
    pub fn leave_consumer_group(
        &self,
        stream_id: StreamId,
        topic_id: TopicId,
        group_id: ConsumerGroupId,
        client_id: u32,
    ) -> Option<usize> {
        // Sentinel usize::MAX means "nothing was removed".
        let member_id = Arc::new(AtomicUsize::new(usize::MAX));
        let member_id_clone = member_id.clone();

        self.inner.rcu(|current| {
            let mut new = (**current).clone();

            if let Some(stream) = new.streams.get(stream_id) {
                let mut updated_stream = stream.clone();

                if let Some(topic) = updated_stream.topics.get(topic_id) {
                    let mut updated_topic = topic.clone();

                    if let Some(group) = updated_topic.consumer_groups.get(group_id) {
                        let mut updated_group = group.clone();

                        // Find member by client_id
                        let member_to_remove: Option<usize> = updated_group
                            .members
                            .iter()
                            .find(|(_, m)| m.client_id == client_id)
                            .map(|(id, _)| id);

                        if let Some(mid) = member_to_remove {
                            member_id_clone.store(mid, Ordering::Release);
                            let (members, _) = updated_group.members.remove(mid);
                            updated_group.members = members;
                            updated_group.rebalance_members();

                            let (groups, _) = updated_topic
                                .consumer_groups
                                .update(group_id, updated_group);
                            updated_topic.consumer_groups = groups;
                        }
                    }

                    let (topics, _) = updated_stream.topics.update(topic_id, updated_topic);
                    updated_stream.topics = topics;
                }

                let (streams, _) = new.streams.update(stream_id, updated_stream);
                new.streams = streams;
            }
            new.version += 1;
            Arc::new(new)
        });

        let id = member_id.load(Ordering::Acquire);
        if id == usize::MAX { None } else { Some(id) }
    }
+
+    pub fn is_consumer_group_member(
+        &self,
+        stream_id: StreamId,
+        topic_id: TopicId,
+        group_id: ConsumerGroupId,
+        client_id: u32,
+    ) -> bool {
+        let metadata = self.load();
+        metadata
+            .streams
+            .get(stream_id)
+            .and_then(|s| s.topics.get(topic_id))
+            .and_then(|t| t.consumer_groups.get(group_id))
+            .map(|g| g.members.iter().any(|(_, m)| m.client_id == client_id))
+            .unwrap_or(false)
+    }
+
    /// Replace every consumer group's partition assignment for a topic with
    /// the given partition set and redistribute members accordingly.
    pub fn rebalance_consumer_groups_for_topic(
        &self,
        stream_id: StreamId,
        topic_id: TopicId,
        partition_ids: &[PartitionId],
    ) {
        // Owned copy so the ids can move into the (possibly retried) closure.
        let partition_ids = partition_ids.to_vec();

        self.inner.rcu(move |current| {
            let mut new = (**current).clone();

            if let Some(stream) = new.streams.get(stream_id) {
                let mut updated_stream = stream.clone();

                if let Some(topic) = updated_stream.topics.get(topic_id) {
                    let mut updated_topic = topic.clone();

                    // Collect the ids first so the slab is not borrowed
                    // while its entries are replaced inside the loop.
                    let group_ids: Vec<_> = updated_topic
                        .consumer_groups
                        .iter()
                        .map(|(id, _)| id)
                        .collect();

                    for gid in group_ids {
                        if let Some(group) = updated_topic.consumer_groups.get(gid) {
                            let mut updated_group = group.clone();
                            updated_group.partitions = partition_ids.clone();
                            updated_group.rebalance_members();
                            let (groups, _) =
                                updated_topic.consumer_groups.update(gid, updated_group);
                            updated_topic.consumer_groups = groups;
                        }
                    }

                    let (topics, _) = updated_stream.topics.update(topic_id, updated_topic);
                    updated_stream.topics = topics;
                }

                let (streams, _) = new.streams.update(stream_id, updated_stream);
                new.streams = streams;
            }
            new.version += 1;
            Arc::new(new)
        });
    }
+
+    // ═══════════════════════════════════════════════════════════════
+    // Lookup helpers
+    // ═══════════════════════════════════════════════════════════════
+
+    pub fn get_stream_id(&self, identifier: &Identifier) -> Option<StreamId> {
+        let metadata = self.load();
+        match identifier.kind {
+            IdKind::Numeric => {
+                let stream_id = identifier.get_u32_value().ok()? as StreamId;
+                if metadata.streams.get(stream_id).is_some() {
+                    Some(stream_id)
+                } else {
+                    None
+                }
+            }
+            IdKind::String => {
+                let name = identifier.get_cow_str_value().ok()?;
+                metadata.stream_index.get(name.as_ref()).copied()
+            }
+        }
+    }
+
+    pub fn stream_name_exists(&self, name: &str) -> bool {
+        self.load().stream_index.contains_key(name)
+    }
+
+    pub fn get_topic_id(&self, stream_id: StreamId, identifier: &Identifier) 
-> Option<TopicId> {
+        let metadata = self.load();
+        let stream = metadata.streams.get(stream_id)?;
+
+        match identifier.kind {
+            IdKind::Numeric => {
+                let topic_id = identifier.get_u32_value().ok()? as TopicId;
+                if stream.topics.get(topic_id).is_some() {
+                    Some(topic_id)
+                } else {
+                    None
+                }
+            }
+            IdKind::String => {
+                let name = identifier.get_cow_str_value().ok()?;
+                stream.topic_index.get(&Arc::from(name.as_ref())).copied()
+            }
+        }
+    }
+
    /// Resolves a user identifier (numeric id or username) to a UserId.
    ///
    /// NOTE(review): unlike `get_stream_id`/`get_topic_id`, the numeric
    /// branch does NOT verify that the user actually exists — it returns
    /// the id unconditionally. Confirm callers expect this asymmetry.
    pub fn get_user_id(&self, identifier: &Identifier) -> Option<UserId> {
        let metadata = self.load();
        match identifier.kind {
            IdKind::Numeric => Some(identifier.get_u32_value().ok()? as UserId),
            IdKind::String => {
                let name = identifier.get_cow_str_value().ok()?;
                metadata.user_index.get(name.as_ref()).copied()
            }
        }
    }
+
+    /// Resolves a consumer-group `Identifier` within the given stream/topic
+    /// to a `ConsumerGroupId`. Returns `None` when any level does not exist.
+    pub fn get_consumer_group_id(
+        &self,
+        stream_id: StreamId,
+        topic_id: TopicId,
+        identifier: &Identifier,
+    ) -> Option<ConsumerGroupId> {
+        let metadata = self.load();
+        let stream = metadata.streams.get(stream_id)?;
+        let topic = stream.topics.get(topic_id)?;
+
+        match identifier.kind {
+            IdKind::Numeric => {
+                let group_id = identifier.get_u32_value().ok()? as ConsumerGroupId;
+                if topic.consumer_groups.get(group_id).is_some() {
+                    Some(group_id)
+                } else {
+                    None
+                }
+            }
+            IdKind::String => {
+                let name = identifier.get_cow_str_value().ok()?;
+                // Borrow-based lookup avoids allocating a temporary
+                // `Arc<str>` per call (`Arc<str>: Borrow<str>`), matching
+                // the `stream_index` lookup in `get_stream_id`.
+                topic.consumer_group_index.get(name.as_ref()).copied()
+            }
+        }
+    }
+
+    /// Returns `true` when a stream occupies the given slab slot.
+    pub fn stream_exists(&self, id: StreamId) -> bool {
+        self.load().streams.get(id).is_some()
+    }
+
+    /// Returns `true` when `topic_id` exists under `stream_id`.
+    pub fn topic_exists(&self, stream_id: StreamId, topic_id: TopicId) -> bool {
+        self.load()
+            .streams
+            .get(stream_id)
+            .is_some_and(|stream| stream.topics.get(topic_id).is_some())
+    }
+
+    /// Returns `true` when the partition exists under the given stream/topic.
+    pub fn partition_exists(
+        &self,
+        stream_id: StreamId,
+        topic_id: TopicId,
+        partition_id: PartitionId,
+    ) -> bool {
+        let snapshot = self.load();
+        match snapshot
+            .streams
+            .get(stream_id)
+            .and_then(|stream| stream.topics.get(topic_id))
+        {
+            Some(topic) => topic.partitions.get(partition_id).is_some(),
+            None => false,
+        }
+    }
+
+    /// Returns `true` when a user occupies the given slab slot.
+    pub fn user_exists(&self, id: UserId) -> bool {
+        self.load().users.get(id as usize).is_some()
+    }
+
+    /// Returns `true` when the consumer group exists under stream/topic.
+    pub fn consumer_group_exists(
+        &self,
+        stream_id: StreamId,
+        topic_id: TopicId,
+        group_id: ConsumerGroupId,
+    ) -> bool {
+        self.load()
+            .streams
+            .get(stream_id)
+            .and_then(|stream| stream.topics.get(topic_id))
+            .is_some_and(|topic| topic.consumer_groups.get(group_id).is_some())
+    }
+
+    /// Returns `true` when a consumer group named `name` exists under the
+    /// given stream/topic; `false` when any level is missing.
+    pub fn consumer_group_exists_by_name(
+        &self,
+        stream_id: StreamId,
+        topic_id: TopicId,
+        name: &str,
+    ) -> bool {
+        self.load()
+            .streams
+            .get(stream_id)
+            .and_then(|stream| stream.topics.get(topic_id))
+            .is_some_and(|topic| topic.consumer_group_index.contains_key(name))
+    }
+
+    /// Number of streams in the current snapshot.
+    pub fn streams_count(&self) -> usize {
+        self.load().streams.len()
+    }
+
+    /// Number of topics under `stream_id`, or 0 when the stream is absent.
+    pub fn topics_count(&self, stream_id: StreamId) -> usize {
+        self.load()
+            .streams
+            .get(stream_id)
+            .map_or(0, |stream| stream.topics.len())
+    }
+
+    /// Number of partitions under the topic, or 0 when stream/topic is absent.
+    pub fn partitions_count(&self, stream_id: StreamId, topic_id: TopicId) -> usize {
+        self.load()
+            .streams
+            .get(stream_id)
+            .and_then(|stream| stream.topics.get(topic_id))
+            .map_or(0, |topic| topic.partitions.len())
+    }
+
+    /// Returns the next partition id for round-robin distribution, or `None`
+    /// when the stream/topic is missing or the topic has no partitions.
+    ///
+    /// Uses a single atomic `fetch_update` so the counter always stays in
+    /// `[0, partitions_count)`. The previous `fetch_add` + conditional
+    /// `store` reset was racy: two threads crossing the boundary could both
+    /// observe `partition_id >= partitions_count` and clobber each other's
+    /// reset, skewing the rotation.
+    pub fn get_next_partition_id(&self, stream_id: StreamId, topic_id: TopicId) -> Option<usize> {
+        let metadata = self.load();
+        let topic = metadata.streams.get(stream_id)?.topics.get(topic_id)?;
+        let partitions_count = topic.partitions.len();
+
+        if partitions_count == 0 {
+            return None;
+        }
+
+        let counter = &topic.partition_counter;
+        let previous = counter
+            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
+                Some((current + 1) % partitions_count)
+            })
+            .expect("closure always returns Some");
+        // `previous` may still exceed the count if the partition set shrank
+        // in a newer snapshot sharing this counter; clamp via modulo.
+        Some(previous % partitions_count)
+    }
+
+    /// Returns the partition currently assigned to a consumer-group member.
+    ///
+    /// When `calculate` is `true`, the member's round-robin cursor is
+    /// advanced atomically and the partition at the *pre-advance* position is
+    /// returned (`fetch_update` yields the previous value). When `false`, the
+    /// cursor is only read, so repeated calls return the same partition.
+    pub fn get_next_member_partition_id(
+        &self,
+        stream_id: StreamId,
+        topic_id: TopicId,
+        group_id: ConsumerGroupId,
+        member_id: usize,
+        calculate: bool,
+    ) -> Option<PartitionId> {
+        let metadata = self.load();
+        let member = metadata
+            .streams
+            .get(stream_id)?
+            .topics
+            .get(topic_id)?
+            .consumer_groups
+            .get(group_id)?
+            .members
+            .get(member_id)?;
+
+        let assigned_partitions = &member.partitions;
+        if assigned_partitions.is_empty() {
+            return None;
+        }
+
+        let partitions_count = assigned_partitions.len();
+        let counter = &member.partition_index;
+
+        if calculate {
+            // The unwrap is safe: the closure always returns Some, so
+            // fetch_update cannot fail.
+            let current = counter
+                .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
+                    Some((current + 1) % partitions_count)
+                })
+                .unwrap();
+            // `% partitions_count` guards against a stale cursor when the
+            // member's partition set shrank in a newer snapshot.
+            Some(assigned_partitions[current % partitions_count])
+        } else {
+            let current = counter.load(Ordering::Relaxed);
+            Some(assigned_partitions[current % partitions_count])
+        }
+    }
+
+    /// Total number of registered users.
+    pub fn users_count(&self) -> usize {
+        self.load().users.len()
+    }
+
+    /// Returns `true` when a user with the given username exists.
+    pub fn username_exists(&self, username: &str) -> bool {
+        self.load().user_index.contains_key(username)
+    }
+
+    /// Number of consumer groups under the topic, or 0 when absent.
+    pub fn consumer_groups_count(&self, stream_id: StreamId, topic_id: TopicId) -> usize {
+        self.load()
+            .streams
+            .get(stream_id)
+            .and_then(|stream| stream.topics.get(topic_id))
+            .map_or(0, |topic| topic.consumer_groups.len())
+    }
+
+    // ═══════════════════════════════════════════════════════════════
+    // Stats getters
+    // ═══════════════════════════════════════════════════════════════
+
+    /// Returns a handle to the stream's shared stats, if the stream exists.
+    pub fn get_stream_stats(&self, id: StreamId) -> Option<Arc<StreamStats>> {
+        self.load()
+            .streams
+            .get(id)
+            .map(|stream| Arc::clone(&stream.stats))
+    }
+
+    /// Returns a handle to the topic's shared stats, if it exists.
+    pub fn get_topic_stats(
+        &self,
+        stream_id: StreamId,
+        topic_id: TopicId,
+    ) -> Option<Arc<TopicStats>> {
+        let snapshot = self.load();
+        let topic = snapshot.streams.get(stream_id)?.topics.get(topic_id)?;
+        Some(Arc::clone(&topic.stats))
+    }
+
+    /// Returns a handle to the partition's shared stats, if it exists.
+    pub fn get_partition_stats_by_ids(
+        &self,
+        stream_id: StreamId,
+        topic_id: TopicId,
+        partition_id: PartitionId,
+    ) -> Option<Arc<PartitionStats>> {
+        let snapshot = self.load();
+        let partition = snapshot
+            .streams
+            .get(stream_id)?
+            .topics
+            .get(topic_id)?
+            .partitions
+            .get(partition_id)?;
+        Some(Arc::clone(&partition.stats))
+    }
+
+    // ═══════════════════════════════════════════════════════════════
+    // Stats registration (atomic create + add)
+    // ═══════════════════════════════════════════════════════════════
+
+    /// Register a stream with a specific ID (for bootstrap/recovery).
+    /// Unlike try_register_stream, this uses a pre-determined stream_id.
+    ///
+    /// Returns the freshly-created stats handle shared with the new
+    /// `StreamMeta`, so callers can update counters without another lookup.
+    pub fn register_stream(
+        &self,
+        stream_id: StreamId,
+        name: Arc<str>,
+        created_at: IggyTimestamp,
+    ) -> Arc<StreamStats> {
+        let stats = Arc::new(StreamStats::default());
+        // `add_stream_with_id` installs the meta at the caller-chosen slot.
+        let meta = StreamMeta::with_stats(stream_id, name, created_at, 
stats.clone());
+        self.add_stream_with_id(stream_id, meta);
+        stats
+    }
+
+    /// Atomically validates name uniqueness and registers stream.
+    ///
+    /// Implemented as an RCU loop: the closure may run multiple times if the
+    /// snapshot pointer changes concurrently, which is why the outcome is
+    /// communicated through shared atomics — each invocation (re)stores the
+    /// flag, so the value observed after `rcu` returns belongs to the final,
+    /// committed attempt.
+    pub fn try_register_stream(
+        &self,
+        name: Arc<str>,
+        created_at: IggyTimestamp,
+    ) -> Result<(StreamId, Arc<StreamStats>), IggyError> {
+        let stats = Arc::new(StreamStats::default());
+
+        let name_existed = Arc::new(AtomicBool::new(false));
+        let assigned_id = Arc::new(AtomicUsize::new(0));
+        let name_existed_clone = name_existed.clone();
+        let assigned_id_clone = assigned_id.clone();
+        let name_clone = name.clone();
+        let stats_clone = stats.clone();
+
+        self.inner.rcu(move |current| {
+            if current.stream_index.contains_key(&name_clone) {
+                name_existed_clone.store(true, Ordering::Release);
+                return Arc::clone(current);
+            }
+
+            name_existed_clone.store(false, Ordering::Release);
+
+            // Insert with a placeholder id, then re-insert once the slab has
+            // assigned the real slot so `meta.id` matches it.
+            let mut meta =
+                StreamMeta::with_stats(0, name_clone.clone(), created_at, 
stats_clone.clone());
+            let mut new = (**current).clone();
+            let (streams, id) = new.streams.insert(meta.clone());
+            // Update meta's id to match the slab-assigned ID
+            meta.id = id;
+            let (streams, _) = streams.update(id, meta);
+            assigned_id_clone.store(id, Ordering::Release);
+            new.streams = streams;
+            new.stream_index = new.stream_index.update(name_clone.clone(), id);
+            new.version += 1;
+            Arc::new(new)
+        });
+
+        if name_existed.load(Ordering::Acquire) {
+            Err(IggyError::StreamNameAlreadyExists(name.to_string()))
+        } else {
+            Ok((assigned_id.load(Ordering::Acquire), stats))
+        }
+    }
+
+    /// Atomically validates name uniqueness and registers topic.
+    ///
+    /// The RCU closure may run multiple times under contention; every outcome
+    /// flag is therefore (re)stored on *each* invocation so the values read
+    /// after `rcu` returns reflect the final, committed attempt. (Previously
+    /// the flags were only cleared on the success path, so a retried closure
+    /// could report a stale error — e.g. `StreamIdNotFound` after the stream
+    /// had reappeared and the retry actually failed on the topic name.)
+    #[allow(clippy::too_many_arguments)]
+    pub fn try_register_topic(
+        &self,
+        stream_id: StreamId,
+        name: Arc<str>,
+        created_at: IggyTimestamp,
+        message_expiry: IggyExpiry,
+        compression_algorithm: CompressionAlgorithm,
+        max_topic_size: MaxTopicSize,
+        replication_factor: u8,
+    ) -> Result<(TopicId, Arc<TopicStats>), IggyError> {
+        let parent_stats = self.get_stream_stats(stream_id).ok_or_else(|| {
+            IggyError::StreamIdNotFound(Identifier::numeric(stream_id as u32).unwrap())
+        })?;
+
+        let stats = Arc::new(TopicStats::new(parent_stats));
+
+        let name_existed = Arc::new(AtomicBool::new(false));
+        let stream_not_found = Arc::new(AtomicBool::new(false));
+        let assigned_id = Arc::new(AtomicUsize::new(0));
+        let name_existed_clone = name_existed.clone();
+        let stream_not_found_clone = stream_not_found.clone();
+        let assigned_id_clone = assigned_id.clone();
+        let name_clone = name.clone();
+        let stats_clone = stats.clone();
+
+        self.inner.rcu(move |current| {
+            let Some(stream) = current.streams.get(stream_id) else {
+                stream_not_found_clone.store(true, Ordering::Release);
+                name_existed_clone.store(false, Ordering::Release);
+                return Arc::clone(current);
+            };
+
+            if stream.topic_index.contains_key(&name_clone) {
+                name_existed_clone.store(true, Ordering::Release);
+                stream_not_found_clone.store(false, Ordering::Release);
+                return Arc::clone(current);
+            }
+
+            name_existed_clone.store(false, Ordering::Release);
+            stream_not_found_clone.store(false, Ordering::Release);
+
+            let mut meta = TopicMeta {
+                id: 0,
+                name: name_clone.clone(),
+                created_at,
+                message_expiry,
+                compression_algorithm,
+                max_topic_size,
+                replication_factor,
+                stats: stats_clone.clone(),
+                partitions: SegmentedSlab::new(),
+                consumer_groups: SegmentedSlab::new(),
+                consumer_group_index: imbl::HashMap::new(),
+                partition_counter: Arc::new(AtomicUsize::new(0)),
+            };
+
+            let mut new = (**current).clone();
+            let mut updated_stream = stream.clone();
+            let (topics, id) = updated_stream.topics.insert(meta.clone());
+            // Re-insert with the slab-assigned id so `meta.id` matches its slot.
+            meta.id = id;
+            let (topics, _) = topics.update(id, meta);
+            assigned_id_clone.store(id, Ordering::Release);
+            updated_stream.topics = topics;
+            updated_stream.topic_index = updated_stream.topic_index.update(name_clone.clone(), id);
+
+            let (streams, _) = new.streams.update(stream_id, updated_stream);
+            new.streams = streams;
+            new.version += 1;
+            Arc::new(new)
+        });
+
+        if stream_not_found.load(Ordering::Acquire) {
+            Err(IggyError::StreamIdNotFound(
+                Identifier::numeric(stream_id as u32).unwrap(),
+            ))
+        } else if name_existed.load(Ordering::Acquire) {
+            Err(IggyError::TopicNameAlreadyExists(
+                name.to_string(),
+                Identifier::numeric(stream_id as u32).unwrap(),
+            ))
+        } else {
+            Ok((assigned_id.load(Ordering::Acquire), stats))
+        }
+    }
+
+    /// Register a topic with a specific ID (for bootstrap/recovery).
+    /// Unlike try_register_topic, this uses a pre-determined topic_id.
+    ///
+    /// Panics if the parent stream's stats are missing. NOTE(review): if the
+    /// stream is deleted between that lookup and the RCU commit, the closure
+    /// silently no-ops while the (now orphaned) stats handle is still
+    /// returned — confirm bootstrap cannot race with stream deletion.
+    #[allow(clippy::too_many_arguments)]
+    pub fn register_topic(
+        &self,
+        stream_id: StreamId,
+        topic_id: TopicId,
+        name: Arc<str>,
+        created_at: IggyTimestamp,
+        message_expiry: IggyExpiry,
+        compression_algorithm: CompressionAlgorithm,
+        max_topic_size: MaxTopicSize,
+        replication_factor: u8,
+    ) -> Arc<TopicStats> {
+        let parent_stats = self
+            .get_stream_stats(stream_id)
+            .expect("Stream must exist when registering topic");
+
+        let stats = Arc::new(TopicStats::new(parent_stats));
+
+        let stats_clone = stats.clone();
+        let name_clone = name.clone();
+
+        self.inner.rcu(move |current| {
+            let Some(stream) = current.streams.get(stream_id) else {
+                return Arc::clone(current);
+            };
+
+            let meta = TopicMeta {
+                id: topic_id,
+                name: name_clone.clone(),
+                created_at,
+                message_expiry,
+                compression_algorithm,
+                max_topic_size,
+                replication_factor,
+                stats: stats_clone.clone(),
+                partitions: SegmentedSlab::new(),
+                consumer_groups: SegmentedSlab::new(),
+                consumer_group_index: imbl::HashMap::new(),
+                partition_counter: Arc::new(AtomicUsize::new(0)),
+            };
+
+            let mut new = (**current).clone();
+            let mut updated_stream = stream.clone();
+            // Rebuild the slab from all existing entries plus the new topic
+            // so it lands at exactly `topic_id` (plain `insert` would pick
+            // its own slot). O(n) in topics — acceptable on the bootstrap
+            // path only.
+            let entries: Vec<_> = updated_stream
+                .topics
+                .iter()
+                .map(|(k, v)| (k, v.clone()))
+                .chain(std::iter::once((topic_id, meta)))
+                .collect();
+            updated_stream.topics = SegmentedSlab::from_entries(entries);
+            updated_stream.topic_index = updated_stream
+                .topic_index
+                .update(name_clone.clone(), topic_id);
+
+            let (streams, _) = new.streams.update(stream_id, updated_stream);
+            new.streams = streams;
+            new.version += 1;
+            Arc::new(new)
+        });
+
+        stats
+    }
+
+    /// Atomically validates username uniqueness and registers user.
+    ///
+    /// Fails with `UserAlreadyExists` on a duplicate username and with
+    /// `UsersLimitReached` when `max_users` is hit. NOTE(review): the two
+    /// failure flags are only both cleared on the success path, so when the
+    /// RCU closure retries and fails for a *different* reason than the first
+    /// attempt, a stale flag can select the wrong error (username conflicts
+    /// take precedence below). Consider re-storing both flags on every
+    /// closure invocation.
+    pub fn try_register_user(
+        &self,
+        username: Arc<str>,
+        password_hash: Arc<str>,
+        status: iggy_common::UserStatus,
+        permissions: Option<Arc<iggy_common::Permissions>>,
+        max_users: usize,
+    ) -> Result<UserId, IggyError> {
+        let name_existed = Arc::new(AtomicBool::new(false));
+        let limit_reached = Arc::new(AtomicBool::new(false));
+        let assigned_id = Arc::new(AtomicUsize::new(0));
+        let name_existed_clone = name_existed.clone();
+        let limit_reached_clone = limit_reached.clone();
+        let assigned_id_clone = assigned_id.clone();
+        let username_clone = username.clone();
+
+        self.inner.rcu(move |current| {
+            if current.user_index.contains_key(&username_clone) {
+                name_existed_clone.store(true, Ordering::Release);
+                return Arc::clone(current);
+            }
+
+            if current.users.len() >= max_users {
+                limit_reached_clone.store(true, Ordering::Release);
+                return Arc::clone(current);
+            }
+
+            name_existed_clone.store(false, Ordering::Release);
+            limit_reached_clone.store(false, Ordering::Release);
+
+            let mut meta = UserMeta {
+                id: 0,
+                username: username_clone.clone(),
+                password_hash: password_hash.clone(),
+                status,
+                permissions: permissions.clone(),
+                created_at: IggyTimestamp::now(),
+            };
+
+            let mut new = (**current).clone();
+            let (users, id) = new.users.insert(meta.clone());
+            // Update meta's id to match the slab-assigned ID
+            meta.id = id as UserId;
+            let (users, _) = users.update(id, meta);
+            assigned_id_clone.store(id, Ordering::Release);
+            new.users = users;
+            new.user_index = new.user_index.update(username_clone.clone(), id 
as UserId);
+            new.version += 1;
+            Arc::new(new)
+        });
+
+        if name_existed.load(Ordering::Acquire) {
+            Err(IggyError::UserAlreadyExists)
+        } else if limit_reached.load(Ordering::Acquire) {
+            Err(IggyError::UsersLimitReached)
+        } else {
+            Ok(assigned_id.load(Ordering::Acquire) as UserId)
+        }
+    }
+
+    /// Atomically validates consumer-group name uniqueness and registers the
+    /// group under the given stream/topic.
+    ///
+    /// NOTE(review): `not_found` is not cleared on the name-conflict path
+    /// (and `name_existed` is not cleared on the not-found paths), so a
+    /// retried RCU closure that fails differently from the first attempt can
+    /// report a stale error. Consider re-storing both flags on every
+    /// invocation.
+    pub fn try_register_consumer_group(
+        &self,
+        stream_id: StreamId,
+        topic_id: TopicId,
+        name: Arc<str>,
+        partitions: Vec<PartitionId>,
+    ) -> Result<ConsumerGroupId, IggyError> {
+        let name_existed = Arc::new(AtomicBool::new(false));
+        let not_found = Arc::new(AtomicBool::new(false));
+        let assigned_id = Arc::new(AtomicUsize::new(0));
+        let name_existed_clone = name_existed.clone();
+        let not_found_clone = not_found.clone();
+        let assigned_id_clone = assigned_id.clone();
+        let name_clone = name.clone();
+
+        self.inner.rcu(move |current| {
+            let Some(stream) = current.streams.get(stream_id) else {
+                not_found_clone.store(true, Ordering::Release);
+                return Arc::clone(current);
+            };
+
+            let Some(topic) = stream.topics.get(topic_id) else {
+                not_found_clone.store(true, Ordering::Release);
+                return Arc::clone(current);
+            };
+
+            if topic.consumer_group_index.contains_key(&name_clone) {
+                name_existed_clone.store(true, Ordering::Release);
+                return Arc::clone(current);
+            }
+
+            name_existed_clone.store(false, Ordering::Release);
+            not_found_clone.store(false, Ordering::Release);
+
+            let mut meta = ConsumerGroupMeta {
+                id: 0,
+                name: name_clone.clone(),
+                partitions: partitions.clone(),
+                members: SegmentedSlab::new(),
+            };
+
+            let mut new = (**current).clone();
+            let mut updated_stream = stream.clone();
+            let mut updated_topic = topic.clone();
+
+            let (groups, id) = 
updated_topic.consumer_groups.insert(meta.clone());
+            // Update meta's id to match the slab-assigned ID
+            meta.id = id;
+            let (groups, _) = groups.update(id, meta);
+            assigned_id_clone.store(id, Ordering::Release);
+            updated_topic.consumer_groups = groups;
+            updated_topic.consumer_group_index = updated_topic
+                .consumer_group_index
+                .update(name_clone.clone(), id);
+
+            let (topics, _) = updated_stream.topics.update(topic_id, 
updated_topic);
+            updated_stream.topics = topics;
+
+            let (streams, _) = new.streams.update(stream_id, updated_stream);
+            new.streams = streams;
+            new.version += 1;
+            Arc::new(new)
+        });
+
+        if not_found.load(Ordering::Acquire) {
+            Err(IggyError::TopicIdNotFound(
+                Identifier::numeric(topic_id as u32).unwrap(),
+                Identifier::numeric(stream_id as u32).unwrap(),
+            ))
+        } else if name_existed.load(Ordering::Acquire) {
+            Err(IggyError::ConsumerGroupNameAlreadyExists(
+                name.to_string(),
+                Identifier::numeric(topic_id as u32).unwrap(),
+            ))
+        } else {
+            Ok(assigned_id.load(Ordering::Acquire))
+        }
+    }
+
+    /// Creates `count` partitions for the topic and returns their stats
+    /// handles in creation order; returns an empty Vec for `count == 0`.
+    ///
+    /// Panics if the parent topic has not been registered yet.
+    pub fn register_partitions(
+        &self,
+        stream_id: StreamId,
+        topic_id: TopicId,
+        count: usize,
+        created_at: IggyTimestamp,
+    ) -> Vec<Arc<PartitionStats>> {
+        if count == 0 {
+            return Vec::new();
+        }
+
+        let parent_stats = self
+            .get_topic_stats(stream_id, topic_id)
+            .expect("Parent topic stats must exist before registering 
partitions");
+
+        let mut stats_list = Vec::with_capacity(count);
+        let mut metas = Vec::with_capacity(count);
+
+        for _ in 0..count {
+            let stats = Arc::new(PartitionStats::new(parent_stats.clone()));
+            // id 0 is a placeholder — presumably `add_partitions` assigns the
+            // real slab ids; TODO confirm it also rewrites each `meta.id`.
+            metas.push(PartitionMeta {
+                id: 0,
+                created_at,
+                creation_version: 0,
+                stats: stats.clone(),
+            });
+            stats_list.push(stats);
+        }
+
+        self.add_partitions(stream_id, topic_id, metas);
+        stats_list
+    }
+
+    /// Register a single partition with a specific ID (for
+    /// bootstrap/recovery). Returns the partition's stats handle.
+    ///
+    /// Panics if the parent topic has not been registered yet.
+    pub fn register_partition(
+        &self,
+        stream_id: StreamId,
+        topic_id: TopicId,
+        partition_id: PartitionId,
+        created_at: IggyTimestamp,
+    ) -> Arc<PartitionStats> {
+        let parent_stats = self
+            .get_topic_stats(stream_id, topic_id)
+            .expect("Parent topic stats must exist before registering 
partition");
+
+        let stats = Arc::new(PartitionStats::new(parent_stats));
+        let meta = PartitionMeta {
+            id: partition_id,
+            created_at,
+            creation_version: 0,
+            stats: stats.clone(),
+        };
+
+        self.add_partitions_with_ids(stream_id, topic_id, vec![(partition_id, 
meta)]);
+        stats
+    }
+
+    // ═══════════════════════════════════════════════════════════════
+    // Getters for meta objects
+    // ═══════════════════════════════════════════════════════════════
+
+    /// Returns a clone of the user's metadata, if present.
+    pub fn get_user(&self, id: UserId) -> Option<UserMeta> {
+        self.load().users.get(id as usize).cloned()
+    }
+
+    /// Returns clones of all users, in slab order.
+    pub fn get_all_users(&self) -> Vec<UserMeta> {
+        self.load()
+            .users
+            .iter()
+            .map(|(_, user)| user.clone())
+            .collect()
+    }
+
+    /// Returns a clone of the stream's metadata, if present.
+    pub fn get_stream(&self, id: StreamId) -> Option<StreamMeta> {
+        self.load().streams.get(id).cloned()
+    }
+
+    /// Returns a clone of the topic's metadata, if present.
+    pub fn get_topic(&self, stream_id: StreamId, topic_id: TopicId) -> Option<TopicMeta> {
+        let snapshot = self.load();
+        let topic = snapshot.streams.get(stream_id)?.topics.get(topic_id)?;
+        Some(topic.clone())
+    }
+
+    /// Returns a clone of the partition's metadata, if present.
+    pub fn get_partition(
+        &self,
+        stream_id: StreamId,
+        topic_id: TopicId,
+        partition_id: PartitionId,
+    ) -> Option<PartitionMeta> {
+        let snapshot = self.load();
+        let partition = snapshot
+            .streams
+            .get(stream_id)?
+            .topics
+            .get(topic_id)?
+            .partitions
+            .get(partition_id)?;
+        Some(partition.clone())
+    }
+
+    /// Returns a clone of the consumer group's metadata, if present.
+    pub fn get_consumer_group(
+        &self,
+        stream_id: StreamId,
+        topic_id: TopicId,
+        group_id: ConsumerGroupId,
+    ) -> Option<ConsumerGroupMeta> {
+        let snapshot = self.load();
+        let group = snapshot
+            .streams
+            .get(stream_id)?
+            .topics
+            .get(topic_id)?
+            .consumer_groups
+            .get(group_id)?;
+        Some(group.clone())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    /// Create → rename → delete round-trip for one stream, checking that the
+    /// slab entry and the name index stay in sync at every step.
+    #[test]
+    fn test_stream_crud() {
+        let metadata = SharedMetadata::default();
+
+        let stream_meta = StreamMeta::with_stats(
+            0,
+            Arc::from("test-stream"),
+            IggyTimestamp::now(),
+            Arc::new(StreamStats::default()),
+        );
+        let id = metadata.add_stream(stream_meta);
+
+        assert!(metadata.stream_exists(id));
+        assert_eq!(metadata.streams_count(), 1);
+
+        metadata
+            .try_update_stream(id, Arc::from("renamed-stream"))
+            .unwrap();
+        let loaded = metadata.load();
+        assert_eq!(
+            loaded.streams.get(id).unwrap().name.as_ref(),
+            "renamed-stream"
+        );
+        assert!(loaded.stream_index.contains_key("renamed-stream"));
+        // The old name must be removed from the index on rename.
+        assert!(!loaded.stream_index.contains_key("test-stream"));
+
+        metadata.delete_stream(id);
+        assert!(!metadata.stream_exists(id));
+        assert_eq!(metadata.streams_count(), 0);
+    }
+
+    /// Deleting a stream must cascade to its topics and partitions.
+    #[test]
+    fn test_cascade_delete() {
+        let metadata = SharedMetadata::default();
+
+        // Create stream
+        let stream_stats = Arc::new(StreamStats::default());
+        let stream_meta = StreamMeta::with_stats(
+            0,
+            Arc::from("stream-1"),
+            IggyTimestamp::now(),
+            stream_stats.clone(),
+        );
+        let stream_id = metadata.add_stream(stream_meta);
+
+        // Create topic
+        let topic_stats = Arc::new(TopicStats::new(stream_stats));
+        let topic_meta = TopicMeta {
+            id: 0,
+            name: Arc::from("topic-1"),
+            created_at: IggyTimestamp::now(),
+            message_expiry: IggyExpiry::NeverExpire,
+            compression_algorithm: CompressionAlgorithm::None,
+            max_topic_size: MaxTopicSize::Unlimited,
+            replication_factor: 1,
+            stats: topic_stats.clone(),
+            partitions: SegmentedSlab::new(),
+            consumer_groups: SegmentedSlab::new(),
+            consumer_group_index: imbl::HashMap::new(),
+            partition_counter: Arc::new(AtomicUsize::new(0)),
+        };
+        let topic_id = metadata.add_topic(stream_id, topic_meta).unwrap();
+
+        // Create partitions
+        let partitions = vec![
+            PartitionMeta {
+                id: 0,
+                created_at: IggyTimestamp::now(),
+                creation_version: 0,
+                stats: Arc::new(PartitionStats::new(topic_stats.clone())),
+            },
+            PartitionMeta {
+                id: 0,
+                created_at: IggyTimestamp::now(),
+                creation_version: 0,
+                stats: Arc::new(PartitionStats::new(topic_stats)),
+            },
+        ];
+        let partition_ids = metadata.add_partitions(stream_id, topic_id, 
partitions);
+
+        assert!(metadata.stream_exists(stream_id));
+        assert!(metadata.topic_exists(stream_id, topic_id));
+        assert!(metadata.partition_exists(stream_id, topic_id, 
partition_ids[0]));
+        assert!(metadata.partition_exists(stream_id, topic_id, 
partition_ids[1]));
+
+        // Cascade delete
+        metadata.delete_stream(stream_id);
+
+        assert!(!metadata.stream_exists(stream_id));
+        assert!(!metadata.topic_exists(stream_id, topic_id));
+        assert!(!metadata.partition_exists(stream_id, topic_id, 
partition_ids[0]));
+        assert!(!metadata.partition_exists(stream_id, topic_id, 
partition_ids[1]));
+    }
+}
diff --git a/core/server/src/metadata/snapshot.rs 
b/core/server/src/metadata/snapshot.rs
new file mode 100644
index 000000000..7b96c3703
--- /dev/null
+++ b/core/server/src/metadata/snapshot.rs
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::metadata::{SLAB_SEGMENT_SIZE, StreamId, StreamMeta, UserId, 
UserMeta};
+use iggy_common::PersonalAccessToken;
+use iggy_common::collections::SegmentedSlab;
+use imbl::HashMap as ImHashMap;
+use std::sync::Arc;
+
+/// Immutable metadata snapshot with hierarchical structure.
+/// Streams contain topics, topics contain partitions and consumer groups.
+/// Uses SegmentedSlab for O(1) access with structural sharing.
+#[derive(Clone, Default)]
+pub struct GlobalMetadata {
+    /// Streams indexed by StreamId (slab-assigned)
+    pub streams: SegmentedSlab<StreamMeta, SLAB_SEGMENT_SIZE>,
+    /// Users indexed by UserId (slab-assigned)
+    pub users: SegmentedSlab<UserMeta, SLAB_SEGMENT_SIZE>,
+
+    /// Forward indexes (name → ID)
+    pub stream_index: ImHashMap<Arc<str>, StreamId>,
+    pub user_index: ImHashMap<Arc<str>, UserId>,
+
+    /// user_id -> (token_hash -> PAT)
+    pub personal_access_tokens: ImHashMap<UserId, ImHashMap<Arc<str>, 
PersonalAccessToken>>,
+
+    /// Reverse index: token_hash -> user_id for O(1) PAT lookup
+    pub token_to_user: ImHashMap<Arc<str>, UserId>,
+
+    /// Monotonic version for cache invalidation
+    pub version: u64,
+}
+
+impl GlobalMetadata {
+    /// Creates an empty snapshot: no streams, no users, version 0.
+    pub fn new() -> Self {
+        Self::default()
+    }
+}
diff --git a/core/server/src/metadata/stream.rs 
b/core/server/src/metadata/stream.rs
new file mode 100644
index 000000000..279ffadab
--- /dev/null
+++ b/core/server/src/metadata/stream.rs
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::metadata::topic::TopicMeta;
+use crate::metadata::{SLAB_SEGMENT_SIZE, StreamId, TopicId};
+use crate::streaming::stats::StreamStats;
+use iggy_common::IggyTimestamp;
+use iggy_common::collections::SegmentedSlab;
+use imbl::HashMap as ImHashMap;
+use std::sync::Arc;
+
+/// Stream metadata stored in the shared snapshot.
+#[derive(Clone, Debug)]
+pub struct StreamMeta {
+    pub id: StreamId,
+    pub name: Arc<str>,
+    pub created_at: IggyTimestamp,
+    /// Shared stats handle; clones of this meta share the same counters.
+    pub stats: Arc<StreamStats>,
+    /// Topics indexed by slab-assigned TopicId.
+    pub topics: SegmentedSlab<TopicMeta, SLAB_SEGMENT_SIZE>,
+    /// Forward index: topic name -> TopicId.
+    pub topic_index: ImHashMap<Arc<str>, TopicId>,
+}
+
+impl StreamMeta {
+    /// Creates a stream with freshly-initialized (default) stats.
+    pub fn new(id: StreamId, name: Arc<str>, created_at: IggyTimestamp) -> Self {
+        Self::with_stats(id, name, created_at, Arc::new(StreamStats::default()))
+    }
+
+    /// Creates a stream that shares an existing stats handle.
+    pub fn with_stats(
+        id: StreamId,
+        name: Arc<str>,
+        created_at: IggyTimestamp,
+        stats: Arc<StreamStats>,
+    ) -> Self {
+        Self {
+            id,
+            name,
+            created_at,
+            stats,
+            topics: SegmentedSlab::new(),
+            topic_index: ImHashMap::new(),
+        }
+    }
+}
diff --git a/core/server/src/metadata/topic.rs 
b/core/server/src/metadata/topic.rs
new file mode 100644
index 000000000..b9dc83a6e
--- /dev/null
+++ b/core/server/src/metadata/topic.rs
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::metadata::consumer_group::ConsumerGroupMeta;
+use crate::metadata::partition::PartitionMeta;
+use crate::metadata::{ConsumerGroupId, SLAB_SEGMENT_SIZE, TopicId};
+use crate::streaming::stats::TopicStats;
+use iggy_common::collections::SegmentedSlab;
+use iggy_common::{CompressionAlgorithm, IggyExpiry, IggyTimestamp, 
MaxTopicSize};
+use imbl::HashMap as ImHashMap;
+use std::sync::Arc;
+use std::sync::atomic::AtomicUsize;
+
/// Topic metadata stored in the shared snapshot.
#[derive(Clone, Debug)]
pub struct TopicMeta {
    /// Numeric topic identifier.
    pub id: TopicId,
    /// Topic name; shared immutable string.
    pub name: Arc<str>,
    /// Timestamp at which the topic was created.
    pub created_at: IggyTimestamp,
    /// Message retention/expiry policy for this topic.
    pub message_expiry: IggyExpiry,
    /// Compression algorithm applied to this topic's data.
    pub compression_algorithm: CompressionAlgorithm,
    /// Maximum allowed size of the topic.
    pub max_topic_size: MaxTopicSize,
    /// Configured replication factor.
    pub replication_factor: u8,
    /// Shared runtime statistics handle for this topic.
    pub stats: Arc<TopicStats>,
    /// Partitions belonging to this topic, stored by slab slot.
    pub partitions: SegmentedSlab<PartitionMeta, SLAB_SEGMENT_SIZE>,
    /// Consumer groups attached to this topic, stored by slab slot.
    pub consumer_groups: SegmentedSlab<ConsumerGroupMeta, SLAB_SEGMENT_SIZE>,
    /// Lookup table from consumer group name to its `ConsumerGroupId`.
    pub consumer_group_index: ImHashMap<Arc<str>, ConsumerGroupId>,
    /// Shared atomic counter; presumably used for round-robin partition
    /// selection across snapshot clones — TODO confirm against callers.
    pub partition_counter: Arc<AtomicUsize>,
}
+
+impl TopicMeta {
+    /// Returns the current number of partitions.
+    #[inline]
+    pub fn partitions_count(&self) -> u32 {
+        self.partitions.len() as u32
+    }
+}
diff --git a/core/server/src/metadata/user.rs b/core/server/src/metadata/user.rs
new file mode 100644
index 000000000..c82d49a44
--- /dev/null
+++ b/core/server/src/metadata/user.rs
@@ -0,0 +1,30 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::metadata::UserId;
+use iggy_common::{IggyTimestamp, Permissions, UserStatus};
+use std::sync::Arc;
+
/// User metadata stored in the shared snapshot.
#[derive(Clone, Debug)]
pub struct UserMeta {
    /// Numeric user identifier.
    pub id: UserId,
    /// Username; shared immutable string.
    pub username: Arc<str>,
    /// Password hash (never the plaintext password, per the field name —
    /// hashing scheme is not visible here).
    pub password_hash: Arc<str>,
    /// Account status of the user.
    pub status: UserStatus,
    /// Optional permission set; `None` presumably means no explicit
    /// permissions are assigned — TODO confirm against authorization code.
    pub permissions: Option<Arc<Permissions>>,
    /// Timestamp at which the user was created.
    pub created_at: IggyTimestamp,
}
diff --git a/core/server/src/shard/system/personal_access_tokens.rs 
b/core/server/src/shard/system/personal_access_tokens.rs
index 7b26aff1e..9be9fb959 100644
--- a/core/server/src/shard/system/personal_access_tokens.rs
+++ b/core/server/src/shard/system/personal_access_tokens.rs
@@ -25,6 +25,7 @@ use iggy_common::IggyError;
 use iggy_common::IggyExpiry;
 use iggy_common::IggyTimestamp;
 use iggy_common::PersonalAccessToken;
+use std::sync::Arc;
 use tracing::{error, info};
 
 impl IggyShard {
@@ -102,7 +103,7 @@ impl IggyShard {
                 if user
                     .personal_access_tokens
                     .iter()
-                    .any(|pat| pat.name.as_str() == name.as_str())
+                    .any(|pat| *pat.name == *name)
                 {
                     error!(
                         "Personal access token: {name} for user with ID: 
{user_id} already exists."
@@ -150,7 +151,7 @@ impl IggyShard {
                 let token = if let Some(pat) = user
                     .personal_access_tokens
                     .iter()
-                    .find(|pat| pat.name.as_str() == name)
+                    .find(|pat| &*pat.name == name)
                 {
                     pat.token.clone()
                 } else {
@@ -178,7 +179,7 @@ impl IggyShard {
         token: &str,
         session: Option<&Session>,
     ) -> Result<User, IggyError> {
-        let token_hash = PersonalAccessToken::hash_token(token);
+        let token_hash: Arc<str> = 
Arc::from(PersonalAccessToken::hash_token(token));
         let users = self.users.values();
         let mut personal_access_token = None;
         for user in &users {
@@ -205,7 +206,7 @@ impl IggyShard {
                 personal_access_token.name, personal_access_token.user_id
             );
             return Err(IggyError::PersonalAccessTokenExpired(
-                personal_access_token.name.as_str().to_owned(),
+                (*personal_access_token.name).to_owned(),
                 personal_access_token.user_id,
             ));
         }
diff --git 
a/core/server/src/shard/tasks/periodic/personal_access_token_cleaner.rs 
b/core/server/src/shard/tasks/periodic/personal_access_token_cleaner.rs
index 3c1203a9a..ce0feea6b 100644
--- a/core/server/src/shard/tasks/periodic/personal_access_token_cleaner.rs
+++ b/core/server/src/shard/tasks/periodic/personal_access_token_cleaner.rs
@@ -50,15 +50,15 @@ async fn clear_personal_access_tokens(shard: Rc<IggyShard>) 
-> Result<(), IggyEr
 
     let users = shard.users.values();
     for user in &users {
-        let expired_tokens: Vec<Arc<String>> = user
+        let expired_tokens: Vec<Arc<str>> = user
             .personal_access_tokens
             .iter()
             .filter(|entry| entry.value().is_expired(now))
             .map(|entry| entry.key().clone())
             .collect();
 
-        for token_hash in expired_tokens {
-            if let Some((_, pat)) = 
user.personal_access_tokens.remove(&token_hash) {
+        for token_hash in &expired_tokens {
+            if let Some((_, pat)) = 
user.personal_access_tokens.remove(token_hash) {
                 info!(
                     "Removed expired personal access token '{}' for user ID 
{}",
                     pat.name, user.id
diff --git a/core/server/src/streaming/users/user.rs 
b/core/server/src/streaming/users/user.rs
index 672662d3e..4882d4542 100644
--- a/core/server/src/streaming/users/user.rs
+++ b/core/server/src/streaming/users/user.rs
@@ -32,7 +32,7 @@ pub struct User {
     pub password: String,
     pub created_at: IggyTimestamp,
     pub permissions: Option<Permissions>,
-    pub personal_access_tokens: DashMap<Arc<String>, PersonalAccessToken>,
+    pub personal_access_tokens: DashMap<Arc<str>, PersonalAccessToken>,
 }
 
 impl Default for User {

Reply via email to