This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch zero-copy-no-batching-rebase-3
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 476d72e3b001c1edce19784c1a107fdfe6424662
Author: Hubert Gruszecki <h.grusze...@gmail.com>
AuthorDate: Sat Apr 12 10:45:01 2025 +0200

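    even more improvements

    - Pass IggyMessagesBatchSet by value through save_batch_set/write_batch
      instead of cloning it, and drop its Clone derive.
    - Extract indexes before the batch set is moved into the writer.
    - Skip batches that have no first offset/timestamp when slicing by
      offset or timestamp.
    - Reduce the initial segment index capacity to 1_000_000 entries.
    - Fsync the index file after updating the index size counter.
    - Drop in-memory indexes on segment close when cache_indexes is None
      (previously only for OpenSegment).
    - Remove the unused IggyMessagesBatchMut::with_capacity constructor.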
---
 server/src/streaming/segments/indexes/index_writer.rs     |  7 +++----
 server/src/streaming/segments/messages/messages_writer.rs |  4 ++--
 server/src/streaming/segments/messages/mod.rs             |  2 +-
 server/src/streaming/segments/messages/persister_task.rs  |  2 +-
 server/src/streaming/segments/segment.rs                  |  6 ++----
 server/src/streaming/segments/types/messages_batch_mut.rs | 13 -------------
 server/src/streaming/segments/types/messages_batch_set.rs |  8 ++++----
 server/src/streaming/segments/writing_messages.rs         | 10 ++++++----
 8 files changed, 19 insertions(+), 33 deletions(-)

diff --git a/server/src/streaming/segments/indexes/index_writer.rs 
b/server/src/streaming/segments/indexes/index_writer.rs
index 56476a3a..9fd9525e 100644
--- a/server/src/streaming/segments/indexes/index_writer.rs
+++ b/server/src/streaming/segments/indexes/index_writer.rs
@@ -104,13 +104,12 @@ impl IndexWriter {
             })
             .map_err(|_| IggyError::CannotSaveIndexToSegment)?;
 
-        if self.fsync {
-            let _ = self.fsync().await;
-        }
-
         self.index_size_bytes
             .fetch_add(indexes.len() as u64, Ordering::Release);
 
+        if self.fsync {
+            let _ = self.fsync().await;
+        }
         trace!(
             "Saved {count} indexes of size {} to file: {}",
             INDEX_SIZE * count,
diff --git a/server/src/streaming/segments/messages/messages_writer.rs 
b/server/src/streaming/segments/messages/messages_writer.rs
index 018b4075..bb51f8b4 100644
--- a/server/src/streaming/segments/messages/messages_writer.rs
+++ b/server/src/streaming/segments/messages/messages_writer.rs
@@ -107,7 +107,7 @@ impl MessagesWriter {
     /// Append a batch of messages to the messages file.
     pub async fn save_batch_set(
         &mut self,
-        batch_set: &IggyMessagesBatchSet,
+        batch_set: IggyMessagesBatchSet,
         confirmation: Confirmation,
     ) -> Result<IggyByteSize, IggyError> {
         let messages_size = batch_set.size();
@@ -147,7 +147,7 @@ impl MessagesWriter {
             }
             Confirmation::NoWait => {
                 if let Some(task) = &self.persister_task {
-                    task.persist(batch_set.clone()).await;
+                    task.persist(batch_set).await;
                 } else {
                     panic!(
                         "Confirmation::NoWait is used, but 
MessagesPersisterTask is not set for messages file: {}",
diff --git a/server/src/streaming/segments/messages/mod.rs 
b/server/src/streaming/segments/messages/mod.rs
index ce102d72..7cfcdb49 100644
--- a/server/src/streaming/segments/messages/mod.rs
+++ b/server/src/streaming/segments/messages/mod.rs
@@ -34,7 +34,7 @@ pub use persister_task::PersisterTask;
 async fn write_batch(
     file: &mut File,
     file_path: &str,
-    batches: &IggyMessagesBatchSet,
+    batches: IggyMessagesBatchSet,
 ) -> Result<usize, IggyError> {
     let mut slices: Vec<IoSlice> = batches.iter().map(|b| 
IoSlice::new(b)).collect();
 
diff --git a/server/src/streaming/segments/messages/persister_task.rs 
b/server/src/streaming/segments/messages/persister_task.rs
index c3482970..887913d2 100644
--- a/server/src/streaming/segments/messages/persister_task.rs
+++ b/server/src/streaming/segments/messages/persister_task.rs
@@ -175,7 +175,7 @@ impl PersisterTask {
         while let Ok(request) = receiver.recv_async().await {
             match request {
                 PersisterTaskCommand::WriteRequest(messages) => {
-                    match write_batch(&mut file, &file_path, &messages).await {
+                    match write_batch(&mut file, &file_path, messages).await {
                         Ok(bytes_written) => {
                             if fsync {
                                 file.sync_all()
diff --git a/server/src/streaming/segments/segment.rs 
b/server/src/streaming/segments/segment.rs
index e691be19..026834ae 100644
--- a/server/src/streaming/segments/segment.rs
+++ b/server/src/streaming/segments/segment.rs
@@ -31,8 +31,6 @@ use std::sync::Arc;
 use tokio::fs::remove_file;
 use tracing::{info, warn};
 
-const INDEXES_CAPACITY: usize = 20 * 1000 * 1000; // 24 MB, 15 million indexes 
(16 bytes each) for 15 million messages of 1kB
-
 #[derive(Debug)]
 pub struct Segment {
     pub(super) stream_id: u32,
@@ -102,7 +100,7 @@ impl Segment {
             last_index_position: 0,
             max_size_bytes: config.segment.size,
             message_expiry,
-            indexes: IggyIndexesMut::with_capacity(INDEXES_CAPACITY, 0),
+            indexes: IggyIndexesMut::with_capacity(1_000_000, 0),
             accumulator: MessagesAccumulator::default(),
             is_closed: false,
             messages_writer: None,
@@ -402,9 +400,9 @@ impl Segment {
         self.end_offset = end_offset;
     }
 
+    /// Explicitly drop the old indexes to ensure memory is freed
     pub fn drop_indexes(&mut self) {
         let old_indexes = std::mem::replace(&mut self.indexes, 
IggyIndexesMut::empty());
-        // Explicitly drop the old indexes to ensure memory is freed
         drop(old_indexes);
     }
 }
diff --git a/server/src/streaming/segments/types/messages_batch_mut.rs 
b/server/src/streaming/segments/types/messages_batch_mut.rs
index be09baa0..60af072b 100644
--- a/server/src/streaming/segments/types/messages_batch_mut.rs
+++ b/server/src/streaming/segments/types/messages_batch_mut.rs
@@ -49,19 +49,6 @@ impl Sizeable for IggyMessagesBatchMut {
 }
 
 impl IggyMessagesBatchMut {
-    /// Creates an empty messages container with the specified capacity to 
avoid reallocations.
-    ///
-    /// # Arguments
-    ///
-    /// * `bytes_capacity` - The expected total size of all messages in bytes
-    pub fn with_capacity(bytes_capacity: usize) -> Self {
-        let index_capacity = bytes_capacity / INDEX_SIZE + 1; // Add 1 to 
avoid rounding down to 0
-        Self {
-            indexes: IggyIndexesMut::with_capacity(index_capacity, 0),
-            messages: BytesMut::with_capacity(bytes_capacity),
-        }
-    }
-
     /// Creates a new messages container from existing index and message 
buffers.
     ///
     /// # Arguments
diff --git a/server/src/streaming/segments/types/messages_batch_set.rs 
b/server/src/streaming/segments/types/messages_batch_set.rs
index 6230ac5a..6f32b383 100644
--- a/server/src/streaming/segments/types/messages_batch_set.rs
+++ b/server/src/streaming/segments/types/messages_batch_set.rs
@@ -26,7 +26,7 @@ use std::ops::Index;
 use tracing::trace;
 
 /// A container for multiple IggyMessagesBatch objects
-#[derive(Debug, Default, Clone)]
+#[derive(Debug, Default)]
 pub struct IggyMessagesBatchSet {
     /// The collection of message containers
     batches: Vec<IggyMessagesBatch>,
@@ -221,8 +221,8 @@ impl IggyMessagesBatchSet {
             }
 
             let first_offset = container.first_offset();
-            if first_offset.is_some()
-                && first_offset.unwrap() + container.count() as u64 <= 
start_offset
+            if first_offset.is_none()
+                || first_offset.unwrap() + container.count() as u64 <= 
start_offset
             {
                 continue;
             }
@@ -256,7 +256,7 @@ impl IggyMessagesBatchSet {
             }
 
             let first_timestamp = container.first_timestamp();
-            if first_timestamp.is_some() && first_timestamp.unwrap() < 
timestamp {
+            if first_timestamp.is_none() || first_timestamp.unwrap() < 
timestamp {
                 continue;
             }
 
diff --git a/server/src/streaming/segments/writing_messages.rs 
b/server/src/streaming/segments/writing_messages.rs
index e1e52701..7e80574b 100644
--- a/server/src/streaming/segments/writing_messages.rs
+++ b/server/src/streaming/segments/writing_messages.rs
@@ -90,6 +90,8 @@ impl Segment {
             None => self.config.segment.server_confirmation,
         };
 
+        batches.extract_indexes_to(&mut self.indexes);
+
         let batch_size = batches.size();
         let batch_count = batches.count();
 
@@ -97,7 +99,7 @@ impl Segment {
             .messages_writer
             .as_mut()
             .expect("Messages writer not initialized")
-            .save_batch_set(&batches, confirmation)
+            .save_batch_set(batches, confirmation)
             .await
             .with_error_context(|error| {
                 format!(
@@ -107,8 +109,6 @@ impl Segment {
 
         self.last_index_position += saved_bytes.as_bytes_u64() as u32;
 
-        batches.extract_indexes_to(&mut self.indexes);
-
         let unsaved_indexes_slice = self.indexes.unsaved_slice();
         self.index_writer
             .as_mut()
@@ -168,7 +168,9 @@ impl Segment {
                 current_segment_size,
                 max_segment_size_from_config
             );
-            if self.config.segment.cache_indexes == 
CacheIndexesConfig::OpenSegment {
+            if self.config.segment.cache_indexes == 
CacheIndexesConfig::OpenSegment
+                || self.config.segment.cache_indexes == 
CacheIndexesConfig::None
+            {
                 self.drop_indexes();
             }
             self.shutdown_writing().await;

Reply via email to