This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch fix_concurrency_bug in repository https://gitbox.apache.org/repos/asf/iggy.git
commit b252307d0d82b05e6fa8036b526d06fc96b73158 Author: numminex <[email protected]> AuthorDate: Wed Oct 1 20:56:16 2025 +0200 feat(io_uring): fix nasty concurrency bug --- core/server/src/slab/consumer_groups.rs | 7 +- core/server/src/slab/partitions.rs | 5 +- core/server/src/slab/streams.rs | 215 +++++++++++++++++--------------- core/server/src/slab/topics.rs | 7 +- 4 files changed, 122 insertions(+), 112 deletions(-) diff --git a/core/server/src/slab/consumer_groups.rs b/core/server/src/slab/consumer_groups.rs index 101364fe..bb314902 100644 --- a/core/server/src/slab/consumer_groups.rs +++ b/core/server/src/slab/consumer_groups.rs @@ -81,11 +81,12 @@ impl EntityComponentSystem<Borrow> for ConsumerGroups { { f(self.into()) } - + fn with_components_async<O, F>(&self, f: F) -> impl Future<Output = O> where - F: for<'a> AsyncFnOnce(Self::EntityComponents<'a>) -> O { - f(self.into()) + F: for<'a> AsyncFnOnce(Self::EntityComponents<'a>) -> O, + { + f(self.into()) } } diff --git a/core/server/src/slab/partitions.rs b/core/server/src/slab/partitions.rs index b678bc6c..baaa3a5e 100644 --- a/core/server/src/slab/partitions.rs +++ b/core/server/src/slab/partitions.rs @@ -164,10 +164,11 @@ impl EntityComponentSystem<Borrow> for Partitions { { f(self.into()) } - + fn with_components_async<O, F>(&self, f: F) -> impl Future<Output = O> where - F: for<'a> AsyncFnOnce(Self::EntityComponents<'a>) -> O { + F: for<'a> AsyncFnOnce(Self::EntityComponents<'a>) -> O, + { f(self.into()) } } diff --git a/core/server/src/slab/streams.rs b/core/server/src/slab/streams.rs index 79947bb5..c8940b4f 100644 --- a/core/server/src/slab/streams.rs +++ b/core/server/src/slab/streams.rs @@ -145,10 +145,11 @@ impl EntityComponentSystem<InteriorMutability> for Streams { { f(self.into()) } - + fn with_components_async<O, F>(&self, f: F) -> impl Future<Output = O> where - F: for<'a> AsyncFnOnce(Self::EntityComponents<'a>) -> O { + F: for<'a> AsyncFnOnce(Self::EntityComponents<'a>) -> O, + { f(self.into()) } } @@ -580,7 +581,6 @@ impl Streams { }) } - pub fn with_partitions_async<T>( &self, stream_id: &Identifier, @@ -862,57 +862,60 @@ impl Streams { count: u32, segment_start_offset: u64, ) -> Result<IggyMessagesBatchSet, IggyError> { - // Convert start_offset to relative offset within the segment let relative_start_offset = (start_offset - segment_start_offset) as u32; - // Use with_partition_by_id_async to perform disk I/O inside the async closure - self.with_partition_by_id_async(stream_id, topic_id, partition_id, async move |(.., log)| { - let storage = log.storages()[idx].clone(); - let indexes = log.indexes()[idx].as_ref(); - - // Load indexes first - let indexes_to_read = if let Some(indexes) = indexes { - if !indexes.is_empty() { - indexes.slice_by_offset(relative_start_offset, count) - } else { - storage - .index_reader - .as_ref() - .expect("Index reader not initialized") - .load_from_disk_by_offset(relative_start_offset, count) - .await? - } + let (index_reader, messages_reader, indexes) = self.with_partition_by_id( + stream_id, + topic_id, + partition_id, + |(_, _, _, _, _, _, log)| { + let index_reader = log.storages()[idx].index_reader.as_ref().expect("Index reader not initialized").clone(); + let message_reader = log.storages()[idx] + .messages_reader + .as_ref() + .expect("Messages reader not initialized") + .clone(); + let indexes = log.indexes()[idx] + .as_ref().map(|indexes| indexes.slice_by_offset(relative_start_offset, count).unwrap_or_default()); + (index_reader, message_reader, indexes) + }, + ); + + // Perform disk I/O on owned data + let indexes_to_read = if let Some(indexes) = indexes { + if !indexes.is_empty() { + Some(indexes) } else { - storage - .index_reader + index_reader .as_ref() - .expect("Index reader not initialized") .load_from_disk_by_offset(relative_start_offset, count) .await? - }; - - if indexes_to_read.is_none() { - return Ok(IggyMessagesBatchSet::empty()); } - - let indexes_to_read = indexes_to_read.unwrap(); - let batch = storage - .messages_reader + } else { + index_reader .as_ref() - .expect("Messages reader not initialized") - .load_messages_from_disk(indexes_to_read) - .await - .with_error_context(|error| format!("Failed to load messages from disk: {error}"))?; + .load_from_disk_by_offset(relative_start_offset, count) + .await? + }; - batch - .validate_checksums_and_offsets(start_offset) - .with_error_context(|error| { - format!("Failed to validate messages read from disk! error: {error}") - })?; + if indexes_to_read.is_none() { + return Ok(IggyMessagesBatchSet::empty()); + } - Ok(IggyMessagesBatchSet::from(batch)) - }) - .await + let indexes_to_read = indexes_to_read.unwrap(); + let batch = messages_reader + .as_ref() + .load_messages_from_disk(indexes_to_read) + .await + .with_error_context(|error| format!("Failed to load messages from disk: {error}"))?; + + batch + .validate_checksums_and_offsets(start_offset) + .with_error_context(|error| { + format!("Failed to validate messages read from disk! error: {error}") + })?; + + Ok(IggyMessagesBatchSet::from(batch)) } pub async fn get_messages_by_timestamp( @@ -1017,7 +1020,7 @@ impl Streams { let storage = log.storages()[idx].clone(); let indexes = log.indexes()[idx] .as_ref() - .map(|indexes| indexes.slice_by_offset(0, u32::MAX).unwrap()); + .map(|indexes| indexes.slice_by_timestamp(0, u32::MAX).unwrap()); (storage, indexes) }, ); @@ -1272,40 +1275,45 @@ impl Streams { let batch_count = batches.count(); let batch_size = batches.size(); - // Perform disk I/O inside the async closure - let saved = self - .with_partition_by_id_async(stream_id, topic_id, partition_id, async move |(.., log)| { - let storage = log.active_storage().clone(); - - let saved = storage - .messages_writer - .as_ref() - .expect("Messages writer not initialized") - .save_batch_set(batches) - .await - .with_error_context(|error| { - format!( - "Failed to save batch of {batch_count} messages \ - ({batch_size} bytes) to stream ID: {stream_id}, topic ID: {topic_id}, partition ID: {partition_id}. {error}", - ) - })?; + // Extract storage before async operations + let (messages_writer, index_writer) = + self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| { + ( + log.active_storage() + .messages_writer + .as_ref() + .expect("Messages writer not initialized") + .clone(), + log.active_storage().index_writer.as_ref().expect("Index writer not initialized").clone(), + ) + }); - let unsaved_indexes_slice = log.active_indexes().unwrap().unsaved_slice(); - let indexes_len = unsaved_indexes_slice.len(); - - storage - .index_writer - .as_ref() - .expect("Index writer not initialized") - .save_indexes(unsaved_indexes_slice) - .await - .with_error_context(|error| { - format!("Failed to save index of {indexes_len} indexes to stream ID: {stream_id}, topic ID: {topic_id} {partition_id}. {error}",) - })?; - - Ok::<_, IggyError>(saved) - }) - .await?; + let saved = + messages_writer + .as_ref() + .save_batch_set(batches) + .await + .with_error_context(|error| { + format!( + "Failed to save batch of {batch_count} messages \ + ({batch_size} bytes) to stream ID: {stream_id}, topic ID: {topic_id}, partition ID: {partition_id}. {error}", + ) + })?; + + // Extract unsaved indexes before async operation + let unsaved_indexes_slice = + self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| { + log.active_indexes().unwrap().unsaved_slice() + }); + + let indexes_len = unsaved_indexes_slice.len(); + index_writer + .as_ref() + .save_indexes(unsaved_indexes_slice) + .await + .with_error_context(|error| { + format!("Failed to save index of {indexes_len} indexes to stream ID: {stream_id}, topic ID: {topic_id} {partition_id}. {error}",) + })?; shard_trace!( shard_id, @@ -1337,37 +1345,36 @@ impl Streams { topic_id: &Identifier, partition_id: usize, ) -> Result<(), IggyError> { - self.with_partition_by_id_async(stream_id, topic_id, partition_id, async move |(.., log)| { - let storage = log.active_storage(); + let storage = self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| { + log.active_storage().clone() + }); - if storage.messages_writer.is_none() || storage.index_writer.is_none() { - return Ok(()); - } + if storage.messages_writer.is_none() || storage.index_writer.is_none() { + return Ok(()); + } - if let Some(ref messages_writer) = storage.messages_writer { - if let Err(e) = messages_writer.fsync().await { - tracing::error!( - "Failed to fsync messages writer for partition {}: {}", - partition_id, - e - ); - return Err(e); - } + if let Some(ref messages_writer) = storage.messages_writer { + if let Err(e) = messages_writer.fsync().await { + tracing::error!( + "Failed to fsync messages writer for partition {}: {}", + partition_id, + e + ); + return Err(e); } + } - if let Some(ref index_writer) = storage.index_writer { - if let Err(e) = index_writer.fsync().await { - tracing::error!( - "Failed to fsync index writer for partition {}: {}", - partition_id, - e - ); - return Err(e); - } + if let Some(ref index_writer) = storage.index_writer { + if let Err(e) = index_writer.fsync().await { + tracing::error!( + "Failed to fsync index writer for partition {}: {}", + partition_id, + e + ); + return Err(e); } + } - Ok(()) - }) - .await + Ok(()) } } diff --git a/core/server/src/slab/topics.rs b/core/server/src/slab/topics.rs index cfc09bea..b947be4f 100644 --- a/core/server/src/slab/topics.rs +++ b/core/server/src/slab/topics.rs @@ -129,11 +129,12 @@ impl EntityComponentSystem<InteriorMutability> for Topics { { f(self.into()) } - + fn with_components_async<O, F>(&self, f: F) -> impl Future<Output = O> where - F: for<'a> AsyncFnOnce(Self::EntityComponents<'a>) -> O { - f(self.into()) + F: for<'a> AsyncFnOnce(Self::EntityComponents<'a>) -> O, + { + f(self.into()) } }
