This is an automated email from the ASF dual-hosted git repository.
hgruszecki pushed a commit to branch io_uring_tpc_broadcast
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/io_uring_tpc_broadcast by this
push:
new 1983a9f78 add response for ShardEvent
1983a9f78 is described below
commit 1983a9f783726646813c307609a9de3492cc97d8
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Thu Oct 16 13:20:00 2025 +0200
add response for ShardEvent
---
.../create_consumer_group_handler.rs | 15 ++-
.../delete_consumer_group_handler.rs | 15 ++-
.../consumer_groups/join_consumer_group_handler.rs | 15 ++-
.../leave_consumer_group_handler.rs | 15 ++-
.../partitions/create_partitions_handler.rs | 15 ++-
.../partitions/delete_partitions_handler.rs | 15 ++-
.../create_personal_access_token_handler.rs | 15 ++-
.../delete_personal_access_token_handler.rs | 16 ++-
.../login_with_personal_access_token_handler.rs | 15 ++-
.../handlers/segments/delete_segments_handler.rs | 15 ++-
.../handlers/streams/create_stream_handler.rs | 23 +++-
.../handlers/streams/delete_stream_handler.rs | 15 ++-
.../handlers/streams/purge_stream_handler.rs | 15 ++-
.../handlers/streams/update_stream_handler.rs | 15 ++-
.../binary/handlers/topics/create_topic_handler.rs | 43 ++++++-
.../binary/handlers/topics/delete_topic_handler.rs | 15 ++-
.../binary/handlers/topics/purge_topic_handler.rs | 16 ++-
.../binary/handlers/topics/update_topic_handler.rs | 15 ++-
.../handlers/users/change_password_handler.rs | 15 ++-
.../binary/handlers/users/create_user_handler.rs | 15 ++-
.../binary/handlers/users/delete_user_handler.rs | 15 ++-
.../binary/handlers/users/login_user_handler.rs | 15 ++-
.../binary/handlers/users/logout_user_handler.rs | 15 ++-
.../handlers/users/update_permissions_handler.rs | 15 ++-
.../binary/handlers/users/update_user_handler.rs | 15 ++-
core/server/src/shard/mod.rs | 140 +++++++++++++--------
core/server/src/tcp/tcp_listener.rs | 42 ++++++-
27 files changed, 515 insertions(+), 80 deletions(-)
diff --git
a/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs
b/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs
index eb7e80190..b4577b333 100644
---
a/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs
+++
b/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs
@@ -20,6 +20,7 @@ use crate::binary::command::{BinaryServerCommand,
ServerCommand, ServerCommandHa
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::mapper;
use crate::binary::{handlers::consumer_groups::COMPONENT, sender::SenderKind};
+use crate::shard::BroadcastResult;
use crate::shard::IggyShard;
use crate::shard::transmission::event::ShardEvent;
use crate::slab::traits_ext::EntityMarker;
@@ -60,7 +61,19 @@ impl ServerCommandHandler for CreateConsumerGroup {
topic_id: self.topic_id.clone(),
cg,
};
- let _responses = shard.broadcast_event_to_all_shards(event).await;
+ match shard.broadcast_event_to_all_shards(event).await {
+ BroadcastResult::Success(_) => {}
+
+ BroadcastResult::PartialSuccess { errors, .. } => {
+ for (shard_id, error) in errors {
+ tracing::warn!("Shard {} failed to process event: {:?}",
shard_id, error);
+ }
+ }
+
+ BroadcastResult::Failure(_) => {
+ return Err(IggyError::ShardCommunicationError(0));
+ }
+ }
let stream_id = self.stream_id.clone();
let topic_id = self.topic_id.clone();
diff --git
a/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs
b/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs
index 5dfb9dda5..88fad3802 100644
---
a/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs
+++
b/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs
@@ -19,6 +19,7 @@
use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::{handlers::consumer_groups::COMPONENT, sender::SenderKind};
+use crate::shard::BroadcastResult;
use crate::shard::IggyShard;
use crate::shard::transmission::event::ShardEvent;
use crate::slab::traits_ext::EntityMarker;
@@ -87,7 +88,19 @@ impl ServerCommandHandler for DeleteConsumerGroup {
topic_id: self.topic_id.clone(),
group_id: self.group_id.clone(),
};
- let _responses = shard.broadcast_event_to_all_shards(event).await;
+ match shard.broadcast_event_to_all_shards(event).await {
+ BroadcastResult::Success(_) => {}
+
+ BroadcastResult::PartialSuccess { errors, .. } => {
+ for (shard_id, error) in errors {
+ tracing::warn!("Shard {} failed to process event: {:?}",
shard_id, error);
+ }
+ }
+
+ BroadcastResult::Failure(_) => {
+ return Err(IggyError::ShardCommunicationError(0));
+ }
+ }
let stream_id = self.stream_id.clone();
let topic_id = self.topic_id.clone();
shard
diff --git
a/core/server/src/binary/handlers/consumer_groups/join_consumer_group_handler.rs
b/core/server/src/binary/handlers/consumer_groups/join_consumer_group_handler.rs
index bca8742f4..c972d5dec 100644
---
a/core/server/src/binary/handlers/consumer_groups/join_consumer_group_handler.rs
+++
b/core/server/src/binary/handlers/consumer_groups/join_consumer_group_handler.rs
@@ -19,6 +19,7 @@
use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::{handlers::consumer_groups::COMPONENT, sender::SenderKind};
+use crate::shard::BroadcastResult;
use crate::shard::IggyShard;
use crate::shard::transmission::event::ShardEvent;
use crate::streaming::session::Session;
@@ -69,7 +70,19 @@ impl ServerCommandHandler for JoinConsumerGroup {
topic_id,
group_id,
};
- let _responses = shard.broadcast_event_to_all_shards(event).await;
+ match shard.broadcast_event_to_all_shards(event).await {
+ BroadcastResult::Success(_) => {}
+
+ BroadcastResult::PartialSuccess { errors, .. } => {
+ for (shard_id, error) in errors {
+ tracing::warn!("Shard {} failed to process event: {:?}",
shard_id, error);
+ }
+ }
+
+ BroadcastResult::Failure(_) => {
+ return Err(IggyError::ShardCommunicationError(0));
+ }
+ }
sender.send_empty_ok_response().await?;
Ok(())
diff --git
a/core/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs
b/core/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs
index 8e7955019..53a2a0f1a 100644
---
a/core/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs
+++
b/core/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs
@@ -20,6 +20,7 @@ use super::COMPONENT;
use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::sender::SenderKind;
+use crate::shard::BroadcastResult;
use crate::shard::IggyShard;
use crate::shard::transmission::event::ShardEvent;
use crate::streaming::session::Session;
@@ -71,7 +72,19 @@ impl ServerCommandHandler for LeaveConsumerGroup {
topic_id,
group_id,
};
- let _responses = shard.broadcast_event_to_all_shards(event).await;
+ match shard.broadcast_event_to_all_shards(event).await {
+ BroadcastResult::Success(_) => {}
+
+ BroadcastResult::PartialSuccess { errors, .. } => {
+ for (shard_id, error) in errors {
+ tracing::warn!("Shard {} failed to process event: {:?}",
shard_id, error);
+ }
+ }
+
+ BroadcastResult::Failure(_) => {
+ return Err(IggyError::ShardCommunicationError(0));
+ }
+ }
sender.send_empty_ok_response().await?;
Ok(())
diff --git
a/core/server/src/binary/handlers/partitions/create_partitions_handler.rs
b/core/server/src/binary/handlers/partitions/create_partitions_handler.rs
index fe05c200d..ea1b8beec 100644
--- a/core/server/src/binary/handlers/partitions/create_partitions_handler.rs
+++ b/core/server/src/binary/handlers/partitions/create_partitions_handler.rs
@@ -19,6 +19,7 @@
use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::{handlers::partitions::COMPONENT, sender::SenderKind};
+use crate::shard::BroadcastResult;
use crate::shard::IggyShard;
use crate::shard::transmission::event::ShardEvent;
use crate::slab::traits_ext::EntityMarker;
@@ -60,7 +61,19 @@ impl ServerCommandHandler for CreatePartitions {
topic_id: self.topic_id.clone(),
partitions,
};
- let _responses = shard.broadcast_event_to_all_shards(event).await;
+ match shard.broadcast_event_to_all_shards(event).await {
+ BroadcastResult::Success(_) => {}
+
+ BroadcastResult::PartialSuccess { errors, .. } => {
+ for (shard_id, error) in errors {
+ tracing::warn!("Shard {} failed to process event: {:?}",
shard_id, error);
+ }
+ }
+
+ BroadcastResult::Failure(_) => {
+ return Err(IggyError::ShardCommunicationError(0));
+ }
+ }
shard.streams2.with_topic_by_id_mut(
&self.stream_id,
diff --git
a/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs
b/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs
index 9a0451651..9d354371b 100644
--- a/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs
+++ b/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs
@@ -19,6 +19,7 @@
use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::{handlers::partitions::COMPONENT, sender::SenderKind};
+use crate::shard::BroadcastResult;
use crate::shard::IggyShard;
use crate::shard::transmission::event::ShardEvent;
use crate::state::command::EntryCommand;
@@ -61,7 +62,19 @@ impl ServerCommandHandler for DeletePartitions {
partitions_count: self.partitions_count,
partition_ids: deleted_partition_ids,
};
- let _responses = shard.broadcast_event_to_all_shards(event).await;
+ match shard.broadcast_event_to_all_shards(event).await {
+ BroadcastResult::Success(_) => {}
+
+ BroadcastResult::PartialSuccess { errors, .. } => {
+ for (shard_id, error) in errors {
+ tracing::warn!("Shard {} failed to process event: {:?}",
shard_id, error);
+ }
+ }
+
+ BroadcastResult::Failure(_) => {
+ return Err(IggyError::ShardCommunicationError(0));
+ }
+ }
let remaining_partition_ids = shard.streams2.with_topic_by_id(
&self.stream_id,
diff --git
a/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs
b/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs
index ea7b947a4..7fd38b7e7 100644
---
a/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs
+++
b/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs
@@ -20,6 +20,7 @@ use crate::binary::command::{BinaryServerCommand,
ServerCommand, ServerCommandHa
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::mapper;
use crate::binary::{handlers::personal_access_tokens::COMPONENT,
sender::SenderKind};
+use crate::shard::BroadcastResult;
use crate::shard::IggyShard;
use crate::shard::transmission::event::ShardEvent;
use crate::state::command::EntryCommand;
@@ -60,7 +61,19 @@ impl ServerCommandHandler for CreatePersonalAccessToken {
let event = ShardEvent::CreatedPersonalAccessToken {
personal_access_token: personal_access_token.clone(),
};
- let _responses = shard.broadcast_event_to_all_shards(event).await;
+ match shard.broadcast_event_to_all_shards(event).await {
+ BroadcastResult::Success(_) => {}
+
+ BroadcastResult::PartialSuccess { errors, .. } => {
+ for (shard_id, error) in errors {
+ tracing::warn!("Shard {} failed to process event: {:?}",
shard_id, error);
+ }
+ }
+
+ BroadcastResult::Failure(_) => {
+ return Err(IggyError::ShardCommunicationError(0));
+ }
+ }
shard
.state
diff --git
a/core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs
b/core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs
index b2708f9c4..9aa424c7c 100644
---
a/core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs
+++
b/core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs
@@ -19,7 +19,7 @@
use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::{handlers::personal_access_tokens::COMPONENT,
sender::SenderKind};
-use crate::shard::IggyShard;
+use crate::shard::{BroadcastResult, IggyShard};
use crate::state::command::EntryCommand;
use crate::streaming::session::Session;
use anyhow::Result;
@@ -56,7 +56,19 @@ impl ServerCommandHandler for DeletePersonalAccessToken {
user_id: session.get_user_id(),
name: self.name.clone(),
};
- let _responses = shard.broadcast_event_to_all_shards(event).await;
+ match shard.broadcast_event_to_all_shards(event).await {
+ BroadcastResult::Success(_) => {}
+
+ BroadcastResult::PartialSuccess { errors, .. } => {
+ for (shard_id, error) in errors {
+ tracing::warn!("Shard {} failed to process event: {:?}",
shard_id, error);
+ }
+ }
+
+ BroadcastResult::Failure(_) => {
+ return Err(IggyError::ShardCommunicationError(0));
+ }
+ }
shard
.state
diff --git
a/core/server/src/binary/handlers/personal_access_tokens/login_with_personal_access_token_handler.rs
b/core/server/src/binary/handlers/personal_access_tokens/login_with_personal_access_token_handler.rs
index e7ecc33f9..f21111bb3 100644
---
a/core/server/src/binary/handlers/personal_access_tokens/login_with_personal_access_token_handler.rs
+++
b/core/server/src/binary/handlers/personal_access_tokens/login_with_personal_access_token_handler.rs
@@ -1,3 +1,4 @@
+use crate::shard::BroadcastResult;
use crate::shard::transmission::event::ShardEvent;
/* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -60,7 +61,19 @@ impl ServerCommandHandler for LoginWithPersonalAccessToken {
token: self.token,
client_id: session.client_id,
};
- let _responses = shard.broadcast_event_to_all_shards(event).await;
+ match shard.broadcast_event_to_all_shards(event).await {
+ BroadcastResult::Success(_) => {}
+
+ BroadcastResult::PartialSuccess { errors, .. } => {
+ for (shard_id, error) in errors {
+ tracing::warn!("Shard {} failed to process event: {:?}",
shard_id, error);
+ }
+ }
+
+ BroadcastResult::Failure(_) => {
+ return Err(IggyError::ShardCommunicationError(0));
+ }
+ }
let identity_info = mapper::map_identity_info(user.id);
sender.send_ok_response(&identity_info).await?;
Ok(())
diff --git
a/core/server/src/binary/handlers/segments/delete_segments_handler.rs
b/core/server/src/binary/handlers/segments/delete_segments_handler.rs
index ccb5f859d..d84d56aac 100644
--- a/core/server/src/binary/handlers/segments/delete_segments_handler.rs
+++ b/core/server/src/binary/handlers/segments/delete_segments_handler.rs
@@ -19,6 +19,7 @@
use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::{handlers::partitions::COMPONENT, sender::SenderKind};
+use crate::shard::BroadcastResult;
use crate::shard::IggyShard;
use crate::shard::transmission::event::ShardEvent;
use crate::state::command::EntryCommand;
@@ -68,7 +69,19 @@ impl ServerCommandHandler for DeleteSegments {
partition_id: self.partition_id as usize,
segments_count: self.segments_count,
};
- let _responses = shard.broadcast_event_to_all_shards(event).await;
+ match shard.broadcast_event_to_all_shards(event).await {
+ BroadcastResult::Success(_) => {}
+
+ BroadcastResult::PartialSuccess { errors, .. } => {
+ for (shard_id, error) in errors {
+ tracing::warn!("Shard {} failed to process event: {:?}",
shard_id, error);
+ }
+ }
+
+ BroadcastResult::Failure(_) => {
+ return Err(IggyError::ShardCommunicationError(0));
+ }
+ }
shard
.state
diff --git a/core/server/src/binary/handlers/streams/create_stream_handler.rs
b/core/server/src/binary/handlers/streams/create_stream_handler.rs
index 0b1c1ec0a..54d46af50 100644
--- a/core/server/src/binary/handlers/streams/create_stream_handler.rs
+++ b/core/server/src/binary/handlers/streams/create_stream_handler.rs
@@ -20,6 +20,7 @@ use crate::binary::command::{BinaryServerCommand,
ServerCommand, ServerCommandHa
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::mapper;
use crate::binary::{handlers::streams::COMPONENT, sender::SenderKind};
+use crate::shard::BroadcastResult;
use crate::shard::IggyShard;
use crate::shard::transmission::event::ShardEvent;
use crate::shard_info;
@@ -60,7 +61,27 @@ impl ServerCommandHandler for CreateStream {
id: created_stream_id,
stream,
};
- let _responses = shard.broadcast_event_to_all_shards(event).await;
+
+ // Broadcast the event and handle potential failures
+ match shard.broadcast_event_to_all_shards(event).await {
+ BroadcastResult::Success(_) => {
+ // All shards successfully received the event
+ }
+ BroadcastResult::PartialSuccess { errors, .. } => {
+ // Some shards failed, but we can continue as at least some
succeeded
+ for (shard_id, error) in errors {
+ tracing::warn!(
+ "Shard {} failed to process CreatedStream2 event:
{:?}",
+ shard_id,
+ error
+ );
+ }
+ }
+ BroadcastResult::Failure(_err) => {
+ // All shards failed - this is critical for stream creation
+ return Err(IggyError::ShardCommunicationError(0)); // 0
indicates all shards failed
+ }
+ }
let response = shard
.streams2
diff --git a/core/server/src/binary/handlers/streams/delete_stream_handler.rs
b/core/server/src/binary/handlers/streams/delete_stream_handler.rs
index f59b11b5b..de840fe8a 100644
--- a/core/server/src/binary/handlers/streams/delete_stream_handler.rs
+++ b/core/server/src/binary/handlers/streams/delete_stream_handler.rs
@@ -19,6 +19,7 @@
use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::{handlers::streams::COMPONENT, sender::SenderKind};
+use crate::shard::BroadcastResult;
use crate::shard::IggyShard;
use crate::shard::transmission::event::ShardEvent;
use crate::shard_info;
@@ -65,7 +66,19 @@ impl ServerCommandHandler for DeleteStream {
id: stream2.id(),
stream_id: self.stream_id.clone(),
};
- let _responses = shard.broadcast_event_to_all_shards(event).await;
+ match shard.broadcast_event_to_all_shards(event).await {
+ BroadcastResult::Success(_) => {}
+
+ BroadcastResult::PartialSuccess { errors, .. } => {
+ for (shard_id, error) in errors {
+ tracing::warn!("Shard {} failed to process event: {:?}",
shard_id, error);
+ }
+ }
+
+ BroadcastResult::Failure(_) => {
+ return Err(IggyError::ShardCommunicationError(0));
+ }
+ }
shard
.state
diff --git a/core/server/src/binary/handlers/streams/purge_stream_handler.rs
b/core/server/src/binary/handlers/streams/purge_stream_handler.rs
index b8b35b645..f82e8148f 100644
--- a/core/server/src/binary/handlers/streams/purge_stream_handler.rs
+++ b/core/server/src/binary/handlers/streams/purge_stream_handler.rs
@@ -19,6 +19,7 @@
use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::{handlers::streams::COMPONENT, sender::SenderKind};
+use crate::shard::BroadcastResult;
use crate::shard::IggyShard;
use crate::shard::transmission::event::ShardEvent;
use crate::state::command::EntryCommand;
@@ -54,7 +55,19 @@ impl ServerCommandHandler for PurgeStream {
let event = ShardEvent::PurgedStream2 {
stream_id: self.stream_id.clone(),
};
- let _responses = shard.broadcast_event_to_all_shards(event).await;
+ match shard.broadcast_event_to_all_shards(event).await {
+ BroadcastResult::Success(_) => {}
+
+ BroadcastResult::PartialSuccess { errors, .. } => {
+ for (shard_id, error) in errors {
+ tracing::warn!("Shard {} failed to process event: {:?}",
shard_id, error);
+ }
+ }
+
+ BroadcastResult::Failure(_) => {
+ return Err(IggyError::ShardCommunicationError(0));
+ }
+ }
shard
.state
.apply(session.get_user_id(), &EntryCommand::PurgeStream(self))
diff --git a/core/server/src/binary/handlers/streams/update_stream_handler.rs
b/core/server/src/binary/handlers/streams/update_stream_handler.rs
index 996db7f4b..b1bf298e1 100644
--- a/core/server/src/binary/handlers/streams/update_stream_handler.rs
+++ b/core/server/src/binary/handlers/streams/update_stream_handler.rs
@@ -19,6 +19,7 @@
use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::{handlers::streams::COMPONENT, sender::SenderKind};
+use crate::shard::BroadcastResult;
use crate::shard::IggyShard;
use crate::shard::transmission::event::ShardEvent;
use crate::state::command::EntryCommand;
@@ -55,7 +56,19 @@ impl ServerCommandHandler for UpdateStream {
stream_id: self.stream_id.clone(),
name: self.name.clone(),
};
- let _responses = shard.broadcast_event_to_all_shards(event).await;
+ match shard.broadcast_event_to_all_shards(event).await {
+ BroadcastResult::Success(_) => {}
+
+ BroadcastResult::PartialSuccess { errors, .. } => {
+ for (shard_id, error) in errors {
+ tracing::warn!("Shard {} failed to process event: {:?}",
shard_id, error);
+ }
+ }
+
+ BroadcastResult::Failure(_) => {
+ return Err(IggyError::ShardCommunicationError(0));
+ }
+ }
shard
.state
.apply(session.get_user_id(), &EntryCommand::UpdateStream(self))
diff --git a/core/server/src/binary/handlers/topics/create_topic_handler.rs
b/core/server/src/binary/handlers/topics/create_topic_handler.rs
index 35eba8d93..8456f190e 100644
--- a/core/server/src/binary/handlers/topics/create_topic_handler.rs
+++ b/core/server/src/binary/handlers/topics/create_topic_handler.rs
@@ -20,6 +20,7 @@ use crate::binary::command::{BinaryServerCommand,
ServerCommand, ServerCommandHa
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::mapper;
use crate::binary::{handlers::topics::COMPONENT, sender::SenderKind};
+use crate::shard::BroadcastResult;
use crate::shard::IggyShard;
use crate::shard::transmission::event::ShardEvent;
use crate::slab::traits_ext::EntityMarker;
@@ -71,7 +72,26 @@ impl ServerCommandHandler for CreateTopic {
stream_id: self.stream_id.clone(),
topic,
};
- let _responses = shard.broadcast_event_to_all_shards(event).await;
+
+ match shard.broadcast_event_to_all_shards(event).await {
+ BroadcastResult::Success(_) => {
+ // All shards successfully received the event
+ }
+ BroadcastResult::PartialSuccess { errors, .. } => {
+ // Some shards failed, but we can continue
+ for (shard_id, error) in errors {
+ tracing::warn!(
+ "Shard {} failed to process CreatedTopic2 event: {:?}",
+ shard_id,
+ error
+ );
+ }
+ }
+ BroadcastResult::Failure(_) => {
+ // All shards failed - critical for topic creation
+ return Err(IggyError::ShardCommunicationError(0));
+ }
+ }
let partitions = shard
.create_partitions2(
session,
@@ -85,7 +105,26 @@ impl ServerCommandHandler for CreateTopic {
topic_id: Identifier::numeric(topic_id as u32).unwrap(),
partitions,
};
- let _responses = shard.broadcast_event_to_all_shards(event).await;
+
+ match shard.broadcast_event_to_all_shards(event).await {
+ BroadcastResult::Success(_) => {
+ // All shards successfully received the event
+ }
+ BroadcastResult::PartialSuccess { errors, .. } => {
+ // Some shards failed, but we can continue
+ for (shard_id, error) in errors {
+ tracing::warn!(
+ "Shard {} failed to process CreatedPartitions2 event:
{:?}",
+ shard_id,
+ error
+ );
+ }
+ }
+ BroadcastResult::Failure(_) => {
+ // All shards failed - critical for partitions creation
+ return Err(IggyError::ShardCommunicationError(0));
+ }
+ }
let response = shard.streams2.with_topic_by_id(
&self.stream_id,
&Identifier::numeric(topic_id as u32).unwrap(),
diff --git a/core/server/src/binary/handlers/topics/delete_topic_handler.rs
b/core/server/src/binary/handlers/topics/delete_topic_handler.rs
index 69d9d9957..5c294db77 100644
--- a/core/server/src/binary/handlers/topics/delete_topic_handler.rs
+++ b/core/server/src/binary/handlers/topics/delete_topic_handler.rs
@@ -19,6 +19,7 @@
use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::{handlers::topics::COMPONENT, sender::SenderKind};
+use crate::shard::BroadcastResult;
use crate::shard::IggyShard;
use crate::shard::transmission::event::ShardEvent;
use crate::shard_info;
@@ -71,7 +72,19 @@ impl ServerCommandHandler for DeleteTopic {
stream_id: self.stream_id.clone(),
topic_id: self.topic_id.clone(),
};
- let _responses = shard.broadcast_event_to_all_shards(event).await;
+ match shard.broadcast_event_to_all_shards(event).await {
+ BroadcastResult::Success(_) => {}
+
+ BroadcastResult::PartialSuccess { errors, .. } => {
+ for (shard_id, error) in errors {
+ tracing::warn!("Shard {} failed to process event: {:?}",
shard_id, error);
+ }
+ }
+
+ BroadcastResult::Failure(_) => {
+ return Err(IggyError::ShardCommunicationError(0));
+ }
+ }
shard
.state
diff --git a/core/server/src/binary/handlers/topics/purge_topic_handler.rs
b/core/server/src/binary/handlers/topics/purge_topic_handler.rs
index 7c4d5179c..ae5de3cf3 100644
--- a/core/server/src/binary/handlers/topics/purge_topic_handler.rs
+++ b/core/server/src/binary/handlers/topics/purge_topic_handler.rs
@@ -19,7 +19,7 @@
use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::{handlers::topics::COMPONENT, sender::SenderKind};
-use crate::shard::IggyShard;
+use crate::shard::{BroadcastResult, IggyShard};
use crate::state::command::EntryCommand;
use crate::streaming::session::Session;
use anyhow::Result;
@@ -60,7 +60,19 @@ impl ServerCommandHandler for PurgeTopic {
stream_id: self.stream_id.clone(),
topic_id: self.topic_id.clone(),
};
- let _responses = shard.broadcast_event_to_all_shards(event).await;
+ match shard.broadcast_event_to_all_shards(event).await {
+ BroadcastResult::Success(_) => {}
+
+ BroadcastResult::PartialSuccess { errors, .. } => {
+ for (shard_id, error) in errors {
+ tracing::warn!("Shard {} failed to process event: {:?}",
shard_id, error);
+ }
+ }
+
+ BroadcastResult::Failure(_) => {
+ return Err(IggyError::ShardCommunicationError(0));
+ }
+ }
shard
.state
diff --git a/core/server/src/binary/handlers/topics/update_topic_handler.rs
b/core/server/src/binary/handlers/topics/update_topic_handler.rs
index 8f9146ac9..af1222f8f 100644
--- a/core/server/src/binary/handlers/topics/update_topic_handler.rs
+++ b/core/server/src/binary/handlers/topics/update_topic_handler.rs
@@ -19,6 +19,7 @@
use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::{handlers::topics::COMPONENT, sender::SenderKind};
+use crate::shard::BroadcastResult;
use crate::shard::IggyShard;
use crate::shard::transmission::event::ShardEvent;
use crate::state::command::EntryCommand;
@@ -81,7 +82,19 @@ impl ServerCommandHandler for UpdateTopic {
max_topic_size: self.max_topic_size,
replication_factor: self.replication_factor,
};
- let _responses = shard.broadcast_event_to_all_shards(event).await;
+ match shard.broadcast_event_to_all_shards(event).await {
+ BroadcastResult::Success(_) => {}
+
+ BroadcastResult::PartialSuccess { errors, .. } => {
+ for (shard_id, error) in errors {
+ tracing::warn!("Shard {} failed to process event: {:?}",
shard_id, error);
+ }
+ }
+
+ BroadcastResult::Failure(_) => {
+ return Err(IggyError::ShardCommunicationError(0));
+ }
+ }
let topic_id = self.topic_id.clone();
let stream_id = self.stream_id.clone();
diff --git a/core/server/src/binary/handlers/users/change_password_handler.rs
b/core/server/src/binary/handlers/users/change_password_handler.rs
index 0b595ee4b..911d1ce2c 100644
--- a/core/server/src/binary/handlers/users/change_password_handler.rs
+++ b/core/server/src/binary/handlers/users/change_password_handler.rs
@@ -19,6 +19,7 @@
use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::{handlers::users::COMPONENT, sender::SenderKind};
+use crate::shard::BroadcastResult;
use crate::shard::IggyShard;
use crate::shard::transmission::event::ShardEvent;
use crate::shard_info;
@@ -77,7 +78,19 @@ impl ServerCommandHandler for ChangePassword {
current_password: self.current_password.clone(),
new_password: self.new_password.clone(),
};
- let _responses = shard.broadcast_event_to_all_shards(event).await;
+ match shard.broadcast_event_to_all_shards(event).await {
+ BroadcastResult::Success(_) => {}
+
+ BroadcastResult::PartialSuccess { errors, .. } => {
+ for (shard_id, error) in errors {
+ tracing::warn!("Shard {} failed to process event: {:?}",
shard_id, error);
+ }
+ }
+
+ BroadcastResult::Failure(_) => {
+ return Err(IggyError::ShardCommunicationError(0));
+ }
+ }
// For the security of the system, we hash the password before storing
it in metadata.
shard
diff --git a/core/server/src/binary/handlers/users/create_user_handler.rs
b/core/server/src/binary/handlers/users/create_user_handler.rs
index 96521d5ff..87b1aae6c 100644
--- a/core/server/src/binary/handlers/users/create_user_handler.rs
+++ b/core/server/src/binary/handlers/users/create_user_handler.rs
@@ -16,6 +16,7 @@
* under the License.
*/
+use crate::shard::BroadcastResult;
use crate::shard::IggyShard;
use crate::shard::transmission::event::ShardEvent;
use crate::{shard_debug, shard_info};
@@ -78,7 +79,19 @@ impl ServerCommandHandler for CreateUser {
status: self.status,
permissions: self.permissions.clone(),
};
- let _responses = shard.broadcast_event_to_all_shards(event).await;
+ match shard.broadcast_event_to_all_shards(event).await {
+ BroadcastResult::Success(_) => {}
+
+ BroadcastResult::PartialSuccess { errors, .. } => {
+ for (shard_id, error) in errors {
+ tracing::warn!("Shard {} failed to process event: {:?}",
shard_id, error);
+ }
+ }
+
+ BroadcastResult::Failure(_) => {
+ return Err(IggyError::ShardCommunicationError(0));
+ }
+ }
let user_id = user.id;
let response = mapper::map_user(&user);
diff --git a/core/server/src/binary/handlers/users/delete_user_handler.rs
b/core/server/src/binary/handlers/users/delete_user_handler.rs
index f265f8df0..d22cdf920 100644
--- a/core/server/src/binary/handlers/users/delete_user_handler.rs
+++ b/core/server/src/binary/handlers/users/delete_user_handler.rs
@@ -21,6 +21,7 @@ use std::rc::Rc;
use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::{handlers::users::COMPONENT, sender::SenderKind};
+use crate::shard::BroadcastResult;
use crate::shard::IggyShard;
use crate::shard::transmission::event::ShardEvent;
use crate::shard_info;
@@ -66,7 +67,19 @@ impl ServerCommandHandler for DeleteUser {
let event = ShardEvent::DeletedUser {
user_id: self.user_id.clone(),
};
- let _responses = shard.broadcast_event_to_all_shards(event).await;
+ match shard.broadcast_event_to_all_shards(event).await {
+ BroadcastResult::Success(_) => {}
+
+ BroadcastResult::PartialSuccess { errors, .. } => {
+ for (shard_id, error) in errors {
+ tracing::warn!("Shard {} failed to process event: {:?}",
shard_id, error);
+ }
+ }
+
+ BroadcastResult::Failure(_) => {
+ return Err(IggyError::ShardCommunicationError(0));
+ }
+ }
let user_id = self.user_id.clone();
shard
diff --git a/core/server/src/binary/handlers/users/login_user_handler.rs
b/core/server/src/binary/handlers/users/login_user_handler.rs
index 395f2724b..bb4c53509 100644
--- a/core/server/src/binary/handlers/users/login_user_handler.rs
+++ b/core/server/src/binary/handlers/users/login_user_handler.rs
@@ -22,6 +22,7 @@ use crate::binary::command::{BinaryServerCommand,
ServerCommand, ServerCommandHa
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::mapper;
use crate::binary::{handlers::users::COMPONENT, sender::SenderKind};
+use crate::shard::BroadcastResult;
use crate::shard::IggyShard;
use crate::shard::transmission::event::ShardEvent;
use crate::shard_info;
@@ -70,7 +71,19 @@ impl ServerCommandHandler for LoginUser {
password,
};
// Broadcast the event to all shards.
- let _responses = shard.broadcast_event_to_all_shards(event).await;
+ match shard.broadcast_event_to_all_shards(event).await {
+ BroadcastResult::Success(_) => {}
+
+ BroadcastResult::PartialSuccess { errors, .. } => {
+ for (shard_id, error) in errors {
+ tracing::warn!("Shard {} failed to process event: {:?}",
shard_id, error);
+ }
+ }
+
+ BroadcastResult::Failure(_) => {
+ return Err(IggyError::ShardCommunicationError(0));
+ }
+ }
let identity_info = mapper::map_identity_info(user.id);
sender.send_ok_response(&identity_info).await?;
diff --git a/core/server/src/binary/handlers/users/logout_user_handler.rs
b/core/server/src/binary/handlers/users/logout_user_handler.rs
index 7dbc99b4d..46ba8952c 100644
--- a/core/server/src/binary/handlers/users/logout_user_handler.rs
+++ b/core/server/src/binary/handlers/users/logout_user_handler.rs
@@ -21,6 +21,7 @@ use std::rc::Rc;
use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::{handlers::users::COMPONENT, sender::SenderKind};
+use crate::shard::BroadcastResult;
use crate::shard::IggyShard;
use crate::shard::transmission::event::ShardEvent;
use crate::shard_info;
@@ -61,7 +62,19 @@ impl ServerCommandHandler for LogoutUser {
let event = ShardEvent::LogoutUser {
client_id: session.client_id,
};
- let _responses = shard.broadcast_event_to_all_shards(event).await;
+ match shard.broadcast_event_to_all_shards(event).await {
+ BroadcastResult::Success(_) => {}
+
+ BroadcastResult::PartialSuccess { errors, .. } => {
+ for (shard_id, error) in errors {
+ tracing::warn!("Shard {} failed to process event: {:?}",
shard_id, error);
+ }
+ }
+
+ BroadcastResult::Failure(_) => {
+ return Err(IggyError::ShardCommunicationError(0));
+ }
+ }
session.clear_user_id();
sender.send_empty_ok_response().await?;
Ok(())
diff --git
a/core/server/src/binary/handlers/users/update_permissions_handler.rs
b/core/server/src/binary/handlers/users/update_permissions_handler.rs
index 4b42d9026..5346aa605 100644
--- a/core/server/src/binary/handlers/users/update_permissions_handler.rs
+++ b/core/server/src/binary/handlers/users/update_permissions_handler.rs
@@ -21,6 +21,7 @@ use std::rc::Rc;
use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::{handlers::users::COMPONENT, sender::SenderKind};
+use crate::shard::BroadcastResult;
use crate::shard::IggyShard;
use crate::shard::transmission::event::ShardEvent;
use crate::shard_info;
@@ -61,7 +62,19 @@ impl ServerCommandHandler for UpdatePermissions {
user_id: self.user_id.clone(),
permissions: self.permissions.clone(),
};
- let _responses = shard.broadcast_event_to_all_shards(event).await;
+ match shard.broadcast_event_to_all_shards(event).await {
+ BroadcastResult::Success(_) => {}
+
+ BroadcastResult::PartialSuccess { errors, .. } => {
+ for (shard_id, error) in errors {
+ tracing::warn!("Shard {} failed to process event: {:?}",
shard_id, error);
+ }
+ }
+
+ BroadcastResult::Failure(_) => {
+ return Err(IggyError::ShardCommunicationError(0));
+ }
+ }
shard
.state
diff --git a/core/server/src/binary/handlers/users/update_user_handler.rs
b/core/server/src/binary/handlers/users/update_user_handler.rs
index c8f28db38..2bfd87114 100644
--- a/core/server/src/binary/handlers/users/update_user_handler.rs
+++ b/core/server/src/binary/handlers/users/update_user_handler.rs
@@ -21,6 +21,7 @@ use std::rc::Rc;
use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::{handlers::users::COMPONENT, sender::SenderKind};
+use crate::shard::BroadcastResult;
use crate::shard::IggyShard;
use crate::shard::transmission::event::ShardEvent;
use crate::shard_info;
@@ -73,7 +74,19 @@ impl ServerCommandHandler for UpdateUser {
username: self.username.clone(),
status: self.status,
};
- let _responses = shard.broadcast_event_to_all_shards(event).await;
+ match shard.broadcast_event_to_all_shards(event).await {
+ BroadcastResult::Success(_) => {}
+
+ BroadcastResult::PartialSuccess { errors, .. } => {
+ for (shard_id, error) in errors {
+ tracing::warn!("Shard {} failed to process event: {:?}",
shard_id, error);
+ }
+ }
+
+ BroadcastResult::Failure(_) => {
+ return Err(IggyError::ShardCommunicationError(0));
+ }
+ }
let user_id = self.user_id.clone();
shard
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index 7a2529fab..24de07a98 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -36,7 +36,7 @@ use crate::{
message::{ShardMessage, ShardRequest, ShardRequestPayload,
ShardSendRequestResult},
},
},
- shard_error, shard_info,
+ shard_error, shard_info, shard_warn,
slab::{streams::Streams, traits_ext::EntityMarker, users::Users},
state::file::FileState,
streaming::{
@@ -49,9 +49,11 @@ use builder::IggyShardBuilder;
use compio::io::AsyncWriteAtExt;
use dashmap::DashMap;
use error_set::ErrContext;
+use futures::future::join_all;
use hash32::{Hasher, Murmur3Hasher};
use iggy_common::{EncryptorKind, Identifier, IggyError, TransportProtocol};
use std::hash::Hasher as _;
+use std::sync::Arc;
use std::{
cell::{Cell, RefCell},
net::SocketAddr,
@@ -64,6 +66,30 @@ use transmission::connector::{Receiver, ShardConnector,
StopReceiver};
pub const COMPONENT: &str = "SHARD";
pub const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(10);
+pub const BROADCAST_TIMEOUT: Duration = Duration::from_secs(5);
+
+/// Result type for broadcast operations
+#[derive(Debug)]
+pub enum BroadcastError {
+ PartialFailure {
+ succeeded: Vec<u16>,
+ failed: Vec<(u16, IggyError)>,
+ },
+ TotalFailure(IggyError),
+ Timeout {
+ responded: Vec<u16>,
+ timed_out: Vec<u16>,
+ },
+}
+
+pub enum BroadcastResult {
+ Success(Vec<ShardResponse>),
+ PartialSuccess {
+ responses: Vec<ShardResponse>,
+ errors: Vec<(u16, IggyError)>,
+ },
+ Failure(BroadcastError),
+}
pub(crate) struct Shard {
id: u16,
@@ -899,59 +925,75 @@ impl IggyShard {
}
}
- pub async fn broadcast_event_to_all_shards(&self, event: ShardEvent) ->
Vec<ShardResponse> {
- let mut responses =
Vec::with_capacity(self.get_available_shards_count() as usize);
- for maybe_receiver in self
- .shards
- .iter()
- .filter_map(|shard| {
- if shard.id != self.id {
- Some(shard.connection.clone())
- } else {
- None
- }
- })
- .map(|conn| {
- // TODO: Fixme, maybe we should send response_sender
- // and propagate errors back.
- let event = event.clone();
- /*
- if matches!(
- &event,
- ShardEvent::CreatedStream2 { .. }
- | ShardEvent::DeletedStream2 { .. }
- | ShardEvent::CreatedTopic2 { .. }
- | ShardEvent::DeletedTopic2 { .. }
- | ShardEvent::UpdatedTopic2 { .. }
- | ShardEvent::CreatedPartitions2 { .. }
- | ShardEvent::DeletedPartitions2 { .. }
- | ShardEvent::CreatedConsumerGroup2 { .. }
- | ShardEvent::CreatedPersonalAccessToken { .. }
- | ShardEvent::DeletedConsumerGroup2 { .. }
- ) {
- */
+ pub async fn broadcast_event_to_all_shards(&self, event: ShardEvent) ->
BroadcastResult {
+ // Use Arc to avoid cloning the event for each shard
+ let event = Arc::new(event);
+ let timeout_duration = BROADCAST_TIMEOUT;
+
+ // Create futures for all shards in parallel
+ let mut futures = Vec::new();
+ let mut shard_ids = Vec::new();
+
+ for shard in self.shards.iter().filter(|s| s.id != self.id) {
+ let event_ref = Arc::clone(&event);
+ let conn = shard.connection.clone();
+ let shard_id = shard.id;
+ shard_ids.push(shard_id);
+
+ let future = async move {
let (sender, receiver) = async_channel::bounded(1);
- conn.send(ShardFrame::new(event.into(), Some(sender.clone())));
- Some(receiver.clone())
- /*
- } else {
- conn.send(ShardFrame::new(event.into(), None));
- None
+ conn.send(ShardFrame::new(
+ ShardMessage::Event((*event_ref).clone()),
+ Some(sender),
+ ));
+
+ match compio::time::timeout(timeout_duration,
receiver.recv()).await {
+ Ok(Ok(response)) => Ok((shard_id, response)),
+ Ok(Err(_)) => Err((shard_id,
IggyError::ShardCommunicationError(shard_id))),
+ Err(_) => Err((shard_id, IggyError::TaskTimeout)),
}
- */
- })
- {
- match maybe_receiver {
- Some(receiver) => {
- let response = receiver.recv().await.unwrap();
- responses.push(response);
- }
- None => {
- responses.push(ShardResponse::Event);
+ };
+
+ futures.push(future);
+ }
+
+ // If no other shards exist, return success with empty responses
+ if futures.is_empty() {
+ return BroadcastResult::Success(Vec::new());
+ }
+
+ // Collect all results in parallel
+ let results = join_all(futures).await;
+
+ // Process results
+ let mut responses = Vec::new();
+ let mut errors = Vec::new();
+
+ for result in results {
+ match result {
+ Ok((_, response)) => responses.push(response),
+ Err((shard_id, error)) => {
+ // Log the error for observability
+ shard_warn!(
+ self.id,
+ "Failed to broadcast event to shard {}: {:?}",
+ shard_id,
+ error
+ );
+ errors.push((shard_id, error));
}
}
}
- responses
+
+ if errors.is_empty() {
+ BroadcastResult::Success(responses)
+ } else if responses.is_empty() {
+ BroadcastResult::Failure(BroadcastError::TotalFailure(
+ IggyError::ShardCommunicationError(0), // 0 indicates all
shards failed
+ ))
+ } else {
+ BroadcastResult::PartialSuccess { responses, errors }
+ }
}
pub fn add_active_session(&self, session: Rc<Session>) {
diff --git a/core/server/src/tcp/tcp_listener.rs
b/core/server/src/tcp/tcp_listener.rs
index c963103da..a2ed25d14 100644
--- a/core/server/src/tcp/tcp_listener.rs
+++ b/core/server/src/tcp/tcp_listener.rs
@@ -18,6 +18,7 @@
use crate::binary::sender::SenderKind;
use crate::configs::tcp::TcpSocketConfig;
+use crate::shard::BroadcastResult;
use crate::shard::IggyShard;
use crate::shard::task_registry::ShutdownToken;
use crate::shard::transmission::event::ShardEvent;
@@ -111,7 +112,22 @@ pub async fn start(
protocol: TransportProtocol::Tcp,
address: actual_addr,
};
- shard.broadcast_event_to_all_shards(event).await;
+ match shard.broadcast_event_to_all_shards(event).await {
+ BroadcastResult::Success(_) => {}
+ BroadcastResult::PartialSuccess { errors, .. } => {
+ for (shard_id, error) in errors {
+ shard_info!(
+ shard.id,
+ "Shard {} failed to receive address: {:?}",
+ shard_id,
+ error
+ );
+ }
+ }
+ BroadcastResult::Failure(err) => {
+ shard_error!(shard.id, "Failed to broadcast address:
{:?}", err);
+ }
+ }
}
}
@@ -149,7 +165,17 @@ async fn accept_loop(
// Broadcast session to all shards.
let event = ShardEvent::NewSession { address,
transport };
// TODO: Fixme look inside of
broadcast_event_to_all_shards method.
- let _responses =
shard_clone.broadcast_event_to_all_shards(event).await;
+ match
shard_clone.broadcast_event_to_all_shards(event).await {
+ BroadcastResult::Success(_) => {}
+ BroadcastResult::PartialSuccess { errors, .. } => {
+ for (shard_id, error) in errors {
+ shard_info!(shard.id, "Shard {} failed
NewSession: {:?}", shard_id, error);
+ }
+ }
+ BroadcastResult::Failure(err) => {
+ shard_error!(shard.id, "Failed to broadcast
NewSession: {:?}", err);
+ }
+ }
let client_id = session.client_id;
let user_id = session.get_user_id();
@@ -168,7 +194,17 @@ async fn accept_loop(
}
registry_clone.remove_connection(&client_id);
let event = ShardEvent::ClientDisconnected {
client_id, user_id };
- let _responses =
shard_for_conn.broadcast_event_to_all_shards(event).await;
+ match
shard_for_conn.broadcast_event_to_all_shards(event).await {
+ BroadcastResult::Success(_) => {}
+ BroadcastResult::PartialSuccess { errors, .. }
=> {
+ for (shard_id, error) in errors {
+ shard_info!(shard_for_conn.id, "Shard
{} failed ClientDisconnected: {:?}", shard_id, error);
+ }
+ }
+ BroadcastResult::Failure(err) => {
+ shard_error!(shard_for_conn.id, "Failed to
broadcast ClientDisconnected: {:?}", err);
+ }
+ }
if let Err(error) = sender.shutdown().await {
shard_error!(shard.id, "Failed to shutdown TCP
stream for client: {}, address: {}. {}", client_id, address, error);