This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch io_uring_tpc
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/io_uring_tpc by this push:
     new f03994cc feat(io_uring): fix race during opening partitions (#1987)
f03994cc is described below

commit f03994ccb0e5730d112fdf265bc509249461b47c
Author: Grzegorz Koszyk <[email protected]>
AuthorDate: Wed Jul 9 16:34:24 2025 +0200

    feat(io_uring): fix race during opening partitions (#1987)
---
 .../partitions/create_partitions_handler.rs        |  4 +--
 .../partitions/delete_partitions_handler.rs        |  4 +--
 .../handlers/streams/create_stream_handler.rs      |  2 +-
 .../handlers/streams/delete_stream_handler.rs      |  4 +--
 .../handlers/streams/update_stream_handler.rs      |  2 +-
 .../binary/handlers/topics/create_topic_handler.rs |  4 +--
 .../binary/handlers/topics/delete_topic_handler.rs |  2 +-
 .../binary/handlers/topics/update_topic_handler.rs |  4 +--
 .../binary/handlers/users/login_user_handler.rs    |  2 +-
 core/server/src/shard/mod.rs                       | 29 +++++++++++++++++-----
 core/server/src/shard/system/topics.rs             |  3 ++-
 core/server/src/tcp/tcp_listener.rs                |  2 +-
 core/server/src/tcp/tcp_tls_listener.rs            |  2 +-
 13 files changed, 41 insertions(+), 23 deletions(-)

diff --git a/core/server/src/binary/handlers/partitions/create_partitions_handler.rs b/core/server/src/binary/handlers/partitions/create_partitions_handler.rs
index f7dffca2..5f68f073 100644
--- a/core/server/src/binary/handlers/partitions/create_partitions_handler.rs
+++ b/core/server/src/binary/handlers/partitions/create_partitions_handler.rs
@@ -68,7 +68,7 @@ impl ServerCommandHandler for CreatePartitions {
             topic_id: self.topic_id.clone(),
             partitions_count: partition_ids.len() as u32,
         };
-        let _responses = shard.broadcast_event_to_all_shards(event.into());
+        let _responses = shard.broadcast_event_to_all_shards(event.into()).await;
 
         let stream = shard.get_stream(&stream_id).unwrap();
         let topic = stream.get_topic(&topic_id).unwrap();
@@ -98,7 +98,7 @@ impl ServerCommandHandler for CreatePartitions {
             topic_id: numeric_topic_id,
             partition_ids,
         };
-        let _responses = shard.broadcast_event_to_all_shards(event.into());
+        let _responses = shard.broadcast_event_to_all_shards(event.into()).await;
 
         shard
         .state
diff --git a/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs b/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs
index 67aa8f49..3fef67ed 100644
--- a/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs
+++ b/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs
@@ -67,7 +67,7 @@ impl ServerCommandHandler for DeletePartitions {
             topic_id: topic_id.clone(),
             partition_ids: partition_ids.clone(),
         };
-        let _responses = shard.broadcast_event_to_all_shards(event.into());
+        let _responses = shard.broadcast_event_to_all_shards(event.into()).await;
         let stream = shard.get_stream(&stream_id).with_error_context(|error| {
             format!("{COMPONENT} (error: {error}) - failed to get stream with ID: {stream_id}")
         })?;
@@ -99,7 +99,7 @@ impl ServerCommandHandler for DeletePartitions {
             }
         }
         let event = ShardEvent::DeletedShardTableRecords { namespaces };
-        let _responses = shard.broadcast_event_to_all_shards(event.into());
+        let _responses = shard.broadcast_event_to_all_shards(event.into()).await;
 
         shard
         .state
diff --git a/core/server/src/binary/handlers/streams/create_stream_handler.rs b/core/server/src/binary/handlers/streams/create_stream_handler.rs
index 0e746b76..7426f4de 100644
--- a/core/server/src/binary/handlers/streams/create_stream_handler.rs
+++ b/core/server/src/binary/handlers/streams/create_stream_handler.rs
@@ -58,7 +58,7 @@ impl ServerCommandHandler for CreateStream {
                 })?;
         let event = ShardEvent::CreatedStream { stream_id, name };
         // Broadcast the event to all shards.
-        let _responses = shard.broadcast_event_to_all_shards(event.into());
+        let _responses = shard.broadcast_event_to_all_shards(event.into()).await;
 
         let stream = shard.find_stream(session, &created_stream_id)
             .with_error_context(|error| {
diff --git a/core/server/src/binary/handlers/streams/delete_stream_handler.rs b/core/server/src/binary/handlers/streams/delete_stream_handler.rs
index 36cb3454..d2aef96c 100644
--- a/core/server/src/binary/handlers/streams/delete_stream_handler.rs
+++ b/core/server/src/binary/handlers/streams/delete_stream_handler.rs
@@ -77,7 +77,7 @@ impl ServerCommandHandler for DeleteStream {
                 }
             }
             let event = ShardEvent::DeletedShardTableRecords { namespaces };
-            let _responses = shard.broadcast_event_to_all_shards(event.into());
+            let _responses = shard.broadcast_event_to_all_shards(event.into()).await;
             //TODO: Once event response is implemented, we could get rid of this.
             compio::time::sleep(Duration::from_millis(50)).await;
             topic.delete().await.with_error_context(|error| {
@@ -90,7 +90,7 @@ impl ServerCommandHandler for DeleteStream {
         let event = ShardEvent::DeletedStream {
             stream_id: self.stream_id.clone(),
         };
-        let _responses = shard.broadcast_event_to_all_shards(event.into());
+        let _responses = shard.broadcast_event_to_all_shards(event.into()).await;
 
         shard
             .state
diff --git a/core/server/src/binary/handlers/streams/update_stream_handler.rs b/core/server/src/binary/handlers/streams/update_stream_handler.rs
index 6be10ed0..dda62158 100644
--- a/core/server/src/binary/handlers/streams/update_stream_handler.rs
+++ b/core/server/src/binary/handlers/streams/update_stream_handler.rs
@@ -56,7 +56,7 @@ impl ServerCommandHandler for UpdateStream {
             stream_id: self.stream_id.clone(),
             name: self.name.clone(),
         };
-        let _responses = shard.broadcast_event_to_all_shards(event.into());
+        let _responses = shard.broadcast_event_to_all_shards(event.into()).await;
 
         shard
             .state
diff --git a/core/server/src/binary/handlers/topics/create_topic_handler.rs b/core/server/src/binary/handlers/topics/create_topic_handler.rs
index 183b5ee1..840816e0 100644
--- a/core/server/src/binary/handlers/topics/create_topic_handler.rs
+++ b/core/server/src/binary/handlers/topics/create_topic_handler.rs
@@ -78,7 +78,7 @@ impl ServerCommandHandler for CreateTopic {
             max_topic_size: self.max_topic_size,
             replication_factor: self.replication_factor,
         };
-        let _responses = shard.broadcast_event_to_all_shards(event.into());
+        let _responses = shard.broadcast_event_to_all_shards(event.into()).await;
         let stream = shard.get_stream(&stream_id).with_error_context(|error| {
             format!(
                 "{COMPONENT} (error: {error}) - failed to get stream for stream_id: {stream_id}"
@@ -114,7 +114,7 @@ impl ServerCommandHandler for CreateTopic {
             topic_id: numeric_topic_id,
             partition_ids,
         };
-        let _responses = shard.broadcast_event_to_all_shards(event.into());
+        let _responses = shard.broadcast_event_to_all_shards(event.into()).await;
 
         self.message_expiry = topic.message_expiry;
         self.max_topic_size = topic.max_topic_size;
diff --git a/core/server/src/binary/handlers/topics/delete_topic_handler.rs b/core/server/src/binary/handlers/topics/delete_topic_handler.rs
index 431db62f..d5681029 100644
--- a/core/server/src/binary/handlers/topics/delete_topic_handler.rs
+++ b/core/server/src/binary/handlers/topics/delete_topic_handler.rs
@@ -75,7 +75,7 @@ impl ServerCommandHandler for DeleteTopic {
             }
         }
         let event = ShardEvent::DeletedShardTableRecords { namespaces };
-        let _responses = shard.broadcast_event_to_all_shards(event.into());
+        let _responses = shard.broadcast_event_to_all_shards(event.into()).await;
         //TODO: Once event response is implemented, we could get rid of this.
         compio::time::sleep(Duration::from_millis(50)).await;
         topic.delete().await.with_error_context(|error| {
diff --git a/core/server/src/binary/handlers/topics/update_topic_handler.rs b/core/server/src/binary/handlers/topics/update_topic_handler.rs
index 3e1ec864..55a158a4 100644
--- a/core/server/src/binary/handlers/topics/update_topic_handler.rs
+++ b/core/server/src/binary/handlers/topics/update_topic_handler.rs
@@ -19,8 +19,8 @@
 use crate::binary::command::{BinaryServerCommand, ServerCommand, ServerCommandHandler};
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::binary::{handlers::topics::COMPONENT, sender::SenderKind};
-use crate::shard::transmission::event::ShardEvent;
 use crate::shard::IggyShard;
+use crate::shard::transmission::event::ShardEvent;
 use crate::state::command::EntryCommand;
 use crate::streaming::session::Session;
 use anyhow::Result;
@@ -70,7 +70,7 @@ impl ServerCommandHandler for UpdateTopic {
             max_topic_size: self.max_topic_size,
             replication_factor: self.replication_factor,
         };
-        let _responses = shard.broadcast_event_to_all_shards(event.into());
+        let _responses = shard.broadcast_event_to_all_shards(event.into()).await;
 
         let stream = shard.find_stream(session, &self.stream_id)
             .with_error_context(|error| format!(
diff --git a/core/server/src/binary/handlers/users/login_user_handler.rs b/core/server/src/binary/handlers/users/login_user_handler.rs
index b6b163b6..f1db7c51 100644
--- a/core/server/src/binary/handlers/users/login_user_handler.rs
+++ b/core/server/src/binary/handlers/users/login_user_handler.rs
@@ -62,7 +62,7 @@ impl ServerCommandHandler for LoginUser {
             password,
         };
         // Broadcast the event to all shards.
-        let _responses = shard.broadcast_event_to_all_shards(event.into());
+        let _responses = shard.broadcast_event_to_all_shards(event.into()).await;
 
         let identity_info = mapper::map_identity_info(user.id);
         sender.send_ok_response(&identity_info).await?;
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index aff112b1..d4358f66 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -781,8 +781,9 @@ impl IggyShard {
         }
     }
 
-    pub fn broadcast_event_to_all_shards(&self, event: Arc<ShardEvent>) -> Vec<ShardResponse> {
-        self.shards
+    pub async fn broadcast_event_to_all_shards(&self, event: Arc<ShardEvent>) -> Vec<ShardResponse> {
+        let mut responses = Vec::with_capacity(self.get_available_shards_count() as usize);
+        for maybe_receiver in self.shards
             .iter()
             .filter_map(|shard| {
                 if shard.id != self.id {
@@ -794,10 +795,26 @@ impl IggyShard {
             .map(|conn| {
                 // TODO: Fixme, maybe we should send response_sender
                 // and propagate errors back.
-                conn.send(ShardFrame::new(event.clone().into(), None));
-                ShardResponse::Event
-            })
-            .collect()
+                if let ShardEvent::CreatedShardTableRecords { .. } = &*event {
+                    let (sender, receiver) = async_channel::bounded(1);
+                    conn.send(ShardFrame::new(event.clone().into(), Some(sender.clone())));
+                    Some(receiver.clone())
+                } else {
+                    conn.send(ShardFrame::new(event.clone().into(), None));
+                    None
+                }
+            }) {
+                match maybe_receiver {
+                    Some(receiver) => {
+                        let response = receiver.recv().await.unwrap();
+                        responses.push(response);
+                    }
+                    None => {
+                        responses.push(ShardResponse::Event);
+                    }
+                }
+            }
+            responses
     }
 
     fn find_shard(&self, namespace: &IggyNamespace) -> Option<&Shard> {
diff --git a/core/server/src/shard/system/topics.rs b/core/server/src/shard/system/topics.rs
index 7406a11c..4a76c0df 100644
--- a/core/server/src/shard/system/topics.rs
+++ b/core/server/src/shard/system/topics.rs
@@ -239,7 +239,8 @@ impl IggyShard {
             compression_algorithm,
             max_topic_size,
             replication_factor,
-        ).await
+        )
+        .await
     }
 
     #[allow(clippy::too_many_arguments)]
diff --git a/core/server/src/tcp/tcp_listener.rs b/core/server/src/tcp/tcp_listener.rs
index 49a86580..6ad2cf08 100644
--- a/core/server/src/tcp/tcp_listener.rs
+++ b/core/server/src/tcp/tcp_listener.rs
@@ -107,7 +107,7 @@ pub async fn start(
                         // Broadcast session to all shards.
                         let event = ShardEvent::NewSession { address, transport };
                         // TODO: Fixme look inside of broadcast_event_to_all_shards method.
-                        let _responses = shard_clone.broadcast_event_to_all_shards(event.into());
+                        let _responses = shard_clone.broadcast_event_to_all_shards(event.into()).await;
 
                         let client_id = session.client_id;
                         info!("Created new session: {session}");
diff --git a/core/server/src/tcp/tcp_tls_listener.rs 
b/core/server/src/tcp/tcp_tls_listener.rs
index 00d7b5c4..dde6725e 100644
--- a/core/server/src/tcp/tcp_tls_listener.rs
+++ b/core/server/src/tcp/tcp_tls_listener.rs
@@ -102,7 +102,7 @@ pub(crate) async fn start(
                         // Broadcast session to all shards.
                         let event = ShardEvent::NewSession { address, transport };
                         // TODO: Fixme look inside of broadcast_event_to_all_shards method.
-                        let _responses = shard_clone.broadcast_event_to_all_shards(event.into());
+                        let _responses = shard_clone.broadcast_event_to_all_shards(event.into()).await;
 
                         let client_id = session.client_id;
                         info!("Created new session: {session}");

Reply via email to