This is an automated email from the ASF dual-hosted git repository. hgruszecki pushed a commit to branch zero-copy-no-batching-rebase-3 in repository https://gitbox.apache.org/repos/asf/iggy.git
commit b8bdbc98ca4f134470ab1571014b1083f00c96c0 Author: Hubert Gruszecki <h.grusze...@gmail.com> AuthorDate: Sat Apr 12 17:38:04 2025 +0200 implement BytesMutPool --- Cargo.lock | 24 ++ sdk/src/http/messages.rs | 1 + sdk/src/identifier.rs | 5 + sdk/src/messages/partitioning.rs | 5 + sdk/src/messages/send_messages.rs | 38 ++- sdk/src/models/messaging/messages_batch.rs | 163 +--------- server/Cargo.toml | 2 + server/src/binary/command.rs | 2 - .../handlers/messages/send_messages_handler.rs | 72 +++-- server/src/channels/commands/print_sysinfo.rs | 7 +- server/src/http/jwt/storage.rs | 7 +- server/src/http/messages.rs | 6 +- server/src/main.rs | 3 + .../src/streaming/segments/indexes/index_reader.rs | 24 +- .../src/streaming/segments/indexes/indexes_mut.rs | 52 +++- .../streaming/segments/messages/messages_reader.rs | 39 +-- .../src/streaming/segments/messages_accumulator.rs | 11 +- server/src/streaming/segments/reading_messages.rs | 11 +- server/src/streaming/segments/segment.rs | 2 +- .../streaming/segments/types/messages_batch_mut.rs | 336 +++++++++++++++++---- .../streaming/segments/types/messages_batch_set.rs | 84 +++--- server/src/streaming/segments/writing_messages.rs | 12 +- server/src/streaming/systems/messages.rs | 14 +- server/src/streaming/systems/storage.rs | 10 +- server/src/streaming/utils/bytes_mut_pool.rs | 313 +++++++++++++++++++ server/src/streaming/utils/mod.rs | 1 + 26 files changed, 869 insertions(+), 375 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fa08280f..432e33c7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1231,6 +1231,19 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1137cd7e7fc0fb5d3c5a8678be38ec56e819125d8d7907411fe24ccb943faca8" +dependencies = [ + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-epoch", + "crossbeam-queue", + "crossbeam-utils", +] + [[package]] name = "crossbeam-channel" version = "0.5.14" @@ -1259,6 +1272,15 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-queue" +version = "0.3.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f58bbc28f91df819d0aa2a2c00cd19754769c2fad90579b3592b1c9ba7a3115" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.21" @@ -4870,6 +4892,7 @@ dependencies = [ "chrono", "clap", "console-subscriber", + "crossbeam", "dashmap", "derive_more", "dotenvy", @@ -4887,6 +4910,7 @@ dependencies = [ "mockall", "moka", "nix", + "once_cell", "openssl", "opentelemetry", "opentelemetry-appender-tracing", diff --git a/sdk/src/http/messages.rs b/sdk/src/http/messages.rs index 22fd3ede..4de7bec7 100644 --- a/sdk/src/http/messages.rs +++ b/sdk/src/http/messages.rs @@ -71,6 +71,7 @@ impl MessageClient for HttpClient { self.post( &get_path(&stream_id.as_cow_str(), &topic_id.as_cow_str()), &SendMessages { + metadata_length: 0, // this field is used only for TCP/QUIC stream_id: stream_id.clone(), topic_id: topic_id.clone(), partitioning: partitioning.clone(), diff --git a/sdk/src/identifier.rs b/sdk/src/identifier.rs index e66d0059..cd65e318 100644 --- a/sdk/src/identifier.rs +++ b/sdk/src/identifier.rs @@ -200,6 +200,11 @@ impl Identifier { identifier.validate()?; Ok(identifier) } + + /// Maximum size of the Identifier struct + pub const fn maximum_byte_size() -> usize { + 2 + 255 + } } impl Sizeable for Identifier { diff --git a/sdk/src/messages/partitioning.rs b/sdk/src/messages/partitioning.rs index 8e454b38..f41b8814 100644 --- a/sdk/src/messages/partitioning.rs +++ b/sdk/src/messages/partitioning.rs @@ -155,6 +155,11 @@ impl Partitioning { value, }) } + + /// Maximum size of the Partitioning struct + pub const fn maximum_byte_size() -> usize { + 2 + 255 + } } impl Hash for Partitioning { diff --git a/sdk/src/messages/send_messages.rs b/sdk/src/messages/send_messages.rs index 22a219b9..e2952c6e 100644 --- a/sdk/src/messages/send_messages.rs +++ b/sdk/src/messages/send_messages.rs @@ -43,6 +43,8 @@ use super::{Partitioning, PartitioningKind}; /// - `batch` - collection of messages to be sent. #[derive(Debug, PartialEq)] pub struct SendMessages { + /// Length of stream_id, topic_id, partitioning and messages_count (4 bytes) + pub metadata_length: u32, /// Unique stream ID (numeric or name). pub stream_id: Identifier, /// Unique topic ID (numeric or name). @@ -60,25 +62,33 @@ impl SendMessages { partitioning: &Partitioning, messages: &[IggyMessage], ) -> Bytes { - let stream_id_size = stream_id.get_buffer_size(); - let topic_id_size = topic_id.get_buffer_size(); - let partitioning_size = partitioning.get_buffer_size(); + let stream_id_field_size = stream_id.get_buffer_size(); + let topic_id_field_size = topic_id.get_buffer_size(); + let partitioning_field_size = partitioning.get_buffer_size(); + let metadata_length_field_size = std::mem::size_of::<u32>(); let messages_count = messages.len(); - let messages_count_size = std::mem::size_of::<u32>(); + let messages_count_field_size = std::mem::size_of::<u32>(); + let metadata_length = stream_id_field_size + + topic_id_field_size + + partitioning_field_size + + messages_count_field_size; let indexes_size = messages_count * INDEX_SIZE; - - let total_size = stream_id_size - + topic_id_size - + partitioning_size - + messages_count_size + let messages_size = messages + .iter() + .map(|m| m.get_size_bytes().as_bytes_usize()) + .sum::<usize>(); + + let total_size = metadata_length_field_size + + stream_id_field_size + + topic_id_field_size + + partitioning_field_size + + messages_count_field_size + indexes_size - + messages - .iter() - .map(|m| m.get_size_bytes().as_bytes_usize()) - .sum::<usize>(); + + messages_size; let mut bytes = BytesMut::with_capacity(total_size); + bytes.put_u32_le(metadata_length as u32); stream_id.write_to_buffer(&mut bytes); topic_id.write_to_buffer(&mut bytes); partitioning.write_to_buffer(&mut bytes); @@ -121,6 +131,7 @@ fn write_value_at<const N: usize>(slice: &mut [u8], value: [u8; N], position: us impl Default for SendMessages { fn default() -> Self { SendMessages { + metadata_length: 0, stream_id: Identifier::default(), topic_id: Identifier::default(), partitioning: Partitioning::default(), @@ -327,6 +338,7 @@ impl<'de> Deserialize<'de> for SendMessages { let batch = IggyMessagesBatch::from(&messages); Ok(SendMessages { + metadata_length: 0, // this field is used only for TCP/QUIC stream_id: Identifier::default(), topic_id: Identifier::default(), partitioning, diff --git a/sdk/src/models/messaging/messages_batch.rs b/sdk/src/models/messaging/messages_batch.rs index 31851176..911ff723 100644 --- a/sdk/src/models/messaging/messages_batch.rs +++ b/sdk/src/models/messaging/messages_batch.rs @@ -156,116 +156,6 @@ impl IggyMessagesBatch { } } - /// Returns a contiguous slice (as a new `IggyMessagesBatch`) of up to `count` messages - /// whose message headers have an offset greater than or equal to the provided `start_offset`. - pub fn slice_by_offset(&self, start_offset: u64, count: u32) -> Option<Self> { - if self.is_empty() || count == 0 { - return None; - } - - let first_offset = self.first_offset()?; - - if start_offset < first_offset { - return self.slice_by_index(0, count); - } - - let last_offset = self.last_offset()?; - if start_offset > last_offset { - return None; - } - - let offset_diff = start_offset - first_offset; - let first_message_index = offset_diff as usize; - - if first_message_index >= self.count() as usize { - return None; - } - - self.slice_by_index(first_message_index as u32, count) - } - - /// Helper method to slice the batch starting from a specific index - fn slice_by_index(&self, start_index: u32, count: u32) -> Option<Self> { - if start_index >= self.count() { - return None; - } - - let last_message_index = - std::cmp::min((start_index + count) as usize, self.count() as usize); - - let sub_indexes = self.indexes.slice_by_offset( - start_index, - (last_message_index - start_index as usize) as u32, - )?; - - let first_message_position = self.message_start_position(start_index as usize); - let last_message_position = self.message_end_position(last_message_index - 1); - - let sub_buffer = self - .messages - .slice(first_message_position..last_message_position); - - Some(IggyMessagesBatch { - count: (last_message_index - start_index as usize) as u32, - indexes: sub_indexes, - messages: sub_buffer, - }) - } - - /// Returns a contiguous slice (as a new `IggyMessagesBatch`) of up to `count` messages - /// whose message headers have a timestamp greater than or equal to the provided `timestamp`. - /// - /// If no messages meet the criteria, returns `None`. - pub fn slice_by_timestamp(&self, timestamp: u64, count: u32) -> Option<Self> { - if self.is_empty() || count == 0 { - return None; - } - - // Use binary search to find the first message with timestamp >= the target - let first_message_index = self.binary_search_timestamp(timestamp)?; - - self.slice_by_index(first_message_index, count) - } - - /// Find the position of the index with timestamp closest to (but not exceeding) the target - fn binary_search_timestamp(&self, target_timestamp: u64) -> Option<u32> { - if self.count() == 0 { - return None; - } - - let last_timestamp = self.get(self.count() as usize - 1)?.header().timestamp(); - if target_timestamp > last_timestamp { - return Some(self.count() - 1); - } - - let first_timestamp = self.get(0)?.header().timestamp(); - if target_timestamp <= first_timestamp { - return Some(0); - } - - let mut low = 0; - let mut high = self.count() - 1; - - while low <= high { - let mid = low + (high - low) / 2; - let mid_index = self.get(mid as usize)?; - let mid_timestamp = mid_index.header().timestamp(); - - match mid_timestamp.cmp(&target_timestamp) { - std::cmp::Ordering::Equal => return Some(mid), - std::cmp::Ordering::Less => low = mid + 1, - std::cmp::Ordering::Greater => { - if mid == 0 { - break; - } - high = mid - 1; - } - } - } - - Some(low) - } - /// Get the message at the specified index. /// Returns None if the index is out of bounds. pub fn get(&self, index: usize) -> Option<IggyMessageView> { @@ -294,58 +184,6 @@ impl IggyMessagesBatch { result } - - /// Validates that all messages in batch have correct checksums. - pub fn validate_checksums(&self) -> Result<(), IggyError> { - for message in self.iter() { - let calculated_checksum = message.calculate_checksum(); - let actual_checksum = message.header().checksum(); - let offset = message.header().offset(); - if calculated_checksum != actual_checksum { - return Err(IggyError::InvalidMessageChecksum( - actual_checksum, - calculated_checksum, - offset, - )); - } - } - Ok(()) - } - - /// Validates that all messages have correct checksums and offsets. - /// This function should be called after messages have been read from disk. - /// - /// # Arguments - /// - /// * `absolute_start_offset` - The absolute offset of the first message in the batch. - /// - /// # Returns - /// - /// * `Ok(())` - If all messages have correct checksums and offsets. - /// * `Err(IggyError)` - If any message has an invalid checksum or offset. - pub fn validate_checksums_and_offsets( - &self, - absolute_start_offset: u64, - ) -> Result<(), IggyError> { - let mut current_offset = absolute_start_offset; - for message in self.iter() { - let calculated_checksum = message.calculate_checksum(); - let actual_checksum = message.header().checksum(); - let offset = message.header().offset(); - if offset != current_offset { - return Err(IggyError::InvalidOffset(offset)); - } - if calculated_checksum != actual_checksum { - return Err(IggyError::InvalidMessageChecksum( - actual_checksum, - calculated_checksum, - offset, - )); - } - current_offset += 1; - } - Ok(()) - } } impl IntoIterator for IggyMessagesBatch { @@ -375,6 +213,7 @@ impl Index<usize> for IggyMessagesBatch { &self.messages[start..end] } } + impl BytesSerializable for IggyMessagesBatch { fn to_bytes(&self) -> Bytes { panic!("should not be used"); diff --git a/server/Cargo.toml b/server/Cargo.toml index d4483c5e..4b1ec236 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -56,6 +56,7 @@ bytes = "1.10.1" chrono = "0.4.40" clap = { version = "4.5.34", features = ["derive"] } console-subscriber = { version = "0.4.1", optional = true } +crossbeam = "0.8.4" dashmap = "6.1.0" derive_more = "2.0.1" dotenvy = { version = "0.15.7" } @@ -72,6 +73,7 @@ lending-iterator = "0.1.7" mimalloc = { version = "0.1", optional = true } moka = { version = "0.12.10", features = ["future"] } nix = { version = "0.29", features = ["fs"] } +once_cell = "1.21.3" openssl = { version = "0.10.71", features = ["vendored"] } opentelemetry = { version = "0.29.0", features = ["trace", "logs"] } opentelemetry-appender-tracing = { version = "0.29.1", features = ["log"] } diff --git a/server/src/binary/command.rs b/server/src/binary/command.rs index 77e9c8a5..f138467f 100644 --- a/server/src/binary/command.rs +++ b/server/src/binary/command.rs @@ -69,8 +69,6 @@ use iggy::users::update_user::UpdateUser; use strum::EnumString; use tracing::error; -// todo add missing commands - define_server_command_enum! { Ping(Ping), PING_CODE, PING, false; GetStats(GetStats), GET_STATS_CODE, GET_STATS, false; diff --git a/server/src/binary/handlers/messages/send_messages_handler.rs b/server/src/binary/handlers/messages/send_messages_handler.rs index 1c1bc2b9..a043e736 100644 --- a/server/src/binary/handlers/messages/send_messages_handler.rs +++ b/server/src/binary/handlers/messages/send_messages_handler.rs @@ -21,8 +21,8 @@ use crate::binary::sender::SenderKind; use crate::streaming::segments::{IggyIndexesMut, IggyMessagesBatchMut}; use crate::streaming::session::Session; use crate::streaming::systems::system::SharedSystem; +use crate::streaming::utils::bytes_mut_pool::{BytesMutExt, BYTES_MUT_POOL}; use anyhow::Result; -use bytes::{Buf, BytesMut}; use iggy::error::IggyError; use iggy::identifier::Identifier; use iggy::models::messaging::INDEX_SIZE; @@ -49,40 +49,56 @@ impl ServerCommandHandler for SendMessages { session: &Session, system: &SharedSystem, ) -> Result<(), IggyError> { - let length = length - 4; - let mut buffer = BytesMut::with_capacity(length as usize); - unsafe { - buffer.set_len(length as usize); - } - sender.read(&mut buffer).await?; - - let mut element_size; - - let stream_id = Identifier::from_raw_bytes(buffer.chunk())?; - element_size = stream_id.get_size_bytes().as_bytes_usize(); - buffer.advance(element_size); - self.stream_id = stream_id; + let total_payload_size = length as usize - std::mem::size_of::<u32>(); + let metadata_len_field_size = std::mem::size_of::<u32>(); - let topic_id = Identifier::from_raw_bytes(buffer.chunk())?; - element_size = topic_id.get_size_bytes().as_bytes_usize(); - buffer.advance(element_size); - self.topic_id = topic_id; + let mut metadata_length_buffer = [0u8; 4]; + sender.read(&mut metadata_length_buffer).await?; + let metadata_size = u32::from_le_bytes(metadata_length_buffer); - let partitioning = Partitioning::from_raw_bytes(buffer.chunk())?; - element_size = partitioning.get_size_bytes().as_bytes_usize(); - buffer.advance(element_size); - self.partitioning = partitioning; + let mut metadata_buffer = BYTES_MUT_POOL.get_buffer(metadata_size as usize); + unsafe { metadata_buffer.set_len(metadata_size as usize) }; + sender.read(&mut metadata_buffer).await?; - let messages_count = buffer.get_u32_le() as usize; + let mut element_size = 0; - let mut indexes_and_messages = buffer.split(); + let stream_id = Identifier::from_raw_bytes(&metadata_buffer)?; + element_size += stream_id.get_size_bytes().as_bytes_usize(); + self.stream_id = stream_id; - let messages = indexes_and_messages.split_off(messages_count * INDEX_SIZE); - let indexes = indexes_and_messages; + let topic_id = Identifier::from_raw_bytes(&metadata_buffer[element_size..])?; + element_size += topic_id.get_size_bytes().as_bytes_usize(); + self.topic_id = topic_id; - let indexes = IggyIndexesMut::from_bytes(indexes); + let partitioning = Partitioning::from_raw_bytes(&metadata_buffer[element_size..])?; + element_size += partitioning.get_size_bytes().as_bytes_usize(); + self.partitioning = partitioning; - let batch = IggyMessagesBatchMut::from_indexes_and_messages(indexes, messages); + let messages_count = u32::from_le_bytes( + metadata_buffer[element_size..element_size + 4] + .try_into() + .unwrap(), + ); + let indexes_size = messages_count as usize * INDEX_SIZE; + + metadata_buffer.return_to_pool(); + + let mut indexes_buffer = BYTES_MUT_POOL.get_buffer(indexes_size); + unsafe { indexes_buffer.set_len(indexes_size) }; + sender.read(&mut indexes_buffer).await?; + + let messages_size = + total_payload_size - metadata_size as usize - indexes_size - metadata_len_field_size; + let mut messages_buffer = BYTES_MUT_POOL.get_buffer(messages_size); + unsafe { messages_buffer.set_len(messages_size) }; + sender.read(&mut messages_buffer).await?; + + let indexes = IggyIndexesMut::from_bytes(indexes_buffer, 0); + let batch = IggyMessagesBatchMut::from_indexes_and_messages( + messages_count, + indexes, + messages_buffer, + ); batch.validate()?; diff --git a/server/src/channels/commands/print_sysinfo.rs b/server/src/channels/commands/print_sysinfo.rs index f40bb4e0..c32c7229 100644 --- a/server/src/channels/commands/print_sysinfo.rs +++ b/server/src/channels/commands/print_sysinfo.rs @@ -17,8 +17,9 @@ */ use crate::{ - channels::server_command::BackgroundServerCommand, configs::server::ServerConfig, - streaming::systems::system::SharedSystem, + channels::server_command::BackgroundServerCommand, + configs::server::ServerConfig, + streaming::{systems::system::SharedSystem, utils::bytes_mut_pool::BYTES_MUT_POOL}, }; use flume::{Receiver, Sender}; use human_repr::HumanCount; @@ -89,6 +90,8 @@ impl BackgroundServerCommand<SysInfoPrintCommand> for SysInfoPrintExecutor { stats.read_bytes, stats.written_bytes, stats.run_time); + + BYTES_MUT_POOL.log_stats(); } fn start_command_sender( diff --git a/server/src/http/jwt/storage.rs b/server/src/http/jwt/storage.rs index 05389a9c..30884c09 100644 --- a/server/src/http/jwt/storage.rs +++ b/server/src/http/jwt/storage.rs @@ -17,13 +17,14 @@ */ use crate::http::jwt::COMPONENT; +use crate::streaming::utils::bytes_mut_pool::{BytesMutExt, BYTES_MUT_POOL}; use crate::streaming::utils::file; use crate::{ http::jwt::json_web_token::RevokedAccessToken, streaming::persistence::persister::PersisterKind, }; use ahash::AHashMap; use anyhow::Context; -use bytes::{BufMut, BytesMut}; +use bytes::BufMut; use error_set::ErrContext; use iggy::error::IggyError; use std::sync::Arc; @@ -69,7 +70,7 @@ impl TokenStorage { }) .map_err(|_| IggyError::CannotReadFileMetadata)? .len() as usize; - let mut buffer = BytesMut::with_capacity(file_size); + let mut buffer = BYTES_MUT_POOL.get_buffer(file_size); buffer.put_bytes(0, file_size); file.read_exact(&mut buffer) .await @@ -87,6 +88,8 @@ impl TokenStorage { .map_err(|_| IggyError::CannotDeserializeResource)? .0; + buffer.return_to_pool(); + let tokens = tokens .into_iter() .map(|(id, expiry)| RevokedAccessToken { id, expiry }) diff --git a/server/src/http/messages.rs b/server/src/http/messages.rs index 17dc5ad0..1decefdb 100644 --- a/server/src/http/messages.rs +++ b/server/src/http/messages.rs @@ -143,6 +143,8 @@ fn make_mutable(batch: IggyMessagesBatch) -> IggyMessagesBatchMut { let (_, indexes, messages) = batch.decompose(); let (_, indexes_buffer) = indexes.decompose(); let indexes_buffer_mut = BytesMut::from(indexes_buffer); - let indexes_mut = IggyIndexesMut::from_bytes(indexes_buffer_mut); - IggyMessagesBatchMut::from_indexes_and_messages(indexes_mut, messages.into()) + let indexes_mut = IggyIndexesMut::from_bytes(indexes_buffer_mut, 0); + let count = indexes_mut.count(); + let messages_mut = messages.into(); + IggyMessagesBatchMut::from_indexes_and_messages(count, indexes_mut, messages_mut) } diff --git a/server/src/main.rs b/server/src/main.rs index 7eac061e..e9a81962 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -38,6 +38,7 @@ use server::log::tokio_console::Logging; use server::quic::quic_server; use server::server_error::ServerError; use server::streaming::systems::system::{SharedSystem, System}; +use server::streaming::utils::bytes_mut_pool::BytesMutPool; use server::tcp::tcp_server; use tokio::time::Instant; use tracing::{info, instrument}; @@ -94,6 +95,8 @@ async fn main() -> Result<(), ServerError> { #[cfg(not(feature = "disable-mimalloc"))] info!("Using mimalloc allocator"); + BytesMutPool::init_pool(); + let system = SharedSystem::new(System::new( config.system.clone(), config.data_maintenance.clone(), diff --git a/server/src/streaming/segments/indexes/index_reader.rs b/server/src/streaming/segments/indexes/index_reader.rs index 55fc0ac5..cf0d0438 100644 --- a/server/src/streaming/segments/indexes/index_reader.rs +++ b/server/src/streaming/segments/indexes/index_reader.rs @@ -17,9 +17,10 @@ */ use super::IggyIndexesMut; +use crate::streaming::utils::bytes_mut_pool::{BytesMutExt, BYTES_MUT_POOL}; use bytes::BytesMut; use error_set::ErrContext; -use iggy::models::messaging::{IggyIndex, IggyIndexes, INDEX_SIZE}; +use iggy::models::messaging::{IggyIndex, INDEX_SIZE}; use iggy::{error::IggyError, models::messaging::IggyIndexView}; use std::{ fs::File as StdFile, @@ -102,7 +103,7 @@ impl IndexReader { } }; let index_count = file_size / INDEX_SIZE as u32; - let indexes = IggyIndexesMut::from_bytes(buf); + let indexes = IggyIndexesMut::from_bytes(buf, index_count); if indexes.count() != index_count { error!( "Loaded {} indexes from disk, expected {}, file {} is probably corrupted!", @@ -123,7 +124,7 @@ impl IndexReader { &self, relative_start_offset: u32, count: u32, - ) -> Result<Option<IggyIndexes>, IggyError> { + ) -> Result<Option<IggyIndexesMut>, IggyError> { let file_size = self.file_size(); let total_indexes = file_size / INDEX_SIZE as u32; @@ -198,8 +199,8 @@ impl IndexReader { base_position ); - Ok(Some(IggyIndexes::new( - indexes_bytes.freeze(), + Ok(Some(IggyIndexesMut::from_bytes( + indexes_bytes, base_position, ))) } @@ -212,7 +213,7 @@ impl IndexReader { &self, timestamp: u64, count: u32, - ) -> Result<Option<IggyIndexes>, IggyError> { + ) -> Result<Option<IggyIndexesMut>, IggyError> { let file_size = self.file_size(); let total_indexes = file_size / INDEX_SIZE as u32; @@ -281,8 +282,8 @@ impl IndexReader { base_position ); - Ok(Some(IggyIndexes::new( - indexes_bytes.freeze(), + Ok(Some(IggyIndexesMut::from_bytes( + indexes_bytes, base_position, ))) } @@ -354,7 +355,7 @@ impl IndexReader { async fn read_at(&self, offset: u32, len: u32) -> Result<BytesMut, std::io::Error> { let file = self.file.clone(); spawn_blocking(move || { - let mut buf = BytesMut::with_capacity(len as usize); + let mut buf = BYTES_MUT_POOL.get_buffer(len as usize); unsafe { buf.set_len(len as usize) }; file.read_exact_at(&mut buf, offset as u64)?; Ok(buf) @@ -395,6 +396,9 @@ impl IndexReader { } }; - Ok(Some(IggyIndexView::new(&buf).to_index())) + let index = IggyIndexView::new(&buf).to_index(); + buf.return_to_pool(); + + Ok(Some(index)) } } diff --git a/server/src/streaming/segments/indexes/indexes_mut.rs b/server/src/streaming/segments/indexes/indexes_mut.rs index 0c16cb0f..da89a334 100644 --- a/server/src/streaming/segments/indexes/indexes_mut.rs +++ b/server/src/streaming/segments/indexes/indexes_mut.rs @@ -16,11 +16,13 @@ * under the License. */ -use bytes::{BufMut, Bytes, BytesMut}; -use iggy::models::messaging::{IggyIndexView, IggyIndexes, INDEX_SIZE}; +use bytes::{BufMut, BytesMut}; +use iggy::models::messaging::{IggyIndexView, INDEX_SIZE}; use std::fmt; use std::ops::{Deref, Index as StdIndex}; +use crate::streaming::utils::bytes_mut_pool::{BytesMutExt, BYTES_MUT_POOL}; + /// A container for binary-encoded index data. /// Optimized for efficient storage and I/O operations. #[derive(Default, Clone)] @@ -41,17 +43,24 @@ impl IggyIndexesMut { } /// Creates indexes from bytes - pub fn from_bytes(indexes: BytesMut) -> Self { + pub fn from_bytes(indexes: BytesMut, base_position: u32) -> Self { Self { buffer: indexes, saved_count: 0, - base_position: 0, + base_position, } } + /// Decompose the container into its components + pub fn decompose(mut self) -> (u32, BytesMut) { + let base_position = self.base_position; + let buffer = std::mem::replace(&mut self.buffer, BytesMut::new()); + (base_position, buffer) + } + /// Gets the size of all indexes messages pub fn messages_size(&self) -> u32 { - self.last_position() + self.last_position() - self.base_position } /// Gets the base position of the indexes @@ -74,15 +83,15 @@ impl IggyIndexesMut { /// Creates a new container with the specified capacity pub fn with_capacity(capacity: usize, base_position: u32) -> Self { Self { - buffer: BytesMut::with_capacity(capacity * INDEX_SIZE), + buffer: BYTES_MUT_POOL.get_buffer(capacity * INDEX_SIZE), saved_count: 0, base_position, } } - /// Makes the indexes immutable - pub fn make_immutable(self) -> IggyIndexes { - IggyIndexes::new(self.buffer.freeze(), self.base_position) + /// Gets the capacity of the buffer + pub fn capacity(&self) -> usize { + self.buffer.capacity() } /// Inserts a new index at the end of buffer @@ -93,8 +102,8 @@ impl IggyIndexesMut { } /// Appends another slice of indexes to this one. - pub fn concatenate(&mut self, other: Bytes) { - self.buffer.put(other); + pub fn append_slice(&mut self, other: &[u8]) { + self.buffer.put_slice(other); } /// Gets the number of indexes in the container @@ -225,7 +234,11 @@ impl IggyIndexesMut { } /// Slices the container to return a view of a specific range of indexes - pub fn slice_by_offset(&self, relative_start_offset: u32, count: u32) -> Option<IggyIndexes> { + pub fn slice_by_offset( + &self, + relative_start_offset: u32, + count: u32, + ) -> Option<IggyIndexesMut> { let available_count = self.count().saturating_sub(relative_start_offset); let actual_count = std::cmp::min(count, available_count); @@ -240,15 +253,15 @@ impl IggyIndexesMut { let slice = BytesMut::from(&self.buffer[start_byte..end_byte]); if relative_start_offset == 0 { - Some(IggyIndexes::new(slice.freeze(), self.base_position)) + Some(IggyIndexesMut::from_bytes(slice, self.base_position)) } else { let position_offset = self.get(relative_start_offset - 1).unwrap().position(); - Some(IggyIndexes::new(slice.freeze(), position_offset)) + Some(IggyIndexesMut::from_bytes(slice, position_offset)) } } /// Loads indexes from cache based on timestamp - pub fn slice_by_timestamp(&self, timestamp: u64, count: u32) -> Option<IggyIndexes> { + pub fn slice_by_timestamp(&self, timestamp: u64, count: u32) -> Option<IggyIndexesMut> { if self.count() == 0 { return None; } @@ -274,7 +287,7 @@ impl IggyIndexesMut { 0 }; - Some(IggyIndexes::new(slice.freeze(), base_position)) + Some(IggyIndexesMut::from_bytes(slice, base_position)) } /// Find the position of the index with timestamp closest to (but not exceeding) the target @@ -317,6 +330,13 @@ impl IggyIndexesMut { } } +impl Drop for IggyIndexesMut { + fn drop(&mut self) { + let indexes = std::mem::replace(&mut self.buffer, BytesMut::new()); + indexes.return_to_pool(); + } +} + impl StdIndex<usize> for IggyIndexesMut { type Output = [u8]; diff --git a/server/src/streaming/segments/messages/messages_reader.rs b/server/src/streaming/segments/messages/messages_reader.rs index fbff0fb0..c1d0f99b 100644 --- a/server/src/streaming/segments/messages/messages_reader.rs +++ b/server/src/streaming/segments/messages/messages_reader.rs @@ -16,12 +16,11 @@ * under the License. */ -use bytes::{Bytes, BytesMut}; +use crate::streaming::segments::{IggyIndexesMut, IggyMessagesBatchMut}; +use crate::streaming::utils::bytes_mut_pool::BYTES_MUT_POOL; +use bytes::BytesMut; use error_set::ErrContext; -use iggy::{ - error::IggyError, - models::messaging::{IggyIndexes, IggyMessagesBatch}, -}; +use iggy::error::IggyError; use std::{fs::File as StdFile, os::unix::prelude::FileExt}; use std::{ io::ErrorKind, @@ -90,12 +89,11 @@ impl MessagesReader { /// Loads and returns all message IDs from the messages file. pub async fn load_all_message_ids_from_disk( &self, - indexes: IggyIndexes, + indexes: IggyIndexesMut, messages_count: u32, ) -> Result<Vec<u128>, IggyError> { let file_size = self.file_size(); if file_size == 0 { - trace!("Messages file {} is empty.", self.file_path); return Ok(vec![]); } @@ -113,7 +111,11 @@ impl MessagesReader { } }; - let messages = IggyMessagesBatch::new(indexes, messages_bytes, messages_count); + let messages = IggyMessagesBatchMut::from_indexes_and_messages( + messages_count, + indexes, + messages_bytes, + ); let mut ids = Vec::with_capacity(messages_count as usize); for message in messages.iter() { @@ -126,12 +128,11 @@ impl MessagesReader { /// Loads and returns a batch of messages from the messages file. pub async fn load_messages_from_disk( &self, - indexes: IggyIndexes, - ) -> Result<IggyMessagesBatch, IggyError> { + indexes: IggyIndexesMut, + ) -> Result<IggyMessagesBatchMut, IggyError> { let file_size = self.file_size(); if file_size == 0 { - trace!("Messages file {} is empty.", self.file_path); - return Ok(IggyMessagesBatch::empty()); + return Ok(IggyMessagesBatchMut::empty()); } let start_pos = indexes.base_position(); @@ -139,13 +140,13 @@ impl MessagesReader { let messages_count = indexes.count(); if start_pos + count_bytes > file_size { - return Ok(IggyMessagesBatch::empty()); + return Ok(IggyMessagesBatchMut::empty()); } let messages_bytes = match self.read_at(start_pos as u64, count_bytes).await { Ok(buf) => buf, Err(error) if error.kind() == ErrorKind::UnexpectedEof => { - return Ok(IggyMessagesBatch::empty()); + return Ok(IggyMessagesBatchMut::empty()); } Err(error) => { error!( @@ -156,10 +157,10 @@ impl MessagesReader { } }; - Ok(IggyMessagesBatch::new( + Ok(IggyMessagesBatchMut::from_indexes_and_messages( + messages_count, indexes, messages_bytes, - messages_count, )) } @@ -169,13 +170,13 @@ impl MessagesReader { } /// Reads `len` bytes from the messages file at the specified `offset`. - async fn read_at(&self, offset: u64, len: u32) -> Result<Bytes, std::io::Error> { + async fn read_at(&self, offset: u64, len: u32) -> Result<BytesMut, std::io::Error> { let file = self.file.clone(); spawn_blocking(move || { - let mut buf = BytesMut::with_capacity(len as usize); + let mut buf = BYTES_MUT_POOL.get_buffer(len as usize); unsafe { buf.set_len(len as usize) }; file.read_exact_at(&mut buf, offset)?; - Ok(buf.freeze()) + Ok(buf) }) .await? } diff --git a/server/src/streaming/segments/messages_accumulator.rs b/server/src/streaming/segments/messages_accumulator.rs index a8a07663..fe259c27 100644 --- a/server/src/streaming/segments/messages_accumulator.rs +++ b/server/src/streaming/segments/messages_accumulator.rs @@ -66,11 +66,10 @@ impl MessagesAccumulator { segment_start_offset: u64, segment_current_offset: u64, segment_current_position: u32, - batch: IggyMessagesBatchMut, + mut batch: IggyMessagesBatchMut, deduplicator: Option<&MessageDeduplicator>, ) { let batch_messages_count = batch.count(); - if batch_messages_count == 0 { return; } @@ -85,7 +84,7 @@ impl MessagesAccumulator { self.initialize_or_update_offsets(segment_current_offset, segment_current_position); - let prepared_batch = batch + batch .prepare_for_persistence( segment_start_offset, self.current_offset, @@ -94,9 +93,9 @@ impl MessagesAccumulator { ) .await; - let batch_size = prepared_batch.size(); + let batch_size = batch.size(); - self.batches.add_batch(prepared_batch); + self.batches.add_batch(batch); self.messages_count += batch_messages_count; self.current_offset = self.base_offset + self.messages_count as u64 - 1; @@ -151,7 +150,7 @@ impl MessagesAccumulator { /// Checks if the accumulator is empty (has no messages). pub fn is_empty(&self) -> bool { - self.batches.is_empty() || self.messages_count == 0 + self.messages_count == 0 } /// Returns the number of messages in the accumulator that have not been persisted. diff --git a/server/src/streaming/segments/reading_messages.rs b/server/src/streaming/segments/reading_messages.rs index 34fbc44d..7ac79bf1 100644 --- a/server/src/streaming/segments/reading_messages.rs +++ b/server/src/streaming/segments/reading_messages.rs @@ -16,10 +16,9 @@ * under the License. */ -use super::IggyMessagesBatchSet; +use super::{IggyIndexesMut, IggyMessagesBatchMut, IggyMessagesBatchSet}; use crate::streaming::segments::segment::Segment; use error_set::ErrContext; -use iggy::models::messaging::{IggyIndexes, IggyMessagesBatch}; use iggy::prelude::*; use std::sync::atomic::Ordering; use tracing::trace; @@ -276,7 +275,7 @@ impl Segment { &self, relative_start_offset: u32, count: u32, - ) -> Result<Option<IggyIndexes>, IggyError> { + ) -> Result<Option<IggyIndexesMut>, IggyError> { let indexes = if !self.indexes.is_empty() { self.indexes.slice_by_offset(relative_start_offset, count) } else { @@ -293,7 +292,7 @@ impl Segment { &self, timestamp: u64, count: u32, - ) -> Result<Option<IggyIndexes>, IggyError> { + ) -> Result<Option<IggyIndexesMut>, IggyError> { let indexes = if !self.indexes.is_empty() { self.indexes.slice_by_timestamp(timestamp, count) } else { @@ -359,7 +358,7 @@ impl Segment { &self, timestamp: u64, count: u32, - ) -> Result<IggyMessagesBatch, IggyError> { + ) -> Result<IggyMessagesBatchMut, IggyError> { tracing::trace!( "Loading {count} messages from disk, timestamp: {timestamp}, current_timestamp: {}...", self.end_timestamp @@ -368,7 +367,7 @@ impl Segment { let indexes_to_read = self.load_indexes_by_timestamp(timestamp, count).await?; if indexes_to_read.is_none() { - return Ok(IggyMessagesBatch::empty()); + return Ok(IggyMessagesBatchMut::empty()); } let indexes_to_read = indexes_to_read.unwrap(); diff --git a/server/src/streaming/segments/segment.rs b/server/src/streaming/segments/segment.rs index 026834ae..fcb5bb90 100644 --- a/server/src/streaming/segments/segment.rs +++ b/server/src/streaming/segments/segment.rs @@ -100,7 +100,7 @@ impl Segment { last_index_position: 0, max_size_bytes: config.segment.size, message_expiry, - indexes: IggyIndexesMut::with_capacity(1_000_000, 0), + indexes: IggyIndexesMut::with_capacity(1024 * 1024, 0), accumulator: MessagesAccumulator::default(), is_closed: false, messages_writer: None, diff --git a/server/src/streaming/segments/types/messages_batch_mut.rs b/server/src/streaming/segments/types/messages_batch_mut.rs index 60af072b..87c87c88 100644 --- a/server/src/streaming/segments/types/messages_batch_mut.rs +++ b/server/src/streaming/segments/types/messages_batch_mut.rs @@ -19,14 +19,15 @@ use super::message_view_mut::IggyMessageViewMutIterator; use crate::streaming::deduplication::message_deduplicator::MessageDeduplicator; use crate::streaming::segments::indexes::IggyIndexesMut; +use crate::streaming::utils::bytes_mut_pool::{BytesMutExt, BYTES_MUT_POOL}; use crate::streaming::utils::random_id; use bytes::{BufMut, BytesMut}; use iggy::messages::MAX_PAYLOAD_SIZE; -use iggy::models::messaging::{IggyIndexView, IggyMessagesBatch, INDEX_SIZE}; +use iggy::models::messaging::{IggyIndexView, INDEX_SIZE}; use iggy::prelude::*; use iggy::utils::timestamp::IggyTimestamp; use lending_iterator::prelude::*; -use std::ops::Deref; +use std::ops::{Deref, Index}; use tracing::{error, warn}; /// A container for mutable messages that are being prepared for persistence. @@ -35,6 +36,9 @@ use tracing::{error, warn}; /// and the corresponding index data that allows for efficient message lookup. #[derive(Debug, Default)] pub struct IggyMessagesBatchMut { + /// The number of messages in the batch + count: u32, + /// The index data for all messages in the buffer indexes: IggyIndexesMut, @@ -49,14 +53,32 @@ impl Sizeable for IggyMessagesBatchMut { } impl IggyMessagesBatchMut { + /// Creates a new empty messages container + pub fn empty() -> Self { + Self { + count: 0, + indexes: IggyIndexesMut::empty(), + messages: BytesMut::new(), + } + } + /// Creates a new messages container from existing index and message buffers. /// /// # Arguments /// /// * `indexes` - Preprocessed index data /// * `messages` - Serialized message data - pub fn from_indexes_and_messages(indexes: IggyIndexesMut, messages: BytesMut) -> Self { - Self { indexes, messages } + /// * `count` - Number of messages in the batch + pub fn from_indexes_and_messages( + count: u32, + indexes: IggyIndexesMut, + messages: BytesMut, + ) -> Self { + Self { + count, + indexes, + messages, + } } /// Creates a new messages container from a slice of IggyMessage objects. @@ -70,7 +92,7 @@ impl IggyMessagesBatchMut { /// * `messages` - Slice of message objects to store /// * `messages_size` - Total size of all messages in bytes pub fn from_messages(messages: &[IggyMessage], messages_size: u32) -> Self { - let mut messages_buffer = BytesMut::with_capacity(messages_size as usize); + let mut messages_buffer = BYTES_MUT_POOL.get_buffer(messages_size as usize); let mut indexes_buffer = IggyIndexesMut::with_capacity(messages.len(), 0); let mut position = 0; @@ -81,7 +103,7 @@ impl IggyMessagesBatchMut { indexes_buffer.insert(0, position, 0); } - Self::from_indexes_and_messages(indexes_buffer, messages_buffer) + Self::from_indexes_and_messages(messages.len() as u32, indexes_buffer, messages_buffer) } /// Creates a lending iterator that yields mutable views of messages. @@ -117,15 +139,15 @@ impl IggyMessagesBatchMut { /// /// An immutable `IggyMessagesBatch` ready for persistence pub async fn prepare_for_persistence( - self, + &mut self, start_offset: u64, base_offset: u64, current_position: u32, deduplicator: Option<&MessageDeduplicator>, - ) -> IggyMessagesBatch { + ) { let messages_count = self.count(); if messages_count == 0 { - return IggyMessagesBatch::empty(); + return; } let mut curr_abs_offset = base_offset; @@ -138,9 +160,10 @@ impl IggyMessagesBatchMut { let mut invalid_messages_indexes = deduplicator.map(|_| Vec::with_capacity(messages_count as usize)); - let (mut indexes, mut messages) = self.decompose(); - indexes.set_base_position(current_position); - let mut iter = IggyMessageViewMutIterator::new(&mut messages); + // let (mut indexes, mut messages) = self.decompose(); + self.indexes.set_base_position(current_position); + let mut iter: IggyMessageViewMutIterator<'_> = + IggyMessageViewMutIterator::new(&mut self.messages); let timestamp = IggyTimestamp::now().as_micros(); while let Some(mut message) = iter.next() { @@ -169,9 +192,9 @@ impl IggyMessagesBatchMut { curr_position += message_size; let relative_offset = (curr_abs_offset - start_offset) as u32; - indexes.set_offset_at(curr_rel_offset, relative_offset); - indexes.set_position_at(curr_rel_offset, curr_position); - indexes.set_timestamp_at(curr_rel_offset, timestamp); + self.indexes.set_offset_at(curr_rel_offset, relative_offset); + self.indexes.set_position_at(curr_rel_offset, curr_position); + self.indexes.set_timestamp_at(curr_rel_offset, timestamp); curr_abs_offset += 1; curr_rel_offset += 1; @@ -179,48 +202,53 @@ impl IggyMessagesBatchMut { if let Some(invalid_messages_indexes) = invalid_messages_indexes { if invalid_messages_indexes.is_empty() { - return IggyMessagesBatch::new( - indexes.make_immutable(), - messages.freeze(), - messages_count, - ); + return; } - let batch = IggyMessagesBatchMut::from_indexes_and_messages(indexes, messages) - .remove_messages(&invalid_messages_indexes, current_position); - - let messages_count = batch.count(); - - let (indexes, messages) = batch.decompose(); - - return IggyMessagesBatch::new( - indexes.make_immutable(), - messages.freeze(), - messages_count, - ); + self.remove_messages(&invalid_messages_indexes, current_position); } + } - IggyMessagesBatch::new(indexes.make_immutable(), messages.freeze(), messages_count) + /// Returns the first offset in the batch + pub fn first_offset(&self) -> Option<u64> { + if self.is_empty() { + return None; + } + Some(IggyMessageView::new(&self.messages).header().offset()) } /// Returns the first timestamp in the batch - pub fn first_timestamp(&self) -> u64 { - IggyMessageView::new(&self.messages).header().timestamp() + pub fn first_timestamp(&self) -> Option<u64> { + if self.is_empty() { + return None; + } + Some(IggyMessageView::new(&self.messages).header().timestamp()) } /// Returns the last timestamp in the batch - pub fn last_timestamp(&self) -> u64 { + pub fn last_timestamp(&self) -> Option<u64> { if self.is_empty() { - return 0; + return None; } let last_index = self.count() as usize - 1; - self.get_message_boundaries(last_index) - .map(|(start, _)| { - IggyMessageView::new(&self.messages[start..]) - .header() - .timestamp() - }) - .unwrap_or(0) + self.get_message_boundaries(last_index).map(|(start, _)| { + IggyMessageView::new(&self.messages[start..]) + .header() + .timestamp() + }) + } + + /// Returns the last offset in the batch + pub fn last_offset(&self) -> Option<u64> { + if self.is_empty() { + return None; + } + let last_index = self.count() as usize - 1; + self.get_message_boundaries(last_index).map(|(start, _)| { + IggyMessageView::new(&self.messages[start..]) + .header() + .offset() + }) } /// Checks if the batch is empty. @@ -229,8 +257,21 @@ impl IggyMessagesBatchMut { } /// Decomposes the batch into its constituent parts. - pub fn decompose(self) -> (IggyIndexesMut, BytesMut) { - (self.indexes, self.messages) + pub fn decompose(mut self) -> (IggyIndexesMut, BytesMut) { + let indexes = std::mem::replace(&mut self.indexes, IggyIndexesMut::empty()); + let messages = std::mem::replace(&mut self.messages, BytesMut::new()); + + (indexes, messages) + } + + /// Take the indexes from the batch + pub fn take_indexes(&mut self) -> IggyIndexesMut { + std::mem::take(&mut self.indexes) + } + + /// Borrows the indexes from the batch + pub fn indexes(&self) -> &IggyIndexesMut { + &self.indexes } /// Get message position from the indexes at the given index @@ -266,6 +307,152 @@ impl IggyMessagesBatchMut { } } + /// Returns a contiguous slice (as a new `IggyMessagesBatch`) of up to `count` messages + /// whose message headers have an offset greater than or equal to the provided `start_offset`. + pub fn slice_by_offset(&self, start_offset: u64, count: u32) -> Option<Self> { + if self.is_empty() || count == 0 { + return None; + } + + let first_offset = self.first_offset()?; + + if start_offset < first_offset { + return self.slice_by_index(0, count); + } + + let last_offset = self.last_offset()?; + if start_offset > last_offset { + return None; + } + + let offset_diff = start_offset - first_offset; + let first_message_index = offset_diff as usize; + + if first_message_index >= self.count() as usize { + return None; + } + + self.slice_by_index(first_message_index as u32, count) + } + + /// Helper method to slice the batch starting from a specific index + fn slice_by_index(&self, start_index: u32, count: u32) -> Option<Self> { + if start_index >= self.count() { + return None; + } + + let last_message_index = + std::cmp::min((start_index + count) as usize, self.count() as usize); + + let sub_indexes = self.indexes.slice_by_offset( + start_index, + (last_message_index - start_index as usize) as u32, + )?; + + let first_message_position = self.message_start_position(start_index as usize)?; + let last_message_position = self.message_end_position(last_message_index - 1)?; + + // TODO(hubcio): messages from accumulator unfortunately are deep-copied + let mut sub_buffer = + BYTES_MUT_POOL.get_buffer(last_message_position - first_message_position); + sub_buffer.put_slice(&self.messages[first_message_position..last_message_position]); + + Some(IggyMessagesBatchMut { + count: sub_indexes.count(), + indexes: sub_indexes, + messages: sub_buffer, + }) + } + + /// Returns a contiguous slice (as a new `IggyMessagesBatch`) of up to `count` messages + /// whose message headers have a timestamp greater than or equal to the provided `timestamp`. + /// + /// If no messages meet the criteria, returns `None`. + pub fn slice_by_timestamp(&self, timestamp: u64, count: u32) -> Option<Self> { + if self.is_empty() || count == 0 { + return None; + } + + // Use binary search to find the first message with timestamp >= the target + let first_message_index = self.binary_search_timestamp(timestamp)?; + + self.slice_by_index(first_message_index, count) + } + + /// Find the position of the index with timestamp closest to (but not exceeding) the target + fn binary_search_timestamp(&self, target_timestamp: u64) -> Option<u32> { + if self.count() == 0 { + return None; + } + + let last_timestamp = self.get(self.count() as usize - 1)?.header().timestamp(); + if target_timestamp > last_timestamp { + return Some(self.count() - 1); + } + + let first_timestamp = self.get(0)?.header().timestamp(); + if target_timestamp <= first_timestamp { + return Some(0); + } + + let mut low = 0; + let mut high = self.count() - 1; + + while low <= high { + let mid = low + (high - low) / 2; + let mid_index = self.get(mid as usize)?; + let mid_timestamp = mid_index.header().timestamp(); + + match mid_timestamp.cmp(&target_timestamp) { + std::cmp::Ordering::Equal => return Some(mid), + std::cmp::Ordering::Less => low = mid + 1, + std::cmp::Ordering::Greater => { + if mid == 0 { + break; + } + high = mid - 1; + } + } + } + + Some(low) + } + + /// Validates that all messages have correct checksums and offsets. + /// This function should be called after messages have been read from disk. + /// + /// # Arguments + /// + /// * `absolute_start_offset` - The absolute offset of the first message in the batch. + /// + /// # Returns + /// + /// * `Ok(())` - If all messages have correct checksums and offsets. + /// * `Err(IggyError)` - If any message has an invalid checksum or offset. + pub fn validate_checksums_and_offsets( + &self, + absolute_start_offset: u64, + ) -> Result<(), IggyError> { + let mut current_offset = absolute_start_offset; + for message in self.iter() { + let calculated_checksum = message.calculate_checksum(); + let actual_checksum = message.header().checksum(); + let offset = message.header().offset(); + if offset != current_offset { + return Err(IggyError::InvalidOffset(offset)); + } + if calculated_checksum != actual_checksum { + return Err(IggyError::InvalidMessageChecksum( + actual_checksum, + calculated_checksum, + offset, + )); + } + current_offset += 1; + } + Ok(()) + } + /// Gets the byte range for a message at the given index fn get_message_boundaries(&self, index: usize) -> Option<(usize, usize)> { let start = self.message_start_position(index)?; @@ -325,7 +512,7 @@ impl IggyMessagesBatchMut { /// # Returns /// /// A new `IggyMessagesBatchMut` with the specified messages removed - pub fn remove_messages(self, indexes_to_remove: &[u32], current_position: u32) -> Self { + pub fn remove_messages(&mut self, indexes_to_remove: &[u32], current_position: u32) { /* A temporary list of message boundaries is first collected for each index that should be removed. Chunks of data that are not removed are appended @@ -336,14 +523,13 @@ impl IggyMessagesBatchMut { This allows for avoiding copying unnecessary data and ensures that indexes match the newly constructed buffer. */ - if indexes_to_remove.is_empty() || self.is_empty() { - return self; + return; } let msg_count = self.count() as usize; if indexes_to_remove.len() >= msg_count { - return IggyMessagesBatchMut::default(); + return; } let current_size = self.size(); @@ -369,11 +555,11 @@ impl IggyMessagesBatchMut { let new_size = current_size - size_to_remove; let new_message_count = msg_count as u32 - indexes_to_remove.len() as u32; - let mut new_buffer = BytesMut::with_capacity(new_size as usize); + let mut new_buffer = BYTES_MUT_POOL.get_buffer(new_size as usize); let mut new_indexes = IggyIndexesMut::with_capacity(new_message_count as usize, current_position); - let mut source = self.messages; + let mut source = std::mem::take(&mut self.messages); let mut last_pos = 0_usize; let mut new_pos = current_position; @@ -413,8 +599,23 @@ impl IggyMessagesBatchMut { chunk_len, ); } + } - IggyMessagesBatchMut::from_indexes_and_messages(new_indexes, new_buffer) + /// Validates that all messages in batch have correct checksums. + pub fn validate_checksums(&self) -> Result<(), IggyError> { + for message in self.iter() { + let calculated_checksum = message.calculate_checksum(); + let actual_checksum = message.header().checksum(); + let offset = message.header().offset(); + if calculated_checksum != actual_checksum { + return Err(IggyError::InvalidMessageChecksum( + actual_checksum, + calculated_checksum, + offset, + )); + } + } + Ok(()) } /// Validates the structure of the indexes (sizes, counts, etc.) @@ -590,6 +791,35 @@ impl Validatable<IggyError> for IggyMessagesBatchMut { } } +impl Index<usize> for IggyMessagesBatchMut { + type Output = [u8]; + + fn index(&self, index: usize) -> &Self::Output { + if index >= self.count as usize { + panic!( + "Index out of bounds: the len is {} but the index is {}", + self.count, index + ); + } + + let (start, end) = self + .get_message_boundaries(index) + .expect("Invalid message boundaries"); + + &self.messages[start..end] + } +} + +impl Drop for IggyMessagesBatchMut { + fn drop(&mut self) { + let indexes = std::mem::replace(&mut self.indexes, IggyIndexesMut::empty()); + let messages = std::mem::replace(&mut self.messages, BytesMut::new()); + + messages.return_to_pool(); + drop(indexes); + } +} + impl Deref for IggyMessagesBatchMut { type Target = BytesMut; diff --git a/server/src/streaming/segments/types/messages_batch_set.rs b/server/src/streaming/segments/types/messages_batch_set.rs index 6f32b383..32ecac29 100644 --- a/server/src/streaming/segments/types/messages_batch_set.rs +++ b/server/src/streaming/segments/types/messages_batch_set.rs @@ -18,18 +18,20 @@ use crate::binary::handlers::messages::poll_messages_handler::IggyPollMetadata; use crate::streaming::segments::IggyIndexesMut; +use crate::streaming::utils::bytes_mut_pool::BytesMutExt; use bytes::Bytes; use iggy::models::messaging::IggyMessageView; -use iggy::models::messaging::IggyMessagesBatch; use iggy::prelude::*; use std::ops::Index; use tracing::trace; +use super::IggyMessagesBatchMut; + /// A container for multiple IggyMessagesBatch objects #[derive(Debug, Default)] pub struct IggyMessagesBatchSet { /// The collection of message containers - batches: Vec<IggyMessagesBatch>, + batches: Vec<IggyMessagesBatchMut>, /// Total number of messages across all containers count: u32, /// Total size in bytes across all containers @@ -56,7 +58,7 @@ impl IggyMessagesBatchSet { } /// Create a batch set from an existing vector of IggyMessages - pub fn from_vec(messages: Vec<IggyMessagesBatch>) -> Self { + pub fn from_vec(messages: Vec<IggyMessagesBatchMut>) -> Self { let mut batch = Self::with_capacity(messages.len()); for msg in messages { batch.add_batch(msg); @@ -65,25 +67,25 @@ impl IggyMessagesBatchSet { } /// Add a message container to the batch - pub fn add_batch(&mut self, messages: IggyMessagesBatch) { + pub fn add_batch(&mut self, messages: IggyMessagesBatchMut) { self.count += messages.count(); self.size += messages.size(); self.batches.push(messages); } /// Add another batch of messages to the batch - pub fn add_batch_set(&mut self, other: IggyMessagesBatchSet) { + pub fn add_batch_set(&mut self, mut other: IggyMessagesBatchSet) { self.count += other.count(); self.size += other.size(); - self.batches.extend(other.batches); + let other_batches = std::mem::take(&mut other.batches); + self.batches.extend(other_batches); } /// Extract indexes from all batches in the set - pub fn extract_indexes_to(&mut self, target: &mut IggyIndexesMut) { - for batch in self.iter_mut() { - let indexes = batch.take_indexes(); - let (_, indexes_buffer) = indexes.decompose(); - target.concatenate(indexes_buffer); + pub fn append_indexes_to(&self, target: &mut IggyIndexesMut) { + for batch in self.iter() { + let indexes = batch.indexes(); + target.append_slice(indexes); } } @@ -109,16 +111,25 @@ impl IggyMessagesBatchSet { /// Get timestamp of first message in first batch pub fn first_timestamp(&self) -> Option<u64> { + if self.is_empty() { + return None; + } self.batches.first().map(|batch| batch.first_timestamp())? } /// Get offset of first message in first batch pub fn first_offset(&self) -> Option<u64> { + if self.is_empty() { + return None; + } self.batches.first().map(|batch| batch.first_offset())? } /// Get timestamp of last message in last batch pub fn last_timestamp(&self) -> Option<u64> { + if self.is_empty() { + return None; + } self.batches.last().map(|batch| batch.last_timestamp())? } @@ -128,22 +139,22 @@ impl IggyMessagesBatchSet { } /// Get a reference to the underlying vector of message containers - pub fn inner(&self) -> &Vec<IggyMessagesBatch> { + pub fn inner(&self) -> &Vec<IggyMessagesBatchMut> { &self.batches } /// Consume the batch, returning the underlying vector of message containers - pub fn into_inner(self) -> Vec<IggyMessagesBatch> { - self.batches + pub fn into_inner(mut self) -> Vec<IggyMessagesBatchMut> { + std::mem::take(&mut self.batches) } /// Iterate over all message containers in the batch - pub fn iter(&self) -> impl Iterator<Item = &IggyMessagesBatch> { + pub fn iter(&self) -> impl Iterator<Item = &IggyMessagesBatchMut> { self.batches.iter() } /// Iterate over all mutable message containers in the batch - pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut IggyMessagesBatch> { + pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut IggyMessagesBatchMut> { self.batches.iter_mut() } @@ -334,31 +345,44 @@ impl Sizeable for IggyMessagesBatchSet { } } -impl From<Vec<IggyMessagesBatch>> for IggyMessagesBatchSet { - fn from(messages: Vec<IggyMessagesBatch>) -> Self { +impl From<Vec<IggyMessagesBatchMut>> for IggyMessagesBatchSet { + fn from(messages: Vec<IggyMessagesBatchMut>) -> Self { Self::from_vec(messages) } } -impl From<IggyMessagesBatch> for IggyMessagesBatchSet { - fn from(messages: IggyMessagesBatch) -> Self { +impl From<IggyMessagesBatchMut> for IggyMessagesBatchSet { + fn from(messages: IggyMessagesBatchMut) -> Self { Self::from_vec(vec![messages]) } } +impl Drop for IggyMessagesBatchSet { + fn drop(&mut self) { + for batch in self.batches.iter_mut() { + let batch = std::mem::take(batch); + let (indexes, messages) = batch.decompose(); + messages.return_to_pool(); + drop(indexes); + } + } +} + /// Iterator that consumes an IggyMessagesBatchSet and yields batches pub struct IggyMessagesBatchSetIntoIter { - batches: Vec<IggyMessagesBatch>, + batches: Vec<IggyMessagesBatchMut>, position: usize, } impl Iterator for IggyMessagesBatchSetIntoIter { - type Item = IggyMessagesBatch; + type Item = IggyMessagesBatchMut; fn next(&mut self) -> Option<Self::Item> { if self.position < self.batches.len() { - let batch = - std::mem::replace(&mut self.batches[self.position], IggyMessagesBatch::empty()); + let batch = std::mem::replace( + &mut self.batches[self.position], + IggyMessagesBatchMut::empty(), + ); self.position += 1; Some(batch) } else { @@ -371,15 +395,3 @@ impl Iterator for IggyMessagesBatchSetIntoIter { (remaining, Some(remaining)) } } - -impl IntoIterator for IggyMessagesBatchSet { - type Item = IggyMessagesBatch; - type IntoIter = IggyMessagesBatchSetIntoIter; - - fn into_iter(self) -> Self::IntoIter { - IggyMessagesBatchSetIntoIter { - batches: self.batches, - position: 0, - } - } -} diff --git a/server/src/streaming/segments/writing_messages.rs b/server/src/streaming/segments/writing_messages.rs index 7e80574b..3a40e3e1 100644 --- a/server/src/streaming/segments/writing_messages.rs +++ b/server/src/streaming/segments/writing_messages.rs @@ -84,17 +84,17 @@ impl Segment { let accumulator = std::mem::take(&mut self.accumulator); - let mut batches = accumulator.into_batch_set(); + let batches = accumulator.into_batch_set(); let confirmation = match confirmation { Some(val) => val, None => self.config.segment.server_confirmation, }; - batches.extract_indexes_to(&mut self.indexes); - let batch_size = batches.size(); let batch_count = batches.count(); + batches.append_indexes_to(&mut self.indexes); + let saved_bytes = self .messages_writer .as_mut() @@ -130,17 +130,15 @@ impl Segment { self.check_and_handle_segment_full().await?; - let saved_messages_count = unsaved_messages_count; - trace!( "Saved {} messages on disk in segment with start offset: {} for partition with ID: {}, total bytes written: {}.", - saved_messages_count, + unsaved_messages_count, self.start_offset, self.partition_id, saved_bytes ); - Ok(saved_messages_count) + Ok(unsaved_messages_count) } fn update_counters(&mut self, messages_size: u64, messages_count: u64) { diff --git a/server/src/streaming/systems/messages.rs b/server/src/streaming/systems/messages.rs index 2b758079..1716dd26 100644 --- a/server/src/streaming/systems/messages.rs +++ b/server/src/streaming/systems/messages.rs @@ -21,11 +21,10 @@ use crate::streaming::segments::{IggyIndexesMut, IggyMessagesBatchMut, IggyMessa use crate::streaming::session::Session; use crate::streaming::systems::system::System; use crate::streaming::systems::COMPONENT; -use bytes::BytesMut; +use crate::streaming::utils::bytes_mut_pool::BYTES_MUT_POOL; use error_set::ErrContext; use iggy::confirmation::Confirmation; use iggy::consumer::Consumer; -use iggy::models::messaging::IggyMessagesBatch; use iggy::prelude::*; use iggy::utils::crypto::EncryptorKind; use iggy::{error::IggyError, identifier::Identifier}; @@ -164,7 +163,7 @@ impl System { let count = batch.count(); let mut indexes = IggyIndexesMut::with_capacity(batch.count() as usize, 0); - let mut decrypted_messages = BytesMut::with_capacity(batch.size() as usize); + let mut decrypted_messages = BYTES_MUT_POOL.get_buffer(batch.size() as usize); let mut position = 0; for message in batch.iter() { @@ -185,9 +184,8 @@ impl System { } } } - let indexes = indexes.make_immutable(); - let decrypted_messages = decrypted_messages.freeze(); - let decrypted_batch = IggyMessagesBatch::new(indexes, decrypted_messages, count); + let decrypted_batch = + IggyMessagesBatchMut::from_indexes_and_messages(count, indexes, decrypted_messages); decrypted_batches.push(decrypted_batch); } @@ -199,7 +197,8 @@ impl System { batch: IggyMessagesBatchMut, encryptor: &EncryptorKind, ) -> Result<IggyMessagesBatchMut, IggyError> { - let mut encrypted_messages = BytesMut::with_capacity(batch.size() as usize * 2); + let mut encrypted_messages = BYTES_MUT_POOL.get_buffer(batch.size() as usize * 2); + let count = batch.count(); let mut indexes = IggyIndexesMut::with_capacity(batch.count() as usize, 0); let mut position = 0; @@ -230,6 +229,7 @@ impl System { } Ok(IggyMessagesBatchMut::from_indexes_and_messages( + count, indexes, encrypted_messages, )) diff --git a/server/src/streaming/systems/storage.rs b/server/src/streaming/systems/storage.rs index 49649b5f..ff1cc3f2 100644 --- a/server/src/streaming/systems/storage.rs +++ b/server/src/streaming/systems/storage.rs @@ -16,13 +16,16 @@ * under the License. */ -use crate::streaming::persistence::persister::PersisterKind; use crate::streaming::storage::SystemInfoStorage; use crate::streaming::systems::info::SystemInfo; use crate::streaming::systems::COMPONENT; +use crate::streaming::utils::bytes_mut_pool::BytesMutExt; use crate::streaming::utils::file; +use crate::streaming::{ + persistence::persister::PersisterKind, utils::bytes_mut_pool::BYTES_MUT_POOL, +}; use anyhow::Context; -use bytes::{BufMut, BytesMut}; +use bytes::BufMut; use error_set::ErrContext; use iggy::error::IggyError; use std::sync::Arc; @@ -60,7 +63,7 @@ impl SystemInfoStorage for FileSystemInfoStorage { }) .map_err(|_| IggyError::CannotReadFileMetadata)? .len() as usize; - let mut buffer = BytesMut::with_capacity(file_size); + let mut buffer = BYTES_MUT_POOL.get_buffer(file_size); buffer.put_bytes(0, file_size); file.read_exact(&mut buffer) .await @@ -75,6 +78,7 @@ impl SystemInfoStorage for FileSystemInfoStorage { bincode::serde::decode_from_slice(&buffer, bincode::config::standard()) .with_context(|| "Failed to deserialize system info") .map_err(|_| IggyError::CannotDeserializeResource)?; + buffer.return_to_pool(); Ok(system_info) } diff --git a/server/src/streaming/utils/bytes_mut_pool.rs b/server/src/streaming/utils/bytes_mut_pool.rs new file mode 100644 index 00000000..82a8f302 --- /dev/null +++ b/server/src/streaming/utils/bytes_mut_pool.rs @@ -0,0 +1,313 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use bytes::BytesMut; +use crossbeam::queue::ArrayQueue; +use iggy::prelude::IggyByteSize; +use once_cell::sync::Lazy; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use tracing::{debug, info, trace}; + +pub static BYTES_MUT_POOL: Lazy<BytesMutPool> = Lazy::new(BytesMutPool::default); + +/// A pool for reusing BytesMut buffers +#[derive(Clone)] +pub struct BytesMutPool { + // Buffers + small_buffers: Arc<ArrayQueue<BytesMut>>, + medium_buffers: Arc<ArrayQueue<BytesMut>>, + large_buffers: Arc<ArrayQueue<BytesMut>>, + extra_large_buffers: Arc<ArrayQueue<BytesMut>>, + max_buffers: Arc<ArrayQueue<BytesMut>>, + + // Stats + small_created: Arc<AtomicUsize>, + medium_created: Arc<AtomicUsize>, + large_created: Arc<AtomicUsize>, + extra_large_created: Arc<AtomicUsize>, + max_created: Arc<AtomicUsize>, + small_returned: Arc<AtomicUsize>, + medium_returned: Arc<AtomicUsize>, + large_returned: Arc<AtomicUsize>, + extra_large_returned: Arc<AtomicUsize>, + max_returned: Arc<AtomicUsize>, +} + +impl BytesMutPool { + // TODO(hubcio): make BytesMutPool bucket sizes configurable via server.toml + const SMALL_BUFFER_SIZE: usize = 4 * 1024; + const MEDIUM_BUFFER_SIZE: usize = 32 * 1024; + const LARGE_BUFFER_SIZE: usize = 512 * 1024; + const EXTRA_LARGE_BUFFER_SIZE: usize = 2 * 1024 * 1024; + const MAX_BUFFER_SIZE: usize = 16 * 1024 * 1024; + + const SMALL_POOL_SIZE: usize = 4096; + const MEDIUM_POOL_SIZE: usize = 1024; + const LARGE_POOL_SIZE: usize = 512; + const EXTRA_LARGE_POOL_SIZE: usize = 128; + const MAX_POOL_SIZE: usize = 64; + + /// Initialize the bytes pool + pub fn init_pool() { + Lazy::force(&BYTES_MUT_POOL); + } + + /// Get a buffer with at least the specified capacity + pub fn get_buffer(&self, capacity: usize) -> BytesMut { + if capacity <= Self::SMALL_BUFFER_SIZE { + if let Some(mut buffer) = self.small_buffers.pop() { + buffer.clear(); + trace!("Reused small buffer with capacity: {}", buffer.capacity()); + return buffer; + } + self.small_created.fetch_add(1, Ordering::Relaxed); + let buffer = BytesMut::with_capacity(Self::SMALL_BUFFER_SIZE); + trace!( + "Created new small buffer with capacity: {}", + buffer.capacity() + ); + buffer + } else if capacity <= Self::MEDIUM_BUFFER_SIZE { + if let Some(mut buffer) = self.medium_buffers.pop() { + buffer.clear(); + trace!("Reused medium buffer with capacity: {}", buffer.capacity()); + return buffer; + } + self.medium_created.fetch_add(1, Ordering::Relaxed); + let buffer = BytesMut::with_capacity(Self::MEDIUM_BUFFER_SIZE); + trace!( + "Created new medium buffer with capacity: {}", + buffer.capacity() + ); + buffer + } else if capacity <= Self::LARGE_BUFFER_SIZE { + if let Some(mut buffer) = self.large_buffers.pop() { + buffer.clear(); + trace!("Reused large buffer with capacity: {}", buffer.capacity()); + return buffer; + } + self.large_created.fetch_add(1, Ordering::Relaxed); + let buffer = BytesMut::with_capacity(Self::LARGE_BUFFER_SIZE); + trace!( + "Created new large buffer with capacity: {}", + buffer.capacity() + ); + buffer + } else if capacity <= Self::EXTRA_LARGE_BUFFER_SIZE { + if let Some(mut buffer) = self.extra_large_buffers.pop() { + buffer.clear(); + trace!( + "Reused extra large buffer with capacity: {}", + buffer.capacity() + ); + return buffer; + } + self.extra_large_created.fetch_add(1, Ordering::Relaxed); + let buffer = BytesMut::with_capacity(Self::EXTRA_LARGE_BUFFER_SIZE); + trace!( + "Created new extra large buffer with capacity: {}", + buffer.capacity() + ); + buffer + } else if capacity <= Self::MAX_BUFFER_SIZE { + if let Some(mut buffer) = self.max_buffers.pop() { + buffer.clear(); + trace!("Reused max buffer with capacity: {}", buffer.capacity()); + return buffer; + } + self.max_created.fetch_add(1, Ordering::Relaxed); + let buffer = BytesMut::with_capacity(Self::MAX_BUFFER_SIZE); + trace!( + "Created new max buffer with capacity: {}", + buffer.capacity() + ); + buffer + } else { + // For very large buffers that exceed our max size, just allocate directly + debug!("Created oversized buffer with capacity: {} B", capacity); + BytesMut::with_capacity(capacity) + } + } + + /// Return a buffer to the pool + fn return_buffer(&self, buffer: BytesMut) { + let capacity = buffer.capacity(); + if capacity == Self::SMALL_BUFFER_SIZE { + if self.small_buffers.push(buffer).is_err() { + trace!("Small buffer pool full, dropping buffer"); + } else { + self.small_returned.fetch_add(1, Ordering::Relaxed); + trace!("Returned small buffer to pool"); + } + } else if capacity == Self::MEDIUM_BUFFER_SIZE { + if self.medium_buffers.push(buffer).is_err() { + trace!("Medium buffer pool full, dropping buffer"); + } else { + self.medium_returned.fetch_add(1, Ordering::Relaxed); + trace!("Returned medium buffer to pool"); + } + } else if capacity == Self::LARGE_BUFFER_SIZE { + if self.large_buffers.push(buffer).is_err() { + trace!("Large buffer pool full, dropping buffer"); + } else { + self.large_returned.fetch_add(1, Ordering::Relaxed); + trace!("Returned large buffer to pool"); + } + } else if capacity == Self::EXTRA_LARGE_BUFFER_SIZE { + if self.extra_large_buffers.push(buffer).is_err() { + trace!("Extra large buffer pool full, dropping buffer"); + } else { + self.extra_large_returned.fetch_add(1, Ordering::Relaxed); + trace!("Returned extra large buffer to pool"); + } + } else if capacity == Self::MAX_BUFFER_SIZE { + if self.max_buffers.push(buffer).is_err() { + trace!("Max buffer pool full, dropping buffer"); + } else { + self.max_returned.fetch_add(1, Ordering::Relaxed); + trace!("Returned max buffer to pool"); + } + } else if capacity != 0 { + trace!( + "Returned buffer to pool with unknown capacity: {}", + capacity + ); + } + } + + /// Log stats about buffer allocation and reuse + pub fn log_stats(&self) { + let sm_created = self.small_created.load(Ordering::Relaxed); + let md_created = self.medium_created.load(Ordering::Relaxed); + let lg_created = self.large_created.load(Ordering::Relaxed); + let xl_created = self.extra_large_created.load(Ordering::Relaxed); + let mx_created = self.max_created.load(Ordering::Relaxed); + + let sm_returned = self.small_returned.load(Ordering::Relaxed); + let md_returned = self.medium_returned.load(Ordering::Relaxed); + let lg_returned = self.large_returned.load(Ordering::Relaxed); + let xl_returned = self.extra_large_returned.load(Ordering::Relaxed); + let mx_returned = self.max_returned.load(Ordering::Relaxed); + + let sm_pool = self.small_buffers.len(); + let md_pool = self.medium_buffers.len(); + let lg_pool = self.large_buffers.len(); + let xl_pool = self.extra_large_buffers.len(); + let mx_pool = self.max_buffers.len(); + + let sm_size = IggyByteSize::from((sm_pool * Self::SMALL_BUFFER_SIZE) as u64); + let md_size = IggyByteSize::from((md_pool * Self::MEDIUM_BUFFER_SIZE) as u64); + let lg_size = IggyByteSize::from((lg_pool * Self::LARGE_BUFFER_SIZE) as u64); + let xl_size = IggyByteSize::from((xl_pool * Self::EXTRA_LARGE_BUFFER_SIZE) as u64); + let mx_size = IggyByteSize::from((mx_pool * Self::MAX_BUFFER_SIZE) as u64); + + let total_size = IggyByteSize::from( + (sm_pool * Self::SMALL_BUFFER_SIZE + + md_pool * Self::MEDIUM_BUFFER_SIZE + + lg_pool * Self::LARGE_BUFFER_SIZE + + xl_pool * Self::EXTRA_LARGE_BUFFER_SIZE + + mx_pool * Self::MAX_BUFFER_SIZE) as u64, + ); + + let sm_reuse = if sm_created > 0 { + sm_returned as f64 / sm_created as f64 * 100.0 + } else { + 0.0 + }; + let md_reuse = if md_created > 0 { + md_returned as f64 / md_created as f64 * 100.0 + } else { + 0.0 + }; + let lg_reuse = if lg_created > 0 { + lg_returned as f64 / lg_created as f64 * 100.0 + } else { + 0.0 + }; + let xl_reuse = if xl_created > 0 { + xl_returned as f64 / xl_created as f64 * 100.0 + } else { + 0.0 + }; + let mx_reuse = if mx_created > 0 { + mx_returned as f64 / mx_created as f64 * 100.0 + } else { + 0.0 + }; + + let sm_util = sm_pool as f64 / Self::SMALL_POOL_SIZE as f64 * 100.0; + let md_util = md_pool as f64 / Self::MEDIUM_POOL_SIZE as f64 * 100.0; + let lg_util = lg_pool as f64 / Self::LARGE_POOL_SIZE as f64 * 100.0; + let xl_util = xl_pool as f64 / Self::EXTRA_LARGE_POOL_SIZE as f64 * 100.0; + let mx_util = mx_pool as f64 / Self::MAX_POOL_SIZE as f64 * 100.0; + + info!("BytesPool: Small[{}/{}|{:.1}%|{:.1}%|{}] Medium[{}/{}|{:.1}%|{:.1}%|{}] Large[{}/{}|{:.1}%|{:.1}%|{}]", + sm_returned, sm_created, sm_reuse, sm_util, sm_size, + md_returned, md_created, md_reuse, md_util, md_size, + lg_returned, lg_created, lg_reuse, lg_util, lg_size); + + info!( + "BytesPool: XLarge[{}/{}|{:.1}%|{:.1}%|{}] Max[{}/{}|{:.1}%|{:.1}%|{}] Total: {}", + xl_returned, + xl_created, + xl_reuse, + xl_util, + xl_size, + mx_returned, + mx_created, + mx_reuse, + mx_util, + mx_size, + total_size + ); + } +} + +impl Default for BytesMutPool { + fn default() -> Self { + Self { + small_buffers: Arc::new(ArrayQueue::new(Self::SMALL_POOL_SIZE)), + medium_buffers: Arc::new(ArrayQueue::new(Self::MEDIUM_POOL_SIZE)), + large_buffers: Arc::new(ArrayQueue::new(Self::LARGE_POOL_SIZE)), + extra_large_buffers: Arc::new(ArrayQueue::new(Self::EXTRA_LARGE_POOL_SIZE)), + max_buffers: Arc::new(ArrayQueue::new(Self::MAX_POOL_SIZE)), + small_created: Arc::new(AtomicUsize::new(0)), + medium_created: Arc::new(AtomicUsize::new(0)), + large_created: Arc::new(AtomicUsize::new(0)), + extra_large_created: Arc::new(AtomicUsize::new(0)), + max_created: Arc::new(AtomicUsize::new(0)), + small_returned: Arc::new(AtomicUsize::new(0)), + medium_returned: Arc::new(AtomicUsize::new(0)), + large_returned: Arc::new(AtomicUsize::new(0)), + extra_large_returned: Arc::new(AtomicUsize::new(0)), + max_returned: Arc::new(AtomicUsize::new(0)), + } + } +} + +/// Extension trait for more ergonomic buffer return +pub trait BytesMutExt { + fn return_to_pool(self); +} + +impl BytesMutExt for BytesMut { + fn return_to_pool(self) { + BYTES_MUT_POOL.return_buffer(self); + } +} diff --git a/server/src/streaming/utils/mod.rs b/server/src/streaming/utils/mod.rs index 33481414..25fc7d43 100644 --- a/server/src/streaming/utils/mod.rs +++ b/server/src/streaming/utils/mod.rs @@ -16,6 +16,7 @@ * under the License. */ +pub mod bytes_mut_pool; pub mod crypto; pub mod file; pub mod hash;