This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch fix_concurrency_bug
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit b252307d0d82b05e6fa8036b526d06fc96b73158
Author: numminex <[email protected]>
AuthorDate: Wed Oct 1 20:56:16 2025 +0200

    feat(io_uring): fix nasty concurrency bug
---
 core/server/src/slab/consumer_groups.rs |   7 +-
 core/server/src/slab/partitions.rs      |   5 +-
 core/server/src/slab/streams.rs         | 215 +++++++++++++++++---------------
 core/server/src/slab/topics.rs          |   7 +-
 4 files changed, 122 insertions(+), 112 deletions(-)

diff --git a/core/server/src/slab/consumer_groups.rs 
b/core/server/src/slab/consumer_groups.rs
index 101364fe..bb314902 100644
--- a/core/server/src/slab/consumer_groups.rs
+++ b/core/server/src/slab/consumer_groups.rs
@@ -81,11 +81,12 @@ impl EntityComponentSystem<Borrow> for ConsumerGroups {
     {
         f(self.into())
     }
-    
+
     fn with_components_async<O, F>(&self, f: F) -> impl Future<Output = O>
     where
-        F: for<'a> AsyncFnOnce(Self::EntityComponents<'a>) -> O {
-       f(self.into())
+        F: for<'a> AsyncFnOnce(Self::EntityComponents<'a>) -> O,
+    {
+        f(self.into())
     }
 }
 
diff --git a/core/server/src/slab/partitions.rs 
b/core/server/src/slab/partitions.rs
index b678bc6c..baaa3a5e 100644
--- a/core/server/src/slab/partitions.rs
+++ b/core/server/src/slab/partitions.rs
@@ -164,10 +164,11 @@ impl EntityComponentSystem<Borrow> for Partitions {
     {
         f(self.into())
     }
-    
+
     fn with_components_async<O, F>(&self, f: F) -> impl Future<Output = O>
     where
-        F: for<'a> AsyncFnOnce(Self::EntityComponents<'a>) -> O {
+        F: for<'a> AsyncFnOnce(Self::EntityComponents<'a>) -> O,
+    {
         f(self.into())
     }
 }
diff --git a/core/server/src/slab/streams.rs b/core/server/src/slab/streams.rs
index 79947bb5..c8940b4f 100644
--- a/core/server/src/slab/streams.rs
+++ b/core/server/src/slab/streams.rs
@@ -145,10 +145,11 @@ impl EntityComponentSystem<InteriorMutability> for 
Streams {
     {
         f(self.into())
     }
-    
+
     fn with_components_async<O, F>(&self, f: F) -> impl Future<Output = O>
     where
-        F: for<'a> AsyncFnOnce(Self::EntityComponents<'a>) -> O {
+        F: for<'a> AsyncFnOnce(Self::EntityComponents<'a>) -> O,
+    {
         f(self.into())
     }
 }
@@ -580,7 +581,6 @@ impl Streams {
         })
     }
 
-
     pub fn with_partitions_async<T>(
         &self,
         stream_id: &Identifier,
@@ -862,57 +862,60 @@ impl Streams {
         count: u32,
         segment_start_offset: u64,
     ) -> Result<IggyMessagesBatchSet, IggyError> {
-        // Convert start_offset to relative offset within the segment
         let relative_start_offset = (start_offset - segment_start_offset) as 
u32;
 
-        // Use with_partition_by_id_async to perform disk I/O inside the async 
closure
-        self.with_partition_by_id_async(stream_id, topic_id, partition_id, 
async move |(.., log)| {
-            let storage = log.storages()[idx].clone();
-            let indexes = log.indexes()[idx].as_ref();
-
-            // Load indexes first
-            let indexes_to_read = if let Some(indexes) = indexes {
-                if !indexes.is_empty() {
-                    indexes.slice_by_offset(relative_start_offset, count)
-                } else {
-                    storage
-                        .index_reader
-                        .as_ref()
-                        .expect("Index reader not initialized")
-                        .load_from_disk_by_offset(relative_start_offset, count)
-                        .await?
-                }
+        let (index_reader, messages_reader, indexes) = 
self.with_partition_by_id(
+            stream_id,
+            topic_id,
+            partition_id,
+            |(_, _, _, _, _, _, log)| {
+                let index_reader = 
log.storages()[idx].index_reader.as_ref().expect("Index reader not 
initialized").clone();
+                let message_reader = log.storages()[idx]
+                    .messages_reader
+                    .as_ref()
+                    .expect("Messages reader not initialized")
+                    .clone();
+                let indexes = log.indexes()[idx]
+                    .as_ref().map(|indexes| 
indexes.slice_by_offset(relative_start_offset, count).unwrap_or_default());
+                (index_reader, message_reader, indexes)
+            },
+        );
+
+        // Perform disk I/O on owned data
+        let indexes_to_read = if let Some(indexes) = indexes {
+            if !indexes.is_empty() {
+                Some(indexes)
             } else {
-                storage
-                    .index_reader
+                index_reader
                     .as_ref()
-                    .expect("Index reader not initialized")
                     .load_from_disk_by_offset(relative_start_offset, count)
                     .await?
-            };
-
-            if indexes_to_read.is_none() {
-                return Ok(IggyMessagesBatchSet::empty());
             }
-
-            let indexes_to_read = indexes_to_read.unwrap();
-            let batch = storage
-                .messages_reader
+        } else {
+            index_reader
                 .as_ref()
-                .expect("Messages reader not initialized")
-                .load_messages_from_disk(indexes_to_read)
-                .await
-                .with_error_context(|error| format!("Failed to load messages 
from disk: {error}"))?;
+                .load_from_disk_by_offset(relative_start_offset, count)
+                .await?
+        };
 
-            batch
-                .validate_checksums_and_offsets(start_offset)
-                .with_error_context(|error| {
-                    format!("Failed to validate messages read from disk! 
error: {error}")
-                })?;
+        if indexes_to_read.is_none() {
+            return Ok(IggyMessagesBatchSet::empty());
+        }
 
-            Ok(IggyMessagesBatchSet::from(batch))
-        })
-        .await
+        let indexes_to_read = indexes_to_read.unwrap();
+        let batch = messages_reader
+            .as_ref()
+            .load_messages_from_disk(indexes_to_read)
+            .await
+            .with_error_context(|error| format!("Failed to load messages from 
disk: {error}"))?;
+
+        batch
+            .validate_checksums_and_offsets(start_offset)
+            .with_error_context(|error| {
+                format!("Failed to validate messages read from disk! error: 
{error}")
+            })?;
+
+        Ok(IggyMessagesBatchSet::from(batch))
     }
 
     pub async fn get_messages_by_timestamp(
@@ -1017,7 +1020,7 @@ impl Streams {
                     let storage = log.storages()[idx].clone();
                     let indexes = log.indexes()[idx]
                         .as_ref()
-                        .map(|indexes| indexes.slice_by_offset(0, 
u32::MAX).unwrap());
+                        .map(|indexes| indexes.slice_by_timestamp(0, 
u32::MAX).unwrap());
                     (storage, indexes)
                 },
             );
@@ -1272,40 +1275,45 @@ impl Streams {
         let batch_count = batches.count();
         let batch_size = batches.size();
 
-        // Perform disk I/O inside the async closure
-        let saved = self
-            .with_partition_by_id_async(stream_id, topic_id, partition_id, 
async move |(.., log)| {
-                let storage = log.active_storage().clone();
-                
-                let saved = storage
-                    .messages_writer
-                    .as_ref()
-                    .expect("Messages writer not initialized")
-                    .save_batch_set(batches)
-                    .await
-                    .with_error_context(|error| {
-                        format!(
-                            "Failed to save batch of {batch_count} messages \
-                            ({batch_size} bytes) to stream ID: {stream_id}, 
topic ID: {topic_id}, partition ID: {partition_id}. {error}",
-                        )
-                    })?;
+        // Extract storage before async operations
+        let (messages_writer, index_writer) =
+            self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., 
log)| {
+                (
+                    log.active_storage()
+                        .messages_writer
+                        .as_ref()
+                        .expect("Messages writer not initialized")
+                        .clone(),
+                    log.active_storage().index_writer.as_ref().expect("Index 
writer not initialized").clone(),
+                )
+            });
 
-                let unsaved_indexes_slice = 
log.active_indexes().unwrap().unsaved_slice();
-                let indexes_len = unsaved_indexes_slice.len();
-                
-                storage
-                    .index_writer
-                    .as_ref()
-                    .expect("Index writer not initialized")
-                    .save_indexes(unsaved_indexes_slice)
-                    .await
-                    .with_error_context(|error| {
-                        format!("Failed to save index of {indexes_len} indexes 
to stream ID: {stream_id}, topic ID: {topic_id} {partition_id}. {error}",)
-                    })?;
-
-                Ok::<_, IggyError>(saved)
-            })
-            .await?;
+        let saved = 
+            messages_writer
+            .as_ref()
+            .save_batch_set(batches)
+            .await
+            .with_error_context(|error| {
+                format!(
+                    "Failed to save batch of {batch_count} messages \
+                    ({batch_size} bytes) to stream ID: {stream_id}, topic ID: 
{topic_id}, partition ID: {partition_id}. {error}",
+                )
+            })?;
+
+        // Extract unsaved indexes before async operation
+        let unsaved_indexes_slice =
+            self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., 
log)| {
+                log.active_indexes().unwrap().unsaved_slice()
+            });
+
+        let indexes_len = unsaved_indexes_slice.len();
+        index_writer
+            .as_ref()
+            .save_indexes(unsaved_indexes_slice)
+            .await
+            .with_error_context(|error| {
+                format!("Failed to save index of {indexes_len} indexes to 
stream ID: {stream_id}, topic ID: {topic_id} {partition_id}. {error}",)
+            })?;
 
         shard_trace!(
             shard_id,
@@ -1337,37 +1345,36 @@ impl Streams {
         topic_id: &Identifier,
         partition_id: usize,
     ) -> Result<(), IggyError> {
-        self.with_partition_by_id_async(stream_id, topic_id, partition_id, 
async move |(.., log)| {
-            let storage = log.active_storage();
+        let storage = self.with_partition_by_id(stream_id, topic_id, 
partition_id, |(.., log)| {
+            log.active_storage().clone()
+        });
 
-            if storage.messages_writer.is_none() || 
storage.index_writer.is_none() {
-                return Ok(());
-            }
+        if storage.messages_writer.is_none() || storage.index_writer.is_none() 
{
+            return Ok(());
+        }
 
-            if let Some(ref messages_writer) = storage.messages_writer {
-                if let Err(e) = messages_writer.fsync().await {
-                    tracing::error!(
-                        "Failed to fsync messages writer for partition {}: {}",
-                        partition_id,
-                        e
-                    );
-                    return Err(e);
-                }
+        if let Some(ref messages_writer) = storage.messages_writer {
+            if let Err(e) = messages_writer.fsync().await {
+                tracing::error!(
+                    "Failed to fsync messages writer for partition {}: {}",
+                    partition_id,
+                    e
+                );
+                return Err(e);
             }
+        }
 
-            if let Some(ref index_writer) = storage.index_writer {
-                if let Err(e) = index_writer.fsync().await {
-                    tracing::error!(
-                        "Failed to fsync index writer for partition {}: {}",
-                        partition_id,
-                        e
-                    );
-                    return Err(e);
-                }
+        if let Some(ref index_writer) = storage.index_writer {
+            if let Err(e) = index_writer.fsync().await {
+                tracing::error!(
+                    "Failed to fsync index writer for partition {}: {}",
+                    partition_id,
+                    e
+                );
+                return Err(e);
             }
+        }
 
-            Ok(())
-        })
-        .await
+        Ok(())
     }
 }
diff --git a/core/server/src/slab/topics.rs b/core/server/src/slab/topics.rs
index cfc09bea..b947be4f 100644
--- a/core/server/src/slab/topics.rs
+++ b/core/server/src/slab/topics.rs
@@ -129,11 +129,12 @@ impl EntityComponentSystem<InteriorMutability> for Topics 
{
     {
         f(self.into())
     }
-    
+
     fn with_components_async<O, F>(&self, f: F) -> impl Future<Output = O>
     where
-        F: for<'a> AsyncFnOnce(Self::EntityComponents<'a>) -> O {
-            f(self.into())
+        F: for<'a> AsyncFnOnce(Self::EntityComponents<'a>) -> O,
+    {
+        f(self.into())
     }
 }
 

Reply via email to the commits mailing list.