This is an automated email from the ASF dual-hosted git repository.
gkoszyk pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/io_uring_tpc by this push:
new a7f2760a7 feat(io_uring): implement broadcast mechanism error handling
(#2273)
a7f2760a7 is described below
commit a7f2760a7efb40c7c8bc5a0f888355e525b3f539
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Thu Oct 16 18:14:32 2025 +0200
feat(io_uring): implement broadcast mechanism error handling (#2273)
---
core/common/src/error/iggy_error.rs | 4 +-
core/common/src/utils/crypto.rs | 5 +-
core/integration/src/test_mcp_server.rs | 92 ++++++++++++-----
.../tests/server/scenarios/concurrent_scenario.rs | 6 +-
.../create_consumer_group_handler.rs | 3 +-
.../delete_consumer_group_handler.rs | 3 +-
.../consumer_groups/join_consumer_group_handler.rs | 3 +-
.../leave_consumer_group_handler.rs | 3 +-
.../partitions/create_partitions_handler.rs | 12 ++-
.../partitions/delete_partitions_handler.rs | 18 +++-
.../create_personal_access_token_handler.rs | 3 +-
.../delete_personal_access_token_handler.rs | 2 +-
.../login_with_personal_access_token_handler.rs | 2 +-
.../handlers/segments/delete_segments_handler.rs | 3 +-
.../handlers/streams/create_stream_handler.rs | 4 +-
.../handlers/streams/delete_stream_handler.rs | 3 +-
.../handlers/streams/purge_stream_handler.rs | 3 +-
.../handlers/streams/update_stream_handler.rs | 3 +-
.../binary/handlers/topics/create_topic_handler.rs | 7 +-
.../binary/handlers/topics/delete_topic_handler.rs | 3 +-
.../binary/handlers/topics/purge_topic_handler.rs | 2 +-
.../binary/handlers/topics/update_topic_handler.rs | 3 +-
.../handlers/users/change_password_handler.rs | 3 +-
.../binary/handlers/users/create_user_handler.rs | 2 +-
.../binary/handlers/users/delete_user_handler.rs | 3 +-
.../binary/handlers/users/login_user_handler.rs | 12 ++-
.../binary/handlers/users/logout_user_handler.rs | 3 +-
.../handlers/users/update_permissions_handler.rs | 3 +-
.../binary/handlers/users/update_user_handler.rs | 3 +-
core/server/src/quic/quic_server.rs | 2 +-
core/server/src/shard/mod.rs | 113 ++++++++++++---------
core/server/src/shard/system/streams.rs | 2 -
core/server/src/shard/system/users.rs | 4 +-
core/server/src/shard/transmission/event.rs | 4 +-
core/server/src/slab/users.rs | 6 ++
core/server/src/streaming/topics/helpers.rs | 42 +++++++-
core/server/src/tcp/tcp_listener.rs | 12 ++-
core/server/src/tcp/tcp_tls_listener.rs | 2 +-
38 files changed, 275 insertions(+), 128 deletions(-)
diff --git a/core/common/src/error/iggy_error.rs
b/core/common/src/error/iggy_error.rs
index 326c059ea..eb6bbb84e 100644
--- a/core/common/src/error/iggy_error.rs
+++ b/core/common/src/error/iggy_error.rs
@@ -485,8 +485,8 @@ pub enum IggyError {
#[error("Shard not found for stream ID: {0}, topic ID: {1}, partition ID:
{2}")]
ShardNotFound(usize, usize, usize) = 11000,
- #[error("Shard communication error, shard ID: {0}")]
- ShardCommunicationError(u16) = 11001,
+ #[error("Shard communication error")]
+ ShardCommunicationError = 11001,
#[error("Cannot bind to socket with addr: {0}")]
CannotBindToSocket(String) = 12000,
diff --git a/core/common/src/utils/crypto.rs b/core/common/src/utils/crypto.rs
index 11d51cb7c..fca62dde1 100644
--- a/core/common/src/utils/crypto.rs
+++ b/core/common/src/utils/crypto.rs
@@ -18,7 +18,6 @@
use crate::IggyError;
use crate::text;
-use aes_gcm::aead::generic_array::GenericArray;
use aes_gcm::aead::{Aead, OsRng};
use aes_gcm::{AeadCore, Aes256Gcm, KeyInit};
use std::fmt::Debug;
@@ -63,7 +62,7 @@ impl Aes256GcmEncryptor {
return Err(IggyError::InvalidEncryptionKey);
}
Ok(Self {
- cipher: Aes256Gcm::new(GenericArray::from_slice(key)),
+ cipher: Aes256Gcm::new(key.into()),
})
}
@@ -84,7 +83,7 @@ impl Encryptor for Aes256GcmEncryptor {
}
fn decrypt(&self, data: &[u8]) -> Result<Vec<u8>, IggyError> {
- let nonce = GenericArray::from_slice(&data[0..12]);
+ let nonce = (&data[0..12]).into();
let payload = self.cipher.decrypt(nonce, &data[12..]);
if payload.is_err() {
return Err(IggyError::CannotDecryptData);
diff --git a/core/integration/src/test_mcp_server.rs
b/core/integration/src/test_mcp_server.rs
index b24270902..9d729eaa8 100644
--- a/core/integration/src/test_mcp_server.rs
+++ b/core/integration/src/test_mcp_server.rs
@@ -24,17 +24,18 @@ use rmcp::{
service::RunningService,
transport::StreamableHttpClientTransport,
};
-use std::fs::OpenOptions;
-use std::io::Write;
+use std::fs::{self, File};
use std::net::{Ipv4Addr, SocketAddr};
use std::path::PathBuf;
-use std::process::{Child, Command};
+use std::process::{Child, Command, Stdio};
+use std::thread::panicking;
use std::time::Duration;
use std::{collections::HashMap, net::TcpListener};
use tokio::time::sleep;
pub const CONSUMER_NAME: &str = "mcp";
const MCP_PATH: &str = "/mcp";
+const TEST_VERBOSITY_ENV_VAR: &str = "IGGY_TEST_VERBOSE";
pub type McpClient = RunningService<RoleClient, InitializeRequestParam>;
@@ -101,6 +102,22 @@ impl TestMcpServer {
Command::cargo_bin("iggy-mcp").unwrap()
};
command.envs(self.envs.clone());
+
+ // By default, MCP server logs are redirected to files,
+ // and dumped to stderr when test fails. With IGGY_TEST_VERBOSE=1
+ // logs are dumped to stdout during test execution.
+ if std::env::var(TEST_VERBOSITY_ENV_VAR).is_ok()
+ || self.envs.contains_key(TEST_VERBOSITY_ENV_VAR)
+ {
+ command.stdout(Stdio::inherit());
+ command.stderr(Stdio::inherit());
+ } else {
+ command.stdout(self.get_stdout_file());
+ self.stdout_file_path =
Some(fs::canonicalize(self.get_stdout_file_path()).unwrap());
+ command.stderr(self.get_stderr_file());
+ self.stderr_file_path =
Some(fs::canonicalize(self.get_stderr_file_path()).unwrap());
+ }
+
let child = command.spawn().unwrap();
self.child_handle = Some(child);
}
@@ -118,29 +135,7 @@ impl TestMcpServer {
#[cfg(not(unix))]
child_handle.kill().unwrap();
- if let Ok(output) = child_handle.wait_with_output() {
- let stderr = String::from_utf8_lossy(&output.stderr);
- let stdout = String::from_utf8_lossy(&output.stdout);
- if let Some(stderr_file_path) = &self.stderr_file_path {
- OpenOptions::new()
- .append(true)
- .create(true)
- .open(stderr_file_path)
- .unwrap()
- .write_all(stderr.as_bytes())
- .unwrap();
- }
-
- if let Some(stdout_file_path) = &self.stdout_file_path {
- OpenOptions::new()
- .append(true)
- .create(true)
- .open(stdout_file_path)
- .unwrap()
- .write_all(stdout.as_bytes())
- .unwrap();
- }
- }
+ let _ = child_handle.wait();
}
}
@@ -224,10 +219,55 @@ impl TestMcpServer {
panic!("Failed to find a free port after {max_retries} retries");
}
+
+ fn get_stdout_file_path(&self) -> String {
+ format!("/tmp/iggy-mcp-{}.stdout", self.server_address.port())
+ }
+
+ fn get_stderr_file_path(&self) -> String {
+ format!("/tmp/iggy-mcp-{}.stderr", self.server_address.port())
+ }
+
+ fn get_stdout_file(&self) -> Stdio {
+ Stdio::from(File::create(self.get_stdout_file_path()).unwrap())
+ }
+
+ fn get_stderr_file(&self) -> Stdio {
+ Stdio::from(File::create(self.get_stderr_file_path()).unwrap())
+ }
+
+ fn read_file_to_string(path: &str) -> String {
+ fs::read_to_string(path).unwrap_or_else(|_| format!("Failed to read
file: {}", path))
+ }
}
impl Drop for TestMcpServer {
fn drop(&mut self) {
self.stop();
+
+ if panicking() {
+ if let Some(stdout_file_path) = &self.stdout_file_path {
+ eprintln!(
+ "Iggy MCP server stdout:\n{}",
+
Self::read_file_to_string(stdout_file_path.to_str().unwrap())
+ );
+ }
+
+ if let Some(stderr_file_path) = &self.stderr_file_path {
+ eprintln!(
+ "Iggy MCP server stderr:\n{}",
+
Self::read_file_to_string(stderr_file_path.to_str().unwrap())
+ );
+ }
+ }
+
+ // Clean up log files
+ if let Some(stdout_file_path) = &self.stdout_file_path {
+ fs::remove_file(stdout_file_path).unwrap();
+ }
+
+ if let Some(stderr_file_path) = &self.stderr_file_path {
+ fs::remove_file(stderr_file_path).unwrap();
+ }
}
}
diff --git a/core/integration/tests/server/scenarios/concurrent_scenario.rs
b/core/integration/tests/server/scenarios/concurrent_scenario.rs
index 88e76b3fd..e7a2831ef 100644
--- a/core/integration/tests/server/scenarios/concurrent_scenario.rs
+++ b/core/integration/tests/server/scenarios/concurrent_scenario.rs
@@ -21,8 +21,8 @@ use futures::future::join_all;
use iggy::prelude::*;
use integration::test_server::{ClientFactory, login_root};
-const OPERATIONS_COUNT: usize = 20;
-const MULTIPLE_CLIENT_COUNT: usize = 10;
+const OPERATIONS_COUNT: usize = 100;
+const MULTIPLE_CLIENT_COUNT: usize = 20;
const OPERATIONS_PER_CLIENT: usize = OPERATIONS_COUNT / MULTIPLE_CLIENT_COUNT;
const USER_PASSWORD: &str = "secret";
const TEST_STREAM_NAME: &str = "race-test-stream";
@@ -33,6 +33,8 @@ pub enum ResourceType {
User,
Stream,
Topic,
+ // TODO(hubcio): add ConsumerGroup
+ // TODO(hubcio): add Partition
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
diff --git
a/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs
b/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs
index eb7e80190..c8323ee13 100644
---
a/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs
+++
b/core/server/src/binary/handlers/consumer_groups/create_consumer_group_handler.rs
@@ -20,6 +20,7 @@ use crate::binary::command::{BinaryServerCommand,
ServerCommand, ServerCommandHa
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::mapper;
use crate::binary::{handlers::consumer_groups::COMPONENT, sender::SenderKind};
+
use crate::shard::IggyShard;
use crate::shard::transmission::event::ShardEvent;
use crate::slab::traits_ext::EntityMarker;
@@ -60,7 +61,7 @@ impl ServerCommandHandler for CreateConsumerGroup {
topic_id: self.topic_id.clone(),
cg,
};
- let _responses = shard.broadcast_event_to_all_shards(event).await;
+ shard.broadcast_event_to_all_shards(event).await?;
let stream_id = self.stream_id.clone();
let topic_id = self.topic_id.clone();
diff --git
a/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs
b/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs
index 5dfb9dda5..be50f5fec 100644
---
a/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs
+++
b/core/server/src/binary/handlers/consumer_groups/delete_consumer_group_handler.rs
@@ -19,6 +19,7 @@
use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::{handlers::consumer_groups::COMPONENT, sender::SenderKind};
+
use crate::shard::IggyShard;
use crate::shard::transmission::event::ShardEvent;
use crate::slab::traits_ext::EntityMarker;
@@ -87,7 +88,7 @@ impl ServerCommandHandler for DeleteConsumerGroup {
topic_id: self.topic_id.clone(),
group_id: self.group_id.clone(),
};
- let _responses = shard.broadcast_event_to_all_shards(event).await;
+ shard.broadcast_event_to_all_shards(event).await?;
let stream_id = self.stream_id.clone();
let topic_id = self.topic_id.clone();
shard
diff --git
a/core/server/src/binary/handlers/consumer_groups/join_consumer_group_handler.rs
b/core/server/src/binary/handlers/consumer_groups/join_consumer_group_handler.rs
index bca8742f4..4cc94d85c 100644
---
a/core/server/src/binary/handlers/consumer_groups/join_consumer_group_handler.rs
+++
b/core/server/src/binary/handlers/consumer_groups/join_consumer_group_handler.rs
@@ -19,6 +19,7 @@
use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::{handlers::consumer_groups::COMPONENT, sender::SenderKind};
+
use crate::shard::IggyShard;
use crate::shard::transmission::event::ShardEvent;
use crate::streaming::session::Session;
@@ -69,7 +70,7 @@ impl ServerCommandHandler for JoinConsumerGroup {
topic_id,
group_id,
};
- let _responses = shard.broadcast_event_to_all_shards(event).await;
+ shard.broadcast_event_to_all_shards(event).await?;
sender.send_empty_ok_response().await?;
Ok(())
diff --git
a/core/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs
b/core/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs
index 8e7955019..631636cf2 100644
---
a/core/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs
+++
b/core/server/src/binary/handlers/consumer_groups/leave_consumer_group_handler.rs
@@ -20,6 +20,7 @@ use super::COMPONENT;
use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::sender::SenderKind;
+
use crate::shard::IggyShard;
use crate::shard::transmission::event::ShardEvent;
use crate::streaming::session::Session;
@@ -71,7 +72,7 @@ impl ServerCommandHandler for LeaveConsumerGroup {
topic_id,
group_id,
};
- let _responses = shard.broadcast_event_to_all_shards(event).await;
+ shard.broadcast_event_to_all_shards(event).await?;
sender.send_empty_ok_response().await?;
Ok(())
diff --git
a/core/server/src/binary/handlers/partitions/create_partitions_handler.rs
b/core/server/src/binary/handlers/partitions/create_partitions_handler.rs
index 6589031da..981858a08 100644
--- a/core/server/src/binary/handlers/partitions/create_partitions_handler.rs
+++ b/core/server/src/binary/handlers/partitions/create_partitions_handler.rs
@@ -19,8 +19,10 @@
use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::{handlers::partitions::COMPONENT, sender::SenderKind};
+
use crate::shard::IggyShard;
use crate::shard::transmission::event::ShardEvent;
+use crate::slab::traits_ext::EntityMarker;
use crate::state::command::EntryCommand;
use crate::streaming::session::Session;
use crate::streaming::{streams, topics};
@@ -53,13 +55,19 @@ impl ServerCommandHandler for CreatePartitions {
self.partitions_count,
)
.await?;
+ let partition_ids = partitions.iter().map(|p|
p.id()).collect::<Vec<_>>();
let event = ShardEvent::CreatedPartitions2 {
stream_id: self.stream_id.clone(),
topic_id: self.topic_id.clone(),
partitions,
};
- let _responses = shard.broadcast_event_to_all_shards(event).await;
- // TODO: Rebalance the consumer group.
+ shard.broadcast_event_to_all_shards(event).await?;
+
+ shard.streams2.with_topic_by_id_mut(
+ &self.stream_id,
+ &self.topic_id,
+ topics::helpers::rebalance_consumer_group(shard.id,
&partition_ids),
+ );
let stream_id = shard
.streams2
diff --git
a/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs
b/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs
index 98b56f76a..2680ab39e 100644
--- a/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs
+++ b/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs
@@ -19,6 +19,7 @@
use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::{handlers::partitions::COMPONENT, sender::SenderKind};
+
use crate::shard::IggyShard;
use crate::shard::transmission::event::ShardEvent;
use crate::state::command::EntryCommand;
@@ -61,8 +62,21 @@ impl ServerCommandHandler for DeletePartitions {
partitions_count: self.partitions_count,
partition_ids: deleted_partition_ids,
};
- let _responses = shard.broadcast_event_to_all_shards(event).await;
- // TODO: Rebalance the consumer group.
+ shard.broadcast_event_to_all_shards(event).await?;
+
+ let remaining_partition_ids = shard.streams2.with_topic_by_id(
+ &self.stream_id,
+ &self.topic_id,
+ crate::streaming::topics::helpers::get_partition_ids(),
+ );
+ shard.streams2.with_topic_by_id_mut(
+ &self.stream_id,
+ &self.topic_id,
+ crate::streaming::topics::helpers::rebalance_consumer_group(
+ shard.id,
+ &remaining_partition_ids,
+ ),
+ );
shard
.state
diff --git
a/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs
b/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs
index ea7b947a4..9b2294fe3 100644
---
a/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs
+++
b/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs
@@ -20,6 +20,7 @@ use crate::binary::command::{BinaryServerCommand,
ServerCommand, ServerCommandHa
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::mapper;
use crate::binary::{handlers::personal_access_tokens::COMPONENT,
sender::SenderKind};
+
use crate::shard::IggyShard;
use crate::shard::transmission::event::ShardEvent;
use crate::state::command::EntryCommand;
@@ -60,7 +61,7 @@ impl ServerCommandHandler for CreatePersonalAccessToken {
let event = ShardEvent::CreatedPersonalAccessToken {
personal_access_token: personal_access_token.clone(),
};
- let _responses = shard.broadcast_event_to_all_shards(event).await;
+ shard.broadcast_event_to_all_shards(event).await?;
shard
.state
diff --git
a/core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs
b/core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs
index b2708f9c4..e962aadcc 100644
---
a/core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs
+++
b/core/server/src/binary/handlers/personal_access_tokens/delete_personal_access_token_handler.rs
@@ -56,7 +56,7 @@ impl ServerCommandHandler for DeletePersonalAccessToken {
user_id: session.get_user_id(),
name: self.name.clone(),
};
- let _responses = shard.broadcast_event_to_all_shards(event).await;
+ shard.broadcast_event_to_all_shards(event).await?;
shard
.state
diff --git
a/core/server/src/binary/handlers/personal_access_tokens/login_with_personal_access_token_handler.rs
b/core/server/src/binary/handlers/personal_access_tokens/login_with_personal_access_token_handler.rs
index e7ecc33f9..f18bded31 100644
---
a/core/server/src/binary/handlers/personal_access_tokens/login_with_personal_access_token_handler.rs
+++
b/core/server/src/binary/handlers/personal_access_tokens/login_with_personal_access_token_handler.rs
@@ -60,7 +60,7 @@ impl ServerCommandHandler for LoginWithPersonalAccessToken {
token: self.token,
client_id: session.client_id,
};
- let _responses = shard.broadcast_event_to_all_shards(event).await;
+ shard.broadcast_event_to_all_shards(event).await?;
let identity_info = mapper::map_identity_info(user.id);
sender.send_ok_response(&identity_info).await?;
Ok(())
diff --git
a/core/server/src/binary/handlers/segments/delete_segments_handler.rs
b/core/server/src/binary/handlers/segments/delete_segments_handler.rs
index ccb5f859d..c8c9b16fc 100644
--- a/core/server/src/binary/handlers/segments/delete_segments_handler.rs
+++ b/core/server/src/binary/handlers/segments/delete_segments_handler.rs
@@ -19,6 +19,7 @@
use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::{handlers::partitions::COMPONENT, sender::SenderKind};
+
use crate::shard::IggyShard;
use crate::shard::transmission::event::ShardEvent;
use crate::state::command::EntryCommand;
@@ -68,7 +69,7 @@ impl ServerCommandHandler for DeleteSegments {
partition_id: self.partition_id as usize,
segments_count: self.segments_count,
};
- let _responses = shard.broadcast_event_to_all_shards(event).await;
+ shard.broadcast_event_to_all_shards(event).await?;
shard
.state
diff --git a/core/server/src/binary/handlers/streams/create_stream_handler.rs
b/core/server/src/binary/handlers/streams/create_stream_handler.rs
index 0b1c1ec0a..cfabf569d 100644
--- a/core/server/src/binary/handlers/streams/create_stream_handler.rs
+++ b/core/server/src/binary/handlers/streams/create_stream_handler.rs
@@ -20,6 +20,7 @@ use crate::binary::command::{BinaryServerCommand,
ServerCommand, ServerCommandHa
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::mapper;
use crate::binary::{handlers::streams::COMPONENT, sender::SenderKind};
+
use crate::shard::IggyShard;
use crate::shard::transmission::event::ShardEvent;
use crate::shard_info;
@@ -60,7 +61,8 @@ impl ServerCommandHandler for CreateStream {
id: created_stream_id,
stream,
};
- let _responses = shard.broadcast_event_to_all_shards(event).await;
+
+ shard.broadcast_event_to_all_shards(event).await?;
let response = shard
.streams2
diff --git a/core/server/src/binary/handlers/streams/delete_stream_handler.rs
b/core/server/src/binary/handlers/streams/delete_stream_handler.rs
index f59b11b5b..43ff59f2a 100644
--- a/core/server/src/binary/handlers/streams/delete_stream_handler.rs
+++ b/core/server/src/binary/handlers/streams/delete_stream_handler.rs
@@ -19,6 +19,7 @@
use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::{handlers::streams::COMPONENT, sender::SenderKind};
+
use crate::shard::IggyShard;
use crate::shard::transmission::event::ShardEvent;
use crate::shard_info;
@@ -65,7 +66,7 @@ impl ServerCommandHandler for DeleteStream {
id: stream2.id(),
stream_id: self.stream_id.clone(),
};
- let _responses = shard.broadcast_event_to_all_shards(event).await;
+ shard.broadcast_event_to_all_shards(event).await?;
shard
.state
diff --git a/core/server/src/binary/handlers/streams/purge_stream_handler.rs
b/core/server/src/binary/handlers/streams/purge_stream_handler.rs
index b8b35b645..d25d59a7f 100644
--- a/core/server/src/binary/handlers/streams/purge_stream_handler.rs
+++ b/core/server/src/binary/handlers/streams/purge_stream_handler.rs
@@ -19,6 +19,7 @@
use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::{handlers::streams::COMPONENT, sender::SenderKind};
+
use crate::shard::IggyShard;
use crate::shard::transmission::event::ShardEvent;
use crate::state::command::EntryCommand;
@@ -54,7 +55,7 @@ impl ServerCommandHandler for PurgeStream {
let event = ShardEvent::PurgedStream2 {
stream_id: self.stream_id.clone(),
};
- let _responses = shard.broadcast_event_to_all_shards(event).await;
+ shard.broadcast_event_to_all_shards(event).await?;
shard
.state
.apply(session.get_user_id(), &EntryCommand::PurgeStream(self))
diff --git a/core/server/src/binary/handlers/streams/update_stream_handler.rs
b/core/server/src/binary/handlers/streams/update_stream_handler.rs
index 996db7f4b..866c3f7ff 100644
--- a/core/server/src/binary/handlers/streams/update_stream_handler.rs
+++ b/core/server/src/binary/handlers/streams/update_stream_handler.rs
@@ -19,6 +19,7 @@
use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::{handlers::streams::COMPONENT, sender::SenderKind};
+
use crate::shard::IggyShard;
use crate::shard::transmission::event::ShardEvent;
use crate::state::command::EntryCommand;
@@ -55,7 +56,7 @@ impl ServerCommandHandler for UpdateStream {
stream_id: self.stream_id.clone(),
name: self.name.clone(),
};
- let _responses = shard.broadcast_event_to_all_shards(event).await;
+ shard.broadcast_event_to_all_shards(event).await?;
shard
.state
.apply(session.get_user_id(), &EntryCommand::UpdateStream(self))
diff --git a/core/server/src/binary/handlers/topics/create_topic_handler.rs
b/core/server/src/binary/handlers/topics/create_topic_handler.rs
index 35eba8d93..fadb137cd 100644
--- a/core/server/src/binary/handlers/topics/create_topic_handler.rs
+++ b/core/server/src/binary/handlers/topics/create_topic_handler.rs
@@ -20,6 +20,7 @@ use crate::binary::command::{BinaryServerCommand,
ServerCommand, ServerCommandHa
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::mapper;
use crate::binary::{handlers::topics::COMPONENT, sender::SenderKind};
+
use crate::shard::IggyShard;
use crate::shard::transmission::event::ShardEvent;
use crate::slab::traits_ext::EntityMarker;
@@ -71,7 +72,8 @@ impl ServerCommandHandler for CreateTopic {
stream_id: self.stream_id.clone(),
topic,
};
- let _responses = shard.broadcast_event_to_all_shards(event).await;
+
+ shard.broadcast_event_to_all_shards(event).await?;
let partitions = shard
.create_partitions2(
session,
@@ -85,7 +87,8 @@ impl ServerCommandHandler for CreateTopic {
topic_id: Identifier::numeric(topic_id as u32).unwrap(),
partitions,
};
- let _responses = shard.broadcast_event_to_all_shards(event).await;
+
+ shard.broadcast_event_to_all_shards(event).await?;
let response = shard.streams2.with_topic_by_id(
&self.stream_id,
&Identifier::numeric(topic_id as u32).unwrap(),
diff --git a/core/server/src/binary/handlers/topics/delete_topic_handler.rs
b/core/server/src/binary/handlers/topics/delete_topic_handler.rs
index 69d9d9957..57b6294f3 100644
--- a/core/server/src/binary/handlers/topics/delete_topic_handler.rs
+++ b/core/server/src/binary/handlers/topics/delete_topic_handler.rs
@@ -19,6 +19,7 @@
use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::{handlers::topics::COMPONENT, sender::SenderKind};
+
use crate::shard::IggyShard;
use crate::shard::transmission::event::ShardEvent;
use crate::shard_info;
@@ -71,7 +72,7 @@ impl ServerCommandHandler for DeleteTopic {
stream_id: self.stream_id.clone(),
topic_id: self.topic_id.clone(),
};
- let _responses = shard.broadcast_event_to_all_shards(event).await;
+ shard.broadcast_event_to_all_shards(event).await?;
shard
.state
diff --git a/core/server/src/binary/handlers/topics/purge_topic_handler.rs
b/core/server/src/binary/handlers/topics/purge_topic_handler.rs
index 7c4d5179c..220af3a58 100644
--- a/core/server/src/binary/handlers/topics/purge_topic_handler.rs
+++ b/core/server/src/binary/handlers/topics/purge_topic_handler.rs
@@ -60,7 +60,7 @@ impl ServerCommandHandler for PurgeTopic {
stream_id: self.stream_id.clone(),
topic_id: self.topic_id.clone(),
};
- let _responses = shard.broadcast_event_to_all_shards(event).await;
+ shard.broadcast_event_to_all_shards(event).await?;
shard
.state
diff --git a/core/server/src/binary/handlers/topics/update_topic_handler.rs
b/core/server/src/binary/handlers/topics/update_topic_handler.rs
index 8f9146ac9..d06ef5c11 100644
--- a/core/server/src/binary/handlers/topics/update_topic_handler.rs
+++ b/core/server/src/binary/handlers/topics/update_topic_handler.rs
@@ -19,6 +19,7 @@
use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::{handlers::topics::COMPONENT, sender::SenderKind};
+
use crate::shard::IggyShard;
use crate::shard::transmission::event::ShardEvent;
use crate::state::command::EntryCommand;
@@ -81,7 +82,7 @@ impl ServerCommandHandler for UpdateTopic {
max_topic_size: self.max_topic_size,
replication_factor: self.replication_factor,
};
- let _responses = shard.broadcast_event_to_all_shards(event).await;
+ shard.broadcast_event_to_all_shards(event).await?;
let topic_id = self.topic_id.clone();
let stream_id = self.stream_id.clone();
diff --git a/core/server/src/binary/handlers/users/change_password_handler.rs
b/core/server/src/binary/handlers/users/change_password_handler.rs
index 0b595ee4b..f9fa2729d 100644
--- a/core/server/src/binary/handlers/users/change_password_handler.rs
+++ b/core/server/src/binary/handlers/users/change_password_handler.rs
@@ -19,6 +19,7 @@
use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::{handlers::users::COMPONENT, sender::SenderKind};
+
use crate::shard::IggyShard;
use crate::shard::transmission::event::ShardEvent;
use crate::shard_info;
@@ -77,7 +78,7 @@ impl ServerCommandHandler for ChangePassword {
current_password: self.current_password.clone(),
new_password: self.new_password.clone(),
};
- let _responses = shard.broadcast_event_to_all_shards(event).await;
+ shard.broadcast_event_to_all_shards(event).await?;
// For the security of the system, we hash the password before storing
it in metadata.
shard
diff --git a/core/server/src/binary/handlers/users/create_user_handler.rs
b/core/server/src/binary/handlers/users/create_user_handler.rs
index 96521d5ff..8dc18e0f7 100644
--- a/core/server/src/binary/handlers/users/create_user_handler.rs
+++ b/core/server/src/binary/handlers/users/create_user_handler.rs
@@ -78,7 +78,7 @@ impl ServerCommandHandler for CreateUser {
status: self.status,
permissions: self.permissions.clone(),
};
- let _responses = shard.broadcast_event_to_all_shards(event).await;
+ shard.broadcast_event_to_all_shards(event).await?;
let user_id = user.id;
let response = mapper::map_user(&user);
diff --git a/core/server/src/binary/handlers/users/delete_user_handler.rs
b/core/server/src/binary/handlers/users/delete_user_handler.rs
index f265f8df0..c807e6d04 100644
--- a/core/server/src/binary/handlers/users/delete_user_handler.rs
+++ b/core/server/src/binary/handlers/users/delete_user_handler.rs
@@ -21,6 +21,7 @@ use std::rc::Rc;
use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::{handlers::users::COMPONENT, sender::SenderKind};
+
use crate::shard::IggyShard;
use crate::shard::transmission::event::ShardEvent;
use crate::shard_info;
@@ -66,7 +67,7 @@ impl ServerCommandHandler for DeleteUser {
let event = ShardEvent::DeletedUser {
user_id: self.user_id.clone(),
};
- let _responses = shard.broadcast_event_to_all_shards(event).await;
+ shard.broadcast_event_to_all_shards(event).await?;
let user_id = self.user_id.clone();
shard
diff --git a/core/server/src/binary/handlers/users/login_user_handler.rs
b/core/server/src/binary/handlers/users/login_user_handler.rs
index 395f2724b..6ee6471db 100644
--- a/core/server/src/binary/handlers/users/login_user_handler.rs
+++ b/core/server/src/binary/handlers/users/login_user_handler.rs
@@ -16,20 +16,19 @@
* under the License.
*/
-use std::rc::Rc;
-
use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::mapper;
use crate::binary::{handlers::users::COMPONENT, sender::SenderKind};
use crate::shard::IggyShard;
use crate::shard::transmission::event::ShardEvent;
-use crate::shard_info;
use crate::streaming::session::Session;
+use crate::{shard_info, shard_warn};
use anyhow::Result;
use error_set::ErrContext;
use iggy_common::IggyError;
use iggy_common::login_user::LoginUser;
+use std::rc::Rc;
use tracing::{debug, instrument};
impl ServerCommandHandler for LoginUser {
@@ -45,6 +44,11 @@ impl ServerCommandHandler for LoginUser {
session: &Rc<Session>,
shard: &Rc<IggyShard>,
) -> Result<(), IggyError> {
+ if shard.is_shutting_down() {
+ shard_warn!(shard.id, "Rejecting login request during shutdown");
+ return Err(IggyError::Disconnected);
+ }
+
debug!("session: {session}, command: {self}");
let LoginUser {
username, password, ..
@@ -70,7 +74,7 @@ impl ServerCommandHandler for LoginUser {
password,
};
// Broadcast the event to all shards.
- let _responses = shard.broadcast_event_to_all_shards(event).await;
+ shard.broadcast_event_to_all_shards(event).await?;
let identity_info = mapper::map_identity_info(user.id);
sender.send_ok_response(&identity_info).await?;
diff --git a/core/server/src/binary/handlers/users/logout_user_handler.rs
b/core/server/src/binary/handlers/users/logout_user_handler.rs
index 7dbc99b4d..3f52d2255 100644
--- a/core/server/src/binary/handlers/users/logout_user_handler.rs
+++ b/core/server/src/binary/handlers/users/logout_user_handler.rs
@@ -21,6 +21,7 @@ use std::rc::Rc;
use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::{handlers::users::COMPONENT, sender::SenderKind};
+
use crate::shard::IggyShard;
use crate::shard::transmission::event::ShardEvent;
use crate::shard_info;
@@ -61,7 +62,7 @@ impl ServerCommandHandler for LogoutUser {
let event = ShardEvent::LogoutUser {
client_id: session.client_id,
};
- let _responses = shard.broadcast_event_to_all_shards(event).await;
+ shard.broadcast_event_to_all_shards(event).await?;
session.clear_user_id();
sender.send_empty_ok_response().await?;
Ok(())
diff --git
a/core/server/src/binary/handlers/users/update_permissions_handler.rs
b/core/server/src/binary/handlers/users/update_permissions_handler.rs
index 4b42d9026..0856516d8 100644
--- a/core/server/src/binary/handlers/users/update_permissions_handler.rs
+++ b/core/server/src/binary/handlers/users/update_permissions_handler.rs
@@ -21,6 +21,7 @@ use std::rc::Rc;
use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::{handlers::users::COMPONENT, sender::SenderKind};
+
use crate::shard::IggyShard;
use crate::shard::transmission::event::ShardEvent;
use crate::shard_info;
@@ -61,7 +62,7 @@ impl ServerCommandHandler for UpdatePermissions {
user_id: self.user_id.clone(),
permissions: self.permissions.clone(),
};
- let _responses = shard.broadcast_event_to_all_shards(event).await;
+ shard.broadcast_event_to_all_shards(event).await?;
shard
.state
diff --git a/core/server/src/binary/handlers/users/update_user_handler.rs
b/core/server/src/binary/handlers/users/update_user_handler.rs
index c8f28db38..cffd953c9 100644
--- a/core/server/src/binary/handlers/users/update_user_handler.rs
+++ b/core/server/src/binary/handlers/users/update_user_handler.rs
@@ -21,6 +21,7 @@ use std::rc::Rc;
use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::{handlers::users::COMPONENT, sender::SenderKind};
+
use crate::shard::IggyShard;
use crate::shard::transmission::event::ShardEvent;
use crate::shard_info;
@@ -73,7 +74,7 @@ impl ServerCommandHandler for UpdateUser {
username: self.username.clone(),
status: self.status,
};
- let _responses = shard.broadcast_event_to_all_shards(event).await;
+ shard.broadcast_event_to_all_shards(event).await?;
let user_id = self.user_id.clone();
shard
diff --git a/core/server/src/quic/quic_server.rs
b/core/server/src/quic/quic_server.rs
index c5033213a..5fb3cd9b2 100644
--- a/core/server/src/quic/quic_server.rs
+++ b/core/server/src/quic/quic_server.rs
@@ -135,7 +135,7 @@ pub async fn spawn_quic_server(
protocol: iggy_common::TransportProtocol::Quic,
address: actual_addr,
};
- shard.broadcast_event_to_all_shards(event).await;
+ shard.broadcast_event_to_all_shards(event).await?;
}
} else {
shard.quic_bound_address.set(Some(actual_addr));
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index f65def97f..76e5b21c2 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -36,7 +36,7 @@ use crate::{
message::{ShardMessage, ShardRequest, ShardRequestPayload,
ShardSendRequestResult},
},
},
- shard_error, shard_info,
+ shard_error, shard_info, shard_warn,
slab::{streams::Streams, traits_ext::EntityMarker, users::Users},
state::file::FileState,
streaming::{
@@ -49,6 +49,7 @@ use builder::IggyShardBuilder;
use compio::io::AsyncWriteAtExt;
use dashmap::DashMap;
use error_set::ErrContext;
+use futures::future::join_all;
use hash32::{Hasher, Murmur3Hasher};
use iggy_common::{EncryptorKind, Identifier, IggyError, TransportProtocol};
use std::hash::Hasher as _;
@@ -64,6 +65,7 @@ use transmission::connector::{Receiver, ShardConnector,
StopReceiver};
pub const COMPONENT: &str = "SHARD";
pub const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(10);
+pub const BROADCAST_TIMEOUT: Duration = Duration::from_secs(20);
pub(crate) struct Shard {
id: u16,
@@ -86,7 +88,7 @@ impl Shard {
//TODO: Fixme
let response = receiver.recv().await.map_err(|err| {
error!("Failed to receive response from shard: {err}");
- IggyError::ShardCommunicationError(self.id)
+ IggyError::ShardCommunicationError
})?;
Ok(response)
}
@@ -672,7 +674,9 @@ impl IggyShard {
Ok(())
}
ShardEvent::CreatedTopic2 { stream_id, topic } => {
- let _topic_id = self.create_topic2_bypass_auth(&stream_id,
topic);
+ let topic_id_from_event = topic.id();
+ let topic_id = self.create_topic2_bypass_auth(&stream_id,
topic.clone());
+ assert_eq!(topic_id, topic_id_from_event);
Ok(())
}
ShardEvent::CreatedPartitions2 {
@@ -898,59 +902,68 @@ impl IggyShard {
}
}
- pub async fn broadcast_event_to_all_shards(&self, event: ShardEvent) ->
Vec<ShardResponse> {
- let mut responses =
Vec::with_capacity(self.get_available_shards_count() as usize);
- for maybe_receiver in self
+ pub async fn broadcast_event_to_all_shards(&self, event: ShardEvent) ->
Result<(), IggyError> {
+ if self.is_shutting_down() {
+ shard_info!(
+ self.id,
+ "Skipping broadcast during shutdown for event: {}",
+ event
+ );
+ return Ok(());
+ }
+
+ let event_type = event.to_string();
+ let futures = self
.shards
.iter()
- .filter_map(|shard| {
- if shard.id != self.id {
- Some(shard.connection.clone())
- } else {
- None
- }
- })
- .map(|conn| {
- // TODO: Fixme, maybe we should send response_sender
- // and propagate errors back.
+ .filter(|s| s.id != self.id)
+ .map(|shard| {
let event = event.clone();
- /*
- if matches!(
- &event,
- ShardEvent::CreatedStream2 { .. }
- | ShardEvent::DeletedStream2 { .. }
- | ShardEvent::CreatedTopic2 { .. }
- | ShardEvent::DeletedTopic2 { .. }
- | ShardEvent::UpdatedTopic2 { .. }
- | ShardEvent::CreatedPartitions2 { .. }
- | ShardEvent::DeletedPartitions2 { .. }
- | ShardEvent::CreatedConsumerGroup2 { .. }
- | ShardEvent::CreatedPersonalAccessToken { .. }
- | ShardEvent::DeletedConsumerGroup2 { .. }
- ) {
- */
- let (sender, receiver) = async_channel::bounded(1);
- conn.send(ShardFrame::new(event.into(), Some(sender.clone())));
- Some(receiver.clone())
- /*
- } else {
- conn.send(ShardFrame::new(event.into(), None));
- None
+ let conn = shard.connection.clone();
+ let shard_id = shard.id;
+ let self_id = self.id;
+ let event_type = event_type.clone();
+
+ async move {
+ let (sender, receiver) = async_channel::bounded(1);
+ conn.send(ShardFrame::new(ShardMessage::Event(event),
Some(sender)));
+
+ match compio::time::timeout(BROADCAST_TIMEOUT,
receiver.recv()).await {
+ Ok(Ok(_)) => Ok(()),
+ Ok(Err(e)) => {
+ shard_warn!(
+ self_id,
+ "Broadcast to shard {} failed for event {}:
channel error: {}",
+ shard_id, event_type, e
+ );
+ Err(())
+ }
+ Err(_) => {
+ shard_warn!(
+ self_id,
+ "Broadcast to shard {} failed for event {}:
timeout waiting for response after {:?}",
+ shard_id, event_type,
+ BROADCAST_TIMEOUT
+ );
+ Err(())
+ }
+ }
}
- */
})
- {
- match maybe_receiver {
- Some(receiver) => {
- let response = receiver.recv().await.unwrap();
- responses.push(response);
- }
- None => {
- responses.push(ShardResponse::Event);
- }
- }
+ .collect::<Vec<_>>();
+
+ if futures.is_empty() {
+ return Ok(());
+ }
+
+ let results = join_all(futures).await;
+ let has_failures = results.iter().any(|r| r.is_err());
+
+ if has_failures {
+ Err(IggyError::ShardCommunicationError)
+ } else {
+ Ok(())
}
- responses
}
pub fn add_active_session(&self, session: Rc<Session>) {
diff --git a/core/server/src/shard/system/streams.rs
b/core/server/src/shard/system/streams.rs
index 4ce440599..a67318f6c 100644
--- a/core/server/src/shard/system/streams.rs
+++ b/core/server/src/shard/system/streams.rs
@@ -19,12 +19,10 @@
use super::COMPONENT;
use crate::shard::IggyShard;
use crate::slab::traits_ext::{DeleteCell, EntityMarker, InsertCell};
-
use crate::streaming::session::Session;
use crate::streaming::streams::storage2::{create_stream_file_hierarchy,
delete_stream_from_disk};
use crate::streaming::streams::{self, stream2};
use error_set::ErrContext;
-
use iggy_common::{Identifier, IggyError};
impl IggyShard {
diff --git a/core/server/src/shard/system/users.rs
b/core/server/src/shard/system/users.rs
index d4a952f95..ab1c1e40f 100644
--- a/core/server/src/shard/system/users.rs
+++ b/core/server/src/shard/system/users.rs
@@ -121,11 +121,13 @@ impl IggyShard {
permissions: Option<Permissions>,
) -> Result<(), IggyError> {
let assigned_user_id = self.create_user_base(username, password,
status, permissions)?;
+
assert_eq!(
assigned_user_id as u32, expected_user_id,
"User ID mismatch: expected {}, got {}. This indicates shards are
out of sync.",
expected_user_id, assigned_user_id
);
+
Ok(())
}
@@ -235,7 +237,6 @@ impl IggyShard {
let numeric_user_id = Identifier::numeric(user.id).unwrap();
if let Some(ref new_username) = username {
- let user = self.get_user(user_id)?;
let existing_user =
self.get_user(&new_username.to_owned().try_into()?);
if existing_user.is_ok() && existing_user.unwrap().id != user.id {
error!("User: {new_username} already exists.");
@@ -252,6 +253,7 @@ impl IggyShard {
format!("{COMPONENT} update user (error: {error}) - failed to
update user with id: {user_id}")
})?;
}
+
self.get_user(&numeric_user_id)
.with_error_context(|error| {
format!("{COMPONENT} update user (error: {error}) - failed to
get updated user with id: {user_id}")
diff --git a/core/server/src/shard/transmission/event.rs
b/core/server/src/shard/transmission/event.rs
index 45f335878..d897cfd00 100644
--- a/core/server/src/shard/transmission/event.rs
+++ b/core/server/src/shard/transmission/event.rs
@@ -12,9 +12,11 @@ use iggy_common::{
UserStatus,
};
use std::net::SocketAddr;
+use strum::Display;
#[allow(clippy::large_enum_variant)]
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, Display)]
+#[strum(serialize_all = "PascalCase")]
pub enum ShardEvent {
FlushUnsavedBuffer {
stream_id: Identifier,
diff --git a/core/server/src/slab/users.rs b/core/server/src/slab/users.rs
index c4859a86e..9ff71eb08 100644
--- a/core/server/src/slab/users.rs
+++ b/core/server/src/slab/users.rs
@@ -153,6 +153,7 @@ impl Users {
.ok_or_else(||
IggyError::ResourceNotFound(username.to_string()))?
}
};
+
let old_username = {
let users = self.users.borrow();
let user = users
@@ -160,15 +161,18 @@ impl Users {
.ok_or_else(||
IggyError::ResourceNotFound(identifier.to_string()))?;
user.username.clone()
};
+
if old_username == new_username {
return Ok(());
}
+
tracing::trace!(
"Updating username: '{}' → '{}' for user ID: {}",
old_username,
new_username,
id
);
+
{
let mut users = self.users.borrow_mut();
let user = users
@@ -176,9 +180,11 @@ impl Users {
.ok_or_else(||
IggyError::ResourceNotFound(identifier.to_string()))?;
user.username = new_username.clone();
}
+
let mut index = self.index.borrow_mut();
index.remove(&old_username);
index.insert(new_username, id);
+
Ok(())
}
}
diff --git a/core/server/src/streaming/topics/helpers.rs
b/core/server/src/streaming/topics/helpers.rs
index 87c29c364..3ad1f230b 100644
--- a/core/server/src/streaming/topics/helpers.rs
+++ b/core/server/src/streaming/topics/helpers.rs
@@ -6,7 +6,10 @@ use crate::{
Keyed,
consumer_groups::{self, ConsumerGroups},
topics::{self, Topics},
- traits_ext::{ComponentsById, Delete, DeleteCell, EntityMarker},
+ traits_ext::{
+ ComponentsById, Delete, DeleteCell, EntityComponentSystem,
EntityComponentSystemMut,
+ EntityMarker, IntoComponents,
+ },
},
streaming::{
stats::TopicStats,
@@ -44,6 +47,15 @@ pub fn get_topic_id() -> impl
FnOnce(ComponentsById<TopicRef>) -> topics::Contai
|(root, _, _)| root.id()
}
+pub fn get_partition_ids() -> impl FnOnce(ComponentsById<TopicRef>) ->
Vec<usize> {
+ |(root, ..)| {
+ root.partitions().with_components(|components| {
+ let (roots, ..) = components.into_components();
+ roots.iter().map(|(id, _)| id).collect()
+ })
+ }
+}
+
pub fn get_message_expiry() -> impl FnOnce(ComponentsById<TopicRef>) ->
IggyExpiry {
|(root, _, _)| root.message_expiry()
}
@@ -144,6 +156,28 @@ pub fn leave_consumer_group(
}
}
+pub fn rebalance_consumer_group(
+ shard_id: u16,
+ partition_ids: &[usize],
+) -> impl FnOnce(ComponentsById<TopicRefMut>) {
+ move |(mut root, ..)| {
+ root.consumer_groups_mut()
+ .with_components_mut(|components| {
+ let (all_roots, all_members) = components.into_components();
+ for ((_, consumer_group_root), (_, members)) in
+ all_roots.iter().zip(all_members.iter_mut())
+ {
+ let id = consumer_group_root.id();
+ members.inner_mut().rcu(|existing_members| {
+ let mut new_members = mimic_members(existing_members);
+ assign_partitions_to_members(shard_id, id, &mut
new_members, partition_ids);
+ new_members
+ });
+ }
+ });
+ }
+}
+
pub fn get_consumer_group_member_id(
client_id: u32,
) -> impl FnOnce(ComponentsById<ConsumerGroupRef>) -> usize {
@@ -206,7 +240,7 @@ fn add_member(
members.inner_mut().rcu(move |members| {
let mut members = mimic_members(members);
Member::new(client_id).insert_into(&mut members);
- assign_partitions_to_members(shard_id, id, &mut members,
partitions.to_vec());
+ assign_partitions_to_members(shard_id, id, &mut members, partitions);
members
});
}
@@ -231,7 +265,7 @@ fn delete_member(
entry.id = idx;
true
});
- assign_partitions_to_members(shard_id, id, &mut members,
partitions.to_vec());
+ assign_partitions_to_members(shard_id, id, &mut members, partitions);
members
});
}
@@ -240,7 +274,7 @@ fn assign_partitions_to_members(
shard_id: u16,
id: usize,
members: &mut Slab<Member>,
- partitions: Vec<usize>,
+ partitions: &[usize],
) {
members
.iter_mut()
diff --git a/core/server/src/tcp/tcp_listener.rs
b/core/server/src/tcp/tcp_listener.rs
index c963103da..d7165dac8 100644
--- a/core/server/src/tcp/tcp_listener.rs
+++ b/core/server/src/tcp/tcp_listener.rs
@@ -18,6 +18,7 @@
use crate::binary::sender::SenderKind;
use crate::configs::tcp::TcpSocketConfig;
+
use crate::shard::IggyShard;
use crate::shard::task_registry::ShutdownToken;
use crate::shard::transmission::event::ShardEvent;
@@ -111,7 +112,7 @@ pub async fn start(
protocol: TransportProtocol::Tcp,
address: actual_addr,
};
- shard.broadcast_event_to_all_shards(event).await;
+ shard.broadcast_event_to_all_shards(event).await?;
}
}
@@ -148,8 +149,9 @@ async fn accept_loop(
shard_clone.add_active_session(session.clone());
// Broadcast session to all shards.
let event = ShardEvent::NewSession { address,
transport };
- // TODO: Fixme look inside of
broadcast_event_to_all_shards method.
- let _responses =
shard_clone.broadcast_event_to_all_shards(event).await;
+ if let Err(err) =
shard_clone.broadcast_event_to_all_shards(event).await {
+ shard_error!(shard.id, "Failed to broadcast
NewSession: {:?}", err);
+ }
let client_id = session.client_id;
let user_id = session.get_user_id();
@@ -168,7 +170,9 @@ async fn accept_loop(
}
registry_clone.remove_connection(&client_id);
let event = ShardEvent::ClientDisconnected {
client_id, user_id };
- let _responses =
shard_for_conn.broadcast_event_to_all_shards(event).await;
+ if let Err(err) =
shard_for_conn.broadcast_event_to_all_shards(event).await {
+ shard_error!(shard_for_conn.id, "Failed to
broadcast ClientDisconnected: {:?}", err);
+ }
if let Err(error) = sender.shutdown().await {
shard_error!(shard.id, "Failed to shutdown TCP
stream for client: {}, address: {}. {}", client_id, address, error);
diff --git a/core/server/src/tcp/tcp_tls_listener.rs
b/core/server/src/tcp/tcp_tls_listener.rs
index 56504df36..2f739c166 100644
--- a/core/server/src/tcp/tcp_tls_listener.rs
+++ b/core/server/src/tcp/tcp_tls_listener.rs
@@ -79,7 +79,7 @@ pub(crate) async fn start(
protocol: TransportProtocol::Tcp,
address: actual_addr,
};
- shard.broadcast_event_to_all_shards(event).await;
+ shard.broadcast_event_to_all_shards(event).await?;
}
}