This is an automated email from the ASF dual-hosted git repository.
gkoszyk pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/io_uring_tpc by this push:
new f03994cc feat(io_uring): fix race during opening partitions (#1987)
f03994cc is described below
commit f03994ccb0e5730d112fdf265bc509249461b47c
Author: Grzegorz Koszyk <[email protected]>
AuthorDate: Wed Jul 9 16:34:24 2025 +0200
feat(io_uring): fix race during opening partitions (#1987)
---
.../partitions/create_partitions_handler.rs | 4 +--
.../partitions/delete_partitions_handler.rs | 4 +--
.../handlers/streams/create_stream_handler.rs | 2 +-
.../handlers/streams/delete_stream_handler.rs | 4 +--
.../handlers/streams/update_stream_handler.rs | 2 +-
.../binary/handlers/topics/create_topic_handler.rs | 4 +--
.../binary/handlers/topics/delete_topic_handler.rs | 2 +-
.../binary/handlers/topics/update_topic_handler.rs | 4 +--
.../binary/handlers/users/login_user_handler.rs | 2 +-
core/server/src/shard/mod.rs | 29 +++++++++++++++++-----
core/server/src/shard/system/topics.rs | 3 ++-
core/server/src/tcp/tcp_listener.rs | 2 +-
core/server/src/tcp/tcp_tls_listener.rs | 2 +-
13 files changed, 41 insertions(+), 23 deletions(-)
diff --git
a/core/server/src/binary/handlers/partitions/create_partitions_handler.rs
b/core/server/src/binary/handlers/partitions/create_partitions_handler.rs
index f7dffca2..5f68f073 100644
--- a/core/server/src/binary/handlers/partitions/create_partitions_handler.rs
+++ b/core/server/src/binary/handlers/partitions/create_partitions_handler.rs
@@ -68,7 +68,7 @@ impl ServerCommandHandler for CreatePartitions {
topic_id: self.topic_id.clone(),
partitions_count: partition_ids.len() as u32,
};
- let _responses = shard.broadcast_event_to_all_shards(event.into());
+ let _responses =
shard.broadcast_event_to_all_shards(event.into()).await;
let stream = shard.get_stream(&stream_id).unwrap();
let topic = stream.get_topic(&topic_id).unwrap();
@@ -98,7 +98,7 @@ impl ServerCommandHandler for CreatePartitions {
topic_id: numeric_topic_id,
partition_ids,
};
- let _responses = shard.broadcast_event_to_all_shards(event.into());
+ let _responses =
shard.broadcast_event_to_all_shards(event.into()).await;
shard
.state
diff --git
a/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs
b/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs
index 67aa8f49..3fef67ed 100644
--- a/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs
+++ b/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs
@@ -67,7 +67,7 @@ impl ServerCommandHandler for DeletePartitions {
topic_id: topic_id.clone(),
partition_ids: partition_ids.clone(),
};
- let _responses = shard.broadcast_event_to_all_shards(event.into());
+ let _responses =
shard.broadcast_event_to_all_shards(event.into()).await;
let stream = shard.get_stream(&stream_id).with_error_context(|error| {
format!("{COMPONENT} (error: {error}) - failed to get stream with
ID: {stream_id}")
})?;
@@ -99,7 +99,7 @@ impl ServerCommandHandler for DeletePartitions {
}
}
let event = ShardEvent::DeletedShardTableRecords { namespaces };
- let _responses = shard.broadcast_event_to_all_shards(event.into());
+ let _responses =
shard.broadcast_event_to_all_shards(event.into()).await;
shard
.state
diff --git a/core/server/src/binary/handlers/streams/create_stream_handler.rs
b/core/server/src/binary/handlers/streams/create_stream_handler.rs
index 0e746b76..7426f4de 100644
--- a/core/server/src/binary/handlers/streams/create_stream_handler.rs
+++ b/core/server/src/binary/handlers/streams/create_stream_handler.rs
@@ -58,7 +58,7 @@ impl ServerCommandHandler for CreateStream {
})?;
let event = ShardEvent::CreatedStream { stream_id, name };
// Broadcast the event to all shards.
- let _responses = shard.broadcast_event_to_all_shards(event.into());
+ let _responses =
shard.broadcast_event_to_all_shards(event.into()).await;
let stream = shard.find_stream(session, &created_stream_id)
.with_error_context(|error| {
diff --git a/core/server/src/binary/handlers/streams/delete_stream_handler.rs
b/core/server/src/binary/handlers/streams/delete_stream_handler.rs
index 36cb3454..d2aef96c 100644
--- a/core/server/src/binary/handlers/streams/delete_stream_handler.rs
+++ b/core/server/src/binary/handlers/streams/delete_stream_handler.rs
@@ -77,7 +77,7 @@ impl ServerCommandHandler for DeleteStream {
}
}
let event = ShardEvent::DeletedShardTableRecords { namespaces };
- let _responses = shard.broadcast_event_to_all_shards(event.into());
+ let _responses =
shard.broadcast_event_to_all_shards(event.into()).await;
//TODO: Once event response is implemented, we could get rid of
this.
compio::time::sleep(Duration::from_millis(50)).await;
topic.delete().await.with_error_context(|error| {
@@ -90,7 +90,7 @@ impl ServerCommandHandler for DeleteStream {
let event = ShardEvent::DeletedStream {
stream_id: self.stream_id.clone(),
};
- let _responses = shard.broadcast_event_to_all_shards(event.into());
+ let _responses =
shard.broadcast_event_to_all_shards(event.into()).await;
shard
.state
diff --git a/core/server/src/binary/handlers/streams/update_stream_handler.rs
b/core/server/src/binary/handlers/streams/update_stream_handler.rs
index 6be10ed0..dda62158 100644
--- a/core/server/src/binary/handlers/streams/update_stream_handler.rs
+++ b/core/server/src/binary/handlers/streams/update_stream_handler.rs
@@ -56,7 +56,7 @@ impl ServerCommandHandler for UpdateStream {
stream_id: self.stream_id.clone(),
name: self.name.clone(),
};
- let _responses = shard.broadcast_event_to_all_shards(event.into());
+ let _responses =
shard.broadcast_event_to_all_shards(event.into()).await;
shard
.state
diff --git a/core/server/src/binary/handlers/topics/create_topic_handler.rs
b/core/server/src/binary/handlers/topics/create_topic_handler.rs
index 183b5ee1..840816e0 100644
--- a/core/server/src/binary/handlers/topics/create_topic_handler.rs
+++ b/core/server/src/binary/handlers/topics/create_topic_handler.rs
@@ -78,7 +78,7 @@ impl ServerCommandHandler for CreateTopic {
max_topic_size: self.max_topic_size,
replication_factor: self.replication_factor,
};
- let _responses = shard.broadcast_event_to_all_shards(event.into());
+ let _responses =
shard.broadcast_event_to_all_shards(event.into()).await;
let stream = shard.get_stream(&stream_id).with_error_context(|error| {
format!(
"{COMPONENT} (error: {error}) - failed to get stream for
stream_id: {stream_id}"
@@ -114,7 +114,7 @@ impl ServerCommandHandler for CreateTopic {
topic_id: numeric_topic_id,
partition_ids,
};
- let _responses = shard.broadcast_event_to_all_shards(event.into());
+ let _responses =
shard.broadcast_event_to_all_shards(event.into()).await;
self.message_expiry = topic.message_expiry;
self.max_topic_size = topic.max_topic_size;
diff --git a/core/server/src/binary/handlers/topics/delete_topic_handler.rs
b/core/server/src/binary/handlers/topics/delete_topic_handler.rs
index 431db62f..d5681029 100644
--- a/core/server/src/binary/handlers/topics/delete_topic_handler.rs
+++ b/core/server/src/binary/handlers/topics/delete_topic_handler.rs
@@ -75,7 +75,7 @@ impl ServerCommandHandler for DeleteTopic {
}
}
let event = ShardEvent::DeletedShardTableRecords { namespaces };
- let _responses = shard.broadcast_event_to_all_shards(event.into());
+ let _responses =
shard.broadcast_event_to_all_shards(event.into()).await;
//TODO: Once event response is implemented, we could get rid of this.
compio::time::sleep(Duration::from_millis(50)).await;
topic.delete().await.with_error_context(|error| {
diff --git a/core/server/src/binary/handlers/topics/update_topic_handler.rs
b/core/server/src/binary/handlers/topics/update_topic_handler.rs
index 3e1ec864..55a158a4 100644
--- a/core/server/src/binary/handlers/topics/update_topic_handler.rs
+++ b/core/server/src/binary/handlers/topics/update_topic_handler.rs
@@ -19,8 +19,8 @@
use crate::binary::command::{BinaryServerCommand, ServerCommand,
ServerCommandHandler};
use crate::binary::handlers::utils::receive_and_validate;
use crate::binary::{handlers::topics::COMPONENT, sender::SenderKind};
-use crate::shard::transmission::event::ShardEvent;
use crate::shard::IggyShard;
+use crate::shard::transmission::event::ShardEvent;
use crate::state::command::EntryCommand;
use crate::streaming::session::Session;
use anyhow::Result;
@@ -70,7 +70,7 @@ impl ServerCommandHandler for UpdateTopic {
max_topic_size: self.max_topic_size,
replication_factor: self.replication_factor,
};
- let _responses = shard.broadcast_event_to_all_shards(event.into());
+ let _responses =
shard.broadcast_event_to_all_shards(event.into()).await;
let stream = shard.find_stream(session, &self.stream_id)
.with_error_context(|error| format!(
diff --git a/core/server/src/binary/handlers/users/login_user_handler.rs
b/core/server/src/binary/handlers/users/login_user_handler.rs
index b6b163b6..f1db7c51 100644
--- a/core/server/src/binary/handlers/users/login_user_handler.rs
+++ b/core/server/src/binary/handlers/users/login_user_handler.rs
@@ -62,7 +62,7 @@ impl ServerCommandHandler for LoginUser {
password,
};
// Broadcast the event to all shards.
- let _responses = shard.broadcast_event_to_all_shards(event.into());
+ let _responses =
shard.broadcast_event_to_all_shards(event.into()).await;
let identity_info = mapper::map_identity_info(user.id);
sender.send_ok_response(&identity_info).await?;
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index aff112b1..d4358f66 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -781,8 +781,9 @@ impl IggyShard {
}
}
- pub fn broadcast_event_to_all_shards(&self, event: Arc<ShardEvent>) ->
Vec<ShardResponse> {
- self.shards
+ pub async fn broadcast_event_to_all_shards(&self, event: Arc<ShardEvent>)
-> Vec<ShardResponse> {
+ let mut responses =
Vec::with_capacity(self.get_available_shards_count() as usize);
+ for maybe_receiver in self.shards
.iter()
.filter_map(|shard| {
if shard.id != self.id {
@@ -794,10 +795,26 @@ impl IggyShard {
.map(|conn| {
// TODO: Fixme, maybe we should send response_sender
// and propagate errors back.
- conn.send(ShardFrame::new(event.clone().into(), None));
- ShardResponse::Event
- })
- .collect()
+ if let ShardEvent::CreatedShardTableRecords { .. } = &*event {
+ let (sender, receiver) = async_channel::bounded(1);
+ conn.send(ShardFrame::new(event.clone().into(),
Some(sender.clone())));
+ Some(receiver.clone())
+ } else {
+ conn.send(ShardFrame::new(event.clone().into(), None));
+ None
+ }
+ }) {
+ match maybe_receiver {
+ Some(receiver) => {
+ let response = receiver.recv().await.unwrap();
+ responses.push(response);
+ }
+ None => {
+ responses.push(ShardResponse::Event);
+ }
+ }
+ }
+ responses
}
fn find_shard(&self, namespace: &IggyNamespace) -> Option<&Shard> {
diff --git a/core/server/src/shard/system/topics.rs
b/core/server/src/shard/system/topics.rs
index 7406a11c..4a76c0df 100644
--- a/core/server/src/shard/system/topics.rs
+++ b/core/server/src/shard/system/topics.rs
@@ -239,7 +239,8 @@ impl IggyShard {
compression_algorithm,
max_topic_size,
replication_factor,
- ).await
+ )
+ .await
}
#[allow(clippy::too_many_arguments)]
diff --git a/core/server/src/tcp/tcp_listener.rs
b/core/server/src/tcp/tcp_listener.rs
index 49a86580..6ad2cf08 100644
--- a/core/server/src/tcp/tcp_listener.rs
+++ b/core/server/src/tcp/tcp_listener.rs
@@ -107,7 +107,7 @@ pub async fn start(
// Broadcast session to all shards.
let event = ShardEvent::NewSession { address,
transport };
// TODO: Fixme look inside of
broadcast_event_to_all_shards method.
- let _responses =
shard_clone.broadcast_event_to_all_shards(event.into());
+ let _responses =
shard_clone.broadcast_event_to_all_shards(event.into()).await;
let client_id = session.client_id;
info!("Created new session: {session}");
diff --git a/core/server/src/tcp/tcp_tls_listener.rs
b/core/server/src/tcp/tcp_tls_listener.rs
index 00d7b5c4..dde6725e 100644
--- a/core/server/src/tcp/tcp_tls_listener.rs
+++ b/core/server/src/tcp/tcp_tls_listener.rs
@@ -102,7 +102,7 @@ pub(crate) async fn start(
// Broadcast session to all shards.
let event = ShardEvent::NewSession { address,
transport };
// TODO: Fixme look inside of
broadcast_event_to_all_shards method.
- let _responses =
shard_clone.broadcast_event_to_all_shards(event.into());
+ let _responses =
shard_clone.broadcast_event_to_all_shards(event.into()).await;
let client_id = session.client_id;
info!("Created new session: {session}");