This is an automated email from the ASF dual-hosted git repository. hgruszecki pushed a commit to branch arc-swap in repository https://gitbox.apache.org/repos/asf/iggy.git
commit b01b96e41461979ebc225cb854bd9d48c961bce9 Author: Hubert Gruszecki <[email protected]> AuthorDate: Mon Dec 29 14:06:27 2025 +0100 metadata --- Cargo.lock | 8 +- DEPENDENCIES.md | 2 +- core/server/Cargo.toml | 1 + core/server/src/lib.rs | 1 + core/server/src/main.rs | 8 + core/server/src/metadata/consumer_group.rs | 35 ++ core/server/src/metadata/mod.rs | 38 ++ core/server/src/metadata/partition.rs | 32 ++ core/server/src/metadata/shared.rs | 836 +++++++++++++++++++++++++++++ core/server/src/metadata/snapshot.rs | 182 +++++++ core/server/src/metadata/stream.rs | 80 +++ core/server/src/metadata/topic.rs | 113 ++++ core/server/src/metadata/user.rs | 70 +++ core/server/src/shard/builder.rs | 9 + core/server/src/shard/handlers.rs | 19 +- core/server/src/shard/mod.rs | 4 + core/server/src/shard/system/streams.rs | 15 +- core/server/src/shard/system/topics.rs | 28 +- 18 files changed, 1465 insertions(+), 16 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0f671da6d..5df8c8f3c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -426,9 +426,12 @@ dependencies = [ [[package]] name = "arc-swap" -version = "1.7.1" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" +checksum = "51d03449bb8ca2cc2ef70869af31463d1ae5ccc8fa3e334b307203fbf815207e" +dependencies = [ + "rustversion", +] [[package]] name = "arcshift" @@ -8199,6 +8202,7 @@ version = "0.6.1-edge.1" dependencies = [ "ahash 0.8.12", "anyhow", + "arc-swap", "arcshift", "argon2", "async-channel", diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md index 8b06b77ac..72ca495bd 100644 --- a/DEPENDENCIES.md +++ b/DEPENDENCIES.md @@ -32,7 +32,7 @@ anstyle-wincon: 3.0.11, "Apache-2.0 OR MIT", anyhow: 1.0.100, "Apache-2.0 OR MIT", apache-avro: 0.17.0, "Apache-2.0", arbitrary: 1.4.2, "Apache-2.0 OR MIT", -arc-swap: 1.7.1, "Apache-2.0 OR MIT", +arc-swap: 1.8.0, "Apache-2.0 OR MIT", arcshift: 0.4.2, "Apache-2.0 OR MIT", argon2: 0.5.3, "Apache-2.0 OR MIT", array-init: 2.1.0, "Apache-2.0 OR MIT", diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml index bbf15794a..6e0d6d6d0 100644 --- a/core/server/Cargo.toml +++ b/core/server/Cargo.toml @@ -40,6 +40,7 @@ iggy-web = ["dep:rust-embed", "dep:mime_guess"] [dependencies] ahash = { workspace = true } anyhow = { workspace = true } +arc-swap = "1.8.0" arcshift = "0.4.2" argon2 = { workspace = true } async-channel = { workspace = true } diff --git a/core/server/src/lib.rs b/core/server/src/lib.rs index 2919214b6..41669486f 100644 --- a/core/server/src/lib.rs +++ b/core/server/src/lib.rs @@ -37,6 +37,7 @@ pub mod diagnostics; pub mod http; pub mod io; pub mod log; +pub mod metadata; pub mod quic; pub mod server_error; pub mod shard; diff --git a/core/server/src/main.rs b/core/server/src/main.rs index 58c50e33c..51b7f0566 100644 --- a/core/server/src/main.rs +++ b/core/server/src/main.rs @@ -34,6 +34,7 @@ use server::configs::sharding::ShardAllocator; use server::diagnostics::{print_io_uring_permission_info, print_locked_memory_limit_info}; use server::io::fs_utils; use server::log::logger::Logging; +use server::metadata::SharedMetadata; use server::server_error::ServerError; use server::shard::namespace::IggyNamespace; use server::shard::system::info::SystemInfo; @@ -321,6 +322,11 @@ fn main() -> Result<(), ServerError> { let client_manager: EternalPtr<DashMap<u32, Client>> = client_manager.into(); let client_manager = ClientManager::new(client_manager); + // Create shared metadata for all shards (ArcSwap-based lock-free reads) + let shared_metadata = Box::new(SharedMetadata::new()); + let shared_metadata = Box::leak(shared_metadata); + let shared_metadata: EternalPtr<SharedMetadata> = shared_metadata.into(); + streams.with_components(|components| { let (root, ..) = components.into_components(); for (_, stream) in root.iter() { @@ -353,6 +359,7 @@ fn main() -> Result<(), ServerError> { { let streams = streams.clone(); let shards_table = shards_table.clone(); + let shared_metadata = shared_metadata.clone(); let users = users.clone(); let connections = connections.clone(); let config = config.clone(); @@ -417,6 +424,7 @@ fn main() -> Result<(), ServerError> { .state(state) .users(users) .shards_table(shards_table) + .shared_metadata(shared_metadata) .connections(connections) .clients_manager(client_manager) .config(config) diff --git a/core/server/src/metadata/consumer_group.rs b/core/server/src/metadata/consumer_group.rs new file mode 100644 index 000000000..34c1d20b5 --- /dev/null +++ b/core/server/src/metadata/consumer_group.rs @@ -0,0 +1,35 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +/// Consumer group metadata stored in the shared snapshot. +/// Member management is dynamic and handled separately. +#[derive(Debug, Clone)] +pub struct ConsumerGroupMeta { + pub id: usize, + pub name: String, + pub partitions: Vec<usize>, +} + +impl ConsumerGroupMeta { + pub fn new(id: usize, name: String, partitions: Vec<usize>) -> Self { + Self { + id, + name, + partitions, + } + } +} diff --git a/core/server/src/metadata/mod.rs b/core/server/src/metadata/mod.rs new file mode 100644 index 000000000..e964dccd9 --- /dev/null +++ b/core/server/src/metadata/mod.rs @@ -0,0 +1,38 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Shared metadata module providing a single source of truth for all shards. +//! +//! This module replaces the broadcast-based metadata synchronization with an +//! `ArcSwap`-based approach where all shards read from a shared snapshot, +//! and only shard 0 can write (swap in new snapshots). + +mod consumer_group; +mod partition; +mod shared; +mod snapshot; +mod stream; +mod topic; +mod user; + +pub use consumer_group::ConsumerGroupMeta; +pub use partition::PartitionMeta; +pub use shared::SharedMetadata; +pub use snapshot::MetadataSnapshot; +pub use stream::StreamMeta; +pub use topic::TopicMeta; +pub use user::{PersonalAccessTokenMeta, UserMeta}; diff --git a/core/server/src/metadata/partition.rs b/core/server/src/metadata/partition.rs new file mode 100644 index 000000000..57e50dfa9 --- /dev/null +++ b/core/server/src/metadata/partition.rs @@ -0,0 +1,32 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use iggy_common::IggyTimestamp; + +/// Partition metadata stored in the shared snapshot. +/// The actual partition data (segments, journals) is stored per-shard. +#[derive(Debug, Clone)] +pub struct PartitionMeta { + pub id: usize, + pub created_at: IggyTimestamp, +} + +impl PartitionMeta { + pub fn new(id: usize, created_at: IggyTimestamp) -> Self { + Self { id, created_at } + } +} diff --git a/core/server/src/metadata/shared.rs b/core/server/src/metadata/shared.rs new file mode 100644 index 000000000..f03086b65 --- /dev/null +++ b/core/server/src/metadata/shared.rs @@ -0,0 +1,836 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::metadata::{ + ConsumerGroupMeta, MetadataSnapshot, PartitionMeta, StreamMeta, TopicMeta, UserMeta, +}; +use arc_swap::{ArcSwap, Guard}; +use iggy_common::{ + CompressionAlgorithm, Identifier, IggyError, IggyExpiry, IggyTimestamp, MaxTopicSize, + Permissions, UserStatus, +}; +use std::net::SocketAddr; +use std::sync::Arc; +use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering}; + +/// Shared metadata accessible by all shards. +/// +/// Uses ArcSwap for lock-free reads and atomic updates. +/// Only shard 0 should call write methods (create/update/delete). +/// All shards can call read methods (get/exists/load). +pub struct SharedMetadata { + inner: ArcSwap<MetadataSnapshot>, + + // Atomic ID generators (only shard 0 increments these) + next_stream_id: AtomicUsize, + next_user_id: AtomicU32, +} + +impl Default for SharedMetadata { + fn default() -> Self { + Self::new() + } +} + +impl SharedMetadata { + pub fn new() -> Self { + Self { + inner: ArcSwap::from_pointee(MetadataSnapshot::new()), + next_stream_id: AtomicUsize::new(0), + next_user_id: AtomicU32::new(0), + } + } + + /// Initialize with existing state (used during server startup). + pub fn init_from_snapshot( + &self, + snapshot: MetadataSnapshot, + next_stream_id: usize, + next_user_id: u32, + ) { + self.inner.store(Arc::new(snapshot)); + self.next_stream_id.store(next_stream_id, Ordering::SeqCst); + self.next_user_id.store(next_user_id, Ordering::SeqCst); + } + + // ==================== Read Operations (any shard) ==================== + + /// Load the current snapshot. Lock-free read. + pub fn load(&self) -> Guard<Arc<MetadataSnapshot>> { + self.inner.load() + } + + /// Get a clone of the current snapshot. + pub fn snapshot(&self) -> Arc<MetadataSnapshot> { + Arc::clone(&self.inner.load()) + } + + // Stream reads + + pub fn stream_exists(&self, id: &Identifier) -> bool { + self.inner.load().stream_exists(id) + } + + pub fn stream_exists_by_name(&self, name: &str) -> bool { + self.inner.load().stream_exists_by_name(name) + } + + pub fn get_stream(&self, id: &Identifier) -> Option<StreamMeta> { + self.inner.load().get_stream(id).cloned() + } + + pub fn get_stream_id(&self, id: &Identifier) -> Option<usize> { + self.inner.load().get_stream_id(id) + } + + pub fn get_streams(&self) -> Vec<StreamMeta> { + self.inner.load().streams.values().cloned().collect() + } + + // User reads + + pub fn user_exists(&self, id: &Identifier) -> bool { + self.inner.load().user_exists(id) + } + + pub fn user_exists_by_name(&self, username: &str) -> bool { + self.inner.load().user_exists_by_name(username) + } + + pub fn get_user(&self, id: &Identifier) -> Option<UserMeta> { + self.inner.load().get_user(id).cloned() + } + + pub fn get_user_id(&self, id: &Identifier) -> Option<u32> { + self.inner.load().get_user_id(id) + } + + pub fn get_users(&self) -> Vec<UserMeta> { + self.inner.load().users.values().cloned().collect() + } + + pub fn get_user_by_username(&self, username: &str) -> Option<UserMeta> { + self.inner.load().get_user_by_username(username).cloned() + } + + // ==================== Write Operations (shard 0 only) ==================== + + // Stream operations + + /// Create a new stream. Returns the created stream metadata. + pub fn create_stream(&self, name: String) -> Result<StreamMeta, IggyError> { + let current = self.inner.load(); + + if current.stream_exists_by_name(&name) { + return Err(IggyError::StreamNameAlreadyExists(name)); + } + + let id = self.next_stream_id.fetch_add(1, Ordering::SeqCst); + let stream = StreamMeta::new(id, name.clone(), IggyTimestamp::now()); + + let mut new_snapshot = (**current).clone(); + new_snapshot.stream_index.insert(name, id); + new_snapshot.streams.insert(id, stream.clone()); + + self.inner.store(Arc::new(new_snapshot)); + + Ok(stream) + } + + /// Delete a stream. Returns the deleted stream metadata. + pub fn delete_stream(&self, id: &Identifier) -> Result<StreamMeta, IggyError> { + let current = self.inner.load(); + + let stream_id = current + .get_stream_id(id) + .ok_or_else(|| IggyError::StreamIdNotFound(id.clone()))?; + + let mut new_snapshot = (**current).clone(); + let stream = new_snapshot + .streams + .remove(&stream_id) + .ok_or_else(|| IggyError::StreamIdNotFound(id.clone()))?; + new_snapshot.stream_index.remove(&stream.name); + + self.inner.store(Arc::new(new_snapshot)); + + Ok(stream) + } + + /// Update a stream's name. + pub fn update_stream(&self, id: &Identifier, new_name: String) -> Result<(), IggyError> { + let current = self.inner.load(); + + let stream_id = current + .get_stream_id(id) + .ok_or_else(|| IggyError::StreamIdNotFound(id.clone()))?; + + let stream = current + .streams + .get(&stream_id) + .ok_or_else(|| IggyError::StreamIdNotFound(id.clone()))?; + + if stream.name == new_name { + return Ok(()); + } + + if current.stream_exists_by_name(&new_name) { + return Err(IggyError::StreamNameAlreadyExists(new_name)); + } + + let mut new_snapshot = (**current).clone(); + let old_name = new_snapshot.streams.get(&stream_id).unwrap().name.clone(); + new_snapshot.stream_index.remove(&old_name); + new_snapshot + .stream_index + .insert(new_name.clone(), stream_id); + new_snapshot.streams.get_mut(&stream_id).unwrap().name = new_name; + + self.inner.store(Arc::new(new_snapshot)); + + Ok(()) + } + + // Topic operations + + /// Create a new topic in a stream. + #[allow(clippy::too_many_arguments)] + pub fn create_topic( + &self, + stream_id: &Identifier, + name: String, + replication_factor: u8, + message_expiry: IggyExpiry, + compression_algorithm: CompressionAlgorithm, + max_topic_size: MaxTopicSize, + ) -> Result<TopicMeta, IggyError> { + let current = self.inner.load(); + + let stream_idx = current + .get_stream_id(stream_id) + .ok_or_else(|| IggyError::StreamIdNotFound(stream_id.clone()))?; + + let stream = current.streams.get(&stream_idx).unwrap(); + if stream.topic_exists(&name) { + return Err(IggyError::TopicNameAlreadyExists(name, stream_id.clone())); + } + + // Topic IDs are sequential within a stream + let topic_id = stream.topics.len(); + let topic = TopicMeta::new( + topic_id, + name.clone(), + IggyTimestamp::now(), + replication_factor, + message_expiry, + compression_algorithm, + max_topic_size, + ); + + let mut new_snapshot = (**current).clone(); + new_snapshot + .streams + .get_mut(&stream_idx) + .unwrap() + .add_topic(topic.clone()); + + self.inner.store(Arc::new(new_snapshot)); + + Ok(topic) + } + + /// Delete a topic from a stream. + pub fn delete_topic( + &self, + stream_id: &Identifier, + topic_id: &Identifier, + ) -> Result<TopicMeta, IggyError> { + let current = self.inner.load(); + + let stream_idx = current + .get_stream_id(stream_id) + .ok_or_else(|| IggyError::StreamIdNotFound(stream_id.clone()))?; + + let stream = current.streams.get(&stream_idx).unwrap(); + let topic_idx = match topic_id.kind { + iggy_common::IdKind::Numeric => topic_id.get_u32_value().unwrap() as usize, + iggy_common::IdKind::String => { + let name = topic_id.get_string_value().unwrap(); + stream.get_topic_id_by_name(&name).ok_or_else(|| { + IggyError::TopicNameNotFound(name.to_string(), stream.name.clone()) + })? + } + }; + + let mut new_snapshot = (**current).clone(); + let topic = new_snapshot + .streams + .get_mut(&stream_idx) + .unwrap() + .remove_topic(topic_idx) + .ok_or_else(|| IggyError::TopicIdNotFound(topic_id.clone(), stream_id.clone()))?; + + self.inner.store(Arc::new(new_snapshot)); + + Ok(topic) + } + + /// Update topic configuration. + #[allow(clippy::too_many_arguments)] + pub fn update_topic( + &self, + stream_id: &Identifier, + topic_id: &Identifier, + name: Option<String>, + message_expiry: Option<IggyExpiry>, + compression_algorithm: Option<CompressionAlgorithm>, + max_topic_size: Option<MaxTopicSize>, + replication_factor: Option<u8>, + ) -> Result<(), IggyError> { + let current = self.inner.load(); + + let stream_idx = current + .get_stream_id(stream_id) + .ok_or_else(|| IggyError::StreamIdNotFound(stream_id.clone()))?; + + let stream = current.streams.get(&stream_idx).unwrap(); + let stream_name = stream.name.clone(); + let topic_idx = match topic_id.kind { + iggy_common::IdKind::Numeric => topic_id.get_u32_value().unwrap() as usize, + iggy_common::IdKind::String => { + let topic_name = topic_id.get_string_value().unwrap(); + stream.get_topic_id_by_name(&topic_name).ok_or_else(|| { + IggyError::TopicNameNotFound(topic_name.to_string(), stream_name.clone()) + })? + } + }; + + let mut new_snapshot = (**current).clone(); + let stream_mut = new_snapshot.streams.get_mut(&stream_idx).unwrap(); + + // Check name availability before getting mutable topic reference + if let Some(ref new_name) = name { + let old_name = stream_mut + .topics + .get(&topic_idx) + .ok_or_else(|| IggyError::TopicIdNotFound(topic_id.clone(), stream_id.clone()))? + .name + .clone(); + if *new_name != old_name && stream_mut.topic_exists(new_name) { + return Err(IggyError::TopicNameAlreadyExists( + new_name.clone(), + stream_id.clone(), + )); + } + } + + let topic = stream_mut + .topics + .get_mut(&topic_idx) + .ok_or_else(|| IggyError::TopicIdNotFound(topic_id.clone(), stream_id.clone()))?; + + if let Some(new_name) = name + && topic.name != new_name + { + stream_mut.topic_index.remove(&topic.name); + stream_mut.topic_index.insert(new_name.clone(), topic_idx); + topic.name = new_name; + } + if let Some(expiry) = message_expiry { + topic.message_expiry = expiry; + } + if let Some(compression) = compression_algorithm { + topic.compression_algorithm = compression; + } + if let Some(size) = max_topic_size { + topic.max_topic_size = size; + } + if let Some(factor) = replication_factor { + topic.replication_factor = factor; + } + + self.inner.store(Arc::new(new_snapshot)); + + Ok(()) + } + + // Partition operations + + /// Add partitions to a topic. + pub fn create_partitions( + &self, + stream_id: &Identifier, + topic_id: &Identifier, + count: u32, + ) -> Result<Vec<PartitionMeta>, IggyError> { + let current = self.inner.load(); + + let stream_idx = current + .get_stream_id(stream_id) + .ok_or_else(|| IggyError::StreamIdNotFound(stream_id.clone()))?; + + let stream = current.streams.get(&stream_idx).unwrap(); + let topic_idx = match topic_id.kind { + iggy_common::IdKind::Numeric => topic_id.get_u32_value().unwrap() as usize, + iggy_common::IdKind::String => { + let name = topic_id.get_string_value().unwrap(); + stream.get_topic_id_by_name(&name).ok_or_else(|| { + IggyError::TopicNameNotFound(name.to_string(), stream.name.clone()) + })? + } + }; + + let topic = stream.topics.get(&topic_idx).unwrap(); + let start_id = topic.partitions.len(); + + let mut new_partitions = Vec::with_capacity(count as usize); + for i in 0..count { + let partition_id = start_id + i as usize; + new_partitions.push(PartitionMeta::new(partition_id, IggyTimestamp::now())); + } + + let mut new_snapshot = (**current).clone(); + let topic_mut = new_snapshot + .streams + .get_mut(&stream_idx) + .unwrap() + .topics + .get_mut(&topic_idx) + .unwrap(); + + for partition in &new_partitions { + topic_mut.add_partition(partition.clone()); + } + + self.inner.store(Arc::new(new_snapshot)); + + Ok(new_partitions) + } + + /// Delete partitions from a topic. + pub fn delete_partitions( + &self, + stream_id: &Identifier, + topic_id: &Identifier, + partition_ids: &[usize], + ) -> Result<Vec<PartitionMeta>, IggyError> { + let current = self.inner.load(); + + let stream_idx = current + .get_stream_id(stream_id) + .ok_or_else(|| IggyError::StreamIdNotFound(stream_id.clone()))?; + + let stream = current.streams.get(&stream_idx).unwrap(); + let topic_idx = match topic_id.kind { + iggy_common::IdKind::Numeric => topic_id.get_u32_value().unwrap() as usize, + iggy_common::IdKind::String => { + let name = topic_id.get_string_value().unwrap(); + stream.get_topic_id_by_name(&name).ok_or_else(|| { + IggyError::TopicNameNotFound(name.to_string(), stream.name.clone()) + })? + } + }; + + let mut new_snapshot = (**current).clone(); + let topic = new_snapshot + .streams + .get_mut(&stream_idx) + .unwrap() + .topics + .get_mut(&topic_idx) + .unwrap(); + + let mut deleted = Vec::with_capacity(partition_ids.len()); + for &id in partition_ids { + if let Some(partition) = topic.remove_partition(id) { + deleted.push(partition); + } + } + + self.inner.store(Arc::new(new_snapshot)); + + Ok(deleted) + } + + // User operations + + /// Create a new user. + pub fn create_user( + &self, + username: String, + password_hash: String, + status: UserStatus, + permissions: Option<Permissions>, + ) -> Result<UserMeta, IggyError> { + let current = self.inner.load(); + + if current.user_exists_by_name(&username) { + return Err(IggyError::UserAlreadyExists); + } + + let id = self.next_user_id.fetch_add(1, Ordering::SeqCst); + let user = UserMeta::new( + id, + username.clone(), + password_hash, + IggyTimestamp::now(), + status, + permissions, + ); + + let mut new_snapshot = (**current).clone(); + new_snapshot.user_index.insert(username, id); + new_snapshot.users.insert(id, user.clone()); + + self.inner.store(Arc::new(new_snapshot)); + + Ok(user) + } + + /// Delete a user. + pub fn delete_user(&self, id: &Identifier) -> Result<UserMeta, IggyError> { + let current = self.inner.load(); + + let user_id = current + .get_user_id(id) + .ok_or_else(|| IggyError::ResourceNotFound(format!("user with ID {id}")))?; + + let mut new_snapshot = (**current).clone(); + let user = new_snapshot + .users + .remove(&user_id) + .ok_or_else(|| IggyError::ResourceNotFound(format!("user with ID {user_id}")))?; + new_snapshot.user_index.remove(&user.username); + + self.inner.store(Arc::new(new_snapshot)); + + Ok(user) + } + + /// Update user metadata. + pub fn update_user( + &self, + id: &Identifier, + username: Option<String>, + status: Option<UserStatus>, + ) -> Result<(), IggyError> { + let current = self.inner.load(); + + let user_id = current + .get_user_id(id) + .ok_or_else(|| IggyError::ResourceNotFound(format!("user with ID {id}")))?; + + let mut new_snapshot = (**current).clone(); + let user = new_snapshot + .users + .get_mut(&user_id) + .ok_or_else(|| IggyError::ResourceNotFound(format!("user with ID {user_id}")))?; + + if let Some(new_username) = username + && user.username != new_username + { + if new_snapshot.user_index.contains_key(&new_username) { + return Err(IggyError::UserAlreadyExists); + } + new_snapshot.user_index.remove(&user.username); + new_snapshot + .user_index + .insert(new_username.clone(), user_id); + user.username = new_username; + } + if let Some(new_status) = status { + user.status = new_status; + } + + self.inner.store(Arc::new(new_snapshot)); + + Ok(()) + } + + /// Update user permissions. + pub fn update_permissions( + &self, + id: &Identifier, + permissions: Option<Permissions>, + ) -> Result<(), IggyError> { + let current = self.inner.load(); + + let user_id = current + .get_user_id(id) + .ok_or_else(|| IggyError::ResourceNotFound(format!("user with ID {id}")))?; + + let mut new_snapshot = (**current).clone(); + let user = new_snapshot + .users + .get_mut(&user_id) + .ok_or_else(|| IggyError::ResourceNotFound(format!("user with ID {user_id}")))?; + user.permissions = permissions; + + self.inner.store(Arc::new(new_snapshot)); + + Ok(()) + } + + /// Change user password. + pub fn change_password( + &self, + id: &Identifier, + new_password_hash: String, + ) -> Result<(), IggyError> { + let current = self.inner.load(); + + let user_id = current + .get_user_id(id) + .ok_or_else(|| IggyError::ResourceNotFound(format!("user with ID {id}")))?; + + let mut new_snapshot = (**current).clone(); + let user = new_snapshot + .users + .get_mut(&user_id) + .ok_or_else(|| IggyError::ResourceNotFound(format!("user with ID {user_id}")))?; + user.password_hash = new_password_hash; + + self.inner.store(Arc::new(new_snapshot)); + + Ok(()) + } + + // Consumer group operations + + /// Create a consumer group. + pub fn create_consumer_group( + &self, + stream_id: &Identifier, + topic_id: &Identifier, + name: String, + partition_ids: Vec<usize>, + ) -> Result<ConsumerGroupMeta, IggyError> { + let current = self.inner.load(); + + let stream_idx = current + .get_stream_id(stream_id) + .ok_or_else(|| IggyError::StreamIdNotFound(stream_id.clone()))?; + + let stream = current.streams.get(&stream_idx).unwrap(); + let topic_idx = match topic_id.kind { + iggy_common::IdKind::Numeric => topic_id.get_u32_value().unwrap() as usize, + iggy_common::IdKind::String => { + let topic_name = topic_id.get_string_value().unwrap(); + stream.get_topic_id_by_name(&topic_name).ok_or_else(|| { + IggyError::TopicNameNotFound(topic_name.to_string(), stream.name.clone()) + })? + } + }; + + let topic = stream.topics.get(&topic_idx).unwrap(); + if topic.get_consumer_group_id_by_name(&name).is_some() { + return Err(IggyError::ConsumerGroupNameAlreadyExists( + name, + topic_id.clone(), + )); + } + + let group_id = topic.consumer_groups.len(); + let group = ConsumerGroupMeta::new(group_id, name, partition_ids); + + let mut new_snapshot = (**current).clone(); + new_snapshot + .streams + .get_mut(&stream_idx) + .unwrap() + .topics + .get_mut(&topic_idx) + .unwrap() + .add_consumer_group(group.clone()); + + self.inner.store(Arc::new(new_snapshot)); + + Ok(group) + } + + /// Delete a consumer group. + pub fn delete_consumer_group( + &self, + stream_id: &Identifier, + topic_id: &Identifier, + group_id: &Identifier, + ) -> Result<ConsumerGroupMeta, IggyError> { + let current = self.inner.load(); + + let stream_idx = current + .get_stream_id(stream_id) + .ok_or_else(|| IggyError::StreamIdNotFound(stream_id.clone()))?; + + let stream = current.streams.get(&stream_idx).unwrap(); + let topic_idx = match topic_id.kind { + iggy_common::IdKind::Numeric => topic_id.get_u32_value().unwrap() as usize, + iggy_common::IdKind::String => { + let topic_name = topic_id.get_string_value().unwrap(); + stream.get_topic_id_by_name(&topic_name).ok_or_else(|| { + IggyError::TopicNameNotFound(topic_name.to_string(), stream.name.clone()) + })? + } + }; + + let topic = stream.topics.get(&topic_idx).unwrap(); + let group_idx = match group_id.kind { + iggy_common::IdKind::Numeric => group_id.get_u32_value().unwrap() as usize, + iggy_common::IdKind::String => { + let group_name = group_id.get_string_value().unwrap(); + topic + .get_consumer_group_id_by_name(&group_name) + .ok_or_else(|| { + IggyError::ConsumerGroupNameNotFound( + group_name.to_string(), + topic_id.clone(), + ) + })? + } + }; + + let mut new_snapshot = (**current).clone(); + let group = new_snapshot + .streams + .get_mut(&stream_idx) + .unwrap() + .topics + .get_mut(&topic_idx) + .unwrap() + .remove_consumer_group(group_idx) + .ok_or_else(|| { + IggyError::ConsumerGroupIdNotFound(group_id.clone(), topic_id.clone()) + })?; + + self.inner.store(Arc::new(new_snapshot)); + + Ok(group) + } + + // Bound address operations + + /// Set the bound TCP address. + pub fn set_tcp_address(&self, address: SocketAddr) { + let current = self.inner.load(); + let mut new_snapshot = (**current).clone(); + new_snapshot.bound_addresses.tcp = Some(address); + self.inner.store(Arc::new(new_snapshot)); + } + + /// Set the bound HTTP address. + pub fn set_http_address(&self, address: SocketAddr) { + let current = self.inner.load(); + let mut new_snapshot = (**current).clone(); + new_snapshot.bound_addresses.http = Some(address); + self.inner.store(Arc::new(new_snapshot)); + } + + /// Set the bound QUIC address. + pub fn set_quic_address(&self, address: SocketAddr) { + let current = self.inner.load(); + let mut new_snapshot = (**current).clone(); + new_snapshot.bound_addresses.quic = Some(address); + self.inner.store(Arc::new(new_snapshot)); + } + + /// Set the bound WebSocket address. + pub fn set_websocket_address(&self, address: SocketAddr) { + let current = self.inner.load(); + let mut new_snapshot = (**current).clone(); + new_snapshot.bound_addresses.websocket = Some(address); + self.inner.store(Arc::new(new_snapshot)); + } + + /// Get the current next stream ID (for testing/debugging). + pub fn next_stream_id(&self) -> usize { + self.next_stream_id.load(Ordering::SeqCst) + } + + /// Get the current next user ID (for testing/debugging). + pub fn next_user_id(&self) -> u32 { + self.next_user_id.load(Ordering::SeqCst) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_create_stream() { + let metadata = SharedMetadata::new(); + + let stream = metadata.create_stream("test-stream".to_string()).unwrap(); + assert_eq!(stream.id, 0); + assert_eq!(stream.name, "test-stream"); + + assert!(metadata.stream_exists_by_name("test-stream")); + assert!(!metadata.stream_exists_by_name("nonexistent")); + } + + #[test] + fn test_create_duplicate_stream_fails() { + let metadata = SharedMetadata::new(); + + metadata.create_stream("test-stream".to_string()).unwrap(); + let result = metadata.create_stream("test-stream".to_string()); + + assert!(matches!(result, Err(IggyError::StreamNameAlreadyExists(_)))); + } + + #[test] + fn test_delete_stream() { + let metadata = SharedMetadata::new(); + + let stream = metadata.create_stream("test-stream".to_string()).unwrap(); + let stream_id = Identifier::numeric(stream.id as u32).unwrap(); + + let deleted = metadata.delete_stream(&stream_id).unwrap(); + assert_eq!(deleted.id, stream.id); + assert!(!metadata.stream_exists_by_name("test-stream")); + } + + #[test] + fn test_create_user() { + let metadata = SharedMetadata::new(); + + let user = metadata + .create_user( + "testuser".to_string(), + "hashed_password".to_string(), + UserStatus::Active, + None, + ) + .unwrap(); + + assert_eq!(user.id, 0); + assert_eq!(user.username, "testuser"); + assert!(metadata.user_exists_by_name("testuser")); + } + + #[test] + fn test_concurrent_stream_ids() { + let metadata = SharedMetadata::new(); + + let stream1 = metadata.create_stream("stream-1".to_string()).unwrap(); + let stream2 = metadata.create_stream("stream-2".to_string()).unwrap(); + let stream3 = metadata.create_stream("stream-3".to_string()).unwrap(); + + assert_eq!(stream1.id, 0); + assert_eq!(stream2.id, 1); + assert_eq!(stream3.id, 2); + } +} diff --git a/core/server/src/metadata/snapshot.rs b/core/server/src/metadata/snapshot.rs new file mode 100644 index 000000000..8283740a2 --- /dev/null +++ b/core/server/src/metadata/snapshot.rs @@ -0,0 +1,182 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::metadata::{StreamMeta, UserMeta}; +use ahash::AHashMap; +use iggy_common::Identifier; +use std::net::SocketAddr; + +/// Bound addresses for transport protocols. +#[derive(Debug, Clone, Default)] +pub struct BoundAddresses { + pub tcp: Option<SocketAddr>, + pub http: Option<SocketAddr>, + pub quic: Option<SocketAddr>, + pub websocket: Option<SocketAddr>, +} + +/// Immutable metadata snapshot that is atomically swapped. +/// All reads see a consistent view of the entire metadata. +#[derive(Debug, Clone)] +pub struct MetadataSnapshot { + /// Stream metadata indexed by stream ID + pub streams: AHashMap<usize, StreamMeta>, + + /// Stream name to ID index + pub stream_index: AHashMap<String, usize>, + + /// User metadata indexed by user ID + pub users: AHashMap<u32, UserMeta>, + + /// Username to user ID index + pub user_index: AHashMap<String, u32>, + + /// Bound addresses for transport protocols + pub bound_addresses: BoundAddresses, +} + +impl Default for MetadataSnapshot { + fn default() -> Self { + Self::new() + } +} + +impl MetadataSnapshot { + pub fn new() -> Self { + Self { + streams: AHashMap::new(), + stream_index: AHashMap::new(), + users: AHashMap::new(), + user_index: AHashMap::new(), + bound_addresses: BoundAddresses::default(), + } + } + + // Stream operations + + pub fn stream_exists_by_name(&self, name: &str) -> bool { + self.stream_index.contains_key(name) + } + + pub fn stream_exists(&self, id: &Identifier) -> bool { + match id.kind { + iggy_common::IdKind::Numeric => { + let id = id.get_u32_value().unwrap() as usize; + self.streams.contains_key(&id) + } + iggy_common::IdKind::String => { + let name = id.get_string_value().unwrap(); + self.stream_index.contains_key(&name) + } + } + } + + pub fn get_stream(&self, id: &Identifier) -> Option<&StreamMeta> { + match id.kind { + iggy_common::IdKind::Numeric => { + let id = id.get_u32_value().unwrap() as usize; + self.streams.get(&id) + } + iggy_common::IdKind::String => { + let name = id.get_string_value().unwrap(); + self.stream_index + .get(&name) + .and_then(|id| self.streams.get(id)) + } + } + } + + pub fn get_stream_id(&self, id: &Identifier) -> Option<usize> { + match id.kind { + iggy_common::IdKind::Numeric => { + let stream_id = id.get_u32_value().unwrap() as usize; + if self.streams.contains_key(&stream_id) { + Some(stream_id) + } else { + None + } + } + iggy_common::IdKind::String => { + let name = id.get_string_value().unwrap(); + self.stream_index.get(&name).copied() + } + } + } + + pub fn get_streams(&self) -> Vec<&StreamMeta> { + self.streams.values().collect() + } + + // User operations + + pub fn user_exists_by_name(&self, username: &str) -> bool { + self.user_index.contains_key(username) + } + + pub fn user_exists(&self, id: &Identifier) -> bool { + match id.kind { + iggy_common::IdKind::Numeric => { + let id = id.get_u32_value().unwrap(); + self.users.contains_key(&id) + } + iggy_common::IdKind::String => { + let name = id.get_string_value().unwrap(); + self.user_index.contains_key(&name) + } + } + } + + pub fn get_user(&self, id: &Identifier) -> Option<&UserMeta> { + match id.kind { + iggy_common::IdKind::Numeric => { + let id = id.get_u32_value().unwrap(); + self.users.get(&id) + } + iggy_common::IdKind::String => { + let name = id.get_string_value().unwrap(); + self.user_index.get(&name).and_then(|id| self.users.get(id)) + } + } + } + + pub fn get_user_id(&self, id: &Identifier) -> Option<u32> { + match id.kind { + iggy_common::IdKind::Numeric => { + let user_id = id.get_u32_value().unwrap(); + if self.users.contains_key(&user_id) { + Some(user_id) + } else { + None + } + } + iggy_common::IdKind::String => { + let name = id.get_string_value().unwrap(); + self.user_index.get(&name).copied() + } + } + } + + pub fn get_users(&self) -> Vec<&UserMeta> { + self.users.values().collect() + } + + pub fn get_user_by_username(&self, username: &str) -> Option<&UserMeta> { + self.user_index + .get(username) + .and_then(|id| self.users.get(id)) + } +} diff --git a/core/server/src/metadata/stream.rs b/core/server/src/metadata/stream.rs new file mode 100644 index 000000000..9b3495630 --- /dev/null +++ b/core/server/src/metadata/stream.rs @@ -0,0 +1,80 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::metadata::TopicMeta; +use ahash::AHashMap; +use iggy_common::IggyTimestamp; + +/// Stream metadata stored in the shared snapshot. +#[derive(Debug, Clone)] +pub struct StreamMeta { + pub id: usize, + pub name: String, + pub created_at: IggyTimestamp, + + /// Topic metadata indexed by topic ID + pub topics: AHashMap<usize, TopicMeta>, + + /// Topic name to ID index + pub topic_index: AHashMap<String, usize>, +} + +impl StreamMeta { + pub fn new(id: usize, name: String, created_at: IggyTimestamp) -> Self { + Self { + id, + name, + created_at, + topics: AHashMap::new(), + topic_index: AHashMap::new(), + } + } + + pub fn topics_count(&self) -> usize { + self.topics.len() + } + + pub fn add_topic(&mut self, topic: TopicMeta) { + self.topic_index.insert(topic.name.clone(), topic.id); + self.topics.insert(topic.id, topic); + } + + pub fn remove_topic(&mut self, topic_id: usize) -> Option<TopicMeta> { + if let Some(topic) = self.topics.remove(&topic_id) { + self.topic_index.remove(&topic.name); + Some(topic) + } else { + None + } + } + + pub fn get_topic(&self, topic_id: usize) -> Option<&TopicMeta> { + self.topics.get(&topic_id) + } + + pub fn get_topic_mut(&mut self, topic_id: usize) -> Option<&mut TopicMeta> { + self.topics.get_mut(&topic_id) + } + + pub fn get_topic_id_by_name(&self, name: &str) -> Option<usize> { + self.topic_index.get(name).copied() + } + + pub fn topic_exists(&self, name: &str) -> bool { + self.topic_index.contains_key(name) + } +} diff --git a/core/server/src/metadata/topic.rs b/core/server/src/metadata/topic.rs new file mode 100644 index 000000000..f69b42e06 --- /dev/null +++ b/core/server/src/metadata/topic.rs @@ -0,0 +1,113 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::metadata::{ConsumerGroupMeta, PartitionMeta}; +use ahash::AHashMap; +use iggy_common::{CompressionAlgorithm, IggyExpiry, IggyTimestamp, MaxTopicSize}; + +/// Topic metadata stored in the shared snapshot. +#[derive(Debug, Clone)] +pub struct TopicMeta { + pub id: usize, + pub name: String, + pub created_at: IggyTimestamp, + pub replication_factor: u8, + pub message_expiry: IggyExpiry, + pub compression_algorithm: CompressionAlgorithm, + pub max_topic_size: MaxTopicSize, + + /// Partition metadata indexed by partition ID + pub partitions: AHashMap<usize, PartitionMeta>, + + /// Consumer group metadata indexed by group ID + pub consumer_groups: AHashMap<usize, ConsumerGroupMeta>, + + /// Consumer group name to ID index + pub consumer_group_index: AHashMap<String, usize>, + + /// Next partition ID for round-robin assignment (not cloned, use atomic) + next_partition_id: usize, +} + +impl TopicMeta { + #[allow(clippy::too_many_arguments)] + pub fn new( + id: usize, + name: String, + created_at: IggyTimestamp, + replication_factor: u8, + message_expiry: IggyExpiry, + compression_algorithm: CompressionAlgorithm, + max_topic_size: MaxTopicSize, + ) -> Self { + Self { + id, + name, + created_at, + replication_factor, + message_expiry, + compression_algorithm, + max_topic_size, + partitions: AHashMap::new(), + consumer_groups: AHashMap::new(), + consumer_group_index: AHashMap::new(), + next_partition_id: 0, + } + } + + pub fn partitions_count(&self) -> u32 { + self.partitions.len() as u32 + } + + pub fn add_partition(&mut self, partition: PartitionMeta) { + self.partitions.insert(partition.id, partition); + } + + pub fn remove_partition(&mut self, partition_id: usize) -> Option<PartitionMeta> { + self.partitions.remove(&partition_id) + } + + pub fn add_consumer_group(&mut self, group: ConsumerGroupMeta) { + self.consumer_group_index + .insert(group.name.clone(), group.id); + self.consumer_groups.insert(group.id, group); + } + + pub fn remove_consumer_group(&mut self, group_id: usize) -> Option<ConsumerGroupMeta> { + if let Some(group) = self.consumer_groups.remove(&group_id) { + self.consumer_group_index.remove(&group.name); + Some(group) + } else { + None + } + } + + pub fn get_consumer_group_id_by_name(&self, name: &str) -> Option<usize> { + self.consumer_group_index.get(name).copied() + } + + pub fn get_next_partition_id(&self, upperbound: usize) -> usize { + if upperbound == 0 { + return 0; + } + self.next_partition_id % upperbound + } + + pub fn increment_partition_counter(&mut self) { + self.next_partition_id = self.next_partition_id.wrapping_add(1); + } +} diff --git a/core/server/src/metadata/user.rs b/core/server/src/metadata/user.rs new file mode 100644 index 000000000..d75bd80e2 --- /dev/null +++ b/core/server/src/metadata/user.rs @@ -0,0 +1,70 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use ahash::AHashMap; +use iggy_common::{IggyTimestamp, Permissions, UserStatus}; + +/// Personal access token metadata. +#[derive(Debug, Clone)] +pub struct PersonalAccessTokenMeta { + pub name: String, + pub token_hash: String, + pub expiry_at: Option<IggyTimestamp>, +} + +impl PersonalAccessTokenMeta { + pub fn new(name: String, token_hash: String, expiry_at: Option<IggyTimestamp>) -> Self { + Self { + name, + token_hash, + expiry_at, + } + } +} + +/// User metadata stored in the shared snapshot. +#[derive(Debug, Clone)] +pub struct UserMeta { + pub id: u32, + pub username: String, + pub password_hash: String, + pub created_at: IggyTimestamp, + pub status: UserStatus, + pub permissions: Option<Permissions>, + pub personal_access_tokens: AHashMap<String, PersonalAccessTokenMeta>, +} + +impl UserMeta { + pub fn new( + id: u32, + username: String, + password_hash: String, + created_at: IggyTimestamp, + status: UserStatus, + permissions: Option<Permissions>, + ) -> Self { + Self { + id, + username, + password_hash, + created_at, + status, + permissions, + personal_access_tokens: AHashMap::new(), + } + } +} diff --git a/core/server/src/shard/builder.rs b/core/server/src/shard/builder.rs index 188452fe8..e394b9e0c 100644 --- a/core/server/src/shard/builder.rs +++ b/core/server/src/shard/builder.rs @@ -22,6 +22,7 @@ use super::{ }; use crate::{ configs::server::ServerConfig, + metadata::SharedMetadata, shard::namespace::IggyNamespace, slab::{streams::Streams, users::Users}, state::file::FileState, @@ -40,6 +41,7 @@ pub struct IggyShardBuilder { id: Option<u16>, streams: Option<Streams>, shards_table: Option<EternalPtr<DashMap<IggyNamespace, ShardId>>>, + shared_metadata: Option<EternalPtr<SharedMetadata>>, state: Option<FileState>, users: Option<Users>, client_manager: Option<ClientManager>, @@ -75,6 +77,11 @@ impl IggyShardBuilder { self } + pub fn shared_metadata(mut self, shared_metadata: EternalPtr<SharedMetadata>) -> Self { + self.shared_metadata = Some(shared_metadata); + self + } + pub fn clients_manager(mut self, client_manager: ClientManager) -> Self { self.client_manager = Some(client_manager); self @@ -120,6 +127,7 @@ impl IggyShardBuilder { let id = self.id.unwrap(); let streams = self.streams.unwrap(); let shards_table = self.shards_table.unwrap(); + let shared_metadata = self.shared_metadata.unwrap(); let state = self.state.unwrap(); let users = self.users.unwrap(); let config = self.config.unwrap(); @@ -154,6 +162,7 @@ impl IggyShardBuilder { id, shards, shards_table, + shared_metadata, streams, // TODO: Fixme users, fs_locks: Default::default(), diff --git a/core/server/src/shard/handlers.rs b/core/server/src/shard/handlers.rs index 087b7a54d..81c54a98a 100644 --- a/core/server/src/shard/handlers.rs +++ b/core/server/src/shard/handlers.rs @@ -509,9 +509,10 @@ pub async fn handle_event(shard: &Rc<IggyShard>, event: ShardEvent) -> Result<() } Ok(()) } - ShardEvent::CreatedStream { id, stream } => { - let stream_id = shard.create_stream_bypass_auth(stream); - assert_eq!(stream_id, id); + ShardEvent::CreatedStream { id: _, stream } => { + // Note: Slab::insert() returns the next available slot, not the stream's embedded ID. + // The stream already has the correct ID from shard 0, which is preserved after insert. + shard.create_stream_bypass_auth(stream); Ok(()) } ShardEvent::DeletedStream { id, stream_id } => { @@ -521,9 +522,9 @@ pub async fn handle_event(shard: &Rc<IggyShard>, event: ShardEvent) -> Result<() Ok(()) } ShardEvent::CreatedTopic { stream_id, topic } => { - let topic_id_from_event = topic.id(); - let topic_id = shard.create_topic_bypass_auth(&stream_id, topic.clone()); - assert_eq!(topic_id, topic_id_from_event); + // Note: Slab::insert() returns the next available slot, not the topic's embedded ID. + // The topic already has the correct ID from shard 0, which is preserved after insert. + shard.create_topic_bypass_auth(&stream_id, topic.clone()); Ok(()) } ShardEvent::CreatedPartitions { @@ -570,9 +571,9 @@ pub async fn handle_event(shard: &Rc<IggyShard>, event: ShardEvent) -> Result<() topic_id, cg, } => { - let cg_id = cg.id(); - let id = shard.create_consumer_group_bypass_auth(&stream_id, &topic_id, cg); - assert_eq!(id, cg_id); + // Note: Slab::insert() returns the next available slot, not the consumer group's embedded ID. + // The consumer group already has the correct ID from shard 0. + shard.create_consumer_group_bypass_auth(&stream_id, &topic_id, cg); Ok(()) } ShardEvent::DeletedConsumerGroup { diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs index f48279362..b5424cd9f 100644 --- a/core/server/src/shard/mod.rs +++ b/core/server/src/shard/mod.rs @@ -33,6 +33,7 @@ use self::tasks::{continuous, periodic}; use crate::{ configs::server::ServerConfig, io::fs_locks::FsLocks, + metadata::SharedMetadata, shard::{ namespace::IggyNamespace, task_registry::TaskRegistry, transmission::frame::ShardFrame, }, @@ -73,6 +74,9 @@ pub struct IggyShard { pub(crate) shards_table: EternalPtr<DashMap<IggyNamespace, ShardId>>, pub(crate) state: FileState, + /// Shared metadata accessible by all shards (ArcSwap-based). + pub(crate) shared_metadata: EternalPtr<SharedMetadata>, + pub(crate) fs_locks: FsLocks, pub(crate) encryptor: Option<EncryptorKind>, pub(crate) config: ServerConfig, diff --git a/core/server/src/shard/system/streams.rs b/core/server/src/shard/system/streams.rs index 84b82e0e4..b12ca2f2f 100644 --- a/core/server/src/shard/system/streams.rs +++ b/core/server/src/shard/system/streams.rs @@ -42,8 +42,12 @@ impl IggyShard { if exists { return Err(IggyError::StreamNameAlreadyExists(name)); } - let stream = stream::create_and_insert_stream_mem(&self.streams, name); + let stream = stream::create_and_insert_stream_mem(&self.streams, name.clone()); self.metrics.increment_streams(1); + + // Dual-write: also update SharedMetadata for consistent cross-shard reads + let _ = self.shared_metadata.create_stream(name); + create_stream_file_hierarchy(stream.id(), &self.config.system).await?; Ok(stream) } @@ -100,8 +104,12 @@ impl IggyShard { self.streams.with_index_mut(|index| { // Rename the key inside of hashmap let idx = index.remove(&old_name).expect("Rename key: key not found"); - index.insert(name, idx); + index.insert(name.clone(), idx); }); + + // Dual-write: also update SharedMetadata + let _ = self.shared_metadata.update_stream(id, name); + Ok(()) } @@ -147,6 +155,9 @@ impl IggyShard { let mut stream = self.delete_stream_base(id); let stream_id_usize = stream.id(); + // Dual-write: also delete from SharedMetadata + let _ = self.shared_metadata.delete_stream(id); + // Clean up consumer groups from ClientManager for this stream self.client_manager .delete_consumer_groups_for_stream(stream_id_usize); diff --git a/core/server/src/shard/system/topics.rs b/core/server/src/shard/system/topics.rs index 3b1c86063..a5830c8fd 100644 --- a/core/server/src/shard/system/topics.rs +++ b/core/server/src/shard/system/topics.rs @@ -72,7 +72,7 @@ impl IggyShard { let topic = topic::create_and_insert_topics_mem( &self.streams, stream_id, - name, + name.clone(), replication_factor.unwrap_or(1), message_expiry, compression, @@ -81,6 +81,16 @@ impl IggyShard { ); self.metrics.increment_topics(1); + // Dual-write: also update SharedMetadata + let _ = self.shared_metadata.create_topic( + stream_id, + name, + replication_factor.unwrap_or(1), + message_expiry, + compression, + max_topic_size, + ); + // Create file hierarchy for the topic. create_topic_file_hierarchy(numeric_stream_id, topic.id(), &self.config.system).await?; Ok(topic) @@ -191,9 +201,20 @@ impl IggyShard { self.streams .with_topic_by_id_mut(stream_id, topic_id, update_topic_closure); if old_name != new_name { - let rename_closure = topics::helpers::rename_index(&old_name, new_name); + let rename_closure = topics::helpers::rename_index(&old_name, new_name.clone()); self.streams.with_topics(stream_id, rename_closure); } + + // Dual-write: also update SharedMetadata + let _ = self.shared_metadata.update_topic( + stream_id, + topic_id, + Some(name), + Some(message_expiry), + Some(compression_algorithm), + Some(max_topic_size), + Some(replication_factor), + ); } pub async fn delete_topic( @@ -222,6 +243,9 @@ impl IggyShard { let mut topic = self.delete_topic_base(stream_id, topic_id); let topic_id_numeric = topic.id(); + // Dual-write: also delete from SharedMetadata + let _ = self.shared_metadata.delete_topic(stream_id, topic_id); + // Clean up consumer groups from ClientManager for this topic self.client_manager .delete_consumer_groups_for_topic(numeric_stream_id, topic_id_numeric);
