This is an automated email from the ASF dual-hosted git repository. hgruszecki pushed a commit to branch io_uring_tpc_direct_io_socket_transfer in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 57afcb4a982fe2cc0c79bd5e65581bb7bbcb2da5 Author: Hubert Gruszecki <[email protected]> AuthorDate: Tue Jul 8 00:38:06 2025 +0200 improvements --- core/server/src/streaming/segments/direct_file.rs | 679 +++++++++++++++++++-- .../src/streaming/segments/indexes/index_writer.rs | 2 +- core/server/src/streaming/segments/messages/mod.rs | 17 +- core/server/src/streaming/utils/pooled_buffer.rs | 10 - 4 files changed, 632 insertions(+), 76 deletions(-) diff --git a/core/server/src/streaming/segments/direct_file.rs b/core/server/src/streaming/segments/direct_file.rs index ce1ecb788..7de7c4b33 100644 --- a/core/server/src/streaming/segments/direct_file.rs +++ b/core/server/src/streaming/segments/direct_file.rs @@ -17,18 +17,63 @@ */ use crate::streaming::utils::{ALIGNMENT, PooledBuffer}; +use compio::buf::{IntoInner, IoBuf}; use compio::fs::{File, OpenOptions}; use compio::io::AsyncWriteAtExt; use error_set::ErrContext; use iggy_common::IggyError; +const O_DIRECT: i32 = 0x4000; +const O_DSYNC: i32 = 0x1000; + +/// Cache line padding to prevent false sharing +/// Most modern CPUs have 64-byte cache lines +#[repr(align(64))] +#[derive(Debug)] +struct Padded<T>(T); + +/// Fixed-size inline scratch buffer for small writes +/// 4 KiB in size, aligned to 512 bytes (see `repr(align(512))` below) +#[repr(align(512))] +#[derive(Debug)] +struct ScratchBuffer([u8; 4096]); + +/// A wrapper that allows us to pass a slice of ScratchBuffer as IoBuf +/// This is safe because DirectFile owns the ScratchBuffer for its entire lifetime +struct ScratchSlice { + ptr: *const u8, + len: usize, +} + +// SAFETY: ScratchSlice is only created from DirectFile's scratch buffer, +// which lives for the entire lifetime of DirectFile. The async I/O operation +// completes before any method returns, ensuring the buffer remains valid. 
+unsafe impl Send for ScratchSlice {} +unsafe impl Sync for ScratchSlice {} + +unsafe impl IoBuf for ScratchSlice { + fn as_buf_ptr(&self) -> *const u8 { + self.ptr + } + + fn buf_len(&self) -> usize { + self.len + } + + fn buf_capacity(&self) -> usize { + self.len + } +} + #[derive(Debug)] pub struct DirectFile { file_path: String, file: File, file_position: u64, tail: PooledBuffer, - tail_len: usize, + tail_len: Padded<usize>, // Padded to avoid false sharing on hot path + spare: PooledBuffer, // Reusable buffer for carry-over in write_vectored + scratch: ScratchBuffer, // Stack-allocated scratch buffer for small writes } impl DirectFile { @@ -40,7 +85,7 @@ impl DirectFile { let mut file = OpenOptions::new() .create(true) .write(true) - .custom_flags(0x4000) + .custom_flags(O_DIRECT | O_DSYNC) .open(file_path) .await .with_error_context(|err| { @@ -77,7 +122,9 @@ impl DirectFile { file, file_position: initial_position, tail: PooledBuffer::with_capacity(ALIGNMENT), - tail_len: 0, + tail_len: Padded(0), + spare: PooledBuffer::with_capacity(ALIGNMENT), + scratch: ScratchBuffer([0u8; 4096]), }) } @@ -101,75 +148,172 @@ impl DirectFile { file, file_position: initial_position, tail: PooledBuffer::with_capacity(ALIGNMENT), - tail_len: 0, + tail_len: Padded(0), + spare: PooledBuffer::with_capacity(ALIGNMENT), + scratch: ScratchBuffer([0u8; 4096]), } } - pub async fn write_all(&mut self, mut data: &[u8]) -> Result<usize, IggyError> { - let initial_len = data.len(); + /// Write data from an owned PooledBuffer with zero-copy optimization + /// + /// This method takes ownership of the buffer to satisfy the 'static lifetime + /// requirement for async I/O operations. The buffer ownership is passed directly + /// to the kernel for DMA operations, ensuring sound memory management without + /// requiring unsafe code or risking use-after-free bugs. 
+ /// + /// # Memory Model + /// - Takes ownership of `PooledBuffer` (already aligned and pooled) + /// - Passes buffer directly to kernel via compio (zero-copy for aligned data) + /// - Returns buffer to pool automatically when dropped + /// + /// # Performance + /// - Zero-copy for aligned portions (direct DMA transfer) + /// - Single memcpy only for tail data (< 512 bytes per call) + pub async fn write_all(&mut self, buffer: PooledBuffer) -> Result<usize, IggyError> { + let initial_len = buffer.len(); tracing::trace!( "DirectFile write_all called for file: {}, data_len: {}, position: {}, tail_len: {}", self.file_path, initial_len, self.file_position, - self.tail_len + self.tail_len.0 ); - if self.tail_len > 0 { - let need = ALIGNMENT - self.tail_len; - let take = need.min(data.len()); - self.tail.extend_from_slice(&data[..take]); - self.tail_len += take; - data = &data[take..]; - - if self.tail_len == ALIGNMENT { - self.flush_tail().await?; - } + // Fast path: no tail data and buffer is perfectly aligned + if self.tail_len.0 == 0 && buffer.len() % ALIGNMENT == 0 { + // Direct write with owned buffer - zero-copy all the way! 
+ let (result, _) = self + .file + .write_all_at(buffer, self.file_position) + .await + .into(); + + result.map_err(|e| { + tracing::error!( + "Failed to write to direct file: {} at position {}, buffer_len: {}, error: {}", + self.file_path, + self.file_position, + initial_len, + e + ); + IggyError::CannotWriteToFile + })?; + + self.file_position += initial_len as u64; + return Ok(initial_len); } - if !data.is_empty() { - let whole_sectors_end = data.len() & !(ALIGNMENT - 1); - if whole_sectors_end > 0 { - let whole_sectors = &data[..whole_sectors_end]; - let mut written = 0; - - while written < whole_sectors.len() { - let chunk_size = (whole_sectors.len() - written).min(128 * 1024 * 1024); - let chunk = &whole_sectors[written..written + chunk_size]; - - let chunk_buffer = PooledBuffer::from(chunk); + // Slower path: need to handle tail and/or unaligned data + let mut written = 0; - let (result, _) = self - .file - .write_all_at(chunk_buffer, self.file_position) - .await - .into(); + // Handle tail data first + if self.tail_len.0 > 0 { + let need = ALIGNMENT - self.tail_len.0; + let to_copy = need.min(buffer.len()); + self.tail.extend_from_slice(&buffer[..to_copy]); + self.tail_len.0 += to_copy; + written += to_copy; - result.map_err(|e| { - tracing::error!("Failed to write to direct file: {} at position {}, chunk size: {}, error: {}", - self.file_path, self.file_position, chunk_size, e); - IggyError::CannotWriteToFile - })?; + if self.tail_len.0 == ALIGNMENT { + self.flush_tail().await?; + } - self.file_position += chunk_size as u64; - written += chunk_size; - } + // If we consumed the entire buffer, we're done + if written >= buffer.len() { + return Ok(initial_len); + } + } - data = &data[whole_sectors_end..]; + // Calculate aligned portion + let remaining = buffer.len() - written; + let aligned_len = remaining & !(ALIGNMENT - 1); + + if aligned_len > 0 { + // Optimization: use scratch buffer for small writes to avoid allocation + if aligned_len <= 
self.scratch.0.len() { + // Copy to scratch buffer (hot in L1 cache) + self.scratch.0[..aligned_len] + .copy_from_slice(&buffer[written..written + aligned_len]); + written += aligned_len; + + // Create a slice wrapper for the scratch buffer + let scratch_slice = ScratchSlice { + ptr: self.scratch.0.as_ptr(), + len: aligned_len, + }; + + // Direct write with scratch buffer - no allocation! + let (result, _) = self + .file + .write_all_at(scratch_slice, self.file_position) + .await + .into(); + + result.map_err(|e| { + tracing::error!( + "Failed to write to direct file: {} at position {}, buffer_len: {}, error: {}", + self.file_path, + self.file_position, + aligned_len, + e + ); + IggyError::CannotWriteToFile + })?; + } else { + // For larger writes, allocate a new buffer + let mut aligned_buffer = PooledBuffer::with_capacity(aligned_len); + aligned_buffer.extend_from_slice(&buffer[written..written + aligned_len]); + written += aligned_len; + + // Direct write with owned buffer + let (result, _) = self + .file + .write_all_at(aligned_buffer, self.file_position) + .await + .into(); + + result.map_err(|e| { + tracing::error!( + "Failed to write to direct file: {} at position {}, buffer_len: {}, error: {}", + self.file_path, + self.file_position, + aligned_len, + e + ); + IggyError::CannotWriteToFile + })?; } + + self.file_position += aligned_len as u64; } - if !data.is_empty() { + // Store any remainder in tail + let remainder = &buffer[written..]; + if !remainder.is_empty() { self.tail.clear(); - self.tail.extend_from_slice(data); - self.tail_len = data.len(); + self.tail.extend_from_slice(remainder); + self.tail_len.0 = remainder.len(); } Ok(initial_len) } + /// Convenience wrapper for callers that have `&[u8]` instead of owned buffers + /// + /// This method allocates a new PooledBuffer and copies the data once. + /// For hot paths, prefer using `write_all` directly with owned PooledBuffers. 
+ /// + /// # Performance Note + /// This incurs one memcpy to create the PooledBuffer. The copy happens in + /// userspace where caches are hot, then the buffer is passed zero-copy to kernel. + pub async fn write_from_slice(&mut self, data: &[u8]) -> Result<usize, IggyError> { + let mut buffer = PooledBuffer::with_capacity(data.len()); + buffer.extend_from_slice(data); + self.write_all(buffer).await + } + pub async fn flush(&mut self) -> Result<(), IggyError> { - if self.tail_len > 0 { + if self.tail_len.0 > 0 { self.tail.resize(ALIGNMENT, 0); self.flush_tail().await?; } @@ -181,27 +325,186 @@ impl DirectFile { } pub fn tail_len(&self) -> usize { - self.tail_len + self.tail_len.0 } pub fn file_path(&self) -> &str { &self.file_path } + /// Write multiple buffers using vectored I/O with ownership transfer + /// + /// This method takes ownership of all buffers to satisfy the 'static lifetime + /// requirement. Each buffer is consumed and passed to the kernel for DMA. + /// + /// # Memory Model + /// - Takes ownership of Vec<PooledBuffer> - all buffers are consumed + /// - Aligns data at buffer boundaries with minimal copying (max ALIGNMENT bytes) + /// - Uses spare buffer to avoid allocations for boundary alignment + /// + /// # Performance + /// - Single syscall for multiple buffers (vectored I/O) + /// - Only copies at buffer boundaries for alignment (worst case: ALIGNMENT bytes per buffer) + /// - Reuses spare buffer across calls to avoid allocations + pub async fn write_vectored(&mut self, buffers: Vec<PooledBuffer>) -> Result<usize, IggyError> { + if buffers.is_empty() { + return Ok(0); + } + + let mut total_logical_size = 0usize; + let mut write_buffers = Vec::with_capacity(buffers.len() * 2); // Worst case: each buffer splits + + // Initialize carry-over tracking + let mut carry_over_len = 0usize; + + // If we have existing tail data, move it to spare buffer + if self.tail_len.0 > 0 { + tracing::trace!( + "write_vectored: moving {} tail bytes to spare buffer", 
+ self.tail_len.0 + ); + self.spare.clear(); + self.spare.extend_from_slice(&self.tail[..self.tail_len.0]); + carry_over_len = self.tail_len.0; + self.tail_len.0 = 0; + } + + for buffer in buffers { + let buffer_len = buffer.len(); + total_logical_size += buffer_len; + + if carry_over_len > 0 { + // We have data from previous buffer that needs to be combined + let need = ALIGNMENT - carry_over_len; + + if buffer_len >= need { + // Complete the alignment by copying just what we need + self.spare.extend_from_slice(&buffer[..need]); + + // Create a new buffer from spare and push it + write_buffers.push(std::mem::replace( + &mut self.spare, + PooledBuffer::with_capacity(ALIGNMENT), + )); + + // Clear spare for next use + self.spare.clear(); + carry_over_len = 0; + + // Process the rest of this buffer + let remaining = buffer_len - need; + let aligned_size = remaining & !(ALIGNMENT - 1); + + if aligned_size > 0 { + // Check if we can use the whole remaining buffer + if aligned_size == remaining { + // We can use the slice without remainder + if need == 0 { + // No slicing needed, use the whole buffer + write_buffers.push(buffer); + } else { + // Need to slice from the beginning + let mut aligned_buffer = PooledBuffer::with_capacity(remaining); + aligned_buffer.extend_from_slice(&buffer[need..]); + write_buffers.push(aligned_buffer); + } + } else { + // We need to split: save remainder to spare + self.spare.extend_from_slice(&buffer[need + aligned_size..]); + carry_over_len = remaining - aligned_size; + + // Create a new buffer with just the aligned portion + let mut aligned_buffer = PooledBuffer::with_capacity(aligned_size); + aligned_buffer.extend_from_slice(&buffer[need..need + aligned_size]); + write_buffers.push(aligned_buffer); + } + } else { + // All remaining data goes to spare + self.spare.extend_from_slice(&buffer[need..]); + carry_over_len = remaining; + } + } else { + // This buffer is too small to complete alignment + self.spare.extend_from_slice(&buffer); + 
carry_over_len += buffer_len; + } + } else { + // No carry over, process this buffer directly + let aligned_size = buffer_len & !(ALIGNMENT - 1); + + if aligned_size > 0 { + if aligned_size == buffer_len { + // Use the whole buffer + write_buffers.push(buffer); + } else { + // Split the buffer: save remainder to spare + self.spare.clear(); + self.spare.extend_from_slice(&buffer[aligned_size..]); + carry_over_len = buffer_len - aligned_size; + + // Create aligned buffer + let mut aligned_buffer = PooledBuffer::with_capacity(aligned_size); + aligned_buffer.extend_from_slice(&buffer[..aligned_size]); + write_buffers.push(aligned_buffer); + } + } else { + // Entire buffer goes to spare + self.spare.clear(); + self.spare.extend_from_slice(&buffer); + carry_over_len = buffer_len; + } + } + } + + // Perform the vectored write if we have aligned buffers + if !write_buffers.is_empty() { + let bytes_written: usize = write_buffers.iter().map(|b| b.len()).sum(); + + tracing::trace!( + "write_vectored: writing {} buffers totaling {} bytes at position {}", + write_buffers.len(), + bytes_written, + self.file_position + ); + + let (result, _) = self + .file + .write_vectored_all_at(write_buffers, self.file_position) + .await + .into(); + + result.map_err(|e| { + tracing::error!("Failed to write vectored data to {}: {}", self.file_path, e); + IggyError::CannotWriteToFile + })?; + + self.file_position += bytes_written as u64; + } + + // Store any remainder from spare to tail buffer + if carry_over_len > 0 { + self.tail.clear(); + self.tail.extend_from_slice(&self.spare[..carry_over_len]); + self.tail_len.0 = carry_over_len; + } + + Ok(total_logical_size) + } + pub fn tail_buffer(&self) -> &PooledBuffer { &self.tail } pub fn take_tail(&mut self) -> (PooledBuffer, usize) { let tail = std::mem::replace(&mut self.tail, PooledBuffer::with_capacity(ALIGNMENT)); - let tail_len = self.tail_len; - self.tail_len = 0; + let tail_len = self.tail_len.0; + self.tail_len.0 = 0; (tail, tail_len) } 
pub fn set_tail(&mut self, tail: PooledBuffer, tail_len: usize) { self.tail = tail; - self.tail_len = tail_len; + self.tail_len.0 = tail_len; } async fn flush_tail(&mut self) -> Result<(), IggyError> { @@ -227,7 +530,7 @@ impl DirectFile { })?; self.file_position += ALIGNMENT as u64; - self.tail_len = 0; + self.tail_len.0 = 0; self.tail = returned_buf; self.tail.clear(); Ok(()) @@ -257,7 +560,10 @@ mod tests { for i in 0..10u64 { let buf = i.to_le_bytes(); - direct_file.write_all(&buf).await.unwrap(); + direct_file + .write_all(PooledBuffer::from(&buf[..])) + .await + .unwrap(); } direct_file.flush().await.unwrap(); @@ -294,7 +600,10 @@ mod tests { .unwrap(); let data = vec![42u8; ALIGNMENT]; - direct_file.write_all(&data).await.unwrap(); + direct_file + .write_all(PooledBuffer::from(data.as_slice())) + .await + .unwrap(); assert_eq!(direct_file.tail_len(), 0); assert_eq!(direct_file.position(), ALIGNMENT as u64); @@ -330,9 +639,18 @@ mod tests { let data2 = vec![2u8; ALIGNMENT * 3]; let data3 = vec![3u8; ALIGNMENT]; - direct_file.write_all(&data1).await.unwrap(); - direct_file.write_all(&data2).await.unwrap(); - direct_file.write_all(&data3).await.unwrap(); + direct_file + .write_all(PooledBuffer::from(data1.as_slice())) + .await + .unwrap(); + direct_file + .write_all(PooledBuffer::from(data2.as_slice())) + .await + .unwrap(); + direct_file + .write_all(PooledBuffer::from(data3.as_slice())) + .await + .unwrap(); let file = OpenOptions::new() .read(true) @@ -370,7 +688,10 @@ mod tests { .unwrap(); let data = vec![77u8; 1000]; - direct_file.write_all(&data).await.unwrap(); + direct_file + .write_all(PooledBuffer::from(data.as_slice())) + .await + .unwrap(); assert_eq!(direct_file.tail_len(), 1000 % ALIGNMENT); assert_eq!(direct_file.position(), ALIGNMENT as u64); @@ -415,9 +736,18 @@ mod tests { let data2 = vec![2u8; 200]; let data3 = vec![3u8; 100]; - direct_file.write_all(&data1).await.unwrap(); - direct_file.write_all(&data2).await.unwrap(); - 
direct_file.write_all(&data3).await.unwrap(); + direct_file + .write_all(PooledBuffer::from(data1.as_slice())) + .await + .unwrap(); + direct_file + .write_all(PooledBuffer::from(data2.as_slice())) + .await + .unwrap(); + direct_file + .write_all(PooledBuffer::from(data3.as_slice())) + .await + .unwrap(); assert_eq!(direct_file.tail_len(), 700 % ALIGNMENT); @@ -440,4 +770,233 @@ mod tests { assert_eq!(&read_buffer[600..700], &vec![3u8; 100]); }); } + + #[test] + fn test_direct_file_vectored_write() { + compio::runtime::Runtime::new().unwrap().block_on(async { + MemoryPool::init_pool(Arc::new(SystemConfig::default())); + let temp_dir = tempdir().unwrap(); + let file_path = temp_dir.path().join("test_vectored.bin"); + + let mut direct_file = DirectFile::open(file_path.to_str().unwrap(), 0, false) + .await + .unwrap(); + + // Create multiple unaligned buffers + let buffers = vec![ + PooledBuffer::from(vec![1u8; 300].as_slice()), + PooledBuffer::from(vec![2u8; 400].as_slice()), + PooledBuffer::from(vec![3u8; 324].as_slice()), + ]; + + let written = direct_file.write_vectored(buffers).await.unwrap(); + assert_eq!(written, 1024); + + // Should have data in tail + assert_eq!(direct_file.tail_len(), 1024 % ALIGNMENT); + + direct_file.flush().await.unwrap(); + + let file = OpenOptions::new() + .read(true) + .custom_flags(0x4000) + .open(&file_path) + .await + .unwrap(); + + let mut read_buffer = vec![0u8; ALIGNMENT * 2]; + let (result, buf) = file.read_at(read_buffer, 0).await.into(); + result.unwrap(); + read_buffer = buf; + + assert_eq!(&read_buffer[0..300], &vec![1u8; 300]); + assert_eq!(&read_buffer[300..700], &vec![2u8; 400]); + assert_eq!(&read_buffer[700..1024], &vec![3u8; 324]); + }); + } + + #[test] + fn test_direct_file_vectored_aligned_buffers() { + compio::runtime::Runtime::new().unwrap().block_on(async { + MemoryPool::init_pool(Arc::new(SystemConfig::default())); + let temp_dir = tempdir().unwrap(); + let file_path = 
temp_dir.path().join("test_vectored_aligned.bin"); + + let mut direct_file = DirectFile::open(file_path.to_str().unwrap(), 0, false) + .await + .unwrap(); + + // Create multiple aligned buffers + let buffers = vec![ + PooledBuffer::from(vec![1u8; ALIGNMENT].as_slice()), + PooledBuffer::from(vec![2u8; ALIGNMENT * 2].as_slice()), + PooledBuffer::from(vec![3u8; ALIGNMENT].as_slice()), + ]; + + let written = direct_file.write_vectored(buffers).await.unwrap(); + assert_eq!(written, ALIGNMENT * 4); + + // Should have no tail + assert_eq!(direct_file.tail_len(), 0); + assert_eq!(direct_file.position(), (ALIGNMENT * 4) as u64); + + let file = OpenOptions::new() + .read(true) + .custom_flags(0x4000) + .open(&file_path) + .await + .unwrap(); + + let mut read_buffer = vec![0u8; ALIGNMENT * 4]; + let (result, buf) = file.read_at(read_buffer, 0).await.into(); + result.unwrap(); + read_buffer = buf; + + assert_eq!(&read_buffer[0..ALIGNMENT], &vec![1u8; ALIGNMENT]); + assert_eq!( + &read_buffer[ALIGNMENT..ALIGNMENT * 3], + &vec![2u8; ALIGNMENT * 2] + ); + assert_eq!( + &read_buffer[ALIGNMENT * 3..ALIGNMENT * 4], + &vec![3u8; ALIGNMENT] + ); + }); + } + + #[test] + fn test_direct_file_alignment_minus_one_plus_double_alignment_plus_seven() { + compio::runtime::Runtime::new().unwrap().block_on(async { + MemoryPool::init_pool(Arc::new(SystemConfig::default())); + let temp_dir = tempdir().unwrap(); + let file_path = temp_dir.path().join("test_alignment_edge_case.bin"); + + let mut direct_file = DirectFile::open(file_path.to_str().unwrap(), 0, false) + .await + .unwrap(); + + // First write: ALIGNMENT - 1 bytes + let data1 = vec![0xAA; ALIGNMENT - 1]; + direct_file + .write_all(PooledBuffer::from(data1.as_slice())) + .await + .unwrap(); + + // Should have ALIGNMENT - 1 bytes in tail + assert_eq!(direct_file.tail_len(), ALIGNMENT - 1); + assert_eq!(direct_file.position(), 0); + + // Second write: 2 * ALIGNMENT + 7 bytes + let data2 = vec![0xBB; 2 * ALIGNMENT + 7]; + direct_file + 
.write_all(PooledBuffer::from(data2.as_slice())) + .await + .unwrap(); + + // After this write: + // - First byte from data2 completes the first sector (tail was ALIGNMENT-1, needed 1 more) + // - Next 2*ALIGNMENT bytes fill exactly 2 sectors + // - Last 6 bytes go to tail (2*ALIGNMENT + 7 - 1 - 2*ALIGNMENT = 6) + assert_eq!(direct_file.tail_len(), 6); + assert_eq!(direct_file.position(), 3 * ALIGNMENT as u64); + + // Flush to write the tail + direct_file.flush().await.unwrap(); + + assert_eq!(direct_file.tail_len(), 0); + assert_eq!(direct_file.position(), 4 * ALIGNMENT as u64); + + // Verify the written data + let file = OpenOptions::new() + .read(true) + .custom_flags(0x4000) + .open(&file_path) + .await + .unwrap(); + + let mut read_buffer = vec![0u8; 4 * ALIGNMENT]; + let (result, buf) = file.read_at(read_buffer, 0).await.into(); + result.unwrap(); + read_buffer = buf; + + // First ALIGNMENT - 1 bytes should be 0xAA + assert_eq!(&read_buffer[0..ALIGNMENT - 1], &vec![0xAA; ALIGNMENT - 1]); + + // Next 2 * ALIGNMENT + 7 bytes should be 0xBB + assert_eq!( + &read_buffer[ALIGNMENT - 1..(ALIGNMENT - 1) + (2 * ALIGNMENT + 7)], + &vec![0xBB; 2 * ALIGNMENT + 7] + ); + + // Remaining bytes should be 0 (padding) + let total_written = (ALIGNMENT - 1) + (2 * ALIGNMENT + 7); + assert_eq!( + &read_buffer[total_written..4 * ALIGNMENT], + &vec![0u8; 4 * ALIGNMENT - total_written] + ); + }); + } + + #[test] + fn test_direct_file_vectored_alignment_minus_one_plus_double_alignment_plus_seven() { + compio::runtime::Runtime::new().unwrap().block_on(async { + MemoryPool::init_pool(Arc::new(SystemConfig::default())); + let temp_dir = tempdir().unwrap(); + let file_path = temp_dir + .path() + .join("test_vectored_alignment_edge_case.bin"); + + let mut direct_file = DirectFile::open(file_path.to_str().unwrap(), 0, false) + .await + .unwrap(); + + // Create buffers for vectored write + let buffers = vec![ + PooledBuffer::from(vec![0xAA; ALIGNMENT - 1].as_slice()), + 
PooledBuffer::from(vec![0xBB; 2 * ALIGNMENT + 7].as_slice()), + ]; + + let written = direct_file.write_vectored(buffers).await.unwrap(); + assert_eq!(written, (ALIGNMENT - 1) + (2 * ALIGNMENT + 7)); + + // After vectored write, should have 6 bytes in tail + assert_eq!(direct_file.tail_len(), 6); + assert_eq!(direct_file.position(), 3 * ALIGNMENT as u64); + + // Flush to write the tail + direct_file.flush().await.unwrap(); + + assert_eq!(direct_file.tail_len(), 0); + assert_eq!(direct_file.position(), 4 * ALIGNMENT as u64); + + // Verify the written data + let file = OpenOptions::new() + .read(true) + .custom_flags(0x4000) + .open(&file_path) + .await + .unwrap(); + + let mut read_buffer = vec![0u8; 4 * ALIGNMENT]; + let (result, buf) = file.read_at(read_buffer, 0).await.into(); + result.unwrap(); + read_buffer = buf; + + // First ALIGNMENT - 1 bytes should be 0xAA + assert_eq!(&read_buffer[0..ALIGNMENT - 1], &vec![0xAA; ALIGNMENT - 1]); + + // Next 2 * ALIGNMENT + 7 bytes should be 0xBB + assert_eq!( + &read_buffer[ALIGNMENT - 1..(ALIGNMENT - 1) + (2 * ALIGNMENT + 7)], + &vec![0xBB; 2 * ALIGNMENT + 7] + ); + + // Remaining bytes should be 0 (padding) + let total_written = (ALIGNMENT - 1) + (2 * ALIGNMENT + 7); + assert_eq!( + &read_buffer[total_written..4 * ALIGNMENT], + &vec![0u8; 4 * ALIGNMENT - total_written] + ); + }); + } } diff --git a/core/server/src/streaming/segments/indexes/index_writer.rs b/core/server/src/streaming/segments/indexes/index_writer.rs index 782e633e1..6d9aba503 100644 --- a/core/server/src/streaming/segments/indexes/index_writer.rs +++ b/core/server/src/streaming/segments/indexes/index_writer.rs @@ -92,7 +92,7 @@ impl IndexWriter { let bytes_written = self .direct_file - .write_all(&indexes) + .write_all(indexes) .await .with_error_context(|error| { format!( diff --git a/core/server/src/streaming/segments/messages/mod.rs b/core/server/src/streaming/segments/messages/mod.rs index 537ecb78c..41fd0312b 100644 --- 
a/core/server/src/streaming/segments/messages/mod.rs +++ b/core/server/src/streaming/segments/messages/mod.rs @@ -31,12 +31,19 @@ async fn write_batch_with_direct_file( mut batches: IggyMessagesBatchSet, ) -> Result<usize, IggyError> { let total_written: usize = batches.iter().map(|b| b.size() as usize).sum(); - let mut messages_count = 0; + let messages_count: u32 = batches.iter().map(|b| b.count()).sum(); - for batch in batches.iter_mut() { - messages_count += batch.count(); - let messages = batch.take_messages(); - direct_file.write_all(&messages).await?; + if batches.containers_count() == 1 { + // Single batch - use write_all with owned buffer + let messages = batches.iter_mut().next().unwrap().take_messages(); + direct_file.write_all(messages).await?; + } else { + // Multiple batches - use vectored write for better performance + let mut buffers = Vec::with_capacity(batches.containers_count()); + for batch in batches.iter_mut() { + buffers.push(batch.take_messages()); + } + direct_file.write_vectored(buffers).await?; } tracing::trace!("Saved {} messages", messages_count); diff --git a/core/server/src/streaming/utils/pooled_buffer.rs b/core/server/src/streaming/utils/pooled_buffer.rs index b3769bf27..03d1dce2e 100644 --- a/core/server/src/streaming/utils/pooled_buffer.rs +++ b/core/server/src/streaming/utils/pooled_buffer.rs @@ -214,16 +214,6 @@ impl PooledBuffer { pub fn put<T: AsRef<[u8]>>(&mut self, data: T) { self.extend_from_slice(data.as_ref()); } - - /// Align the buffer length to the next 512-byte boundary by padding with zeros - pub fn align(&mut self) { - let current_len = self.inner.len(); - let aligned_len = (current_len + 511) & !511; - if aligned_len > current_len { - let padding = aligned_len - current_len; - self.resize(aligned_len, 0); - } - } } impl Deref for PooledBuffer {
