This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch io_uring_tpc_direct_io_socket_transfer
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 57afcb4a982fe2cc0c79bd5e65581bb7bbcb2da5
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Tue Jul 8 00:38:06 2025 +0200

    improvements
---
 core/server/src/streaming/segments/direct_file.rs  | 679 +++++++++++++++++++--
 .../src/streaming/segments/indexes/index_writer.rs |   2 +-
 core/server/src/streaming/segments/messages/mod.rs |  17 +-
 core/server/src/streaming/utils/pooled_buffer.rs   |  10 -
 4 files changed, 632 insertions(+), 76 deletions(-)

diff --git a/core/server/src/streaming/segments/direct_file.rs 
b/core/server/src/streaming/segments/direct_file.rs
index ce1ecb788..7de7c4b33 100644
--- a/core/server/src/streaming/segments/direct_file.rs
+++ b/core/server/src/streaming/segments/direct_file.rs
@@ -17,18 +17,63 @@
  */
 
 use crate::streaming::utils::{ALIGNMENT, PooledBuffer};
+use compio::buf::{IntoInner, IoBuf};
 use compio::fs::{File, OpenOptions};
 use compio::io::AsyncWriteAtExt;
 use error_set::ErrContext;
 use iggy_common::IggyError;
 
+const O_DIRECT: i32 = 0x4000;
+const O_DSYNC: i32 = 0x1000;
+
+/// Cache line padding to prevent false sharing
+/// Most modern CPUs have 64-byte cache lines
+#[repr(align(64))]
+#[derive(Debug)]
+struct Padded<T>(T);
+
+/// Stack-allocated scratch buffer for small writes
+/// Aligned to 4KiB for optimal performance
+#[repr(align(512))]
+#[derive(Debug)]
+struct ScratchBuffer([u8; 4096]);
+
+/// A wrapper that allows us to pass a slice of ScratchBuffer as IoBuf
+/// This is safe because DirectFile owns the ScratchBuffer for its entire 
lifetime
+struct ScratchSlice {
+    ptr: *const u8,
+    len: usize,
+}
+
+// SAFETY: ScratchSlice is only created from DirectFile's scratch buffer,
+// which lives for the entire lifetime of DirectFile. The async I/O operation
+// completes before any method returns, ensuring the buffer remains valid.
+unsafe impl Send for ScratchSlice {}
+unsafe impl Sync for ScratchSlice {}
+
+unsafe impl IoBuf for ScratchSlice {
+    fn as_buf_ptr(&self) -> *const u8 {
+        self.ptr
+    }
+
+    fn buf_len(&self) -> usize {
+        self.len
+    }
+
+    fn buf_capacity(&self) -> usize {
+        self.len
+    }
+}
+
 #[derive(Debug)]
 pub struct DirectFile {
     file_path: String,
     file: File,
     file_position: u64,
     tail: PooledBuffer,
-    tail_len: usize,
+    tail_len: Padded<usize>, // Padded to avoid false sharing on hot path
+    spare: PooledBuffer,     // Reusable buffer for carry-over in 
write_vectored
+    scratch: ScratchBuffer,  // Stack-allocated scratch buffer for small writes
 }
 
 impl DirectFile {
@@ -40,7 +85,7 @@ impl DirectFile {
         let mut file = OpenOptions::new()
             .create(true)
             .write(true)
-            .custom_flags(0x4000)
+            .custom_flags(O_DIRECT | O_DSYNC)
             .open(file_path)
             .await
             .with_error_context(|err| {
@@ -77,7 +122,9 @@ impl DirectFile {
             file,
             file_position: initial_position,
             tail: PooledBuffer::with_capacity(ALIGNMENT),
-            tail_len: 0,
+            tail_len: Padded(0),
+            spare: PooledBuffer::with_capacity(ALIGNMENT),
+            scratch: ScratchBuffer([0u8; 4096]),
         })
     }
 
@@ -101,75 +148,172 @@ impl DirectFile {
             file,
             file_position: initial_position,
             tail: PooledBuffer::with_capacity(ALIGNMENT),
-            tail_len: 0,
+            tail_len: Padded(0),
+            spare: PooledBuffer::with_capacity(ALIGNMENT),
+            scratch: ScratchBuffer([0u8; 4096]),
         }
     }
 
-    pub async fn write_all(&mut self, mut data: &[u8]) -> Result<usize, 
IggyError> {
-        let initial_len = data.len();
+    /// Write data from an owned PooledBuffer with zero-copy optimization
+    ///
+    /// This method takes ownership of the buffer to satisfy the 'static 
lifetime
+    /// requirement for async I/O operations. The buffer ownership is passed 
directly
+    /// to the kernel for DMA operations, ensuring sound memory management 
without
+    /// requiring unsafe code or risking use-after-free bugs.
+    ///
+    /// # Memory Model
+    /// - Takes ownership of `PooledBuffer` (already aligned and pooled)
+    /// - Passes buffer directly to kernel via compio (zero-copy for aligned 
data)
+    /// - Returns buffer to pool automatically when dropped
+    ///
+    /// # Performance
+    /// - Zero-copy for aligned portions (direct DMA transfer)
+    /// - Single memcpy only for tail data (< 512 bytes per call)
+    pub async fn write_all(&mut self, buffer: PooledBuffer) -> Result<usize, 
IggyError> {
+        let initial_len = buffer.len();
         tracing::trace!(
             "DirectFile write_all called for file: {}, data_len: {}, position: 
{}, tail_len: {}",
             self.file_path,
             initial_len,
             self.file_position,
-            self.tail_len
+            self.tail_len.0
         );
 
-        if self.tail_len > 0 {
-            let need = ALIGNMENT - self.tail_len;
-            let take = need.min(data.len());
-            self.tail.extend_from_slice(&data[..take]);
-            self.tail_len += take;
-            data = &data[take..];
-
-            if self.tail_len == ALIGNMENT {
-                self.flush_tail().await?;
-            }
+        // Fast path: no tail data and buffer is perfectly aligned
+        if self.tail_len.0 == 0 && buffer.len() % ALIGNMENT == 0 {
+            // Direct write with owned buffer - zero-copy all the way!
+            let (result, _) = self
+                .file
+                .write_all_at(buffer, self.file_position)
+                .await
+                .into();
+
+            result.map_err(|e| {
+                tracing::error!(
+                    "Failed to write to direct file: {} at position {}, 
buffer_len: {}, error: {}",
+                    self.file_path,
+                    self.file_position,
+                    initial_len,
+                    e
+                );
+                IggyError::CannotWriteToFile
+            })?;
+
+            self.file_position += initial_len as u64;
+            return Ok(initial_len);
         }
 
-        if !data.is_empty() {
-            let whole_sectors_end = data.len() & !(ALIGNMENT - 1);
-            if whole_sectors_end > 0 {
-                let whole_sectors = &data[..whole_sectors_end];
-                let mut written = 0;
-
-                while written < whole_sectors.len() {
-                    let chunk_size = (whole_sectors.len() - written).min(128 * 
1024 * 1024);
-                    let chunk = &whole_sectors[written..written + chunk_size];
-
-                    let chunk_buffer = PooledBuffer::from(chunk);
+        // Slower path: need to handle tail and/or unaligned data
+        let mut written = 0;
 
-                    let (result, _) = self
-                        .file
-                        .write_all_at(chunk_buffer, self.file_position)
-                        .await
-                        .into();
+        // Handle tail data first
+        if self.tail_len.0 > 0 {
+            let need = ALIGNMENT - self.tail_len.0;
+            let to_copy = need.min(buffer.len());
+            self.tail.extend_from_slice(&buffer[..to_copy]);
+            self.tail_len.0 += to_copy;
+            written += to_copy;
 
-                    result.map_err(|e| {
-                        tracing::error!("Failed to write to direct file: {} at 
position {}, chunk size: {}, error: {}",
-                            self.file_path, self.file_position, chunk_size, e);
-                        IggyError::CannotWriteToFile
-                    })?;
+            if self.tail_len.0 == ALIGNMENT {
+                self.flush_tail().await?;
+            }
 
-                    self.file_position += chunk_size as u64;
-                    written += chunk_size;
-                }
+            // If we consumed the entire buffer, we're done
+            if written >= buffer.len() {
+                return Ok(initial_len);
+            }
+        }
 
-                data = &data[whole_sectors_end..];
+        // Calculate aligned portion
+        let remaining = buffer.len() - written;
+        let aligned_len = remaining & !(ALIGNMENT - 1);
+
+        if aligned_len > 0 {
+            // Optimization: use scratch buffer for small writes to avoid 
allocation
+            if aligned_len <= self.scratch.0.len() {
+                // Copy to scratch buffer (hot in L1 cache)
+                self.scratch.0[..aligned_len]
+                    .copy_from_slice(&buffer[written..written + aligned_len]);
+                written += aligned_len;
+
+                // Create a slice wrapper for the scratch buffer
+                let scratch_slice = ScratchSlice {
+                    ptr: self.scratch.0.as_ptr(),
+                    len: aligned_len,
+                };
+
+                // Direct write with scratch buffer - no allocation!
+                let (result, _) = self
+                    .file
+                    .write_all_at(scratch_slice, self.file_position)
+                    .await
+                    .into();
+
+                result.map_err(|e| {
+                    tracing::error!(
+                        "Failed to write to direct file: {} at position {}, 
buffer_len: {}, error: {}",
+                        self.file_path,
+                        self.file_position,
+                        aligned_len,
+                        e
+                    );
+                    IggyError::CannotWriteToFile
+                })?;
+            } else {
+                // For larger writes, allocate a new buffer
+                let mut aligned_buffer = 
PooledBuffer::with_capacity(aligned_len);
+                aligned_buffer.extend_from_slice(&buffer[written..written + 
aligned_len]);
+                written += aligned_len;
+
+                // Direct write with owned buffer
+                let (result, _) = self
+                    .file
+                    .write_all_at(aligned_buffer, self.file_position)
+                    .await
+                    .into();
+
+                result.map_err(|e| {
+                    tracing::error!(
+                        "Failed to write to direct file: {} at position {}, 
buffer_len: {}, error: {}",
+                        self.file_path,
+                        self.file_position,
+                        aligned_len,
+                        e
+                    );
+                    IggyError::CannotWriteToFile
+                })?;
             }
+
+            self.file_position += aligned_len as u64;
         }
 
-        if !data.is_empty() {
+        // Store any remainder in tail
+        let remainder = &buffer[written..];
+        if !remainder.is_empty() {
             self.tail.clear();
-            self.tail.extend_from_slice(data);
-            self.tail_len = data.len();
+            self.tail.extend_from_slice(remainder);
+            self.tail_len.0 = remainder.len();
         }
 
         Ok(initial_len)
     }
 
+    /// Convenience wrapper for callers that have `&[u8]` instead of owned 
buffers
+    ///
+    /// This method allocates a new PooledBuffer and copies the data once.
+    /// For hot paths, prefer using `write_all` directly with owned 
PooledBuffers.
+    ///
+    /// # Performance Note
+    /// This incurs one memcpy to create the PooledBuffer. The copy happens in
+    /// userspace where caches are hot, then the buffer is passed zero-copy to 
kernel.
+    pub async fn write_from_slice(&mut self, data: &[u8]) -> Result<usize, 
IggyError> {
+        let mut buffer = PooledBuffer::with_capacity(data.len());
+        buffer.extend_from_slice(data);
+        self.write_all(buffer).await
+    }
+
     pub async fn flush(&mut self) -> Result<(), IggyError> {
-        if self.tail_len > 0 {
+        if self.tail_len.0 > 0 {
             self.tail.resize(ALIGNMENT, 0);
             self.flush_tail().await?;
         }
@@ -181,27 +325,186 @@ impl DirectFile {
     }
 
     pub fn tail_len(&self) -> usize {
-        self.tail_len
+        self.tail_len.0
     }
 
     pub fn file_path(&self) -> &str {
         &self.file_path
     }
 
+    /// Write multiple buffers using vectored I/O with ownership transfer
+    ///
+    /// This method takes ownership of all buffers to satisfy the 'static 
lifetime
+    /// requirement. Each buffer is consumed and passed to the kernel for DMA.
+    ///
+    /// # Memory Model
+    /// - Takes ownership of Vec<PooledBuffer> - all buffers are consumed
+    /// - Aligns data at buffer boundaries with minimal copying (max ALIGNMENT 
bytes)
+    /// - Uses spare buffer to avoid allocations for boundary alignment
+    ///
+    /// # Performance
+    /// - Single syscall for multiple buffers (vectored I/O)
+    /// - Only copies at buffer boundaries for alignment (worst case: 
ALIGNMENT bytes per buffer)
+    /// - Reuses spare buffer across calls to avoid allocations
+    pub async fn write_vectored(&mut self, buffers: Vec<PooledBuffer>) -> 
Result<usize, IggyError> {
+        if buffers.is_empty() {
+            return Ok(0);
+        }
+
+        let mut total_logical_size = 0usize;
+        let mut write_buffers = Vec::with_capacity(buffers.len() * 2); // 
Worst case: each buffer splits
+
+        // Initialize carry-over tracking
+        let mut carry_over_len = 0usize;
+
+        // If we have existing tail data, move it to spare buffer
+        if self.tail_len.0 > 0 {
+            tracing::trace!(
+                "write_vectored: moving {} tail bytes to spare buffer",
+                self.tail_len.0
+            );
+            self.spare.clear();
+            self.spare.extend_from_slice(&self.tail[..self.tail_len.0]);
+            carry_over_len = self.tail_len.0;
+            self.tail_len.0 = 0;
+        }
+
+        for buffer in buffers {
+            let buffer_len = buffer.len();
+            total_logical_size += buffer_len;
+
+            if carry_over_len > 0 {
+                // We have data from previous buffer that needs to be combined
+                let need = ALIGNMENT - carry_over_len;
+
+                if buffer_len >= need {
+                    // Complete the alignment by copying just what we need
+                    self.spare.extend_from_slice(&buffer[..need]);
+
+                    // Create a new buffer from spare and push it
+                    write_buffers.push(std::mem::replace(
+                        &mut self.spare,
+                        PooledBuffer::with_capacity(ALIGNMENT),
+                    ));
+
+                    // Clear spare for next use
+                    self.spare.clear();
+                    carry_over_len = 0;
+
+                    // Process the rest of this buffer
+                    let remaining = buffer_len - need;
+                    let aligned_size = remaining & !(ALIGNMENT - 1);
+
+                    if aligned_size > 0 {
+                        // Check if we can use the whole remaining buffer
+                        if aligned_size == remaining {
+                            // We can use the slice without remainder
+                            if need == 0 {
+                                // No slicing needed, use the whole buffer
+                                write_buffers.push(buffer);
+                            } else {
+                                // Need to slice from the beginning
+                                let mut aligned_buffer = 
PooledBuffer::with_capacity(remaining);
+                                
aligned_buffer.extend_from_slice(&buffer[need..]);
+                                write_buffers.push(aligned_buffer);
+                            }
+                        } else {
+                            // We need to split: save remainder to spare
+                            self.spare.extend_from_slice(&buffer[need + 
aligned_size..]);
+                            carry_over_len = remaining - aligned_size;
+
+                            // Create a new buffer with just the aligned 
portion
+                            let mut aligned_buffer = 
PooledBuffer::with_capacity(aligned_size);
+                            
aligned_buffer.extend_from_slice(&buffer[need..need + aligned_size]);
+                            write_buffers.push(aligned_buffer);
+                        }
+                    } else {
+                        // All remaining data goes to spare
+                        self.spare.extend_from_slice(&buffer[need..]);
+                        carry_over_len = remaining;
+                    }
+                } else {
+                    // This buffer is too small to complete alignment
+                    self.spare.extend_from_slice(&buffer);
+                    carry_over_len += buffer_len;
+                }
+            } else {
+                // No carry over, process this buffer directly
+                let aligned_size = buffer_len & !(ALIGNMENT - 1);
+
+                if aligned_size > 0 {
+                    if aligned_size == buffer_len {
+                        // Use the whole buffer
+                        write_buffers.push(buffer);
+                    } else {
+                        // Split the buffer: save remainder to spare
+                        self.spare.clear();
+                        self.spare.extend_from_slice(&buffer[aligned_size..]);
+                        carry_over_len = buffer_len - aligned_size;
+
+                        // Create aligned buffer
+                        let mut aligned_buffer = 
PooledBuffer::with_capacity(aligned_size);
+                        
aligned_buffer.extend_from_slice(&buffer[..aligned_size]);
+                        write_buffers.push(aligned_buffer);
+                    }
+                } else {
+                    // Entire buffer goes to spare
+                    self.spare.clear();
+                    self.spare.extend_from_slice(&buffer);
+                    carry_over_len = buffer_len;
+                }
+            }
+        }
+
+        // Perform the vectored write if we have aligned buffers
+        if !write_buffers.is_empty() {
+            let bytes_written: usize = write_buffers.iter().map(|b| 
b.len()).sum();
+
+            tracing::trace!(
+                "write_vectored: writing {} buffers totaling {} bytes at 
position {}",
+                write_buffers.len(),
+                bytes_written,
+                self.file_position
+            );
+
+            let (result, _) = self
+                .file
+                .write_vectored_all_at(write_buffers, self.file_position)
+                .await
+                .into();
+
+            result.map_err(|e| {
+                tracing::error!("Failed to write vectored data to {}: {}", 
self.file_path, e);
+                IggyError::CannotWriteToFile
+            })?;
+
+            self.file_position += bytes_written as u64;
+        }
+
+        // Store any remainder from spare to tail buffer
+        if carry_over_len > 0 {
+            self.tail.clear();
+            self.tail.extend_from_slice(&self.spare[..carry_over_len]);
+            self.tail_len.0 = carry_over_len;
+        }
+
+        Ok(total_logical_size)
+    }
+
     pub fn tail_buffer(&self) -> &PooledBuffer {
         &self.tail
     }
 
     pub fn take_tail(&mut self) -> (PooledBuffer, usize) {
         let tail = std::mem::replace(&mut self.tail, 
PooledBuffer::with_capacity(ALIGNMENT));
-        let tail_len = self.tail_len;
-        self.tail_len = 0;
+        let tail_len = self.tail_len.0;
+        self.tail_len.0 = 0;
         (tail, tail_len)
     }
 
     pub fn set_tail(&mut self, tail: PooledBuffer, tail_len: usize) {
         self.tail = tail;
-        self.tail_len = tail_len;
+        self.tail_len.0 = tail_len;
     }
 
     async fn flush_tail(&mut self) -> Result<(), IggyError> {
@@ -227,7 +530,7 @@ impl DirectFile {
         })?;
 
         self.file_position += ALIGNMENT as u64;
-        self.tail_len = 0;
+        self.tail_len.0 = 0;
         self.tail = returned_buf;
         self.tail.clear();
         Ok(())
@@ -257,7 +560,10 @@ mod tests {
 
             for i in 0..10u64 {
                 let buf = i.to_le_bytes();
-                direct_file.write_all(&buf).await.unwrap();
+                direct_file
+                    .write_all(PooledBuffer::from(&buf[..]))
+                    .await
+                    .unwrap();
             }
 
             direct_file.flush().await.unwrap();
@@ -294,7 +600,10 @@ mod tests {
                 .unwrap();
 
             let data = vec![42u8; ALIGNMENT];
-            direct_file.write_all(&data).await.unwrap();
+            direct_file
+                .write_all(PooledBuffer::from(data.as_slice()))
+                .await
+                .unwrap();
 
             assert_eq!(direct_file.tail_len(), 0);
             assert_eq!(direct_file.position(), ALIGNMENT as u64);
@@ -330,9 +639,18 @@ mod tests {
             let data2 = vec![2u8; ALIGNMENT * 3];
             let data3 = vec![3u8; ALIGNMENT];
 
-            direct_file.write_all(&data1).await.unwrap();
-            direct_file.write_all(&data2).await.unwrap();
-            direct_file.write_all(&data3).await.unwrap();
+            direct_file
+                .write_all(PooledBuffer::from(data1.as_slice()))
+                .await
+                .unwrap();
+            direct_file
+                .write_all(PooledBuffer::from(data2.as_slice()))
+                .await
+                .unwrap();
+            direct_file
+                .write_all(PooledBuffer::from(data3.as_slice()))
+                .await
+                .unwrap();
 
             let file = OpenOptions::new()
                 .read(true)
@@ -370,7 +688,10 @@ mod tests {
                 .unwrap();
 
             let data = vec![77u8; 1000];
-            direct_file.write_all(&data).await.unwrap();
+            direct_file
+                .write_all(PooledBuffer::from(data.as_slice()))
+                .await
+                .unwrap();
 
             assert_eq!(direct_file.tail_len(), 1000 % ALIGNMENT);
             assert_eq!(direct_file.position(), ALIGNMENT as u64);
@@ -415,9 +736,18 @@ mod tests {
             let data2 = vec![2u8; 200];
             let data3 = vec![3u8; 100];
 
-            direct_file.write_all(&data1).await.unwrap();
-            direct_file.write_all(&data2).await.unwrap();
-            direct_file.write_all(&data3).await.unwrap();
+            direct_file
+                .write_all(PooledBuffer::from(data1.as_slice()))
+                .await
+                .unwrap();
+            direct_file
+                .write_all(PooledBuffer::from(data2.as_slice()))
+                .await
+                .unwrap();
+            direct_file
+                .write_all(PooledBuffer::from(data3.as_slice()))
+                .await
+                .unwrap();
 
             assert_eq!(direct_file.tail_len(), 700 % ALIGNMENT);
 
@@ -440,4 +770,233 @@ mod tests {
             assert_eq!(&read_buffer[600..700], &vec![3u8; 100]);
         });
     }
+
+    #[test]
+    fn test_direct_file_vectored_write() {
+        compio::runtime::Runtime::new().unwrap().block_on(async {
+            MemoryPool::init_pool(Arc::new(SystemConfig::default()));
+            let temp_dir = tempdir().unwrap();
+            let file_path = temp_dir.path().join("test_vectored.bin");
+
+            let mut direct_file = 
DirectFile::open(file_path.to_str().unwrap(), 0, false)
+                .await
+                .unwrap();
+
+            // Create multiple unaligned buffers
+            let buffers = vec![
+                PooledBuffer::from(vec![1u8; 300].as_slice()),
+                PooledBuffer::from(vec![2u8; 400].as_slice()),
+                PooledBuffer::from(vec![3u8; 324].as_slice()),
+            ];
+
+            let written = direct_file.write_vectored(buffers).await.unwrap();
+            assert_eq!(written, 1024);
+
+            // Should have data in tail
+            assert_eq!(direct_file.tail_len(), 1024 % ALIGNMENT);
+
+            direct_file.flush().await.unwrap();
+
+            let file = OpenOptions::new()
+                .read(true)
+                .custom_flags(0x4000)
+                .open(&file_path)
+                .await
+                .unwrap();
+
+            let mut read_buffer = vec![0u8; ALIGNMENT * 2];
+            let (result, buf) = file.read_at(read_buffer, 0).await.into();
+            result.unwrap();
+            read_buffer = buf;
+
+            assert_eq!(&read_buffer[0..300], &vec![1u8; 300]);
+            assert_eq!(&read_buffer[300..700], &vec![2u8; 400]);
+            assert_eq!(&read_buffer[700..1024], &vec![3u8; 324]);
+        });
+    }
+
+    #[test]
+    fn test_direct_file_vectored_aligned_buffers() {
+        compio::runtime::Runtime::new().unwrap().block_on(async {
+            MemoryPool::init_pool(Arc::new(SystemConfig::default()));
+            let temp_dir = tempdir().unwrap();
+            let file_path = temp_dir.path().join("test_vectored_aligned.bin");
+
+            let mut direct_file = 
DirectFile::open(file_path.to_str().unwrap(), 0, false)
+                .await
+                .unwrap();
+
+            // Create multiple aligned buffers
+            let buffers = vec![
+                PooledBuffer::from(vec![1u8; ALIGNMENT].as_slice()),
+                PooledBuffer::from(vec![2u8; ALIGNMENT * 2].as_slice()),
+                PooledBuffer::from(vec![3u8; ALIGNMENT].as_slice()),
+            ];
+
+            let written = direct_file.write_vectored(buffers).await.unwrap();
+            assert_eq!(written, ALIGNMENT * 4);
+
+            // Should have no tail
+            assert_eq!(direct_file.tail_len(), 0);
+            assert_eq!(direct_file.position(), (ALIGNMENT * 4) as u64);
+
+            let file = OpenOptions::new()
+                .read(true)
+                .custom_flags(0x4000)
+                .open(&file_path)
+                .await
+                .unwrap();
+
+            let mut read_buffer = vec![0u8; ALIGNMENT * 4];
+            let (result, buf) = file.read_at(read_buffer, 0).await.into();
+            result.unwrap();
+            read_buffer = buf;
+
+            assert_eq!(&read_buffer[0..ALIGNMENT], &vec![1u8; ALIGNMENT]);
+            assert_eq!(
+                &read_buffer[ALIGNMENT..ALIGNMENT * 3],
+                &vec![2u8; ALIGNMENT * 2]
+            );
+            assert_eq!(
+                &read_buffer[ALIGNMENT * 3..ALIGNMENT * 4],
+                &vec![3u8; ALIGNMENT]
+            );
+        });
+    }
+
+    #[test]
+    fn test_direct_file_alignment_minus_one_plus_double_alignment_plus_seven() 
{
+        compio::runtime::Runtime::new().unwrap().block_on(async {
+            MemoryPool::init_pool(Arc::new(SystemConfig::default()));
+            let temp_dir = tempdir().unwrap();
+            let file_path = 
temp_dir.path().join("test_alignment_edge_case.bin");
+
+            let mut direct_file = 
DirectFile::open(file_path.to_str().unwrap(), 0, false)
+                .await
+                .unwrap();
+
+            // First write: ALIGNMENT - 1 bytes
+            let data1 = vec![0xAA; ALIGNMENT - 1];
+            direct_file
+                .write_all(PooledBuffer::from(data1.as_slice()))
+                .await
+                .unwrap();
+
+            // Should have ALIGNMENT - 1 bytes in tail
+            assert_eq!(direct_file.tail_len(), ALIGNMENT - 1);
+            assert_eq!(direct_file.position(), 0);
+
+            // Second write: 2 * ALIGNMENT + 7 bytes
+            let data2 = vec![0xBB; 2 * ALIGNMENT + 7];
+            direct_file
+                .write_all(PooledBuffer::from(data2.as_slice()))
+                .await
+                .unwrap();
+
+            // After this write:
+            // - First byte from data2 completes the first sector (tail was 
ALIGNMENT-1, needed 1 more)
+            // - Next 2*ALIGNMENT bytes fill exactly 2 sectors
+            // - Last 6 bytes go to tail (2*ALIGNMENT + 7 - 1 - 2*ALIGNMENT = 
6)
+            assert_eq!(direct_file.tail_len(), 6);
+            assert_eq!(direct_file.position(), 3 * ALIGNMENT as u64);
+
+            // Flush to write the tail
+            direct_file.flush().await.unwrap();
+
+            assert_eq!(direct_file.tail_len(), 0);
+            assert_eq!(direct_file.position(), 4 * ALIGNMENT as u64);
+
+            // Verify the written data
+            let file = OpenOptions::new()
+                .read(true)
+                .custom_flags(0x4000)
+                .open(&file_path)
+                .await
+                .unwrap();
+
+            let mut read_buffer = vec![0u8; 4 * ALIGNMENT];
+            let (result, buf) = file.read_at(read_buffer, 0).await.into();
+            result.unwrap();
+            read_buffer = buf;
+
+            // First ALIGNMENT - 1 bytes should be 0xAA
+            assert_eq!(&read_buffer[0..ALIGNMENT - 1], &vec![0xAA; ALIGNMENT - 
1]);
+
+            // Next 2 * ALIGNMENT + 7 bytes should be 0xBB
+            assert_eq!(
+                &read_buffer[ALIGNMENT - 1..(ALIGNMENT - 1) + (2 * ALIGNMENT + 
7)],
+                &vec![0xBB; 2 * ALIGNMENT + 7]
+            );
+
+            // Remaining bytes should be 0 (padding)
+            let total_written = (ALIGNMENT - 1) + (2 * ALIGNMENT + 7);
+            assert_eq!(
+                &read_buffer[total_written..4 * ALIGNMENT],
+                &vec![0u8; 4 * ALIGNMENT - total_written]
+            );
+        });
+    }
+
+    #[test]
+    fn 
test_direct_file_vectored_alignment_minus_one_plus_double_alignment_plus_seven()
 {
+        compio::runtime::Runtime::new().unwrap().block_on(async {
+            MemoryPool::init_pool(Arc::new(SystemConfig::default()));
+            let temp_dir = tempdir().unwrap();
+            let file_path = temp_dir
+                .path()
+                .join("test_vectored_alignment_edge_case.bin");
+
+            let mut direct_file = 
DirectFile::open(file_path.to_str().unwrap(), 0, false)
+                .await
+                .unwrap();
+
+            // Create buffers for vectored write
+            let buffers = vec![
+                PooledBuffer::from(vec![0xAA; ALIGNMENT - 1].as_slice()),
+                PooledBuffer::from(vec![0xBB; 2 * ALIGNMENT + 7].as_slice()),
+            ];
+
+            let written = direct_file.write_vectored(buffers).await.unwrap();
+            assert_eq!(written, (ALIGNMENT - 1) + (2 * ALIGNMENT + 7));
+
+            // After vectored write, should have 6 bytes in tail
+            assert_eq!(direct_file.tail_len(), 6);
+            assert_eq!(direct_file.position(), 3 * ALIGNMENT as u64);
+
+            // Flush to write the tail
+            direct_file.flush().await.unwrap();
+
+            assert_eq!(direct_file.tail_len(), 0);
+            assert_eq!(direct_file.position(), 4 * ALIGNMENT as u64);
+
+            // Verify the written data
+            let file = OpenOptions::new()
+                .read(true)
+                .custom_flags(0x4000)
+                .open(&file_path)
+                .await
+                .unwrap();
+
+            let mut read_buffer = vec![0u8; 4 * ALIGNMENT];
+            let (result, buf) = file.read_at(read_buffer, 0).await.into();
+            result.unwrap();
+            read_buffer = buf;
+
+            // First ALIGNMENT - 1 bytes should be 0xAA
+            assert_eq!(&read_buffer[0..ALIGNMENT - 1], &vec![0xAA; ALIGNMENT - 
1]);
+
+            // Next 2 * ALIGNMENT + 7 bytes should be 0xBB
+            assert_eq!(
+                &read_buffer[ALIGNMENT - 1..(ALIGNMENT - 1) + (2 * ALIGNMENT + 
7)],
+                &vec![0xBB; 2 * ALIGNMENT + 7]
+            );
+
+            // Remaining bytes should be 0 (padding)
+            let total_written = (ALIGNMENT - 1) + (2 * ALIGNMENT + 7);
+            assert_eq!(
+                &read_buffer[total_written..4 * ALIGNMENT],
+                &vec![0u8; 4 * ALIGNMENT - total_written]
+            );
+        });
+    }
 }
diff --git a/core/server/src/streaming/segments/indexes/index_writer.rs 
b/core/server/src/streaming/segments/indexes/index_writer.rs
index 782e633e1..6d9aba503 100644
--- a/core/server/src/streaming/segments/indexes/index_writer.rs
+++ b/core/server/src/streaming/segments/indexes/index_writer.rs
@@ -92,7 +92,7 @@ impl IndexWriter {
 
         let bytes_written = self
             .direct_file
-            .write_all(&indexes)
+            .write_all(indexes)
             .await
             .with_error_context(|error| {
                 format!(
diff --git a/core/server/src/streaming/segments/messages/mod.rs 
b/core/server/src/streaming/segments/messages/mod.rs
index 537ecb78c..41fd0312b 100644
--- a/core/server/src/streaming/segments/messages/mod.rs
+++ b/core/server/src/streaming/segments/messages/mod.rs
@@ -31,12 +31,19 @@ async fn write_batch_with_direct_file(
     mut batches: IggyMessagesBatchSet,
 ) -> Result<usize, IggyError> {
     let total_written: usize = batches.iter().map(|b| b.size() as usize).sum();
-    let mut messages_count = 0;
+    let messages_count: u32 = batches.iter().map(|b| b.count()).sum();
 
-    for batch in batches.iter_mut() {
-        messages_count += batch.count();
-        let messages = batch.take_messages();
-        direct_file.write_all(&messages).await?;
+    if batches.containers_count() == 1 {
+        // Single batch - use write_all with owned buffer
+        let messages = batches.iter_mut().next().unwrap().take_messages();
+        direct_file.write_all(messages).await?;
+    } else {
+        // Multiple batches - use vectored write for better performance
+        let mut buffers = Vec::with_capacity(batches.containers_count());
+        for batch in batches.iter_mut() {
+            buffers.push(batch.take_messages());
+        }
+        direct_file.write_vectored(buffers).await?;
     }
 
     tracing::trace!("Saved {} messages", messages_count);
diff --git a/core/server/src/streaming/utils/pooled_buffer.rs 
b/core/server/src/streaming/utils/pooled_buffer.rs
index b3769bf27..03d1dce2e 100644
--- a/core/server/src/streaming/utils/pooled_buffer.rs
+++ b/core/server/src/streaming/utils/pooled_buffer.rs
@@ -214,16 +214,6 @@ impl PooledBuffer {
     pub fn put<T: AsRef<[u8]>>(&mut self, data: T) {
         self.extend_from_slice(data.as_ref());
     }
-
-    /// Align the buffer length to the next 512-byte boundary by padding with 
zeros
-    pub fn align(&mut self) {
-        let current_len = self.inner.len();
-        let aligned_len = (current_len + 511) & !511;
-        if aligned_len > current_len {
-            let padding = aligned_len - current_len;
-            self.resize(aligned_len, 0);
-        }
-    }
 }
 
 impl Deref for PooledBuffer {


Reply via email to