This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch arc-swap
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit b01b96e41461979ebc225cb854bd9d48c961bce9
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Mon Dec 29 14:06:27 2025 +0100

    metadata
---
 Cargo.lock                                 |   8 +-
 DEPENDENCIES.md                            |   2 +-
 core/server/Cargo.toml                     |   1 +
 core/server/src/lib.rs                     |   1 +
 core/server/src/main.rs                    |   8 +
 core/server/src/metadata/consumer_group.rs |  35 ++
 core/server/src/metadata/mod.rs            |  38 ++
 core/server/src/metadata/partition.rs      |  32 ++
 core/server/src/metadata/shared.rs         | 836 +++++++++++++++++++++++++++++
 core/server/src/metadata/snapshot.rs       | 182 +++++++
 core/server/src/metadata/stream.rs         |  80 +++
 core/server/src/metadata/topic.rs          | 113 ++++
 core/server/src/metadata/user.rs           |  70 +++
 core/server/src/shard/builder.rs           |   9 +
 core/server/src/shard/handlers.rs          |  19 +-
 core/server/src/shard/mod.rs               |   4 +
 core/server/src/shard/system/streams.rs    |  15 +-
 core/server/src/shard/system/topics.rs     |  28 +-
 18 files changed, 1465 insertions(+), 16 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 0f671da6d..5df8c8f3c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -426,9 +426,12 @@ dependencies = [
 
 [[package]]
 name = "arc-swap"
-version = "1.7.1"
+version = "1.8.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457"
+checksum = "51d03449bb8ca2cc2ef70869af31463d1ae5ccc8fa3e334b307203fbf815207e"
+dependencies = [
+ "rustversion",
+]
 
 [[package]]
 name = "arcshift"
@@ -8199,6 +8202,7 @@ version = "0.6.1-edge.1"
 dependencies = [
  "ahash 0.8.12",
  "anyhow",
+ "arc-swap",
  "arcshift",
  "argon2",
  "async-channel",
diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md
index 8b06b77ac..72ca495bd 100644
--- a/DEPENDENCIES.md
+++ b/DEPENDENCIES.md
@@ -32,7 +32,7 @@ anstyle-wincon: 3.0.11, "Apache-2.0 OR MIT",
 anyhow: 1.0.100, "Apache-2.0 OR MIT",
 apache-avro: 0.17.0, "Apache-2.0",
 arbitrary: 1.4.2, "Apache-2.0 OR MIT",
-arc-swap: 1.7.1, "Apache-2.0 OR MIT",
+arc-swap: 1.8.0, "Apache-2.0 OR MIT",
 arcshift: 0.4.2, "Apache-2.0 OR MIT",
 argon2: 0.5.3, "Apache-2.0 OR MIT",
 array-init: 2.1.0, "Apache-2.0 OR MIT",
diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml
index bbf15794a..6e0d6d6d0 100644
--- a/core/server/Cargo.toml
+++ b/core/server/Cargo.toml
@@ -40,6 +40,7 @@ iggy-web = ["dep:rust-embed", "dep:mime_guess"]
 [dependencies]
 ahash = { workspace = true }
 anyhow = { workspace = true }
+arc-swap = "1.8.0"
 arcshift = "0.4.2"
 argon2 = { workspace = true }
 async-channel = { workspace = true }
diff --git a/core/server/src/lib.rs b/core/server/src/lib.rs
index 2919214b6..41669486f 100644
--- a/core/server/src/lib.rs
+++ b/core/server/src/lib.rs
@@ -37,6 +37,7 @@ pub mod diagnostics;
 pub mod http;
 pub mod io;
 pub mod log;
+pub mod metadata;
 pub mod quic;
 pub mod server_error;
 pub mod shard;
diff --git a/core/server/src/main.rs b/core/server/src/main.rs
index 58c50e33c..51b7f0566 100644
--- a/core/server/src/main.rs
+++ b/core/server/src/main.rs
@@ -34,6 +34,7 @@ use server::configs::sharding::ShardAllocator;
 use server::diagnostics::{print_io_uring_permission_info, 
print_locked_memory_limit_info};
 use server::io::fs_utils;
 use server::log::logger::Logging;
+use server::metadata::SharedMetadata;
 use server::server_error::ServerError;
 use server::shard::namespace::IggyNamespace;
 use server::shard::system::info::SystemInfo;
@@ -321,6 +322,11 @@ fn main() -> Result<(), ServerError> {
         let client_manager: EternalPtr<DashMap<u32, Client>> = 
client_manager.into();
         let client_manager = ClientManager::new(client_manager);
 
+        // Create shared metadata for all shards (ArcSwap-based lock-free 
reads)
+        let shared_metadata = Box::new(SharedMetadata::new());
+        let shared_metadata = Box::leak(shared_metadata);
+        let shared_metadata: EternalPtr<SharedMetadata> = 
shared_metadata.into();
+
         streams.with_components(|components| {
             let (root, ..) = components.into_components();
             for (_, stream) in root.iter() {
@@ -353,6 +359,7 @@ fn main() -> Result<(), ServerError> {
         {
             let streams = streams.clone();
             let shards_table = shards_table.clone();
+            let shared_metadata = shared_metadata.clone();
             let users = users.clone();
             let connections = connections.clone();
             let config = config.clone();
@@ -417,6 +424,7 @@ fn main() -> Result<(), ServerError> {
                                 .state(state)
                                 .users(users)
                                 .shards_table(shards_table)
+                                .shared_metadata(shared_metadata)
                                 .connections(connections)
                                 .clients_manager(client_manager)
                                 .config(config)
diff --git a/core/server/src/metadata/consumer_group.rs 
b/core/server/src/metadata/consumer_group.rs
new file mode 100644
index 000000000..34c1d20b5
--- /dev/null
+++ b/core/server/src/metadata/consumer_group.rs
@@ -0,0 +1,35 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/// Consumer group metadata stored in the shared snapshot.
+/// Member management is dynamic and handled separately.
+#[derive(Debug, Clone)]
+pub struct ConsumerGroupMeta {
+    pub id: usize,
+    pub name: String,
+    pub partitions: Vec<usize>,
+}
+
+impl ConsumerGroupMeta {
+    pub fn new(id: usize, name: String, partitions: Vec<usize>) -> Self {
+        Self {
+            id,
+            name,
+            partitions,
+        }
+    }
+}
diff --git a/core/server/src/metadata/mod.rs b/core/server/src/metadata/mod.rs
new file mode 100644
index 000000000..e964dccd9
--- /dev/null
+++ b/core/server/src/metadata/mod.rs
@@ -0,0 +1,38 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Shared metadata module providing a single source of truth for all shards.
+//!
+//! This module replaces the broadcast-based metadata synchronization with an
+//! `ArcSwap`-based approach where all shards read from a shared snapshot,
+//! and only shard 0 can write (swap in new snapshots).
+
+mod consumer_group;
+mod partition;
+mod shared;
+mod snapshot;
+mod stream;
+mod topic;
+mod user;
+
+pub use consumer_group::ConsumerGroupMeta;
+pub use partition::PartitionMeta;
+pub use shared::SharedMetadata;
+pub use snapshot::MetadataSnapshot;
+pub use stream::StreamMeta;
+pub use topic::TopicMeta;
+pub use user::{PersonalAccessTokenMeta, UserMeta};
diff --git a/core/server/src/metadata/partition.rs 
b/core/server/src/metadata/partition.rs
new file mode 100644
index 000000000..57e50dfa9
--- /dev/null
+++ b/core/server/src/metadata/partition.rs
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use iggy_common::IggyTimestamp;
+
+/// Partition metadata stored in the shared snapshot.
+/// The actual partition data (segments, journals) is stored per-shard.
+#[derive(Debug, Clone)]
+pub struct PartitionMeta {
+    pub id: usize,
+    pub created_at: IggyTimestamp,
+}
+
+impl PartitionMeta {
+    pub fn new(id: usize, created_at: IggyTimestamp) -> Self {
+        Self { id, created_at }
+    }
+}
diff --git a/core/server/src/metadata/shared.rs 
b/core/server/src/metadata/shared.rs
new file mode 100644
index 000000000..f03086b65
--- /dev/null
+++ b/core/server/src/metadata/shared.rs
@@ -0,0 +1,836 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::metadata::{
+    ConsumerGroupMeta, MetadataSnapshot, PartitionMeta, StreamMeta, TopicMeta, 
UserMeta,
+};
+use arc_swap::{ArcSwap, Guard};
+use iggy_common::{
+    CompressionAlgorithm, Identifier, IggyError, IggyExpiry, IggyTimestamp, 
MaxTopicSize,
+    Permissions, UserStatus,
+};
+use std::net::SocketAddr;
+use std::sync::Arc;
+use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
+
+/// Shared metadata accessible by all shards.
+///
+/// Uses ArcSwap for lock-free reads and atomic updates.
+/// Only shard 0 should call write methods (create/update/delete).
+/// All shards can call read methods (get/exists/load).
+pub struct SharedMetadata {
+    inner: ArcSwap<MetadataSnapshot>,
+
+    // Atomic ID generators (only shard 0 increments these)
+    next_stream_id: AtomicUsize,
+    next_user_id: AtomicU32,
+}
+
+impl Default for SharedMetadata {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl SharedMetadata {
+    pub fn new() -> Self {
+        Self {
+            inner: ArcSwap::from_pointee(MetadataSnapshot::new()),
+            next_stream_id: AtomicUsize::new(0),
+            next_user_id: AtomicU32::new(0),
+        }
+    }
+
+    /// Initialize with existing state (used during server startup).
+    pub fn init_from_snapshot(
+        &self,
+        snapshot: MetadataSnapshot,
+        next_stream_id: usize,
+        next_user_id: u32,
+    ) {
+        self.inner.store(Arc::new(snapshot));
+        self.next_stream_id.store(next_stream_id, Ordering::SeqCst);
+        self.next_user_id.store(next_user_id, Ordering::SeqCst);
+    }
+
+    // ==================== Read Operations (any shard) ====================
+
+    /// Load the current snapshot. Lock-free read.
+    pub fn load(&self) -> Guard<Arc<MetadataSnapshot>> {
+        self.inner.load()
+    }
+
+    /// Get a clone of the current snapshot.
+    pub fn snapshot(&self) -> Arc<MetadataSnapshot> {
+        Arc::clone(&self.inner.load())
+    }
+
+    // Stream reads
+
+    pub fn stream_exists(&self, id: &Identifier) -> bool {
+        self.inner.load().stream_exists(id)
+    }
+
+    pub fn stream_exists_by_name(&self, name: &str) -> bool {
+        self.inner.load().stream_exists_by_name(name)
+    }
+
+    pub fn get_stream(&self, id: &Identifier) -> Option<StreamMeta> {
+        self.inner.load().get_stream(id).cloned()
+    }
+
+    pub fn get_stream_id(&self, id: &Identifier) -> Option<usize> {
+        self.inner.load().get_stream_id(id)
+    }
+
+    pub fn get_streams(&self) -> Vec<StreamMeta> {
+        self.inner.load().streams.values().cloned().collect()
+    }
+
+    // User reads
+
+    pub fn user_exists(&self, id: &Identifier) -> bool {
+        self.inner.load().user_exists(id)
+    }
+
+    pub fn user_exists_by_name(&self, username: &str) -> bool {
+        self.inner.load().user_exists_by_name(username)
+    }
+
+    pub fn get_user(&self, id: &Identifier) -> Option<UserMeta> {
+        self.inner.load().get_user(id).cloned()
+    }
+
+    pub fn get_user_id(&self, id: &Identifier) -> Option<u32> {
+        self.inner.load().get_user_id(id)
+    }
+
+    pub fn get_users(&self) -> Vec<UserMeta> {
+        self.inner.load().users.values().cloned().collect()
+    }
+
+    pub fn get_user_by_username(&self, username: &str) -> Option<UserMeta> {
+        self.inner.load().get_user_by_username(username).cloned()
+    }
+
+    // ==================== Write Operations (shard 0 only) 
====================
+
+    // Stream operations
+
+    /// Create a new stream. Returns the created stream metadata.
+    pub fn create_stream(&self, name: String) -> Result<StreamMeta, IggyError> 
{
+        let current = self.inner.load();
+
+        if current.stream_exists_by_name(&name) {
+            return Err(IggyError::StreamNameAlreadyExists(name));
+        }
+
+        let id = self.next_stream_id.fetch_add(1, Ordering::SeqCst);
+        let stream = StreamMeta::new(id, name.clone(), IggyTimestamp::now());
+
+        let mut new_snapshot = (**current).clone();
+        new_snapshot.stream_index.insert(name, id);
+        new_snapshot.streams.insert(id, stream.clone());
+
+        self.inner.store(Arc::new(new_snapshot));
+
+        Ok(stream)
+    }
+
+    /// Delete a stream. Returns the deleted stream metadata.
+    pub fn delete_stream(&self, id: &Identifier) -> Result<StreamMeta, 
IggyError> {
+        let current = self.inner.load();
+
+        let stream_id = current
+            .get_stream_id(id)
+            .ok_or_else(|| IggyError::StreamIdNotFound(id.clone()))?;
+
+        let mut new_snapshot = (**current).clone();
+        let stream = new_snapshot
+            .streams
+            .remove(&stream_id)
+            .ok_or_else(|| IggyError::StreamIdNotFound(id.clone()))?;
+        new_snapshot.stream_index.remove(&stream.name);
+
+        self.inner.store(Arc::new(new_snapshot));
+
+        Ok(stream)
+    }
+
+    /// Update a stream's name.
+    pub fn update_stream(&self, id: &Identifier, new_name: String) -> 
Result<(), IggyError> {
+        let current = self.inner.load();
+
+        let stream_id = current
+            .get_stream_id(id)
+            .ok_or_else(|| IggyError::StreamIdNotFound(id.clone()))?;
+
+        let stream = current
+            .streams
+            .get(&stream_id)
+            .ok_or_else(|| IggyError::StreamIdNotFound(id.clone()))?;
+
+        if stream.name == new_name {
+            return Ok(());
+        }
+
+        if current.stream_exists_by_name(&new_name) {
+            return Err(IggyError::StreamNameAlreadyExists(new_name));
+        }
+
+        let mut new_snapshot = (**current).clone();
+        let old_name = 
new_snapshot.streams.get(&stream_id).unwrap().name.clone();
+        new_snapshot.stream_index.remove(&old_name);
+        new_snapshot
+            .stream_index
+            .insert(new_name.clone(), stream_id);
+        new_snapshot.streams.get_mut(&stream_id).unwrap().name = new_name;
+
+        self.inner.store(Arc::new(new_snapshot));
+
+        Ok(())
+    }
+
+    // Topic operations
+
+    /// Create a new topic in a stream.
+    #[allow(clippy::too_many_arguments)]
+    pub fn create_topic(
+        &self,
+        stream_id: &Identifier,
+        name: String,
+        replication_factor: u8,
+        message_expiry: IggyExpiry,
+        compression_algorithm: CompressionAlgorithm,
+        max_topic_size: MaxTopicSize,
+    ) -> Result<TopicMeta, IggyError> {
+        let current = self.inner.load();
+
+        let stream_idx = current
+            .get_stream_id(stream_id)
+            .ok_or_else(|| IggyError::StreamIdNotFound(stream_id.clone()))?;
+
+        let stream = current.streams.get(&stream_idx).unwrap();
+        if stream.topic_exists(&name) {
+            return Err(IggyError::TopicNameAlreadyExists(name, 
stream_id.clone()));
+        }
+
+        // Topic IDs are sequential within a stream
+        let topic_id = stream.topics.len();
+        let topic = TopicMeta::new(
+            topic_id,
+            name.clone(),
+            IggyTimestamp::now(),
+            replication_factor,
+            message_expiry,
+            compression_algorithm,
+            max_topic_size,
+        );
+
+        let mut new_snapshot = (**current).clone();
+        new_snapshot
+            .streams
+            .get_mut(&stream_idx)
+            .unwrap()
+            .add_topic(topic.clone());
+
+        self.inner.store(Arc::new(new_snapshot));
+
+        Ok(topic)
+    }
+
+    /// Delete a topic from a stream.
+    pub fn delete_topic(
+        &self,
+        stream_id: &Identifier,
+        topic_id: &Identifier,
+    ) -> Result<TopicMeta, IggyError> {
+        let current = self.inner.load();
+
+        let stream_idx = current
+            .get_stream_id(stream_id)
+            .ok_or_else(|| IggyError::StreamIdNotFound(stream_id.clone()))?;
+
+        let stream = current.streams.get(&stream_idx).unwrap();
+        let topic_idx = match topic_id.kind {
+            iggy_common::IdKind::Numeric => topic_id.get_u32_value().unwrap() 
as usize,
+            iggy_common::IdKind::String => {
+                let name = topic_id.get_string_value().unwrap();
+                stream.get_topic_id_by_name(&name).ok_or_else(|| {
+                    IggyError::TopicNameNotFound(name.to_string(), 
stream.name.clone())
+                })?
+            }
+        };
+
+        let mut new_snapshot = (**current).clone();
+        let topic = new_snapshot
+            .streams
+            .get_mut(&stream_idx)
+            .unwrap()
+            .remove_topic(topic_idx)
+            .ok_or_else(|| IggyError::TopicIdNotFound(topic_id.clone(), 
stream_id.clone()))?;
+
+        self.inner.store(Arc::new(new_snapshot));
+
+        Ok(topic)
+    }
+
+    /// Update topic configuration.
+    #[allow(clippy::too_many_arguments)]
+    pub fn update_topic(
+        &self,
+        stream_id: &Identifier,
+        topic_id: &Identifier,
+        name: Option<String>,
+        message_expiry: Option<IggyExpiry>,
+        compression_algorithm: Option<CompressionAlgorithm>,
+        max_topic_size: Option<MaxTopicSize>,
+        replication_factor: Option<u8>,
+    ) -> Result<(), IggyError> {
+        let current = self.inner.load();
+
+        let stream_idx = current
+            .get_stream_id(stream_id)
+            .ok_or_else(|| IggyError::StreamIdNotFound(stream_id.clone()))?;
+
+        let stream = current.streams.get(&stream_idx).unwrap();
+        let stream_name = stream.name.clone();
+        let topic_idx = match topic_id.kind {
+            iggy_common::IdKind::Numeric => topic_id.get_u32_value().unwrap() 
as usize,
+            iggy_common::IdKind::String => {
+                let topic_name = topic_id.get_string_value().unwrap();
+                stream.get_topic_id_by_name(&topic_name).ok_or_else(|| {
+                    IggyError::TopicNameNotFound(topic_name.to_string(), 
stream_name.clone())
+                })?
+            }
+        };
+
+        let mut new_snapshot = (**current).clone();
+        let stream_mut = new_snapshot.streams.get_mut(&stream_idx).unwrap();
+
+        // Check name availability before getting mutable topic reference
+        if let Some(ref new_name) = name {
+            let old_name = stream_mut
+                .topics
+                .get(&topic_idx)
+                .ok_or_else(|| IggyError::TopicIdNotFound(topic_id.clone(), 
stream_id.clone()))?
+                .name
+                .clone();
+            if *new_name != old_name && stream_mut.topic_exists(new_name) {
+                return Err(IggyError::TopicNameAlreadyExists(
+                    new_name.clone(),
+                    stream_id.clone(),
+                ));
+            }
+        }
+
+        let topic = stream_mut
+            .topics
+            .get_mut(&topic_idx)
+            .ok_or_else(|| IggyError::TopicIdNotFound(topic_id.clone(), 
stream_id.clone()))?;
+
+        if let Some(new_name) = name
+            && topic.name != new_name
+        {
+            stream_mut.topic_index.remove(&topic.name);
+            stream_mut.topic_index.insert(new_name.clone(), topic_idx);
+            topic.name = new_name;
+        }
+        if let Some(expiry) = message_expiry {
+            topic.message_expiry = expiry;
+        }
+        if let Some(compression) = compression_algorithm {
+            topic.compression_algorithm = compression;
+        }
+        if let Some(size) = max_topic_size {
+            topic.max_topic_size = size;
+        }
+        if let Some(factor) = replication_factor {
+            topic.replication_factor = factor;
+        }
+
+        self.inner.store(Arc::new(new_snapshot));
+
+        Ok(())
+    }
+
+    // Partition operations
+
+    /// Add partitions to a topic.
+    pub fn create_partitions(
+        &self,
+        stream_id: &Identifier,
+        topic_id: &Identifier,
+        count: u32,
+    ) -> Result<Vec<PartitionMeta>, IggyError> {
+        let current = self.inner.load();
+
+        let stream_idx = current
+            .get_stream_id(stream_id)
+            .ok_or_else(|| IggyError::StreamIdNotFound(stream_id.clone()))?;
+
+        let stream = current.streams.get(&stream_idx).unwrap();
+        let topic_idx = match topic_id.kind {
+            iggy_common::IdKind::Numeric => topic_id.get_u32_value().unwrap() 
as usize,
+            iggy_common::IdKind::String => {
+                let name = topic_id.get_string_value().unwrap();
+                stream.get_topic_id_by_name(&name).ok_or_else(|| {
+                    IggyError::TopicNameNotFound(name.to_string(), 
stream.name.clone())
+                })?
+            }
+        };
+
+        let topic = stream.topics.get(&topic_idx).unwrap();
+        let start_id = topic.partitions.len();
+
+        let mut new_partitions = Vec::with_capacity(count as usize);
+        for i in 0..count {
+            let partition_id = start_id + i as usize;
+            new_partitions.push(PartitionMeta::new(partition_id, 
IggyTimestamp::now()));
+        }
+
+        let mut new_snapshot = (**current).clone();
+        let topic_mut = new_snapshot
+            .streams
+            .get_mut(&stream_idx)
+            .unwrap()
+            .topics
+            .get_mut(&topic_idx)
+            .unwrap();
+
+        for partition in &new_partitions {
+            topic_mut.add_partition(partition.clone());
+        }
+
+        self.inner.store(Arc::new(new_snapshot));
+
+        Ok(new_partitions)
+    }
+
+    /// Delete partitions from a topic.
+    pub fn delete_partitions(
+        &self,
+        stream_id: &Identifier,
+        topic_id: &Identifier,
+        partition_ids: &[usize],
+    ) -> Result<Vec<PartitionMeta>, IggyError> {
+        let current = self.inner.load();
+
+        let stream_idx = current
+            .get_stream_id(stream_id)
+            .ok_or_else(|| IggyError::StreamIdNotFound(stream_id.clone()))?;
+
+        let stream = current.streams.get(&stream_idx).unwrap();
+        let topic_idx = match topic_id.kind {
+            iggy_common::IdKind::Numeric => topic_id.get_u32_value().unwrap() 
as usize,
+            iggy_common::IdKind::String => {
+                let name = topic_id.get_string_value().unwrap();
+                stream.get_topic_id_by_name(&name).ok_or_else(|| {
+                    IggyError::TopicNameNotFound(name.to_string(), 
stream.name.clone())
+                })?
+            }
+        };
+
+        let mut new_snapshot = (**current).clone();
+        let topic = new_snapshot
+            .streams
+            .get_mut(&stream_idx)
+            .unwrap()
+            .topics
+            .get_mut(&topic_idx)
+            .unwrap();
+
+        let mut deleted = Vec::with_capacity(partition_ids.len());
+        for &id in partition_ids {
+            if let Some(partition) = topic.remove_partition(id) {
+                deleted.push(partition);
+            }
+        }
+
+        self.inner.store(Arc::new(new_snapshot));
+
+        Ok(deleted)
+    }
+
+    // User operations
+
+    /// Create a new user.
+    pub fn create_user(
+        &self,
+        username: String,
+        password_hash: String,
+        status: UserStatus,
+        permissions: Option<Permissions>,
+    ) -> Result<UserMeta, IggyError> {
+        let current = self.inner.load();
+
+        if current.user_exists_by_name(&username) {
+            return Err(IggyError::UserAlreadyExists);
+        }
+
+        let id = self.next_user_id.fetch_add(1, Ordering::SeqCst);
+        let user = UserMeta::new(
+            id,
+            username.clone(),
+            password_hash,
+            IggyTimestamp::now(),
+            status,
+            permissions,
+        );
+
+        let mut new_snapshot = (**current).clone();
+        new_snapshot.user_index.insert(username, id);
+        new_snapshot.users.insert(id, user.clone());
+
+        self.inner.store(Arc::new(new_snapshot));
+
+        Ok(user)
+    }
+
+    /// Delete a user.
+    pub fn delete_user(&self, id: &Identifier) -> Result<UserMeta, IggyError> {
+        let current = self.inner.load();
+
+        let user_id = current
+            .get_user_id(id)
+            .ok_or_else(|| IggyError::ResourceNotFound(format!("user with ID 
{id}")))?;
+
+        let mut new_snapshot = (**current).clone();
+        let user = new_snapshot
+            .users
+            .remove(&user_id)
+            .ok_or_else(|| IggyError::ResourceNotFound(format!("user with ID 
{user_id}")))?;
+        new_snapshot.user_index.remove(&user.username);
+
+        self.inner.store(Arc::new(new_snapshot));
+
+        Ok(user)
+    }
+
+    /// Update user metadata.
+    pub fn update_user(
+        &self,
+        id: &Identifier,
+        username: Option<String>,
+        status: Option<UserStatus>,
+    ) -> Result<(), IggyError> {
+        let current = self.inner.load();
+
+        let user_id = current
+            .get_user_id(id)
+            .ok_or_else(|| IggyError::ResourceNotFound(format!("user with ID 
{id}")))?;
+
+        let mut new_snapshot = (**current).clone();
+        let user = new_snapshot
+            .users
+            .get_mut(&user_id)
+            .ok_or_else(|| IggyError::ResourceNotFound(format!("user with ID 
{user_id}")))?;
+
+        if let Some(new_username) = username
+            && user.username != new_username
+        {
+            if new_snapshot.user_index.contains_key(&new_username) {
+                return Err(IggyError::UserAlreadyExists);
+            }
+            new_snapshot.user_index.remove(&user.username);
+            new_snapshot
+                .user_index
+                .insert(new_username.clone(), user_id);
+            user.username = new_username;
+        }
+        if let Some(new_status) = status {
+            user.status = new_status;
+        }
+
+        self.inner.store(Arc::new(new_snapshot));
+
+        Ok(())
+    }
+
+    /// Update user permissions.
+    pub fn update_permissions(
+        &self,
+        id: &Identifier,
+        permissions: Option<Permissions>,
+    ) -> Result<(), IggyError> {
+        let current = self.inner.load();
+
+        let user_id = current
+            .get_user_id(id)
+            .ok_or_else(|| IggyError::ResourceNotFound(format!("user with ID 
{id}")))?;
+
+        let mut new_snapshot = (**current).clone();
+        let user = new_snapshot
+            .users
+            .get_mut(&user_id)
+            .ok_or_else(|| IggyError::ResourceNotFound(format!("user with ID 
{user_id}")))?;
+        user.permissions = permissions;
+
+        self.inner.store(Arc::new(new_snapshot));
+
+        Ok(())
+    }
+
+    /// Change user password.
+    pub fn change_password(
+        &self,
+        id: &Identifier,
+        new_password_hash: String,
+    ) -> Result<(), IggyError> {
+        let current = self.inner.load();
+
+        let user_id = current
+            .get_user_id(id)
+            .ok_or_else(|| IggyError::ResourceNotFound(format!("user with ID 
{id}")))?;
+
+        let mut new_snapshot = (**current).clone();
+        let user = new_snapshot
+            .users
+            .get_mut(&user_id)
+            .ok_or_else(|| IggyError::ResourceNotFound(format!("user with ID 
{user_id}")))?;
+        user.password_hash = new_password_hash;
+
+        self.inner.store(Arc::new(new_snapshot));
+
+        Ok(())
+    }
+
+    // Consumer group operations
+
+    /// Create a consumer group.
+    pub fn create_consumer_group(
+        &self,
+        stream_id: &Identifier,
+        topic_id: &Identifier,
+        name: String,
+        partition_ids: Vec<usize>,
+    ) -> Result<ConsumerGroupMeta, IggyError> {
+        let current = self.inner.load();
+
+        let stream_idx = current
+            .get_stream_id(stream_id)
+            .ok_or_else(|| IggyError::StreamIdNotFound(stream_id.clone()))?;
+
+        let stream = current.streams.get(&stream_idx).unwrap();
+        let topic_idx = match topic_id.kind {
+            iggy_common::IdKind::Numeric => topic_id.get_u32_value().unwrap() 
as usize,
+            iggy_common::IdKind::String => {
+                let topic_name = topic_id.get_string_value().unwrap();
+                stream.get_topic_id_by_name(&topic_name).ok_or_else(|| {
+                    IggyError::TopicNameNotFound(topic_name.to_string(), 
stream.name.clone())
+                })?
+            }
+        };
+
+        let topic = stream.topics.get(&topic_idx).unwrap();
+        if topic.get_consumer_group_id_by_name(&name).is_some() {
+            return Err(IggyError::ConsumerGroupNameAlreadyExists(
+                name,
+                topic_id.clone(),
+            ));
+        }
+
+        let group_id = topic.consumer_groups.len();
+        let group = ConsumerGroupMeta::new(group_id, name, partition_ids);
+
+        let mut new_snapshot = (**current).clone();
+        new_snapshot
+            .streams
+            .get_mut(&stream_idx)
+            .unwrap()
+            .topics
+            .get_mut(&topic_idx)
+            .unwrap()
+            .add_consumer_group(group.clone());
+
+        self.inner.store(Arc::new(new_snapshot));
+
+        Ok(group)
+    }
+
+    /// Delete a consumer group.
+    pub fn delete_consumer_group(
+        &self,
+        stream_id: &Identifier,
+        topic_id: &Identifier,
+        group_id: &Identifier,
+    ) -> Result<ConsumerGroupMeta, IggyError> {
+        let current = self.inner.load();
+
+        let stream_idx = current
+            .get_stream_id(stream_id)
+            .ok_or_else(|| IggyError::StreamIdNotFound(stream_id.clone()))?;
+
+        let stream = current.streams.get(&stream_idx).unwrap();
+        let topic_idx = match topic_id.kind {
+            iggy_common::IdKind::Numeric => topic_id.get_u32_value().unwrap() 
as usize,
+            iggy_common::IdKind::String => {
+                let topic_name = topic_id.get_string_value().unwrap();
+                stream.get_topic_id_by_name(&topic_name).ok_or_else(|| {
+                    IggyError::TopicNameNotFound(topic_name.to_string(), 
stream.name.clone())
+                })?
+            }
+        };
+
+        let topic = stream.topics.get(&topic_idx).unwrap();
+        let group_idx = match group_id.kind {
+            iggy_common::IdKind::Numeric => group_id.get_u32_value().unwrap() 
as usize,
+            iggy_common::IdKind::String => {
+                let group_name = group_id.get_string_value().unwrap();
+                topic
+                    .get_consumer_group_id_by_name(&group_name)
+                    .ok_or_else(|| {
+                        IggyError::ConsumerGroupNameNotFound(
+                            group_name.to_string(),
+                            topic_id.clone(),
+                        )
+                    })?
+            }
+        };
+
+        let mut new_snapshot = (**current).clone();
+        let group = new_snapshot
+            .streams
+            .get_mut(&stream_idx)
+            .unwrap()
+            .topics
+            .get_mut(&topic_idx)
+            .unwrap()
+            .remove_consumer_group(group_idx)
+            .ok_or_else(|| {
+                IggyError::ConsumerGroupIdNotFound(group_id.clone(), 
topic_id.clone())
+            })?;
+
+        self.inner.store(Arc::new(new_snapshot));
+
+        Ok(group)
+    }
+
+    // Bound address operations
+
+    /// Set the bound TCP address.
+    pub fn set_tcp_address(&self, address: SocketAddr) {
+        let current = self.inner.load();
+        let mut new_snapshot = (**current).clone();
+        new_snapshot.bound_addresses.tcp = Some(address);
+        self.inner.store(Arc::new(new_snapshot));
+    }
+
+    /// Set the bound HTTP address.
+    pub fn set_http_address(&self, address: SocketAddr) {
+        let current = self.inner.load();
+        let mut new_snapshot = (**current).clone();
+        new_snapshot.bound_addresses.http = Some(address);
+        self.inner.store(Arc::new(new_snapshot));
+    }
+
+    /// Set the bound QUIC address.
+    pub fn set_quic_address(&self, address: SocketAddr) {
+        let current = self.inner.load();
+        let mut new_snapshot = (**current).clone();
+        new_snapshot.bound_addresses.quic = Some(address);
+        self.inner.store(Arc::new(new_snapshot));
+    }
+
+    /// Set the bound WebSocket address.
+    pub fn set_websocket_address(&self, address: SocketAddr) {
+        let current = self.inner.load();
+        let mut new_snapshot = (**current).clone();
+        new_snapshot.bound_addresses.websocket = Some(address);
+        self.inner.store(Arc::new(new_snapshot));
+    }
+
+    /// Get the current next stream ID (for testing/debugging).
+    pub fn next_stream_id(&self) -> usize {
+        self.next_stream_id.load(Ordering::SeqCst)
+    }
+
+    /// Get the current next user ID (for testing/debugging).
+    pub fn next_user_id(&self) -> u32 {
+        self.next_user_id.load(Ordering::SeqCst)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_create_stream() {
+        let metadata = SharedMetadata::new();
+
+        let stream = 
metadata.create_stream("test-stream".to_string()).unwrap();
+        assert_eq!(stream.id, 0);
+        assert_eq!(stream.name, "test-stream");
+
+        assert!(metadata.stream_exists_by_name("test-stream"));
+        assert!(!metadata.stream_exists_by_name("nonexistent"));
+    }
+
+    #[test]
+    fn test_create_duplicate_stream_fails() {
+        let metadata = SharedMetadata::new();
+
+        metadata.create_stream("test-stream".to_string()).unwrap();
+        let result = metadata.create_stream("test-stream".to_string());
+
+        assert!(matches!(result, Err(IggyError::StreamNameAlreadyExists(_))));
+    }
+
+    #[test]
+    fn test_delete_stream() {
+        let metadata = SharedMetadata::new();
+
+        let stream = 
metadata.create_stream("test-stream".to_string()).unwrap();
+        let stream_id = Identifier::numeric(stream.id as u32).unwrap();
+
+        let deleted = metadata.delete_stream(&stream_id).unwrap();
+        assert_eq!(deleted.id, stream.id);
+        assert!(!metadata.stream_exists_by_name("test-stream"));
+    }
+
+    #[test]
+    fn test_create_user() {
+        let metadata = SharedMetadata::new();
+
+        let user = metadata
+            .create_user(
+                "testuser".to_string(),
+                "hashed_password".to_string(),
+                UserStatus::Active,
+                None,
+            )
+            .unwrap();
+
+        assert_eq!(user.id, 0);
+        assert_eq!(user.username, "testuser");
+        assert!(metadata.user_exists_by_name("testuser"));
+    }
+
+    #[test]
+    fn test_concurrent_stream_ids() {
+        let metadata = SharedMetadata::new();
+
+        let stream1 = metadata.create_stream("stream-1".to_string()).unwrap();
+        let stream2 = metadata.create_stream("stream-2".to_string()).unwrap();
+        let stream3 = metadata.create_stream("stream-3".to_string()).unwrap();
+
+        assert_eq!(stream1.id, 0);
+        assert_eq!(stream2.id, 1);
+        assert_eq!(stream3.id, 2);
+    }
+}
diff --git a/core/server/src/metadata/snapshot.rs 
b/core/server/src/metadata/snapshot.rs
new file mode 100644
index 000000000..8283740a2
--- /dev/null
+++ b/core/server/src/metadata/snapshot.rs
@@ -0,0 +1,182 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::metadata::{StreamMeta, UserMeta};
+use ahash::AHashMap;
+use iggy_common::Identifier;
+use std::net::SocketAddr;
+
+/// Bound addresses for transport protocols.
+#[derive(Debug, Clone, Default)]
+pub struct BoundAddresses {
+    pub tcp: Option<SocketAddr>,
+    pub http: Option<SocketAddr>,
+    pub quic: Option<SocketAddr>,
+    pub websocket: Option<SocketAddr>,
+}
+
+/// Immutable metadata snapshot that is atomically swapped.
+/// All reads see a consistent view of the entire metadata.
+#[derive(Debug, Clone)]
+pub struct MetadataSnapshot {
+    /// Stream metadata indexed by stream ID
+    pub streams: AHashMap<usize, StreamMeta>,
+
+    /// Stream name to ID index
+    pub stream_index: AHashMap<String, usize>,
+
+    /// User metadata indexed by user ID
+    pub users: AHashMap<u32, UserMeta>,
+
+    /// Username to user ID index
+    pub user_index: AHashMap<String, u32>,
+
+    /// Bound addresses for transport protocols
+    pub bound_addresses: BoundAddresses,
+}
+
+impl Default for MetadataSnapshot {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl MetadataSnapshot {
+    pub fn new() -> Self {
+        Self {
+            streams: AHashMap::new(),
+            stream_index: AHashMap::new(),
+            users: AHashMap::new(),
+            user_index: AHashMap::new(),
+            bound_addresses: BoundAddresses::default(),
+        }
+    }
+
+    // Stream operations
+
+    pub fn stream_exists_by_name(&self, name: &str) -> bool {
+        self.stream_index.contains_key(name)
+    }
+
+    pub fn stream_exists(&self, id: &Identifier) -> bool {
+        match id.kind {
+            iggy_common::IdKind::Numeric => {
+                let id = id.get_u32_value().unwrap() as usize;
+                self.streams.contains_key(&id)
+            }
+            iggy_common::IdKind::String => {
+                let name = id.get_string_value().unwrap();
+                self.stream_index.contains_key(&name)
+            }
+        }
+    }
+
+    pub fn get_stream(&self, id: &Identifier) -> Option<&StreamMeta> {
+        match id.kind {
+            iggy_common::IdKind::Numeric => {
+                let id = id.get_u32_value().unwrap() as usize;
+                self.streams.get(&id)
+            }
+            iggy_common::IdKind::String => {
+                let name = id.get_string_value().unwrap();
+                self.stream_index
+                    .get(&name)
+                    .and_then(|id| self.streams.get(id))
+            }
+        }
+    }
+
+    pub fn get_stream_id(&self, id: &Identifier) -> Option<usize> {
+        match id.kind {
+            iggy_common::IdKind::Numeric => {
+                let stream_id = id.get_u32_value().unwrap() as usize;
+                if self.streams.contains_key(&stream_id) {
+                    Some(stream_id)
+                } else {
+                    None
+                }
+            }
+            iggy_common::IdKind::String => {
+                let name = id.get_string_value().unwrap();
+                self.stream_index.get(&name).copied()
+            }
+        }
+    }
+
+    pub fn get_streams(&self) -> Vec<&StreamMeta> {
+        self.streams.values().collect()
+    }
+
+    // User operations
+
+    pub fn user_exists_by_name(&self, username: &str) -> bool {
+        self.user_index.contains_key(username)
+    }
+
+    pub fn user_exists(&self, id: &Identifier) -> bool {
+        match id.kind {
+            iggy_common::IdKind::Numeric => {
+                let id = id.get_u32_value().unwrap();
+                self.users.contains_key(&id)
+            }
+            iggy_common::IdKind::String => {
+                let name = id.get_string_value().unwrap();
+                self.user_index.contains_key(&name)
+            }
+        }
+    }
+
+    pub fn get_user(&self, id: &Identifier) -> Option<&UserMeta> {
+        match id.kind {
+            iggy_common::IdKind::Numeric => {
+                let id = id.get_u32_value().unwrap();
+                self.users.get(&id)
+            }
+            iggy_common::IdKind::String => {
+                let name = id.get_string_value().unwrap();
+                self.user_index.get(&name).and_then(|id| self.users.get(id))
+            }
+        }
+    }
+
+    pub fn get_user_id(&self, id: &Identifier) -> Option<u32> {
+        match id.kind {
+            iggy_common::IdKind::Numeric => {
+                let user_id = id.get_u32_value().unwrap();
+                if self.users.contains_key(&user_id) {
+                    Some(user_id)
+                } else {
+                    None
+                }
+            }
+            iggy_common::IdKind::String => {
+                let name = id.get_string_value().unwrap();
+                self.user_index.get(&name).copied()
+            }
+        }
+    }
+
+    pub fn get_users(&self) -> Vec<&UserMeta> {
+        self.users.values().collect()
+    }
+
+    pub fn get_user_by_username(&self, username: &str) -> Option<&UserMeta> {
+        self.user_index
+            .get(username)
+            .and_then(|id| self.users.get(id))
+    }
+}
diff --git a/core/server/src/metadata/stream.rs 
b/core/server/src/metadata/stream.rs
new file mode 100644
index 000000000..9b3495630
--- /dev/null
+++ b/core/server/src/metadata/stream.rs
@@ -0,0 +1,80 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::metadata::TopicMeta;
+use ahash::AHashMap;
+use iggy_common::IggyTimestamp;
+
+/// Stream metadata stored in the shared snapshot.
+#[derive(Debug, Clone)]
+pub struct StreamMeta {
+    pub id: usize,
+    pub name: String,
+    pub created_at: IggyTimestamp,
+
+    /// Topic metadata indexed by topic ID
+    pub topics: AHashMap<usize, TopicMeta>,
+
+    /// Topic name to ID index
+    pub topic_index: AHashMap<String, usize>,
+}
+
+impl StreamMeta {
+    pub fn new(id: usize, name: String, created_at: IggyTimestamp) -> Self {
+        Self {
+            id,
+            name,
+            created_at,
+            topics: AHashMap::new(),
+            topic_index: AHashMap::new(),
+        }
+    }
+
+    pub fn topics_count(&self) -> usize {
+        self.topics.len()
+    }
+
+    pub fn add_topic(&mut self, topic: TopicMeta) {
+        self.topic_index.insert(topic.name.clone(), topic.id);
+        self.topics.insert(topic.id, topic);
+    }
+
+    pub fn remove_topic(&mut self, topic_id: usize) -> Option<TopicMeta> {
+        if let Some(topic) = self.topics.remove(&topic_id) {
+            self.topic_index.remove(&topic.name);
+            Some(topic)
+        } else {
+            None
+        }
+    }
+
+    pub fn get_topic(&self, topic_id: usize) -> Option<&TopicMeta> {
+        self.topics.get(&topic_id)
+    }
+
+    pub fn get_topic_mut(&mut self, topic_id: usize) -> Option<&mut TopicMeta> 
{
+        self.topics.get_mut(&topic_id)
+    }
+
+    pub fn get_topic_id_by_name(&self, name: &str) -> Option<usize> {
+        self.topic_index.get(name).copied()
+    }
+
+    pub fn topic_exists(&self, name: &str) -> bool {
+        self.topic_index.contains_key(name)
+    }
+}
diff --git a/core/server/src/metadata/topic.rs 
b/core/server/src/metadata/topic.rs
new file mode 100644
index 000000000..f69b42e06
--- /dev/null
+++ b/core/server/src/metadata/topic.rs
@@ -0,0 +1,113 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::metadata::{ConsumerGroupMeta, PartitionMeta};
+use ahash::AHashMap;
+use iggy_common::{CompressionAlgorithm, IggyExpiry, IggyTimestamp, 
MaxTopicSize};
+
+/// Topic metadata stored in the shared snapshot.
+#[derive(Debug, Clone)]
+pub struct TopicMeta {
+    pub id: usize,
+    pub name: String,
+    pub created_at: IggyTimestamp,
+    pub replication_factor: u8,
+    pub message_expiry: IggyExpiry,
+    pub compression_algorithm: CompressionAlgorithm,
+    pub max_topic_size: MaxTopicSize,
+
+    /// Partition metadata indexed by partition ID
+    pub partitions: AHashMap<usize, PartitionMeta>,
+
+    /// Consumer group metadata indexed by group ID
+    pub consumer_groups: AHashMap<usize, ConsumerGroupMeta>,
+
+    /// Consumer group name to ID index
+    pub consumer_group_index: AHashMap<String, usize>,
+
+    /// Next partition ID for round-robin assignment (not cloned, use atomic)
+    next_partition_id: usize,
+}
+
+impl TopicMeta {
+    #[allow(clippy::too_many_arguments)]
+    pub fn new(
+        id: usize,
+        name: String,
+        created_at: IggyTimestamp,
+        replication_factor: u8,
+        message_expiry: IggyExpiry,
+        compression_algorithm: CompressionAlgorithm,
+        max_topic_size: MaxTopicSize,
+    ) -> Self {
+        Self {
+            id,
+            name,
+            created_at,
+            replication_factor,
+            message_expiry,
+            compression_algorithm,
+            max_topic_size,
+            partitions: AHashMap::new(),
+            consumer_groups: AHashMap::new(),
+            consumer_group_index: AHashMap::new(),
+            next_partition_id: 0,
+        }
+    }
+
+    pub fn partitions_count(&self) -> u32 {
+        self.partitions.len() as u32
+    }
+
+    pub fn add_partition(&mut self, partition: PartitionMeta) {
+        self.partitions.insert(partition.id, partition);
+    }
+
+    pub fn remove_partition(&mut self, partition_id: usize) -> 
Option<PartitionMeta> {
+        self.partitions.remove(&partition_id)
+    }
+
+    pub fn add_consumer_group(&mut self, group: ConsumerGroupMeta) {
+        self.consumer_group_index
+            .insert(group.name.clone(), group.id);
+        self.consumer_groups.insert(group.id, group);
+    }
+
+    pub fn remove_consumer_group(&mut self, group_id: usize) -> 
Option<ConsumerGroupMeta> {
+        if let Some(group) = self.consumer_groups.remove(&group_id) {
+            self.consumer_group_index.remove(&group.name);
+            Some(group)
+        } else {
+            None
+        }
+    }
+
+    pub fn get_consumer_group_id_by_name(&self, name: &str) -> Option<usize> {
+        self.consumer_group_index.get(name).copied()
+    }
+
+    pub fn get_next_partition_id(&self, upperbound: usize) -> usize {
+        if upperbound == 0 {
+            return 0;
+        }
+        self.next_partition_id % upperbound
+    }
+
+    pub fn increment_partition_counter(&mut self) {
+        self.next_partition_id = self.next_partition_id.wrapping_add(1);
+    }
+}
diff --git a/core/server/src/metadata/user.rs b/core/server/src/metadata/user.rs
new file mode 100644
index 000000000..d75bd80e2
--- /dev/null
+++ b/core/server/src/metadata/user.rs
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use ahash::AHashMap;
+use iggy_common::{IggyTimestamp, Permissions, UserStatus};
+
+/// Personal access token metadata.
+#[derive(Debug, Clone)]
+pub struct PersonalAccessTokenMeta {
+    pub name: String,
+    pub token_hash: String,
+    pub expiry_at: Option<IggyTimestamp>,
+}
+
+impl PersonalAccessTokenMeta {
+    pub fn new(name: String, token_hash: String, expiry_at: 
Option<IggyTimestamp>) -> Self {
+        Self {
+            name,
+            token_hash,
+            expiry_at,
+        }
+    }
+}
+
+/// User metadata stored in the shared snapshot.
+#[derive(Debug, Clone)]
+pub struct UserMeta {
+    pub id: u32,
+    pub username: String,
+    pub password_hash: String,
+    pub created_at: IggyTimestamp,
+    pub status: UserStatus,
+    pub permissions: Option<Permissions>,
+    pub personal_access_tokens: AHashMap<String, PersonalAccessTokenMeta>,
+}
+
+impl UserMeta {
+    pub fn new(
+        id: u32,
+        username: String,
+        password_hash: String,
+        created_at: IggyTimestamp,
+        status: UserStatus,
+        permissions: Option<Permissions>,
+    ) -> Self {
+        Self {
+            id,
+            username,
+            password_hash,
+            created_at,
+            status,
+            permissions,
+            personal_access_tokens: AHashMap::new(),
+        }
+    }
+}
diff --git a/core/server/src/shard/builder.rs b/core/server/src/shard/builder.rs
index 188452fe8..e394b9e0c 100644
--- a/core/server/src/shard/builder.rs
+++ b/core/server/src/shard/builder.rs
@@ -22,6 +22,7 @@ use super::{
 };
 use crate::{
     configs::server::ServerConfig,
+    metadata::SharedMetadata,
     shard::namespace::IggyNamespace,
     slab::{streams::Streams, users::Users},
     state::file::FileState,
@@ -40,6 +41,7 @@ pub struct IggyShardBuilder {
     id: Option<u16>,
     streams: Option<Streams>,
     shards_table: Option<EternalPtr<DashMap<IggyNamespace, ShardId>>>,
+    shared_metadata: Option<EternalPtr<SharedMetadata>>,
     state: Option<FileState>,
     users: Option<Users>,
     client_manager: Option<ClientManager>,
@@ -75,6 +77,11 @@ impl IggyShardBuilder {
         self
     }
 
+    pub fn shared_metadata(mut self, shared_metadata: 
EternalPtr<SharedMetadata>) -> Self {
+        self.shared_metadata = Some(shared_metadata);
+        self
+    }
+
     pub fn clients_manager(mut self, client_manager: ClientManager) -> Self {
         self.client_manager = Some(client_manager);
         self
@@ -120,6 +127,7 @@ impl IggyShardBuilder {
         let id = self.id.unwrap();
         let streams = self.streams.unwrap();
         let shards_table = self.shards_table.unwrap();
+        let shared_metadata = self.shared_metadata.unwrap();
         let state = self.state.unwrap();
         let users = self.users.unwrap();
         let config = self.config.unwrap();
@@ -154,6 +162,7 @@ impl IggyShardBuilder {
             id,
             shards,
             shards_table,
+            shared_metadata,
             streams, // TODO: Fixme
             users,
             fs_locks: Default::default(),
diff --git a/core/server/src/shard/handlers.rs 
b/core/server/src/shard/handlers.rs
index 087b7a54d..81c54a98a 100644
--- a/core/server/src/shard/handlers.rs
+++ b/core/server/src/shard/handlers.rs
@@ -509,9 +509,10 @@ pub async fn handle_event(shard: &Rc<IggyShard>, event: 
ShardEvent) -> Result<()
             }
             Ok(())
         }
-        ShardEvent::CreatedStream { id, stream } => {
-            let stream_id = shard.create_stream_bypass_auth(stream);
-            assert_eq!(stream_id, id);
+        ShardEvent::CreatedStream { id: _, stream } => {
+            // Note: Slab::insert() returns the next available slot, not the 
stream's embedded ID.
+            // The stream already has the correct ID from shard 0, which is 
preserved after insert.
+            shard.create_stream_bypass_auth(stream);
             Ok(())
         }
         ShardEvent::DeletedStream { id, stream_id } => {
@@ -521,9 +522,9 @@ pub async fn handle_event(shard: &Rc<IggyShard>, event: 
ShardEvent) -> Result<()
             Ok(())
         }
         ShardEvent::CreatedTopic { stream_id, topic } => {
-            let topic_id_from_event = topic.id();
-            let topic_id = shard.create_topic_bypass_auth(&stream_id, 
topic.clone());
-            assert_eq!(topic_id, topic_id_from_event);
+            // Note: Slab::insert() returns the next available slot, not the 
topic's embedded ID.
+            // The topic already has the correct ID from shard 0, which is 
preserved after insert.
+            shard.create_topic_bypass_auth(&stream_id, topic.clone());
             Ok(())
         }
         ShardEvent::CreatedPartitions {
@@ -570,9 +571,9 @@ pub async fn handle_event(shard: &Rc<IggyShard>, event: 
ShardEvent) -> Result<()
             topic_id,
             cg,
         } => {
-            let cg_id = cg.id();
-            let id = shard.create_consumer_group_bypass_auth(&stream_id, 
&topic_id, cg);
-            assert_eq!(id, cg_id);
+            // Note: Slab::insert() returns the next available slot, not the 
consumer group's embedded ID.
+            // The consumer group already has the correct ID from shard 0.
+            shard.create_consumer_group_bypass_auth(&stream_id, &topic_id, cg);
             Ok(())
         }
         ShardEvent::DeletedConsumerGroup {
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index f48279362..b5424cd9f 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -33,6 +33,7 @@ use self::tasks::{continuous, periodic};
 use crate::{
     configs::server::ServerConfig,
     io::fs_locks::FsLocks,
+    metadata::SharedMetadata,
     shard::{
         namespace::IggyNamespace, task_registry::TaskRegistry, 
transmission::frame::ShardFrame,
     },
@@ -73,6 +74,9 @@ pub struct IggyShard {
     pub(crate) shards_table: EternalPtr<DashMap<IggyNamespace, ShardId>>,
     pub(crate) state: FileState,
 
+    /// Shared metadata accessible by all shards (ArcSwap-based).
+    pub(crate) shared_metadata: EternalPtr<SharedMetadata>,
+
     pub(crate) fs_locks: FsLocks,
     pub(crate) encryptor: Option<EncryptorKind>,
     pub(crate) config: ServerConfig,
diff --git a/core/server/src/shard/system/streams.rs 
b/core/server/src/shard/system/streams.rs
index 84b82e0e4..b12ca2f2f 100644
--- a/core/server/src/shard/system/streams.rs
+++ b/core/server/src/shard/system/streams.rs
@@ -42,8 +42,12 @@ impl IggyShard {
         if exists {
             return Err(IggyError::StreamNameAlreadyExists(name));
         }
-        let stream = stream::create_and_insert_stream_mem(&self.streams, name);
+        let stream = stream::create_and_insert_stream_mem(&self.streams, 
name.clone());
         self.metrics.increment_streams(1);
+
+        // Dual-write: also update SharedMetadata for consistent cross-shard 
reads
+        let _ = self.shared_metadata.create_stream(name);
+
         create_stream_file_hierarchy(stream.id(), &self.config.system).await?;
         Ok(stream)
     }
@@ -100,8 +104,12 @@ impl IggyShard {
         self.streams.with_index_mut(|index| {
             // Rename the key inside of hashmap
             let idx = index.remove(&old_name).expect("Rename key: key not 
found");
-            index.insert(name, idx);
+            index.insert(name.clone(), idx);
         });
+
+        // Dual-write: also update SharedMetadata
+        let _ = self.shared_metadata.update_stream(id, name);
+
         Ok(())
     }
 
@@ -147,6 +155,9 @@ impl IggyShard {
         let mut stream = self.delete_stream_base(id);
         let stream_id_usize = stream.id();
 
+        // Dual-write: also delete from SharedMetadata
+        let _ = self.shared_metadata.delete_stream(id);
+
         // Clean up consumer groups from ClientManager for this stream
         self.client_manager
             .delete_consumer_groups_for_stream(stream_id_usize);
diff --git a/core/server/src/shard/system/topics.rs 
b/core/server/src/shard/system/topics.rs
index 3b1c86063..a5830c8fd 100644
--- a/core/server/src/shard/system/topics.rs
+++ b/core/server/src/shard/system/topics.rs
@@ -72,7 +72,7 @@ impl IggyShard {
         let topic = topic::create_and_insert_topics_mem(
             &self.streams,
             stream_id,
-            name,
+            name.clone(),
             replication_factor.unwrap_or(1),
             message_expiry,
             compression,
@@ -81,6 +81,16 @@ impl IggyShard {
         );
         self.metrics.increment_topics(1);
 
+        // Dual-write: also update SharedMetadata
+        let _ = self.shared_metadata.create_topic(
+            stream_id,
+            name,
+            replication_factor.unwrap_or(1),
+            message_expiry,
+            compression,
+            max_topic_size,
+        );
+
         // Create file hierarchy for the topic.
         create_topic_file_hierarchy(numeric_stream_id, topic.id(), 
&self.config.system).await?;
         Ok(topic)
@@ -191,9 +201,20 @@ impl IggyShard {
             self.streams
                 .with_topic_by_id_mut(stream_id, topic_id, 
update_topic_closure);
         if old_name != new_name {
-            let rename_closure = topics::helpers::rename_index(&old_name, 
new_name);
+            let rename_closure = topics::helpers::rename_index(&old_name, 
new_name.clone());
             self.streams.with_topics(stream_id, rename_closure);
         }
+
+        // Dual-write: also update SharedMetadata
+        let _ = self.shared_metadata.update_topic(
+            stream_id,
+            topic_id,
+            Some(name),
+            Some(message_expiry),
+            Some(compression_algorithm),
+            Some(max_topic_size),
+            Some(replication_factor),
+        );
     }
 
     pub async fn delete_topic(
@@ -222,6 +243,9 @@ impl IggyShard {
         let mut topic = self.delete_topic_base(stream_id, topic_id);
         let topic_id_numeric = topic.id();
 
+        // Dual-write: also delete from SharedMetadata
+        let _ = self.shared_metadata.delete_topic(stream_id, topic_id);
+
         // Clean up consumer groups from ClientManager for this topic
         self.client_manager
             .delete_consumer_groups_for_topic(numeric_stream_id, 
topic_id_numeric);

Reply via email to