This is an automated email from the ASF dual-hosted git repository. hgruszecki pushed a commit to branch io_uring_tpc_direct_io in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 9583f4b520b99762b974d69ff1b8e8e21b5d0603 Author: Hubert Gruszecki <[email protected]> AuthorDate: Mon Jul 7 21:33:25 2025 +0200 feat(io_uring): implement directio and aligned PooledBuffer --- Cargo.lock | 30 ++ core/server/Cargo.toml | 1 + .../handlers/messages/send_messages_handler.rs | 42 +- core/server/src/binary/handlers/utils.rs | 3 +- core/server/src/binary/sender.rs | 7 +- core/server/src/main.rs | 11 + core/server/src/quic/quic_sender.rs | 2 +- core/server/src/shard/mod.rs | 28 +- core/server/src/shard/system/storage.rs | 2 +- core/server/src/streaming/segments/direct_file.rs | 443 +++++++++++++++++++++ .../src/streaming/segments/indexes/index_reader.rs | 23 +- .../src/streaming/segments/indexes/index_writer.rs | 101 ++--- .../streaming/segments/messages/messages_reader.rs | 22 +- .../streaming/segments/messages/messages_writer.rs | 113 ++---- core/server/src/streaming/segments/messages/mod.rs | 28 +- core/server/src/streaming/segments/mod.rs | 2 + .../src/streaming/segments/reading_messages.rs | 24 +- core/server/src/streaming/segments/segment.rs | 58 ++- .../streaming/segments/types/messages_batch_mut.rs | 15 +- .../src/streaming/segments/writing_messages.rs | 19 +- core/server/src/streaming/utils/memory_pool.rs | 64 +-- core/server/src/streaming/utils/mod.rs | 2 +- core/server/src/streaming/utils/pooled_buffer.rs | 152 ++++--- core/server/src/tcp/connection_handler.rs | 13 +- core/server/src/tcp/sender.rs | 18 +- core/server/src/tcp/tcp_sender.rs | 6 +- core/server/src/tcp/tcp_tls_sender.rs | 2 +- 27 files changed, 837 insertions(+), 394 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 540217c4..20d0a61b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -307,6 +307,15 @@ dependencies = [ "memchr", ] +[[package]] +name = "aligned-vec" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc890384c8602f339876ded803c97ad529f3842aba97f6392b3dba0dd171769b" +dependencies = [ + "equator", +] + [[package]] name = "alloc-no-stdlib" version = "2.0.4" @@ -2461,6 +2470,26 @@ dependencies = [ "log", ] +[[package]] +name = "equator" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4711b213838dfee0117e3be6ac926007d7f433d7bbe33595975d4190cb07e6fc" +dependencies = [ + "equator-macro", +] + +[[package]] +name = "equator-macro" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44f23cf4b44bfce11a86ace86f8a73ffdec849c9fd00a386a53d278bd9e81fb3" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.104", +] + [[package]] name = "equivalent" version = "1.0.2" @@ -7071,6 +7100,7 @@ name = "server" version = "0.5.0" dependencies = [ "ahash 0.8.12", + "aligned-vec", "anyhow", "async-channel", "async_zip", diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml index 47738185..1ce3aa05 100644 --- a/core/server/Cargo.toml +++ b/core/server/Cargo.toml @@ -36,6 +36,7 @@ mimalloc = ["dep:mimalloc"] [dependencies] ahash = { workspace = true } +aligned-vec = "0.6" anyhow = { workspace = true } async_zip = { workspace = true } axum = { workspace = true } diff --git a/core/server/src/binary/handlers/messages/send_messages_handler.rs b/core/server/src/binary/handlers/messages/send_messages_handler.rs index bb536c61..6d155bca 100644 --- a/core/server/src/binary/handlers/messages/send_messages_handler.rs +++ b/core/server/src/binary/handlers/messages/send_messages_handler.rs @@ -57,18 +57,19 @@ impl ServerCommandHandler for SendMessages { let total_payload_size = length as usize - std::mem::size_of::<u32>(); let metadata_len_field_size = std::mem::size_of::<u32>(); - let metadata_length_buffer = PooledBuffer::with_capacity(4); - let (result, metadata_len_buf) = sender.read(metadata_length_buffer.slice(0..4)).await; - let metadata_len_buf = metadata_len_buf.into_inner(); - result?; - let metadata_size = u32::from_le_bytes(metadata_len_buf[..].try_into().unwrap()); - - let metadata_buffer = PooledBuffer::with_capacity(metadata_size as usize); - let (result, metadata_buf) = sender + let mut metadata_length_buffer = PooledBuffer::with_capacity(4); + + let metadata_length_buffer = sender + .read(metadata_length_buffer.slice(0..4)) + .await? + .into_inner(); + let metadata_size = u32::from_le_bytes(metadata_length_buffer[0..4].try_into().unwrap()); + + let mut metadata_buffer = PooledBuffer::with_capacity(metadata_size as usize); + let metadata_buf = sender .read(metadata_buffer.slice(0..metadata_size as usize)) - .await; - result?; - let metadata_buf = metadata_buf.into_inner(); + .await? + .into_inner(); let mut element_size = 0; @@ -91,17 +92,20 @@ impl ServerCommandHandler for SendMessages { ); let indexes_size = messages_count as usize * INDEX_SIZE; - let indexes_buffer = PooledBuffer::with_capacity(indexes_size); - let (result, indexes_buffer) = sender.read(indexes_buffer.slice(0..indexes_size)).await; - result?; - let indexes_buffer = indexes_buffer.into_inner(); + let mut indexes_buffer = PooledBuffer::with_capacity(indexes_size + 512); // extra space for possible padding to not cause reallocations + let indexes_buffer = sender + .read(indexes_buffer.slice(0..indexes_size)) + .await? + .into_inner(); let messages_size = total_payload_size - metadata_size as usize - indexes_size - metadata_len_field_size; - let messages_buffer = PooledBuffer::with_capacity(messages_size); - let (result, messages_buffer) = sender.read(messages_buffer.slice(0..messages_size)).await; - result?; - let messages_buffer = messages_buffer.into_inner(); + + let mut messages_buffer = PooledBuffer::with_capacity(messages_size + 512); // extra space for possible padding to not cause reallocations + let messages_buffer = sender + .read(messages_buffer.slice(0..messages_size)) + .await? + .into_inner(); let indexes = IggyIndexesMut::from_bytes(indexes_buffer, 0); let batch = IggyMessagesBatchMut::from_indexes_and_messages( diff --git a/core/server/src/binary/handlers/utils.rs b/core/server/src/binary/handlers/utils.rs index 80bcede4..54411d91 100644 --- a/core/server/src/binary/handlers/utils.rs +++ b/core/server/src/binary/handlers/utils.rs @@ -32,8 +32,7 @@ pub async fn receive_and_validate( let buffer = if length == 0 { buffer } else { - let (result, buffer) = sender.read(buffer).await; - result?; + let buffer = sender.read(buffer).await?; buffer }; diff --git a/core/server/src/binary/sender.rs b/core/server/src/binary/sender.rs index 96bed3d5..c6f1a811 100644 --- a/core/server/src/binary/sender.rs +++ b/core/server/src/binary/sender.rs @@ -56,10 +56,7 @@ macro_rules! forward_async_methods { } pub trait Sender { - fn read<B: IoBufMut>( - &mut self, - buffer: B, - ) -> impl Future<Output = (Result<usize, IggyError>, B)>; + fn read<B: IoBufMut>(&mut self, buffer: B) -> impl Future<Output = Result<B, IggyError>>; fn send_empty_ok_response(&mut self) -> impl Future<Output = Result<(), IggyError>>; fn send_ok_response(&mut self, payload: &[u8]) -> impl Future<Output = Result<(), IggyError>>; fn send_ok_response_vectored( @@ -98,7 +95,7 @@ impl SenderKind { } forward_async_methods! { - async fn read<B: IoBufMut>(&mut self, buffer: B) -> (Result<usize, IggyError>, B); + async fn read<B: IoBufMut>(&mut self, buffer: B) -> Result<B, IggyError>; async fn send_empty_ok_response(&mut self) -> Result<(), IggyError>; async fn send_ok_response(&mut self, payload: &[u8]) -> Result<(), IggyError>; async fn send_ok_response_vectored(&mut self, length: &[u8], slices: Vec<PooledBuffer>) -> Result<(), IggyError>; diff --git a/core/server/src/main.rs b/core/server/src/main.rs index e01f03ec..8029d216 100644 --- a/core/server/src/main.rs +++ b/core/server/src/main.rs @@ -57,6 +57,15 @@ use tracing::{error, info, instrument}; const COMPONENT: &str = "MAIN"; +fn thread_print_memory_pool_stats() { + std::thread::sleep(std::time::Duration::from_secs(5)); + let pool = server::streaming::utils::memory_pool(); + loop { + pool.log_stats(); + std::thread::sleep(std::time::Duration::from_secs(5)); + } +} + #[instrument(skip_all, name = "trace_start_server")] fn main() -> Result<(), ServerError> { let startup_timestamp = Instant::now(); @@ -286,6 +295,8 @@ fn main() -> Result<(), ServerError> { .expect("Error setting Ctrl-C handler"); */ + std::thread::spawn(thread_print_memory_pool_stats); + info!("Iggy server is running. Press Ctrl+C or send SIGTERM to shutdown."); for (idx, handle) in handles.into_iter().enumerate() { info!("Waiting for shard thread {} to complete...", idx); diff --git a/core/server/src/quic/quic_sender.rs b/core/server/src/quic/quic_sender.rs index 54bda604..30c4a6a7 100644 --- a/core/server/src/quic/quic_sender.rs +++ b/core/server/src/quic/quic_sender.rs @@ -37,7 +37,7 @@ pub struct QuicSender { } impl Sender for QuicSender { - async fn read<B: IoBufMut>(&mut self, buffer: B) -> (Result<usize, IggyError>, B) { + async fn read<B: IoBufMut>(&mut self, buffer: B) -> Result<B, IggyError> { //TODO: Fixme // Not-so-nice code because quinn recv stream has different API for read_exact /* diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs index a8158bc1..b00c6276 100644 --- a/core/server/src/shard/mod.rs +++ b/core/server/src/shard/mod.rs @@ -221,16 +221,6 @@ impl IggyShard { } async fn load_version(&self) -> Result<(), IggyError> { - async fn update_system_info( - storage: &Rc<SystemStorage>, - system_info: &mut SystemInfo, - version: &SemanticVersion, - ) -> Result<(), IggyError> { - system_info.update_version(version); - storage.info.save(system_info).await?; - Ok(()) - } - let current_version = &self.version; let mut system_info; let load_system_info = self.storage.info.load().await; @@ -239,7 +229,7 @@ impl IggyShard { if let IggyError::ResourceNotFound(_) = error { info!("System info not found, creating..."); system_info = SystemInfo::default(); - update_system_info(&self.storage, &mut system_info, current_version).await?; + Self::update_system_info(&self.storage, &mut system_info, current_version).await?; } else { return Err(error); } @@ -248,24 +238,34 @@ impl IggyShard { } info!("Loaded {system_info}."); - let loaded_version = SemanticVersion::from_str(&system_info.version.version)?; + let loaded_version = SemanticVersion::from_str(&system_info.version.version).unwrap(); if current_version.is_equal_to(&loaded_version) { info!("System version {current_version} is up to date."); } else if current_version.is_greater_than(&loaded_version) { info!( "System version {current_version} is greater than {loaded_version}, checking the available migrations..." ); - update_system_info(&self.storage, &mut system_info, current_version).await?; + Self::update_system_info(&self.storage, &mut system_info, current_version).await?; } else { info!( "System version {current_version} is lower than {loaded_version}, possible downgrade." ); - update_system_info(&self.storage, &mut system_info, current_version).await?; + Self::update_system_info(&self.storage, &mut system_info, current_version).await?; } Ok(()) } + async fn update_system_info( + storage: &Rc<SystemStorage>, + system_info: &mut SystemInfo, + version: &SemanticVersion, + ) -> Result<(), IggyError> { + system_info.update_version(version); + storage.info.save(system_info).await?; + Ok(()) + } + async fn load_state(&self) -> Result<SystemState, IggyError> { let state_entries = self.state.init().await.with_error_context(|error| { format!("{COMPONENT} (error: {error}) - failed to initialize state entries") diff --git a/core/server/src/shard/system/storage.rs b/core/server/src/shard/system/storage.rs index 736d8e48..ac4aca3e 100644 --- a/core/server/src/shard/system/storage.rs +++ b/core/server/src/shard/system/storage.rs @@ -65,7 +65,7 @@ impl SystemInfoStorage for FileSystemInfoStorage { let file = file::open(&self.path) .await .map_err(|_| IggyError::CannotReadFile)?; - let buffer = PooledBuffer::with_capacity(file_size); + let mut buffer = PooledBuffer::with_capacity(file_size); let (result, buffer) = file .read_exact_at(buffer.slice(0..file_size), 0) .await diff --git a/core/server/src/streaming/segments/direct_file.rs b/core/server/src/streaming/segments/direct_file.rs new file mode 100644 index 00000000..ce1ecb78 --- /dev/null +++ b/core/server/src/streaming/segments/direct_file.rs @@ -0,0 +1,443 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use crate::streaming::utils::{ALIGNMENT, PooledBuffer}; +use compio::fs::{File, OpenOptions}; +use compio::io::AsyncWriteAtExt; +use error_set::ErrContext; +use iggy_common::IggyError; + +#[derive(Debug)] +pub struct DirectFile { + file_path: String, + file: File, + file_position: u64, + tail: PooledBuffer, + tail_len: usize, +} + +impl DirectFile { + pub async fn open( + file_path: &str, + initial_position: u64, + file_exists: bool, + ) -> Result<Self, IggyError> { + let mut file = OpenOptions::new() + .create(true) + .write(true) + .custom_flags(0x4000) + .open(file_path) + .await + .with_error_context(|err| { + format!("Failed to open file with O_DIRECT: {file_path}, error: {err}") + }) + .map_err(|_| IggyError::CannotReadFile)?; + + if !file_exists { + let init_buffer = PooledBuffer::with_capacity(ALIGNMENT); + let (write_result, _) = file.write_all_at(init_buffer, 0).await.into(); + write_result + .with_error_context(|error| { + tracing::error!( + "Failed to initialize file with dummy block: {file_path}, error: {error}" + ); + format!( + "Failed to initialize file with dummy block: {file_path}, error: {error}" + ) + }) + .map_err(|_| IggyError::CannotWriteToFile)?; + + tracing::trace!("Successfully initialized new file with dummy block: {file_path}"); + } + + tracing::trace!( + "Successfully opened DirectFile: {}, position: {}, exists: {}", + file_path, + initial_position, + file_exists + ); + + Ok(Self { + file_path: file_path.to_string(), + file, + file_position: initial_position, + tail: PooledBuffer::with_capacity(ALIGNMENT), + tail_len: 0, + }) + } + + pub async fn get_file_size(&self) -> Result<u64, IggyError> { + self.file + .metadata() + .await + .with_error_context(|error| { + format!( + "Failed to get metadata of file: {}, error: {error}", + self.file_path + ) + }) + .map_err(|_| IggyError::CannotReadFileMetadata) + .map(|metadata| metadata.len()) + } + + fn new(file: File, file_path: String, initial_position: u64) -> Self { + Self { + file_path, + file, + file_position: initial_position, + tail: PooledBuffer::with_capacity(ALIGNMENT), + tail_len: 0, + } + } + + pub async fn write_all(&mut self, mut data: &[u8]) -> Result<usize, IggyError> { + let initial_len = data.len(); + tracing::trace!( + "DirectFile write_all called for file: {}, data_len: {}, position: {}, tail_len: {}", + self.file_path, + initial_len, + self.file_position, + self.tail_len + ); + + if self.tail_len > 0 { + let need = ALIGNMENT - self.tail_len; + let take = need.min(data.len()); + self.tail.extend_from_slice(&data[..take]); + self.tail_len += take; + data = &data[take..]; + + if self.tail_len == ALIGNMENT { + self.flush_tail().await?; + } + } + + if !data.is_empty() { + let whole_sectors_end = data.len() & !(ALIGNMENT - 1); + if whole_sectors_end > 0 { + let whole_sectors = &data[..whole_sectors_end]; + let mut written = 0; + + while written < whole_sectors.len() { + let chunk_size = (whole_sectors.len() - written).min(128 * 1024 * 1024); + let chunk = &whole_sectors[written..written + chunk_size]; + + let chunk_buffer = PooledBuffer::from(chunk); + + let (result, _) = self + .file + .write_all_at(chunk_buffer, self.file_position) + .await + .into(); + + result.map_err(|e| { + tracing::error!("Failed to write to direct file: {} at position {}, chunk size: {}, error: {}", + self.file_path, self.file_position, chunk_size, e); + IggyError::CannotWriteToFile + })?; + + self.file_position += chunk_size as u64; + written += chunk_size; + } + + data = &data[whole_sectors_end..]; + } + } + + if !data.is_empty() { + self.tail.clear(); + self.tail.extend_from_slice(data); + self.tail_len = data.len(); + } + + Ok(initial_len) + } + + pub async fn flush(&mut self) -> Result<(), IggyError> { + if self.tail_len > 0 { + self.tail.resize(ALIGNMENT, 0); + self.flush_tail().await?; + } + Ok(()) + } + + pub fn position(&self) -> u64 { + self.file_position + } + + pub fn tail_len(&self) -> usize { + self.tail_len + } + + pub fn file_path(&self) -> &str { + &self.file_path + } + + pub fn tail_buffer(&self) -> &PooledBuffer { + &self.tail + } + + pub fn take_tail(&mut self) -> (PooledBuffer, usize) { + let tail = std::mem::replace(&mut self.tail, PooledBuffer::with_capacity(ALIGNMENT)); + let tail_len = self.tail_len; + self.tail_len = 0; + (tail, tail_len) + } + + pub fn set_tail(&mut self, tail: PooledBuffer, tail_len: usize) { + self.tail = tail; + self.tail_len = tail_len; + } + + async fn flush_tail(&mut self) -> Result<(), IggyError> { + assert_eq!(self.tail.len(), ALIGNMENT); + + let tail_buffer = std::mem::replace(&mut self.tail, PooledBuffer::with_capacity(ALIGNMENT)); + + let (result, returned_buf) = self + .file + .write_all_at(tail_buffer, self.file_position) + .await + .into(); + + result.map_err(|e| { + tracing::error!( + "Failed to flush tail for file: {} at position {}, tail size: {}, error: {}", + self.file_path, + self.file_position, + ALIGNMENT, + e + ); + IggyError::CannotWriteToFile + })?; + + self.file_position += ALIGNMENT as u64; + self.tail_len = 0; + self.tail = returned_buf; + self.tail.clear(); + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::configs::system::SystemConfig; + use crate::streaming::utils::MemoryPool; + use compio::fs::OpenOptions; + use compio::io::{AsyncReadAt, AsyncReadAtExt}; + use std::sync::Arc; + use tempfile::tempdir; + + #[test] + fn test_direct_file_small_writes() { + compio::runtime::Runtime::new().unwrap().block_on(async { + MemoryPool::init_pool(Arc::new(SystemConfig::default())); + let temp_dir = tempdir().unwrap(); + let file_path = temp_dir.path().join("test_direct_io.bin"); + + let mut direct_file = DirectFile::open(file_path.to_str().unwrap(), 0, false) + .await + .unwrap(); + + for i in 0..10u64 { + let buf = i.to_le_bytes(); + direct_file.write_all(&buf).await.unwrap(); + } + + direct_file.flush().await.unwrap(); + + let file = OpenOptions::new() + .read(true) + .custom_flags(0x4000) + .open(&file_path) + .await + .unwrap(); + + let mut read_buffer = vec![0u8; 512]; + let (result, buf) = file.read_at(read_buffer, 0).await.into(); + result.unwrap(); + read_buffer = buf; + + for i in 0..10u64 { + let start = i as usize * 8; + let num = u64::from_le_bytes(read_buffer[start..start + 8].try_into().unwrap()); + assert_eq!(num, i, "Expected {} at position {}, got {}", i, start, num); + } + }); + } + + #[test] + fn test_direct_file_exact_sector_write() { + compio::runtime::Runtime::new().unwrap().block_on(async { + MemoryPool::init_pool(Arc::new(SystemConfig::default())); + let temp_dir = tempdir().unwrap(); + let file_path = temp_dir.path().join("test_exact_sector.bin"); + + let mut direct_file = DirectFile::open(file_path.to_str().unwrap(), 0, false) + .await + .unwrap(); + + let data = vec![42u8; ALIGNMENT]; + direct_file.write_all(&data).await.unwrap(); + + assert_eq!(direct_file.tail_len(), 0); + assert_eq!(direct_file.position(), ALIGNMENT as u64); + + let file = OpenOptions::new() + .read(true) + .custom_flags(0x4000) + .open(&file_path) + .await + .unwrap(); + + let mut read_buffer = vec![0u8; ALIGNMENT]; + let (result, buf) = file.read_at(read_buffer, 0).await.into(); + result.unwrap(); + read_buffer = buf; + + assert_eq!(read_buffer, vec![42u8; ALIGNMENT]); + }); + } + + #[test] + fn test_direct_file_multiple_sector_writes() { + compio::runtime::Runtime::new().unwrap().block_on(async { + MemoryPool::init_pool(Arc::new(SystemConfig::default())); + let temp_dir = tempdir().unwrap(); + let file_path = temp_dir.path().join("test_multiple_sectors.bin"); + + let mut direct_file = DirectFile::open(file_path.to_str().unwrap(), 0, false) + .await + .unwrap(); + + let data1 = vec![1u8; ALIGNMENT * 2]; + let data2 = vec![2u8; ALIGNMENT * 3]; + let data3 = vec![3u8; ALIGNMENT]; + + direct_file.write_all(&data1).await.unwrap(); + direct_file.write_all(&data2).await.unwrap(); + direct_file.write_all(&data3).await.unwrap(); + + let file = OpenOptions::new() + .read(true) + .custom_flags(0x4000) + .open(&file_path) + .await + .unwrap(); + + let mut read_buffer = vec![0u8; ALIGNMENT * 6]; + let (result, buf) = file.read_at(read_buffer, 0).await.into(); + result.unwrap(); + read_buffer = buf; + + assert_eq!(&read_buffer[0..ALIGNMENT * 2], &vec![1u8; ALIGNMENT * 2]); + assert_eq!( + &read_buffer[ALIGNMENT * 2..ALIGNMENT * 5], + &vec![2u8; ALIGNMENT * 3] + ); + assert_eq!( + &read_buffer[ALIGNMENT * 5..ALIGNMENT * 6], + &vec![3u8; ALIGNMENT] + ); + }); + } + + #[test] + fn test_direct_file_unaligned_write() { + compio::runtime::Runtime::new().unwrap().block_on(async { + MemoryPool::init_pool(Arc::new(SystemConfig::default())); + let temp_dir = tempdir().unwrap(); + let file_path = temp_dir.path().join("test_unaligned.bin"); + + let mut direct_file = DirectFile::open(file_path.to_str().unwrap(), 0, false) + .await + .unwrap(); + + let data = vec![77u8; 1000]; + direct_file.write_all(&data).await.unwrap(); + + assert_eq!(direct_file.tail_len(), 1000 % ALIGNMENT); + assert_eq!(direct_file.position(), ALIGNMENT as u64); + + direct_file.flush().await.unwrap(); + + assert_eq!(direct_file.tail_len(), 0); + assert_eq!(direct_file.position(), (ALIGNMENT * 2) as u64); + + let file = OpenOptions::new() + .read(true) + .custom_flags(0x4000) + .open(&file_path) + .await + .unwrap(); + + let mut read_buffer = vec![0u8; ALIGNMENT * 2]; + let (result, buf) = file.read_at(read_buffer, 0).await.into(); + result.unwrap(); + read_buffer = buf; + + assert_eq!(&read_buffer[0..1000], &vec![77u8; 1000]); + assert_eq!( + &read_buffer[1000..ALIGNMENT * 2], + &vec![0u8; ALIGNMENT * 2 - 1000] + ); + }); + } + + #[test] + fn test_direct_file_cross_sector_boundary() { + compio::runtime::Runtime::new().unwrap().block_on(async { + MemoryPool::init_pool(Arc::new(SystemConfig::default())); + let temp_dir = tempdir().unwrap(); + let file_path = temp_dir.path().join("test_cross_boundary.bin"); + + let mut direct_file = DirectFile::open(file_path.to_str().unwrap(), 0, false) + .await + .unwrap(); + + let data1 = vec![1u8; 400]; + let data2 = vec![2u8; 200]; + let data3 = vec![3u8; 100]; + + direct_file.write_all(&data1).await.unwrap(); + direct_file.write_all(&data2).await.unwrap(); + direct_file.write_all(&data3).await.unwrap(); + + assert_eq!(direct_file.tail_len(), 700 % ALIGNMENT); + + direct_file.flush().await.unwrap(); + + let file = OpenOptions::new() + .read(true) + .custom_flags(0x4000) + .open(&file_path) + .await + .unwrap(); + + let mut read_buffer = vec![0u8; ALIGNMENT * 2]; + let (result, buf) = file.read_at(read_buffer, 0).await.into(); + result.unwrap(); + read_buffer = buf; + + assert_eq!(&read_buffer[0..400], &vec![1u8; 400]); + assert_eq!(&read_buffer[400..600], &vec![2u8; 200]); + assert_eq!(&read_buffer[600..700], &vec![3u8; 100]); + }); + } +} diff --git a/core/server/src/streaming/segments/indexes/index_reader.rs b/core/server/src/streaming/segments/indexes/index_reader.rs index ce235ef4..2a9fef00 100644 --- a/core/server/src/streaming/segments/indexes/index_reader.rs +++ b/core/server/src/streaming/segments/indexes/index_reader.rs @@ -21,6 +21,7 @@ use crate::streaming::utils::PooledBuffer; use bytes::BytesMut; use compio::{ BufResult, + buf::{IntoInner, IoBuf}, fs::{File, OpenOptions}, io::AsyncReadAtExt, }; @@ -336,19 +337,15 @@ impl IndexReader { len: u32, use_pool: bool, ) -> Result<PooledBuffer, std::io::Error> { - if use_pool { - let mut buf = PooledBuffer::with_capacity(len as usize); - unsafe { buf.set_len(len as usize) }; - let (result, buf) = self.file.read_exact_at(buf, offset as u64).await.into(); - result?; - Ok(buf) - } else { - let mut buf = BytesMut::with_capacity(len as usize); - unsafe { buf.set_len(len as usize) }; - let (result, buf) = self.file.read_exact_at(buf, offset as u64).await.into(); - result?; - Ok(PooledBuffer::from_existing(buf)) - } + let mut buf = PooledBuffer::with_capacity(len as usize); + let (result, buf) = self + .file + .read_exact_at(buf.slice(0..len as usize), offset as u64) + .await + .into(); + result?; + + Ok(buf.into_inner()) } /// Gets the nth index from the index file. diff --git a/core/server/src/streaming/segments/indexes/index_writer.rs b/core/server/src/streaming/segments/indexes/index_writer.rs index 416cde21..782e633e 100644 --- a/core/server/src/streaming/segments/indexes/index_writer.rs +++ b/core/server/src/streaming/segments/indexes/index_writer.rs @@ -16,25 +16,20 @@ * under the License. */ -use compio::fs::File; -use compio::fs::OpenOptions; -use compio::io::AsyncWriteAtExt; +use crate::streaming::segments::DirectFile; +use crate::streaming::utils::PooledBuffer; use error_set::ErrContext; -use iggy_common::INDEX_SIZE; -use iggy_common::IggyError; +use iggy_common::{INDEX_SIZE, IggyError}; use std::sync::{ Arc, atomic::{AtomicU64, Ordering}, }; use tracing::trace; -use crate::streaming::utils::PooledBuffer; - /// A dedicated struct for writing to the index file. #[derive(Debug)] pub struct IndexWriter { - file_path: String, - file: File, + direct_file: DirectFile, index_size_bytes: Arc<AtomicU64>, fsync: bool, } @@ -47,39 +42,34 @@ impl IndexWriter { fsync: bool, file_exists: bool, ) -> Result<Self, IggyError> { - let file = OpenOptions::new() - .create(true) - .write(true) - .open(file_path) - .await - .with_error_context(|error| format!("Failed to open index file: {file_path}. {error}")) - .map_err(|_| IggyError::CannotReadFile)?; + let file_position = if file_exists { + let current_size = index_size_bytes.load(Ordering::Acquire); + (current_size + 511) & !511 + } else { + index_size_bytes.store(0, Ordering::Release); + 0 + }; - if file_exists { - let _ = file.sync_all().await.with_error_context(|error| { - format!("Failed to fsync index file after creation: {file_path}. {error}",) - }); + trace!( + "Opening index file for writing: {file_path}, file_position: {}", + file_position + ); - let actual_index_size = file - .metadata() - .await - .with_error_context(|error| { - format!("Failed to get metadata of index file: {file_path}. {error}") - }) - .map_err(|_| IggyError::CannotReadFileMetadata)? - .len(); + let mut direct_file = DirectFile::open(file_path, file_position, file_exists).await?; + if file_exists { + let actual_index_size = direct_file.get_file_size().await?; index_size_bytes.store(actual_index_size, Ordering::Release); - } - trace!( - "Opened index file for writing: {file_path}, size: {}", - index_size_bytes.load(Ordering::Acquire) - ); + trace!( + "Opened existing index file: {file_path}, size: {}, file_position: {}", + actual_index_size, + direct_file.position() + ); + } Ok(Self { - file_path: file_path.to_string(), - file, + direct_file, index_size_bytes, fsync, }) @@ -92,44 +82,33 @@ impl IndexWriter { } let count = indexes.len() / INDEX_SIZE; - let len = indexes.len(); + let actual_len = indexes.len(); - let position = self.index_size_bytes.load(Ordering::Relaxed); - self.file - .write_all_at(indexes, position) + trace!( + "Saving {count} indexes to file: {} (size: {} bytes)", + self.direct_file.file_path(), + actual_len + ); + + let bytes_written = self + .direct_file + .write_all(&indexes) .await - .0 .with_error_context(|error| { format!( "Failed to write {} indexes to file: {}. {error}", - count, self.file_path + count, + self.direct_file.file_path() ) }) .map_err(|_| IggyError::CannotSaveIndexToSegment)?; + let new_logical_size = self.index_size_bytes.load(Ordering::Relaxed) + bytes_written as u64; self.index_size_bytes - .fetch_add(len as u64, Ordering::Release); - - if self.fsync { - let _ = self.fsync().await; - } - trace!( - "Saved {count} indexes of size {} to file: {}", - INDEX_SIZE * count, - self.file_path - ); + .store(new_logical_size, Ordering::Release); - Ok(()) - } + trace!("Saved {count} indexes. Logical size: {}", new_logical_size); - pub async fn fsync(&self) -> Result<(), IggyError> { - self.file - .sync_all() - .await - .with_error_context(|error| { - format!("Failed to fsync index file: {}. {error}", self.file_path) - }) - .map_err(|_| IggyError::CannotWriteToFile)?; Ok(()) } } diff --git a/core/server/src/streaming/segments/messages/messages_reader.rs b/core/server/src/streaming/segments/messages/messages_reader.rs index e81eb793..5ad23502 100644 --- a/core/server/src/streaming/segments/messages/messages_reader.rs +++ b/core/server/src/streaming/segments/messages/messages_reader.rs @@ -19,6 +19,7 @@ use crate::streaming::segments::{IggyIndexesMut, IggyMessagesBatchMut}; use crate::streaming::utils::PooledBuffer; use bytes::BytesMut; +use compio::buf::{IntoInner, IoBuf}; use compio::fs::{File, OpenOptions}; use compio::io::AsyncReadAtExt; use error_set::ErrContext; @@ -176,18 +177,13 @@ impl MessagesReader { len: u32, use_pool: bool, ) -> Result<PooledBuffer, std::io::Error> { - if use_pool { - let mut buf = PooledBuffer::with_capacity(len as usize); - unsafe { buf.set_len(len as usize) }; - let (result, buf) = self.file.read_exact_at(buf, offset as u64).await.into(); - result?; - Ok(buf) - } else { - let mut buf = BytesMut::with_capacity(len as usize); - unsafe { buf.set_len(len as usize) }; - let (result, buf) = self.file.read_exact_at(buf, offset as u64).await.into(); - result?; - Ok(PooledBuffer::from_existing(buf)) - } + let mut buf = PooledBuffer::with_capacity(len as usize); + let (result, buf) = self + .file + .read_exact_at(buf.slice(0..len as usize), offset as u64) + .await + .into(); + result?; + Ok(buf.into_inner()) } } diff --git a/core/server/src/streaming/segments/messages/messages_writer.rs b/core/server/src/streaming/segments/messages/messages_writer.rs index f3d43d1e..ad6a0968 100644 --- a/core/server/src/streaming/segments/messages/messages_writer.rs +++ b/core/server/src/streaming/segments/messages/messages_writer.rs @@ -16,8 +16,9 @@ * under the License. */ -use crate::streaming::segments::{IggyMessagesBatchSet, messages::write_batch}; -use compio::fs::{File, OpenOptions}; +use crate::streaming::segments::{ + DirectFile, IggyMessagesBatchSet, messages::write_batch_with_direct_file, +}; use error_set::ErrContext; use iggy_common::{IggyByteSize, IggyError}; use std::sync::{ @@ -29,68 +30,51 @@ use tracing::{error, trace}; /// A dedicated struct for writing to the messages file. #[derive(Debug)] pub struct MessagesWriter { - file_path: String, - /// Holds the file for synchronous writes; when asynchronous persistence is enabled, this will be None. - file: Option<File>, - /// When set, asynchronous writes are handled by this persister task. + direct_file: Option<DirectFile>, messages_size_bytes: Arc<AtomicU64>, fsync: bool, } impl MessagesWriter { - /// Opens the messages file in write mode. - /// - /// If the server confirmation is set to `NoWait`, the file handle is transferred to the - /// persister task (and stored in `persister_task`) so that writes are done asynchronously. - /// Otherwise, the file is retained in `self.file` for synchronous writes. pub async fn new( file_path: &str, messages_size_bytes: Arc<AtomicU64>, fsync: bool, file_exists: bool, ) -> Result<Self, IggyError> { - let file = OpenOptions::new() - .create(true) - .write(true) - .open(file_path) - .await - .with_error_context(|err| { - format!("Failed to open messages file: {file_path}, error: {err}") - }) - .map_err(|_| IggyError::CannotReadFile)?; + let file_position = if file_exists { + let current_size = messages_size_bytes.load(Ordering::Acquire); + (current_size + 511) & !511 + } else { + messages_size_bytes.store(0, Ordering::Release); + 0 + }; - if file_exists { - let _ = file.sync_all().await.with_error_context(|error| { - format!("Failed to fsync messages file after creation: {file_path}, error: {error}") - }); + trace!( + "Opening messages file for writing: {file_path}, file_position: {}", + file_position + ); - let actual_messages_size = file - .metadata() - .await - .with_error_context(|error| { - format!("Failed to get metadata of messages file: {file_path}, error: {error}") - }) - .map_err(|_| IggyError::CannotReadFileMetadata)? - .len(); + let mut direct_file = DirectFile::open(file_path, file_position, file_exists).await?; + if file_exists { + let actual_messages_size = direct_file.get_file_size().await?; messages_size_bytes.store(actual_messages_size, Ordering::Release); - } - trace!( - "Opened messages file for writing: {file_path}, size: {}", - messages_size_bytes.load(Ordering::Acquire) - ); + trace!( + "Opened existing messages file: {file_path}, size: {}, file_position: {}", + actual_messages_size, + direct_file.position() + ); + } - let file = Some(file); Ok(Self { - file_path: file_path.to_string(), - file, + direct_file: Some(direct_file), messages_size_bytes, fsync, }) } - /// Append a batch of messages to the messages file. pub async fn save_batch_set( &mut self, batch_set: IggyMessagesBatchSet, @@ -98,49 +82,34 @@ impl MessagesWriter { let messages_size = batch_set.size(); let messages_count = batch_set.count(); let containers_count = batch_set.containers_count(); - trace!( - "Saving batch set of size {messages_size} bytes ({containers_count} containers, {messages_count} messages) to messages file: {}", - self.file_path - ); - let position = self.messages_size_bytes.load(Ordering::Relaxed); - if let Some(ref mut file) = self.file { - write_batch(file, position, batch_set) + let actual_written = if let Some(ref mut direct_file) = self.direct_file { + trace!( + "Saving batch set of size {messages_size} bytes ({containers_count} containers, {messages_count} messages) to messages file: {}", + direct_file.file_path() + ); + + write_batch_with_direct_file(direct_file, batch_set) .await .with_error_context(|error| { format!( "Failed to write batch to messages file: {}. {error}", - self.file_path + direct_file.file_path() ) - })?; + })? } else { - error!("File handle is not available for synchronous write."); + tracing::error!("File handle is not available for synchronous write."); return Err(IggyError::CannotWriteToFile); - } - - if self.fsync { - let _ = self.fsync().await; - } + }; + let logical_size = self.messages_size_bytes.load(Ordering::Relaxed) + actual_written as u64; self.messages_size_bytes - .fetch_add(messages_size as u64, Ordering::Release); + .store(logical_size, Ordering::Release); + trace!( - "Written batch set of size {messages_size} bytes ({containers_count} containers, {messages_count} messages) to disk messages file: {}", - self.file_path + "Written batch set of size {messages_size} bytes to disk. Logical size: {}", + logical_size ); Ok(IggyByteSize::from(messages_size as u64)) } - - pub async fn fsync(&self) -> Result<(), IggyError> { - if let Some(file) = self.file.as_ref() { - file.sync_all() - .await - .with_error_context(|error| { - format!("Failed to fsync messages file: {}. {error}", self.file_path) - }) - .map_err(|_| IggyError::CannotWriteToFile)?; - } - - Ok(()) - } } diff --git a/core/server/src/streaming/segments/messages/mod.rs b/core/server/src/streaming/segments/messages/mod.rs index a3989a2e..537ecb78 100644 --- a/core/server/src/streaming/segments/messages/mod.rs +++ b/core/server/src/streaming/segments/messages/mod.rs @@ -19,25 +19,27 @@ mod messages_reader; mod messages_writer; -use super::IggyMessagesBatchSet; -use compio::{fs::File, io::AsyncWriteAtExt}; +use super::{DirectFile, IggyMessagesBatchSet}; +use crate::streaming::utils::PooledBuffer; use iggy_common::IggyError; pub use messages_reader::MessagesReader; pub use messages_writer::MessagesWriter; -/// Vectored write a batches of messages to file -async fn write_batch( - file: &mut File, - position: u64, +async fn write_batch_with_direct_file( + direct_file: &mut DirectFile, mut batches: IggyMessagesBatchSet, ) -> Result<usize, IggyError> { - let total_written = batches.iter().map(|b| b.size() as usize).sum(); - let batches = batches - .iter_mut() - .map(|b| b.take_messages()) - .collect::<Vec<_>>(); - let (result, _) = file.write_vectored_all_at(batches, position).await.into(); - result.map_err(|_| IggyError::CannotWriteToFile)?; + let total_written: usize = batches.iter().map(|b| b.size() as usize).sum(); + let mut messages_count = 0; + + for batch in batches.iter_mut() { + messages_count += batch.count(); + let messages = batch.take_messages(); + direct_file.write_all(&messages).await?; + } + + tracing::trace!("Saved {} messages", messages_count); + Ok(total_written) } diff --git a/core/server/src/streaming/segments/mod.rs b/core/server/src/streaming/segments/mod.rs index f3105e54..9fbd4abe 100644 --- a/core/server/src/streaming/segments/mod.rs +++ b/core/server/src/streaming/segments/mod.rs @@ -16,6 +16,7 @@ * under the License. */ +mod direct_file; mod indexes; mod messages; mod messages_accumulator; @@ -24,6 +25,7 @@ mod segment; mod types; mod writing_messages; +pub use direct_file::DirectFile; pub use indexes::IggyIndexesMut; pub use messages_accumulator::MessagesAccumulator; pub use segment::Segment; diff --git a/core/server/src/streaming/segments/reading_messages.rs b/core/server/src/streaming/segments/reading_messages.rs index 5f0f6bb4..fa46c183 100644 --- a/core/server/src/streaming/segments/reading_messages.rs +++ b/core/server/src/streaming/segments/reading_messages.rs @@ -273,8 +273,16 @@ impl Segment { relative_start_offset: u32, count: u32, ) -> Result<Option<IggyIndexesMut>, IggyError> { - let indexes = if !self.indexes.is_empty() { - self.indexes.slice_by_offset(relative_start_offset, count) + let indexes = if let Some(ref indexes) = self.indexes { + if !indexes.is_empty() { + indexes.slice_by_offset(relative_start_offset, count) + } else { + self.index_reader + .as_ref() + .expect("Index reader not initialized") + .load_from_disk_by_offset(relative_start_offset, count) + .await? + } } else { self.index_reader .as_ref() @@ -290,8 +298,16 @@ impl Segment { timestamp: u64, count: u32, ) -> Result<Option<IggyIndexesMut>, IggyError> { - let indexes = if !self.indexes.is_empty() { - self.indexes.slice_by_timestamp(timestamp, count) + let indexes = if let Some(ref indexes) = self.indexes { + if !indexes.is_empty() { + indexes.slice_by_timestamp(timestamp, count) + } else { + self.index_reader + .as_ref() + .unwrap() + .load_from_disk_by_timestamp(timestamp, count) + .await? + } } else { self.index_reader .as_ref() diff --git a/core/server/src/streaming/segments/segment.rs b/core/server/src/streaming/segments/segment.rs index 08b30ef6..937f4cae 100644 --- a/core/server/src/streaming/segments/segment.rs +++ b/core/server/src/streaming/segments/segment.rs @@ -62,7 +62,7 @@ pub struct Segment { pub(super) message_expiry: IggyExpiry, pub(super) accumulator: MessagesAccumulator, pub(super) config: Arc<SystemConfig>, - pub(super) indexes: IggyIndexesMut, + pub(super) indexes: Option<IggyIndexesMut>, pub(super) messages_size: Arc<AtomicU64>, pub(super) indexes_size: Arc<AtomicU64>, } @@ -92,10 +92,6 @@ impl Segment { _ => message_expiry, }; - // In order to preserve BytesMut buffer between restarts, initialize it with a capacity 0. - // We don't care whether server startup would be couple of seconds longer. - let indexes_capacity = if fresh { SIZE_16MB / INDEX_SIZE } else { 0 }; - Segment { stream_id, topic_id, @@ -109,7 +105,7 @@ impl Segment { last_index_position: 0, max_size_bytes: config.segment.size, message_expiry, - indexes: IggyIndexesMut::with_capacity(indexes_capacity, 0), + indexes: None, accumulator: MessagesAccumulator::default(), is_closed: false, messages_writer: None, @@ -145,7 +141,7 @@ impl Segment { self.last_index_position = log_size_bytes as _; - self.indexes = self + let loaded_indexes = self .index_reader .as_ref() .unwrap() @@ -154,17 +150,21 @@ impl Segment { .with_error_context(|error| format!("Failed to load indexes for {self}. {error}")) .map_err(|_| IggyError::CannotReadFile)?; - let last_index_offset = if self.indexes.is_empty() { + let last_index_offset = if loaded_indexes.is_empty() { 0_u64 } else { - self.indexes.last().unwrap().offset() as u64 + loaded_indexes.last().unwrap().offset() as u64 }; + if !loaded_indexes.is_empty() { + self.indexes = Some(loaded_indexes); + } + self.end_offset = self.start_offset + last_index_offset; info!( "Loaded {} indexes for segment with start offset: {}, end offset: {}, and partition with ID: {}, topic with ID: {}, and stream with ID: {}.", - self.indexes.count(), + self.indexes.as_ref().map_or(0, |idx| idx.count()), self.start_offset, self.end_offset, self.partition_id, @@ -300,27 +300,8 @@ impl Segment { } pub async fn shutdown_writing(&mut self) { - if let Some(log_writer) = self.messages_writer.take() { - //TODO: Fixme not sure whether we should spawn a task here. - compio::runtime::spawn(async move { - let _ = log_writer.fsync().await; - }); - } else { - warn!( - "Log writer already closed when calling close() for {}", - self - ); - } - - if let Some(index_writer) = self.index_writer.take() { - //TODO: Fixme not sure whether we should spawn a task here. - compio::runtime::spawn(async move { - let _ = index_writer.fsync().await; - drop(index_writer) - }); - } else { - warn!("Index writer already closed when calling close()"); - } + let _ = self.messages_writer.take().map(|mut writer| {}); + let _ = self.index_writer.take().map(|mut writer| {}); } pub async fn delete(&mut self) -> Result<(), IggyError> { @@ -382,6 +363,14 @@ impl Segment { self.message_expiry = message_expiry; } + /// Ensure indexes are initialized with proper capacity + pub fn ensure_indexes(&mut self) { + if self.indexes.is_none() { + let capacity = SIZE_16MB / INDEX_SIZE; + self.indexes = Some(IggyIndexesMut::with_capacity(capacity, 0)); + } + } + pub fn is_closed(&self) -> bool { self.is_closed } @@ -412,8 +401,7 @@ impl Segment { /// Explicitly drop the old indexes to ensure memory is freed pub fn drop_indexes(&mut self) { - let old_indexes = std::mem::replace(&mut self.indexes, IggyIndexesMut::empty()); - drop(old_indexes); + self.indexes = None; } } @@ -487,7 +475,7 @@ mod tests { assert_eq!(segment.messages_file_path(), messages_file_path); assert_eq!(segment.index_file_path(), index_path); assert_eq!(segment.message_expiry, message_expiry); - assert!(segment.indexes.is_empty()); + assert!(segment.indexes.is_none()); assert!(!segment.is_closed()); assert!(!segment.is_full().await); } @@ -530,6 +518,6 @@ mod tests { true, ); - assert!(segment.indexes.is_empty()); + assert!(segment.indexes.is_none()); } } diff --git a/core/server/src/streaming/segments/types/messages_batch_mut.rs b/core/server/src/streaming/segments/types/messages_batch_mut.rs index 764faa11..7152f53d 100644 --- a/core/server/src/streaming/segments/types/messages_batch_mut.rs +++ b/core/server/src/streaming/segments/types/messages_batch_mut.rs @@ -488,7 +488,7 @@ impl IggyMessagesBatchMut { /// subsequent messages in the new buffer. #[allow(clippy::too_many_arguments)] fn rebuild_indexes_for_chunk( - new_buffer: &BytesMut, + new_buffer: &PooledBuffer, new_indexes: &mut IggyIndexesMut, offset_in_new_buffer: &mut u32, chunk_start: usize, @@ -571,7 +571,7 @@ impl IggyMessagesBatchMut { for &(start, end) in &boundaries_to_remove { if start > last_pos { - let keep_len = start - last_pos; + let keep_len: usize = start - last_pos; let chunk = source.split_to(keep_len); let chunk_start_in_new_buffer = new_buffer.len(); new_buffer.put(chunk); @@ -684,7 +684,8 @@ impl IggyMessagesBatchMut { prev_offset = message.header().offset(); prev_position = index.position(); - messages_size += message.size(); + let msg_size = message.size(); + messages_size += msg_size; messages_count += 1; } @@ -822,11 +823,3 @@ impl Index<usize> for IggyMessagesBatchMut { &self.messages[start..end] } } - -impl Deref for IggyMessagesBatchMut { - type Target = BytesMut; - - fn deref(&self) -> &Self::Target { - &self.messages - } -} diff --git a/core/server/src/streaming/segments/writing_messages.rs b/core/server/src/streaming/segments/writing_messages.rs index e95d1e2c..8c6a785c 100644 --- a/core/server/src/streaming/segments/writing_messages.rs +++ b/core/server/src/streaming/segments/writing_messages.rs @@ -82,7 +82,8 @@ impl Segment { let batch_size = batches.size(); let batch_count = batches.count(); - batches.append_indexes_to(&mut self.indexes); + self.ensure_indexes(); + batches.append_indexes_to(self.indexes.as_mut().unwrap()); let saved_bytes = self .messages_writer @@ -98,7 +99,7 @@ impl Segment { self.last_index_position += saved_bytes.as_bytes_u64() as u32; - let unsaved_indexes_slice = self.indexes.unsaved_slice(); + let unsaved_indexes_slice = self.indexes.as_ref().unwrap().unsaved_slice(); let len = unsaved_indexes_slice.len(); self.index_writer .as_mut() @@ -109,17 +110,23 @@ impl Segment { format!("Failed to save index of {} indexes to {self}. {error}", len) })?; - self.indexes.mark_saved(); + self.indexes.as_mut().unwrap().mark_saved(); if self.config.segment.cache_indexes == CacheIndexesConfig::None { - self.indexes.clear(); + if let Some(indexes) = self.indexes.as_mut() { + indexes.clear(); + } } self.check_and_handle_segment_full().await?; trace!( - "Saved {} messages on disk in segment with start offset: {} for partition with ID: {}, total bytes written: {}.", - unsaved_messages_count, self.start_offset, self.partition_id, saved_bytes + "Saved {} messages on disk in segment with start offset: {}, end offset: {}, for partition with ID: {}, total bytes written: {}.", + unsaved_messages_count, + self.start_offset, + self.end_offset, + self.partition_id, + saved_bytes ); Ok(unsaved_messages_count) diff --git a/core/server/src/streaming/utils/memory_pool.rs b/core/server/src/streaming/utils/memory_pool.rs index 2cc031d3..df0783b1 100644 --- a/core/server/src/streaming/utils/memory_pool.rs +++ b/core/server/src/streaming/utils/memory_pool.rs @@ -17,7 +17,7 @@ */ use crate::configs::system::SystemConfig; -use bytes::BytesMut; +use aligned_vec::{AVec, ConstAlign}; use crossbeam::queue::ArrayQueue; use human_repr::HumanCount; use once_cell::sync::OnceCell; @@ -26,15 +26,18 @@ use std::sync::Arc; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use tracing::{error, info, trace, warn}; +pub const ALIGNMENT: usize = 512; +pub type Align512 = ConstAlign<ALIGNMENT>; +pub type AlignedBuffer = AVec<u8, Align512>; + /// Global memory pool instance. Use `memory_pool()` to access it. pub static MEMORY_POOL: OnceCell<MemoryPool> = OnceCell::new(); /// Total number of distinct bucket sizes. -const NUM_BUCKETS: usize = 32; +const NUM_BUCKETS: usize = 31; /// Array of bucket sizes in ascending order. Each entry is a distinct buffer size (in bytes). const BUCKET_SIZES: [usize; NUM_BUCKETS] = [ - 256, 512, 1024, 2 * 1024, @@ -49,8 +52,8 @@ const BUCKET_SIZES: [usize; NUM_BUCKETS] = [ 768 * 1024, 1024 * 1024, 1536 * 1024, - 2 * 1024 * 1024, // Above 2MiB everything should be rounded up to the next power of 2 to take advantage of hugepages - 4 * 1024 * 1024, // (environment variables MIMALLOC_ALLOW_LARGE_OS_PAGES=1 and MIMALLOC_LARGE_OS_PAGES=1). + 2 * 1024 * 1024, + 4 * 1024 * 1024, 6 * 1024 * 1024, 8 * 1024 * 1024, 10 * 1024 * 1024, @@ -75,7 +78,7 @@ pub fn memory_pool() -> &'static MemoryPool { .expect("Memory pool not initialized - MemoryPool::init_pool should be called first") } -/// A memory pool that maintains fixed-size buckets for reusing `BytesMut` buffers. +/// A memory pool that maintains fixed-size buckets for reusing `AlignedBuffer` buffers. /// /// Each bucket corresponds to a particular size in `BUCKET_SIZES`. The pool tracks: /// - Buffers currently in use (`in_use`) @@ -97,7 +100,7 @@ pub struct MemoryPool { /// Array of queues for reusable buffers. Each queue can store up to `bucket_capacity` buffers. /// The length of each queue (`buckets[i].len()`) is how many **free** buffers are currently available. /// Free doesn't mean the buffer is allocated, it just means it's not in use. - buckets: [Arc<ArrayQueue<BytesMut>>; NUM_BUCKETS], + buckets: [Arc<ArrayQueue<AlignedBuffer>>; NUM_BUCKETS], /// Number of buffers **in use** for each bucket size (grow/shrink as they are acquired/released). in_use: [Arc<AtomicUsize>; NUM_BUCKETS], @@ -163,16 +166,16 @@ impl MemoryPool { MEMORY_POOL.get_or_init(|| MemoryPool::new(is_enabled, memory_limit, bucket_capacity)); } - /// Acquire a `BytesMut` buffer with at least `capacity` bytes. + /// Acquire a `AlignedBuffer` buffer with at least `capacity` bytes. /// /// - If a bucket can fit `capacity`, try to pop from its free buffer queue; otherwise create a new buffer. /// - If `memory_limit` would be exceeded, allocate outside the pool. /// /// Returns a tuple of (buffer, was_pool_allocated) where was_pool_allocated indicates if the buffer /// was allocated from the pool (true) or externally (false). - pub fn acquire_buffer(&self, capacity: usize) -> (BytesMut, bool) { + pub fn acquire_buffer(&self, capacity: usize) -> (AlignedBuffer, bool) { if !self.is_enabled { - return (BytesMut::with_capacity(capacity), false); + return (AlignedBuffer::with_capacity(ALIGNMENT, capacity), false); } let current = self.pool_current_size(); @@ -193,12 +196,12 @@ impl MemoryPool { new_size, current, self.memory_limit ); self.inc_external_allocations(); - return (BytesMut::with_capacity(new_size), false); + return (AlignedBuffer::with_capacity(ALIGNMENT, new_size), false); } self.inc_bucket_alloc(idx); self.inc_bucket_in_use(idx); - (BytesMut::with_capacity(new_size), true) + (AlignedBuffer::with_capacity(ALIGNMENT, new_size), true) } None => { if current + capacity > self.memory_limit { @@ -207,16 +210,16 @@ impl MemoryPool { capacity, current, self.memory_limit ); self.inc_external_allocations(); - return (BytesMut::with_capacity(capacity), false); + return (AlignedBuffer::with_capacity(ALIGNMENT, capacity), false); } self.inc_external_allocations(); - (BytesMut::with_capacity(capacity), false) + (AlignedBuffer::with_capacity(ALIGNMENT, capacity), false) } } } - /// Return a `BytesMut` buffer previously acquired from the pool. + /// Return a `AlignedBuffer` buffer previously acquired from the pool. /// /// - If `current_capacity` differs from `original_capacity`, increments `resize_events`. /// - If a matching bucket exists, place it back in that bucket's queue (if space is available). @@ -224,7 +227,7 @@ impl MemoryPool { /// - The `was_pool_allocated` flag indicates if this buffer was originally allocated from the pool. pub fn release_buffer( &self, - buffer: BytesMut, + buffer: AlignedBuffer, original_capacity: usize, was_pool_allocated: bool, ) { @@ -235,10 +238,6 @@ impl MemoryPool { let current_capacity = buffer.capacity(); if current_capacity != original_capacity { self.inc_resize_events(); - trace!( - "Buffer capacity {} != original {} when returning", - current_capacity, original_capacity - ); } if was_pool_allocated { @@ -438,11 +437,11 @@ impl MemoryPool { /// Return a buffer to the pool by calling `release_buffer` with the original capacity. /// This extension trait makes it easy to do `some_bytes.return_to_pool(orig_cap, was_pool_allocated)`. -pub trait BytesMutExt { +pub trait AlignedBufferExt { fn return_to_pool(self, original_capacity: usize, was_pool_allocated: bool); } -impl BytesMutExt for BytesMut { +impl AlignedBufferExt for AlignedBuffer { fn return_to_pool(self, original_capacity: usize, was_pool_allocated: bool) { memory_pool().release_buffer(self, original_capacity, was_pool_allocated); } @@ -667,27 +666,6 @@ mod tests { ); } - // Test put_bytes - { - let initial_events = pool.resize_events(); - let mut buffer = PooledBuffer::with_capacity(4 * 1024); - let orig_bucket_idx = pool.best_fit(buffer.capacity()).unwrap(); - let orig_in_use = pool.bucket_current_elements(orig_bucket_idx); - - buffer.put_bytes(0, 64 * 1024); // 64KiB of zeros - - assert_eq!( - pool.resize_events(), - initial_events + 1, - "put_bytes should trigger resize event" - ); - assert_eq!( - pool.bucket_current_elements(orig_bucket_idx), - orig_in_use - 1, - "put_bytes should update bucket accounting" - ); - } - // Test extend_from_slice { let initial_events = pool.resize_events(); diff --git a/core/server/src/streaming/utils/mod.rs b/core/server/src/streaming/utils/mod.rs index e441a051..d858d684 100644 --- a/core/server/src/streaming/utils/mod.rs +++ b/core/server/src/streaming/utils/mod.rs @@ -25,5 +25,5 @@ pub mod random_id; mod memory_pool; mod pooled_buffer; -pub use memory_pool::{MemoryPool, memory_pool}; +pub use memory_pool::{ALIGNMENT, MemoryPool, memory_pool}; pub use pooled_buffer::PooledBuffer; diff --git a/core/server/src/streaming/utils/pooled_buffer.rs b/core/server/src/streaming/utils/pooled_buffer.rs index ee6f2014..b3769bf2 100644 --- a/core/server/src/streaming/utils/pooled_buffer.rs +++ b/core/server/src/streaming/utils/pooled_buffer.rs @@ -16,8 +16,8 @@ * under the License. */ -use super::memory_pool::{BytesMutExt, memory_pool}; -use bytes::{Buf, BufMut, BytesMut}; +use super::memory_pool::{AlignedBuffer, AlignedBufferExt, memory_pool}; +use crate::streaming::utils::memory_pool::ALIGNMENT; use compio::buf::{IoBuf, IoBufMut, SetBufInit}; use std::ops::{Deref, DerefMut}; @@ -26,7 +26,7 @@ pub struct PooledBuffer { from_pool: bool, original_capacity: usize, original_bucket_idx: Option<usize>, - inner: BytesMut, + inner: AlignedBuffer, } impl Default for PooledBuffer { @@ -42,13 +42,14 @@ impl PooledBuffer { /// /// * `capacity` - The capacity of the buffer pub fn with_capacity(capacity: usize) -> Self { - let (buffer, was_pool_allocated) = memory_pool().acquire_buffer(capacity); + let (mut buffer, was_pool_allocated) = memory_pool().acquire_buffer(capacity); let original_capacity = buffer.capacity(); let original_bucket_idx = if was_pool_allocated { memory_pool().best_fit(original_capacity) } else { None }; + Self { from_pool: was_pool_allocated, original_capacity, @@ -57,27 +58,13 @@ impl PooledBuffer { } } - /// Creates a new pooled buffer from an existing `BytesMut`. - /// - /// # Arguments - /// - /// * `existing` - The existing `BytesMut` buffer - pub fn from_existing(existing: BytesMut) -> Self { - Self { - from_pool: false, - original_capacity: existing.capacity(), - original_bucket_idx: None, - inner: existing, - } - } - /// Creates an empty pooled buffer. pub fn empty() -> Self { Self { from_pool: false, original_capacity: 0, original_bucket_idx: None, - inner: BytesMut::new(), + inner: AlignedBuffer::new(ALIGNMENT), } } @@ -90,6 +77,11 @@ impl PooledBuffer { let current_capacity = self.inner.capacity(); if current_capacity != self.original_capacity { + tracing::error!( + "Pooled buffer resized from {} to {}", + self.original_capacity, + current_capacity + ); memory_pool().inc_resize_events(); if let Some(orig_idx) = self.original_bucket_idx { @@ -131,49 +123,111 @@ impl PooledBuffer { } } - /// Wrapper for put_bytes which might cause resize - pub fn put_bytes(&mut self, byte: u8, len: usize) { + /// Wrapper for put_slice which might cause resize + pub fn put_slice(&mut self, src: &[u8]) { let before_cap = self.inner.capacity(); - self.inner.put_bytes(byte, len); + self.extend_from_slice(src); if self.inner.capacity() != before_cap { self.check_for_resize(); } } - /// Wrapper for put_slice which might cause resize - pub fn put_slice(&mut self, src: &[u8]) { + /// Wrapper for put_u32_le which might cause resize + pub fn put_u32_le(&mut self, value: u32) { let before_cap = self.inner.capacity(); - self.inner.put_slice(src); + self.reserve(4); + self.inner.extend_from_slice(&value.to_le_bytes()); if self.inner.capacity() != before_cap { self.check_for_resize(); } } - /// Wrapper for put_u32_le which might cause resize - pub fn put_u32_le(&mut self, value: u32) { + /// Wrapper for put_u64_le which might cause resize + pub fn put_u64_le(&mut self, value: u64) { let before_cap = self.inner.capacity(); - self.inner.put_u32_le(value); + self.reserve(8); + self.inner.extend_from_slice(&value.to_le_bytes()); if self.inner.capacity() != before_cap { self.check_for_resize(); } } - /// Wrapper for put_u64_le which might cause resize - pub fn put_u64_le(&mut self, value: u64) { + /// Get a slice of the buffer's contents + pub fn as_slice(&self) -> &[u8] { + self.inner.as_slice() + } + + /// Get the length of the buffer + pub fn len(&self) -> usize { + self.inner.len() + } + + /// Check if the buffer is empty + pub fn is_empty(&self) -> bool { + self.inner.is_empty() + } + + /// Get the capacity of the buffer + pub fn capacity(&self) -> usize { + self.inner.capacity() + } + + /// Clear the buffer + pub fn clear(&mut self) { + self.inner.clear() + } + + /// Resize the buffer + pub fn resize(&mut self, new_len: usize, value: u8) { let before_cap = self.inner.capacity(); - self.inner.put_u64_le(value); + self.inner.resize(new_len, value); if self.inner.capacity() != before_cap { self.check_for_resize(); } } + + /// Split the buffer at the given index, returning the data before the split + /// and keeping the data after the split in self + pub fn split_to(&mut self, at: usize) -> Vec<u8> { + if at > self.inner.len() { + panic!("split_to out of bounds"); + } + + let mut result = Vec::with_capacity(at); + result.extend_from_slice(&self.inner[..at]); + + let remaining = self.inner.len() - at; + let mut new_data = Vec::with_capacity(remaining); + new_data.extend_from_slice(&self.inner[at..]); + + self.inner.clear(); + self.inner.extend_from_slice(&new_data); + + result + } + + /// Put bytes from a slice + pub fn put<T: AsRef<[u8]>>(&mut self, data: T) { + self.extend_from_slice(data.as_ref()); + } + + /// Align the buffer length to the next 512-byte boundary by padding with zeros + pub fn align(&mut self) { + let current_len = self.inner.len(); + let aligned_len = (current_len + 511) & !511; + if aligned_len > current_len { + let padding = aligned_len - current_len; + self.resize(aligned_len, 0); + } + } } impl Deref for PooledBuffer { - type Target = BytesMut; + type Target = AlignedBuffer; fn deref(&self) -> &Self::Target { &self.inner @@ -189,12 +243,18 @@ impl DerefMut for PooledBuffer { impl Drop for PooledBuffer { fn drop(&mut self) { if self.from_pool { - let buf = std::mem::take(&mut self.inner); + let buf = std::mem::replace(&mut self.inner, AlignedBuffer::new(ALIGNMENT)); buf.return_to_pool(self.original_capacity, true); } } } +impl AsRef<[u8]> for PooledBuffer { + fn as_ref(&self) -> &[u8] { + self.inner.as_slice() + } +} + impl From<&[u8]> for PooledBuffer { fn from(slice: &[u8]) -> Self { let mut buf = PooledBuffer::with_capacity(slice.len()); @@ -203,30 +263,10 @@ impl From<&[u8]> for PooledBuffer { } } -impl Buf for PooledBuffer { - fn remaining(&self) -> usize { - self.inner.remaining() - } - - fn chunk(&self) -> &[u8] { - self.inner.chunk() - } - - fn advance(&mut self, cnt: usize) { - self.inner.advance(cnt) - } - - fn chunks_vectored<'t>(&'t self, dst: &mut [std::io::IoSlice<'t>]) -> usize { - self.inner.chunks_vectored(dst) - } -} - impl SetBufInit for PooledBuffer { unsafe fn set_buf_init(&mut self, len: usize) { - if self.inner.len() <= len { - unsafe { - self.inner.set_len(len); - } + unsafe { + self.inner.set_len(len); } } } @@ -239,7 +279,7 @@ unsafe impl IoBufMut for PooledBuffer { unsafe impl IoBuf for PooledBuffer { fn as_buf_ptr(&self) -> *const u8 { - self.inner.as_buf_ptr() + self.inner.as_ptr() } fn buf_len(&self) -> usize { diff --git a/core/server/src/tcp/connection_handler.rs b/core/server/src/tcp/connection_handler.rs index 9c6803b8..1f1bb482 100644 --- a/core/server/src/tcp/connection_handler.rs +++ b/core/server/src/tcp/connection_handler.rs @@ -49,7 +49,7 @@ pub(crate) async fn handle_connection( loop { let read_future = sender.read(length_buffer.clone()); - let (read_length, initial_buffer) = futures::select! { + let initial_buffer = futures::select! { _ = stop_receiver.recv().fuse() => { info!("Connection stop signal received for session: {}", session); let _ = sender.send_error_response(IggyError::Disconnected).await; @@ -57,8 +57,8 @@ pub(crate) async fn handle_connection( } result = read_future.fuse() => { match result { - (Ok(read_length), initial_buffer) => (read_length, initial_buffer), - (Err(error), _) => { + Ok(initial_buffer) => initial_buffer, + Err(error) => { if error.as_code() == IggyError::ConnectionClosed.as_code() { return Err(ConnectionError::from(error)); } else { @@ -71,9 +71,9 @@ pub(crate) async fn handle_connection( } }; - if read_length != INITIAL_BYTES_LENGTH { + if initial_buffer.len() != INITIAL_BYTES_LENGTH { sender.send_error_response(IggyError::CommandLengthError(format!( - "Unable to read the TCP request length, expected: {INITIAL_BYTES_LENGTH} bytes, received: {read_length} bytes." + "Unable to read the TCP request length, expected: {INITIAL_BYTES_LENGTH} bytes, received: {} bytes.", initial_buffer.len() ))).await?; continue; } @@ -81,8 +81,7 @@ pub(crate) async fn handle_connection( let initial_buffer = initial_buffer.freeze(); let length = u32::from_le_bytes(initial_buffer[0..INITIAL_BYTES_LENGTH].try_into().unwrap()); - let (res, code_buffer) = sender.read(code_buffer.clone()).await; - let _ = res?; + let code_buffer = sender.read(code_buffer.clone()).await?; let code_buffer = code_buffer.freeze(); let code: u32 = u32::from_le_bytes(code_buffer[0..INITIAL_BYTES_LENGTH].try_into().unwrap()); diff --git a/core/server/src/tcp/sender.rs b/core/server/src/tcp/sender.rs index 2c0ecfa9..f757a69b 100644 --- a/core/server/src/tcp/sender.rs +++ b/core/server/src/tcp/sender.rs @@ -23,31 +23,25 @@ use compio::{ io::{AsyncRead, AsyncReadAtExt, AsyncReadExt, AsyncWriteExt}, }; use iggy_common::IggyError; -use nix::libc; -use std::io::IoSlice; use tracing::{debug, error}; use crate::streaming::utils::PooledBuffer; const STATUS_OK: &[u8] = &[0; 4]; -pub(crate) async fn read<T, B>(stream: &mut T, buffer: B) -> (Result<usize, IggyError>, B) +pub(crate) async fn read<T, B>(stream: &mut T, buffer: B) -> Result<B, IggyError> where T: AsyncReadExt + AsyncWriteExt + Unpin, B: IoBufMut, { let BufResult(result, buffer) = stream.read_exact(buffer).await; - match (result, buffer) { - (Ok(_), buffer) => (Ok(buffer.buf_len()), buffer), - // TODO: How to handle this ?(Ok(0), buffer) => (Err(IggyError::ConnectionClosed), buffer), - // `read_exact` from compio doesn't return how many bytes it read. - (Err(error), buffer) => { + match result { + Ok(_) => Ok(buffer), + Err(error) => { if error.kind() == std::io::ErrorKind::UnexpectedEof { - //error!("Got some error tho.. {}", error); - (Err(IggyError::ConnectionClosed), buffer) + Err(IggyError::ConnectionClosed) } else { - //error!("Got some other error tho.. {}", error); - (Err(IggyError::TcpError), buffer) + Err(IggyError::TcpError) } } } diff --git a/core/server/src/tcp/tcp_sender.rs b/core/server/src/tcp/tcp_sender.rs index d0a1c27c..ccdc95d2 100644 --- a/core/server/src/tcp/tcp_sender.rs +++ b/core/server/src/tcp/tcp_sender.rs @@ -20,13 +20,11 @@ use crate::binary::sender::Sender; use crate::streaming::utils::PooledBuffer; use crate::tcp::COMPONENT; use crate::{server_error::ServerError, tcp::sender}; -use bytes::BytesMut; -use compio::buf::{IoBuf, IoBufMut}; +use compio::buf::IoBufMut; use compio::io::AsyncWrite; use compio::net::TcpStream; use error_set::ErrContext; use iggy_common::IggyError; -use nix::libc; #[derive(Debug)] pub struct TcpSender { @@ -34,7 +32,7 @@ pub struct TcpSender { } impl Sender for TcpSender { - async fn read<B: IoBufMut>(&mut self, buffer: B) -> (Result<usize, IggyError>, B) { + async fn read<B: IoBufMut>(&mut self, buffer: B) -> Result<B, IggyError> { sender::read(&mut self.stream, buffer).await } diff --git a/core/server/src/tcp/tcp_tls_sender.rs b/core/server/src/tcp/tcp_tls_sender.rs index 8dbb3972..f0dfcdf4 100644 --- a/core/server/src/tcp/tcp_tls_sender.rs +++ b/core/server/src/tcp/tcp_tls_sender.rs @@ -35,7 +35,7 @@ pub struct TcpTlsSender { } impl Sender for TcpTlsSender { - async fn read<B: IoBufMut>(&mut self, buffer: B) -> (Result<usize, IggyError>, B) { + async fn read<B: IoBufMut>(&mut self, buffer: B) -> Result<B, IggyError> { todo!(); sender::read(&mut self.stream, buffer).await }
