This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch rebase_master in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 9a634fd46639cd63a3fa860def6df7c5a49b2643 Author: Hubert Gruszecki <[email protected]> AuthorDate: Sat Jul 5 19:25:59 2025 +0200 fix(io_uring): fix buffer reading with memory pool enabled (#1964) Replace unsafe set_len() with slice() for reading exact buffer sizes. This fixes hangs when memory pool returns larger buffers than requested. Following compio's recommended pattern: buffer.slice(0..exact_size). --- .../handlers/messages/send_messages_handler.rs | 29 +++++++++++----------- core/server/src/streaming/utils/memory_pool.rs | 15 +++-------- core/server/src/streaming/utils/pooled_buffer.rs | 2 +- 3 files changed, 19 insertions(+), 27 deletions(-) diff --git a/core/server/src/binary/handlers/messages/send_messages_handler.rs b/core/server/src/binary/handlers/messages/send_messages_handler.rs index 47df8270..bb536c61 100644 --- a/core/server/src/binary/handlers/messages/send_messages_handler.rs +++ b/core/server/src/binary/handlers/messages/send_messages_handler.rs @@ -27,7 +27,7 @@ use crate::streaming::segments::{IggyIndexesMut, IggyMessagesBatchMut}; use crate::streaming::session::Session; use crate::streaming::utils::PooledBuffer; use anyhow::Result; -use bytes::BytesMut; +use compio::buf::{IntoInner as _, IoBuf}; use iggy_common::Identifier; use iggy_common::Sizeable; use iggy_common::{INDEX_SIZE, IdKind}; @@ -57,17 +57,18 @@ impl ServerCommandHandler for SendMessages { let total_payload_size = length as usize - std::mem::size_of::<u32>(); let metadata_len_field_size = std::mem::size_of::<u32>(); - let mut metadata_length_buffer = BytesMut::with_capacity(4); - unsafe { metadata_length_buffer.set_len(4) }; - let (result, metadata_len_buf) = sender.read(metadata_length_buffer).await; + let metadata_length_buffer = PooledBuffer::with_capacity(4); + let (result, metadata_len_buf) = sender.read(metadata_length_buffer.slice(0..4)).await; + let metadata_len_buf = metadata_len_buf.into_inner(); result?; - let metadata_len_buf = metadata_len_buf.freeze(); let metadata_size = u32::from_le_bytes(metadata_len_buf[..].try_into().unwrap()); - let mut metadata_buffer = PooledBuffer::with_capacity(metadata_size as usize); - unsafe { metadata_buffer.set_len(metadata_size as usize) }; - let (result, metadata_buf) = sender.read(metadata_buffer).await; + let metadata_buffer = PooledBuffer::with_capacity(metadata_size as usize); + let (result, metadata_buf) = sender + .read(metadata_buffer.slice(0..metadata_size as usize)) + .await; result?; + let metadata_buf = metadata_buf.into_inner(); let mut element_size = 0; @@ -90,17 +91,17 @@ impl ServerCommandHandler for SendMessages { ); let indexes_size = messages_count as usize * INDEX_SIZE; - let mut indexes_buffer = PooledBuffer::with_capacity(indexes_size); - unsafe { indexes_buffer.set_len(indexes_size) }; - let (result, indexes_buffer) = sender.read(indexes_buffer).await; + let indexes_buffer = PooledBuffer::with_capacity(indexes_size); + let (result, indexes_buffer) = sender.read(indexes_buffer.slice(0..indexes_size)).await; result?; + let indexes_buffer = indexes_buffer.into_inner(); let messages_size = total_payload_size - metadata_size as usize - indexes_size - metadata_len_field_size; - let mut messages_buffer = PooledBuffer::with_capacity(messages_size); - unsafe { messages_buffer.set_len(messages_size) }; - let (result, messages_buffer) = sender.read(messages_buffer).await; + let messages_buffer = PooledBuffer::with_capacity(messages_size); + let (result, messages_buffer) = sender.read(messages_buffer.slice(0..messages_size)).await; result?; + let messages_buffer = messages_buffer.into_inner(); let indexes = IggyIndexesMut::from_bytes(indexes_buffer, 0); let batch = IggyMessagesBatchMut::from_indexes_and_messages( diff --git a/core/server/src/streaming/utils/memory_pool.rs b/core/server/src/streaming/utils/memory_pool.rs index 16ad2087..675c0076 100644 --- a/core/server/src/streaming/utils/memory_pool.rs +++ b/core/server/src/streaming/utils/memory_pool.rs @@ -155,21 +155,12 @@ impl MemoryPool { /// Initialize the global pool from the given config. pub fn init_pool(config: Arc<SystemConfig>) { - // TODO: Fixme (stary napraw). - let is_enabled = false; + let is_enabled = config.memory_pool.enabled; let memory_limit = config.memory_pool.size.as_bytes_usize(); let bucket_capacity = config.memory_pool.bucket_capacity as usize; - let pool = MemoryPool::new(is_enabled, memory_limit, bucket_capacity); - if MEMORY_POOL.set(pool).is_err() { - warn!("Memory pool already initialized."); - // This shouldn't ever happen in production code, only in tests - // if someone forgets to add #[serial] tag to tests that have different - // memory pool limits (different instances are created within same executable). - if memory_pool().memory_limit != memory_limit { - panic!("Previously initialized memory pool has a different limit."); - } - } + let _ = + MEMORY_POOL.get_or_init(|| MemoryPool::new(is_enabled, memory_limit, bucket_capacity)); } /// Acquire a `BytesMut` buffer with at least `capacity` bytes. diff --git a/core/server/src/streaming/utils/pooled_buffer.rs b/core/server/src/streaming/utils/pooled_buffer.rs index 39ef25be..6bdaa357 100644 --- a/core/server/src/streaming/utils/pooled_buffer.rs +++ b/core/server/src/streaming/utils/pooled_buffer.rs @@ -229,7 +229,7 @@ impl SetBufInit for PooledBuffer { unsafe impl IoBufMut for PooledBuffer { fn as_buf_mut_ptr(&mut self) -> *mut u8 { - self.inner.as_buf_mut_ptr() + self.inner.as_mut_ptr() } }
