This is an automated email from the ASF dual-hosted git repository. hgruszecki pushed a commit to branch arc-swap in repository https://gitbox.apache.org/repos/asf/iggy.git
commit c7fe0e2c87cf20bbcdff09599f4da202872f0f0f Author: Hubert Gruszecki <[email protected]> AuthorDate: Mon Dec 29 16:36:01 2025 +0100 XD --- core/configs/server.toml | 2 +- core/server/src/http/http_server.rs | 34 +-- core/server/src/main.rs | 136 ++++++++- core/server/src/metadata/mod.rs | 2 + core/server/src/metadata/shared.rs | 62 +++- core/server/src/metadata/stats_store.rs | 108 +++++++ core/server/src/quic/quic_server.rs | 14 +- core/server/src/shard/builder.rs | 10 +- core/server/src/shard/handlers.rs | 50 ++-- core/server/src/shard/lazy_init.rs | 317 +++++++++++++++++++++ core/server/src/shard/mod.rs | 6 +- core/server/src/shard/system/partitions.rs | 9 + core/server/src/shard/system/streams.rs | 8 +- core/server/src/shard/system/topics.rs | 14 +- .../src/shard/tasks/periodic/message_saver.rs | 17 ++ core/server/src/shard/transmission/event.rs | 7 +- core/server/src/slab/partitions.rs | 104 ++++--- core/server/src/slab/streams.rs | 99 +++++-- core/server/src/slab/topics.rs | 84 ++++-- core/server/src/streaming/partitions/partition.rs | 2 +- core/server/src/streaming/topics/topic.rs | 2 +- core/server/src/tcp/tcp_listener.rs | 14 +- core/server/src/tcp/tcp_tls_listener.rs | 13 +- core/server/src/websocket/websocket_listener.rs | 26 +- .../server/src/websocket/websocket_tls_listener.rs | 26 +- 25 files changed, 952 insertions(+), 214 deletions(-) diff --git a/core/configs/server.toml b/core/configs/server.toml index cec71ddc7..c2a521b87 100644 --- a/core/configs/server.toml +++ b/core/configs/server.toml @@ -152,7 +152,7 @@ enabled = true address = "127.0.0.1:8090" # Enable TCP socket migration across shards. 
-socket_migration = true +socket_migration = false # Whether to use ipv4 or ipv6 ipv6 = false diff --git a/core/server/src/http/http_server.rs b/core/server/src/http/http_server.rs index 23eefbdab..6abc5fbc7 100644 --- a/core/server/src/http/http_server.rs +++ b/core/server/src/http/http_server.rs @@ -28,7 +28,6 @@ use crate::http::*; use crate::shard::IggyShard; use crate::shard::task_registry::ShutdownToken; use crate::shard::tasks::periodic::spawn_jwt_token_cleaner; -use crate::shard::transmission::event::ShardEvent; use crate::streaming::persistence::persister::PersisterKind; use axum::extract::DefaultBodyLimit; use axum::extract::connect_info::Connected; @@ -38,7 +37,6 @@ use axum_server::tls_rustls::RustlsConfig; use compio_net::TcpListener; use err_trail::ErrContext; use iggy_common::IggyError; -use iggy_common::TransportProtocol; use std::net::SocketAddr; use std::path::PathBuf; use std::rc::Rc; @@ -142,15 +140,12 @@ pub async fn start_http_server( .expect("Failed to get local address for HTTP server"); info!("Started {api_name} on: {address}"); - // Notify shard about the bound address - let event = ShardEvent::AddressBound { - protocol: TransportProtocol::Http, - address, - }; - - crate::shard::handlers::handle_event(&shard, event) - .await - .ok(); + // Store in Cell for config_writer backward compat + shard.http_bound_address.set(Some(address)); + // Store in SharedMetadata (all shards see this immediately via ArcSwap) + shard.shared_metadata.set_http_address(address); + // Notify config_writer that HTTP is bound + let _ = shard.config_writer_notify.try_send(()); let service = app.into_make_service_with_connect_info::<CompioSocketAddr>(); @@ -187,17 +182,12 @@ pub async fn start_http_server( info!("Started {api_name} on: {address}"); - // Notify shard about the bound address - use crate::shard::transmission::event::ShardEvent; - use iggy_common::TransportProtocol; - let event = ShardEvent::AddressBound { - protocol: TransportProtocol::Http, - address, - 
}; - - crate::shard::handlers::handle_event(&shard, event) - .await - .ok(); + // Store in Cell for config_writer backward compat + shard.http_bound_address.set(Some(address)); + // Store in SharedMetadata (all shards see this immediately via ArcSwap) + shard.shared_metadata.set_http_address(address); + // Notify config_writer that HTTP is bound + let _ = shard.config_writer_notify.try_send(()); let service = app.into_make_service_with_connect_info::<SocketAddr>(); let handle = axum_server::Handle::new(); diff --git a/core/server/src/main.rs b/core/server/src/main.rs index 51b7f0566..9e63dbc12 100644 --- a/core/server/src/main.rs +++ b/core/server/src/main.rs @@ -34,15 +34,21 @@ use server::configs::sharding::ShardAllocator; use server::diagnostics::{print_io_uring_permission_info, print_locked_memory_limit_info}; use server::io::fs_utils; use server::log::logger::Logging; -use server::metadata::SharedMetadata; +use server::metadata::{ + ConsumerGroupMeta, MetadataSnapshot, PartitionMeta, SharedMetadata, SharedStatsStore, + StreamMeta, TopicMeta, UserMeta, +}; use server::server_error::ServerError; use server::shard::namespace::IggyNamespace; use server::shard::system::info::SystemInfo; use server::shard::transmission::id::ShardId; use server::shard::{IggyShard, calculate_shard_assignment}; +use server::slab::Keyed; +use server::slab::streams::Streams; use server::slab::traits_ext::{ EntityComponentSystem, EntityComponentSystemMutCell, IntoComponents, }; +use server::slab::users::Users; use server::state::file::FileState; use server::state::system::SystemState; use server::streaming::clients::client_manager::{Client, ClientManager}; @@ -327,6 +333,14 @@ fn main() -> Result<(), ServerError> { let shared_metadata = Box::leak(shared_metadata); let shared_metadata: EternalPtr<SharedMetadata> = shared_metadata.into(); + // Create shared stats store for cross-shard stats visibility + let shared_stats = Box::new(SharedStatsStore::new()); + let shared_stats = 
Box::leak(shared_stats); + let shared_stats: EternalPtr<SharedStatsStore> = shared_stats.into(); + + // Populate SharedMetadata from loaded state (also registers stats in SharedStatsStore) + populate_shared_metadata(&shared_metadata, &shared_stats, &streams, &users); + streams.with_components(|components| { let (root, ..) = components.into_components(); for (_, stream) in root.iter() { @@ -360,6 +374,7 @@ fn main() -> Result<(), ServerError> { let streams = streams.clone(); let shards_table = shards_table.clone(); let shared_metadata = shared_metadata.clone(); + let shared_stats = shared_stats.clone(); let users = users.clone(); let connections = connections.clone(); let config = config.clone(); @@ -425,6 +440,7 @@ fn main() -> Result<(), ServerError> { .users(users) .shards_table(shards_table) .shared_metadata(shared_metadata) + .shared_stats(shared_stats) .connections(connections) .clients_manager(client_manager) .config(config) @@ -552,3 +568,121 @@ fn main() -> Result<(), ServerError> { Ok(()) }) } + +/// Populate SharedMetadata with existing streams and users loaded from state. +/// Also registers stats in SharedStatsStore for cross-shard visibility. 
+fn populate_shared_metadata( + shared_metadata: &SharedMetadata, + shared_stats: &SharedStatsStore, + streams: &Streams, + users: &Users, +) { + let mut snapshot = MetadataSnapshot::new(); + let mut max_stream_id: usize = 0; + let mut max_user_id: u32 = 0; + + // Populate streams, topics, partitions, consumer groups + streams.with_components(|components| { + let (root, stats) = components.into_components(); + for (idx, stream) in root.iter() { + max_stream_id = max_stream_id.max(stream.id()); + + // Register stream stats in SharedStatsStore + let stream_stats = Arc::clone(&stats[idx]); + shared_stats.register_stream_stats(stream.id(), stream_stats); + + let mut stream_meta = + StreamMeta::new(stream.id(), stream.name().to_string(), stream.created_at()); + + stream.topics().with_components(|topic_components| { + let (topic_roots, _topic_aux, topic_stats) = topic_components.into_components(); + for (topic_idx, topic) in topic_roots.iter() { + // Register topic stats in SharedStatsStore + let topic_stats_arc = Arc::clone(&topic_stats[topic_idx]); + shared_stats.register_topic_stats(stream.id(), topic.id(), topic_stats_arc); + + let mut topic_meta = TopicMeta::new( + topic.id(), + topic.name().to_string(), + topic.created_at(), + topic.replication_factor(), + topic.message_expiry(), + topic.compression_algorithm(), + topic.max_topic_size(), + ); + + // Add partitions + topic.partitions().with_components(|part_components| { + let (part_roots, part_stats, ..) 
= part_components.into_components(); + for (part_idx, partition) in part_roots.iter() { + // Register partition stats in SharedStatsStore + let part_stats_arc = Arc::clone(&part_stats[part_idx]); + shared_stats.register_partition_stats( + stream.id(), + topic.id(), + partition.id(), + part_stats_arc, + ); + + topic_meta.add_partition(PartitionMeta::new( + partition.id(), + partition.created_at(), + )); + } + }); + + // Add consumer groups + topic.consumer_groups().with_components(|cg_components| { + let (cg_roots, ..) = cg_components.into_components(); + for (_, cg) in cg_roots.iter() { + let partition_ids = cg.partitions().clone(); + topic_meta.add_consumer_group(ConsumerGroupMeta::new( + cg.id(), + cg.key().clone(), + partition_ids, + )); + } + }); + + stream_meta.add_topic(topic_meta); + } + }); + + let name = stream_meta.name.clone(); + snapshot.streams.insert(stream.id(), stream_meta); + snapshot.stream_index.insert(name, stream.id()); + } + }); + + // Populate users + for user in users.values() { + max_user_id = max_user_id.max(user.id); + + let user_meta = UserMeta::new( + user.id, + user.username.clone(), + user.password.clone(), + user.created_at, + user.status, + user.permissions.clone(), + ); + + let username = user_meta.username.clone(); + snapshot.users.insert(user.id, user_meta); + snapshot.user_index.insert(username, user.id); + } + + // Capture counts before consuming snapshot + let streams_count = snapshot.streams.len(); + let users_count = snapshot.users.len(); + let next_stream = max_stream_id.saturating_add(1); + let next_user = max_user_id.saturating_add(1); + + // Initialize with next IDs = max + 1 + shared_metadata.init_from_snapshot(snapshot, next_stream, next_user); + + info!( + "Populated SharedMetadata: {} streams, {} users (next_stream_id={}, next_user_id={})", + streams_count, users_count, next_stream, next_user, + ); +} diff --git a/core/server/src/metadata/mod.rs b/core/server/src/metadata/mod.rs index e964dccd9..7564c1e9c 100644 --- 
a/core/server/src/metadata/mod.rs +++ b/core/server/src/metadata/mod.rs @@ -25,6 +25,7 @@ mod consumer_group; mod partition; mod shared; mod snapshot; +mod stats_store; mod stream; mod topic; mod user; @@ -33,6 +34,7 @@ pub use consumer_group::ConsumerGroupMeta; pub use partition::PartitionMeta; pub use shared::SharedMetadata; pub use snapshot::MetadataSnapshot; +pub use stats_store::SharedStatsStore; pub use stream::StreamMeta; pub use topic::TopicMeta; pub use user::{PersonalAccessTokenMeta, UserMeta}; diff --git a/core/server/src/metadata/shared.rs b/core/server/src/metadata/shared.rs index f03086b65..9e108c94c 100644 --- a/core/server/src/metadata/shared.rs +++ b/core/server/src/metadata/shared.rs @@ -131,16 +131,21 @@ impl SharedMetadata { // Stream operations - /// Create a new stream. Returns the created stream metadata. - pub fn create_stream(&self, name: String) -> Result<StreamMeta, IggyError> { + /// Add a stream with a specific ID (used by dual-write from shard 0). + /// The ID should come from the actual slab allocation. + pub fn add_stream( + &self, + id: usize, + name: String, + created_at: IggyTimestamp, + ) -> Result<StreamMeta, IggyError> { let current = self.inner.load(); if current.stream_exists_by_name(&name) { return Err(IggyError::StreamNameAlreadyExists(name)); } - let id = self.next_stream_id.fetch_add(1, Ordering::SeqCst); - let stream = StreamMeta::new(id, name.clone(), IggyTimestamp::now()); + let stream = StreamMeta::new(id, name.clone(), created_at); let mut new_snapshot = (**current).clone(); new_snapshot.stream_index.insert(name, id); @@ -151,6 +156,13 @@ impl SharedMetadata { Ok(stream) } + /// Create a new stream with auto-generated ID. + /// Primarily for tests - production code should use `add_stream` with the actual slab ID. 
+ pub fn create_stream(&self, name: String) -> Result<StreamMeta, IggyError> { + let id = self.next_stream_id.fetch_add(1, Ordering::SeqCst); + self.add_stream(id, name, IggyTimestamp::now()) + } + /// Delete a stream. Returns the deleted stream metadata. pub fn delete_stream(&self, id: &Identifier) -> Result<StreamMeta, IggyError> { let current = self.inner.load(); @@ -207,12 +219,15 @@ impl SharedMetadata { // Topic operations - /// Create a new topic in a stream. + /// Add a topic with a specific ID (used by dual-write from shard 0). + /// The ID should come from the actual slab allocation. #[allow(clippy::too_many_arguments)] - pub fn create_topic( + pub fn add_topic( &self, stream_id: &Identifier, + topic_id: usize, name: String, + created_at: IggyTimestamp, replication_factor: u8, message_expiry: IggyExpiry, compression_algorithm: CompressionAlgorithm, @@ -229,12 +244,10 @@ impl SharedMetadata { return Err(IggyError::TopicNameAlreadyExists(name, stream_id.clone())); } - // Topic IDs are sequential within a stream - let topic_id = stream.topics.len(); let topic = TopicMeta::new( topic_id, name.clone(), - IggyTimestamp::now(), + created_at, replication_factor, message_expiry, compression_algorithm, @@ -253,6 +266,37 @@ impl SharedMetadata { Ok(topic) } + /// Create a new topic with auto-generated ID. + /// Primarily for tests - production code should use `add_topic` with the actual slab ID. 
+ #[allow(clippy::too_many_arguments)] + pub fn create_topic( + &self, + stream_id: &Identifier, + name: String, + replication_factor: u8, + message_expiry: IggyExpiry, + compression_algorithm: CompressionAlgorithm, + max_topic_size: MaxTopicSize, + ) -> Result<TopicMeta, IggyError> { + let current = self.inner.load(); + let stream_idx = current + .get_stream_id(stream_id) + .ok_or_else(|| IggyError::StreamIdNotFound(stream_id.clone()))?; + let stream = current.streams.get(&stream_idx).unwrap(); + let topic_id = stream.topics.len(); + + self.add_topic( + stream_id, + topic_id, + name, + IggyTimestamp::now(), + replication_factor, + message_expiry, + compression_algorithm, + max_topic_size, + ) + } + /// Delete a topic from a stream. pub fn delete_topic( &self, diff --git a/core/server/src/metadata/stats_store.rs b/core/server/src/metadata/stats_store.rs new file mode 100644 index 000000000..242e80bca --- /dev/null +++ b/core/server/src/metadata/stats_store.rs @@ -0,0 +1,108 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Shared statistics store for cross-shard stats visibility. +//! +//! When shard 0 creates a stream/topic/partition, it registers the stats Arc here. +//! 
When other shards do lazy init, they retrieve the same Arc so updates are visible. + +use crate::streaming::stats::{PartitionStats, StreamStats, TopicStats}; +use dashmap::DashMap; +use std::sync::Arc; + +/// Thread-safe store for sharing stats Arcs across shards. +/// Uses DashMap for sharded, low-contention concurrent access. +#[derive(Debug, Default)] +pub struct SharedStatsStore { + /// Stream stats indexed by stream_id + stream_stats: DashMap<usize, Arc<StreamStats>>, + + /// Topic stats indexed by (stream_id, topic_id) + topic_stats: DashMap<(usize, usize), Arc<TopicStats>>, + + /// Partition stats indexed by (stream_id, topic_id, partition_id) + partition_stats: DashMap<(usize, usize, usize), Arc<PartitionStats>>, +} + +impl SharedStatsStore { + pub fn new() -> Self { + Self { + stream_stats: DashMap::new(), + topic_stats: DashMap::new(), + partition_stats: DashMap::new(), + } + } + + // Stream stats + + pub fn register_stream_stats(&self, stream_id: usize, stats: Arc<StreamStats>) { + self.stream_stats.insert(stream_id, stats); + } + + pub fn get_stream_stats(&self, stream_id: usize) -> Option<Arc<StreamStats>> { + self.stream_stats.get(&stream_id).map(|r| Arc::clone(&r)) + } + + pub fn remove_stream_stats(&self, stream_id: usize) { + self.stream_stats.remove(&stream_id); + } + + // Topic stats + + pub fn register_topic_stats(&self, stream_id: usize, topic_id: usize, stats: Arc<TopicStats>) { + self.topic_stats.insert((stream_id, topic_id), stats); + } + + pub fn get_topic_stats(&self, stream_id: usize, topic_id: usize) -> Option<Arc<TopicStats>> { + self.topic_stats + .get(&(stream_id, topic_id)) + .map(|r| Arc::clone(&r)) + } + + pub fn remove_topic_stats(&self, stream_id: usize, topic_id: usize) { + self.topic_stats.remove(&(stream_id, topic_id)); + } + + // Partition stats + + pub fn register_partition_stats( + &self, + stream_id: usize, + topic_id: usize, + partition_id: usize, + stats: Arc<PartitionStats>, + ) { + self.partition_stats + .insert((stream_id, 
topic_id, partition_id), stats); + } + + pub fn get_partition_stats( + &self, + stream_id: usize, + topic_id: usize, + partition_id: usize, + ) -> Option<Arc<PartitionStats>> { + self.partition_stats + .get(&(stream_id, topic_id, partition_id)) + .map(|r| Arc::clone(&r)) + } + + pub fn remove_partition_stats(&self, stream_id: usize, topic_id: usize, partition_id: usize) { + self.partition_stats + .remove(&(stream_id, topic_id, partition_id)); + } +} diff --git a/core/server/src/quic/quic_server.rs b/core/server/src/quic/quic_server.rs index d70a30877..e18b5a1c9 100644 --- a/core/server/src/quic/quic_server.rs +++ b/core/server/src/quic/quic_server.rs @@ -21,7 +21,6 @@ use crate::quic::{COMPONENT, listener, quic_socket}; use crate::server_error::QuicError; use crate::shard::IggyShard; use crate::shard::task_registry::ShutdownToken; -use crate::shard::transmission::event::ShardEvent; use anyhow::Result; use compio_quic::{ Endpoint, EndpointConfig, IdleTimeout, ServerBuilder, ServerConfig, TransportConfig, VarInt, @@ -66,7 +65,7 @@ pub async fn spawn_quic_server( if shard.id != 0 && addr.port() == 0 { info!("Waiting for QUIC address from shard 0..."); loop { - if let Some(bound_addr) = shard.quic_bound_address.get() { + if let Some(bound_addr) = shard.shared_metadata.load().bound_addresses.quic { addr = bound_addr; info!("Received QUIC address: {}", addr); break; @@ -117,19 +116,14 @@ pub async fn spawn_quic_server( info!("Iggy QUIC server has started on: {:?}", actual_addr); if shard.id == 0 { - // Store bound address locally + // Store in Cell for config_writer backward compat shard.quic_bound_address.set(Some(actual_addr)); + // Store in SharedMetadata (all shards see this immediately via ArcSwap) + shard.shared_metadata.set_quic_address(actual_addr); if addr.port() == 0 { // Notify config writer on shard 0 let _ = shard.config_writer_notify.try_send(()); - - // Broadcast to other shards for SO_REUSEPORT binding - let event = ShardEvent::AddressBound { - protocol: 
iggy_common::TransportProtocol::Quic, - address: actual_addr, - }; - shard.broadcast_event_to_all_shards(event).await?; } } else { shard.quic_bound_address.set(Some(actual_addr)); diff --git a/core/server/src/shard/builder.rs b/core/server/src/shard/builder.rs index e394b9e0c..0996a88b9 100644 --- a/core/server/src/shard/builder.rs +++ b/core/server/src/shard/builder.rs @@ -22,7 +22,7 @@ use super::{ }; use crate::{ configs::server::ServerConfig, - metadata::SharedMetadata, + metadata::{SharedMetadata, SharedStatsStore}, shard::namespace::IggyNamespace, slab::{streams::Streams, users::Users}, state::file::FileState, @@ -42,6 +42,7 @@ pub struct IggyShardBuilder { streams: Option<Streams>, shards_table: Option<EternalPtr<DashMap<IggyNamespace, ShardId>>>, shared_metadata: Option<EternalPtr<SharedMetadata>>, + shared_stats: Option<EternalPtr<SharedStatsStore>>, state: Option<FileState>, users: Option<Users>, client_manager: Option<ClientManager>, @@ -82,6 +83,11 @@ impl IggyShardBuilder { self } + pub fn shared_stats(mut self, shared_stats: EternalPtr<SharedStatsStore>) -> Self { + self.shared_stats = Some(shared_stats); + self + } + pub fn clients_manager(mut self, client_manager: ClientManager) -> Self { self.client_manager = Some(client_manager); self @@ -128,6 +134,7 @@ impl IggyShardBuilder { let streams = self.streams.unwrap(); let shards_table = self.shards_table.unwrap(); let shared_metadata = self.shared_metadata.unwrap(); + let shared_stats = self.shared_stats.unwrap(); let state = self.state.unwrap(); let users = self.users.unwrap(); let config = self.config.unwrap(); @@ -163,6 +170,7 @@ impl IggyShardBuilder { shards, shards_table, shared_metadata, + shared_stats, streams, // TODO: Fixme users, fs_locks: Default::default(), diff --git a/core/server/src/shard/handlers.rs b/core/server/src/shard/handlers.rs index c1aed5c92..23c0ea52a 100644 --- a/core/server/src/shard/handlers.rs +++ b/core/server/src/shard/handlers.rs @@ -63,6 +63,11 @@ async fn 
handle_request( let partition_id = request.partition_id; match request.payload { ShardRequestPayload::SendMessages { batch } => { + // Lazy init: ensure partition exists locally + shard + .ensure_local_partition(&stream_id, &topic_id, partition_id) + .await?; + let ns = IggyFullNamespace::new(stream_id, topic_id, partition_id); let batch = shard.maybe_encrypt_messages(batch)?; let messages_count = batch.count(); @@ -74,6 +79,11 @@ async fn handle_request( Ok(ShardResponse::SendMessages) } ShardRequestPayload::PollMessages { args, consumer } => { + // Lazy init: ensure partition exists locally + shard + .ensure_local_partition(&stream_id, &topic_id, partition_id) + .await?; + let auto_commit = args.auto_commit; let ns = IggyFullNamespace::new(stream_id, topic_id, partition_id); let (metadata, batches) = shard.streams.poll_messages(&ns, consumer, args).await?; @@ -97,12 +107,22 @@ async fn handle_request( Ok(ShardResponse::PollMessages((metadata, batches))) } ShardRequestPayload::FlushUnsavedBuffer { fsync } => { + // Lazy init: ensure partition exists locally + shard + .ensure_local_partition(&stream_id, &topic_id, partition_id) + .await?; + shard .flush_unsaved_buffer_base(&stream_id, &topic_id, partition_id, fsync) .await?; Ok(ShardResponse::FlushUnsavedBuffer) } ShardRequestPayload::DeleteSegments { segments_count } => { + // Lazy init: ensure partition exists locally + shard + .ensure_local_partition(&stream_id, &topic_id, partition_id) + .await?; + shard .delete_segments_base(&stream_id, &topic_id, partition_id, segments_count) .await?; @@ -296,6 +316,11 @@ async fn handle_request( let registry = shard.task_registry.clone(); let registry_clone = registry.clone(); + // Lazy init: ensure partition exists locally + shard + .ensure_local_partition(&stream_id, &topic_id, partition_id) + .await?; + let ns = IggyFullNamespace::new(stream_id, topic_id, partition_id); let batch = shard.maybe_encrypt_messages(initial_data)?; let messages_count = batch.count(); @@ -358,30 
+383,5 @@ pub async fn handle_event(shard: &Rc<IggyShard>, event: ShardEvent) -> Result<() .await?; Ok(()) } - ShardEvent::AddressBound { protocol, address } => { - info!( - "Received AddressBound event for {:?} with address: {}", - protocol, address - ); - match protocol { - TransportProtocol::Tcp => { - shard.tcp_bound_address.set(Some(address)); - let _ = shard.config_writer_notify.try_send(()); - } - TransportProtocol::Quic => { - shard.quic_bound_address.set(Some(address)); - let _ = shard.config_writer_notify.try_send(()); - } - TransportProtocol::Http => { - shard.http_bound_address.set(Some(address)); - let _ = shard.config_writer_notify.try_send(()); - } - TransportProtocol::WebSocket => { - shard.websocket_bound_address.set(Some(address)); - let _ = shard.config_writer_notify.try_send(()); - } - } - Ok(()) - } } } diff --git a/core/server/src/shard/lazy_init.rs b/core/server/src/shard/lazy_init.rs new file mode 100644 index 000000000..06f63b796 --- /dev/null +++ b/core/server/src/shard/lazy_init.rs @@ -0,0 +1,317 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Lazy initialization of Stream/Topic/Partition structures. +//! +//! 
When a non-shard-0 shard receives a data plane request (SendMessages, PollMessages), +//! it may not have the Stream/Topic/Partition in its local slab. This module +//! provides lazy creation from SharedMetadata. + +use crate::metadata::{PartitionMeta, StreamMeta, TopicMeta}; +use crate::shard::IggyShard; +use crate::slab::traits_ext::{EntityMarker, Insert, InsertCell}; +use crate::streaming::partitions::helpers::create_message_deduplicator; +use crate::streaming::partitions::log::SegmentedLog; +use crate::streaming::partitions::partition::{ConsumerGroupOffsets, ConsumerOffsets, Partition}; +use crate::streaming::partitions::storage::create_partition_file_hierarchy; +use crate::streaming::stats::{StreamStats, TopicStats}; +use crate::streaming::streams::stream::Stream; +use crate::streaming::topics::topic::Topic; +use iggy_common::{Identifier, IggyError}; +use std::sync::Arc; +use std::sync::atomic::AtomicU64; +use tracing::{debug, info}; + +impl IggyShard { + /// Ensures the stream/topic/partition exists locally for data operations. + /// Creates them from SharedMetadata if they don't exist. + /// + /// This is the core of lazy initialization - non-shard-0 shards create + /// data structures on-demand when they receive data plane requests. + /// + /// Accepts `Identifier` references (can be numeric or string) and resolves + /// them to numeric IDs using SharedMetadata. 
+ pub async fn ensure_local_partition( + &self, + stream_id: &Identifier, + topic_id: &Identifier, + partition_id: usize, + ) -> Result<(), IggyError> { + // Load metadata to resolve identifiers + let metadata = self.shared_metadata.load(); + + // Resolve stream identifier to numeric ID + let numeric_stream_id = metadata + .get_stream_id(stream_id) + .ok_or_else(|| IggyError::StreamIdNotFound(stream_id.clone()))?; + + // Get stream metadata and resolve topic identifier + let stream_meta = metadata + .streams + .get(&numeric_stream_id) + .ok_or_else(|| IggyError::StreamIdNotFound(stream_id.clone()))?; + + let numeric_topic_id = Self::resolve_topic_id(topic_id, stream_meta) + .ok_or_else(|| IggyError::TopicIdNotFound(topic_id.clone(), stream_id.clone()))?; + + // Create numeric identifiers for slab lookups + let stream_ident = Identifier::numeric(numeric_stream_id as u32)?; + let topic_ident = Identifier::numeric(numeric_topic_id as u32)?; + + // Fast path: check if partition already exists + if self.streams.exists(&stream_ident) { + let topic_exists = self + .streams + .with_topics(&stream_ident, |topics| topics.exists(&topic_ident)); + + if topic_exists { + let partition_exists = + self.streams + .with_partitions(&stream_ident, &topic_ident, |partitions| { + partitions.exists(partition_id) + }); + + if partition_exists { + return Ok(()); + } + } + } + + // Slow path: need to create from metadata + debug!( + "Lazy init: stream={}, topic={}, partition={} on shard {}", + numeric_stream_id, numeric_topic_id, partition_id, self.id + ); + + let topic_meta = stream_meta + .topics + .get(&numeric_topic_id) + .ok_or_else(|| IggyError::TopicIdNotFound(topic_id.clone(), stream_id.clone()))?; + + let partition_meta = + topic_meta + .partitions + .get(&partition_id) + .ok_or(IggyError::PartitionNotFound( + partition_id, + topic_ident.clone(), + stream_ident.clone(), + ))?; + + // Create stream if needed (and get its stats) + let stream_stats = if 
!self.streams.exists(&stream_ident) { + self.create_stream_from_meta(stream_meta)? + } else { + // Get existing stream's stats + self.streams + .with_stream_by_id(&stream_ident, |(_root, stats)| Arc::clone(&stats)) + }; + + // Create topic if needed (and get its stats) + let topic_exists = self + .streams + .with_topics(&stream_ident, |topics| topics.exists(&topic_ident)); + + let topic_stats = if !topic_exists { + self.create_topic_from_meta(numeric_stream_id, topic_meta, stream_stats)? + } else { + // Get existing topic's stats + self.streams + .with_topic_by_id(&stream_ident, &topic_ident, |(_root, _, stats)| { + Arc::clone(&stats) + }) + }; + + // Create partition if needed + let partition_exists = + self.streams + .with_partitions(&stream_ident, &topic_ident, |partitions| { + partitions.exists(partition_id) + }); + + if !partition_exists { + self.create_partition_from_meta( + numeric_stream_id, + numeric_topic_id, + partition_meta, + topic_stats, + ) + .await?; + } + + Ok(()) + } + + /// Resolves a topic identifier to its numeric ID. + fn resolve_topic_id(topic_id: &Identifier, stream_meta: &StreamMeta) -> Option<usize> { + match topic_id.kind { + iggy_common::IdKind::Numeric => { + let id = topic_id.get_u32_value().unwrap() as usize; + if stream_meta.topics.contains_key(&id) { + Some(id) + } else { + None + } + } + iggy_common::IdKind::String => { + let name = topic_id.get_string_value().unwrap(); + stream_meta.get_topic_id_by_name(&name) + } + } + } + + /// Creates a Stream locally from metadata. Returns the stream stats. + /// Uses SharedStatsStore to get the same Arc<StreamStats> as shard 0. 
+ fn create_stream_from_meta(&self, meta: &StreamMeta) -> Result<Arc<StreamStats>, IggyError> { + info!( + "Lazy creating stream: id={}, name={} on shard {}", + meta.id, meta.name, self.id + ); + + // Get the shared stats from SharedStatsStore (registered by shard 0) + let stats = self.shared_stats.get_stream_stats(meta.id).ok_or_else(|| { + IggyError::StreamIdNotFound(Identifier::numeric(meta.id as u32).unwrap()) + })?; + + let mut stream = Stream::new(meta.name.clone(), Arc::clone(&stats), meta.created_at); + stream.update_id(meta.id); + + // Insert into local streams slab + // Note: slab position may differ from logical ID - that's fine, + // lookups use the name index which maps name -> slab position + let _ = self.streams.insert(stream); + + Ok(stats) + } + + /// Creates a Topic locally from metadata. Returns the topic stats. + /// Uses SharedStatsStore to get the same Arc<TopicStats> as shard 0. + fn create_topic_from_meta( + &self, + stream_id: usize, + meta: &TopicMeta, + _stream_stats: Arc<StreamStats>, + ) -> Result<Arc<TopicStats>, IggyError> { + info!( + "Lazy creating topic: stream={}, id={}, name={} on shard {}", + stream_id, meta.id, meta.name, self.id + ); + + // Get the shared stats from SharedStatsStore (registered by shard 0) + let stats = self + .shared_stats + .get_topic_stats(stream_id, meta.id) + .ok_or_else(|| { + IggyError::TopicIdNotFound( + Identifier::numeric(meta.id as u32).unwrap(), + Identifier::numeric(stream_id as u32).unwrap(), + ) + })?; + + let mut topic = Topic::new( + meta.name.clone(), + Arc::clone(&stats), + meta.created_at, + meta.replication_factor, + meta.message_expiry, + meta.compression_algorithm, + meta.max_topic_size, + ); + topic.update_id(meta.id); + + // Insert into stream's topics + // Note: slab position may differ from logical ID + let stream_ident = Identifier::numeric(stream_id as u32)?; + self.streams.with_topics(&stream_ident, |topics| { + let _ = topics.insert(topic); + }); + + Ok(stats) + } + + /// Creates 
a Partition locally from metadata with SegmentedLog. + /// Uses SharedStatsStore to get the same Arc<PartitionStats> as shard 0. + async fn create_partition_from_meta( + &self, + stream_id: usize, + topic_id: usize, + partition_meta: &PartitionMeta, + _topic_stats: Arc<TopicStats>, + ) -> Result<(), IggyError> { + let partition_id = partition_meta.id; + + info!( + "Lazy creating partition: stream={}, topic={}, partition={} on shard {}", + stream_id, topic_id, partition_id, self.id + ); + + // Create partition directory if needed + create_partition_file_hierarchy(stream_id, topic_id, partition_id, &self.config.system) + .await?; + + // Get the shared stats from SharedStatsStore (registered by shard 0) + let stats = self + .shared_stats + .get_partition_stats(stream_id, topic_id, partition_id) + .ok_or_else(|| { + IggyError::PartitionNotFound( + partition_id, + Identifier::numeric(topic_id as u32).unwrap(), + Identifier::numeric(stream_id as u32).unwrap(), + ) + })?; + + // Create message deduplicator if configured + let message_deduplicator = create_message_deduplicator(&self.config.system); + + // Create empty SegmentedLog + let log = SegmentedLog::default(); + + // Create partition + let partition = Partition::new( + partition_meta.created_at, + true, // should_increment_offset + stats, + message_deduplicator, + Arc::new(AtomicU64::new(0)), + Arc::new(ConsumerOffsets::with_capacity(10)), + Arc::new(ConsumerGroupOffsets::with_capacity(10)), + log, + ); + + // Insert partition into topic + let stream_ident = Identifier::numeric(stream_id as u32)?; + let topic_ident = Identifier::numeric(topic_id as u32)?; + + // Note: slab position may differ from logical ID + self.streams + .with_partitions_mut(&stream_ident, &topic_ident, |partitions| { + let _ = partitions.insert(partition); + }); + + // Initialize the log with a segment + self.init_log(&stream_ident, &topic_ident, partition_id) + .await?; + + info!( + "Lazy created partition: stream={}, topic={}, partition={} on 
shard {}", + stream_id, topic_id, partition_id, self.id + ); + + Ok(()) + } +} diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs index b5424cd9f..bb2e0266e 100644 --- a/core/server/src/shard/mod.rs +++ b/core/server/src/shard/mod.rs @@ -25,6 +25,7 @@ pub mod transmission; mod communication; pub mod handlers; +mod lazy_init; // Re-export for backwards compatibility pub use communication::calculate_shard_assignment; @@ -33,7 +34,7 @@ use self::tasks::{continuous, periodic}; use crate::{ configs::server::ServerConfig, io::fs_locks::FsLocks, - metadata::SharedMetadata, + metadata::{SharedMetadata, SharedStatsStore}, shard::{ namespace::IggyNamespace, task_registry::TaskRegistry, transmission::frame::ShardFrame, }, @@ -77,6 +78,9 @@ pub struct IggyShard { /// Shared metadata accessible by all shards (ArcSwap-based). pub(crate) shared_metadata: EternalPtr<SharedMetadata>, + /// Shared stats store for cross-shard stats visibility. + pub(crate) shared_stats: EternalPtr<SharedStatsStore>, + pub(crate) fs_locks: FsLocks, pub(crate) encryptor: Option<EncryptorKind>, pub(crate) config: ServerConfig, diff --git a/core/server/src/shard/system/partitions.rs b/core/server/src/shard/system/partitions.rs index 10177bab3..b65ba443d 100644 --- a/core/server/src/shard/system/partitions.rs +++ b/core/server/src/shard/system/partitions.rs @@ -35,6 +35,7 @@ use crate::streaming::topics; use err_trail::ErrContext; use iggy_common::Identifier; use iggy_common::IggyError; +use std::sync::Arc; use tracing::info; impl IggyShard { @@ -107,6 +108,14 @@ impl IggyShard { let shards_count = self.get_available_shards_count(); for (partition_id, stats) in partitions.iter().map(|p| (p.id(), p.stats())) { + // Register partition stats in SharedStatsStore for cross-shard visibility + self.shared_stats.register_partition_stats( + numeric_stream_id, + numeric_topic_id, + partition_id, + Arc::clone(stats), + ); + let ns = IggyNamespace::new(numeric_stream_id, numeric_topic_id, 
partition_id); let shard_id = ShardId::new(calculate_shard_assignment(&ns, shards_count)); let is_current_shard = self.id == *shard_id; diff --git a/core/server/src/shard/system/streams.rs b/core/server/src/shard/system/streams.rs index b12ca2f2f..4cb9bc041 100644 --- a/core/server/src/shard/system/streams.rs +++ b/core/server/src/shard/system/streams.rs @@ -45,8 +45,14 @@ impl IggyShard { let stream = stream::create_and_insert_stream_mem(&self.streams, name.clone()); self.metrics.increment_streams(1); + // Register stats in SharedStatsStore for cross-shard visibility + self.shared_stats + .register_stream_stats(stream.id(), stream.stats().clone()); + // Dual-write: also update SharedMetadata for consistent cross-shard reads - let _ = self.shared_metadata.create_stream(name); + let _ = self + .shared_metadata + .add_stream(stream.id(), name, stream.root().created_at()); create_stream_file_hierarchy(stream.id(), &self.config.system).await?; Ok(stream) diff --git a/core/server/src/shard/system/topics.rs b/core/server/src/shard/system/topics.rs index a5830c8fd..83985c279 100644 --- a/core/server/src/shard/system/topics.rs +++ b/core/server/src/shard/system/topics.rs @@ -26,6 +26,7 @@ use crate::streaming::{partitions, streams, topics}; use err_trail::ErrContext; use iggy_common::{CompressionAlgorithm, Identifier, IggyError, IggyExpiry, MaxTopicSize}; use std::str::FromStr; +use std::sync::Arc; use tracing::info; impl IggyShard { @@ -81,10 +82,19 @@ impl IggyShard { ); self.metrics.increment_topics(1); - // Dual-write: also update SharedMetadata - let _ = self.shared_metadata.create_topic( + // Register stats in SharedStatsStore for cross-shard visibility + self.shared_stats.register_topic_stats( + numeric_stream_id, + topic.id(), + Arc::clone(topic.stats()), + ); + + // Dual-write: also update SharedMetadata with actual topic ID from slab + let _ = self.shared_metadata.add_topic( stream_id, + topic.id(), name, + topic.root().created_at(), 
replication_factor.unwrap_or(1), message_expiry, compression, diff --git a/core/server/src/shard/tasks/periodic/message_saver.rs b/core/server/src/shard/tasks/periodic/message_saver.rs index c6d0efdcf..25ab6b53c 100644 --- a/core/server/src/shard/tasks/periodic/message_saver.rs +++ b/core/server/src/shard/tasks/periodic/message_saver.rs @@ -54,6 +54,15 @@ async fn save_messages(shard: Rc<IggyShard>) -> Result<(), IggyError> { let topic_id = Identifier::numeric(ns.topic_id() as u32).unwrap(); let partition_id = ns.partition_id(); + // Skip partitions that haven't been lazily created yet + // (they have no messages to save anyway) + if !shard + .streams + .partition_exists(&stream_id, &topic_id, partition_id) + { + continue; + } + match shard .streams .persist_messages( @@ -102,6 +111,14 @@ async fn fsync_all_segments_on_shutdown(shard: Rc<IggyShard>, result: Result<(), let topic_id = Identifier::numeric(ns.topic_id() as u32).unwrap(); let partition_id = ns.partition_id(); + // Skip partitions that haven't been lazily created yet + if !shard + .streams + .partition_exists(&stream_id, &topic_id, partition_id) + { + continue; + } + match shard .streams .fsync_all_messages(&stream_id, &topic_id, partition_id) diff --git a/core/server/src/shard/transmission/event.rs b/core/server/src/shard/transmission/event.rs index 3364a55aa..9c362d9a7 100644 --- a/core/server/src/shard/transmission/event.rs +++ b/core/server/src/shard/transmission/event.rs @@ -15,8 +15,7 @@ // specific language governing permissions and limitations // under the License. 
-use iggy_common::{Identifier, TransportProtocol}; -use std::net::SocketAddr; +use iggy_common::Identifier; use strum::Display; #[derive(Debug, Clone, Display)] @@ -28,8 +27,4 @@ pub enum ShardEvent { partition_id: usize, fsync: bool, }, - AddressBound { - protocol: TransportProtocol, - address: SocketAddr, - }, } diff --git a/core/server/src/slab/partitions.rs b/core/server/src/slab/partitions.rs index 0ad5abbf0..a8b990ec6 100644 --- a/core/server/src/slab/partitions.rs +++ b/core/server/src/slab/partitions.rs @@ -42,6 +42,8 @@ pub type ContainerId = usize; #[derive(Debug)] pub struct Partitions { + /// Numeric ID index: logical partition ID → slab position + numeric_index: ahash::AHashMap<usize, ContainerId>, root: Slab<partition::PartitionRoot>, stats: Slab<Arc<PartitionStats>>, message_deduplicator: Slab<Option<Arc<MessageDeduplicator>>>, @@ -60,6 +62,7 @@ pub struct Partitions { impl Clone for Partitions { fn clone(&self) -> Self { Self { + numeric_index: self.numeric_index.clone(), root: self.root.clone(), stats: self.stats.clone(), message_deduplicator: self.message_deduplicator.clone(), @@ -79,40 +82,51 @@ impl Insert for Partitions { let (root, stats, deduplicator, offset, consumer_offset, consumer_group_offset, log) = item.into_components(); - let entity_id = self.root.insert(root); - let id = self.stats.insert(stats); + // Check if ID was pre-set (e.g., from metadata during lazy creation) + let pre_set_id = root.id(); + + let slab_pos = self.root.insert(root); + let stats_pos = self.stats.insert(stats); assert_eq!( - entity_id, id, - "partition_insert: id mismatch when creating stats" + slab_pos, stats_pos, + "partition_insert: position mismatch when creating stats" ); - let id = self.log.insert(log); + let log_pos = self.log.insert(log); assert_eq!( - entity_id, id, - "partition_insert: id mismatch when creating log" + slab_pos, log_pos, + "partition_insert: position mismatch when creating log" ); - let id = 
self.message_deduplicator.insert(deduplicator); + let dedup_pos = self.message_deduplicator.insert(deduplicator); assert_eq!( - entity_id, id, - "partition_insert: id mismatch when creating message_deduplicator" + slab_pos, dedup_pos, + "partition_insert: position mismatch when creating message_deduplicator" ); - let id = self.offset.insert(offset); + let offset_pos = self.offset.insert(offset); assert_eq!( - entity_id, id, - "partition_insert: id mismatch when creating offset" + slab_pos, offset_pos, + "partition_insert: position mismatch when creating offset" ); - let id = self.consumer_offset.insert(consumer_offset); + let consumer_pos = self.consumer_offset.insert(consumer_offset); assert_eq!( - entity_id, id, - "partition_insert: id mismatch when creating consumer_offset" + slab_pos, consumer_pos, + "partition_insert: position mismatch when creating consumer_offset" ); - let id = self.consumer_group_offset.insert(consumer_group_offset); + let cg_pos = self.consumer_group_offset.insert(consumer_group_offset); assert_eq!( - entity_id, id, - "partition_insert: id mismatch when creating consumer_group_offset" + slab_pos, cg_pos, + "partition_insert: position mismatch when creating consumer_group_offset" ); - let root = self.root.get_mut(entity_id).unwrap(); - root.update_id(entity_id); - entity_id + + let root = self.root.get_mut(slab_pos).unwrap(); + + // Use pre-set ID if available, otherwise use slab position + let logical_id = if pre_set_id > 0 { pre_set_id } else { slab_pos }; + root.update_id(logical_id); + + // Update numeric index + self.numeric_index.insert(logical_id, slab_pos); + + logical_id } } @@ -120,14 +134,25 @@ impl Delete for Partitions { type Idx = ContainerId; type Item = Partition; - fn delete(&mut self, id: Self::Idx) -> Self::Item { - let root = self.root.remove(id); - let stats = self.stats.remove(id); - let message_deduplicator = self.message_deduplicator.remove(id); - let offset = self.offset.remove(id); - let consumer_offset = 
self.consumer_offset.remove(id); - let consumer_group_offset = self.consumer_group_offset.remove(id); - let log = self.log.remove(id); + fn delete(&mut self, logical_id: Self::Idx) -> Self::Item { + // Look up slab position from logical ID + let slab_pos = *self + .numeric_index + .get(&logical_id) + .expect("partition_delete: logical ID not found in numeric index"); + + let root = self.root.remove(slab_pos); + let stats = self.stats.remove(slab_pos); + let message_deduplicator = self.message_deduplicator.remove(slab_pos); + let offset = self.offset.remove(slab_pos); + let consumer_offset = self.consumer_offset.remove(slab_pos); + let consumer_group_offset = self.consumer_group_offset.remove(slab_pos); + let log = self.log.remove(slab_pos); + + // Remove from numeric index + self.numeric_index + .remove(&logical_id) + .expect("partition_delete: ID not found in numeric index"); Partition::new_with_components( root, @@ -197,6 +222,7 @@ impl EntityComponentSystemMut for Partitions { impl Default for Partitions { fn default() -> Self { Self { + numeric_index: ahash::AHashMap::with_capacity(PARTITIONS_CAPACITY), root: Slab::with_capacity(PARTITIONS_CAPACITY), stats: Slab::with_capacity(PARTITIONS_CAPACITY), log: Slab::with_capacity(PARTITIONS_CAPACITY), @@ -223,21 +249,29 @@ impl Partitions { pub fn with_partition_by_id<T>( &self, - id: ContainerId, + logical_id: ContainerId, f: impl FnOnce(ComponentsById<PartitionRef>) -> T, ) -> T { - self.with_components_by_id(id, |components| f(components)) + let slab_pos = *self + .numeric_index + .get(&logical_id) + .expect("Partition not found by ID"); + self.with_components_by_id(slab_pos, |components| f(components)) } pub fn exists(&self, id: ContainerId) -> bool { - self.root.contains(id) + self.numeric_index.contains_key(&id) } pub fn with_partition_by_id_mut<T>( &mut self, - id: ContainerId, + logical_id: ContainerId, f: impl FnOnce(ComponentsById<PartitionRefMut>) -> T, ) -> T { - self.with_components_by_id_mut(id, 
|components| f(components)) + let slab_pos = *self + .numeric_index + .get(&logical_id) + .expect("Partition not found by ID"); + self.with_components_by_id_mut(slab_pos, |components| f(components)) } } diff --git a/core/server/src/slab/streams.rs b/core/server/src/slab/streams.rs index ad7d5368d..d82b95ebb 100644 --- a/core/server/src/slab/streams.rs +++ b/core/server/src/slab/streams.rs @@ -71,7 +71,11 @@ pub type ContainerId = usize; #[derive(Debug, Clone)] pub struct Streams { + /// Name index: stream name → slab position index: RefCell<AHashMap<<stream::StreamRoot as Keyed>::Key, ContainerId>>, + /// Numeric ID index: logical stream ID → slab position + /// This allows logical IDs to differ from slab positions (needed for lazy creation) + numeric_index: RefCell<AHashMap<usize, ContainerId>>, root: RefCell<Slab<stream::StreamRoot>>, stats: RefCell<Slab<Arc<StreamStats>>>, } @@ -80,6 +84,7 @@ impl Default for Streams { fn default() -> Self { Self { index: RefCell::new(AHashMap::with_capacity(CAPACITY)), + numeric_index: RefCell::new(AHashMap::with_capacity(CAPACITY)), root: RefCell::new(Slab::with_capacity(CAPACITY)), stats: RefCell::new(Slab::with_capacity(CAPACITY)), } @@ -109,20 +114,32 @@ impl InsertCell for Streams { fn insert(&self, item: Self::Item) -> Self::Idx { let (root, stats) = item.into_components(); let mut root_container = self.root.borrow_mut(); - let mut indexes = self.index.borrow_mut(); + let mut name_index = self.index.borrow_mut(); + let mut numeric_index = self.numeric_index.borrow_mut(); let mut stats_container = self.stats.borrow_mut(); + // Check if ID was pre-set (e.g., from metadata during lazy creation) + let pre_set_id = root.id(); + let key = root.key().clone(); - let entity_id = root_container.insert(root); - let id = stats_container.insert(stats); + let slab_pos = root_container.insert(root); + let stats_pos = stats_container.insert(stats); assert_eq!( - entity_id, id, - "stream_insert: id mismatch when inserting stats" + 
slab_pos, stats_pos, + "stream_insert: position mismatch when inserting stats" ); - let root = root_container.get_mut(entity_id).unwrap(); - root.update_id(entity_id); - indexes.insert(key, entity_id); - entity_id + + let root = root_container.get_mut(slab_pos).unwrap(); + + // Use pre-set ID if available, otherwise use slab position + let logical_id = if pre_set_id > 0 { pre_set_id } else { slab_pos }; + root.update_id(logical_id); + + // Update both indexes + name_index.insert(key, slab_pos); + numeric_index.insert(logical_id, slab_pos); + + logical_id } } @@ -130,19 +147,28 @@ impl DeleteCell for Streams { type Idx = ContainerId; type Item = stream::Stream; - fn delete(&self, id: Self::Idx) -> Self::Item { + fn delete(&self, logical_id: Self::Idx) -> Self::Item { let mut root_container = self.root.borrow_mut(); - let mut indexes = self.index.borrow_mut(); + let mut name_index = self.index.borrow_mut(); + let mut numeric_index = self.numeric_index.borrow_mut(); let mut stats_container = self.stats.borrow_mut(); - let root = root_container.remove(id); - let stats = stats_container.remove(id); + // Look up slab position from logical ID + let slab_pos = *numeric_index + .get(&logical_id) + .expect("stream_delete: logical ID not found in numeric index"); - // Remove from index + let root = root_container.remove(slab_pos); + let stats = stats_container.remove(slab_pos); + + // Remove from both indexes let key = root.key(); - indexes + name_index .remove(key) - .expect("stream_delete: key not found in index"); + .expect("stream_delete: key not found in name index"); + numeric_index + .remove(&logical_id) + .expect("stream_delete: ID not found in numeric index"); stream::Stream::new_with_components(root, stats) } @@ -441,8 +467,8 @@ impl Streams { pub fn exists(&self, id: &Identifier) -> bool { match id.kind { iggy_common::IdKind::Numeric => { - let id = id.get_u32_value().unwrap() as usize; - self.root.borrow().contains(id) + let logical_id = 
id.get_u32_value().unwrap() as usize; + self.numeric_index.borrow().contains_key(&logical_id) } iggy_common::IdKind::String => { let key = id.get_string_value().unwrap(); @@ -451,12 +477,45 @@ impl Streams { } } + /// Check if a partition exists locally (stream, topic, and partition all exist). + /// Returns false if any level doesn't exist, without panicking. + pub fn partition_exists( + &self, + stream_id: &Identifier, + topic_id: &Identifier, + partition_id: usize, + ) -> bool { + if !self.exists(stream_id) { + return false; + } + + let topic_exists = self.with_topics(stream_id, |topics| topics.exists(topic_id)); + if !topic_exists { + return false; + } + + self.with_partitions(stream_id, topic_id, |partitions| { + partitions.exists(partition_id) + }) + } + pub fn get_index(&self, id: &Identifier) -> usize { match id.kind { - iggy_common::IdKind::Numeric => id.get_u32_value().unwrap() as usize, + iggy_common::IdKind::Numeric => { + let logical_id = id.get_u32_value().unwrap() as usize; + *self + .numeric_index + .borrow() + .get(&logical_id) + .expect("Stream not found by numeric ID") + } iggy_common::IdKind::String => { let key = id.get_string_value().unwrap(); - *self.index.borrow().get(&key).expect("Stream not found") + *self + .index + .borrow() + .get(&key) + .expect("Stream not found by name") } } } diff --git a/core/server/src/slab/topics.rs b/core/server/src/slab/topics.rs index 7bfed97e7..39cda6946 100644 --- a/core/server/src/slab/topics.rs +++ b/core/server/src/slab/topics.rs @@ -45,7 +45,10 @@ pub type ContainerId = usize; #[derive(Debug, Clone)] pub struct Topics { + /// Name index: topic name → slab position index: RefCell<AHashMap<<topic::TopicRoot as Keyed>::Key, ContainerId>>, + /// Numeric ID index: logical topic ID → slab position + numeric_index: RefCell<AHashMap<usize, ContainerId>>, root: RefCell<Slab<topic::TopicRoot>>, auxilaries: RefCell<Slab<topic::TopicAuxilary>>, stats: RefCell<Slab<Arc<TopicStats>>>, @@ -59,25 +62,37 @@ impl 
InsertCell for Topics { let (root, auxilary, stats) = item.into_components(); let mut root_container = self.root.borrow_mut(); let mut auxilaries = self.auxilaries.borrow_mut(); - let mut indexes = self.index.borrow_mut(); + let mut name_index = self.index.borrow_mut(); + let mut numeric_index = self.numeric_index.borrow_mut(); let mut stats_container = self.stats.borrow_mut(); + // Check if ID was pre-set (e.g., from metadata during lazy creation) + let pre_set_id = root.id(); + let key = root.key().clone(); - let entity_id = root_container.insert(root); - let id = stats_container.insert(stats); + let slab_pos = root_container.insert(root); + let stats_pos = stats_container.insert(stats); assert_eq!( - entity_id, id, - "topic_insert: id mismatch when inserting stats component" + slab_pos, stats_pos, + "topic_insert: position mismatch when inserting stats component" ); - let id = auxilaries.insert(auxilary); + let aux_pos = auxilaries.insert(auxilary); assert_eq!( - entity_id, id, - "topic_insert: id mismatch when inserting auxilary component" + slab_pos, aux_pos, + "topic_insert: position mismatch when inserting auxilary component" ); - let root = root_container.get_mut(entity_id).unwrap(); - root.update_id(entity_id); - indexes.insert(key, entity_id); - entity_id + + let root = root_container.get_mut(slab_pos).unwrap(); + + // Use pre-set ID if available, otherwise use slab position + let logical_id = if pre_set_id > 0 { pre_set_id } else { slab_pos }; + root.update_id(logical_id); + + // Update both indexes + name_index.insert(key, slab_pos); + numeric_index.insert(logical_id, slab_pos); + + logical_id } } @@ -85,24 +100,33 @@ impl DeleteCell for Topics { type Idx = ContainerId; type Item = topic::Topic; - fn delete(&self, id: Self::Idx) -> Self::Item { + fn delete(&self, logical_id: Self::Idx) -> Self::Item { let mut root_container = self.root.borrow_mut(); let mut auxilaries = self.auxilaries.borrow_mut(); - let mut indexes = self.index.borrow_mut(); + let mut 
name_index = self.index.borrow_mut(); + let mut numeric_index = self.numeric_index.borrow_mut(); let mut stats_container = self.stats.borrow_mut(); - let root = root_container.remove(id); - let auxilary = auxilaries.remove(id); - let stats = stats_container.remove(id); + // Look up slab position from logical ID + let slab_pos = *numeric_index + .get(&logical_id) + .expect("topic_delete: logical ID not found in numeric index"); + + let root = root_container.remove(slab_pos); + let auxilary = auxilaries.remove(slab_pos); + let stats = stats_container.remove(slab_pos); - // Remove from index + // Remove from both indexes let key = root.key(); - indexes.remove(key).unwrap_or_else(|| { + name_index.remove(key).unwrap_or_else(|| { panic!( "topic_delete: key not found with key: {} and id: {}", - key, id + key, logical_id ) }); + numeric_index + .remove(&logical_id) + .expect("topic_delete: ID not found in numeric index"); topic::Topic::new_with_components(root, auxilary, stats) } @@ -130,6 +154,7 @@ impl Default for Topics { fn default() -> Self { Self { index: RefCell::new(AHashMap::with_capacity(CAPACITY)), + numeric_index: RefCell::new(AHashMap::with_capacity(CAPACITY)), root: RefCell::new(Slab::with_capacity(CAPACITY)), auxilaries: RefCell::new(Slab::with_capacity(CAPACITY)), stats: RefCell::new(Slab::with_capacity(CAPACITY)), @@ -173,8 +198,8 @@ impl Topics { pub fn exists(&self, id: &Identifier) -> bool { match id.kind { iggy_common::IdKind::Numeric => { - let id = id.get_u32_value().unwrap() as usize; - self.root.borrow().contains(id) + let logical_id = id.get_u32_value().unwrap() as usize; + self.numeric_index.borrow().contains_key(&logical_id) } iggy_common::IdKind::String => { let key = id.get_string_value().unwrap(); @@ -185,10 +210,21 @@ impl Topics { pub fn get_index(&self, id: &Identifier) -> usize { match id.kind { - iggy_common::IdKind::Numeric => id.get_u32_value().unwrap() as usize, + iggy_common::IdKind::Numeric => { + let logical_id = 
id.get_u32_value().unwrap() as usize; + *self + .numeric_index + .borrow() + .get(&logical_id) + .expect("Topic not found by numeric ID") + } iggy_common::IdKind::String => { let key = id.get_string_value().unwrap(); - *self.index.borrow().get(&key).expect("Topic not found") + *self + .index + .borrow() + .get(&key) + .expect("Topic not found by name") } } } diff --git a/core/server/src/streaming/partitions/partition.rs b/core/server/src/streaming/partitions/partition.rs index 7a07b8f33..bc06416d4 100644 --- a/core/server/src/streaming/partitions/partition.rs +++ b/core/server/src/streaming/partitions/partition.rs @@ -157,7 +157,7 @@ impl Partition { } } - pub fn stats(&self) -> &PartitionStats { + pub fn stats(&self) -> &Arc<PartitionStats> { &self.stats } } diff --git a/core/server/src/streaming/topics/topic.rs b/core/server/src/streaming/topics/topic.rs index 348dc3c8a..5a3aa1ee9 100644 --- a/core/server/src/streaming/topics/topic.rs +++ b/core/server/src/streaming/topics/topic.rs @@ -121,7 +121,7 @@ impl Topic { &mut self.root } - pub fn stats(&self) -> &TopicStats { + pub fn stats(&self) -> &Arc<TopicStats> { &self.stats } } diff --git a/core/server/src/tcp/tcp_listener.rs b/core/server/src/tcp/tcp_listener.rs index c68547301..82b539f9d 100644 --- a/core/server/src/tcp/tcp_listener.rs +++ b/core/server/src/tcp/tcp_listener.rs @@ -20,7 +20,6 @@ use crate::configs::tcp::TcpSocketConfig; use crate::shard::IggyShard; use crate::shard::task_registry::{ShutdownToken, TaskRegistry}; -use crate::shard::transmission::event::ShardEvent; use crate::tcp::connection_handler::{ConnectionAction, handle_connection, handle_error}; use compio::net::{TcpListener, TcpOpts}; use err_trail::ErrContext; @@ -72,7 +71,7 @@ pub async fn start( if shard.id != 0 && addr.port() == 0 { info!("Waiting for TCP address from shard 0..."); loop { - if let Some(bound_addr) = shard.tcp_bound_address.get() { + if let Some(bound_addr) = shard.shared_metadata.load().bound_addresses.tcp { addr = 
bound_addr; info!("Received TCP address: {}", addr); break; @@ -94,19 +93,14 @@ pub async fn start( info!("{} server has started on: {:?}", server_name, actual_addr); if shard.id == 0 { - // Store bound address locally + // Store in Cell for config_writer backward compat shard.tcp_bound_address.set(Some(actual_addr)); + // Store in SharedMetadata (all shards see this immediately via ArcSwap) + shard.shared_metadata.set_tcp_address(actual_addr); if addr.port() == 0 { // Notify config writer on shard 0 let _ = shard.config_writer_notify.try_send(()); - - // Broadcast to other shards for SO_REUSEPORT binding - let event = ShardEvent::AddressBound { - protocol: TransportProtocol::Tcp, - address: actual_addr, - }; - shard.broadcast_event_to_all_shards(event).await?; } } diff --git a/core/server/src/tcp/tcp_tls_listener.rs b/core/server/src/tcp/tcp_tls_listener.rs index 50a8c1d19..ef257b05b 100644 --- a/core/server/src/tcp/tcp_tls_listener.rs +++ b/core/server/src/tcp/tcp_tls_listener.rs @@ -19,7 +19,6 @@ use crate::configs::tcp::TcpSocketConfig; use crate::shard::IggyShard; use crate::shard::task_registry::ShutdownToken; -use crate::shard::transmission::event::ShardEvent; use crate::tcp::connection_handler::{handle_connection, handle_error}; use compio::net::{TcpListener, TcpOpts}; use compio_tls::TlsAcceptor; @@ -46,7 +45,7 @@ pub(crate) async fn start( if shard.id != 0 && addr.port() == 0 { info!("Waiting for TCP address from shard 0..."); loop { - if let Some(bound_addr) = shard.tcp_bound_address.get() { + if let Some(bound_addr) = shard.shared_metadata.load().bound_addresses.tcp { addr = bound_addr; info!("Received TCP address: {}", addr); break; @@ -68,16 +67,14 @@ pub(crate) async fn start( })?; if shard.id == 0 { + // Store in Cell for config_writer backward compat shard.tcp_bound_address.set(Some(actual_addr)); + // Store in SharedMetadata (all shards see this immediately via ArcSwap) + shard.shared_metadata.set_tcp_address(actual_addr); + if addr.port() == 0 { 
// Notify config writer on shard 0 let _ = shard.config_writer_notify.try_send(()); - - let event = ShardEvent::AddressBound { - protocol: TransportProtocol::Tcp, - address: actual_addr, - }; - shard.broadcast_event_to_all_shards(event).await?; } } diff --git a/core/server/src/websocket/websocket_listener.rs b/core/server/src/websocket/websocket_listener.rs index 7b691c61b..d17273cb6 100644 --- a/core/server/src/websocket/websocket_listener.rs +++ b/core/server/src/websocket/websocket_listener.rs @@ -19,7 +19,6 @@ use crate::configs::websocket::WebSocketConfig; use crate::shard::IggyShard; use crate::shard::task_registry::ShutdownToken; -use crate::shard::transmission::event::ShardEvent; use crate::websocket::connection_handler::{handle_connection, handle_error}; use compio::net::TcpListener; use compio_net::TcpOpts; @@ -58,7 +57,7 @@ pub async fn start( if shard.id != 0 && addr.port() == 0 { info!("Waiting for WebSocket address from shard 0..."); loop { - if let Some(bound_addr) = shard.websocket_bound_address.get() { + if let Some(bound_addr) = shard.shared_metadata.load().bound_addresses.websocket { addr = bound_addr; info!("Received WebSocket address from shard 0: {}", addr); break; @@ -78,25 +77,16 @@ pub async fn start( let local_addr = listener.local_addr().unwrap(); info!("{} has started on: ws://{}", "WebSocket Server", local_addr); - // Notify shard about the bound address - let event = ShardEvent::AddressBound { - protocol: TransportProtocol::WebSocket, - address: local_addr, - }; - if shard.id == 0 { - // Store bound address locally first + // Store in Cell for config_writer backward compat shard.websocket_bound_address.set(Some(local_addr)); - - if addr.port() == 0 { - // Broadcast to other shards for SO_REUSEPORT binding - shard.broadcast_event_to_all_shards(event).await?; - } + // Store in SharedMetadata (all shards see this immediately via ArcSwap) + shard.shared_metadata.set_websocket_address(local_addr); + // Notify config_writer that WebSocket is 
bound + let _ = shard.config_writer_notify.try_send(()); } else { - // Non-shard0 just handles the event locally - crate::shard::handlers::handle_event(&shard, event) - .await - .ok(); + // Non-shard-0 just updates local Cell + shard.websocket_bound_address.set(Some(local_addr)); } let ws_config = config.to_tungstenite_config(); diff --git a/core/server/src/websocket/websocket_tls_listener.rs b/core/server/src/websocket/websocket_tls_listener.rs index 3743162fb..45c080da1 100644 --- a/core/server/src/websocket/websocket_tls_listener.rs +++ b/core/server/src/websocket/websocket_tls_listener.rs @@ -19,7 +19,6 @@ use crate::configs::websocket::WebSocketConfig; use crate::shard::IggyShard; use crate::shard::task_registry::ShutdownToken; -use crate::shard::transmission::event::ShardEvent; use crate::websocket::connection_handler::{handle_connection, handle_error}; use compio::net::TcpListener; use compio_net::TcpOpts; @@ -63,7 +62,7 @@ pub async fn start( if shard.id != 0 && addr.port() == 0 { info!("Waiting for WebSocket TLS address from shard 0..."); loop { - if let Some(bound_addr) = shard.websocket_bound_address.get() { + if let Some(bound_addr) = shard.shared_metadata.load().bound_addresses.websocket { addr = bound_addr; info!("Received WebSocket TLS address from shard 0: {}", addr); break; @@ -81,25 +80,16 @@ pub async fn start( let local_addr = listener.local_addr().unwrap(); - // Notify shard about the bound address - let event = ShardEvent::AddressBound { - protocol: TransportProtocol::WebSocket, - address: local_addr, - }; - if shard.id == 0 { - // Store bound address locally first + // Store in Cell for config_writer backward compat shard.websocket_bound_address.set(Some(local_addr)); - - if addr.port() == 0 { - // Broadcast to other shards for SO_REUSEPORT binding - shard.broadcast_event_to_all_shards(event).await?; - } + // Store in SharedMetadata (all shards see this immediately via ArcSwap) + shard.shared_metadata.set_websocket_address(local_addr); + // 
Notify config_writer that WebSocket is bound + let _ = shard.config_writer_notify.try_send(()); } else { - // Non-shard0 just handles the event locally - crate::shard::handlers::handle_event(&shard, event) - .await - .ok(); + // Non-shard-0 just updates local Cell + shard.websocket_bound_address.set(Some(local_addr)); } // Ensure rustls crypto provider is installed
