This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/io_uring_tpc by this push:
     new a7f2760a7 feat(io_uring): implement broadcast mechanism error handling (#2273)
a7f2760a7 is described below

commit a7f2760a7efb40c7c8bc5a0f888355e525b3f539
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Thu Oct 16 18:14:32 2025 +0200

    feat(io_uring): implement broadcast mechanism error handling (#2273)
---
 core/common/src/error/iggy_error.rs                |   4 +-
 core/common/src/utils/crypto.rs                    |   5 +-
 core/integration/src/test_mcp_server.rs            |  92 ++++++++++++-----
 .../tests/server/scenarios/concurrent_scenario.rs  |   6 +-
 .../create_consumer_group_handler.rs               |   3 +-
 .../delete_consumer_group_handler.rs               |   3 +-
 .../consumer_groups/join_consumer_group_handler.rs |   3 +-
 .../leave_consumer_group_handler.rs                |   3 +-
 .../partitions/create_partitions_handler.rs        |  12 ++-
 .../partitions/delete_partitions_handler.rs        |  18 +++-
 .../create_personal_access_token_handler.rs        |   3 +-
 .../delete_personal_access_token_handler.rs        |   2 +-
 .../login_with_personal_access_token_handler.rs    |   2 +-
 .../handlers/segments/delete_segments_handler.rs   |   3 +-
 .../handlers/streams/create_stream_handler.rs      |   4 +-
 .../handlers/streams/delete_stream_handler.rs      |   3 +-
 .../handlers/streams/purge_stream_handler.rs       |   3 +-
 .../handlers/streams/update_stream_handler.rs      |   3 +-
 .../binary/handlers/topics/create_topic_handler.rs |   7 +-
 .../binary/handlers/topics/delete_topic_handler.rs |   3 +-
 .../binary/handlers/topics/purge_topic_handler.rs  |   2 +-
 .../binary/handlers/topics/update_topic_handler.rs |   3 +-
 .../handlers/users/change_password_handler.rs      |   3 +-
 .../binary/handlers/users/create_user_handler.rs   |   2 +-
 .../binary/handlers/users/delete_user_handler.rs   |   3 +-
 .../binary/handlers/users/login_user_handler.rs    |  12 ++-
 .../binary/handlers/users/logout_user_handler.rs   |   3 +-
 .../handlers/users/update_permissions_handler.rs   |   3 +-
 .../binary/handlers/users/update_user_handler.rs   |   3 +-
 core/server/src/quic/quic_server.rs                |   2 +-
 core/server/src/shard/mod.rs                       | 113 ++++++++++++---------
 core/server/src/shard/system/streams.rs            |   2 -
 core/server/src/shard/system/users.rs              |   4 +-
 core/server/src/shard/transmission/event.rs        |   4 +-
 core/server/src/slab/users.rs                      |   6 ++
 core/server/src/streaming/topics/helpers.rs        |  42 +++++++-
 core/server/src/tcp/tcp_listener.rs                |  12 ++-
 core/server/src/tcp/tcp_tls_listener.rs            |   2 +-
 38 files changed, 275 insertions(+), 128 deletions(-)

diff --git a/core/common/src/error/iggy_error.rs b/core/common/src/error/iggy_error.rs
index 326c059ea..eb6bbb84e 100644
--- a/core/common/src/error/iggy_error.rs
+++ b/core/common/src/error/iggy_error.rs
@@ -485,8 +485,8 @@ pub enum IggyError {
 
     #[error("Shard not found for stream ID: {0}, topic ID: {1}, partition ID: {2}")]
     ShardNotFound(usize, usize, usize) = 11000,
-    #[error("Shard communication error, shard ID: {0}")]
-    ShardCommunicationError(u16) = 11001,
+    #[error("Shard communication error")]
+    ShardCommunicationError = 11001,
 
     #[error("Cannot bind to socket with addr: {0}")]
     CannotBindToSocket(String) = 12000,
diff --git a/core/common/src/utils/crypto.rs b/core/common/src/utils/crypto.rs
index 11d51cb7c..fca62dde1 100644
--- a/core/common/src/utils/crypto.rs
+++ b/core/common/src/utils/crypto.rs
@@ -18,7 +18,6 @@
 
 use crate::IggyError;
 use crate::text;
-use aes_gcm::aead::generic_array::GenericArray;
 use aes_gcm::aead::{Aead, OsRng};
 use aes_gcm::{AeadCore, Aes256Gcm, KeyInit};
 use std::fmt::Debug;
@@ -63,7 +62,7 @@ impl Aes256GcmEncryptor {
             return Err(IggyError::InvalidEncryptionKey);
         }
         Ok(Self {
-            cipher: Aes256Gcm::new(GenericArray::from_slice(key)),
+            cipher: Aes256Gcm::new(key.into()),
         })
     }
 
@@ -84,7 +83,7 @@ impl Encryptor for Aes256GcmEncryptor {
     }
 
     fn decrypt(&self, data: &[u8]) -> Result<Vec<u8>, IggyError> {
-        let nonce = GenericArray::from_slice(&data[0..12]);
+        let nonce = (&data[0..12]).into();
         let payload = self.cipher.decrypt(nonce, &data[12..]);
         if payload.is_err() {
             return Err(IggyError::CannotDecryptData);
diff --git a/core/integration/src/test_mcp_server.rs b/core/integration/src/test_mcp_server.rs
index b24270902..9d729eaa8 100644
--- a/core/integration/src/test_mcp_server.rs
+++ b/core/integration/src/test_mcp_server.rs
@@ -24,17 +24,18 @@ use rmcp::{
     service::RunningService,
     transport::StreamableHttpClientTransport,
 };
-use std::fs::OpenOptions;
-use std::io::Write;
+use std::fs::{self, File};
 use std::net::{Ipv4Addr, SocketAddr};
 use std::path::PathBuf;
-use std::process::{Child, Command};
+use std::process::{Child, Command, Stdio};
+use std::thread::panicking;
 use std::time::Duration;
 use std::{collections::HashMap, net::TcpListener};
 use tokio::time::sleep;
 
 pub const CONSUMER_NAME: &str = "mcp";
 const MCP_PATH: &str = "/mcp";
+const TEST_VERBOSITY_ENV_VAR: &str = "IGGY_TEST_VERBOSE";
 
 pub type McpClient = RunningService<RoleClient, InitializeRequestParam>;
 
@@ -101,6 +102,22 @@ impl TestMcpServer {
             Command::cargo_bin("iggy-mcp").unwrap()
         };
         command.envs(self.envs.clone());
+
+        // By default, MCP server logs are redirected to files,
+        // and dumped to stderr when test fails. With IGGY_TEST_VERBOSE=1
+        // logs are dumped to stdout during test execution.
+        if std::env::var(TEST_VERBOSITY_ENV_VAR).is_ok()
+            || self.envs.contains_key(TEST_VERBOSITY_ENV_VAR)
+        {
+            command.stdout(Stdio::inherit());
+            command.stderr(Stdio::inherit());
+        } else {
+            command.stdout(self.get_stdout_file());
+            self.stdout_file_path = Some(fs::canonicalize(self.get_stdout_file_path()).unwrap());
+            command.stderr(self.get_stderr_file());
+            self.stderr_file_path = Some(fs::canonicalize(self.get_stderr_file_path()).unwrap());
+        }
+
         let child = command.spawn().unwrap();
         self.child_handle = Some(child);
     }
@@ -118,29 +135,7 @@ impl TestMcpServer {
             #[cfg(not(unix))]
             child_handle.kill().unwrap();
 
-            if let Ok(output) = child_handle.wait_with_output() {
-                let stderr = String::from_utf8_lossy(&output.stderr);
-                let stdout = String::from_utf8_lossy(&output.stdout);
-                if let Some(stderr_file_path) = &self.stderr_file_path {
-                    OpenOptions::new()
-                        .append(true)
-                        .create(true)
-                        .open(stderr_file_path)
-                        .unwrap()
-                        .write_all(stderr.as_bytes())
-                        .unwrap();
-                }
-
-                if let Some(stdout_file_path) = &self.stdout_file_path {
-                    OpenOptions::new()
-                        .append(true)
-                        .create(true)
-                        .open(stdout_file_path)
-                        .unwrap()
-                        .write_all(stdout.as_bytes())
-                        .unwrap();
-                }
-            }
+            let _ = child_handle.wait();
         }
     }
 
@@ -224,10 +219,55 @@ impl TestMcpServer {
 
         panic!("Failed to find a free port after {max_retries} retries");
     }
+
+    fn get_stdout_file_path(&self) -> String {
+        format!("/tmp/iggy-mcp-{}.stdout", self.server_address.port())
+    }
+
+    fn get_stderr_file_path(&self) -> String {
+        format!("/tmp/iggy-mcp-{}.stderr", self.server_address.port())
+    }
+
+    fn get_stdout_file(&self) -> Stdio {
+        Stdio::from(File::create(self.get_stdout_file_path()).unwrap())
+    }
+
+    fn get_stderr_file(&self) -> Stdio {
+        Stdio::from(File::create(self.get_stderr_file_path()).unwrap())
+    }
+
+    fn read_file_to_string(path: &str) -> String {
+        fs::read_to_string(path).unwrap_or_else(|_| format!("Failed to read file: {}", path))
+    }
 }
 
 impl Drop for TestMcpServer {
     fn drop(&mut self) {
         self.stop();
+
+        if panicking() {
+            if let Some(stdout_file_path) = &self.stdout_file_path {
+                eprintln!(
+                    "Iggy MCP server stdout:\n{}",
+                    Self::read_file_to_string(stdout_file_path.to_str().unwrap())
+                );
+            }
+
+            if let Some(stderr_file_path) = &self.stderr_file_path {
+                eprintln!(
+                    "Iggy MCP server stderr:\n{}",
+                    Self::read_file_to_string(stderr_file_path.to_str().unwrap())
+                );
+            }
+        }
+
+        // Clean up log files
+        if let Some(stdout_file_path) = &self.stdout_file_path {
+            fs::remove_file(stdout_file_path).unwrap();
+        }
+
+        if let Some(stderr_file_path) = &self.stderr_file_path {
+            fs::remove_file(stderr_file_path).unwrap();
+        }
     }
 }
diff --git a/core/integration/tests/server/scenarios/concurrent_scenario.rs b/core/integration/tests/server/scenarios/concurrent_scenario.rs
index 88e76b3fd..e7a2831ef 100644
--- a/core/integration/tests/server/scenarios/concurrent_scenario.rs
+++ b/core/integration/tests/server/scenarios/concurrent_scenario.rs
@@ -21,8 +21,8 @@ use futures::future::join_all;
 use iggy::prelude::*;
 use integration::test_server::{ClientFactory, login_root};
 
-const OPERATIONS_COUNT: usize = 20;
-const MULTIPLE_CLIENT_COUNT: usize = 10;
+const OPERATIONS_COUNT: usize = 100;
+const MULTIPLE_CLIENT_COUNT: usize = 20;
 const OPERATIONS_PER_CLIENT: usize = OPERATIONS_COUNT / MULTIPLE_CLIENT_COUNT;
 const USER_PASSWORD: &str = "secret";
 const TEST_STREAM_NAME: &str = "race-test-stream";
@@ -33,6 +33,8 @@ pub enum ResourceType {
     User,
     Stream,
     Topic,
+    // TODO(hubcio): add ConsumerGroup
+    // TODO(hubcio): add Partition
 }
 
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
diff --git a/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs b/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs
index eb7e80190..c8323ee13 100644
--- a/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs
+++ b/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs
@@ -20,6 +20,7 @@ use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHa
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
 use crate::binary::{handlers::consumer_groups::COMPONENT, sender::SenderKind};
+
 use crate::shard::IggyShard;
 use crate::shard::transmission::event::ShardEvent;
 use crate::slab::traits_ext::EntityMarker;
@@ -60,7 +61,7 @@ impl ServerCommandHandler for CreateConsumerGroup {
             topic_id: self.topic_id.clone(),
             cg,
         };
-        let _responses = shard.broadcast_event_to_all_shards(event).await;
+        shard.broadcast_event_to_all_shards(event).await?;
 
         let stream_id = self.stream_id.clone();
         let topic_id = self.topic_id.clone();
diff --git a/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs b/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs
index 5dfb9dda5..be50f5fec 100644
--- a/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs
+++ b/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs
@@ -19,6 +19,7 @@
 use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::{handlers::consumer_groups::COMPONENT, sender::SenderKind};
+
 use crate::shard::IggyShard;
 use crate::shard::transmission::event::ShardEvent;
 use crate::slab::traits_ext::EntityMarker;
@@ -87,7 +88,7 @@ impl ServerCommandHandler for DeleteConsumerGroup {
             topic_id: self.topic_id.clone(),
             group_id: self.group_id.clone(),
         };
-        let _responses = shard.broadcast_event_to_all_shards(event).await;
+        shard.broadcast_event_to_all_shards(event).await?;
         let stream_id = self.stream_id.clone();
         let topic_id = self.topic_id.clone();
         shard
diff --git a/core/server/src/binary/handlers/consumer_groups/join_consumer_group_handler.rs b/core/server/src/binary/handlers/consumer_groups/join_consumer_group_handler.rs
index bca8742f4..4cc94d85c 100644
--- a/core/server/src/binary/handlers/consumer_groups/join_consumer_group_handler.rs
+++ b/core/server/src/binary/handlers/consumer_groups/join_consumer_group_handler.rs
@@ -19,6 +19,7 @@
 use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::{handlers::consumer_groups::COMPONENT, sender::SenderKind};
+
 use crate::shard::IggyShard;
 use crate::shard::transmission::event::ShardEvent;
 use crate::streaming::session::Session;
@@ -69,7 +70,7 @@ impl ServerCommandHandler for JoinConsumerGroup {
             topic_id,
             group_id,
         };
-        let _responses = shard.broadcast_event_to_all_shards(event).await;
+        shard.broadcast_event_to_all_shards(event).await?;
 
         sender.send_empty_ok_response().await?;
         Ok(())
diff --git a/core/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs b/core/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs
index 8e7955019..631636cf2 100644
--- a/core/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs
+++ b/core/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs
@@ -20,6 +20,7 @@ use super::COMPONENT;
 use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::sender::SenderKind;
+
 use crate::shard::IggyShard;
 use crate::shard::transmission::event::ShardEvent;
 use crate::streaming::session::Session;
@@ -71,7 +72,7 @@ impl ServerCommandHandler for LeaveConsumerGroup {
             topic_id,
             group_id,
         };
-        let _responses = shard.broadcast_event_to_all_shards(event).await;
+        shard.broadcast_event_to_all_shards(event).await?;
 
         sender.send_empty_ok_response().await?;
         Ok(())
diff --git a/core/server/src/binary/handlers/partitions/create_partitions_handler.rs b/core/server/src/binary/handlers/partitions/create_partitions_handler.rs
index 6589031da..981858a08 100644
--- a/core/server/src/binary/handlers/partitions/create_partitions_handler.rs
+++ b/core/server/src/binary/handlers/partitions/create_partitions_handler.rs
@@ -19,8 +19,10 @@
 use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::{handlers::partitions::COMPONENT, sender::SenderKind};
+
 use crate::shard::IggyShard;
 use crate::shard::transmission::event::ShardEvent;
+use crate::slab::traits_ext::EntityMarker;
 use crate::state::command::EntryCommand;
 use crate::streaming::session::Session;
 use crate::streaming::{streams, topics};
@@ -53,13 +55,19 @@ impl ServerCommandHandler for CreatePartitions {
                 self.partitions_count,
             )
             .await?;
+        let partition_ids = partitions.iter().map(|p| p.id()).collect::<Vec<_>>();
         let event = ShardEvent::CreatedPartitions2 {
             stream_id: self.stream_id.clone(),
             topic_id: self.topic_id.clone(),
             partitions,
         };
-        let _responses = shard.broadcast_event_to_all_shards(event).await;
-        // TODO: Rebalance the consumer group.
+        shard.broadcast_event_to_all_shards(event).await?;
+
+        shard.streams2.with_topic_by_id_mut(
+            &self.stream_id,
+            &self.topic_id,
+            topics::helpers::rebalance_consumer_group(shard.id, &partition_ids),
+        );
 
         let stream_id = shard
             .streams2
diff --git a/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs b/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs
index 98b56f76a..2680ab39e 100644
--- a/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs
+++ b/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs
@@ -19,6 +19,7 @@
 use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::{handlers::partitions::COMPONENT, sender::SenderKind};
+
 use crate::shard::IggyShard;
 use crate::shard::transmission::event::ShardEvent;
 use crate::state::command::EntryCommand;
@@ -61,8 +62,21 @@ impl ServerCommandHandler for DeletePartitions {
             partitions_count: self.partitions_count,
             partition_ids: deleted_partition_ids,
         };
-        let _responses = shard.broadcast_event_to_all_shards(event).await;
-        // TODO: Rebalance the consumer group.
+        shard.broadcast_event_to_all_shards(event).await?;
+
+        let remaining_partition_ids = shard.streams2.with_topic_by_id(
+            &self.stream_id,
+            &self.topic_id,
+            crate::streaming::topics::helpers::get_partition_ids(),
+        );
+        shard.streams2.with_topic_by_id_mut(
+            &self.stream_id,
+            &self.topic_id,
+            crate::streaming::topics::helpers::rebalance_consumer_group(
+                shard.id,
+                &remaining_partition_ids,
+            ),
+        );
 
         shard
         .state
diff --git a/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs b/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs
index ea7b947a4..9b2294fe3 100644
--- a/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs
+++ b/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs
@@ -20,6 +20,7 @@ use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHa
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
 use crate::binary::{handlers::personal_access_tokens::COMPONENT, sender::SenderKind};
+
 use crate::shard::IggyShard;
 use crate::shard::transmission::event::ShardEvent;
 use crate::state::command::EntryCommand;
@@ -60,7 +61,7 @@ impl ServerCommandHandler for CreatePersonalAccessToken {
         let event = ShardEvent::CreatedPersonalAccessToken {
             personal_access_token: personal_access_token.clone(),
         };
-        let _responses = shard.broadcast_event_to_all_shards(event).await;
+        shard.broadcast_event_to_all_shards(event).await?;
 
         shard
             .state
diff --git a/core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs b/core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs
index b2708f9c4..e962aadcc 100644
--- a/core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs
+++ b/core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs
@@ -56,7 +56,7 @@ impl ServerCommandHandler for DeletePersonalAccessToken {
             user_id: session.get_user_id(),
             name: self.name.clone(),
         };
-        let _responses = shard.broadcast_event_to_all_shards(event).await;
+        shard.broadcast_event_to_all_shards(event).await?;
 
         shard
             .state
diff --git a/core/server/src/binary/handlers/personal_access_tokens/login_with_personal_access_token_handler.rs b/core/server/src/binary/handlers/personal_access_tokens/login_with_personal_access_token_handler.rs
index e7ecc33f9..f18bded31 100644
--- a/core/server/src/binary/handlers/personal_access_tokens/login_with_personal_access_token_handler.rs
+++ b/core/server/src/binary/handlers/personal_access_tokens/login_with_personal_access_token_handler.rs
@@ -60,7 +60,7 @@ impl ServerCommandHandler for LoginWithPersonalAccessToken {
             token: self.token,
             client_id: session.client_id,
         };
-        let _responses = shard.broadcast_event_to_all_shards(event).await;
+        shard.broadcast_event_to_all_shards(event).await?;
         let identity_info = mapper::map_identity_info(user.id);
         sender.send_ok_response(&identity_info).await?;
         Ok(())
diff --git a/core/server/src/binary/handlers/segments/delete_segments_handler.rs b/core/server/src/binary/handlers/segments/delete_segments_handler.rs
index ccb5f859d..c8c9b16fc 100644
--- a/core/server/src/binary/handlers/segments/delete_segments_handler.rs
+++ b/core/server/src/binary/handlers/segments/delete_segments_handler.rs
@@ -19,6 +19,7 @@
 use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::{handlers::partitions::COMPONENT, sender::SenderKind};
+
 use crate::shard::IggyShard;
 use crate::shard::transmission::event::ShardEvent;
 use crate::state::command::EntryCommand;
@@ -68,7 +69,7 @@ impl ServerCommandHandler for DeleteSegments {
             partition_id: self.partition_id as usize,
             segments_count: self.segments_count,
         };
-        let _responses = shard.broadcast_event_to_all_shards(event).await;
+        shard.broadcast_event_to_all_shards(event).await?;
 
         shard
             .state
diff --git a/core/server/src/binary/handlers/streams/create_stream_handler.rs b/core/server/src/binary/handlers/streams/create_stream_handler.rs
index 0b1c1ec0a..cfabf569d 100644
--- a/core/server/src/binary/handlers/streams/create_stream_handler.rs
+++ b/core/server/src/binary/handlers/streams/create_stream_handler.rs
@@ -20,6 +20,7 @@ use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHa
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
 use crate::binary::{handlers::streams::COMPONENT, sender::SenderKind};
+
 use crate::shard::IggyShard;
 use crate::shard::transmission::event::ShardEvent;
 use crate::shard_info;
@@ -60,7 +61,8 @@ impl ServerCommandHandler for CreateStream {
             id: created_stream_id,
             stream,
         };
-        let _responses = shard.broadcast_event_to_all_shards(event).await;
+
+        shard.broadcast_event_to_all_shards(event).await?;
 
         let response = shard
             .streams2
diff --git a/core/server/src/binary/handlers/streams/delete_stream_handler.rs b/core/server/src/binary/handlers/streams/delete_stream_handler.rs
index f59b11b5b..43ff59f2a 100644
--- a/core/server/src/binary/handlers/streams/delete_stream_handler.rs
+++ b/core/server/src/binary/handlers/streams/delete_stream_handler.rs
@@ -19,6 +19,7 @@
 use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::{handlers::streams::COMPONENT, sender::SenderKind};
+
 use crate::shard::IggyShard;
 use crate::shard::transmission::event::ShardEvent;
 use crate::shard_info;
@@ -65,7 +66,7 @@ impl ServerCommandHandler for DeleteStream {
             id: stream2.id(),
             stream_id: self.stream_id.clone(),
         };
-        let _responses = shard.broadcast_event_to_all_shards(event).await;
+        shard.broadcast_event_to_all_shards(event).await?;
 
         shard
             .state
diff --git a/core/server/src/binary/handlers/streams/purge_stream_handler.rs b/core/server/src/binary/handlers/streams/purge_stream_handler.rs
index b8b35b645..d25d59a7f 100644
--- a/core/server/src/binary/handlers/streams/purge_stream_handler.rs
+++ b/core/server/src/binary/handlers/streams/purge_stream_handler.rs
@@ -19,6 +19,7 @@
 use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::{handlers::streams::COMPONENT, sender::SenderKind};
+
 use crate::shard::IggyShard;
 use crate::shard::transmission::event::ShardEvent;
 use crate::state::command::EntryCommand;
@@ -54,7 +55,7 @@ impl ServerCommandHandler for PurgeStream {
         let event = ShardEvent::PurgedStream2 {
             stream_id: self.stream_id.clone(),
         };
-        let _responses = shard.broadcast_event_to_all_shards(event).await;
+        shard.broadcast_event_to_all_shards(event).await?;
         shard
             .state
             .apply(session.get_user_id(), &EntryCommand::PurgeStream(self))
diff --git a/core/server/src/binary/handlers/streams/update_stream_handler.rs b/core/server/src/binary/handlers/streams/update_stream_handler.rs
index 996db7f4b..866c3f7ff 100644
--- a/core/server/src/binary/handlers/streams/update_stream_handler.rs
+++ b/core/server/src/binary/handlers/streams/update_stream_handler.rs
@@ -19,6 +19,7 @@
 use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::{handlers::streams::COMPONENT, sender::SenderKind};
+
 use crate::shard::IggyShard;
 use crate::shard::transmission::event::ShardEvent;
 use crate::state::command::EntryCommand;
@@ -55,7 +56,7 @@ impl ServerCommandHandler for UpdateStream {
             stream_id: self.stream_id.clone(),
             name: self.name.clone(),
         };
-        let _responses = shard.broadcast_event_to_all_shards(event).await;
+        shard.broadcast_event_to_all_shards(event).await?;
         shard
             .state
             .apply(session.get_user_id(), &EntryCommand::UpdateStream(self))
diff --git a/core/server/src/binary/handlers/topics/create_topic_handler.rs b/core/server/src/binary/handlers/topics/create_topic_handler.rs
index 35eba8d93..fadb137cd 100644
--- a/core/server/src/binary/handlers/topics/create_topic_handler.rs
+++ b/core/server/src/binary/handlers/topics/create_topic_handler.rs
@@ -20,6 +20,7 @@ use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHa
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
 use crate::binary::{handlers::topics::COMPONENT, sender::SenderKind};
+
 use crate::shard::IggyShard;
 use crate::shard::transmission::event::ShardEvent;
 use crate::slab::traits_ext::EntityMarker;
@@ -71,7 +72,8 @@ impl ServerCommandHandler for CreateTopic {
             stream_id: self.stream_id.clone(),
             topic,
         };
-        let _responses = shard.broadcast_event_to_all_shards(event).await;
+
+        shard.broadcast_event_to_all_shards(event).await?;
         let partitions = shard
             .create_partitions2(
                 session,
@@ -85,7 +87,8 @@ impl ServerCommandHandler for CreateTopic {
             topic_id: Identifier::numeric(topic_id as u32).unwrap(),
             partitions,
         };
-        let _responses = shard.broadcast_event_to_all_shards(event).await;
+
+        shard.broadcast_event_to_all_shards(event).await?;
         let response = shard.streams2.with_topic_by_id(
             &self.stream_id,
             &Identifier::numeric(topic_id as u32).unwrap(),
diff --git a/core/server/src/binary/handlers/topics/delete_topic_handler.rs b/core/server/src/binary/handlers/topics/delete_topic_handler.rs
index 69d9d9957..57b6294f3 100644
--- a/core/server/src/binary/handlers/topics/delete_topic_handler.rs
+++ b/core/server/src/binary/handlers/topics/delete_topic_handler.rs
@@ -19,6 +19,7 @@
 use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::{handlers::topics::COMPONENT, sender::SenderKind};
+
 use crate::shard::IggyShard;
 use crate::shard::transmission::event::ShardEvent;
 use crate::shard_info;
@@ -71,7 +72,7 @@ impl ServerCommandHandler for DeleteTopic {
             stream_id: self.stream_id.clone(),
             topic_id: self.topic_id.clone(),
         };
-        let _responses = shard.broadcast_event_to_all_shards(event).await;
+        shard.broadcast_event_to_all_shards(event).await?;
 
         shard
             .state
diff --git a/core/server/src/binary/handlers/topics/purge_topic_handler.rs b/core/server/src/binary/handlers/topics/purge_topic_handler.rs
index 7c4d5179c..220af3a58 100644
--- a/core/server/src/binary/handlers/topics/purge_topic_handler.rs
+++ b/core/server/src/binary/handlers/topics/purge_topic_handler.rs
@@ -60,7 +60,7 @@ impl ServerCommandHandler for PurgeTopic {
             stream_id: self.stream_id.clone(),
             topic_id: self.topic_id.clone(),
         };
-        let _responses = shard.broadcast_event_to_all_shards(event).await;
+        shard.broadcast_event_to_all_shards(event).await?;
 
         shard
             .state
diff --git a/core/server/src/binary/handlers/topics/update_topic_handler.rs b/core/server/src/binary/handlers/topics/update_topic_handler.rs
index 8f9146ac9..d06ef5c11 100644
--- a/core/server/src/binary/handlers/topics/update_topic_handler.rs
+++ b/core/server/src/binary/handlers/topics/update_topic_handler.rs
@@ -19,6 +19,7 @@
 use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::{handlers::topics::COMPONENT, sender::SenderKind};
+
 use crate::shard::IggyShard;
 use crate::shard::transmission::event::ShardEvent;
 use crate::state::command::EntryCommand;
@@ -81,7 +82,7 @@ impl ServerCommandHandler for UpdateTopic {
             max_topic_size: self.max_topic_size,
             replication_factor: self.replication_factor,
         };
-        let _responses = shard.broadcast_event_to_all_shards(event).await;
+        shard.broadcast_event_to_all_shards(event).await?;
         let topic_id = self.topic_id.clone();
         let stream_id = self.stream_id.clone();
 
diff --git a/core/server/src/binary/handlers/users/change_password_handler.rs b/core/server/src/binary/handlers/users/change_password_handler.rs
index 0b595ee4b..f9fa2729d 100644
--- a/core/server/src/binary/handlers/users/change_password_handler.rs
+++ b/core/server/src/binary/handlers/users/change_password_handler.rs
@@ -19,6 +19,7 @@
 use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::{handlers::users::COMPONENT, sender::SenderKind};
+
 use crate::shard::IggyShard;
 use crate::shard::transmission::event::ShardEvent;
 use crate::shard_info;
@@ -77,7 +78,7 @@ impl ServerCommandHandler for ChangePassword {
             current_password: self.current_password.clone(),
             new_password: self.new_password.clone(),
         };
-        let _responses = shard.broadcast_event_to_all_shards(event).await;
+        shard.broadcast_event_to_all_shards(event).await?;
 
         // For the security of the system, we hash the password before storing it in metadata.
         shard
diff --git a/core/server/src/binary/handlers/users/create_user_handler.rs b/core/server/src/binary/handlers/users/create_user_handler.rs
index 96521d5ff..8dc18e0f7 100644
--- a/core/server/src/binary/handlers/users/create_user_handler.rs
+++ b/core/server/src/binary/handlers/users/create_user_handler.rs
@@ -78,7 +78,7 @@ impl ServerCommandHandler for CreateUser {
             status: self.status,
             permissions: self.permissions.clone(),
         };
-        let _responses = shard.broadcast_event_to_all_shards(event).await;
+        shard.broadcast_event_to_all_shards(event).await?;
         let user_id = user.id;
         let response = mapper::map_user(&user);
 
diff --git a/core/server/src/binary/handlers/users/delete_user_handler.rs b/core/server/src/binary/handlers/users/delete_user_handler.rs
index f265f8df0..c807e6d04 100644
--- a/core/server/src/binary/handlers/users/delete_user_handler.rs
+++ b/core/server/src/binary/handlers/users/delete_user_handler.rs
@@ -21,6 +21,7 @@ use std::rc::Rc;
 use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::{handlers::users::COMPONENT, sender::SenderKind};
+
 use crate::shard::IggyShard;
 use crate::shard::transmission::event::ShardEvent;
 use crate::shard_info;
@@ -66,7 +67,7 @@ impl ServerCommandHandler for DeleteUser {
         let event = ShardEvent::DeletedUser {
             user_id: self.user_id.clone(),
         };
-        let _responses = shard.broadcast_event_to_all_shards(event).await;
+        shard.broadcast_event_to_all_shards(event).await?;
 
         let user_id = self.user_id.clone();
         shard
diff --git a/core/server/src/binary/handlers/users/login_user_handler.rs b/core/server/src/binary/handlers/users/login_user_handler.rs
index 395f2724b..6ee6471db 100644
--- a/core/server/src/binary/handlers/users/login_user_handler.rs
+++ b/core/server/src/binary/handlers/users/login_user_handler.rs
@@ -16,20 +16,19 @@
  * under the License.
  */
 
-use std::rc::Rc;
-
 use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::mapper;
 use crate::binary::{handlers::users::COMPONENT, sender::SenderKind};
 use crate::shard::IggyShard;
 use crate::shard::transmission::event::ShardEvent;
-use crate::shard_info;
 use crate::streaming::session::Session;
+use crate::{shard_info, shard_warn};
 use anyhow::Result;
 use error_set::ErrContext;
 use iggy_common::IggyError;
 use iggy_common::login_user::LoginUser;
+use std::rc::Rc;
 use tracing::{debug, instrument};
 
 impl ServerCommandHandler for LoginUser {
@@ -45,6 +44,11 @@ impl ServerCommandHandler for LoginUser {
         session: &Rc<Session>,
         shard: &Rc<IggyShard>,
     ) -> Result<(), IggyError> {
+        if shard.is_shutting_down() {
+            shard_warn!(shard.id, "Rejecting login request during shutdown");
+            return Err(IggyError::Disconnected);
+        }
+
         debug!("session: {session}, command: {self}");
         let LoginUser {
             username, password, ..
@@ -70,7 +74,7 @@ impl ServerCommandHandler for LoginUser {
             password,
         };
         // Broadcast the event to all shards.
-        let _responses = shard.broadcast_event_to_all_shards(event).await;
+        shard.broadcast_event_to_all_shards(event).await?;
 
         let identity_info = mapper::map_identity_info(user.id);
         sender.send_ok_response(&identity_info).await?;
diff --git a/core/server/src/binary/handlers/users/logout_user_handler.rs b/core/server/src/binary/handlers/users/logout_user_handler.rs
index 7dbc99b4d..3f52d2255 100644
--- a/core/server/src/binary/handlers/users/logout_user_handler.rs
+++ b/core/server/src/binary/handlers/users/logout_user_handler.rs
@@ -21,6 +21,7 @@ use std::rc::Rc;
 use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::{handlers::users::COMPONENT, sender::SenderKind};
+
 use crate::shard::IggyShard;
 use crate::shard::transmission::event::ShardEvent;
 use crate::shard_info;
@@ -61,7 +62,7 @@ impl ServerCommandHandler for LogoutUser {
         let event = ShardEvent::LogoutUser {
             client_id: session.client_id,
         };
-        let _responses = shard.broadcast_event_to_all_shards(event).await;
+        shard.broadcast_event_to_all_shards(event).await?;
         session.clear_user_id();
         sender.send_empty_ok_response().await?;
         Ok(())
diff --git a/core/server/src/binary/handlers/users/update_permissions_handler.rs b/core/server/src/binary/handlers/users/update_permissions_handler.rs
index 4b42d9026..0856516d8 100644
--- a/core/server/src/binary/handlers/users/update_permissions_handler.rs
+++ b/core/server/src/binary/handlers/users/update_permissions_handler.rs
@@ -21,6 +21,7 @@ use std::rc::Rc;
 use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::{handlers::users::COMPONENT, sender::SenderKind};
+
 use crate::shard::IggyShard;
 use crate::shard::transmission::event::ShardEvent;
 use crate::shard_info;
@@ -61,7 +62,7 @@ impl ServerCommandHandler for UpdatePermissions {
             user_id: self.user_id.clone(),
             permissions: self.permissions.clone(),
         };
-        let _responses = shard.broadcast_event_to_all_shards(event).await;
+        shard.broadcast_event_to_all_shards(event).await?;
 
         shard
             .state
diff --git a/core/server/src/binary/handlers/users/update_user_handler.rs b/core/server/src/binary/handlers/users/update_user_handler.rs
index c8f28db38..cffd953c9 100644
--- a/core/server/src/binary/handlers/users/update_user_handler.rs
+++ b/core/server/src/binary/handlers/users/update_user_handler.rs
@@ -21,6 +21,7 @@ use std::rc::Rc;
 use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::{handlers::users::COMPONENT, sender::SenderKind};
+
 use crate::shard::IggyShard;
 use crate::shard::transmission::event::ShardEvent;
 use crate::shard_info;
@@ -73,7 +74,7 @@ impl ServerCommandHandler for UpdateUser {
             username: self.username.clone(),
             status: self.status,
         };
-        let _responses = shard.broadcast_event_to_all_shards(event).await;
+        shard.broadcast_event_to_all_shards(event).await?;
 
         let user_id = self.user_id.clone();
         shard
diff --git a/core/server/src/quic/quic_server.rs b/core/server/src/quic/quic_server.rs
index c5033213a..5fb3cd9b2 100644
--- a/core/server/src/quic/quic_server.rs
+++ b/core/server/src/quic/quic_server.rs
@@ -135,7 +135,7 @@ pub async fn spawn_quic_server(
                 protocol: iggy_common::TransportProtocol::Quic,
                 address: actual_addr,
             };
-            shard.broadcast_event_to_all_shards(event).await;
+            shard.broadcast_event_to_all_shards(event).await?;
         }
     } else {
         shard.quic_bound_address.set(Some(actual_addr));
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index f65def97f..76e5b21c2 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -36,7 +36,7 @@ use crate::{
             message::{ShardMessage, ShardRequest, ShardRequestPayload, ShardSendRequestResult},
         },
     },
-    shard_error, shard_info,
+    shard_error, shard_info, shard_warn,
     slab::{streams::Streams, traits_ext::EntityMarker, users::Users},
     state::file::FileState,
     streaming::{
@@ -49,6 +49,7 @@ use builder::IggyShardBuilder;
 use compio::io::AsyncWriteAtExt;
 use dashmap::DashMap;
 use error_set::ErrContext;
+use futures::future::join_all;
 use hash32::{Hasher, Murmur3Hasher};
 use iggy_common::{EncryptorKind, Identifier, IggyError, TransportProtocol};
 use std::hash::Hasher as _;
@@ -64,6 +65,7 @@ use transmission::connector::{Receiver, ShardConnector, StopReceiver};
 
 pub const COMPONENT: &str = "SHARD";
 pub const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(10);
+pub const BROADCAST_TIMEOUT: Duration = Duration::from_secs(20);
 
 pub(crate) struct Shard {
     id: u16,
@@ -86,7 +88,7 @@ impl Shard {
         //TODO: Fixme
         let response = receiver.recv().await.map_err(|err| {
             error!("Failed to receive response from shard: {err}");
-            IggyError::ShardCommunicationError(self.id)
+            IggyError::ShardCommunicationError
         })?;
         Ok(response)
     }
@@ -672,7 +674,9 @@ impl IggyShard {
                 Ok(())
             }
             ShardEvent::CreatedTopic2 { stream_id, topic } => {
-                let _topic_id = self.create_topic2_bypass_auth(&stream_id, topic);
+                let topic_id_from_event = topic.id();
+                let topic_id = self.create_topic2_bypass_auth(&stream_id, topic.clone());
+                assert_eq!(topic_id, topic_id_from_event);
                 Ok(())
             }
             ShardEvent::CreatedPartitions2 {
@@ -898,59 +902,68 @@ impl IggyShard {
         }
     }
 
-    pub async fn broadcast_event_to_all_shards(&self, event: ShardEvent) -> Vec<ShardResponse> {
-        let mut responses = Vec::with_capacity(self.get_available_shards_count() as usize);
-        for maybe_receiver in self
+    pub async fn broadcast_event_to_all_shards(&self, event: ShardEvent) -> Result<(), IggyError> {
+        if self.is_shutting_down() {
+            shard_info!(
+                self.id,
+                "Skipping broadcast during shutdown for event: {}",
+                event
+            );
+            return Ok(());
+        }
+
+        let event_type = event.to_string();
+        let futures = self
             .shards
             .iter()
-            .filter_map(|shard| {
-                if shard.id != self.id {
-                    Some(shard.connection.clone())
-                } else {
-                    None
-                }
-            })
-            .map(|conn| {
-                // TODO: Fixme, maybe we should send response_sender
-                // and propagate errors back.
+            .filter(|s| s.id != self.id)
+            .map(|shard| {
                 let event = event.clone();
-                /*
-                if matches!(
-                    &event,
-                    ShardEvent::CreatedStream2 { .. }
-                        | ShardEvent::DeletedStream2 { .. }
-                        | ShardEvent::CreatedTopic2 { .. }
-                        | ShardEvent::DeletedTopic2 { .. }
-                        | ShardEvent::UpdatedTopic2 { .. }
-                        | ShardEvent::CreatedPartitions2 { .. }
-                        | ShardEvent::DeletedPartitions2 { .. }
-                        | ShardEvent::CreatedConsumerGroup2 { .. }
-                        | ShardEvent::CreatedPersonalAccessToken { .. }
-                        | ShardEvent::DeletedConsumerGroup2 { .. }
-                ) {
-                */
-                let (sender, receiver) = async_channel::bounded(1);
-                conn.send(ShardFrame::new(event.into(), Some(sender.clone())));
-                Some(receiver.clone())
-                /*
-                } else {
-                    conn.send(ShardFrame::new(event.into(), None));
-                    None
+                let conn = shard.connection.clone();
+                let shard_id = shard.id;
+                let self_id = self.id;
+                let event_type = event_type.clone();
+
+                async move {
+                    let (sender, receiver) = async_channel::bounded(1);
+                    conn.send(ShardFrame::new(ShardMessage::Event(event), Some(sender)));
+
+                    match compio::time::timeout(BROADCAST_TIMEOUT, receiver.recv()).await {
+                        Ok(Ok(_)) => Ok(()),
+                        Ok(Err(e)) => {
+                            shard_warn!(
+                                self_id,
+                                "Broadcast to shard {} failed for event {}: channel error: {}",
+                                shard_id, event_type, e
+                            );
+                            Err(())
+                        }
+                        Err(_) => {
+                            shard_warn!(
+                                self_id,
+                                "Broadcast to shard {} failed for event {}: timeout waiting for response after {:?}",
+                                shard_id, event_type,
+                                BROADCAST_TIMEOUT
+                            );
+                            Err(())
+                        }
+                    }
                 }
-                */
             })
-        {
-            match maybe_receiver {
-                Some(receiver) => {
-                    let response = receiver.recv().await.unwrap();
-                    responses.push(response);
-                }
-                None => {
-                    responses.push(ShardResponse::Event);
-                }
-            }
+            .collect::<Vec<_>>();
+
+        if futures.is_empty() {
+            return Ok(());
+        }
+
+        let results = join_all(futures).await;
+        let has_failures = results.iter().any(|r| r.is_err());
+
+        if has_failures {
+            Err(IggyError::ShardCommunicationError)
+        } else {
+            Ok(())
         }
-        responses
     }
 
     pub fn add_active_session(&self, session: Rc<Session>) {
diff --git a/core/server/src/shard/system/streams.rs b/core/server/src/shard/system/streams.rs
index 4ce440599..a67318f6c 100644
--- a/core/server/src/shard/system/streams.rs
+++ b/core/server/src/shard/system/streams.rs
@@ -19,12 +19,10 @@
 use super::COMPONENT;
 use crate::shard::IggyShard;
 use crate::slab::traits_ext::{DeleteCell, EntityMarker, InsertCell};
-
 use crate::streaming::session::Session;
 use crate::streaming::streams::storage2::{create_stream_file_hierarchy, delete_stream_from_disk};
 use crate::streaming::streams::{self, stream2};
 use error_set::ErrContext;
-
 use iggy_common::{Identifier, IggyError};
 
 impl IggyShard {
diff --git a/core/server/src/shard/system/users.rs b/core/server/src/shard/system/users.rs
index d4a952f95..ab1c1e40f 100644
--- a/core/server/src/shard/system/users.rs
+++ b/core/server/src/shard/system/users.rs
@@ -121,11 +121,13 @@ impl IggyShard {
         permissions: Option<Permissions>,
     ) -> Result<(), IggyError> {
         let assigned_user_id = self.create_user_base(username, password, status, permissions)?;
+
         assert_eq!(
             assigned_user_id as u32, expected_user_id,
             "User ID mismatch: expected {}, got {}. This indicates shards are out of sync.",
             expected_user_id, assigned_user_id
         );
+
         Ok(())
     }
 
@@ -235,7 +237,6 @@ impl IggyShard {
         let numeric_user_id = Identifier::numeric(user.id).unwrap();
 
         if let Some(ref new_username) = username {
-            let user = self.get_user(user_id)?;
             let existing_user = self.get_user(&new_username.to_owned().try_into()?);
             if existing_user.is_ok() && existing_user.unwrap().id != user.id {
                 error!("User: {new_username} already exists.");
@@ -252,6 +253,7 @@ impl IggyShard {
                 format!("{COMPONENT} update user (error: {error}) - failed to update user with id: {user_id}")
             })?;
         }
+
         self.get_user(&numeric_user_id)
             .with_error_context(|error| {
                 format!("{COMPONENT} update user (error: {error}) - failed to get updated user with id: {user_id}")
diff --git a/core/server/src/shard/transmission/event.rs b/core/server/src/shard/transmission/event.rs
index 45f335878..d897cfd00 100644
--- a/core/server/src/shard/transmission/event.rs
+++ b/core/server/src/shard/transmission/event.rs
@@ -12,9 +12,11 @@ use iggy_common::{
     UserStatus,
 };
 use std::net::SocketAddr;
+use strum::Display;
 
 #[allow(clippy::large_enum_variant)]
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, Display)]
+#[strum(serialize_all = "PascalCase")]
 pub enum ShardEvent {
     FlushUnsavedBuffer {
         stream_id: Identifier,
diff --git a/core/server/src/slab/users.rs b/core/server/src/slab/users.rs
index c4859a86e..9ff71eb08 100644
--- a/core/server/src/slab/users.rs
+++ b/core/server/src/slab/users.rs
@@ -153,6 +153,7 @@ impl Users {
                     .ok_or_else(|| IggyError::ResourceNotFound(username.to_string()))?
             }
         };
+
         let old_username = {
             let users = self.users.borrow();
             let user = users
@@ -160,15 +161,18 @@ impl Users {
                 .ok_or_else(|| IggyError::ResourceNotFound(identifier.to_string()))?;
             user.username.clone()
         };
+
         if old_username == new_username {
             return Ok(());
         }
+
         tracing::trace!(
             "Updating username: '{}' → '{}' for user ID: {}",
             old_username,
             new_username,
             id
         );
+
         {
             let mut users = self.users.borrow_mut();
             let user = users
@@ -176,9 +180,11 @@ impl Users {
                 .ok_or_else(|| IggyError::ResourceNotFound(identifier.to_string()))?;
             user.username = new_username.clone();
         }
+
         let mut index = self.index.borrow_mut();
         index.remove(&old_username);
         index.insert(new_username, id);
+
         Ok(())
     }
 }
diff --git a/core/server/src/streaming/topics/helpers.rs b/core/server/src/streaming/topics/helpers.rs
index 87c29c364..3ad1f230b 100644
--- a/core/server/src/streaming/topics/helpers.rs
+++ b/core/server/src/streaming/topics/helpers.rs
@@ -6,7 +6,10 @@ use crate::{
         Keyed,
         consumer_groups::{self, ConsumerGroups},
         topics::{self, Topics},
-        traits_ext::{ComponentsById, Delete, DeleteCell, EntityMarker},
+        traits_ext::{
+            ComponentsById, Delete, DeleteCell, EntityComponentSystem, EntityComponentSystemMut,
+            EntityMarker, IntoComponents,
+        },
     },
     streaming::{
         stats::TopicStats,
@@ -44,6 +47,15 @@ pub fn get_topic_id() -> impl FnOnce(ComponentsById<TopicRef>) -> topics::Contai
     |(root, _, _)| root.id()
 }
 
+pub fn get_partition_ids() -> impl FnOnce(ComponentsById<TopicRef>) -> Vec<usize> {
+    |(root, ..)| {
+        root.partitions().with_components(|components| {
+            let (roots, ..) = components.into_components();
+            roots.iter().map(|(id, _)| id).collect()
+        })
+    }
+}
+
 pub fn get_message_expiry() -> impl FnOnce(ComponentsById<TopicRef>) -> IggyExpiry {
     |(root, _, _)| root.message_expiry()
 }
@@ -144,6 +156,28 @@ pub fn leave_consumer_group(
     }
 }
 
+pub fn rebalance_consumer_group(
+    shard_id: u16,
+    partition_ids: &[usize],
+) -> impl FnOnce(ComponentsById<TopicRefMut>) {
+    move |(mut root, ..)| {
+        root.consumer_groups_mut()
+            .with_components_mut(|components| {
+                let (all_roots, all_members) = components.into_components();
+                for ((_, consumer_group_root), (_, members)) in
+                    all_roots.iter().zip(all_members.iter_mut())
+                {
+                    let id = consumer_group_root.id();
+                    members.inner_mut().rcu(|existing_members| {
+                        let mut new_members = mimic_members(existing_members);
+                        assign_partitions_to_members(shard_id, id, &mut new_members, partition_ids);
+                        new_members
+                    });
+                }
+            });
+    }
+}
+
 pub fn get_consumer_group_member_id(
     client_id: u32,
 ) -> impl FnOnce(ComponentsById<ConsumerGroupRef>) -> usize {
@@ -206,7 +240,7 @@ fn add_member(
     members.inner_mut().rcu(move |members| {
         let mut members = mimic_members(members);
         Member::new(client_id).insert_into(&mut members);
-        assign_partitions_to_members(shard_id, id, &mut members, partitions.to_vec());
+        assign_partitions_to_members(shard_id, id, &mut members, partitions);
         members
     });
 }
@@ -231,7 +265,7 @@ fn delete_member(
             entry.id = idx;
             true
         });
-        assign_partitions_to_members(shard_id, id, &mut members, partitions.to_vec());
+        assign_partitions_to_members(shard_id, id, &mut members, partitions);
         members
     });
 }
@@ -240,7 +274,7 @@ fn assign_partitions_to_members(
     shard_id: u16,
     id: usize,
     members: &mut Slab<Member>,
-    partitions: Vec<usize>,
+    partitions: &[usize],
 ) {
     members
         .iter_mut()
diff --git a/core/server/src/tcp/tcp_listener.rs b/core/server/src/tcp/tcp_listener.rs
index c963103da..d7165dac8 100644
--- a/core/server/src/tcp/tcp_listener.rs
+++ b/core/server/src/tcp/tcp_listener.rs
@@ -18,6 +18,7 @@
 
 use crate::binary::sender::SenderKind;
 use crate::configs::tcp::TcpSocketConfig;
+
 use crate::shard::IggyShard;
 use crate::shard::task_registry::ShutdownToken;
 use crate::shard::transmission::event::ShardEvent;
@@ -111,7 +112,7 @@ pub async fn start(
                 protocol: TransportProtocol::Tcp,
                 address: actual_addr,
             };
-            shard.broadcast_event_to_all_shards(event).await;
+            shard.broadcast_event_to_all_shards(event).await?;
         }
     }
 
@@ -148,8 +149,9 @@ async fn accept_loop(
                         shard_clone.add_active_session(session.clone());
                         // Broadcast session to all shards.
                         let event = ShardEvent::NewSession { address, transport };
-                        // TODO: Fixme look inside of broadcast_event_to_all_shards method.
-                        let _responses = shard_clone.broadcast_event_to_all_shards(event).await;
+                        if let Err(err) = shard_clone.broadcast_event_to_all_shards(event).await {
+                            shard_error!(shard.id, "Failed to broadcast NewSession: {:?}", err);
+                        }
 
                         let client_id = session.client_id;
                         let user_id = session.get_user_id();
@@ -168,7 +170,9 @@ async fn accept_loop(
                             }
                             registry_clone.remove_connection(&client_id);
                             let event = ShardEvent::ClientDisconnected { client_id, user_id };
-                            let _responses = shard_for_conn.broadcast_event_to_all_shards(event).await;
+                            if let Err(err) = shard_for_conn.broadcast_event_to_all_shards(event).await {
+                                shard_error!(shard_for_conn.id, "Failed to broadcast ClientDisconnected: {:?}", err);
+                            }
 
                             if let Err(error) = sender.shutdown().await {
                                 shard_error!(shard.id, "Failed to shutdown TCP stream for client: {}, address: {}. {}", client_id, address, error);
diff --git a/core/server/src/tcp/tcp_tls_listener.rs b/core/server/src/tcp/tcp_tls_listener.rs
index 56504df36..2f739c166 100644
--- a/core/server/src/tcp/tcp_tls_listener.rs
+++ b/core/server/src/tcp/tcp_tls_listener.rs
@@ -79,7 +79,7 @@ pub(crate) async fn start(
                 protocol: TransportProtocol::Tcp,
                 address: actual_addr,
             };
-            shard.broadcast_event_to_all_shards(event).await;
+            shard.broadcast_event_to_all_shards(event).await?;
         }
     }
 

Reply via email to