This is an automated email from the ASF dual-hosted git repository. hgruszecki pushed a commit to branch zero-copy-reads-in-flight in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 3fb911374e957fbbed5c7976383f9de96a2e04ac Author: Hubert Gruszecki <[email protected]> AuthorDate: Fri Jan 16 12:48:57 2026 +0100 perf(server): zero-copy reads from in-flight buffer during async I/O --- core/common/src/types/message/messages_batch.rs | 59 +++ core/integration/tests/streaming/get_by_offset.rs | 3 +- .../tests/streaming/get_by_timestamp.rs | 3 +- .../handlers/messages/poll_messages_handler.rs | 32 +- core/server/src/http/http_shard_wrapper.rs | 4 +- core/server/src/shard/system/messages.rs | 21 +- core/server/src/shard/transmission/frame.rs | 6 +- core/server/src/slab/streams.rs | 138 +++---- core/server/src/streaming/partitions/helpers.rs | 14 +- core/server/src/streaming/partitions/journal.rs | 6 +- .../streaming/segments/messages/messages_writer.rs | 4 +- core/server/src/streaming/segments/messages/mod.rs | 4 +- core/server/src/streaming/segments/mod.rs | 2 + .../streaming/segments/types/messages_batch_set.rs | 407 ++++++++++++++++----- core/server/src/streaming/segments/types/mod.rs | 2 + 15 files changed, 513 insertions(+), 192 deletions(-) diff --git a/core/common/src/types/message/messages_batch.rs b/core/common/src/types/message/messages_batch.rs index 6bc704866..703bb8f9d 100644 --- a/core/common/src/types/message/messages_batch.rs +++ b/core/common/src/types/message/messages_batch.rs @@ -120,6 +120,65 @@ impl IggyMessagesBatch { self.iter().last().map(|msg| msg.header().timestamp()) } + /// Slice the batch by message offset. Returns messages starting at `start_offset` up to `count`. + /// + /// Uses `Bytes::slice()` internally for zero-copy slicing. + /// Returns `None` if the offset range doesn't overlap with this batch. + pub fn slice_by_offset(&self, start_offset: u64, count: u32) -> Option<Self> { + if self.is_empty() || count == 0 { + return None; + } + + let first_offset = self.first_offset()?; + + // If requested offset is before this batch, start from beginning + if start_offset < first_offset { + return self.slice_by_index(0, count); + } + + let last_offset = self.last_offset()?; + if start_offset > last_offset { + return None; + } + + // Offsets are contiguous within a batch + let start_index = (start_offset - first_offset) as u32; + self.slice_by_index(start_index, count) + } + + /// Slice the batch to return messages starting at `start_index` up to `count`. + /// + /// Uses `Bytes::slice()` internally for zero-copy slicing. + /// Returns `None` if `start_index` is out of bounds or no messages match. + pub fn slice_by_index(&self, start_index: u32, count: u32) -> Option<Self> { + if start_index >= self.count || count == 0 { + return None; + } + + let available = self.count - start_index; + let actual_count = count.min(available); + + // Slice indexes (zero-copy via Bytes::slice) + let sliced_indexes = self.indexes.slice_by_offset(start_index, actual_count)?; + + // Calculate message buffer slice boundaries + let msg_start = self.message_start_position(start_index as usize); + let msg_end = self.message_end_position((start_index + actual_count - 1) as usize); + + if msg_start > msg_end || msg_end > self.messages.len() { + return None; + } + + // Slice messages (zero-copy via Bytes::slice) + let sliced_messages = self.messages.slice(msg_start..msg_end); + + Some(Self { + count: actual_count, + indexes: sliced_indexes, + messages: sliced_messages, + }) + } + /// Calculates the start position of a message at the given index in the buffer fn message_start_position(&self, index: usize) -> usize { if index == 0 { diff --git a/core/integration/tests/streaming/get_by_offset.rs b/core/integration/tests/streaming/get_by_offset.rs index ca5438529..d96460744 100644 --- a/core/integration/tests/streaming/get_by_offset.rs +++ b/core/integration/tests/streaming/get_by_offset.rs @@ -307,8 +307,9 @@ async fn test_get_messages_by_offset( // Test 6: Validate message content and ordering for all messages let mut i = 0; + let mutable_batches = batches.into_mutable(); - for batch in batches.iter() { + for batch in mutable_batches.iter() { for msg in batch.iter() { let expected_offset = span_offset + i as u64; assert!( diff --git a/core/integration/tests/streaming/get_by_timestamp.rs b/core/integration/tests/streaming/get_by_timestamp.rs index 90afc5967..8b7e4bb10 100644 --- a/core/integration/tests/streaming/get_by_timestamp.rs +++ b/core/integration/tests/streaming/get_by_timestamp.rs @@ -323,7 +323,8 @@ async fn test_get_messages_by_timestamp( let span_timestamp_micros = span_timestamp.as_micros(); // Test 6: Validate message content and ordering - for batch in spanning_messages.iter() { + let mutable_spanning = spanning_messages.into_mutable(); + for batch in mutable_spanning.iter() { for msg in batch.iter() { let msg_timestamp = msg.header().timestamp(); assert!( diff --git a/core/server/src/binary/handlers/messages/poll_messages_handler.rs b/core/server/src/binary/handlers/messages/poll_messages_handler.rs index 354b554b0..223d7b1b0 100644 --- a/core/server/src/binary/handlers/messages/poll_messages_handler.rs +++ b/core/server/src/binary/handlers/messages/poll_messages_handler.rs @@ -22,6 +22,7 @@ use crate::binary::command::{ use crate::binary::handlers::utils::receive_and_validate; use crate::shard::IggyShard; use crate::shard::system::messages::PollingArgs; +use crate::streaming::segments::PolledBatches; use crate::streaming::session::Session; use anyhow::Result; use iggy_common::SenderKind; @@ -71,7 +72,7 @@ impl ServerCommandHandler for PollMessages { let user_id = session.get_user_id(); let client_id = session.client_id; - let (metadata, mut batch) = shard + let (metadata, batch) = shard .poll_messages( client_id, user_id, @@ -91,26 +92,39 @@ impl ServerCommandHandler for PollMessages { // 4 bytes for partition_id + 8 bytes for current_offset + 4 bytes for messages_count + size of all batches. let response_length = 4 + 8 + 4 + batch.size(); let response_length_bytes = response_length.to_le_bytes(); + let msg_count = batch.count(); let mut bufs = Vec::with_capacity(batch.containers_count() + 3); let mut partition_id_buf = PooledBuffer::with_capacity(4); let mut current_offset_buf = PooledBuffer::with_capacity(8); - let mut count_buf = PooledBuffer::with_capacity(4); + let mut msg_count_buf = PooledBuffer::with_capacity(4); partition_id_buf.put_u32_le(metadata.partition_id); current_offset_buf.put_u64_le(metadata.current_offset); - count_buf.put_u32_le(batch.count()); + msg_count_buf.put_u32_le(msg_count); bufs.push(partition_id_buf); bufs.push(current_offset_buf); - bufs.push(count_buf); + bufs.push(msg_count_buf); + + match batch { + PolledBatches::Mutable(mut mutable_set) => { + for msg_batch in mutable_set.iter_mut() { + bufs.push(msg_batch.take_messages()); + } + } + PolledBatches::Frozen(frozen_set) => { + // For frozen batches, wrap Bytes in PooledBuffer for socket send. + // This copies at the handler level but avoids copies in the storage layer + // during concurrent reads while async disk I/O is in progress. + for msg_batch in frozen_set.iter() { + bufs.push(PooledBuffer::from(msg_batch.buffer())); + } + } + } - batch.iter_mut().for_each(|m| { - bufs.push(m.take_messages()); - }); trace!( "Sending {} messages to client ({} bytes) to client", - batch.count(), - response_length + msg_count, response_length ); sender diff --git a/core/server/src/http/http_shard_wrapper.rs b/core/server/src/http/http_shard_wrapper.rs index 7bcfdb65b..da5595c8e 100644 --- a/core/server/src/http/http_shard_wrapper.rs +++ b/core/server/src/http/http_shard_wrapper.rs @@ -26,7 +26,7 @@ use send_wrapper::SendWrapper; use crate::binary::handlers::messages::poll_messages_handler::IggyPollMetadata; use crate::shard::system::messages::PollingArgs; use crate::state::command::EntryCommand; -use crate::streaming::segments::{IggyMessagesBatchMut, IggyMessagesBatchSet}; +use crate::streaming::segments::{IggyMessagesBatchMut, PolledBatches}; use crate::streaming::topics; use crate::streaming::users::user::User; use crate::{shard::IggyShard, streaming::session::Session}; @@ -278,7 +278,7 @@ impl HttpSafeShard { consumer: Consumer, maybe_partition_id: Option<u32>, args: PollingArgs, - ) -> Result<(IggyPollMetadata, IggyMessagesBatchSet), IggyError> { + ) -> Result<(IggyPollMetadata, PolledBatches), IggyError> { let future = SendWrapper::new(self.shard().poll_messages( client_id, user_id, diff --git a/core/server/src/shard/system/messages.rs b/core/server/src/shard/system/messages.rs index 02e8d9d0e..a53077362 100644 --- a/core/server/src/shard/system/messages.rs +++ b/core/server/src/shard/system/messages.rs @@ -24,7 +24,9 @@ use crate::shard::transmission::frame::ShardResponse; use crate::shard::transmission::message::{ ShardMessage, ShardRequest, ShardRequestPayload, ShardSendRequestResult, }; -use crate::streaming::segments::{IggyIndexesMut, IggyMessagesBatchMut, IggyMessagesBatchSet}; +use crate::streaming::segments::{ + IggyIndexesMut, IggyMessagesBatchMut, IggyMessagesBatchSetMut, PolledBatches, +}; use crate::streaming::traits::MainOps; use crate::streaming::{partitions, streams, topics}; use err_trail::ErrContext; @@ -129,7 +131,7 @@ impl IggyShard { consumer: Consumer, maybe_partition_id: Option<u32>, args: PollingArgs, - ) -> Result<(IggyPollMetadata, IggyMessagesBatchSet), IggyError> { + ) -> Result<(IggyPollMetadata, PolledBatches), IggyError> { self.ensure_topic_exists(&stream_id, &topic_id)?; let numeric_stream_id = self @@ -159,7 +161,7 @@ impl IggyShard { true, )? else { - return Ok((IggyPollMetadata::new(0, 0), IggyMessagesBatchSet::empty())); + return Ok((IggyPollMetadata::new(0, 0), PolledBatches::empty())); }; self.ensure_partition_exists(&stream_id, &topic_id, partition_id)?; @@ -175,7 +177,7 @@ impl IggyShard { { return Ok(( IggyPollMetadata::new(partition_id as u32, current_offset), - IggyMessagesBatchSet::empty(), + PolledBatches::empty(), )); } @@ -234,7 +236,10 @@ impl IggyShard { }?; let batch = if let Some(encryptor) = &self.encryptor { - self.decrypt_messages(batch, encryptor).await? + // Decryption requires mutable access, so convert to mutable + let mutable = batch.into_mutable(); + let decrypted = self.decrypt_messages(mutable, encryptor).await?; + PolledBatches::Mutable(decrypted) } else { batch }; @@ -344,9 +349,9 @@ impl IggyShard { async fn decrypt_messages( &self, - batches: IggyMessagesBatchSet, + batches: IggyMessagesBatchSetMut, encryptor: &EncryptorKind, - ) -> Result<IggyMessagesBatchSet, IggyError> { + ) -> Result<IggyMessagesBatchSetMut, IggyError> { let mut decrypted_batches = Vec::with_capacity(batches.containers_count()); for batch in batches.iter() { let count = batch.count(); @@ -384,7 +389,7 @@ impl IggyShard { decrypted_batches.push(decrypted_batch); } - Ok(IggyMessagesBatchSet::from_vec(decrypted_batches)) + Ok(IggyMessagesBatchSetMut::from_vec(decrypted_batches)) } pub fn maybe_encrypt_messages( diff --git a/core/server/src/shard/transmission/frame.rs b/core/server/src/shard/transmission/frame.rs index 7c0fd2578..1ae2d4a51 100644 --- a/core/server/src/shard/transmission/frame.rs +++ b/core/server/src/shard/transmission/frame.rs @@ -21,15 +21,13 @@ use iggy_common::{IggyError, Stats}; use crate::{ binary::handlers::messages::poll_messages_handler::IggyPollMetadata, shard::transmission::message::ShardMessage, - streaming::{ - segments::IggyMessagesBatchSet, streams::stream, topics::topic, users::user::User, - }, + streaming::{segments::PolledBatches, streams::stream, topics::topic, users::user::User}, }; #[allow(clippy::large_enum_variant)] #[derive(Debug)] pub enum ShardResponse { - PollMessages((IggyPollMetadata, IggyMessagesBatchSet)), + PollMessages((IggyPollMetadata, PolledBatches)), SendMessages, FlushUnsavedBuffer, DeleteSegments, diff --git a/core/server/src/slab/streams.rs b/core/server/src/slab/streams.rs index f1c342de2..66d0f7ca0 100644 --- a/core/server/src/slab/streams.rs +++ b/core/server/src/slab/streams.rs @@ -41,7 +41,8 @@ use crate::{ }, polling_consumer::PollingConsumer, segments::{ - IggyMessagesBatchMut, IggyMessagesBatchSet, Segment, storage::create_segment_storage, + IggyMessagesBatchMut, IggyMessagesBatchSet, IggyMessagesBatchSetMut, PolledBatches, + Segment, storage::create_segment_storage, }, streams::{ self, @@ -202,7 +203,7 @@ impl MainOps for Streams { type PollingArgs = PollingArgs; type Consumer = PollingConsumer; type In = IggyMessagesBatchMut; - type Out = (IggyPollMetadata, IggyMessagesBatchSet); + type Out = (IggyPollMetadata, PolledBatches); type Error = IggyError; async fn append_messages( @@ -317,13 +318,11 @@ impl MainOps for Streams { // We have to remember to keep the invariant from the if that is on line 290. // Alternatively a better design would be to get rid of that if and move the validations here. if offset > current_offset { - return Ok((metadata, IggyMessagesBatchSet::default())); + return Ok((metadata, PolledBatches::default())); } - let batches = self - .get_messages_by_offset(stream_id, topic_id, partition_id, offset, count) - .await?; - Ok(batches) + self.get_messages_by_offset(stream_id, topic_id, partition_id, offset, count) + .await? } PollingKind::Timestamp => { let timestamp = IggyTimestamp::from(value); @@ -343,7 +342,7 @@ impl MainOps for Streams { count, ) .await?; - Ok(batches) + PolledBatches::Mutable(batches) } PollingKind::First => { let first_offset = self.with_partition_by_id( @@ -358,10 +357,8 @@ impl MainOps for Streams { }, ); - let batches = self - .get_messages_by_offset(stream_id, topic_id, partition_id, first_offset, count) - .await?; - Ok(batches) + self.get_messages_by_offset(stream_id, topic_id, partition_id, first_offset, count) + .await? } PollingKind::Last => { let (start_offset, actual_count) = self.with_partition_by_id( @@ -379,16 +376,14 @@ impl MainOps for Streams { }, ); - let batches = self - .get_messages_by_offset( - stream_id, - topic_id, - partition_id, - start_offset, - actual_count, - ) - .await?; - Ok(batches) + self.get_messages_by_offset( + stream_id, + topic_id, + partition_id, + start_offset, + actual_count, + ) + .await? } PollingKind::Next => { let consumer_offset = match consumer { @@ -414,10 +409,8 @@ impl MainOps for Streams { match consumer_offset { None => { - let batches = self - .get_messages_by_offset(stream_id, topic_id, partition_id, 0, count) - .await?; - Ok(batches) + self.get_messages_by_offset(stream_id, topic_id, partition_id, 0, count) + .await? } Some(consumer_offset) => { let offset = consumer_offset + 1; @@ -440,20 +433,18 @@ impl MainOps for Streams { ); } } - let batches = self - .get_messages_by_offset( - stream_id, - topic_id, - partition_id, - offset, - count, - ) - .await?; - Ok(batches) + self.get_messages_by_offset( + stream_id, + topic_id, + partition_id, + offset, + count, + ) + .await? } } } - }?; + }; Ok((metadata, batches)) } } @@ -649,9 +640,9 @@ impl Streams { partition_id: partitions::ContainerId, offset: u64, count: u32, - ) -> Result<IggyMessagesBatchSet, IggyError> { + ) -> Result<PolledBatches, IggyError> { if count == 0 { - return Ok(IggyMessagesBatchSet::default()); + return Ok(PolledBatches::default()); } use crate::streaming::partitions::helpers; @@ -663,7 +654,7 @@ impl Streams { ); let mut remaining_count = count; - let mut batches = IggyMessagesBatchSet::empty(); + let mut batches: Option<IggyMessagesBatchSetMut> = None; let mut current_offset = offset; for idx in range { @@ -719,10 +710,22 @@ impl Streams { current_offset += messages_count as u64; } - batches.add_batch_set(messages); + // If this is the only/first result and we're done, return directly (preserves Frozen) + if batches.is_none() && remaining_count == 0 { + return Ok(messages); + } + + // Need to accumulate, convert to mutable if necessary + let mutable_set = messages.into_mutable(); + match &mut batches { + Some(existing) => existing.add_batch_set(mutable_set), + None => batches = Some(mutable_set), + } } - Ok(batches) + Ok(PolledBatches::Mutable( + batches.unwrap_or_else(IggyMessagesBatchSetMut::empty), + )) } #[allow(clippy::too_many_arguments)] @@ -736,7 +739,7 @@ impl Streams { end_offset: u64, count: u32, segment_start_offset: u64, - ) -> Result<IggyMessagesBatchSet, IggyError> { + ) -> Result<PolledBatches, IggyError> { let ( is_journal_empty, journal_first_offset, @@ -765,7 +768,6 @@ impl Streams { // Case 0: Journal is empty, check in-flight buffer or disk if is_journal_empty { if !in_flight_empty && offset >= in_flight_first && offset <= in_flight_last { - let mut result = IggyMessagesBatchSet::empty(); let in_flight_batches = self.with_partition_by_id( stream_id, topic_id, @@ -773,13 +775,13 @@ impl Streams { |(_, _, _, _, _, _, log)| log.in_flight().get_by_offset(offset, count).to_vec(), ); if !in_flight_batches.is_empty() { - result.add_immutable_batches(&in_flight_batches); - let final_result = result.get_by_offset(offset, count); - return Ok(final_result); + let batches = + PolledBatches::Frozen(IggyMessagesBatchSet::from_vec(in_flight_batches)); + return Ok(batches.filter_by_offset(offset, count)); } } - return self + let disk_batches = self .load_messages_from_disk_by_offset( stream_id, topic_id, @@ -789,7 +791,8 @@ impl Streams { count, segment_start_offset, ) - .await; + .await?; + return Ok(PolledBatches::Mutable(disk_batches)); } // Case 1: All messages are in accumulator buffer @@ -803,12 +806,12 @@ impl Streams { .get(|batches| batches.get_by_offset(offset, count)) }, ); - return Ok(batches); + return Ok(PolledBatches::Mutable(batches)); } // Case 2: All messages are on disk if end_offset < journal_first_offset { - return self + let disk_batches = self .load_messages_from_disk_by_offset( stream_id, topic_id, @@ -818,7 +821,8 @@ impl Streams { count, segment_start_offset, ) - .await; + .await?; + return Ok(PolledBatches::Mutable(disk_batches)); } // Case 3: Messages span disk and accumulator buffer boundary @@ -828,7 +832,7 @@ impl Streams { } else { 0 }; - let mut combined_batch_set = IggyMessagesBatchSet::empty(); + let mut combined_batch_set = IggyMessagesBatchSetMut::empty(); // Load messages from disk if needed if disk_count > 0 { @@ -873,7 +877,7 @@ impl Streams { } } - Ok(combined_batch_set) + Ok(PolledBatches::Mutable(combined_batch_set)) } #[allow(clippy::too_many_arguments)] @@ -886,7 +890,7 @@ impl Streams { start_offset: u64, count: u32, segment_start_offset: u64, - ) -> Result<IggyMessagesBatchSet, IggyError> { + ) -> Result<IggyMessagesBatchSetMut, IggyError> { let relative_start_offset = (start_offset - segment_start_offset) as u32; let (index_reader, messages_reader, indexes) = self.with_partition_by_id( @@ -930,7 +934,7 @@ impl Streams { }; if indexes_to_read.is_none() { - return Ok(IggyMessagesBatchSet::empty()); + return Ok(IggyMessagesBatchSetMut::empty()); } let indexes_to_read = indexes_to_read.unwrap(); @@ -946,7 +950,7 @@ impl Streams { format!("Failed to validate messages read from disk! error: {e}") })?; - Ok(IggyMessagesBatchSet::from(batch)) + Ok(IggyMessagesBatchSetMut::from(batch)) } pub async fn get_messages_by_timestamp( @@ -956,7 +960,7 @@ impl Streams { partition_id: partitions::ContainerId, timestamp: u64, count: u32, - ) -> Result<IggyMessagesBatchSet, IggyError> { + ) -> Result<IggyMessagesBatchSetMut, IggyError> { use crate::streaming::partitions::helpers; let Ok(range) = self.with_partition_by_id( stream_id, @@ -964,11 +968,11 @@ impl Streams { partition_id, helpers::get_segment_range_by_timestamp(timestamp), ) else { - return Ok(IggyMessagesBatchSet::default()); + return Ok(IggyMessagesBatchSetMut::default()); }; let mut remaining_count = count; - let mut batches = IggyMessagesBatchSet::empty(); + let mut batches = IggyMessagesBatchSetMut::empty(); for idx in range { if remaining_count == 0 { @@ -1020,9 +1024,9 @@ impl Streams { idx: usize, timestamp: u64, count: u32, - ) -> Result<IggyMessagesBatchSet, IggyError> { + ) -> Result<IggyMessagesBatchSetMut, IggyError> { if count == 0 { - return Ok(IggyMessagesBatchSet::default()); + return Ok(IggyMessagesBatchSetMut::default()); } let (is_journal_empty, journal_first_timestamp, journal_last_timestamp) = self @@ -1056,7 +1060,7 @@ impl Streams { // Case 1: All messages are in accumulator buffer (timestamp is after journal ends) if timestamp > journal_last_timestamp { - return Ok(IggyMessagesBatchSet::empty()); + return Ok(IggyMessagesBatchSetMut::empty()); } // Case 1b: Timestamp is within journal range @@ -1116,7 +1120,7 @@ impl Streams { idx: usize, timestamp: u64, count: u32, - ) -> Result<IggyMessagesBatchSet, IggyError> { + ) -> Result<IggyMessagesBatchSetMut, IggyError> { let (index_reader, messages_reader, indexes) = self.with_partition_by_id( stream_id, topic_id, @@ -1158,7 +1162,7 @@ impl Streams { }; if indexes_to_read.is_none() { - return Ok(IggyMessagesBatchSet::empty()); + return Ok(IggyMessagesBatchSetMut::empty()); } let indexes_to_read = indexes_to_read.unwrap(); @@ -1171,7 +1175,7 @@ impl Streams { format!("Failed to load messages from disk by timestamp: {e}") })?; - Ok(IggyMessagesBatchSet::from(batch)) + Ok(IggyMessagesBatchSetMut::from(batch)) } pub async fn handle_full_segment( @@ -1322,7 +1326,7 @@ impl Streams { stream_id: &Identifier, topic_id: &Identifier, partition_id: usize, - mut batches: IggyMessagesBatchSet, + mut batches: IggyMessagesBatchSetMut, config: &SystemConfig, ) -> Result<u32, IggyError> { let batch_count = batches.count(); diff --git a/core/server/src/streaming/partitions/helpers.rs b/core/server/src/streaming/partitions/helpers.rs index 531e77e54..c00465b39 100644 --- a/core/server/src/streaming/partitions/helpers.rs +++ b/core/server/src/streaming/partitions/helpers.rs @@ -32,7 +32,9 @@ use crate::{ storage, }, polling_consumer::ConsumerGroupId, - segments::{IggyIndexesMut, IggyMessagesBatchMut, IggyMessagesBatchSet, storage::Storage}, + segments::{ + IggyIndexesMut, IggyMessagesBatchMut, IggyMessagesBatchSetMut, storage::Storage, + }, }, }; use err_trail::ErrContext; @@ -334,7 +336,7 @@ pub async fn load_messages_from_disk_by_timestamp( index: &Option<IggyIndexesMut>, timestamp: u64, count: u32, -) -> Result<IggyMessagesBatchSet, IggyError> { +) -> Result<IggyMessagesBatchSetMut, IggyError> { let indexes_to_read = if let Some(indexes) = index { if !indexes.is_empty() { indexes.slice_by_timestamp(timestamp, count) @@ -356,7 +358,7 @@ pub async fn load_messages_from_disk_by_timestamp( }; if indexes_to_read.is_none() { - return Ok(IggyMessagesBatchSet::empty()); + return Ok(IggyMessagesBatchSetMut::empty()); } let indexes_to_read = indexes_to_read.unwrap(); @@ -369,7 +371,7 @@ pub async fn load_messages_from_disk_by_timestamp( .await .error(|e: &IggyError| format!("Failed to load messages from disk by timestamp: {e}"))?; - Ok(IggyMessagesBatchSet::from(batch)) + Ok(IggyMessagesBatchSetMut::from(batch)) } pub fn calculate_current_offset() -> impl FnOnce(ComponentsById<PartitionRef>) -> u64 { @@ -430,7 +432,7 @@ pub fn append_to_journal( } } -pub fn commit_journal() -> impl FnOnce(ComponentsById<PartitionRefMut>) -> IggyMessagesBatchSet { +pub fn commit_journal() -> impl FnOnce(ComponentsById<PartitionRefMut>) -> IggyMessagesBatchSetMut { |(.., log)| { let batches = log.journal_mut().commit(); log.ensure_indexes(); @@ -475,7 +477,7 @@ pub fn persist_batch( stream_id: &Identifier, topic_id: &Identifier, partition_id: usize, - batches: IggyMessagesBatchSet, + batches: IggyMessagesBatchSetMut, reason: String, ) -> impl AsyncFnOnce(ComponentsById<PartitionRef>) -> Result<(IggyByteSize, u32), IggyError> { async move |(.., log)| { diff --git a/core/server/src/streaming/partitions/journal.rs b/core/server/src/streaming/partitions/journal.rs index d16db4f6d..a405ae5a7 100644 --- a/core/server/src/streaming/partitions/journal.rs +++ b/core/server/src/streaming/partitions/journal.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use crate::streaming::segments::{IggyMessagesBatchMut, IggyMessagesBatchSet}; +use crate::streaming::segments::{IggyMessagesBatchMut, IggyMessagesBatchSetMut}; use iggy_common::{IggyByteSize, IggyError}; use std::fmt::Debug; @@ -34,7 +34,7 @@ pub struct Inner { #[derive(Default, Debug)] pub struct MemoryMessageJournal { - batches: IggyMessagesBatchSet, + batches: IggyMessagesBatchSetMut, inner: Inner, } @@ -48,7 +48,7 @@ impl Clone for MemoryMessageJournal { } impl Journal for MemoryMessageJournal { - type Container = IggyMessagesBatchSet; + type Container = IggyMessagesBatchSetMut; type Entry = IggyMessagesBatchMut; type Inner = Inner; type AppendResult = Result<(u32, u32), IggyError>; diff --git a/core/server/src/streaming/segments/messages/messages_writer.rs b/core/server/src/streaming/segments/messages/messages_writer.rs index 85d3bd215..0ef5e4023 100644 --- a/core/server/src/streaming/segments/messages/messages_writer.rs +++ b/core/server/src/streaming/segments/messages/messages_writer.rs @@ -17,7 +17,7 @@ */ use crate::streaming::segments::{ - IggyMessagesBatchSet, + IggyMessagesBatchSetMut, messages::{write_batch, write_batch_frozen}, }; use compio::fs::{File, OpenOptions}; @@ -99,7 +99,7 @@ impl MessagesWriter { /// Append a batch of messages to the messages file. pub async fn save_batch_set( &self, - batch_set: IggyMessagesBatchSet, + batch_set: IggyMessagesBatchSetMut, ) -> Result<IggyByteSize, IggyError> { let messages_size = batch_set.size(); let messages_count = batch_set.count(); diff --git a/core/server/src/streaming/segments/messages/mod.rs b/core/server/src/streaming/segments/messages/mod.rs index c085ed882..fdb5e1dae 100644 --- a/core/server/src/streaming/segments/messages/mod.rs +++ b/core/server/src/streaming/segments/messages/mod.rs @@ -19,7 +19,7 @@ mod messages_reader; mod messages_writer; -use super::IggyMessagesBatchSet; +use super::IggyMessagesBatchSetMut; use bytes::Bytes; use compio::{fs::File, io::AsyncWriteAtExt}; use iggy_common::{IggyError, IggyMessagesBatch}; @@ -31,7 +31,7 @@ pub use messages_writer::MessagesWriter; async fn write_batch( file: &File, position: u64, - mut batches: IggyMessagesBatchSet, + mut batches: IggyMessagesBatchSetMut, ) -> Result<usize, IggyError> { let total_written = batches.iter().map(|b| b.size() as usize).sum(); let batches = batches diff --git a/core/server/src/streaming/segments/mod.rs b/core/server/src/streaming/segments/mod.rs index 7ed09c2d4..0ffdf623f 100644 --- a/core/server/src/streaming/segments/mod.rs +++ b/core/server/src/streaming/segments/mod.rs @@ -29,5 +29,7 @@ pub use types::IggyMessageHeaderViewMut; pub use types::IggyMessageViewMut; pub use types::IggyMessagesBatchMut; pub use types::IggyMessagesBatchSet; +pub use types::IggyMessagesBatchSetMut; +pub use types::PolledBatches; pub const SEGMENT_MAX_SIZE_BYTES: u64 = 1024 * 1024 * 1024; diff --git a/core/server/src/streaming/segments/types/messages_batch_set.rs b/core/server/src/streaming/segments/types/messages_batch_set.rs index 0856bd217..9cecf1d8f 100644 --- a/core/server/src/streaming/segments/types/messages_batch_set.rs +++ b/core/server/src/streaming/segments/types/messages_batch_set.rs @@ -28,19 +28,22 @@ use tracing::trace; use super::IggyMessagesBatchMut; -/// A container for multiple IggyMessagesBatch objects -#[derive(Debug, Default)] +// ============================================================================ +// IggyMessagesBatchSet - Immutable/frozen batches (Arc-backed Bytes, zero-copy) +// ============================================================================ + +/// A container for immutable message batches (frozen, Arc-backed). +/// +/// Used for reads from in-flight buffer during async disk I/O. +/// All slicing operations use `Bytes::slice()` for zero-copy. +#[derive(Debug, Default, Clone)] pub struct IggyMessagesBatchSet { - /// The collection of message containers - batches: Vec<IggyMessagesBatchMut>, - /// Total number of messages across all containers + batches: Vec<IggyMessagesBatch>, count: u32, - /// Total size in bytes across all containers size: u32, } impl IggyMessagesBatchSet { - /// Create a new empty batch pub fn empty() -> Self { Self { batches: Vec::new(), @@ -49,7 +52,6 @@ impl IggyMessagesBatchSet { } } - /// Create a new empty batch set with a specified initial capacity of message containers pub fn with_capacity(capacity: usize) -> Self { Self { batches: Vec::with_capacity(capacity), @@ -58,7 +60,196 @@ impl IggyMessagesBatchSet { } } - /// Create a batch set from an existing vector of IggyMessages + pub fn from_vec(batches: Vec<IggyMessagesBatch>) -> Self { + let count = batches.iter().map(|b| b.count()).sum(); + let size = batches.iter().map(|b| b.size()).sum(); + Self { + batches, + count, + size, + } + } + + pub fn add_batch(&mut self, batch: IggyMessagesBatch) { + self.count += batch.count(); + self.size += batch.size(); + self.batches.push(batch); + } + + pub fn count(&self) -> u32 { + self.count + } + + pub fn size(&self) -> u32 { + self.size + } + + pub fn containers_count(&self) -> usize { + self.batches.len() + } + + pub fn is_empty(&self) -> bool { + self.batches.is_empty() || self.count == 0 + } + + pub fn first_offset(&self) -> Option<u64> { + self.batches.first().and_then(|b| b.first_offset()) + } + + pub fn last_offset(&self) -> Option<u64> { + self.batches.last().and_then(|b| b.last_offset()) + } + + pub fn first_timestamp(&self) -> Option<u64> { + self.batches.first().and_then(|b| b.first_timestamp()) + } + + pub fn last_timestamp(&self) -> Option<u64> { + self.batches.last().and_then(|b| b.last_timestamp()) + } + + pub fn iter(&self) -> impl Iterator<Item = &IggyMessagesBatch> { + self.batches.iter() + } + + pub fn into_inner(self) -> Vec<IggyMessagesBatch> { + self.batches + } + + /// Zero-copy filtering by offset using `Bytes::slice()`. + pub fn get_by_offset(&self, offset: u64, count: u32) -> Self { + if self.is_empty() || count == 0 { + return Self::empty(); + } + + let end_offset = offset + count as u64 - 1; + let mut result = Self::with_capacity(self.batches.len()); + let mut remaining = count; + + for batch in &self.batches { + if remaining == 0 { + break; + } + + let batch_first = match batch.first_offset() { + Some(o) => o, + None => continue, + }; + let batch_last = match batch.last_offset() { + Some(o) => o, + None => continue, + }; + + // Skip batches entirely before requested range + if batch_last < offset { + continue; + } + + // Stop if batch is entirely after requested range + if batch_first > end_offset { + break; + } + + // Batch overlaps - slice it (zero-copy via Bytes::slice) + if let Some(sliced) = batch.slice_by_offset(offset, remaining) { + remaining = remaining.saturating_sub(sliced.count()); + result.add_batch(sliced); + } + } + + result + } + + /// Convert to mutable batch set (copies data). + pub fn into_mutable(self) -> IggyMessagesBatchSetMut { + let mut result = IggyMessagesBatchSetMut::with_capacity(self.batches.len()); + for batch in self.batches { + let mutable = frozen_to_mutable(&batch); + result.add_batch(mutable); + } + result + } + + pub fn into_polled_messages(&self, poll_metadata: IggyPollMetadata) -> PolledMessages { + if self.is_empty() { + return PolledMessages::empty(); + } + + let mut messages = Vec::with_capacity(self.count as usize); + + for batch in &self.batches { + for message in batch.iter() { + let header = message.header().to_header(); + let payload = Bytes::copy_from_slice(message.payload()); + let user_headers = message.user_headers().map(Bytes::copy_from_slice); + let msg = IggyMessage { + header, + payload, + user_headers, + }; + messages.push(msg); + } + } + + trace!( + "Converted frozen batch of {} messages from partition {} with current offset {}", + messages.len(), + poll_metadata.partition_id, + poll_metadata.current_offset + ); + + PolledMessages { + partition_id: poll_metadata.partition_id, + current_offset: poll_metadata.current_offset, + count: messages.len() as u32, + messages, + } + } +} + +impl From<Vec<IggyMessagesBatch>> for IggyMessagesBatchSet { + fn from(batches: Vec<IggyMessagesBatch>) -> Self { + Self::from_vec(batches) + } +} + +impl Sizeable for IggyMessagesBatchSet { + fn get_size_bytes(&self) -> IggyByteSize { + IggyByteSize::from(self.size as u64) + } +} + +// ============================================================================ +// IggyMessagesBatchSetMut - Mutable batches (PooledBuffer) +// ============================================================================ + +/// A container for mutable message batches (using PooledBuffer). +/// +/// Used for reads from journal and disk. +#[derive(Debug, Default)] +pub struct IggyMessagesBatchSetMut { + batches: Vec<IggyMessagesBatchMut>, + count: u32, + size: u32, +} + +impl IggyMessagesBatchSetMut { + pub fn empty() -> Self { + Self { + batches: Vec::new(), + count: 0, + size: 0, + } + } + + pub fn with_capacity(capacity: usize) -> Self { + Self { + batches: Vec::with_capacity(capacity), + count: 0, + size: 0, + } + } + pub fn from_vec(messages: Vec<IggyMessagesBatchMut>) -> Self { let mut batch = Self::with_capacity(messages.len()); for msg in messages { @@ -67,46 +258,19 @@ impl IggyMessagesBatchSet { batch } - /// Add another batch of messages to the batch set pub fn add_batch(&mut self, batch: IggyMessagesBatchMut) { self.count += batch.count(); self.size += batch.size(); self.batches.push(batch); } - /// Add another batch set of messages to the batch set - pub fn add_batch_set(&mut self, mut other_batch_set: IggyMessagesBatchSet) { - self.count += other_batch_set.count(); - self.size += other_batch_set.size(); - let other_batches = std::mem::take(&mut other_batch_set.batches); + pub fn add_batch_set(&mut self, mut other: IggyMessagesBatchSetMut) { + self.count += other.count(); + self.size += other.size(); + let other_batches = std::mem::take(&mut other.batches); self.batches.extend(other_batches); } - /// Add immutable batches by copying them into mutable form. - /// - /// This is used when reading from the in-flight buffer (which holds - /// frozen/immutable batches) and needs to convert them for the read path. - pub fn add_immutable_batches(&mut self, batches: &[IggyMessagesBatch]) { - for batch in batches { - let mutable_batch = Self::immutable_to_mutable(batch); - self.add_batch(mutable_batch); - } - } - - /// Convert an immutable IggyMessagesBatch to a mutable IggyMessagesBatchMut. - /// - /// This requires copying the Bytes into a PooledBuffer. - fn immutable_to_mutable(batch: &IggyMessagesBatch) -> IggyMessagesBatchMut { - let count = batch.count(); - let base_position = batch.indexes().base_position(); - let indexes_buffer = PooledBuffer::from(batch.indexes_slice()); - let indexes = IggyIndexesMut::from_bytes(indexes_buffer, base_position); - let messages = PooledBuffer::from(batch.buffer()); - - IggyMessagesBatchMut::from_indexes_and_messages(count, indexes, messages) - } - - /// Extract indexes from all batches in the set pub fn append_indexes_to(&self, target: &mut IggyIndexesMut) { for batch in self.iter() { let indexes = batch.indexes(); @@ -114,87 +278,65 @@ impl IggyMessagesBatchSet { } } - /// Get the total number of messages in the batch pub fn count(&self) -> u32 { self.count } - /// Get the total size of all messages in bytes pub fn size(&self) -> u32 { self.size } - /// Get the number of message containers in the batch pub fn containers_count(&self) -> usize { self.batches.len() } - /// Check if the batch is empty pub fn is_empty(&self) -> bool { self.batches.is_empty() || self.count == 0 } - /// Get timestamp of first message in first batch pub fn first_timestamp(&self) -> Option<u64> { if self.is_empty() { return None; } - self.batches.first().map(|batch| batch.first_timestamp())? + self.batches + .first() + .and_then(|batch| batch.first_timestamp()) } - /// Get offset of first message in first batch pub fn first_offset(&self) -> Option<u64> { if self.is_empty() { return None; } - self.batches.first().map(|batch| batch.first_offset())? + self.batches.first().and_then(|batch| batch.first_offset()) } - /// Get timestamp of last message in last batch pub fn last_timestamp(&self) -> Option<u64> { if self.is_empty() { return None; } - self.batches.last().map(|batch| batch.last_timestamp())? + self.batches.last().and_then(|batch| batch.last_timestamp()) } - /// Get offset of last message in last batch pub fn last_offset(&self) -> Option<u64> { - self.batches.last().map(|batch| batch.last_offset())? + self.batches.last().and_then(|batch| batch.last_offset()) } - /// Get a reference to the underlying vector of message containers pub fn inner(&self) -> &Vec<IggyMessagesBatchMut> { &self.batches } - /// Consume the batch, returning the underlying vector of message containers pub fn into_inner(mut self) -> Vec<IggyMessagesBatchMut> { std::mem::take(&mut self.batches) } - /// Iterate over all message containers in the batch pub fn iter(&self) -> impl Iterator<Item = &IggyMessagesBatchMut> { self.batches.iter() } - /// Iterate over all mutable message containers in the batch pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut IggyMessagesBatchMut> { self.batches.iter_mut() } - /// Convert this batch and poll metadata into a vector of fully-formed IggyMessage objects - /// - /// This method transforms the internal message views into complete IggyMessage objects - /// that can be returned to clients. It should only be used by server http implementation. - /// - /// # Arguments - /// - /// * `poll_metadata` - Metadata about the partition and current offset - /// - /// # Returns - /// - /// A vector of IggyMessage objects with proper metadata pub fn into_polled_messages(&self, poll_metadata: IggyPollMetadata) -> PolledMessages { if self.is_empty() { return PolledMessages::empty(); @@ -231,10 +373,6 @@ impl IggyMessagesBatchSet { } } - /// Returns a new IggyMessagesBatch containing only messages with offsets greater than or equal to the specified offset, - /// up to the specified count. - /// - /// If no messages match the criteria, returns an empty batch. pub fn get_by_offset(&self, start_offset: u64, count: u32) -> Self { if self.is_empty() || count == 0 { return Self::empty(); @@ -266,10 +404,6 @@ impl IggyMessagesBatchSet { result } - /// Returns a new IggyMessagesBatch containing only messages with timestamps greater than or equal - /// to the specified timestamp, up to the specified count. - /// - /// If no messages match the criteria, returns an empty batch. pub fn get_by_timestamp(&self, timestamp: u64, count: u32) -> Self { if self.is_empty() || count == 0 { return Self::empty(); @@ -299,8 +433,6 @@ impl IggyMessagesBatchSet { result } - /// Get the message at the specified index. - /// Returns None if the index is out of bounds. pub fn get(&self, index: usize) -> Option<IggyMessageView<'_>> { if index >= self.count as usize { return None; @@ -323,14 +455,9 @@ impl IggyMessagesBatchSet { } } -impl Index<usize> for IggyMessagesBatchSet { +impl Index<usize> for IggyMessagesBatchSetMut { type Output = [u8]; - /// Get the message bytes at the specified index across all batches - /// - /// # Panics - /// - /// Panics if the index is out of bounds (>= total number of messages) fn index(&self, index: usize) -> &Self::Output { if index >= self.count as usize { panic!( @@ -356,20 +483,126 @@ impl Index<usize> for IggyMessagesBatchSet { } } -impl Sizeable for IggyMessagesBatchSet { +impl Sizeable for IggyMessagesBatchSetMut { fn get_size_bytes(&self) -> IggyByteSize { IggyByteSize::from(self.size as u64) } } -impl From<Vec<IggyMessagesBatchMut>> for IggyMessagesBatchSet { +impl From<Vec<IggyMessagesBatchMut>> for IggyMessagesBatchSetMut { fn from(messages: Vec<IggyMessagesBatchMut>) -> Self { Self::from_vec(messages) } } -impl From<IggyMessagesBatchMut> for IggyMessagesBatchSet { +impl From<IggyMessagesBatchMut> for IggyMessagesBatchSetMut { fn from(messages: IggyMessagesBatchMut) -> Self { Self::from_vec(vec![messages]) } } + +// ============================================================================ +// PolledBatches - Result of polling (either mutable or frozen) +// ============================================================================ + +/// Result of polling messages - either mutable or frozen batches. +/// +/// Using an enum avoids copying when reading from the in-flight buffer, +/// as the frozen `Bytes` are Arc-backed and use zero-copy slicing. +#[derive(Debug)] +pub enum PolledBatches { + /// Mutable batches from journal or disk + Mutable(IggyMessagesBatchSetMut), + /// Frozen batches from in-flight buffer (Arc-backed, zero-copy) + Frozen(IggyMessagesBatchSet), +} + +impl PolledBatches { + pub fn empty() -> Self { + Self::Mutable(IggyMessagesBatchSetMut::empty()) + } + + pub fn count(&self) -> u32 { + match self { + Self::Mutable(set) => set.count(), + Self::Frozen(set) => set.count(), + } + } + + pub fn size(&self) -> u32 { + match self { + Self::Mutable(set) => set.size(), + Self::Frozen(set) => set.size(), + } + } + + pub fn is_empty(&self) -> bool { + match self { + Self::Mutable(set) => set.is_empty(), + Self::Frozen(set) => set.is_empty(), + } + } + + pub fn containers_count(&self) -> usize { + match self { + Self::Mutable(set) => set.containers_count(), + Self::Frozen(set) => set.containers_count(), + } + } + + pub fn last_offset(&self) -> Option<u64> { + match self { + Self::Mutable(set) => set.last_offset(), + Self::Frozen(set) => set.last_offset(), + } + } + + pub fn first_offset(&self) -> Option<u64> { + match self { + Self::Mutable(set) => set.first_offset(), + Self::Frozen(set) => set.first_offset(), + } + } + + /// Filter batches by offset. Zero-copy for frozen batches. + pub fn filter_by_offset(self, offset: u64, count: u32) -> Self { + match self { + Self::Mutable(set) => Self::Mutable(set.get_by_offset(offset, count)), + Self::Frozen(set) => Self::Frozen(set.get_by_offset(offset, count)), + } + } + + /// Convert to mutable batch set (copies frozen data). + pub fn into_mutable(self) -> IggyMessagesBatchSetMut { + match self { + Self::Mutable(set) => set, + Self::Frozen(set) => set.into_mutable(), + } + } + + pub fn into_polled_messages(&self, poll_metadata: IggyPollMetadata) -> PolledMessages { + match self { + Self::Mutable(set) => set.into_polled_messages(poll_metadata), + Self::Frozen(set) => set.into_polled_messages(poll_metadata), + } + } +} + +impl Default for PolledBatches { + fn default() -> Self { + Self::empty() + } +} + +// ============================================================================ +// Helper functions +// ============================================================================ + +fn frozen_to_mutable(batch: &IggyMessagesBatch) -> IggyMessagesBatchMut { + let count = batch.count(); + let base_position = batch.indexes().base_position(); + let indexes_buffer = PooledBuffer::from(batch.indexes_slice()); + let indexes = IggyIndexesMut::from_bytes(indexes_buffer, base_position); + let messages = PooledBuffer::from(batch.buffer()); + IggyMessagesBatchMut::from_indexes_and_messages(count, indexes, messages) +} diff --git a/core/server/src/streaming/segments/types/mod.rs b/core/server/src/streaming/segments/types/mod.rs index 9f5266eed..5d83b37b8 100644 --- a/core/server/src/streaming/segments/types/mod.rs +++ b/core/server/src/streaming/segments/types/mod.rs @@ -25,3 +25,5 @@ pub use message_header_view_mut::IggyMessageHeaderViewMut; pub use message_view_mut::IggyMessageViewMut; pub use messages_batch_mut::IggyMessagesBatchMut; pub use messages_batch_set::IggyMessagesBatchSet; +pub use messages_batch_set::IggyMessagesBatchSetMut; +pub use messages_batch_set::PolledBatches;
