This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch arc-swap
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit c7fe0e2c87cf20bbcdff09599f4da202872f0f0f
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Mon Dec 29 16:36:01 2025 +0100

    XD
---
 core/configs/server.toml                           |   2 +-
 core/server/src/http/http_server.rs                |  34 +--
 core/server/src/main.rs                            | 136 ++++++++-
 core/server/src/metadata/mod.rs                    |   2 +
 core/server/src/metadata/shared.rs                 |  62 +++-
 core/server/src/metadata/stats_store.rs            | 108 +++++++
 core/server/src/quic/quic_server.rs                |  14 +-
 core/server/src/shard/builder.rs                   |  10 +-
 core/server/src/shard/handlers.rs                  |  50 ++--
 core/server/src/shard/lazy_init.rs                 | 317 +++++++++++++++++++++
 core/server/src/shard/mod.rs                       |   6 +-
 core/server/src/shard/system/partitions.rs         |   9 +
 core/server/src/shard/system/streams.rs            |   8 +-
 core/server/src/shard/system/topics.rs             |  14 +-
 .../src/shard/tasks/periodic/message_saver.rs      |  17 ++
 core/server/src/shard/transmission/event.rs        |   7 +-
 core/server/src/slab/partitions.rs                 | 104 ++++---
 core/server/src/slab/streams.rs                    |  99 +++++--
 core/server/src/slab/topics.rs                     |  84 ++++--
 core/server/src/streaming/partitions/partition.rs  |   2 +-
 core/server/src/streaming/topics/topic.rs          |   2 +-
 core/server/src/tcp/tcp_listener.rs                |  14 +-
 core/server/src/tcp/tcp_tls_listener.rs            |  13 +-
 core/server/src/websocket/websocket_listener.rs    |  26 +-
 .../server/src/websocket/websocket_tls_listener.rs |  26 +-
 25 files changed, 952 insertions(+), 214 deletions(-)

diff --git a/core/configs/server.toml b/core/configs/server.toml
index cec71ddc7..c2a521b87 100644
--- a/core/configs/server.toml
+++ b/core/configs/server.toml
@@ -152,7 +152,7 @@ enabled = true
 address = "127.0.0.1:8090"
 
 # Enable TCP socket migration across shards.
-socket_migration = true
+socket_migration = false
 
 # Whether to use ipv4 or ipv6
 ipv6 = false
diff --git a/core/server/src/http/http_server.rs b/core/server/src/http/http_server.rs
index 23eefbdab..6abc5fbc7 100644
--- a/core/server/src/http/http_server.rs
+++ b/core/server/src/http/http_server.rs
@@ -28,7 +28,6 @@ use crate::http::*;
 use crate::shard::IggyShard;
 use crate::shard::task_registry::ShutdownToken;
 use crate::shard::tasks::periodic::spawn_jwt_token_cleaner;
-use crate::shard::transmission::event::ShardEvent;
 use crate::streaming::persistence::persister::PersisterKind;
 use axum::extract::DefaultBodyLimit;
 use axum::extract::connect_info::Connected;
@@ -38,7 +37,6 @@ use axum_server::tls_rustls::RustlsConfig;
 use compio_net::TcpListener;
 use err_trail::ErrContext;
 use iggy_common::IggyError;
-use iggy_common::TransportProtocol;
 use std::net::SocketAddr;
 use std::path::PathBuf;
 use std::rc::Rc;
@@ -142,15 +140,12 @@ pub async fn start_http_server(
             .expect("Failed to get local address for HTTP server");
         info!("Started {api_name} on: {address}");
 
-        // Notify shard about the bound address
-        let event = ShardEvent::AddressBound {
-            protocol: TransportProtocol::Http,
-            address,
-        };
-
-        crate::shard::handlers::handle_event(&shard, event)
-            .await
-            .ok();
+        // Store in Cell for config_writer backward compat
+        shard.http_bound_address.set(Some(address));
+        // Store in SharedMetadata (all shards see this immediately via ArcSwap)
+        shard.shared_metadata.set_http_address(address);
+        // Notify config_writer that HTTP is bound
+        let _ = shard.config_writer_notify.try_send(());
 
         let service = app.into_make_service_with_connect_info::<CompioSocketAddr>();
 
@@ -187,17 +182,12 @@ pub async fn start_http_server(
 
         info!("Started {api_name} on: {address}");
 
-        // Notify shard about the bound address
-        use crate::shard::transmission::event::ShardEvent;
-        use iggy_common::TransportProtocol;
-        let event = ShardEvent::AddressBound {
-            protocol: TransportProtocol::Http,
-            address,
-        };
-
-        crate::shard::handlers::handle_event(&shard, event)
-            .await
-            .ok();
+        // Store in Cell for config_writer backward compat
+        shard.http_bound_address.set(Some(address));
+        // Store in SharedMetadata (all shards see this immediately via ArcSwap)
+        shard.shared_metadata.set_http_address(address);
+        // Notify config_writer that HTTP is bound
+        let _ = shard.config_writer_notify.try_send(());
 
         let service = app.into_make_service_with_connect_info::<SocketAddr>();
         let handle = axum_server::Handle::new();
diff --git a/core/server/src/main.rs b/core/server/src/main.rs
index 51b7f0566..9e63dbc12 100644
--- a/core/server/src/main.rs
+++ b/core/server/src/main.rs
@@ -34,15 +34,21 @@ use server::configs::sharding::ShardAllocator;
 use server::diagnostics::{print_io_uring_permission_info, print_locked_memory_limit_info};
 use server::io::fs_utils;
 use server::log::logger::Logging;
-use server::metadata::SharedMetadata;
+use server::metadata::{
+    ConsumerGroupMeta, MetadataSnapshot, PartitionMeta, SharedMetadata, SharedStatsStore,
+    StreamMeta, TopicMeta, UserMeta,
+};
 use server::server_error::ServerError;
 use server::shard::namespace::IggyNamespace;
 use server::shard::system::info::SystemInfo;
 use server::shard::transmission::id::ShardId;
 use server::shard::{IggyShard, calculate_shard_assignment};
+use server::slab::Keyed;
+use server::slab::streams::Streams;
 use server::slab::traits_ext::{
     EntityComponentSystem, EntityComponentSystemMutCell, IntoComponents,
 };
+use server::slab::users::Users;
 use server::state::file::FileState;
 use server::state::system::SystemState;
 use server::streaming::clients::client_manager::{Client, ClientManager};
@@ -327,6 +333,14 @@ fn main() -> Result<(), ServerError> {
         let shared_metadata = Box::leak(shared_metadata);
         let shared_metadata: EternalPtr<SharedMetadata> = shared_metadata.into();
 
+        // Create shared stats store for cross-shard stats visibility
+        let shared_stats = Box::new(SharedStatsStore::new());
+        let shared_stats = Box::leak(shared_stats);
+        let shared_stats: EternalPtr<SharedStatsStore> = shared_stats.into();
+
+        // Populate SharedMetadata from loaded state (also registers stats in SharedStatsStore)
+        populate_shared_metadata(&shared_metadata, &shared_stats, &streams, &users);
+
         streams.with_components(|components| {
             let (root, ..) = components.into_components();
             for (_, stream) in root.iter() {
@@ -360,6 +374,7 @@ fn main() -> Result<(), ServerError> {
             let streams = streams.clone();
             let shards_table = shards_table.clone();
             let shared_metadata = shared_metadata.clone();
+            let shared_stats = shared_stats.clone();
             let users = users.clone();
             let connections = connections.clone();
             let config = config.clone();
@@ -425,6 +440,7 @@ fn main() -> Result<(), ServerError> {
                                 .users(users)
                                 .shards_table(shards_table)
                                 .shared_metadata(shared_metadata)
+                                .shared_stats(shared_stats)
                                 .connections(connections)
                                 .clients_manager(client_manager)
                                 .config(config)
@@ -552,3 +568,121 @@ fn main() -> Result<(), ServerError> {
         Ok(())
     })
 }
+
+/// Populate SharedMetadata with existing streams and users loaded from state.
+/// Also registers stats in SharedStatsStore for cross-shard visibility.
+fn populate_shared_metadata(
+    shared_metadata: &SharedMetadata,
+    shared_stats: &SharedStatsStore,
+    streams: &Streams,
+    users: &Users,
+) {
+    let mut snapshot = MetadataSnapshot::new();
+    let mut max_stream_id: usize = 0;
+    let mut max_user_id: u32 = 0;
+
+    // Populate streams, topics, partitions, consumer groups
+    streams.with_components(|components| {
+        let (root, stats) = components.into_components();
+        for (idx, stream) in root.iter() {
+            max_stream_id = max_stream_id.max(stream.id());
+
+            // Register stream stats in SharedStatsStore
+            let stream_stats = Arc::clone(&stats[idx]);
+            shared_stats.register_stream_stats(stream.id(), stream_stats);
+
+            let mut stream_meta =
+                StreamMeta::new(stream.id(), stream.name().to_string(), stream.created_at());
+
+            stream.topics().with_components(|topic_components| {
+                let (topic_roots, _topic_aux, topic_stats) = topic_components.into_components();
+                for (topic_idx, topic) in topic_roots.iter() {
+                    // Register topic stats in SharedStatsStore
+                    let topic_stats_arc = Arc::clone(&topic_stats[topic_idx]);
+                    shared_stats.register_topic_stats(stream.id(), topic.id(), topic_stats_arc);
+
+                    let mut topic_meta = TopicMeta::new(
+                        topic.id(),
+                        topic.name().to_string(),
+                        topic.created_at(),
+                        topic.replication_factor(),
+                        topic.message_expiry(),
+                        topic.compression_algorithm(),
+                        topic.max_topic_size(),
+                    );
+
+                    // Add partitions
+                    topic.partitions().with_components(|part_components| {
+                        let (part_roots, part_stats, ..) = part_components.into_components();
+                        for (part_idx, partition) in part_roots.iter() {
+                            // Register partition stats in SharedStatsStore
+                            let part_stats_arc = Arc::clone(&part_stats[part_idx]);
+                            shared_stats.register_partition_stats(
+                                stream.id(),
+                                topic.id(),
+                                partition.id(),
+                                part_stats_arc,
+                            );
+
+                            topic_meta.add_partition(PartitionMeta::new(
+                                partition.id(),
+                                partition.created_at(),
+                            ));
+                        }
+                    });
+
+                    // Add consumer groups
+                    topic.consumer_groups().with_components(|cg_components| {
+                        let (cg_roots, ..) = cg_components.into_components();
+                        for (_, cg) in cg_roots.iter() {
+                            let partition_ids = cg.partitions().clone();
+                            topic_meta.add_consumer_group(ConsumerGroupMeta::new(
+                                cg.id(),
+                                cg.key().clone(),
+                                partition_ids,
+                            ));
+                        }
+                    });
+
+                    stream_meta.add_topic(topic_meta);
+                }
+            });
+
+            let name = stream_meta.name.clone();
+            snapshot.streams.insert(stream.id(), stream_meta);
+            snapshot.stream_index.insert(name, stream.id());
+        }
+    });
+
+    // Populate users
+    for user in users.values() {
+        max_user_id = max_user_id.max(user.id);
+
+        let user_meta = UserMeta::new(
+            user.id,
+            user.username.clone(),
+            user.password.clone(),
+            user.created_at,
+            user.status,
+            user.permissions.clone(),
+        );
+
+        let username = user_meta.username.clone();
+        snapshot.users.insert(user.id, user_meta);
+        snapshot.user_index.insert(username, user.id);
+    }
+
+    // Capture counts before consuming snapshot
+    let streams_count = snapshot.streams.len();
+    let users_count = snapshot.users.len();
+    let next_stream = max_stream_id.saturating_add(1);
+    let next_user = max_user_id.saturating_add(1);
+
+    // Initialize with next IDs = max + 1
+    shared_metadata.init_from_snapshot(snapshot, next_stream, next_user);
+
+    info!(
+        "Populated SharedMetadata: {} streams, {} users (next_stream_id={}, next_user_id={})",
+        streams_count, users_count, next_stream, next_user,
+    );
+}
diff --git a/core/server/src/metadata/mod.rs b/core/server/src/metadata/mod.rs
index e964dccd9..7564c1e9c 100644
--- a/core/server/src/metadata/mod.rs
+++ b/core/server/src/metadata/mod.rs
@@ -25,6 +25,7 @@ mod consumer_group;
 mod partition;
 mod shared;
 mod snapshot;
+mod stats_store;
 mod stream;
 mod topic;
 mod user;
@@ -33,6 +34,7 @@ pub use consumer_group::ConsumerGroupMeta;
 pub use partition::PartitionMeta;
 pub use shared::SharedMetadata;
 pub use snapshot::MetadataSnapshot;
+pub use stats_store::SharedStatsStore;
 pub use stream::StreamMeta;
 pub use topic::TopicMeta;
 pub use user::{PersonalAccessTokenMeta, UserMeta};
diff --git a/core/server/src/metadata/shared.rs b/core/server/src/metadata/shared.rs
index f03086b65..9e108c94c 100644
--- a/core/server/src/metadata/shared.rs
+++ b/core/server/src/metadata/shared.rs
@@ -131,16 +131,21 @@ impl SharedMetadata {
 
     // Stream operations
 
-    /// Create a new stream. Returns the created stream metadata.
-    pub fn create_stream(&self, name: String) -> Result<StreamMeta, IggyError> {
+    /// Add a stream with a specific ID (used by dual-write from shard 0).
+    /// The ID should come from the actual slab allocation.
+    pub fn add_stream(
+        &self,
+        id: usize,
+        name: String,
+        created_at: IggyTimestamp,
+    ) -> Result<StreamMeta, IggyError> {
         let current = self.inner.load();
 
         if current.stream_exists_by_name(&name) {
             return Err(IggyError::StreamNameAlreadyExists(name));
         }
 
-        let id = self.next_stream_id.fetch_add(1, Ordering::SeqCst);
-        let stream = StreamMeta::new(id, name.clone(), IggyTimestamp::now());
+        let stream = StreamMeta::new(id, name.clone(), created_at);
 
         let mut new_snapshot = (**current).clone();
         new_snapshot.stream_index.insert(name, id);
@@ -151,6 +156,13 @@ impl SharedMetadata {
         Ok(stream)
     }
 
+    /// Create a new stream with auto-generated ID.
+    /// Primarily for tests - production code should use `add_stream` with the actual slab ID.
+    pub fn create_stream(&self, name: String) -> Result<StreamMeta, IggyError> {
+        let id = self.next_stream_id.fetch_add(1, Ordering::SeqCst);
+        self.add_stream(id, name, IggyTimestamp::now())
+    }
+
     /// Delete a stream. Returns the deleted stream metadata.
     pub fn delete_stream(&self, id: &Identifier) -> Result<StreamMeta, IggyError> {
         let current = self.inner.load();
@@ -207,12 +219,15 @@ impl SharedMetadata {
 
     // Topic operations
 
-    /// Create a new topic in a stream.
+    /// Add a topic with a specific ID (used by dual-write from shard 0).
+    /// The ID should come from the actual slab allocation.
     #[allow(clippy::too_many_arguments)]
-    pub fn create_topic(
+    pub fn add_topic(
         &self,
         stream_id: &Identifier,
+        topic_id: usize,
         name: String,
+        created_at: IggyTimestamp,
         replication_factor: u8,
         message_expiry: IggyExpiry,
         compression_algorithm: CompressionAlgorithm,
@@ -229,12 +244,10 @@ impl SharedMetadata {
             return Err(IggyError::TopicNameAlreadyExists(name, stream_id.clone()));
         }
 
-        // Topic IDs are sequential within a stream
-        let topic_id = stream.topics.len();
         let topic = TopicMeta::new(
             topic_id,
             name.clone(),
-            IggyTimestamp::now(),
+            created_at,
             replication_factor,
             message_expiry,
             compression_algorithm,
@@ -253,6 +266,37 @@ impl SharedMetadata {
         Ok(topic)
     }
 
+    /// Create a new topic with auto-generated ID.
+    /// Primarily for tests - production code should use `add_topic` with the actual slab ID.
+    #[allow(clippy::too_many_arguments)]
+    pub fn create_topic(
+        &self,
+        stream_id: &Identifier,
+        name: String,
+        replication_factor: u8,
+        message_expiry: IggyExpiry,
+        compression_algorithm: CompressionAlgorithm,
+        max_topic_size: MaxTopicSize,
+    ) -> Result<TopicMeta, IggyError> {
+        let current = self.inner.load();
+        let stream_idx = current
+            .get_stream_id(stream_id)
+            .ok_or_else(|| IggyError::StreamIdNotFound(stream_id.clone()))?;
+        let stream = current.streams.get(&stream_idx).unwrap();
+        let topic_id = stream.topics.len();
+
+        self.add_topic(
+            stream_id,
+            topic_id,
+            name,
+            IggyTimestamp::now(),
+            replication_factor,
+            message_expiry,
+            compression_algorithm,
+            max_topic_size,
+        )
+    }
+
     /// Delete a topic from a stream.
     pub fn delete_topic(
         &self,
diff --git a/core/server/src/metadata/stats_store.rs b/core/server/src/metadata/stats_store.rs
new file mode 100644
index 000000000..242e80bca
--- /dev/null
+++ b/core/server/src/metadata/stats_store.rs
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Shared statistics store for cross-shard stats visibility.
+//!
+//! When shard 0 creates a stream/topic/partition, it registers the stats Arc here.
+//! When other shards do lazy init, they retrieve the same Arc so updates are visible.
+
+use crate::streaming::stats::{PartitionStats, StreamStats, TopicStats};
+use dashmap::DashMap;
+use std::sync::Arc;
+
+/// Thread-safe store for sharing stats Arcs across shards.
+/// Uses DashMap for lock-free concurrent access.
+#[derive(Debug, Default)]
+pub struct SharedStatsStore {
+    /// Stream stats indexed by stream_id
+    stream_stats: DashMap<usize, Arc<StreamStats>>,
+
+    /// Topic stats indexed by (stream_id, topic_id)
+    topic_stats: DashMap<(usize, usize), Arc<TopicStats>>,
+
+    /// Partition stats indexed by (stream_id, topic_id, partition_id)
+    partition_stats: DashMap<(usize, usize, usize), Arc<PartitionStats>>,
+}
+
+impl SharedStatsStore {
+    pub fn new() -> Self {
+        Self {
+            stream_stats: DashMap::new(),
+            topic_stats: DashMap::new(),
+            partition_stats: DashMap::new(),
+        }
+    }
+
+    // Stream stats
+
+    pub fn register_stream_stats(&self, stream_id: usize, stats: Arc<StreamStats>) {
+        self.stream_stats.insert(stream_id, stats);
+    }
+
+    pub fn get_stream_stats(&self, stream_id: usize) -> Option<Arc<StreamStats>> {
+        self.stream_stats.get(&stream_id).map(|r| Arc::clone(&r))
+    }
+
+    pub fn remove_stream_stats(&self, stream_id: usize) {
+        self.stream_stats.remove(&stream_id);
+    }
+
+    // Topic stats
+
+    pub fn register_topic_stats(&self, stream_id: usize, topic_id: usize, stats: Arc<TopicStats>) {
+        self.topic_stats.insert((stream_id, topic_id), stats);
+    }
+
+    pub fn get_topic_stats(&self, stream_id: usize, topic_id: usize) -> Option<Arc<TopicStats>> {
+        self.topic_stats
+            .get(&(stream_id, topic_id))
+            .map(|r| Arc::clone(&r))
+    }
+
+    pub fn remove_topic_stats(&self, stream_id: usize, topic_id: usize) {
+        self.topic_stats.remove(&(stream_id, topic_id));
+    }
+
+    // Partition stats
+
+    pub fn register_partition_stats(
+        &self,
+        stream_id: usize,
+        topic_id: usize,
+        partition_id: usize,
+        stats: Arc<PartitionStats>,
+    ) {
+        self.partition_stats
+            .insert((stream_id, topic_id, partition_id), stats);
+    }
+
+    pub fn get_partition_stats(
+        &self,
+        stream_id: usize,
+        topic_id: usize,
+        partition_id: usize,
+    ) -> Option<Arc<PartitionStats>> {
+        self.partition_stats
+            .get(&(stream_id, topic_id, partition_id))
+            .map(|r| Arc::clone(&r))
+    }
+
+    pub fn remove_partition_stats(&self, stream_id: usize, topic_id: usize, partition_id: usize) {
+        self.partition_stats
+            .remove(&(stream_id, topic_id, partition_id));
+    }
+}
diff --git a/core/server/src/quic/quic_server.rs b/core/server/src/quic/quic_server.rs
index d70a30877..e18b5a1c9 100644
--- a/core/server/src/quic/quic_server.rs
+++ b/core/server/src/quic/quic_server.rs
@@ -21,7 +21,6 @@ use crate::quic::{COMPONENT, listener, quic_socket};
 use crate::server_error::QuicError;
 use crate::shard::IggyShard;
 use crate::shard::task_registry::ShutdownToken;
-use crate::shard::transmission::event::ShardEvent;
 use anyhow::Result;
 use compio_quic::{
     Endpoint, EndpointConfig, IdleTimeout, ServerBuilder, ServerConfig, TransportConfig, VarInt,
@@ -66,7 +65,7 @@ pub async fn spawn_quic_server(
     if shard.id != 0 && addr.port() == 0 {
         info!("Waiting for QUIC address from shard 0...");
         loop {
-            if let Some(bound_addr) = shard.quic_bound_address.get() {
+            if let Some(bound_addr) = shard.shared_metadata.load().bound_addresses.quic {
                 addr = bound_addr;
                 info!("Received QUIC address: {}", addr);
                 break;
@@ -117,19 +116,14 @@ pub async fn spawn_quic_server(
     info!("Iggy QUIC server has started on: {:?}", actual_addr);
 
     if shard.id == 0 {
-        // Store bound address locally
+        // Store in Cell for config_writer backward compat
         shard.quic_bound_address.set(Some(actual_addr));
+        // Store in SharedMetadata (all shards see this immediately via ArcSwap)
+        shard.shared_metadata.set_quic_address(actual_addr);
 
         if addr.port() == 0 {
             // Notify config writer on shard 0
             let _ = shard.config_writer_notify.try_send(());
-
-            // Broadcast to other shards for SO_REUSEPORT binding
-            let event = ShardEvent::AddressBound {
-                protocol: iggy_common::TransportProtocol::Quic,
-                address: actual_addr,
-            };
-            shard.broadcast_event_to_all_shards(event).await?;
         }
     } else {
         shard.quic_bound_address.set(Some(actual_addr));
diff --git a/core/server/src/shard/builder.rs b/core/server/src/shard/builder.rs
index e394b9e0c..0996a88b9 100644
--- a/core/server/src/shard/builder.rs
+++ b/core/server/src/shard/builder.rs
@@ -22,7 +22,7 @@ use super::{
 };
 use crate::{
     configs::server::ServerConfig,
-    metadata::SharedMetadata,
+    metadata::{SharedMetadata, SharedStatsStore},
     shard::namespace::IggyNamespace,
     slab::{streams::Streams, users::Users},
     state::file::FileState,
@@ -42,6 +42,7 @@ pub struct IggyShardBuilder {
     streams: Option<Streams>,
     shards_table: Option<EternalPtr<DashMap<IggyNamespace, ShardId>>>,
     shared_metadata: Option<EternalPtr<SharedMetadata>>,
+    shared_stats: Option<EternalPtr<SharedStatsStore>>,
     state: Option<FileState>,
     users: Option<Users>,
     client_manager: Option<ClientManager>,
@@ -82,6 +83,11 @@ impl IggyShardBuilder {
         self
     }
 
+    pub fn shared_stats(mut self, shared_stats: EternalPtr<SharedStatsStore>) -> Self {
+        self.shared_stats = Some(shared_stats);
+        self
+    }
+
     pub fn clients_manager(mut self, client_manager: ClientManager) -> Self {
         self.client_manager = Some(client_manager);
         self
@@ -128,6 +134,7 @@ impl IggyShardBuilder {
         let streams = self.streams.unwrap();
         let shards_table = self.shards_table.unwrap();
         let shared_metadata = self.shared_metadata.unwrap();
+        let shared_stats = self.shared_stats.unwrap();
         let state = self.state.unwrap();
         let users = self.users.unwrap();
         let config = self.config.unwrap();
@@ -163,6 +170,7 @@ impl IggyShardBuilder {
             shards,
             shards_table,
             shared_metadata,
+            shared_stats,
             streams, // TODO: Fixme
             users,
             fs_locks: Default::default(),
diff --git a/core/server/src/shard/handlers.rs b/core/server/src/shard/handlers.rs
index c1aed5c92..23c0ea52a 100644
--- a/core/server/src/shard/handlers.rs
+++ b/core/server/src/shard/handlers.rs
@@ -63,6 +63,11 @@ async fn handle_request(
     let partition_id = request.partition_id;
     match request.payload {
         ShardRequestPayload::SendMessages { batch } => {
+            // Lazy init: ensure partition exists locally
+            shard
+                .ensure_local_partition(&stream_id, &topic_id, partition_id)
+                .await?;
+
             let ns = IggyFullNamespace::new(stream_id, topic_id, partition_id);
             let batch = shard.maybe_encrypt_messages(batch)?;
             let messages_count = batch.count();
@@ -74,6 +79,11 @@ async fn handle_request(
             Ok(ShardResponse::SendMessages)
         }
         ShardRequestPayload::PollMessages { args, consumer } => {
+            // Lazy init: ensure partition exists locally
+            shard
+                .ensure_local_partition(&stream_id, &topic_id, partition_id)
+                .await?;
+
             let auto_commit = args.auto_commit;
             let ns = IggyFullNamespace::new(stream_id, topic_id, partition_id);
             let (metadata, batches) = shard.streams.poll_messages(&ns, consumer, args).await?;
@@ -97,12 +107,22 @@ async fn handle_request(
             Ok(ShardResponse::PollMessages((metadata, batches)))
         }
         ShardRequestPayload::FlushUnsavedBuffer { fsync } => {
+            // Lazy init: ensure partition exists locally
+            shard
+                .ensure_local_partition(&stream_id, &topic_id, partition_id)
+                .await?;
+
             shard
                 .flush_unsaved_buffer_base(&stream_id, &topic_id, partition_id, fsync)
                 .await?;
             Ok(ShardResponse::FlushUnsavedBuffer)
         }
         ShardRequestPayload::DeleteSegments { segments_count } => {
+            // Lazy init: ensure partition exists locally
+            shard
+                .ensure_local_partition(&stream_id, &topic_id, partition_id)
+                .await?;
+
             shard
                 .delete_segments_base(&stream_id, &topic_id, partition_id, segments_count)
                 .await?;
@@ -296,6 +316,11 @@ async fn handle_request(
             let registry = shard.task_registry.clone();
             let registry_clone = registry.clone();
 
+            // Lazy init: ensure partition exists locally
+            shard
+                .ensure_local_partition(&stream_id, &topic_id, partition_id)
+                .await?;
+
             let ns = IggyFullNamespace::new(stream_id, topic_id, partition_id);
             let batch = shard.maybe_encrypt_messages(initial_data)?;
             let messages_count = batch.count();
@@ -358,30 +383,5 @@ pub async fn handle_event(shard: &Rc<IggyShard>, event: ShardEvent) -> Result<()
                 .await?;
             Ok(())
         }
-        ShardEvent::AddressBound { protocol, address } => {
-            info!(
-                "Received AddressBound event for {:?} with address: {}",
-                protocol, address
-            );
-            match protocol {
-                TransportProtocol::Tcp => {
-                    shard.tcp_bound_address.set(Some(address));
-                    let _ = shard.config_writer_notify.try_send(());
-                }
-                TransportProtocol::Quic => {
-                    shard.quic_bound_address.set(Some(address));
-                    let _ = shard.config_writer_notify.try_send(());
-                }
-                TransportProtocol::Http => {
-                    shard.http_bound_address.set(Some(address));
-                    let _ = shard.config_writer_notify.try_send(());
-                }
-                TransportProtocol::WebSocket => {
-                    shard.websocket_bound_address.set(Some(address));
-                    let _ = shard.config_writer_notify.try_send(());
-                }
-            }
-            Ok(())
-        }
     }
 }
diff --git a/core/server/src/shard/lazy_init.rs b/core/server/src/shard/lazy_init.rs
new file mode 100644
index 000000000..06f63b796
--- /dev/null
+++ b/core/server/src/shard/lazy_init.rs
@@ -0,0 +1,317 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Lazy initialization of Stream/Topic/Partition structures.
+//!
+//! When a non-shard-0 receives a data plane request (SendMessages, PollMessages),
+//! it may not have the Stream/Topic/Partition in its local slab. This module
+//! provides lazy creation from SharedMetadata.
+
+use crate::metadata::{PartitionMeta, StreamMeta, TopicMeta};
+use crate::shard::IggyShard;
+use crate::slab::traits_ext::{EntityMarker, Insert, InsertCell};
+use crate::streaming::partitions::helpers::create_message_deduplicator;
+use crate::streaming::partitions::log::SegmentedLog;
+use crate::streaming::partitions::partition::{ConsumerGroupOffsets, ConsumerOffsets, Partition};
+use crate::streaming::partitions::storage::create_partition_file_hierarchy;
+use crate::streaming::stats::{StreamStats, TopicStats};
+use crate::streaming::streams::stream::Stream;
+use crate::streaming::topics::topic::Topic;
+use iggy_common::{Identifier, IggyError};
+use std::sync::Arc;
+use std::sync::atomic::AtomicU64;
+use tracing::{debug, info};
+
+impl IggyShard {
+    /// Ensures the stream/topic/partition exists locally for data operations.
+    /// Creates them from SharedMetadata if they don't exist.
+    ///
+    /// This is the core of lazy initialization - non-shard-0 shards create
+    /// data structures on-demand when they receive data plane requests.
+    ///
+    /// Accepts `Identifier` references (can be numeric or string) and resolves
+    /// them to numeric IDs using SharedMetadata.
+    pub async fn ensure_local_partition(
+        &self,
+        stream_id: &Identifier,
+        topic_id: &Identifier,
+        partition_id: usize,
+    ) -> Result<(), IggyError> {
+        // Load metadata to resolve identifiers
+        let metadata = self.shared_metadata.load();
+
+        // Resolve stream identifier to numeric ID
+        let numeric_stream_id = metadata
+            .get_stream_id(stream_id)
+            .ok_or_else(|| IggyError::StreamIdNotFound(stream_id.clone()))?;
+
+        // Get stream metadata and resolve topic identifier
+        let stream_meta = metadata
+            .streams
+            .get(&numeric_stream_id)
+            .ok_or_else(|| IggyError::StreamIdNotFound(stream_id.clone()))?;
+
+        let numeric_topic_id = Self::resolve_topic_id(topic_id, stream_meta)
+            .ok_or_else(|| IggyError::TopicIdNotFound(topic_id.clone(), stream_id.clone()))?;
+
+        // Create numeric identifiers for slab lookups
+        let stream_ident = Identifier::numeric(numeric_stream_id as u32)?;
+        let topic_ident = Identifier::numeric(numeric_topic_id as u32)?;
+
+        // Fast path: check if partition already exists
+        if self.streams.exists(&stream_ident) {
+            let topic_exists = self
+                .streams
+                .with_topics(&stream_ident, |topics| topics.exists(&topic_ident));
+
+            if topic_exists {
+                let partition_exists =
+                    self.streams
+                        .with_partitions(&stream_ident, &topic_ident, |partitions| {
+                            partitions.exists(partition_id)
+                        });
+
+                if partition_exists {
+                    return Ok(());
+                }
+            }
+        }
+
+        // Slow path: need to create from metadata
+        debug!(
+            "Lazy init: stream={}, topic={}, partition={} on shard {}",
+            numeric_stream_id, numeric_topic_id, partition_id, self.id
+        );
+
+        let topic_meta = stream_meta
+            .topics
+            .get(&numeric_topic_id)
+            .ok_or_else(|| IggyError::TopicIdNotFound(topic_id.clone(), stream_id.clone()))?;
+
+        let partition_meta =
+            topic_meta
+                .partitions
+                .get(&partition_id)
+                .ok_or(IggyError::PartitionNotFound(
+                    partition_id,
+                    topic_ident.clone(),
+                    stream_ident.clone(),
+                ))?;
+
+        // Create stream if needed (and get its stats)
+        let stream_stats = if !self.streams.exists(&stream_ident) {
+            self.create_stream_from_meta(stream_meta)?
+        } else {
+            // Get existing stream's stats
+            self.streams
+                .with_stream_by_id(&stream_ident, |(_root, stats)| Arc::clone(&stats))
+        };
+
+        // Create topic if needed (and get its stats)
+        let topic_exists = self
+            .streams
+            .with_topics(&stream_ident, |topics| topics.exists(&topic_ident));
+
+        let topic_stats = if !topic_exists {
+            self.create_topic_from_meta(numeric_stream_id, topic_meta, stream_stats)?
+        } else {
+            // Get existing topic's stats
+            self.streams
+                .with_topic_by_id(&stream_ident, &topic_ident, |(_root, _, stats)| {
+                    Arc::clone(&stats)
+                })
+        };
+
+        // Create partition if needed
+        let partition_exists =
+            self.streams
+                .with_partitions(&stream_ident, &topic_ident, |partitions| {
+                    partitions.exists(partition_id)
+                });
+
+        if !partition_exists {
+            self.create_partition_from_meta(
+                numeric_stream_id,
+                numeric_topic_id,
+                partition_meta,
+                topic_stats,
+            )
+            .await?;
+        }
+
+        Ok(())
+    }
+
+    /// Resolves a topic identifier to its numeric ID.
+    fn resolve_topic_id(topic_id: &Identifier, stream_meta: &StreamMeta) -> Option<usize> {
+        match topic_id.kind {
+            iggy_common::IdKind::Numeric => {
+                let id = topic_id.get_u32_value().unwrap() as usize;
+                if stream_meta.topics.contains_key(&id) {
+                    Some(id)
+                } else {
+                    None
+                }
+            }
+            iggy_common::IdKind::String => {
+                let name = topic_id.get_string_value().unwrap();
+                stream_meta.get_topic_id_by_name(&name)
+            }
+        }
+    }
+
+    /// Creates a Stream locally from metadata. Returns the stream stats.
+    /// Uses SharedStatsStore to get the same Arc<StreamStats> as shard 0.
+    fn create_stream_from_meta(&self, meta: &StreamMeta) -> Result<Arc<StreamStats>, IggyError> {
+        info!(
+            "Lazy creating stream: id={}, name={} on shard {}",
+            meta.id, meta.name, self.id
+        );
+
+        // Get the shared stats from SharedStatsStore (registered by shard 0)
+        let stats = self.shared_stats.get_stream_stats(meta.id).ok_or_else(|| {
+            IggyError::StreamIdNotFound(Identifier::numeric(meta.id as u32).unwrap())
+        })?;
+
+        let mut stream = Stream::new(meta.name.clone(), Arc::clone(&stats), meta.created_at);
+        stream.update_id(meta.id);
+
+        // Insert into local streams slab
+        // Note: slab position may differ from logical ID - that's fine,
+        // lookups use the name index which maps name -> slab position
+        let _ = self.streams.insert(stream);
+
+        Ok(stats)
+    }
+
+    /// Creates a Topic locally from metadata. Returns the topic stats.
+    /// Uses SharedStatsStore to get the same Arc<TopicStats> as shard 0.
+    fn create_topic_from_meta(
+        &self,
+        stream_id: usize,
+        meta: &TopicMeta,
+        _stream_stats: Arc<StreamStats>,
+    ) -> Result<Arc<TopicStats>, IggyError> {
+        info!(
+            "Lazy creating topic: stream={}, id={}, name={} on shard {}",
+            stream_id, meta.id, meta.name, self.id
+        );
+
+        // Get the shared stats from SharedStatsStore (registered by shard 0)
+        let stats = self
+            .shared_stats
+            .get_topic_stats(stream_id, meta.id)
+            .ok_or_else(|| {
+                IggyError::TopicIdNotFound(
+                    Identifier::numeric(meta.id as u32).unwrap(),
+                    Identifier::numeric(stream_id as u32).unwrap(),
+                )
+            })?;
+
+        let mut topic = Topic::new(
+            meta.name.clone(),
+            Arc::clone(&stats),
+            meta.created_at,
+            meta.replication_factor,
+            meta.message_expiry,
+            meta.compression_algorithm,
+            meta.max_topic_size,
+        );
+        topic.update_id(meta.id);
+
+        // Insert into stream's topics
+        // Note: slab position may differ from logical ID
+        let stream_ident = Identifier::numeric(stream_id as u32)?;
+        self.streams.with_topics(&stream_ident, |topics| {
+            let _ = topics.insert(topic);
+        });
+
+        Ok(stats)
+    }
+
+    /// Creates a Partition locally from metadata with SegmentedLog.
+    /// Uses SharedStatsStore to get the same Arc<PartitionStats> as shard 0.
+    async fn create_partition_from_meta(
+        &self,
+        stream_id: usize,
+        topic_id: usize,
+        partition_meta: &PartitionMeta,
+        _topic_stats: Arc<TopicStats>,
+    ) -> Result<(), IggyError> {
+        let partition_id = partition_meta.id;
+
+        info!(
+            "Lazy creating partition: stream={}, topic={}, partition={} on shard {}",
+            stream_id, topic_id, partition_id, self.id
+        );
+
+        // Create partition directory if needed
+        create_partition_file_hierarchy(stream_id, topic_id, partition_id, &self.config.system)
+            .await?;
+
+        // Get the shared stats from SharedStatsStore (registered by shard 0)
+        let stats = self
+            .shared_stats
+            .get_partition_stats(stream_id, topic_id, partition_id)
+            .ok_or_else(|| {
+                IggyError::PartitionNotFound(
+                    partition_id,
+                    Identifier::numeric(topic_id as u32).unwrap(),
+                    Identifier::numeric(stream_id as u32).unwrap(),
+                )
+            })?;
+
+        // Create message deduplicator if configured
+        let message_deduplicator = create_message_deduplicator(&self.config.system);
+
+        // Create empty SegmentedLog
+        let log = SegmentedLog::default();
+
+        // Create partition
+        let partition = Partition::new(
+            partition_meta.created_at,
+            true, // should_increment_offset
+            stats,
+            message_deduplicator,
+            Arc::new(AtomicU64::new(0)),
+            Arc::new(ConsumerOffsets::with_capacity(10)),
+            Arc::new(ConsumerGroupOffsets::with_capacity(10)),
+            log,
+        );
+
+        // Insert partition into topic
+        let stream_ident = Identifier::numeric(stream_id as u32)?;
+        let topic_ident = Identifier::numeric(topic_id as u32)?;
+
+        // Note: slab position may differ from logical ID
+        self.streams
+            .with_partitions_mut(&stream_ident, &topic_ident, |partitions| {
+                let _ = partitions.insert(partition);
+            });
+
+        // Initialize the log with a segment
+        self.init_log(&stream_ident, &topic_ident, partition_id)
+            .await?;
+
+        info!(
+            "Lazy created partition: stream={}, topic={}, partition={} on shard {}",
+            stream_id, topic_id, partition_id, self.id
+        );
+
+        Ok(())
+    }
+}
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index b5424cd9f..bb2e0266e 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -25,6 +25,7 @@ pub mod transmission;
 
 mod communication;
 pub mod handlers;
+mod lazy_init;
 
 // Re-export for backwards compatibility
 pub use communication::calculate_shard_assignment;
@@ -33,7 +34,7 @@ use self::tasks::{continuous, periodic};
 use crate::{
     configs::server::ServerConfig,
     io::fs_locks::FsLocks,
-    metadata::SharedMetadata,
+    metadata::{SharedMetadata, SharedStatsStore},
     shard::{
        namespace::IggyNamespace, task_registry::TaskRegistry, transmission::frame::ShardFrame,
     },
@@ -77,6 +78,9 @@ pub struct IggyShard {
     /// Shared metadata accessible by all shards (ArcSwap-based).
     pub(crate) shared_metadata: EternalPtr<SharedMetadata>,
 
+    /// Shared stats store for cross-shard stats visibility.
+    pub(crate) shared_stats: EternalPtr<SharedStatsStore>,
+
     pub(crate) fs_locks: FsLocks,
     pub(crate) encryptor: Option<EncryptorKind>,
     pub(crate) config: ServerConfig,
diff --git a/core/server/src/shard/system/partitions.rs b/core/server/src/shard/system/partitions.rs
index 10177bab3..b65ba443d 100644
--- a/core/server/src/shard/system/partitions.rs
+++ b/core/server/src/shard/system/partitions.rs
@@ -35,6 +35,7 @@ use crate::streaming::topics;
 use err_trail::ErrContext;
 use iggy_common::Identifier;
 use iggy_common::IggyError;
+use std::sync::Arc;
 use tracing::info;
 
 impl IggyShard {
@@ -107,6 +108,14 @@ impl IggyShard {
 
         let shards_count = self.get_available_shards_count();
         for (partition_id, stats) in partitions.iter().map(|p| (p.id(), p.stats())) {
+            // Register partition stats in SharedStatsStore for cross-shard visibility
+            self.shared_stats.register_partition_stats(
+                numeric_stream_id,
+                numeric_topic_id,
+                partition_id,
+                Arc::clone(stats),
+            );
+
             let ns = IggyNamespace::new(numeric_stream_id, numeric_topic_id, partition_id);
             let shard_id = ShardId::new(calculate_shard_assignment(&ns, shards_count));
             let is_current_shard = self.id == *shard_id;
diff --git a/core/server/src/shard/system/streams.rs b/core/server/src/shard/system/streams.rs
index b12ca2f2f..4cb9bc041 100644
--- a/core/server/src/shard/system/streams.rs
+++ b/core/server/src/shard/system/streams.rs
@@ -45,8 +45,14 @@ impl IggyShard {
         let stream = stream::create_and_insert_stream_mem(&self.streams, name.clone());
         self.metrics.increment_streams(1);
 
+        // Register stats in SharedStatsStore for cross-shard visibility
+        self.shared_stats
+            .register_stream_stats(stream.id(), stream.stats().clone());
+
         // Dual-write: also update SharedMetadata for consistent cross-shard reads
-        let _ = self.shared_metadata.create_stream(name);
+        let _ = self
+            .shared_metadata
+            .add_stream(stream.id(), name, stream.root().created_at());
 
         create_stream_file_hierarchy(stream.id(), &self.config.system).await?;
         Ok(stream)
diff --git a/core/server/src/shard/system/topics.rs b/core/server/src/shard/system/topics.rs
index a5830c8fd..83985c279 100644
--- a/core/server/src/shard/system/topics.rs
+++ b/core/server/src/shard/system/topics.rs
@@ -26,6 +26,7 @@ use crate::streaming::{partitions, streams, topics};
 use err_trail::ErrContext;
 use iggy_common::{CompressionAlgorithm, Identifier, IggyError, IggyExpiry, MaxTopicSize};
 use std::str::FromStr;
+use std::sync::Arc;
 use tracing::info;
 
 impl IggyShard {
@@ -81,10 +82,19 @@ impl IggyShard {
         );
         self.metrics.increment_topics(1);
 
-        // Dual-write: also update SharedMetadata
-        let _ = self.shared_metadata.create_topic(
+        // Register stats in SharedStatsStore for cross-shard visibility
+        self.shared_stats.register_topic_stats(
+            numeric_stream_id,
+            topic.id(),
+            Arc::clone(topic.stats()),
+        );
+
+        // Dual-write: also update SharedMetadata with actual topic ID from slab
+        let _ = self.shared_metadata.add_topic(
             stream_id,
+            topic.id(),
             name,
+            topic.root().created_at(),
             replication_factor.unwrap_or(1),
             message_expiry,
             compression,
diff --git a/core/server/src/shard/tasks/periodic/message_saver.rs b/core/server/src/shard/tasks/periodic/message_saver.rs
index c6d0efdcf..25ab6b53c 100644
--- a/core/server/src/shard/tasks/periodic/message_saver.rs
+++ b/core/server/src/shard/tasks/periodic/message_saver.rs
@@ -54,6 +54,15 @@ async fn save_messages(shard: Rc<IggyShard>) -> Result<(), IggyError> {
         let topic_id = Identifier::numeric(ns.topic_id() as u32).unwrap();
         let partition_id = ns.partition_id();
 
+        // Skip partitions that haven't been lazily created yet
+        // (they have no messages to save anyway)
+        if !shard
+            .streams
+            .partition_exists(&stream_id, &topic_id, partition_id)
+        {
+            continue;
+        }
+
         match shard
             .streams
             .persist_messages(
@@ -102,6 +111,14 @@ async fn fsync_all_segments_on_shutdown(shard: Rc<IggyShard>, result: Result<(),
         let topic_id = Identifier::numeric(ns.topic_id() as u32).unwrap();
         let partition_id = ns.partition_id();
 
+        // Skip partitions that haven't been lazily created yet
+        if !shard
+            .streams
+            .partition_exists(&stream_id, &topic_id, partition_id)
+        {
+            continue;
+        }
+
         match shard
             .streams
             .fsync_all_messages(&stream_id, &topic_id, partition_id)
diff --git a/core/server/src/shard/transmission/event.rs b/core/server/src/shard/transmission/event.rs
index 3364a55aa..9c362d9a7 100644
--- a/core/server/src/shard/transmission/event.rs
+++ b/core/server/src/shard/transmission/event.rs
@@ -15,8 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use iggy_common::{Identifier, TransportProtocol};
-use std::net::SocketAddr;
+use iggy_common::Identifier;
 use strum::Display;
 
 #[derive(Debug, Clone, Display)]
@@ -28,8 +27,4 @@ pub enum ShardEvent {
         partition_id: usize,
         fsync: bool,
     },
-    AddressBound {
-        protocol: TransportProtocol,
-        address: SocketAddr,
-    },
 }
diff --git a/core/server/src/slab/partitions.rs b/core/server/src/slab/partitions.rs
index 0ad5abbf0..a8b990ec6 100644
--- a/core/server/src/slab/partitions.rs
+++ b/core/server/src/slab/partitions.rs
@@ -42,6 +42,8 @@ pub type ContainerId = usize;
 
 #[derive(Debug)]
 pub struct Partitions {
+    /// Numeric ID index: logical partition ID → slab position
+    numeric_index: ahash::AHashMap<usize, ContainerId>,
     root: Slab<partition::PartitionRoot>,
     stats: Slab<Arc<PartitionStats>>,
     message_deduplicator: Slab<Option<Arc<MessageDeduplicator>>>,
@@ -60,6 +62,7 @@ pub struct Partitions {
 impl Clone for Partitions {
     fn clone(&self) -> Self {
         Self {
+            numeric_index: self.numeric_index.clone(),
             root: self.root.clone(),
             stats: self.stats.clone(),
             message_deduplicator: self.message_deduplicator.clone(),
@@ -79,40 +82,51 @@ impl Insert for Partitions {
         let (root, stats, deduplicator, offset, consumer_offset, consumer_group_offset, log) =
             item.into_components();
 
-        let entity_id = self.root.insert(root);
-        let id = self.stats.insert(stats);
+        // Check if ID was pre-set (e.g., from metadata during lazy creation)
+        let pre_set_id = root.id();
+
+        let slab_pos = self.root.insert(root);
+        let stats_pos = self.stats.insert(stats);
         assert_eq!(
-            entity_id, id,
-            "partition_insert: id mismatch when creating stats"
+            slab_pos, stats_pos,
+            "partition_insert: position mismatch when creating stats"
         );
-        let id = self.log.insert(log);
+        let log_pos = self.log.insert(log);
         assert_eq!(
-            entity_id, id,
-            "partition_insert: id mismatch when creating log"
+            slab_pos, log_pos,
+            "partition_insert: position mismatch when creating log"
         );
-        let id = self.message_deduplicator.insert(deduplicator);
+        let dedup_pos = self.message_deduplicator.insert(deduplicator);
         assert_eq!(
-            entity_id, id,
-            "partition_insert: id mismatch when creating message_deduplicator"
+            slab_pos, dedup_pos,
+            "partition_insert: position mismatch when creating message_deduplicator"
         );
-        let id = self.offset.insert(offset);
+        let offset_pos = self.offset.insert(offset);
         assert_eq!(
-            entity_id, id,
-            "partition_insert: id mismatch when creating offset"
+            slab_pos, offset_pos,
+            "partition_insert: position mismatch when creating offset"
         );
-        let id = self.consumer_offset.insert(consumer_offset);
+        let consumer_pos = self.consumer_offset.insert(consumer_offset);
         assert_eq!(
-            entity_id, id,
-            "partition_insert: id mismatch when creating consumer_offset"
+            slab_pos, consumer_pos,
+            "partition_insert: position mismatch when creating consumer_offset"
         );
-        let id = self.consumer_group_offset.insert(consumer_group_offset);
+        let cg_pos = self.consumer_group_offset.insert(consumer_group_offset);
         assert_eq!(
-            entity_id, id,
-            "partition_insert: id mismatch when creating consumer_group_offset"
+            slab_pos, cg_pos,
+            "partition_insert: position mismatch when creating consumer_group_offset"
         );
-        let root = self.root.get_mut(entity_id).unwrap();
-        root.update_id(entity_id);
-        entity_id
+
+        let root = self.root.get_mut(slab_pos).unwrap();
+
+        // Use pre-set ID if available, otherwise use slab position
+        let logical_id = if pre_set_id > 0 { pre_set_id } else { slab_pos };
+        root.update_id(logical_id);
+
+        // Update numeric index
+        self.numeric_index.insert(logical_id, slab_pos);
+
+        logical_id
     }
 }
 
@@ -120,14 +134,25 @@ impl Delete for Partitions {
     type Idx = ContainerId;
     type Item = Partition;
 
-    fn delete(&mut self, id: Self::Idx) -> Self::Item {
-        let root = self.root.remove(id);
-        let stats = self.stats.remove(id);
-        let message_deduplicator = self.message_deduplicator.remove(id);
-        let offset = self.offset.remove(id);
-        let consumer_offset = self.consumer_offset.remove(id);
-        let consumer_group_offset = self.consumer_group_offset.remove(id);
-        let log = self.log.remove(id);
+    fn delete(&mut self, logical_id: Self::Idx) -> Self::Item {
+        // Look up slab position from logical ID
+        let slab_pos = *self
+            .numeric_index
+            .get(&logical_id)
+            .expect("partition_delete: logical ID not found in numeric index");
+
+        let root = self.root.remove(slab_pos);
+        let stats = self.stats.remove(slab_pos);
+        let message_deduplicator = self.message_deduplicator.remove(slab_pos);
+        let offset = self.offset.remove(slab_pos);
+        let consumer_offset = self.consumer_offset.remove(slab_pos);
+        let consumer_group_offset = self.consumer_group_offset.remove(slab_pos);
+        let log = self.log.remove(slab_pos);
+
+        // Remove from numeric index
+        self.numeric_index
+            .remove(&logical_id)
+            .expect("partition_delete: ID not found in numeric index");
 
         Partition::new_with_components(
             root,
@@ -197,6 +222,7 @@ impl EntityComponentSystemMut for Partitions {
 impl Default for Partitions {
     fn default() -> Self {
         Self {
+            numeric_index: ahash::AHashMap::with_capacity(PARTITIONS_CAPACITY),
             root: Slab::with_capacity(PARTITIONS_CAPACITY),
             stats: Slab::with_capacity(PARTITIONS_CAPACITY),
             log: Slab::with_capacity(PARTITIONS_CAPACITY),
@@ -223,21 +249,29 @@ impl Partitions {
 
     pub fn with_partition_by_id<T>(
         &self,
-        id: ContainerId,
+        logical_id: ContainerId,
         f: impl FnOnce(ComponentsById<PartitionRef>) -> T,
     ) -> T {
-        self.with_components_by_id(id, |components| f(components))
+        let slab_pos = *self
+            .numeric_index
+            .get(&logical_id)
+            .expect("Partition not found by ID");
+        self.with_components_by_id(slab_pos, |components| f(components))
     }
 
     pub fn exists(&self, id: ContainerId) -> bool {
-        self.root.contains(id)
+        self.numeric_index.contains_key(&id)
     }
 
     pub fn with_partition_by_id_mut<T>(
         &mut self,
-        id: ContainerId,
+        logical_id: ContainerId,
         f: impl FnOnce(ComponentsById<PartitionRefMut>) -> T,
     ) -> T {
-        self.with_components_by_id_mut(id, |components| f(components))
+        let slab_pos = *self
+            .numeric_index
+            .get(&logical_id)
+            .expect("Partition not found by ID");
+        self.with_components_by_id_mut(slab_pos, |components| f(components))
     }
 }
diff --git a/core/server/src/slab/streams.rs b/core/server/src/slab/streams.rs
index ad7d5368d..d82b95ebb 100644
--- a/core/server/src/slab/streams.rs
+++ b/core/server/src/slab/streams.rs
@@ -71,7 +71,11 @@ pub type ContainerId = usize;
 
 #[derive(Debug, Clone)]
 pub struct Streams {
+    /// Name index: stream name → slab position
     index: RefCell<AHashMap<<stream::StreamRoot as Keyed>::Key, ContainerId>>,
+    /// Numeric ID index: logical stream ID → slab position
+    /// This allows logical IDs to differ from slab positions (needed for lazy creation)
+    numeric_index: RefCell<AHashMap<usize, ContainerId>>,
     root: RefCell<Slab<stream::StreamRoot>>,
     stats: RefCell<Slab<Arc<StreamStats>>>,
 }
@@ -80,6 +84,7 @@ impl Default for Streams {
     fn default() -> Self {
         Self {
             index: RefCell::new(AHashMap::with_capacity(CAPACITY)),
+            numeric_index: RefCell::new(AHashMap::with_capacity(CAPACITY)),
             root: RefCell::new(Slab::with_capacity(CAPACITY)),
             stats: RefCell::new(Slab::with_capacity(CAPACITY)),
         }
@@ -109,20 +114,32 @@ impl InsertCell for Streams {
     fn insert(&self, item: Self::Item) -> Self::Idx {
         let (root, stats) = item.into_components();
         let mut root_container = self.root.borrow_mut();
-        let mut indexes = self.index.borrow_mut();
+        let mut name_index = self.index.borrow_mut();
+        let mut numeric_index = self.numeric_index.borrow_mut();
         let mut stats_container = self.stats.borrow_mut();
 
+        // Check if ID was pre-set (e.g., from metadata during lazy creation)
+        let pre_set_id = root.id();
+
         let key = root.key().clone();
-        let entity_id = root_container.insert(root);
-        let id = stats_container.insert(stats);
+        let slab_pos = root_container.insert(root);
+        let stats_pos = stats_container.insert(stats);
         assert_eq!(
-            entity_id, id,
-            "stream_insert: id mismatch when inserting stats"
+            slab_pos, stats_pos,
+            "stream_insert: position mismatch when inserting stats"
         );
-        let root = root_container.get_mut(entity_id).unwrap();
-        root.update_id(entity_id);
-        indexes.insert(key, entity_id);
-        entity_id
+
+        let root = root_container.get_mut(slab_pos).unwrap();
+
+        // Use pre-set ID if available, otherwise use slab position
+        let logical_id = if pre_set_id > 0 { pre_set_id } else { slab_pos };
+        root.update_id(logical_id);
+
+        // Update both indexes
+        name_index.insert(key, slab_pos);
+        numeric_index.insert(logical_id, slab_pos);
+
+        logical_id
     }
 }
 
@@ -130,19 +147,28 @@ impl DeleteCell for Streams {
     type Idx = ContainerId;
     type Item = stream::Stream;
 
-    fn delete(&self, id: Self::Idx) -> Self::Item {
+    fn delete(&self, logical_id: Self::Idx) -> Self::Item {
         let mut root_container = self.root.borrow_mut();
-        let mut indexes = self.index.borrow_mut();
+        let mut name_index = self.index.borrow_mut();
+        let mut numeric_index = self.numeric_index.borrow_mut();
         let mut stats_container = self.stats.borrow_mut();
 
-        let root = root_container.remove(id);
-        let stats = stats_container.remove(id);
+        // Look up slab position from logical ID
+        let slab_pos = *numeric_index
+            .get(&logical_id)
+            .expect("stream_delete: logical ID not found in numeric index");
 
-        // Remove from index
+        let root = root_container.remove(slab_pos);
+        let stats = stats_container.remove(slab_pos);
+
+        // Remove from both indexes
         let key = root.key();
-        indexes
+        name_index
             .remove(key)
-            .expect("stream_delete: key not found in index");
+            .expect("stream_delete: key not found in name index");
+        numeric_index
+            .remove(&logical_id)
+            .expect("stream_delete: ID not found in numeric index");
 
         stream::Stream::new_with_components(root, stats)
     }
@@ -441,8 +467,8 @@ impl Streams {
     pub fn exists(&self, id: &Identifier) -> bool {
         match id.kind {
             iggy_common::IdKind::Numeric => {
-                let id = id.get_u32_value().unwrap() as usize;
-                self.root.borrow().contains(id)
+                let logical_id = id.get_u32_value().unwrap() as usize;
+                self.numeric_index.borrow().contains_key(&logical_id)
             }
             iggy_common::IdKind::String => {
                 let key = id.get_string_value().unwrap();
@@ -451,12 +477,45 @@ impl Streams {
         }
     }
 
+    /// Check if a partition exists locally (stream, topic, and partition all exist).
+    /// Returns false if any level doesn't exist, without panicking.
+    pub fn partition_exists(
+        &self,
+        stream_id: &Identifier,
+        topic_id: &Identifier,
+        partition_id: usize,
+    ) -> bool {
+        if !self.exists(stream_id) {
+            return false;
+        }
+
+        let topic_exists = self.with_topics(stream_id, |topics| topics.exists(topic_id));
+        if !topic_exists {
+            return false;
+        }
+
+        self.with_partitions(stream_id, topic_id, |partitions| {
+            partitions.exists(partition_id)
+        })
+    }
+
     pub fn get_index(&self, id: &Identifier) -> usize {
         match id.kind {
-            iggy_common::IdKind::Numeric => id.get_u32_value().unwrap() as usize,
+            iggy_common::IdKind::Numeric => {
+                let logical_id = id.get_u32_value().unwrap() as usize;
+                *self
+                    .numeric_index
+                    .borrow()
+                    .get(&logical_id)
+                    .expect("Stream not found by numeric ID")
+            }
             iggy_common::IdKind::String => {
                 let key = id.get_string_value().unwrap();
-                *self.index.borrow().get(&key).expect("Stream not found")
+                *self
+                    .index
+                    .borrow()
+                    .get(&key)
+                    .expect("Stream not found by name")
             }
         }
     }
diff --git a/core/server/src/slab/topics.rs b/core/server/src/slab/topics.rs
index 7bfed97e7..39cda6946 100644
--- a/core/server/src/slab/topics.rs
+++ b/core/server/src/slab/topics.rs
@@ -45,7 +45,10 @@ pub type ContainerId = usize;
 
 #[derive(Debug, Clone)]
 pub struct Topics {
+    /// Name index: topic name → slab position
     index: RefCell<AHashMap<<topic::TopicRoot as Keyed>::Key, ContainerId>>,
+    /// Numeric ID index: logical topic ID → slab position
+    numeric_index: RefCell<AHashMap<usize, ContainerId>>,
     root: RefCell<Slab<topic::TopicRoot>>,
     auxilaries: RefCell<Slab<topic::TopicAuxilary>>,
     stats: RefCell<Slab<Arc<TopicStats>>>,
@@ -59,25 +62,37 @@ impl InsertCell for Topics {
         let (root, auxilary, stats) = item.into_components();
         let mut root_container = self.root.borrow_mut();
         let mut auxilaries = self.auxilaries.borrow_mut();
-        let mut indexes = self.index.borrow_mut();
+        let mut name_index = self.index.borrow_mut();
+        let mut numeric_index = self.numeric_index.borrow_mut();
         let mut stats_container = self.stats.borrow_mut();
 
+        // Check if ID was pre-set (e.g., from metadata during lazy creation)
+        let pre_set_id = root.id();
+
         let key = root.key().clone();
-        let entity_id = root_container.insert(root);
-        let id = stats_container.insert(stats);
+        let slab_pos = root_container.insert(root);
+        let stats_pos = stats_container.insert(stats);
         assert_eq!(
-            entity_id, id,
-            "topic_insert: id mismatch when inserting stats component"
+            slab_pos, stats_pos,
+            "topic_insert: position mismatch when inserting stats component"
         );
-        let id = auxilaries.insert(auxilary);
+        let aux_pos = auxilaries.insert(auxilary);
         assert_eq!(
-            entity_id, id,
-            "topic_insert: id mismatch when inserting auxilary component"
+            slab_pos, aux_pos,
+            "topic_insert: position mismatch when inserting auxilary component"
         );
-        let root = root_container.get_mut(entity_id).unwrap();
-        root.update_id(entity_id);
-        indexes.insert(key, entity_id);
-        entity_id
+
+        let root = root_container.get_mut(slab_pos).unwrap();
+
+        // Use pre-set ID if available, otherwise use slab position
+        let logical_id = if pre_set_id > 0 { pre_set_id } else { slab_pos };
+        root.update_id(logical_id);
+
+        // Update both indexes
+        name_index.insert(key, slab_pos);
+        numeric_index.insert(logical_id, slab_pos);
+
+        logical_id
     }
 }
 
@@ -85,24 +100,33 @@ impl DeleteCell for Topics {
     type Idx = ContainerId;
     type Item = topic::Topic;
 
-    fn delete(&self, id: Self::Idx) -> Self::Item {
+    fn delete(&self, logical_id: Self::Idx) -> Self::Item {
         let mut root_container = self.root.borrow_mut();
         let mut auxilaries = self.auxilaries.borrow_mut();
-        let mut indexes = self.index.borrow_mut();
+        let mut name_index = self.index.borrow_mut();
+        let mut numeric_index = self.numeric_index.borrow_mut();
         let mut stats_container = self.stats.borrow_mut();
 
-        let root = root_container.remove(id);
-        let auxilary = auxilaries.remove(id);
-        let stats = stats_container.remove(id);
+        // Look up slab position from logical ID
+        let slab_pos = *numeric_index
+            .get(&logical_id)
+            .expect("topic_delete: logical ID not found in numeric index");
+
+        let root = root_container.remove(slab_pos);
+        let auxilary = auxilaries.remove(slab_pos);
+        let stats = stats_container.remove(slab_pos);
 
-        // Remove from index
+        // Remove from both indexes
         let key = root.key();
-        indexes.remove(key).unwrap_or_else(|| {
+        name_index.remove(key).unwrap_or_else(|| {
             panic!(
                 "topic_delete: key not found with key: {} and id: {}",
-                key, id
+                key, logical_id
             )
         });
+        numeric_index
+            .remove(&logical_id)
+            .expect("topic_delete: ID not found in numeric index");
 
         topic::Topic::new_with_components(root, auxilary, stats)
     }
@@ -130,6 +154,7 @@ impl Default for Topics {
     fn default() -> Self {
         Self {
             index: RefCell::new(AHashMap::with_capacity(CAPACITY)),
+            numeric_index: RefCell::new(AHashMap::with_capacity(CAPACITY)),
             root: RefCell::new(Slab::with_capacity(CAPACITY)),
             auxilaries: RefCell::new(Slab::with_capacity(CAPACITY)),
             stats: RefCell::new(Slab::with_capacity(CAPACITY)),
@@ -173,8 +198,8 @@ impl Topics {
     pub fn exists(&self, id: &Identifier) -> bool {
         match id.kind {
             iggy_common::IdKind::Numeric => {
-                let id = id.get_u32_value().unwrap() as usize;
-                self.root.borrow().contains(id)
+                let logical_id = id.get_u32_value().unwrap() as usize;
+                self.numeric_index.borrow().contains_key(&logical_id)
             }
             iggy_common::IdKind::String => {
                 let key = id.get_string_value().unwrap();
@@ -185,10 +210,21 @@ impl Topics {
 
     pub fn get_index(&self, id: &Identifier) -> usize {
         match id.kind {
-            iggy_common::IdKind::Numeric => id.get_u32_value().unwrap() as 
usize,
+            iggy_common::IdKind::Numeric => {
+                let logical_id = id.get_u32_value().unwrap() as usize;
+                *self
+                    .numeric_index
+                    .borrow()
+                    .get(&logical_id)
+                    .expect("Topic not found by numeric ID")
+            }
             iggy_common::IdKind::String => {
                 let key = id.get_string_value().unwrap();
-                *self.index.borrow().get(&key).expect("Topic not found")
+                *self
+                    .index
+                    .borrow()
+                    .get(&key)
+                    .expect("Topic not found by name")
             }
         }
     }
diff --git a/core/server/src/streaming/partitions/partition.rs 
b/core/server/src/streaming/partitions/partition.rs
index 7a07b8f33..bc06416d4 100644
--- a/core/server/src/streaming/partitions/partition.rs
+++ b/core/server/src/streaming/partitions/partition.rs
@@ -157,7 +157,7 @@ impl Partition {
         }
     }
 
-    pub fn stats(&self) -> &PartitionStats {
+    pub fn stats(&self) -> &Arc<PartitionStats> {
         &self.stats
     }
 }
diff --git a/core/server/src/streaming/topics/topic.rs 
b/core/server/src/streaming/topics/topic.rs
index 348dc3c8a..5a3aa1ee9 100644
--- a/core/server/src/streaming/topics/topic.rs
+++ b/core/server/src/streaming/topics/topic.rs
@@ -121,7 +121,7 @@ impl Topic {
         &mut self.root
     }
 
-    pub fn stats(&self) -> &TopicStats {
+    pub fn stats(&self) -> &Arc<TopicStats> {
         &self.stats
     }
 }
diff --git a/core/server/src/tcp/tcp_listener.rs 
b/core/server/src/tcp/tcp_listener.rs
index c68547301..82b539f9d 100644
--- a/core/server/src/tcp/tcp_listener.rs
+++ b/core/server/src/tcp/tcp_listener.rs
@@ -20,7 +20,6 @@ use crate::configs::tcp::TcpSocketConfig;
 
 use crate::shard::IggyShard;
 use crate::shard::task_registry::{ShutdownToken, TaskRegistry};
-use crate::shard::transmission::event::ShardEvent;
 use crate::tcp::connection_handler::{ConnectionAction, handle_connection, 
handle_error};
 use compio::net::{TcpListener, TcpOpts};
 use err_trail::ErrContext;
@@ -72,7 +71,7 @@ pub async fn start(
     if shard.id != 0 && addr.port() == 0 {
         info!("Waiting for TCP address from shard 0...");
         loop {
-            if let Some(bound_addr) = shard.tcp_bound_address.get() {
+            if let Some(bound_addr) = 
shard.shared_metadata.load().bound_addresses.tcp {
                 addr = bound_addr;
                 info!("Received TCP address: {}", addr);
                 break;
@@ -94,19 +93,14 @@ pub async fn start(
     info!("{} server has started on: {:?}", server_name, actual_addr);
 
     if shard.id == 0 {
-        // Store bound address locally
+        // Store in Cell for config_writer backward compat
         shard.tcp_bound_address.set(Some(actual_addr));
+        // Store in SharedMetadata (all shards see this immediately via 
ArcSwap)
+        shard.shared_metadata.set_tcp_address(actual_addr);
 
         if addr.port() == 0 {
             // Notify config writer on shard 0
             let _ = shard.config_writer_notify.try_send(());
-
-            // Broadcast to other shards for SO_REUSEPORT binding
-            let event = ShardEvent::AddressBound {
-                protocol: TransportProtocol::Tcp,
-                address: actual_addr,
-            };
-            shard.broadcast_event_to_all_shards(event).await?;
         }
     }
 
diff --git a/core/server/src/tcp/tcp_tls_listener.rs 
b/core/server/src/tcp/tcp_tls_listener.rs
index 50a8c1d19..ef257b05b 100644
--- a/core/server/src/tcp/tcp_tls_listener.rs
+++ b/core/server/src/tcp/tcp_tls_listener.rs
@@ -19,7 +19,6 @@
 use crate::configs::tcp::TcpSocketConfig;
 use crate::shard::IggyShard;
 use crate::shard::task_registry::ShutdownToken;
-use crate::shard::transmission::event::ShardEvent;
 use crate::tcp::connection_handler::{handle_connection, handle_error};
 use compio::net::{TcpListener, TcpOpts};
 use compio_tls::TlsAcceptor;
@@ -46,7 +45,7 @@ pub(crate) async fn start(
     if shard.id != 0 && addr.port() == 0 {
         info!("Waiting for TCP address from shard 0...");
         loop {
-            if let Some(bound_addr) = shard.tcp_bound_address.get() {
+            if let Some(bound_addr) = 
shard.shared_metadata.load().bound_addresses.tcp {
                 addr = bound_addr;
                 info!("Received TCP address: {}", addr);
                 break;
@@ -68,16 +67,14 @@ pub(crate) async fn start(
     })?;
 
     if shard.id == 0 {
+        // Store in Cell for config_writer backward compat
         shard.tcp_bound_address.set(Some(actual_addr));
+        // Store in SharedMetadata (all shards see this immediately via 
ArcSwap)
+        shard.shared_metadata.set_tcp_address(actual_addr);
+
         if addr.port() == 0 {
             // Notify config writer on shard 0
             let _ = shard.config_writer_notify.try_send(());
-
-            let event = ShardEvent::AddressBound {
-                protocol: TransportProtocol::Tcp,
-                address: actual_addr,
-            };
-            shard.broadcast_event_to_all_shards(event).await?;
         }
     }
 
diff --git a/core/server/src/websocket/websocket_listener.rs 
b/core/server/src/websocket/websocket_listener.rs
index 7b691c61b..d17273cb6 100644
--- a/core/server/src/websocket/websocket_listener.rs
+++ b/core/server/src/websocket/websocket_listener.rs
@@ -19,7 +19,6 @@
 use crate::configs::websocket::WebSocketConfig;
 use crate::shard::IggyShard;
 use crate::shard::task_registry::ShutdownToken;
-use crate::shard::transmission::event::ShardEvent;
 use crate::websocket::connection_handler::{handle_connection, handle_error};
 use compio::net::TcpListener;
 use compio_net::TcpOpts;
@@ -58,7 +57,7 @@ pub async fn start(
     if shard.id != 0 && addr.port() == 0 {
         info!("Waiting for WebSocket address from shard 0...");
         loop {
-            if let Some(bound_addr) = shard.websocket_bound_address.get() {
+            if let Some(bound_addr) = 
shard.shared_metadata.load().bound_addresses.websocket {
                 addr = bound_addr;
                 info!("Received WebSocket address from shard 0: {}", addr);
                 break;
@@ -78,25 +77,16 @@ pub async fn start(
     let local_addr = listener.local_addr().unwrap();
     info!("{} has started on: ws://{}", "WebSocket Server", local_addr);
 
-    // Notify shard about the bound address
-    let event = ShardEvent::AddressBound {
-        protocol: TransportProtocol::WebSocket,
-        address: local_addr,
-    };
-
     if shard.id == 0 {
-        // Store bound address locally first
+        // Store in Cell for config_writer backward compat
         shard.websocket_bound_address.set(Some(local_addr));
-
-        if addr.port() == 0 {
-            // Broadcast to other shards for SO_REUSEPORT binding
-            shard.broadcast_event_to_all_shards(event).await?;
-        }
+        // Store in SharedMetadata (all shards see this immediately via 
ArcSwap)
+        shard.shared_metadata.set_websocket_address(local_addr);
+        // Notify config_writer that WebSocket is bound
+        let _ = shard.config_writer_notify.try_send(());
     } else {
-        // Non-shard0 just handles the event locally
-        crate::shard::handlers::handle_event(&shard, event)
-            .await
-            .ok();
+        // Non-shard-0 just updates local Cell
+        shard.websocket_bound_address.set(Some(local_addr));
     }
 
     let ws_config = config.to_tungstenite_config();
diff --git a/core/server/src/websocket/websocket_tls_listener.rs 
b/core/server/src/websocket/websocket_tls_listener.rs
index 3743162fb..45c080da1 100644
--- a/core/server/src/websocket/websocket_tls_listener.rs
+++ b/core/server/src/websocket/websocket_tls_listener.rs
@@ -19,7 +19,6 @@
 use crate::configs::websocket::WebSocketConfig;
 use crate::shard::IggyShard;
 use crate::shard::task_registry::ShutdownToken;
-use crate::shard::transmission::event::ShardEvent;
 use crate::websocket::connection_handler::{handle_connection, handle_error};
 use compio::net::TcpListener;
 use compio_net::TcpOpts;
@@ -63,7 +62,7 @@ pub async fn start(
     if shard.id != 0 && addr.port() == 0 {
         info!("Waiting for WebSocket TLS address from shard 0...");
         loop {
-            if let Some(bound_addr) = shard.websocket_bound_address.get() {
+            if let Some(bound_addr) = 
shard.shared_metadata.load().bound_addresses.websocket {
                 addr = bound_addr;
                 info!("Received WebSocket TLS address from shard 0: {}", addr);
                 break;
@@ -81,25 +80,16 @@ pub async fn start(
 
     let local_addr = listener.local_addr().unwrap();
 
-    // Notify shard about the bound address
-    let event = ShardEvent::AddressBound {
-        protocol: TransportProtocol::WebSocket,
-        address: local_addr,
-    };
-
     if shard.id == 0 {
-        // Store bound address locally first
+        // Store in Cell for config_writer backward compat
         shard.websocket_bound_address.set(Some(local_addr));
-
-        if addr.port() == 0 {
-            // Broadcast to other shards for SO_REUSEPORT binding
-            shard.broadcast_event_to_all_shards(event).await?;
-        }
+        // Store in SharedMetadata (all shards see this immediately via 
ArcSwap)
+        shard.shared_metadata.set_websocket_address(local_addr);
+        // Notify config_writer that WebSocket is bound
+        let _ = shard.config_writer_notify.try_send(());
     } else {
-        // Non-shard0 just handles the event locally
-        crate::shard::handlers::handle_event(&shard, event)
-            .await
-            .ok();
+        // Non-shard-0 just updates local Cell
+        shard.websocket_bound_address.set(Some(local_addr));
     }
 
     // Ensure rustls crypto provider is installed

Reply via email to