This is an automated email from the ASF dual-hosted git repository. hgruszecki pushed a commit to branch zero-copy-no-batching-rebase-3 in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 476d72e3b001c1edce19784c1a107fdfe6424662 Author: Hubert Gruszecki <h.grusze...@gmail.com> AuthorDate: Sat Apr 12 10:45:01 2025 +0200 even more improvements --- server/src/streaming/segments/indexes/index_writer.rs | 7 +++---- server/src/streaming/segments/messages/messages_writer.rs | 4 ++-- server/src/streaming/segments/messages/mod.rs | 2 +- server/src/streaming/segments/messages/persister_task.rs | 2 +- server/src/streaming/segments/segment.rs | 6 ++---- server/src/streaming/segments/types/messages_batch_mut.rs | 13 ------------- server/src/streaming/segments/types/messages_batch_set.rs | 8 ++++---- server/src/streaming/segments/writing_messages.rs | 10 ++++++---- 8 files changed, 19 insertions(+), 33 deletions(-) diff --git a/server/src/streaming/segments/indexes/index_writer.rs b/server/src/streaming/segments/indexes/index_writer.rs index 56476a3a..9fd9525e 100644 --- a/server/src/streaming/segments/indexes/index_writer.rs +++ b/server/src/streaming/segments/indexes/index_writer.rs @@ -104,13 +104,12 @@ impl IndexWriter { }) .map_err(|_| IggyError::CannotSaveIndexToSegment)?; - if self.fsync { - let _ = self.fsync().await; - } - self.index_size_bytes .fetch_add(indexes.len() as u64, Ordering::Release); + if self.fsync { + let _ = self.fsync().await; + } trace!( "Saved {count} indexes of size {} to file: {}", INDEX_SIZE * count, diff --git a/server/src/streaming/segments/messages/messages_writer.rs b/server/src/streaming/segments/messages/messages_writer.rs index 018b4075..bb51f8b4 100644 --- a/server/src/streaming/segments/messages/messages_writer.rs +++ b/server/src/streaming/segments/messages/messages_writer.rs @@ -107,7 +107,7 @@ impl MessagesWriter { /// Append a batch of messages to the messages file. pub async fn save_batch_set( &mut self, - batch_set: &IggyMessagesBatchSet, + batch_set: IggyMessagesBatchSet, confirmation: Confirmation, ) -> Result<IggyByteSize, IggyError> { let messages_size = batch_set.size(); @@ -147,7 +147,7 @@ impl MessagesWriter { } Confirmation::NoWait => { if let Some(task) = &self.persister_task { - task.persist(batch_set.clone()).await; + task.persist(batch_set).await; } else { panic!( "Confirmation::NoWait is used, but MessagesPersisterTask is not set for messages file: {}", diff --git a/server/src/streaming/segments/messages/mod.rs b/server/src/streaming/segments/messages/mod.rs index ce102d72..7cfcdb49 100644 --- a/server/src/streaming/segments/messages/mod.rs +++ b/server/src/streaming/segments/messages/mod.rs @@ -34,7 +34,7 @@ pub use persister_task::PersisterTask; async fn write_batch( file: &mut File, file_path: &str, - batches: &IggyMessagesBatchSet, + batches: IggyMessagesBatchSet, ) -> Result<usize, IggyError> { let mut slices: Vec<IoSlice> = batches.iter().map(|b| IoSlice::new(b)).collect(); diff --git a/server/src/streaming/segments/messages/persister_task.rs b/server/src/streaming/segments/messages/persister_task.rs index c3482970..887913d2 100644 --- a/server/src/streaming/segments/messages/persister_task.rs +++ b/server/src/streaming/segments/messages/persister_task.rs @@ -175,7 +175,7 @@ impl PersisterTask { while let Ok(request) = receiver.recv_async().await { match request { PersisterTaskCommand::WriteRequest(messages) => { - match write_batch(&mut file, &file_path, &messages).await { + match write_batch(&mut file, &file_path, messages).await { Ok(bytes_written) => { if fsync { file.sync_all() diff --git a/server/src/streaming/segments/segment.rs b/server/src/streaming/segments/segment.rs index e691be19..026834ae 100644 --- a/server/src/streaming/segments/segment.rs +++ b/server/src/streaming/segments/segment.rs @@ -31,8 +31,6 @@ use std::sync::Arc; use tokio::fs::remove_file; use tracing::{info, warn}; -const INDEXES_CAPACITY: usize = 20 * 1000 * 1000; // 24 MB, 15 million indexes (16 bytes each) for 15 million messages of 1kB - #[derive(Debug)] pub struct Segment { pub(super) stream_id: u32, @@ -102,7 +100,7 @@ impl Segment { last_index_position: 0, max_size_bytes: config.segment.size, message_expiry, - indexes: IggyIndexesMut::with_capacity(INDEXES_CAPACITY, 0), + indexes: IggyIndexesMut::with_capacity(1_000_000, 0), accumulator: MessagesAccumulator::default(), is_closed: false, messages_writer: None, @@ -402,9 +400,9 @@ impl Segment { self.end_offset = end_offset; } + /// Explicitly drop the old indexes to ensure memory is freed pub fn drop_indexes(&mut self) { let old_indexes = std::mem::replace(&mut self.indexes, IggyIndexesMut::empty()); - // Explicitly drop the old indexes to ensure memory is freed drop(old_indexes); } } diff --git a/server/src/streaming/segments/types/messages_batch_mut.rs b/server/src/streaming/segments/types/messages_batch_mut.rs index be09baa0..60af072b 100644 --- a/server/src/streaming/segments/types/messages_batch_mut.rs +++ b/server/src/streaming/segments/types/messages_batch_mut.rs @@ -49,19 +49,6 @@ impl Sizeable for IggyMessagesBatchMut { } impl IggyMessagesBatchMut { - /// Creates an empty messages container with the specified capacity to avoid reallocations. - /// - /// # Arguments - /// - /// * `bytes_capacity` - The expected total size of all messages in bytes - pub fn with_capacity(bytes_capacity: usize) -> Self { - let index_capacity = bytes_capacity / INDEX_SIZE + 1; // Add 1 to avoid rounding down to 0 - Self { - indexes: IggyIndexesMut::with_capacity(index_capacity, 0), - messages: BytesMut::with_capacity(bytes_capacity), - } - } - /// Creates a new messages container from existing index and message buffers. /// /// # Arguments diff --git a/server/src/streaming/segments/types/messages_batch_set.rs b/server/src/streaming/segments/types/messages_batch_set.rs index 6230ac5a..6f32b383 100644 --- a/server/src/streaming/segments/types/messages_batch_set.rs +++ b/server/src/streaming/segments/types/messages_batch_set.rs @@ -26,7 +26,7 @@ use std::ops::Index; use tracing::trace; /// A container for multiple IggyMessagesBatch objects -#[derive(Debug, Default, Clone)] +#[derive(Debug, Default)] pub struct IggyMessagesBatchSet { /// The collection of message containers batches: Vec<IggyMessagesBatch>, @@ -221,8 +221,8 @@ impl IggyMessagesBatchSet { } let first_offset = container.first_offset(); - if first_offset.is_some() - && first_offset.unwrap() + container.count() as u64 <= start_offset + if first_offset.is_none() + || first_offset.unwrap() + container.count() as u64 <= start_offset { continue; } @@ -256,7 +256,7 @@ impl IggyMessagesBatchSet { } let first_timestamp = container.first_timestamp(); - if first_timestamp.is_some() && first_timestamp.unwrap() < timestamp { + if first_timestamp.is_none() || first_timestamp.unwrap() < timestamp { continue; } diff --git a/server/src/streaming/segments/writing_messages.rs b/server/src/streaming/segments/writing_messages.rs index e1e52701..7e80574b 100644 --- a/server/src/streaming/segments/writing_messages.rs +++ b/server/src/streaming/segments/writing_messages.rs @@ -90,6 +90,8 @@ impl Segment { None => self.config.segment.server_confirmation, }; + batches.extract_indexes_to(&mut self.indexes); + let batch_size = batches.size(); let batch_count = batches.count(); @@ -97,7 +99,7 @@ impl Segment { .messages_writer .as_mut() .expect("Messages writer not initialized") - .save_batch_set(&batches, confirmation) + .save_batch_set(batches, confirmation) .await .with_error_context(|error| { format!( @@ -107,8 +109,6 @@ impl Segment { self.last_index_position += saved_bytes.as_bytes_u64() as u32; - batches.extract_indexes_to(&mut self.indexes); - let unsaved_indexes_slice = self.indexes.unsaved_slice(); self.index_writer .as_mut() @@ -168,7 +168,9 @@ impl Segment { current_segment_size, max_segment_size_from_config ); - if self.config.segment.cache_indexes == CacheIndexesConfig::OpenSegment { + if self.config.segment.cache_indexes == CacheIndexesConfig::OpenSegment + || self.config.segment.cache_indexes == CacheIndexesConfig::None + { self.drop_indexes(); } self.shutdown_writing().await;