This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch io_uring_tpc_broadcast
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/io_uring_tpc_broadcast by this push:
     new 1983a9f78 add response for ShardEvent
1983a9f78 is described below

commit 1983a9f783726646813c307609a9de3492cc97d8
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Thu Oct 16 13:20:00 2025 +0200

    add response for ShardEvent
---
 .../create_consumer_group_handler.rs               |  15 ++-
 .../delete_consumer_group_handler.rs               |  15 ++-
 .../consumer_groups/join_consumer_group_handler.rs |  15 ++-
 .../leave_consumer_group_handler.rs                |  15 ++-
 .../partitions/create_partitions_handler.rs        |  15 ++-
 .../partitions/delete_partitions_handler.rs        |  15 ++-
 .../create_personal_access_token_handler.rs        |  15 ++-
 .../delete_personal_access_token_handler.rs        |  16 ++-
 .../login_with_personal_access_token_handler.rs    |  15 ++-
 .../handlers/segments/delete_segments_handler.rs   |  15 ++-
 .../handlers/streams/create_stream_handler.rs      |  23 +++-
 .../handlers/streams/delete_stream_handler.rs      |  15 ++-
 .../handlers/streams/purge_stream_handler.rs       |  15 ++-
 .../handlers/streams/update_stream_handler.rs      |  15 ++-
 .../binary/handlers/topics/create_topic_handler.rs |  43 ++++++-
 .../binary/handlers/topics/delete_topic_handler.rs |  15 ++-
 .../binary/handlers/topics/purge_topic_handler.rs  |  16 ++-
 .../binary/handlers/topics/update_topic_handler.rs |  15 ++-
 .../handlers/users/change_password_handler.rs      |  15 ++-
 .../binary/handlers/users/create_user_handler.rs   |  15 ++-
 .../binary/handlers/users/delete_user_handler.rs   |  15 ++-
 .../binary/handlers/users/login_user_handler.rs    |  15 ++-
 .../binary/handlers/users/logout_user_handler.rs   |  15 ++-
 .../handlers/users/update_permissions_handler.rs   |  15 ++-
 .../binary/handlers/users/update_user_handler.rs   |  15 ++-
 core/server/src/shard/mod.rs                       | 140 +++++++++++++--------
 core/server/src/tcp/tcp_listener.rs                |  42 ++++++-
 27 files changed, 515 insertions(+), 80 deletions(-)

diff --git 
a/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs
 
b/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs
index eb7e80190..b4577b333 100644
--- 
a/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs
+++ 
b/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs
@@ -20,6 +20,7 @@ use crate::binary::command::{BinaryServerCommand, 
ServerCommand, ServerCommandHa
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
 use crate::binary::{handlers::consumer_groups::COMPONENT, sender::SenderKind};
+use crate::shard::BroadcastResult;
 use crate::shard::IggyShard;
 use crate::shard::transmission::event::ShardEvent;
 use crate::slab::traits_ext::EntityMarker;
@@ -60,7 +61,19 @@ impl ServerCommandHandler for CreateConsumerGroup {
             topic_id: self.topic_id.clone(),
             cg,
         };
-        let _responses = shard.broadcast_event_to_all_shards(event).await;
+        match shard.broadcast_event_to_all_shards(event).await {
+            BroadcastResult::Success(_) => {}
+
+            BroadcastResult::PartialSuccess { errors, .. } => {
+                for (shard_id, error) in errors {
+                    tracing::warn!("Shard {} failed to process event: {:?}", shard_id, error);
+                }
+            }
+
+            BroadcastResult::Failure(_) => {
+                return Err(IggyError::ShardCommunicationError(0));
+            }
+        }
 
         let stream_id = self.stream_id.clone();
         let topic_id = self.topic_id.clone();
diff --git 
a/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs
 
b/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs
index 5dfb9dda5..88fad3802 100644
--- 
a/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs
+++ 
b/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs
@@ -19,6 +19,7 @@
 use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::{handlers::consumer_groups::COMPONENT, sender::SenderKind};
+use crate::shard::BroadcastResult;
 use crate::shard::IggyShard;
 use crate::shard::transmission::event::ShardEvent;
 use crate::slab::traits_ext::EntityMarker;
@@ -87,7 +88,19 @@ impl ServerCommandHandler for DeleteConsumerGroup {
             topic_id: self.topic_id.clone(),
             group_id: self.group_id.clone(),
         };
-        let _responses = shard.broadcast_event_to_all_shards(event).await;
+        match shard.broadcast_event_to_all_shards(event).await {
+            BroadcastResult::Success(_) => {}
+
+            BroadcastResult::PartialSuccess { errors, .. } => {
+                for (shard_id, error) in errors {
+                    tracing::warn!("Shard {} failed to process event: {:?}", shard_id, error);
+                }
+            }
+
+            BroadcastResult::Failure(_) => {
+                return Err(IggyError::ShardCommunicationError(0));
+            }
+        }
         let stream_id = self.stream_id.clone();
         let topic_id = self.topic_id.clone();
         shard
diff --git 
a/core/server/src/binary/handlers/consumer_groups/join_consumer_group_handler.rs
 
b/core/server/src/binary/handlers/consumer_groups/join_consumer_group_handler.rs
index bca8742f4..c972d5dec 100644
--- 
a/core/server/src/binary/handlers/consumer_groups/join_consumer_group_handler.rs
+++ 
b/core/server/src/binary/handlers/consumer_groups/join_consumer_group_handler.rs
@@ -19,6 +19,7 @@
 use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::{handlers::consumer_groups::COMPONENT, sender::SenderKind};
+use crate::shard::BroadcastResult;
 use crate::shard::IggyShard;
 use crate::shard::transmission::event::ShardEvent;
 use crate::streaming::session::Session;
@@ -69,7 +70,19 @@ impl ServerCommandHandler for JoinConsumerGroup {
             topic_id,
             group_id,
         };
-        let _responses = shard.broadcast_event_to_all_shards(event).await;
+        match shard.broadcast_event_to_all_shards(event).await {
+            BroadcastResult::Success(_) => {}
+
+            BroadcastResult::PartialSuccess { errors, .. } => {
+                for (shard_id, error) in errors {
+                    tracing::warn!("Shard {} failed to process event: {:?}", shard_id, error);
+                }
+            }
+
+            BroadcastResult::Failure(_) => {
+                return Err(IggyError::ShardCommunicationError(0));
+            }
+        }
 
         sender.send_empty_ok_response().await?;
         Ok(())
diff --git 
a/core/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs
 
b/core/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs
index 8e7955019..53a2a0f1a 100644
--- 
a/core/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs
+++ 
b/core/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs
@@ -20,6 +20,7 @@ use super::COMPONENT;
 use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::sender::SenderKind;
+use crate::shard::BroadcastResult;
 use crate::shard::IggyShard;
 use crate::shard::transmission::event::ShardEvent;
 use crate::streaming::session::Session;
@@ -71,7 +72,19 @@ impl ServerCommandHandler for LeaveConsumerGroup {
             topic_id,
             group_id,
         };
-        let _responses = shard.broadcast_event_to_all_shards(event).await;
+        match shard.broadcast_event_to_all_shards(event).await {
+            BroadcastResult::Success(_) => {}
+
+            BroadcastResult::PartialSuccess { errors, .. } => {
+                for (shard_id, error) in errors {
+                    tracing::warn!("Shard {} failed to process event: {:?}", shard_id, error);
+                }
+            }
+
+            BroadcastResult::Failure(_) => {
+                return Err(IggyError::ShardCommunicationError(0));
+            }
+        }
 
         sender.send_empty_ok_response().await?;
         Ok(())
diff --git 
a/core/server/src/binary/handlers/partitions/create_partitions_handler.rs 
b/core/server/src/binary/handlers/partitions/create_partitions_handler.rs
index fe05c200d..ea1b8beec 100644
--- a/core/server/src/binary/handlers/partitions/create_partitions_handler.rs
+++ b/core/server/src/binary/handlers/partitions/create_partitions_handler.rs
@@ -19,6 +19,7 @@
 use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::{handlers::partitions::COMPONENT, sender::SenderKind};
+use crate::shard::BroadcastResult;
 use crate::shard::IggyShard;
 use crate::shard::transmission::event::ShardEvent;
 use crate::slab::traits_ext::EntityMarker;
@@ -60,7 +61,19 @@ impl ServerCommandHandler for CreatePartitions {
             topic_id: self.topic_id.clone(),
             partitions,
         };
-        let _responses = shard.broadcast_event_to_all_shards(event).await;
+        match shard.broadcast_event_to_all_shards(event).await {
+            BroadcastResult::Success(_) => {}
+
+            BroadcastResult::PartialSuccess { errors, .. } => {
+                for (shard_id, error) in errors {
+                    tracing::warn!("Shard {} failed to process event: {:?}", shard_id, error);
+                }
+            }
+
+            BroadcastResult::Failure(_) => {
+                return Err(IggyError::ShardCommunicationError(0));
+            }
+        }
 
         shard.streams2.with_topic_by_id_mut(
             &self.stream_id,
diff --git 
a/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs 
b/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs
index 9a0451651..9d354371b 100644
--- a/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs
+++ b/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs
@@ -19,6 +19,7 @@
 use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::{handlers::partitions::COMPONENT, sender::SenderKind};
+use crate::shard::BroadcastResult;
 use crate::shard::IggyShard;
 use crate::shard::transmission::event::ShardEvent;
 use crate::state::command::EntryCommand;
@@ -61,7 +62,19 @@ impl ServerCommandHandler for DeletePartitions {
             partitions_count: self.partitions_count,
             partition_ids: deleted_partition_ids,
         };
-        let _responses = shard.broadcast_event_to_all_shards(event).await;
+        match shard.broadcast_event_to_all_shards(event).await {
+            BroadcastResult::Success(_) => {}
+
+            BroadcastResult::PartialSuccess { errors, .. } => {
+                for (shard_id, error) in errors {
+                    tracing::warn!("Shard {} failed to process event: {:?}", shard_id, error);
+                }
+            }
+
+            BroadcastResult::Failure(_) => {
+                return Err(IggyError::ShardCommunicationError(0));
+            }
+        }
 
         let remaining_partition_ids = shard.streams2.with_topic_by_id(
             &self.stream_id,
diff --git 
a/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs
 
b/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs
index ea7b947a4..7fd38b7e7 100644
--- 
a/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs
+++ 
b/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs
@@ -20,6 +20,7 @@ use crate::binary::command::{BinaryServerCommand, 
ServerCommand, ServerCommandHa
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
 use crate::binary::{handlers::personal_access_tokens::COMPONENT, 
sender::SenderKind};
+use crate::shard::BroadcastResult;
 use crate::shard::IggyShard;
 use crate::shard::transmission::event::ShardEvent;
 use crate::state::command::EntryCommand;
@@ -60,7 +61,19 @@ impl ServerCommandHandler for CreatePersonalAccessToken {
         let event = ShardEvent::CreatedPersonalAccessToken {
             personal_access_token: personal_access_token.clone(),
         };
-        let _responses = shard.broadcast_event_to_all_shards(event).await;
+        match shard.broadcast_event_to_all_shards(event).await {
+            BroadcastResult::Success(_) => {}
+
+            BroadcastResult::PartialSuccess { errors, .. } => {
+                for (shard_id, error) in errors {
+                    tracing::warn!("Shard {} failed to process event: {:?}", shard_id, error);
+                }
+            }
+
+            BroadcastResult::Failure(_) => {
+                return Err(IggyError::ShardCommunicationError(0));
+            }
+        }
 
         shard
             .state
diff --git 
a/core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs
 
b/core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs
index b2708f9c4..9aa424c7c 100644
--- 
a/core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs
+++ 
b/core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs
@@ -19,7 +19,7 @@
 use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::{handlers::personal_access_tokens::COMPONENT, 
sender::SenderKind};
-use crate::shard::IggyShard;
+use crate::shard::{BroadcastResult, IggyShard};
 use crate::state::command::EntryCommand;
 use crate::streaming::session::Session;
 use anyhow::Result;
@@ -56,7 +56,19 @@ impl ServerCommandHandler for DeletePersonalAccessToken {
             user_id: session.get_user_id(),
             name: self.name.clone(),
         };
-        let _responses = shard.broadcast_event_to_all_shards(event).await;
+        match shard.broadcast_event_to_all_shards(event).await {
+            BroadcastResult::Success(_) => {}
+
+            BroadcastResult::PartialSuccess { errors, .. } => {
+                for (shard_id, error) in errors {
+                    tracing::warn!("Shard {} failed to process event: {:?}", shard_id, error);
+                }
+            }
+
+            BroadcastResult::Failure(_) => {
+                return Err(IggyError::ShardCommunicationError(0));
+            }
+        }
 
         shard
             .state
diff --git 
a/core/server/src/binary/handlers/personal_access_tokens/login_with_personal_access_token_handler.rs
 
b/core/server/src/binary/handlers/personal_access_tokens/login_with_personal_access_token_handler.rs
index e7ecc33f9..f21111bb3 100644
--- 
a/core/server/src/binary/handlers/personal_access_tokens/login_with_personal_access_token_handler.rs
+++ 
b/core/server/src/binary/handlers/personal_access_tokens/login_with_personal_access_token_handler.rs
@@ -1,3 +1,4 @@
+use crate::shard::BroadcastResult;
 use crate::shard::transmission::event::ShardEvent;
 /* Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -60,7 +61,19 @@ impl ServerCommandHandler for LoginWithPersonalAccessToken {
             token: self.token,
             client_id: session.client_id,
         };
-        let _responses = shard.broadcast_event_to_all_shards(event).await;
+        match shard.broadcast_event_to_all_shards(event).await {
+            BroadcastResult::Success(_) => {}
+
+            BroadcastResult::PartialSuccess { errors, .. } => {
+                for (shard_id, error) in errors {
+                    tracing::warn!("Shard {} failed to process event: {:?}", shard_id, error);
+                }
+            }
+
+            BroadcastResult::Failure(_) => {
+                return Err(IggyError::ShardCommunicationError(0));
+            }
+        }
         let identity_info = mapper::map_identity_info(user.id);
         sender.send_ok_response(&identity_info).await?;
         Ok(())
diff --git 
a/core/server/src/binary/handlers/segments/delete_segments_handler.rs 
b/core/server/src/binary/handlers/segments/delete_segments_handler.rs
index ccb5f859d..d84d56aac 100644
--- a/core/server/src/binary/handlers/segments/delete_segments_handler.rs
+++ b/core/server/src/binary/handlers/segments/delete_segments_handler.rs
@@ -19,6 +19,7 @@
 use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::{handlers::partitions::COMPONENT, sender::SenderKind};
+use crate::shard::BroadcastResult;
 use crate::shard::IggyShard;
 use crate::shard::transmission::event::ShardEvent;
 use crate::state::command::EntryCommand;
@@ -68,7 +69,19 @@ impl ServerCommandHandler for DeleteSegments {
             partition_id: self.partition_id as usize,
             segments_count: self.segments_count,
         };
-        let _responses = shard.broadcast_event_to_all_shards(event).await;
+        match shard.broadcast_event_to_all_shards(event).await {
+            BroadcastResult::Success(_) => {}
+
+            BroadcastResult::PartialSuccess { errors, .. } => {
+                for (shard_id, error) in errors {
+                    tracing::warn!("Shard {} failed to process event: {:?}", shard_id, error);
+                }
+            }
+
+            BroadcastResult::Failure(_) => {
+                return Err(IggyError::ShardCommunicationError(0));
+            }
+        }
 
         shard
             .state
diff --git a/core/server/src/binary/handlers/streams/create_stream_handler.rs 
b/core/server/src/binary/handlers/streams/create_stream_handler.rs
index 0b1c1ec0a..54d46af50 100644
--- a/core/server/src/binary/handlers/streams/create_stream_handler.rs
+++ b/core/server/src/binary/handlers/streams/create_stream_handler.rs
@@ -20,6 +20,7 @@ use crate::binary::command::{BinaryServerCommand, 
ServerCommand, ServerCommandHa
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
 use crate::binary::{handlers::streams::COMPONENT, sender::SenderKind};
+use crate::shard::BroadcastResult;
 use crate::shard::IggyShard;
 use crate::shard::transmission::event::ShardEvent;
 use crate::shard_info;
@@ -60,7 +61,27 @@ impl ServerCommandHandler for CreateStream {
             id: created_stream_id,
             stream,
         };
-        let _responses = shard.broadcast_event_to_all_shards(event).await;
+
+        // Broadcast the event and handle potential failures
+        match shard.broadcast_event_to_all_shards(event).await {
+            BroadcastResult::Success(_) => {
+                // All shards successfully received the event
+            }
+            BroadcastResult::PartialSuccess { errors, .. } => {
+                // Some shards failed, but we can continue as at least some succeeded
+                for (shard_id, error) in errors {
+                    tracing::warn!(
+                        "Shard {} failed to process CreatedStream2 event: {:?}",
+                        shard_id,
+                        error
+                    );
+                }
+            }
+            BroadcastResult::Failure(_err) => {
+                // All shards failed - this is critical for stream creation
+                return Err(IggyError::ShardCommunicationError(0)); // 0 indicates all shards failed
+            }
+        }
 
         let response = shard
             .streams2
diff --git a/core/server/src/binary/handlers/streams/delete_stream_handler.rs 
b/core/server/src/binary/handlers/streams/delete_stream_handler.rs
index f59b11b5b..de840fe8a 100644
--- a/core/server/src/binary/handlers/streams/delete_stream_handler.rs
+++ b/core/server/src/binary/handlers/streams/delete_stream_handler.rs
@@ -19,6 +19,7 @@
 use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::{handlers::streams::COMPONENT, sender::SenderKind};
+use crate::shard::BroadcastResult;
 use crate::shard::IggyShard;
 use crate::shard::transmission::event::ShardEvent;
 use crate::shard_info;
@@ -65,7 +66,19 @@ impl ServerCommandHandler for DeleteStream {
             id: stream2.id(),
             stream_id: self.stream_id.clone(),
         };
-        let _responses = shard.broadcast_event_to_all_shards(event).await;
+        match shard.broadcast_event_to_all_shards(event).await {
+            BroadcastResult::Success(_) => {}
+
+            BroadcastResult::PartialSuccess { errors, .. } => {
+                for (shard_id, error) in errors {
+                    tracing::warn!("Shard {} failed to process event: {:?}", shard_id, error);
+                }
+            }
+
+            BroadcastResult::Failure(_) => {
+                return Err(IggyError::ShardCommunicationError(0));
+            }
+        }
 
         shard
             .state
diff --git a/core/server/src/binary/handlers/streams/purge_stream_handler.rs 
b/core/server/src/binary/handlers/streams/purge_stream_handler.rs
index b8b35b645..f82e8148f 100644
--- a/core/server/src/binary/handlers/streams/purge_stream_handler.rs
+++ b/core/server/src/binary/handlers/streams/purge_stream_handler.rs
@@ -19,6 +19,7 @@
 use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::{handlers::streams::COMPONENT, sender::SenderKind};
+use crate::shard::BroadcastResult;
 use crate::shard::IggyShard;
 use crate::shard::transmission::event::ShardEvent;
 use crate::state::command::EntryCommand;
@@ -54,7 +55,19 @@ impl ServerCommandHandler for PurgeStream {
         let event = ShardEvent::PurgedStream2 {
             stream_id: self.stream_id.clone(),
         };
-        let _responses = shard.broadcast_event_to_all_shards(event).await;
+        match shard.broadcast_event_to_all_shards(event).await {
+            BroadcastResult::Success(_) => {}
+
+            BroadcastResult::PartialSuccess { errors, .. } => {
+                for (shard_id, error) in errors {
+                    tracing::warn!("Shard {} failed to process event: {:?}", shard_id, error);
+                }
+            }
+
+            BroadcastResult::Failure(_) => {
+                return Err(IggyError::ShardCommunicationError(0));
+            }
+        }
         shard
             .state
             .apply(session.get_user_id(), &EntryCommand::PurgeStream(self))
diff --git a/core/server/src/binary/handlers/streams/update_stream_handler.rs 
b/core/server/src/binary/handlers/streams/update_stream_handler.rs
index 996db7f4b..b1bf298e1 100644
--- a/core/server/src/binary/handlers/streams/update_stream_handler.rs
+++ b/core/server/src/binary/handlers/streams/update_stream_handler.rs
@@ -19,6 +19,7 @@
 use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::{handlers::streams::COMPONENT, sender::SenderKind};
+use crate::shard::BroadcastResult;
 use crate::shard::IggyShard;
 use crate::shard::transmission::event::ShardEvent;
 use crate::state::command::EntryCommand;
@@ -55,7 +56,19 @@ impl ServerCommandHandler for UpdateStream {
             stream_id: self.stream_id.clone(),
             name: self.name.clone(),
         };
-        let _responses = shard.broadcast_event_to_all_shards(event).await;
+        match shard.broadcast_event_to_all_shards(event).await {
+            BroadcastResult::Success(_) => {}
+
+            BroadcastResult::PartialSuccess { errors, .. } => {
+                for (shard_id, error) in errors {
+                    tracing::warn!("Shard {} failed to process event: {:?}", shard_id, error);
+                }
+            }
+
+            BroadcastResult::Failure(_) => {
+                return Err(IggyError::ShardCommunicationError(0));
+            }
+        }
         shard
             .state
             .apply(session.get_user_id(), &EntryCommand::UpdateStream(self))
diff --git a/core/server/src/binary/handlers/topics/create_topic_handler.rs 
b/core/server/src/binary/handlers/topics/create_topic_handler.rs
index 35eba8d93..8456f190e 100644
--- a/core/server/src/binary/handlers/topics/create_topic_handler.rs
+++ b/core/server/src/binary/handlers/topics/create_topic_handler.rs
@@ -20,6 +20,7 @@ use crate::binary::command::{BinaryServerCommand, 
ServerCommand, ServerCommandHa
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
 use crate::binary::{handlers::topics::COMPONENT, sender::SenderKind};
+use crate::shard::BroadcastResult;
 use crate::shard::IggyShard;
 use crate::shard::transmission::event::ShardEvent;
 use crate::slab::traits_ext::EntityMarker;
@@ -71,7 +72,26 @@ impl ServerCommandHandler for CreateTopic {
             stream_id: self.stream_id.clone(),
             topic,
         };
-        let _responses = shard.broadcast_event_to_all_shards(event).await;
+
+        match shard.broadcast_event_to_all_shards(event).await {
+            BroadcastResult::Success(_) => {
+                // All shards successfully received the event
+            }
+            BroadcastResult::PartialSuccess { errors, .. } => {
+                // Some shards failed, but we can continue
+                for (shard_id, error) in errors {
+                    tracing::warn!(
+                        "Shard {} failed to process CreatedTopic2 event: {:?}",
+                        shard_id,
+                        error
+                    );
+                }
+            }
+            BroadcastResult::Failure(_) => {
+                // All shards failed - critical for topic creation
+                return Err(IggyError::ShardCommunicationError(0));
+            }
+        }
         let partitions = shard
             .create_partitions2(
                 session,
@@ -85,7 +105,26 @@ impl ServerCommandHandler for CreateTopic {
             topic_id: Identifier::numeric(topic_id as u32).unwrap(),
             partitions,
         };
-        let _responses = shard.broadcast_event_to_all_shards(event).await;
+
+        match shard.broadcast_event_to_all_shards(event).await {
+            BroadcastResult::Success(_) => {
+                // All shards successfully received the event
+            }
+            BroadcastResult::PartialSuccess { errors, .. } => {
+                // Some shards failed, but we can continue
+                for (shard_id, error) in errors {
+                    tracing::warn!(
+                        "Shard {} failed to process CreatedPartitions2 event: {:?}",
+                        shard_id,
+                        error
+                    );
+                }
+            }
+            BroadcastResult::Failure(_) => {
+                // All shards failed - critical for partitions creation
+                return Err(IggyError::ShardCommunicationError(0));
+            }
+        }
         let response = shard.streams2.with_topic_by_id(
             &self.stream_id,
             &Identifier::numeric(topic_id as u32).unwrap(),
diff --git a/core/server/src/binary/handlers/topics/delete_topic_handler.rs 
b/core/server/src/binary/handlers/topics/delete_topic_handler.rs
index 69d9d9957..5c294db77 100644
--- a/core/server/src/binary/handlers/topics/delete_topic_handler.rs
+++ b/core/server/src/binary/handlers/topics/delete_topic_handler.rs
@@ -19,6 +19,7 @@
 use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::{handlers::topics::COMPONENT, sender::SenderKind};
+use crate::shard::BroadcastResult;
 use crate::shard::IggyShard;
 use crate::shard::transmission::event::ShardEvent;
 use crate::shard_info;
@@ -71,7 +72,19 @@ impl ServerCommandHandler for DeleteTopic {
             stream_id: self.stream_id.clone(),
             topic_id: self.topic_id.clone(),
         };
-        let _responses = shard.broadcast_event_to_all_shards(event).await;
+        match shard.broadcast_event_to_all_shards(event).await {
+            BroadcastResult::Success(_) => {}
+
+            BroadcastResult::PartialSuccess { errors, .. } => {
+                for (shard_id, error) in errors {
+                    tracing::warn!("Shard {} failed to process event: {:?}", shard_id, error);
+                }
+            }
+
+            BroadcastResult::Failure(_) => {
+                return Err(IggyError::ShardCommunicationError(0));
+            }
+        }
 
         shard
             .state
diff --git a/core/server/src/binary/handlers/topics/purge_topic_handler.rs 
b/core/server/src/binary/handlers/topics/purge_topic_handler.rs
index 7c4d5179c..ae5de3cf3 100644
--- a/core/server/src/binary/handlers/topics/purge_topic_handler.rs
+++ b/core/server/src/binary/handlers/topics/purge_topic_handler.rs
@@ -19,7 +19,7 @@
 use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::{handlers::topics::COMPONENT, sender::SenderKind};
-use crate::shard::IggyShard;
+use crate::shard::{BroadcastResult, IggyShard};
 use crate::state::command::EntryCommand;
 use crate::streaming::session::Session;
 use anyhow::Result;
@@ -60,7 +60,19 @@ impl ServerCommandHandler for PurgeTopic {
             stream_id: self.stream_id.clone(),
             topic_id: self.topic_id.clone(),
         };
-        let _responses = shard.broadcast_event_to_all_shards(event).await;
+        match shard.broadcast_event_to_all_shards(event).await {
+            BroadcastResult::Success(_) => {}
+
+            BroadcastResult::PartialSuccess { errors, .. } => {
+                for (shard_id, error) in errors {
+                    tracing::warn!("Shard {} failed to process event: {:?}", shard_id, error);
+                }
+            }
+
+            BroadcastResult::Failure(_) => {
+                return Err(IggyError::ShardCommunicationError(0));
+            }
+        }
 
         shard
             .state
diff --git a/core/server/src/binary/handlers/topics/update_topic_handler.rs 
b/core/server/src/binary/handlers/topics/update_topic_handler.rs
index 8f9146ac9..af1222f8f 100644
--- a/core/server/src/binary/handlers/topics/update_topic_handler.rs
+++ b/core/server/src/binary/handlers/topics/update_topic_handler.rs
@@ -19,6 +19,7 @@
 use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::{handlers::topics::COMPONENT, sender::SenderKind};
+use crate::shard::BroadcastResult;
 use crate::shard::IggyShard;
 use crate::shard::transmission::event::ShardEvent;
 use crate::state::command::EntryCommand;
@@ -81,7 +82,19 @@ impl ServerCommandHandler for UpdateTopic {
             max_topic_size: self.max_topic_size,
             replication_factor: self.replication_factor,
         };
-        let _responses = shard.broadcast_event_to_all_shards(event).await;
+        match shard.broadcast_event_to_all_shards(event).await {
+            BroadcastResult::Success(_) => {}
+
+            BroadcastResult::PartialSuccess { errors, .. } => {
+                for (shard_id, error) in errors {
+                    tracing::warn!("Shard {} failed to process event: {:?}", 
shard_id, error);
+                }
+            }
+
+            BroadcastResult::Failure(_) => {
+                return Err(IggyError::ShardCommunicationError(0));
+            }
+        }
         let topic_id = self.topic_id.clone();
         let stream_id = self.stream_id.clone();
 
diff --git a/core/server/src/binary/handlers/users/change_password_handler.rs 
b/core/server/src/binary/handlers/users/change_password_handler.rs
index 0b595ee4b..911d1ce2c 100644
--- a/core/server/src/binary/handlers/users/change_password_handler.rs
+++ b/core/server/src/binary/handlers/users/change_password_handler.rs
@@ -19,6 +19,7 @@
 use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::{handlers::users::COMPONENT, sender::SenderKind};
+use crate::shard::BroadcastResult;
 use crate::shard::IggyShard;
 use crate::shard::transmission::event::ShardEvent;
 use crate::shard_info;
@@ -77,7 +78,19 @@ impl ServerCommandHandler for ChangePassword {
             current_password: self.current_password.clone(),
             new_password: self.new_password.clone(),
         };
-        let _responses = shard.broadcast_event_to_all_shards(event).await;
+        match shard.broadcast_event_to_all_shards(event).await {
+            BroadcastResult::Success(_) => {}
+
+            BroadcastResult::PartialSuccess { errors, .. } => {
+                for (shard_id, error) in errors {
+                    tracing::warn!("Shard {} failed to process event: {:?}", 
shard_id, error);
+                }
+            }
+
+            BroadcastResult::Failure(_) => {
+                return Err(IggyError::ShardCommunicationError(0));
+            }
+        }
 
         // For the security of the system, we hash the password before storing 
it in metadata.
         shard
diff --git a/core/server/src/binary/handlers/users/create_user_handler.rs 
b/core/server/src/binary/handlers/users/create_user_handler.rs
index 96521d5ff..87b1aae6c 100644
--- a/core/server/src/binary/handlers/users/create_user_handler.rs
+++ b/core/server/src/binary/handlers/users/create_user_handler.rs
@@ -16,6 +16,7 @@
  * under the License.
  */
 
+use crate::shard::BroadcastResult;
 use crate::shard::IggyShard;
 use crate::shard::transmission::event::ShardEvent;
 use crate::{shard_debug, shard_info};
@@ -78,7 +79,19 @@ impl ServerCommandHandler for CreateUser {
             status: self.status,
             permissions: self.permissions.clone(),
         };
-        let _responses = shard.broadcast_event_to_all_shards(event).await;
+        match shard.broadcast_event_to_all_shards(event).await {
+            BroadcastResult::Success(_) => {}
+
+            BroadcastResult::PartialSuccess { errors, .. } => {
+                for (shard_id, error) in errors {
+                    tracing::warn!("Shard {} failed to process event: {:?}", 
shard_id, error);
+                }
+            }
+
+            BroadcastResult::Failure(_) => {
+                return Err(IggyError::ShardCommunicationError(0));
+            }
+        }
         let user_id = user.id;
         let response = mapper::map_user(&user);
 
diff --git a/core/server/src/binary/handlers/users/delete_user_handler.rs 
b/core/server/src/binary/handlers/users/delete_user_handler.rs
index f265f8df0..d22cdf920 100644
--- a/core/server/src/binary/handlers/users/delete_user_handler.rs
+++ b/core/server/src/binary/handlers/users/delete_user_handler.rs
@@ -21,6 +21,7 @@ use std::rc::Rc;
 use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::{handlers::users::COMPONENT, sender::SenderKind};
+use crate::shard::BroadcastResult;
 use crate::shard::IggyShard;
 use crate::shard::transmission::event::ShardEvent;
 use crate::shard_info;
@@ -66,7 +67,19 @@ impl ServerCommandHandler for DeleteUser {
         let event = ShardEvent::DeletedUser {
             user_id: self.user_id.clone(),
         };
-        let _responses = shard.broadcast_event_to_all_shards(event).await;
+        match shard.broadcast_event_to_all_shards(event).await {
+            BroadcastResult::Success(_) => {}
+
+            BroadcastResult::PartialSuccess { errors, .. } => {
+                for (shard_id, error) in errors {
+                    tracing::warn!("Shard {} failed to process event: {:?}", 
shard_id, error);
+                }
+            }
+
+            BroadcastResult::Failure(_) => {
+                return Err(IggyError::ShardCommunicationError(0));
+            }
+        }
 
         let user_id = self.user_id.clone();
         shard
diff --git a/core/server/src/binary/handlers/users/login_user_handler.rs 
b/core/server/src/binary/handlers/users/login_user_handler.rs
index 395f2724b..bb4c53509 100644
--- a/core/server/src/binary/handlers/users/login_user_handler.rs
+++ b/core/server/src/binary/handlers/users/login_user_handler.rs
@@ -22,6 +22,7 @@ use crate::binary::command::{BinaryServerCommand, 
ServerCommand, ServerCommandHa
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
 use crate::binary::{handlers::users::COMPONENT, sender::SenderKind};
+use crate::shard::BroadcastResult;
 use crate::shard::IggyShard;
 use crate::shard::transmission::event::ShardEvent;
 use crate::shard_info;
@@ -70,7 +71,19 @@ impl ServerCommandHandler for LoginUser {
             password,
         };
         // Broadcast the event to all shards.
-        let _responses = shard.broadcast_event_to_all_shards(event).await;
+        match shard.broadcast_event_to_all_shards(event).await {
+            BroadcastResult::Success(_) => {}
+
+            BroadcastResult::PartialSuccess { errors, .. } => {
+                for (shard_id, error) in errors {
+                    tracing::warn!("Shard {} failed to process event: {:?}", 
shard_id, error);
+                }
+            }
+
+            BroadcastResult::Failure(_) => {
+                return Err(IggyError::ShardCommunicationError(0));
+            }
+        }
 
         let identity_info = mapper::map_identity_info(user.id);
         sender.send_ok_response(&identity_info).await?;
diff --git a/core/server/src/binary/handlers/users/logout_user_handler.rs 
b/core/server/src/binary/handlers/users/logout_user_handler.rs
index 7dbc99b4d..46ba8952c 100644
--- a/core/server/src/binary/handlers/users/logout_user_handler.rs
+++ b/core/server/src/binary/handlers/users/logout_user_handler.rs
@@ -21,6 +21,7 @@ use std::rc::Rc;
 use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::{handlers::users::COMPONENT, sender::SenderKind};
+use crate::shard::BroadcastResult;
 use crate::shard::IggyShard;
 use crate::shard::transmission::event::ShardEvent;
 use crate::shard_info;
@@ -61,7 +62,19 @@ impl ServerCommandHandler for LogoutUser {
         let event = ShardEvent::LogoutUser {
             client_id: session.client_id,
         };
-        let _responses = shard.broadcast_event_to_all_shards(event).await;
+        match shard.broadcast_event_to_all_shards(event).await {
+            BroadcastResult::Success(_) => {}
+
+            BroadcastResult::PartialSuccess { errors, .. } => {
+                for (shard_id, error) in errors {
+                    tracing::warn!("Shard {} failed to process event: {:?}", 
shard_id, error);
+                }
+            }
+
+            BroadcastResult::Failure(_) => {
+                return Err(IggyError::ShardCommunicationError(0));
+            }
+        }
         session.clear_user_id();
         sender.send_empty_ok_response().await?;
         Ok(())
diff --git 
a/core/server/src/binary/handlers/users/update_permissions_handler.rs 
b/core/server/src/binary/handlers/users/update_permissions_handler.rs
index 4b42d9026..5346aa605 100644
--- a/core/server/src/binary/handlers/users/update_permissions_handler.rs
+++ b/core/server/src/binary/handlers/users/update_permissions_handler.rs
@@ -21,6 +21,7 @@ use std::rc::Rc;
 use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::{handlers::users::COMPONENT, sender::SenderKind};
+use crate::shard::BroadcastResult;
 use crate::shard::IggyShard;
 use crate::shard::transmission::event::ShardEvent;
 use crate::shard_info;
@@ -61,7 +62,19 @@ impl ServerCommandHandler for UpdatePermissions {
             user_id: self.user_id.clone(),
             permissions: self.permissions.clone(),
         };
-        let _responses = shard.broadcast_event_to_all_shards(event).await;
+        match shard.broadcast_event_to_all_shards(event).await {
+            BroadcastResult::Success(_) => {}
+
+            BroadcastResult::PartialSuccess { errors, .. } => {
+                for (shard_id, error) in errors {
+                    tracing::warn!("Shard {} failed to process event: {:?}", 
shard_id, error);
+                }
+            }
+
+            BroadcastResult::Failure(_) => {
+                return Err(IggyError::ShardCommunicationError(0));
+            }
+        }
 
         shard
             .state
diff --git a/core/server/src/binary/handlers/users/update_user_handler.rs 
b/core/server/src/binary/handlers/users/update_user_handler.rs
index c8f28db38..2bfd87114 100644
--- a/core/server/src/binary/handlers/users/update_user_handler.rs
+++ b/core/server/src/binary/handlers/users/update_user_handler.rs
@@ -21,6 +21,7 @@ use std::rc::Rc;
 use crate::binary::command::{BinaryServerCommand, ServerCommand, 
ServerCommandHandler};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::{handlers::users::COMPONENT, sender::SenderKind};
+use crate::shard::BroadcastResult;
 use crate::shard::IggyShard;
 use crate::shard::transmission::event::ShardEvent;
 use crate::shard_info;
@@ -73,7 +74,19 @@ impl ServerCommandHandler for UpdateUser {
             username: self.username.clone(),
             status: self.status,
         };
-        let _responses = shard.broadcast_event_to_all_shards(event).await;
+        match shard.broadcast_event_to_all_shards(event).await {
+            BroadcastResult::Success(_) => {}
+
+            BroadcastResult::PartialSuccess { errors, .. } => {
+                for (shard_id, error) in errors {
+                    tracing::warn!("Shard {} failed to process event: {:?}", 
shard_id, error);
+                }
+            }
+
+            BroadcastResult::Failure(_) => {
+                return Err(IggyError::ShardCommunicationError(0));
+            }
+        }
 
         let user_id = self.user_id.clone();
         shard
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index 7a2529fab..24de07a98 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -36,7 +36,7 @@ use crate::{
             message::{ShardMessage, ShardRequest, ShardRequestPayload, 
ShardSendRequestResult},
         },
     },
-    shard_error, shard_info,
+    shard_error, shard_info, shard_warn,
     slab::{streams::Streams, traits_ext::EntityMarker, users::Users},
     state::file::FileState,
     streaming::{
@@ -49,9 +49,11 @@ use builder::IggyShardBuilder;
 use compio::io::AsyncWriteAtExt;
 use dashmap::DashMap;
 use error_set::ErrContext;
+use futures::future::join_all;
 use hash32::{Hasher, Murmur3Hasher};
 use iggy_common::{EncryptorKind, Identifier, IggyError, TransportProtocol};
 use std::hash::Hasher as _;
+use std::sync::Arc;
 use std::{
     cell::{Cell, RefCell},
     net::SocketAddr,
@@ -64,6 +66,30 @@ use transmission::connector::{Receiver, ShardConnector, 
StopReceiver};
 
 pub const COMPONENT: &str = "SHARD";
 pub const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(10);
+pub const BROADCAST_TIMEOUT: Duration = Duration::from_secs(5);
+
+/// Failure details for a broadcast, carried by `BroadcastResult::Failure`
+#[derive(Debug)]
+pub enum BroadcastError {
+    PartialFailure {
+        succeeded: Vec<u16>,
+        failed: Vec<(u16, IggyError)>,
+    },
+    TotalFailure(IggyError),
+    Timeout {
+        responded: Vec<u16>,
+        timed_out: Vec<u16>,
+    },
+}
+
+pub enum BroadcastResult {
+    Success(Vec<ShardResponse>),
+    PartialSuccess {
+        responses: Vec<ShardResponse>,
+        errors: Vec<(u16, IggyError)>,
+    },
+    Failure(BroadcastError),
+}
 
 pub(crate) struct Shard {
     id: u16,
@@ -899,59 +925,75 @@ impl IggyShard {
         }
     }
 
-    pub async fn broadcast_event_to_all_shards(&self, event: ShardEvent) -> 
Vec<ShardResponse> {
-        let mut responses = 
Vec::with_capacity(self.get_available_shards_count() as usize);
-        for maybe_receiver in self
-            .shards
-            .iter()
-            .filter_map(|shard| {
-                if shard.id != self.id {
-                    Some(shard.connection.clone())
-                } else {
-                    None
-                }
-            })
-            .map(|conn| {
-                // TODO: Fixme, maybe we should send response_sender
-                // and propagate errors back.
-                let event = event.clone();
-                /*
-                if matches!(
-                    &event,
-                    ShardEvent::CreatedStream2 { .. }
-                        | ShardEvent::DeletedStream2 { .. }
-                        | ShardEvent::CreatedTopic2 { .. }
-                        | ShardEvent::DeletedTopic2 { .. }
-                        | ShardEvent::UpdatedTopic2 { .. }
-                        | ShardEvent::CreatedPartitions2 { .. }
-                        | ShardEvent::DeletedPartitions2 { .. }
-                        | ShardEvent::CreatedConsumerGroup2 { .. }
-                        | ShardEvent::CreatedPersonalAccessToken { .. }
-                        | ShardEvent::DeletedConsumerGroup2 { .. }
-                ) {
-                */
+    pub async fn broadcast_event_to_all_shards(&self, event: ShardEvent) -> 
BroadcastResult {
+        // Shared via Arc, but each shard's frame still clones the event
+        let event = Arc::new(event);
+        let timeout_duration = BROADCAST_TIMEOUT;
+
+        // Build one future per other shard; join_all awaits them together
+        let mut futures = Vec::new();
+        let mut shard_ids = Vec::new();
+
+        for shard in self.shards.iter().filter(|s| s.id != self.id) {
+            let event_ref = Arc::clone(&event);
+            let conn = shard.connection.clone();
+            let shard_id = shard.id;
+            shard_ids.push(shard_id);
+
+            let future = async move {
                 let (sender, receiver) = async_channel::bounded(1);
-                conn.send(ShardFrame::new(event.into(), Some(sender.clone())));
-                Some(receiver.clone())
-                /*
-                } else {
-                    conn.send(ShardFrame::new(event.into(), None));
-                    None
+                conn.send(ShardFrame::new(
+                    ShardMessage::Event((*event_ref).clone()),
+                    Some(sender),
+                ));
+
+                match compio::time::timeout(timeout_duration, 
receiver.recv()).await {
+                    Ok(Ok(response)) => Ok((shard_id, response)),
+                    Ok(Err(_)) => Err((shard_id, 
IggyError::ShardCommunicationError(shard_id))),
+                    Err(_) => Err((shard_id, IggyError::TaskTimeout)),
                 }
-                */
-            })
-        {
-            match maybe_receiver {
-                Some(receiver) => {
-                    let response = receiver.recv().await.unwrap();
-                    responses.push(response);
-                }
-                None => {
-                    responses.push(ShardResponse::Event);
+            };
+
+            futures.push(future);
+        }
+
+        // If no other shards exist, return success with empty responses
+        if futures.is_empty() {
+            return BroadcastResult::Success(Vec::new());
+        }
+
+        // Collect all results in parallel
+        let results = join_all(futures).await;
+
+        // Process results
+        let mut responses = Vec::new();
+        let mut errors = Vec::new();
+
+        for result in results {
+            match result {
+                Ok((_, response)) => responses.push(response),
+                Err((shard_id, error)) => {
+                    // Log the error for observability
+                    shard_warn!(
+                        self.id,
+                        "Failed to broadcast event to shard {}: {:?}",
+                        shard_id,
+                        error
+                    );
+                    errors.push((shard_id, error));
                 }
             }
         }
-        responses
+
+        if errors.is_empty() {
+            BroadcastResult::Success(responses)
+        } else if responses.is_empty() {
+            BroadcastResult::Failure(BroadcastError::TotalFailure(
+                IggyError::ShardCommunicationError(0), // 0 indicates all 
shards failed
+            ))
+        } else {
+            BroadcastResult::PartialSuccess { responses, errors }
+        }
     }
 
     pub fn add_active_session(&self, session: Rc<Session>) {
diff --git a/core/server/src/tcp/tcp_listener.rs 
b/core/server/src/tcp/tcp_listener.rs
index c963103da..a2ed25d14 100644
--- a/core/server/src/tcp/tcp_listener.rs
+++ b/core/server/src/tcp/tcp_listener.rs
@@ -18,6 +18,7 @@
 
 use crate::binary::sender::SenderKind;
 use crate::configs::tcp::TcpSocketConfig;
+use crate::shard::BroadcastResult;
 use crate::shard::IggyShard;
 use crate::shard::task_registry::ShutdownToken;
 use crate::shard::transmission::event::ShardEvent;
@@ -111,7 +112,22 @@ pub async fn start(
                 protocol: TransportProtocol::Tcp,
                 address: actual_addr,
             };
-            shard.broadcast_event_to_all_shards(event).await;
+            match shard.broadcast_event_to_all_shards(event).await {
+                BroadcastResult::Success(_) => {}
+                BroadcastResult::PartialSuccess { errors, .. } => {
+                    for (shard_id, error) in errors {
+                        shard_info!(
+                            shard.id,
+                            "Shard {} failed to receive address: {:?}",
+                            shard_id,
+                            error
+                        );
+                    }
+                }
+                BroadcastResult::Failure(err) => {
+                    shard_error!(shard.id, "Failed to broadcast address: 
{:?}", err);
+                }
+            }
         }
     }
 
@@ -149,7 +165,17 @@ async fn accept_loop(
                         // Broadcast session to all shards.
                         let event = ShardEvent::NewSession { address, 
transport };
                         // TODO: Fixme look inside of 
broadcast_event_to_all_shards method.
-                        let _responses = 
shard_clone.broadcast_event_to_all_shards(event).await;
+                        match 
shard_clone.broadcast_event_to_all_shards(event).await {
+                            BroadcastResult::Success(_) => {}
+                            BroadcastResult::PartialSuccess { errors, .. } => {
+                                for (shard_id, error) in errors {
+                                    shard_info!(shard.id, "Shard {} failed 
NewSession: {:?}", shard_id, error);
+                                }
+                            }
+                            BroadcastResult::Failure(err) => {
+                                shard_error!(shard.id, "Failed to broadcast 
NewSession: {:?}", err);
+                            }
+                        }
 
                         let client_id = session.client_id;
                         let user_id = session.get_user_id();
@@ -168,7 +194,17 @@ async fn accept_loop(
                             }
                             registry_clone.remove_connection(&client_id);
                             let event = ShardEvent::ClientDisconnected { 
client_id, user_id };
-                            let _responses = 
shard_for_conn.broadcast_event_to_all_shards(event).await;
+                            match 
shard_for_conn.broadcast_event_to_all_shards(event).await {
+                                BroadcastResult::Success(_) => {}
+                                BroadcastResult::PartialSuccess { errors, .. } 
=> {
+                                    for (shard_id, error) in errors {
+                                        shard_info!(shard_for_conn.id, "Shard 
{} failed ClientDisconnected: {:?}", shard_id, error);
+                                    }
+                                }
+                                BroadcastResult::Failure(err) => {
+                                    shard_error!(shard_for_conn.id, "Failed to 
broadcast ClientDisconnected: {:?}", err);
+                                }
+                            }
 
                             if let Err(error) = sender.shutdown().await {
                                 shard_error!(shard.id, "Failed to shutdown TCP 
stream for client: {}, address: {}. {}", client_id, address, error);

Reply via email to