This is an automated email from the ASF dual-hosted git repository. hgruszecki pushed a commit to branch zero-copy-no-batching in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 13dee5d7affdb8910d3f0816ec9c95e60f2ce3f9 Author: Hubert Gruszecki <[email protected]> AuthorDate: Tue Mar 11 23:23:37 2025 +0100 more improvements --- bench/src/actors/consumer.rs | 2 +- configs/server.toml | 7 + integration/tests/streaming/get_by_offset.rs | 20 +-- sdk/src/binary/messages.rs | 9 +- sdk/src/messages/send_messages.rs | 4 +- sdk/src/models/messaging/message.rs | 51 +++++++ sdk/src/models/messaging/message_view.rs | 2 +- sdk/src/models/messaging/messages.rs | 148 --------------------- sdk/src/models/messaging/mod.rs | 5 +- sdk/src/prelude.rs | 2 +- .../handlers/messages/poll_messages_handler.rs | 15 ++- .../handlers/messages/send_messages_handler.rs | 4 +- server/src/streaming/partitions/messages.rs | 30 ++--- server/src/streaming/partitions/storage.rs | 7 +- .../src/streaming/segments/indexes/index_reader.rs | 4 +- .../src/streaming/segments/indexes/index_writer.rs | 4 +- server/src/streaming/segments/indexes/mod.rs | 4 +- .../streaming/segments/messages/messages_reader.rs | 21 +-- .../streaming/segments/messages/messages_writer.rs | 49 ++++--- server/src/streaming/segments/messages/mod.rs | 46 +++---- .../streaming/segments/messages/persister_task.rs | 131 +++++++++--------- .../src/streaming/segments/messages_accumulator.rs | 107 +++------------ server/src/streaming/segments/mod.rs | 3 +- server/src/streaming/segments/reading_messages.rs | 105 ++++++--------- server/src/streaming/segments/segment.rs | 57 ++++---- .../streaming/segments/types/message_view_mut.rs | 70 +++++----- server/src/streaming/segments/types/messages.rs | 57 ++++++++ .../src/streaming/segments/types/messages_batch.rs | 109 +++++++++++++++ .../src/streaming/segments/types/messages_mut.rs | 129 ++++++++---------- .../src/streaming/segments/types/messages_slice.rs | 70 ---------- .../segments/types/messages_slice_better.rs | 1 - server/src/streaming/segments/types/mod.rs | 7 +- server/src/streaming/segments/writing_messages.rs | 67 +++------- server/src/streaming/systems/messages.rs | 6 +- server/src/streaming/topics/messages.rs | 122 +---------------- server/src/streaming/topics/storage.rs | 6 - 36 files changed, 608 insertions(+), 873 deletions(-) diff --git a/bench/src/actors/consumer.rs b/bench/src/actors/consumer.rs index e83b34d8..0c451009 100644 --- a/bench/src/actors/consumer.rs +++ b/bench/src/actors/consumer.rs @@ -27,7 +27,7 @@ pub struct Consumer { client_factory: Arc<dyn ClientFactory>, benchmark_kind: BenchmarkKind, consumer_id: u32, - consumer_group_id: Option<u32>, + consumer_group_id: Option<u32>, stream_id: u32, messages_per_batch: u32, message_batches: u32, diff --git a/configs/server.toml b/configs/server.toml index 470c64bb..076aea1b 100644 --- a/configs/server.toml +++ b/configs/server.toml @@ -447,8 +447,15 @@ validate_checksum = false # The threshold of buffered messages before triggering a save to disk (integer). # Specifies how many messages accumulate before persisting to storage. # Adjusting this can balance between write performance and data durability. +# Together with `size_of_messages_required_to_save` it defines the threshold of buffered messages. messages_required_to_save = 1000 +# The threshold of buffered messages size before triggering a save to disk (string). +# Specifies how much messages accumulate before persisting to storage. +# Adjusting this can balance between write performance and data durability. +# Together with `messages_required_to_save` it defines the threshold of buffered messages. +size_of_messages_required_to_save = "1 MB" + # Segment configuration [system.segment] # Defines the soft limit for the size of a storage segment. diff --git a/integration/tests/streaming/get_by_offset.rs b/integration/tests/streaming/get_by_offset.rs index bfaec404..5c1f8cb7 100644 --- a/integration/tests/streaming/get_by_offset.rs +++ b/integration/tests/streaming/get_by_offset.rs @@ -168,11 +168,11 @@ async fn test_get_messages_by_offset( .await .unwrap(); assert_eq!( - all_loaded_messages.count(), + all_loaded_messages.messages_count(), total_messages, "Expected {} messages from start, but got {}", total_messages, - all_loaded_messages.count() + all_loaded_messages.messages_count() ); // Test 2: Get messages from middle (after 3rd batch) @@ -183,11 +183,11 @@ async fn test_get_messages_by_offset( .await .unwrap(); assert_eq!( - middle_messages.count(), + middle_messages.messages_count(), remaining_messages, "Expected {} messages from middle offset, but got {}", remaining_messages, - middle_messages.count() + middle_messages.messages_count() ); // Test 3: No messages beyond final offset @@ -197,10 +197,10 @@ async fn test_get_messages_by_offset( .await .unwrap(); assert_eq!( - no_messages.count(), + no_messages.messages_count(), 0, "Expected no messages beyond final offset, but got {}", - no_messages.count() + no_messages.messages_count() ); // Test 4: Small subset from start @@ -210,11 +210,11 @@ async fn test_get_messages_by_offset( .await .unwrap(); assert_eq!( - subset_messages.count(), + subset_messages.messages_count(), subset_size, "Expected {} messages in subset from start, but got {}", subset_size, - subset_messages.count() + subset_messages.messages_count() ); // Test 5: Messages spanning multiple batches @@ -225,11 +225,11 @@ async fn test_get_messages_by_offset( .await .unwrap(); assert_eq!( - messages.count(), + messages.messages_count(), span_size, "Expected {} messages spanning multiple batches, but got {}", span_size, - messages.count() + messages.messages_count() ); // Test 6: Validate message content and ordering diff --git a/sdk/src/binary/messages.rs b/sdk/src/binary/messages.rs index cb1d66ae..b9d9709b 100644 --- a/sdk/src/binary/messages.rs +++ b/sdk/src/binary/messages.rs @@ -7,7 +7,7 @@ use crate::error::IggyError; use crate::identifier::Identifier; use crate::messages::{FlushUnsavedBuffer, PollingStrategy}; use crate::models::messaging::IggyMessage; -use crate::prelude::{BytesSerializable, IggyMessages, Partitioning, PollMessages, SendMessages}; +use crate::prelude::{Partitioning, PollMessages, SendMessages}; #[async_trait::async_trait] impl<B: BinaryClient> MessageClient for B { @@ -36,7 +36,12 @@ impl<B: BinaryClient> MessageClient for B { ), ) .await?; - Ok(IggyMessages::from_bytes(response).unwrap().to_messages()) + + // First 4 bytes contain count of received messages + let count = u32::from_le_bytes(response.slice(0..4).as_ref().try_into().unwrap()); + let messages = response.slice(4..); + + Ok(IggyMessage::from_raw_bytes(messages, count)) } async fn send_messages( diff --git a/sdk/src/messages/send_messages.rs b/sdk/src/messages/send_messages.rs index 04dd294c..30ecfaf1 100644 --- a/sdk/src/messages/send_messages.rs +++ b/sdk/src/messages/send_messages.rs @@ -5,8 +5,8 @@ use crate::error::IggyError; use crate::identifier::Identifier; use crate::messages::{MAX_HEADERS_SIZE, MAX_PAYLOAD_SIZE}; use crate::models::messaging::{HeaderKey, HeaderValue}; -use crate::models::messaging::{IggyMessage, IggyMessageHeader, IggyMessageViewIterator}; -use crate::prelude::IGGY_MESSAGE_HEADER_SIZE; +use crate::models::messaging::{IggyMessage, IggyMessageHeader}; +use crate::prelude::{IggyMessageViewIterator, IGGY_MESSAGE_HEADER_SIZE}; use crate::utils::byte_size::IggyByteSize; use crate::utils::sizeable::Sizeable; use crate::utils::varint::IggyVarInt; diff --git a/sdk/src/models/messaging/message.rs b/sdk/src/models/messaging/message.rs index d60344fe..e85fb13e 100644 --- a/sdk/src/models/messaging/message.rs +++ b/sdk/src/models/messaging/message.rs @@ -37,6 +37,57 @@ impl IggyMessage { pub fn builder() -> IggyMessageBuilder { IggyMessageBuilder::new() } + + /// Convert bytes to messages + pub(crate) fn from_raw_bytes(buffer: Bytes, count: u32) -> Vec<IggyMessage> { + let mut messages = Vec::with_capacity(count as usize); + let mut position = 0; + let buf_len = buffer.len(); + + while position < buf_len { + if position + IGGY_MESSAGE_HEADER_SIZE as usize > buf_len { + break; + } + let header_bytes = buffer.slice(position..position + IGGY_MESSAGE_HEADER_SIZE as usize); + let header = match IggyMessageHeader::from_bytes(header_bytes) { + Ok(h) => h, + Err(_) => break, + }; + position += IGGY_MESSAGE_HEADER_SIZE as usize; + + let payload_end = position + header.payload_length as usize; + if payload_end > buf_len { + break; + } + let payload = buffer.slice(position..payload_end); + position = payload_end; + + let headers: Option<HashMap<super::HeaderKey, super::HeaderValue>> = + if header.headers_length > 0 { + let headers_end = position + header.headers_length as usize; + if headers_end > buf_len { + break; + } + let headers_bytes = buffer.slice(position..headers_end); + position = headers_end; + + match HashMap::from_bytes(headers_bytes) { + Ok(map) => Some(map), + Err(_) => break, + } + } else { + None + }; + + messages.push(IggyMessage { + header, + payload, + headers, + }); + } + + messages + } } impl FromStr for IggyMessage { diff --git a/sdk/src/models/messaging/message_view.rs b/sdk/src/models/messaging/message_view.rs index 40b20f77..ba51d1f4 100644 --- a/sdk/src/models/messaging/message_view.rs +++ b/sdk/src/models/messaging/message_view.rs @@ -1,5 +1,5 @@ use super::message_header::*; -use super::IggyMessageHeaderView; +use super::message_header_view::IggyMessageHeaderView; use crate::error::IggyError; use crate::models::messaging::header::HeaderKey; use crate::models::messaging::header::HeaderValue; diff --git a/sdk/src/models/messaging/messages.rs b/sdk/src/models/messaging/messages.rs deleted file mode 100644 index cb2f4a54..00000000 --- a/sdk/src/models/messaging/messages.rs +++ /dev/null @@ -1,148 +0,0 @@ -use std::collections::HashMap; - -use super::{ - message_view::IggyMessageViewIterator, IggyMessage, IggyMessageHeader, IGGY_MESSAGE_HEADER_SIZE, -}; -use crate::bytes_serializable::BytesSerializable; -use crate::error::IggyError; -use crate::utils::byte_size::IggyByteSize; -use crate::utils::sizeable::Sizeable; -use bytes::{Bytes, BytesMut}; -use serde::{Deserialize, Serialize}; - -/// An immutable messages container that holds a buffer of messages -#[derive(Debug, Serialize, Deserialize, PartialEq)] -pub struct IggyMessages { - /// The number of messages in the buffer - #[serde(skip)] - count: u32, - /// The buffer containing the messages - buffer: Bytes, -} - -impl IggyMessages { - /// Create a new messages container from a buffer - pub fn new(buffer: Bytes, count: u32) -> Self { - Self { buffer, count } - } - - /// Creates a empty messages container with capacity - pub fn with_capacity(capacity: u32) -> Self { - Self::new(BytesMut::with_capacity(capacity as usize).freeze(), 0) - } - - /// Create iterator over messages - pub fn iter(&self) -> IggyMessageViewIterator { - IggyMessageViewIterator::new(&self.buffer) - } - - /// Get the number of messages - pub fn count(&self) -> u32 { - self.count - } - - /// Get the total size of all messages in bytes - pub fn size(&self) -> u32 { - self.buffer.len() as u32 - } - - /// Get access to the underlying buffer - pub fn buffer(&self) -> &[u8] { - &self.buffer - } - - /// Get access to the underlying buffer shallow copy - pub fn shallow_copy(&self) -> Bytes { - self.buffer.clone() - } - - pub fn into_inner(self) -> Bytes { - self.buffer - } - - pub fn to_messages(self) -> Vec<IggyMessage> { - let mut messages = Vec::with_capacity(self.count as usize); - let mut position = 0; - let buf_len = self.buffer.len(); - - while position < buf_len { - if position + IGGY_MESSAGE_HEADER_SIZE as usize > buf_len { - break; - } - let header_bytes = self - .buffer - .slice(position..position + IGGY_MESSAGE_HEADER_SIZE as usize); - let header = match IggyMessageHeader::from_bytes(header_bytes) { - Ok(h) => h, - Err(_) => break, - }; - position += IGGY_MESSAGE_HEADER_SIZE as usize; - - let payload_end = position + header.payload_length as usize; - if payload_end > buf_len { - break; - } - let payload = self.buffer.slice(position..payload_end); - position = payload_end; - - let headers: Option<HashMap<super::HeaderKey, super::HeaderValue>> = if header.headers_length > 0 { - let headers_end = position + header.headers_length as usize; - if headers_end > buf_len { - break; - } - let headers_bytes = self.buffer.slice(position..headers_end); - position = headers_end; - - match HashMap::from_bytes(headers_bytes) { - Ok(map) => Some(map), - Err(_) => break, - } - } else { - None - }; - - messages.push(IggyMessage { - header, - payload, - headers, - }); - } - - messages - } -} - -impl BytesSerializable for IggyMessages { - fn to_bytes(&self) -> Bytes { - self.buffer.clone() - } - - fn from_bytes(bytes: Bytes) -> Result<Self, IggyError> - where - Self: Sized, - { - let mut messages_count = 0; - let iterator: IggyMessageViewIterator<'_> = IggyMessageViewIterator::new(&bytes); - - for _ in iterator { - messages_count += 1; - } - - Ok(Self { - buffer: bytes, - count: messages_count, - }) - } -} - -impl Sizeable for IggyMessages { - fn get_size_bytes(&self) -> IggyByteSize { - IggyByteSize::from(self.buffer.len() as u64) - } -} - -impl Default for IggyMessages { - fn default() -> Self { - Self::new(BytesMut::new().freeze(), 0) - } -} diff --git a/sdk/src/models/messaging/mod.rs b/sdk/src/models/messaging/mod.rs index 2605a4af..df0fa89c 100644 --- a/sdk/src/models/messaging/mod.rs +++ b/sdk/src/models/messaging/mod.rs @@ -3,7 +3,6 @@ mod message; mod message_header; mod message_header_view; mod message_view; -mod messages; pub use header::{HeaderKey, HeaderKind, HeaderValue}; pub use message::IggyMessage; @@ -15,5 +14,5 @@ pub use message_header::{ IGGY_MESSAGE_TIMESTAMP_OFFSET_RANGE, }; pub use message_header_view::IggyMessageHeaderView; -pub use message_view::{IggyMessageView, IggyMessageViewIterator}; -pub use messages::IggyMessages; +pub use message_view::IggyMessageView; +pub use message_view::IggyMessageViewIterator; diff --git a/sdk/src/prelude.rs b/sdk/src/prelude.rs index 9172ead2..ef4354a7 100644 --- a/sdk/src/prelude.rs +++ b/sdk/src/prelude.rs @@ -20,7 +20,7 @@ pub use crate::messages::{ }; pub use crate::models::messaging::{ HeaderKey, HeaderValue, IggyMessage, IggyMessageHeader, IggyMessageHeaderView, IggyMessageView, - IggyMessageViewIterator, IggyMessages, + IggyMessageViewIterator, }; pub use crate::models::messaging::{ IGGY_MESSAGE_CHECKSUM_OFFSET_RANGE, IGGY_MESSAGE_HEADERS_LENGTH_OFFSET_RANGE, diff --git a/server/src/binary/handlers/messages/poll_messages_handler.rs b/server/src/binary/handlers/messages/poll_messages_handler.rs index 85527060..c8a3bfde 100644 --- a/server/src/binary/handlers/messages/poll_messages_handler.rs +++ b/server/src/binary/handlers/messages/poll_messages_handler.rs @@ -26,7 +26,7 @@ impl ServerCommandHandler for PollMessages { debug!("session: {session}, command: {self}"); let system = system.read().await; - let messages = system + let batches = system .poll_messages( session, &self.consumer, @@ -40,16 +40,23 @@ impl ServerCommandHandler for PollMessages { "{COMPONENT} (error: {error}) - failed to poll messages for consumer: {}, stream_id: {}, topic_id: {}, partition_id: {:?}, session: {session}.", self.consumer, self.stream_id, self.topic_id, self.partition_id ))?; + drop(system); // Collect all chunks first into a Vec to extend their lifetimes. // This ensures the Arc<[u8]> references from each ByteSliceView stay alive // throughout the async vectored I/O operation, preventing "borrowed value does not live // long enough" errors while optimizing transmission by using larger chunks. - let length = messages.size().to_le_bytes(); - let slices: Vec<IoSlice> = messages.chunks().map(IoSlice::new).collect(); + let response_length = (batches.size() + 4).to_le_bytes(); + let messages_count = batches.messages_count().to_le_bytes(); - sender.send_ok_response_vectored(&length, slices).await?; + let mut io_slices = Vec::with_capacity(batches.containers_count() + 1); + io_slices.push(IoSlice::new(&messages_count)); + io_slices.extend(batches.iter().map(|m| IoSlice::new(m))); + + sender + .send_ok_response_vectored(&response_length, io_slices) + .await?; Ok(()) } } diff --git a/server/src/binary/handlers/messages/send_messages_handler.rs b/server/src/binary/handlers/messages/send_messages_handler.rs index 60d9735a..aac97eb8 100644 --- a/server/src/binary/handlers/messages/send_messages_handler.rs +++ b/server/src/binary/handlers/messages/send_messages_handler.rs @@ -53,8 +53,8 @@ impl ServerCommandHandler for SendMessages { let messages_count = buffer.get_u32_le(); let messages = buffer.split(); - let mut messages = IggyMessagesMut::new(messages); - messages.set_count(messages_count); + + let messages = IggyMessagesMut::from_bytes(messages, messages_count); let system = system.read().await; system diff --git a/server/src/streaming/partitions/messages.rs b/server/src/streaming/partitions/messages.rs index d4e6fc1f..51819388 100644 --- a/server/src/streaming/partitions/messages.rs +++ b/server/src/streaming/partitions/messages.rs @@ -66,7 +66,7 @@ impl Partition { &self, start_offset: u64, count: u32, - ) -> Result<IggyMessagesSlice, IggyError> { + ) -> Result<IggyMessagesBatch, IggyError> { trace!( "Getting messages for start offset: {start_offset} for partition: {}, current offset: {}...", self.partition_id, @@ -80,6 +80,7 @@ impl Partition { */ let end_offset = self.get_end_offset(start_offset, count); + // TODO: Most likely don't need to find the specific range of segments, just find the first segment containing the first offset // and during reads roll to the next one, when the first is exhausted. let segments = self.filter_segments_by_offsets(start_offset, end_offset); @@ -93,12 +94,12 @@ impl Partition { } // Retrieves the first messages (up to a specified count). - pub async fn get_first_messages(&self, count: u32) -> Result<IggyMessagesSlice, IggyError> { + pub async fn get_first_messages(&self, count: u32) -> Result<IggyMessagesBatch, IggyError> { self.get_messages_by_offset(0, count).await } // Retrieves the last messages (up to a specified count). - pub async fn get_last_messages(&self, count: u32) -> Result<IggyMessagesSlice, IggyError> { + pub async fn get_last_messages(&self, count: u32) -> Result<IggyMessagesBatch, IggyError> { let mut requested_count = count as u64; if requested_count > self.current_offset + 1 { requested_count = self.current_offset + 1 @@ -113,7 +114,7 @@ impl Partition { &self, consumer: PollingConsumer, count: u32, - ) -> Result<IggyMessagesSlice, IggyError> { + ) -> Result<IggyMessagesBatch, IggyError> { let (consumer_offsets, consumer_id) = match consumer { PollingConsumer::Consumer(consumer_id, _) => (&self.consumer_offsets, consumer_id), PollingConsumer::ConsumerGroup(group_id, _) => (&self.consumer_group_offsets, group_id), @@ -181,11 +182,10 @@ impl Partition { segments: Vec<&Segment>, offset: u64, count: u32, - ) -> Result<IggyMessagesSlice, IggyError> { - let mut slices = Vec::new(); + ) -> Result<IggyMessagesBatch, IggyError> { let mut remaining_count = count; let mut current_offset = offset; - + let mut batches = IggyMessagesBatch::new(); for segment in segments { if remaining_count == 0 { break; @@ -203,7 +203,7 @@ impl Partition { })?; // Update remaining count and offset for next segment - let messages_count = messages.count(); + let messages_count = messages.messages_count(); remaining_count = remaining_count.saturating_sub(messages_count); // Update the offset for the next segment if needed @@ -213,12 +213,10 @@ impl Partition { current_offset += messages_count as u64; } - slices.push(messages); + batches.add_batch(messages); } - // Combine all segment slices into a single composite slice - // This avoids copying the data - Ok(IggyMessagesSlice::combine(slices)) + Ok(batches) // let mut results = Vec::new(); // let mut remaining_count = count; // for segment in segments { @@ -252,9 +250,9 @@ impl Partition { ) -> Result<(), IggyError> { trace!( "Appending {} messages of size {} to partition with ID: {}...", - self.partition_id, messages.count(), - messages.size() + messages.size(), + self.partition_id ); { let last_segment = self.segments.last_mut().ok_or(IggyError::SegmentNotFound)?; @@ -373,9 +371,7 @@ impl Partition { self.partition_id ); - // Make sure all of the messages from the accumulator are persisted - // no leftover from one round trip. - while last_segment.unsaved_messages.is_some() { + if !last_segment.accumulator.is_empty() { last_segment.persist_messages(None).await.unwrap(); } self.unsaved_messages_count = 0; diff --git a/server/src/streaming/partitions/storage.rs b/server/src/streaming/partitions/storage.rs index a251be85..573c9937 100644 --- a/server/src/streaming/partitions/storage.rs +++ b/server/src/streaming/partitions/storage.rs @@ -133,9 +133,10 @@ impl PartitionStorage for FilePartitionStorage { format!("{COMPONENT} (error: {error}) - failed to load segment: {segment}",) })?; let capacity = partition.config.partition.messages_required_to_save; - if !segment.is_closed { - segment.unsaved_messages = Some(Default::default()); - } + // TODO(hubcio) + // if !segment.is_closed { + // segment.unsaved_messages = Some(Default::default()); + // } // If the first segment has at least a single message, we should increment the offset. if !partition.should_increment_offset { diff --git a/server/src/streaming/segments/indexes/index_reader.rs b/server/src/streaming/segments/indexes/index_reader.rs index 83a8eb3b..87d79ad3 100644 --- a/server/src/streaming/segments/indexes/index_reader.rs +++ b/server/src/streaming/segments/indexes/index_reader.rs @@ -15,13 +15,13 @@ use tracing::{error, trace}; /// A dedicated struct for reading from the index file. #[derive(Debug)] -pub struct SegmentIndexReader { +pub struct IndexReader { file_path: String, file: Arc<File>, index_size_bytes: Arc<AtomicU64>, } -impl SegmentIndexReader { +impl IndexReader { /// Opens the index file in read-only mode. pub async fn new(file_path: &str, index_size_bytes: Arc<AtomicU64>) -> Result<Self, IggyError> { let file = OpenOptions::new() diff --git a/server/src/streaming/segments/indexes/index_writer.rs b/server/src/streaming/segments/indexes/index_writer.rs index 306e7350..7d6df825 100644 --- a/server/src/streaming/segments/indexes/index_writer.rs +++ b/server/src/streaming/segments/indexes/index_writer.rs @@ -13,14 +13,14 @@ use tracing::trace; /// A dedicated struct for writing to the index file. #[derive(Debug)] -pub struct SegmentIndexWriter { +pub struct IndexWriter { file_path: String, file: File, index_size_bytes: Arc<AtomicU64>, fsync: bool, } -impl SegmentIndexWriter { +impl IndexWriter { /// Opens the index file in write mode. pub async fn new( file_path: &str, diff --git a/server/src/streaming/segments/indexes/mod.rs b/server/src/streaming/segments/indexes/mod.rs index 5ad84180..2a9308c5 100644 --- a/server/src/streaming/segments/indexes/mod.rs +++ b/server/src/streaming/segments/indexes/mod.rs @@ -6,5 +6,5 @@ mod index_writer; pub const INDEX_SIZE: u32 = 16; pub use index::Index; -pub use index_reader::SegmentIndexReader; -pub use index_writer::SegmentIndexWriter; +pub use index_reader::IndexReader; +pub use index_writer::IndexWriter; diff --git a/server/src/streaming/segments/messages/messages_reader.rs b/server/src/streaming/segments/messages/messages_reader.rs index caff84d2..060d4235 100644 --- a/server/src/streaming/segments/messages/messages_reader.rs +++ b/server/src/streaming/segments/messages/messages_reader.rs @@ -1,11 +1,8 @@ +use crate::streaming::segments::IggyMessages; use bytes::{Bytes, BytesMut}; use error_set::ErrContext; -use iggy::{ - error::IggyError, - prelude::{BytesSerializable, IggyMessages}, -}; +use iggy::error::IggyError; use std::{ - fmt, fs::{File, OpenOptions}, os::unix::prelude::FileExt, }; @@ -17,7 +14,7 @@ use std::{ }, }; use tokio::task::spawn_blocking; -use tracing::{error, trace, warn}; +use tracing::{error, trace}; /// A dedicated struct for reading from the log file. #[derive(Debug)] @@ -115,12 +112,16 @@ impl MessagesReader { let file_size = self.file_size(); if file_size == 0 { trace!("Messages file {} is empty.", self.file_path); - return Ok(IggyMessages::default()); + return Ok(IggyMessages::empty()); } - let messages_bytes = match self.read_at(start_pos as u64, count_bytes as u64).await { + + let messages_bytes = match self + .read_bytes_at(start_pos as u64, count_bytes as u64) + .await + { Ok(buf) => buf, Err(error) if error.kind() == ErrorKind::UnexpectedEof => { - return Ok(IggyMessages::default()) + return Ok(IggyMessages::empty()) } Err(error) => { error!( @@ -138,7 +139,7 @@ impl MessagesReader { self.log_size_bytes.load(Ordering::Acquire) } - async fn read_at(&self, offset: u64, len: u64) -> Result<Bytes, std::io::Error> { + async fn read_bytes_at(&self, offset: u64, len: u64) -> Result<Bytes, std::io::Error> { let file = self.file.clone(); spawn_blocking(move || { let mut buf = BytesMut::with_capacity(len as usize); diff --git a/server/src/streaming/segments/messages/messages_writer.rs b/server/src/streaming/segments/messages/messages_writer.rs index bc46040b..5fff02fe 100644 --- a/server/src/streaming/segments/messages/messages_writer.rs +++ b/server/src/streaming/segments/messages/messages_writer.rs @@ -1,22 +1,16 @@ use super::PersisterTask; +use crate::streaming::segments::{messages::write_batch, IggyMessagesBatch}; use error_set::ErrContext; use iggy::{ confirmation::Confirmation, error::IggyError, - prelude::IggyMessages, utils::{byte_size::IggyByteSize, duration::IggyDuration}, }; -use std::{ - io::IoSlice, - sync::{ - atomic::{AtomicU64, Ordering}, - Arc, - }, -}; -use tokio::{ - fs::{File, OpenOptions}, - io::AsyncWriteExt, +use std::sync::{ + atomic::{AtomicU64, Ordering}, + Arc, }; +use tokio::fs::{File, OpenOptions}; use tracing::{error, trace}; /// A dedicated struct for writing to the messages file. @@ -94,17 +88,31 @@ impl MessagesWriter { /// Append a messages to the messages file. pub async fn save_batches( &mut self, - messages: Vec<IggyMessages>, + batches: IggyMessagesBatch, confirmation: Confirmation, ) -> Result<IggyByteSize, IggyError> { - let messages_size: usize = messages.iter().map(|m| m.size() as usize).sum(); + let messages_size = batches.size(); trace!( "Saving batch of size {messages_size} bytes to messages file: {}", self.file_path ); match confirmation { Confirmation::Wait => { - self.write_batch_vectored(messages).await?; + if let Some(ref mut file) = self.file { + write_batch(file, &self.file_path, batches) + .await + .with_error_context(|error| { + format!( + "Failed to write batch to messages file: {}. {error}", + self.file_path + ) + }) + .map_err(|_| IggyError::CannotWriteToFile)?; + } else { + error!("File handle is not available for synchronous write."); + return Err(IggyError::CannotWriteToFile); + } + self.messages_size_bytes .fetch_add(messages_size as u64, Ordering::AcqRel); trace!( @@ -117,7 +125,7 @@ impl MessagesWriter { } Confirmation::NoWait => { if let Some(task) = &self.persister_task { - task.persist(messages).await; + task.persist(batches).await; } else { panic!( "Confirmation::NoWait is used, but MessagesPersisterTask is not set for messages file: {}", @@ -130,17 +138,6 @@ impl MessagesWriter { Ok(IggyByteSize::from(messages_size as u64)) } - /// Write a batch of bytes to the log file. - async fn write_batch_vectored(&mut self, batches: Vec<IggyMessages>) -> Result<(), IggyError> { - if let Some(ref mut file) = self.file { - super::write_batch_vectored(file, &self.file_path, batches).await?; - Ok(()) - } else { - error!("File handle is not available for synchronous write."); - Err(IggyError::CannotWriteToFile) - } - } - pub async fn fsync(&self) -> Result<(), IggyError> { if let Some(file) = self.file.as_ref() { file.sync_all() diff --git a/server/src/streaming/segments/messages/mod.rs b/server/src/streaming/segments/messages/mod.rs index e4bff590..eaf74a44 100644 --- a/server/src/streaming/segments/messages/mod.rs +++ b/server/src/streaming/segments/messages/mod.rs @@ -2,48 +2,36 @@ mod messages_reader; mod messages_writer; mod persister_task; +use std::io::IoSlice; + +use error_set::ErrContext; pub use messages_reader::MessagesReader; pub use messages_writer::MessagesWriter; pub use persister_task::PersisterTask; -use error_set::ErrContext; -use iggy::{error::IggyError, prelude::IggyMessages}; -use std::io::IoSlice; +use super::IggyMessagesBatch; +use iggy::error::IggyError; use tokio::{fs::File, io::AsyncWriteExt}; -use tracing::error; -/// Write a batch of messages to a file using vectored I/O -/// -/// This function writes all the messages in the batch to the file using vectored I/O, -/// which can be more efficient than writing each message individually. -pub async fn write_batch_vectored( +/// Vectored write a batches of messages to file +async fn write_batch( file: &mut File, file_path: &str, - batches: Vec<IggyMessages>, -) -> Result<u32, IggyError> { - let mut total_bytes_written = 0; - let mut slices_vec: Vec<IoSlice<'_>> = - batches.iter().map(|b| IoSlice::new(b.buffer())).collect(); - - let mut slices = slices_vec.as_mut_slice(); + batches: IggyMessagesBatch, +) -> Result<usize, IggyError> { + let mut slices: Vec<IoSlice> = batches.iter().map(|b| IoSlice::new(b)).collect(); + let slices = &mut slices.as_mut_slice(); + let mut written = 0; while !slices.is_empty() { - let written = file + written += file .write_vectored(slices) .await .with_error_context(|error| { - format!("Failed to write vectored to file: {file_path}. {error}") + format!("Failed to write messages to file: {file_path}, error: {error}",) }) - .map_err(|_| IggyError::CannotWriteToFile)? as u32; - - if written == 0 { - error!("Failed to write batch of messages to file: {file_path}"); - return Err(IggyError::CannotWriteToFile); - } - - total_bytes_written += written; - IoSlice::advance_slices(&mut slices, written as usize); + .map_err(|_| IggyError::CannotWriteToFile)?; + IoSlice::advance_slices(slices, written); } - - Ok(total_bytes_written) + Ok(written) } diff --git a/server/src/streaming/segments/messages/persister_task.rs b/server/src/streaming/segments/messages/persister_task.rs index f76000e8..9072a1e7 100644 --- a/server/src/streaming/segments/messages/persister_task.rs +++ b/server/src/streaming/segments/messages/persister_task.rs @@ -1,5 +1,7 @@ +use crate::streaming::segments::IggyMessagesBatch; +use error_set::ErrContext; use flume::{unbounded, Receiver}; -use iggy::{error::IggyError, prelude::IggyMessages, utils::duration::IggyDuration}; +use iggy::{error::IggyError, utils::duration::IggyDuration}; use std::{ io::IoSlice, sync::{ @@ -11,10 +13,12 @@ use std::{ use tokio::{fs::File, io::AsyncWriteExt, select, time::sleep}; use tracing::{error, trace, warn}; +use super::write_batch; + #[derive(Debug)] /// A command to the persister task. enum PersisterTaskCommand { - WriteRequest(Vec<IggyMessages>), + WriteRequest(IggyMessagesBatch), Shutdown, } @@ -59,7 +63,7 @@ impl PersisterTask { } /// Sends the batch bytes to the persister task (fire-and-forget). - pub async fn persist(&self, messages: Vec<IggyMessages>) { + pub async fn persist(&self, messages: IggyMessagesBatch) { if let Err(e) = self .sender .send_async(PersisterTaskCommand::WriteRequest(messages)) @@ -173,16 +177,7 @@ impl PersisterTask { while let Ok(request) = receiver.recv_async().await { match request { PersisterTaskCommand::WriteRequest(messages) => { - match Self::write_with_retries( - &mut file, - &file_path, - messages, - fsync, - max_retries, - retry_delay, - ) - .await - { + match write_batch(&mut file, &file_path, messages).await { Ok(bytes_written) => { log_file_size.fetch_add(bytes_written as u64, Ordering::AcqRel); } @@ -209,59 +204,61 @@ impl PersisterTask { trace!("PersisterTask for file {file_path} has finished processing requests"); } - /// Writes the provided data to the file using simple retry logic. - async fn write_with_retries( - file: &mut File, - file_path: &str, - messages: Vec<IggyMessages>, - fsync: bool, - max_retries: u32, - retry_delay: IggyDuration, - ) -> Result<u32, IggyError> { - let mut attempts = 0; - // TODO: This "retry" logic should be rewritten. - // There are certain kind of errors whom retrying is considered harmful, for example fsync failure - // (https://www.usenix.org/system/files/atc20-rebello.pdf) - // There are few errors which are worth retrying such as LSE (Latent sector error), as the file system - // might be able to reallocate a new sector for the data and recover, but not every file system supports that. - // In general this topic should be furthered researched rather than just naively retry when the write fails. - super::write_batch_vectored(file, file_path, messages).await + // /// Writes the provided data to the file using simple retry logic. + // async fn write_with_retries( + // file: &mut File, + // file_path: &str, + // batches: IggyMessagesBatch, + // fsync: bool, + // max_retries: u32, + // retry_delay: IggyDuration, + // ) -> Result<u32, IggyError> { + // let mut attempts = 0; + // // TODO: This "retry" logic should be rewritten. + // // There are certain kind of errors whom retrying is considered harmful, for example fsync failure + // // (https://www.usenix.org/system/files/atc20-rebello.pdf) + // // There are few errors which are worth retrying such as LSE (Latent sector error), as the file system + // // might be able to reallocate a new sector for the data and recover, but not every file system supports that. + // // In general this topic should be furthered researched rather than just naively retry when the write fails. + // let mut slices = Vec::new(); + // batches.iter().map(|b| { + // slices.push(IoSlice::new(&b)); + // }); - // let messages_size = messages.iter().map(|m| m.size()).sum(); - // loop { - // // TODO: use vectored API - // match super::write_batch_vectored(file, file_path, messages).await { - // Ok(_) => { - // if fsync { - // match file.sync_all().await { - // Ok(_) => return Ok(messages_size), - // Err(e) => { - // attempts += 1; - // error!( - // "Error syncing file {file_path}: {:?} (attempt {attempts}/{max_retries})", - // e, - // ); - // } - // } - // } else { - // return Ok(messages_size); - // } - // } - // Err(e) => { - // attempts += 1; - // error!( - // "Error writing to file {file_path}: {:?} (attempt {attempts}/{max_retries})", - // e, - // ); - // } - // } - // if attempts >= max_retries { - // error!( - // "Failed to write to file {file_path} after {max_retries} attempts, something's terribly wrong", - // ); - // return Err(IggyError::CannotWriteToFile); - // } - // sleep(retry_delay.get_duration()).await; - // } - } + // let messages_size = batches.size(); + // loop { + // match file.write_vectored(&slices).await { + // Ok(written) => { + // if fsync { + // match file.sync_all().await { + // Ok(_) => return Ok(messages_size), + // Err(e) => { + // attempts += 1; + // error!( + // "Error syncing file {file_path}: {:?} (attempt {attempts}/{max_retries})", + // e, + // ); + // } + // } + // } else { + // return Ok(messages_size); + // } + // } + // Err(e) => { + // attempts += 1; + // error!( + // "Error writing to file {file_path}: {:?} (attempt {attempts}/{max_retries})", + // e, + // ); + // } + // } + // if attempts >= max_retries { + // error!( + // "Failed to write to file {file_path} after {max_retries} attempts, something's terribly wrong", + // ); + // return Err(IggyError::CannotWriteToFile); + // } + // sleep(retry_delay.get_duration()).await; + // } + // } } diff --git a/server/src/streaming/segments/messages_accumulator.rs b/server/src/streaming/segments/messages_accumulator.rs index 45bb2f35..37e1775e 100644 --- a/server/src/streaming/segments/messages_accumulator.rs +++ b/server/src/streaming/segments/messages_accumulator.rs @@ -1,8 +1,7 @@ -use super::{IggyMessagesMut, Index}; -use crate::streaming::segments::types::IggyMessagesSlice; -use iggy::prelude::IggyMessages; +use super::{IggyMessages, IggyMessagesBatch, IggyMessagesMut, Index}; use iggy::utils::byte_size::IggyByteSize; use iggy::utils::timestamp::IggyTimestamp; +use std::mem::take; /// A container that accumulates messages before they are written to disk #[derive(Debug, Default)] @@ -22,11 +21,8 @@ pub struct MessagesAccumulator { /// Current maximum timestamp current_timestamp: u64, - /// A multiple buffers containing all accumulated messages - messages: Vec<IggyMessages>, - - /// Indexes - indexes: Vec<Index>, + /// A buffer containing all accumulated messages + batches: IggyMessagesBatch, /// Number of messages in the accumulator messages_count: u32, @@ -41,45 +37,31 @@ impl MessagesAccumulator { &mut self, current_offset: u64, current_position: u32, - mut batch: IggyMessagesMut, + indexes: &mut Vec<Index>, + messages: IggyMessagesMut, ) -> u32 { - // TODO(hubcio): add capacity - let batch_messages_count = batch.count(); + let batch_messages_count = messages.count(); if batch_messages_count == 0 { return 0; } - if self.messages.is_empty() { + if self.batches.is_empty() { self.base_offset = current_offset; self.base_timestamp = IggyTimestamp::now().as_micros(); self.current_offset = current_offset; self.current_timestamp = self.base_timestamp; } - // TODO(hubcio): pass indexes vec and modify - batch.prepare_for_persistence(self.base_offset, self.current_timestamp); - + let batch = messages.prepare_for_persistence( + self.base_offset, + self.current_timestamp, + current_position, + indexes, + ); let batch_size = batch.size(); - let batch = batch.make_immutable(); - let mut current_position = current_position; - for message in batch.iter() { - let msg_size = message.size() as u32; - - let offset = message.msg_header().offset(); - let relative_offset = (offset - self.base_offset) as u32; - let position = current_position; - let timestamp = message.msg_header().timestamp(); - - self.indexes.push(Index { - offset: relative_offset, - position, - timestamp, - }); - - current_position += msg_size; - } - self.messages.push(batch); + + self.batches.add(batch); self.messages_count += batch_messages_count; self.current_size += IggyByteSize::from(batch_size as u64); @@ -88,61 +70,14 @@ impl MessagesAccumulator { batch_messages_count } - pub fn get_messages_by_offset(&self, start_offset: u64, end_offset: u64) -> IggyMessagesSlice { - let start_idx = (start_offset - self.base_offset) as usize; - let end_idx = (end_offset - self.base_offset) as usize; // inclusive - if self.indexes.is_empty() || start_idx >= self.indexes.len() { - return IggyMessagesSlice::empty(); - } - let start_byte = self.indexes[start_idx].position as usize; - let end_byte = if end_idx + 1 < self.indexes.len() { - self.indexes[end_idx + 1].position as usize - } else { - self.current_size.as_bytes_usize() - }; - - let mut cumulative = Vec::with_capacity(self.messages.len()); - let mut sum = 0usize; - for msg in &self.messages { - sum += msg.buffer().len(); - cumulative.push(sum); - } - - let mut buffers = Vec::new(); - for (i, &cum) in cumulative.iter().enumerate() { - let batch_start = if i == 0 { 0 } else { cumulative[i - 1] }; - let batch_end = cum; - if batch_end <= start_byte { - continue; - } - if batch_start >= end_byte { - break; - } - let slice_start = start_byte.saturating_sub(batch_start); - let slice_end = if end_byte < batch_end { - end_byte - batch_start - } else { - batch_end - batch_start - }; - buffers.push( - self.messages[i] - .shallow_copy() - .slice(slice_start..slice_end), - ); - } - - let count = if end_idx >= start_idx { - (end_idx - start_idx + 1) as u32 - } else { - 0 - }; - IggyMessagesSlice::from_buffers(buffers, count) + pub fn get_messages_by_offset(&self, start_offset: u64, end_offset: u64) -> IggyMessages { + todo!(); } /// Checks if the accumulator is empty pub fn is_empty(&self) -> bool { - self.messages.is_empty() || self.messages_count == 0 + self.batches.is_empty() || self.messages_count == 0 } /// Returns the number of unsaved messages @@ -171,7 +106,7 @@ impl MessagesAccumulator { } /// Takes ownership of the accumulator and returns the messages and indexes - pub fn materialize(self) -> (Vec<IggyMessages>, Vec<Index>) { - (self.messages, self.indexes) + pub fn materialize(self) -> IggyMessagesBatch { + self.batches } } diff --git a/server/src/streaming/segments/mod.rs b/server/src/streaming/segments/mod.rs index 524d9507..2ea22d04 100644 --- a/server/src/streaming/segments/mod.rs +++ b/server/src/streaming/segments/mod.rs @@ -10,8 +10,9 @@ pub use indexes::Index; pub use segment::Segment; pub use types::IggyMessageHeaderViewMut; pub use types::IggyMessageViewMut; +pub use types::IggyMessages; +pub use types::IggyMessagesBatch; pub use types::IggyMessagesMut; -pub use types::IggyMessagesSlice; pub const LOG_EXTENSION: &str = "log"; pub const INDEX_EXTENSION: &str = "index"; diff --git a/server/src/streaming/segments/reading_messages.rs b/server/src/streaming/segments/reading_messages.rs index e680fec4..c230e62c 100644 --- a/server/src/streaming/segments/reading_messages.rs +++ b/server/src/streaming/segments/reading_messages.rs @@ -1,10 +1,7 @@ -use super::Index; +use super::{IggyMessages, IggyMessagesBatch, Index}; use crate::streaming::segments::segment::Segment; -use crate::streaming::segments::types::IggyMessagesSlice; -use bytes::{Bytes, BytesMut}; use error_set::ErrContext; use iggy::prelude::*; -use std::sync::Arc; use tracing::trace; const COMPONENT: &str = "STREAMING_SEGMENT"; @@ -22,31 +19,33 @@ impl Segment { &self, start_timestamp: u64, count: u32, - ) -> Result<IggyMessagesSlice, IggyError> { - if count == 0 { - return Ok(IggyMessagesSlice::empty()); - } + ) -> Result<IggyMessagesBatch, IggyError> { + todo!() + + // if count == 0 { + // return Ok(IggyMessagesMut::empty()); + // } - let index_opt = self.load_index_for_timestamp(start_timestamp).await?; + // let index_opt = self.load_index_for_timestamp(start_timestamp).await?; - let Some(index) = index_opt else { - trace!("No messages found for timestamp: {}", start_timestamp); - return Ok(IggyMessagesSlice::empty()); - }; + // let Some(index) = index_opt else { + // trace!("No messages found for timestamp: {}", start_timestamp); + // return Ok(IggyMessagesMut::empty()); + // }; - let offset = self.start_offset + index.offset as u64; - trace!("Found offset {} for timestamp {}", offset, start_timestamp); + // let offset = self.start_offset + index.offset as u64; + // trace!("Found offset {} for timestamp {}", offset, start_timestamp); - self.get_messages_by_offset(offset, count).await + // self.get_messages_by_offset(offset, count).await } pub async fn get_messages_by_offset( &self, mut offset: u64, count: u32, - ) -> Result<IggyMessagesSlice, IggyError> { + ) -> Result<IggyMessagesBatch, IggyError> { if count == 0 { - return Ok(IggyMessagesSlice::empty()); + return Ok(IggyMessagesBatch::default()); } if offset < self.start_offset { @@ -62,59 +61,49 @@ impl Segment { offset = self.start_offset; } - // In case that the partition messages buffer is disabled, we need to check the unsaved messages buffer - if self.unsaved_messages.is_none() { - return Ok(self.load_messages_from_disk(offset, count).await?); + if self.accumulator.is_empty() { + return Ok(IggyMessagesBatch::from( + self.load_messages_from_disk(offset, count).await?, + )); } - let messages_accumulator = self.unsaved_messages.as_ref().unwrap(); - if messages_accumulator.is_empty() { - return Ok(self.load_messages_from_disk(offset, count).await?); - } - - let accumulator_first_msg_offset = messages_accumulator.base_offset(); - let accumulator_last_msg_offset = messages_accumulator.max_offset(); + let accumulator_first_msg_offset = self.accumulator.base_offset(); + let accumulator_last_msg_offset = self.accumulator.max_offset(); // Case 1: All messages are in messages_require_to_save buffer if offset >= accumulator_first_msg_offset && end_offset <= accumulator_last_msg_offset { - return Ok(self.load_messages_from_unsaved_buffer(offset, end_offset)); + return Ok(IggyMessagesBatch::from( + self.accumulator.get_messages_by_offset(offset, end_offset), + )); } // Case 2: All messages are on disk if end_offset < accumulator_first_msg_offset { - return Ok(self.load_messages_from_disk(offset, count).await?); + return Ok(IggyMessagesBatch::from( + self.load_messages_from_disk(offset, count).await?, + )); } // Case 3: Messages span disk and messages_require_to_save buffer boundary - let mut slices = Vec::new(); - // Load messages from disk up to the messages_require_to_save buffer boundary - if offset < accumulator_first_msg_offset { - let disk_messages = self + let disk_messages = self .load_messages_from_disk(offset, (accumulator_first_msg_offset - offset) as u32) .await.with_error_context(|error| format!( "{COMPONENT} (error: {error}) - failed to load messages from disk, stream ID: {}, topic ID: {}, partition ID: {}, start offset: {offset}, end offset :{}", self.stream_id, self.topic_id, self.partition_id, accumulator_first_msg_offset - 1 ))?; - slices.push(disk_messages); - } // Load remaining messages from messages_require_to_save buffer let buffer_start = std::cmp::max(offset, accumulator_first_msg_offset); - let buffer_messages = self.load_messages_from_unsaved_buffer(buffer_start, end_offset); - slices.push(buffer_messages); - - Ok(IggyMessagesSlice::combine(slices)) - } - - fn load_messages_from_unsaved_buffer( - &self, - start_offset: u64, - end_offset: u64, - ) -> IggyMessagesSlice { - let messages_accumulator = self.unsaved_messages.as_ref().unwrap(); - messages_accumulator.get_messages_by_offset(start_offset, end_offset) + let buffer_messages = self + .accumulator + .get_messages_by_offset(buffer_start, end_offset); + + Ok(IggyMessagesBatch::from(vec![ + disk_messages, + buffer_messages, + ])) } pub async fn load_index_for_timestamp( @@ -210,7 +199,7 @@ impl Segment { &self, start_offset: u64, count: u32, - ) -> Result<IggyMessagesSlice, IggyError> { + ) -> Result<IggyMessages, IggyError> { tracing::trace!( "Loading {count} messages from disk, start_offset: {start_offset}, current_offset: {}...", self.current_offset @@ -227,7 +216,7 @@ impl Segment { .await?; if first_index.is_none() { - return Ok(IggyMessagesSlice::empty()); + return Ok(IggyMessages::empty()); } let first_index = first_index.unwrap(); @@ -243,23 +232,13 @@ impl Segment { let start_pos = first_index.position; let count_bytes = last_index.position - start_pos; - let messages = self - .messages_reader + self.messages_reader .as_ref() .unwrap() .load_messages_impl(start_pos, count_bytes, count) .await .with_error_context(|error| { format!("Failed to load messages from segment file: {self}. {error}") - })?; - let end = messages.size(); - let count = messages.count(); - - Ok(IggyMessagesSlice::new( - messages.into_inner(), - 0, - end as usize, - count, - )) + }) } } diff --git a/server/src/streaming/segments/segment.rs b/server/src/streaming/segments/segment.rs index 30886fc1..dcf5a3f9 100644 --- a/server/src/streaming/segments/segment.rs +++ b/server/src/streaming/segments/segment.rs @@ -37,12 +37,12 @@ pub struct Segment { pub is_closed: bool, pub(super) messages_writer: Option<MessagesWriter>, pub(super) messages_reader: Option<MessagesReader>, - pub(super) index_writer: Option<SegmentIndexWriter>, - pub(super) index_reader: Option<SegmentIndexReader>, + pub(super) index_writer: Option<IndexWriter>, + pub(super) index_reader: Option<IndexReader>, pub message_expiry: IggyExpiry, - pub unsaved_messages: Option<MessagesAccumulator>, + pub accumulator: MessagesAccumulator, pub config: Arc<SystemConfig>, - pub indexes: Option<Vec<Index>>, + pub indexes: Vec<Index>, pub(super) log_size_bytes: Arc<AtomicU64>, pub(super) index_size_bytes: Arc<AtomicU64>, } @@ -70,10 +70,10 @@ impl Segment { IggyExpiry::ServerDefault => config.segment.message_expiry, _ => message_expiry, }; - let indexes = match config.segment.cache_indexes { - true => Some(Vec::new()), - false => None, - }; + // let indexes = match config.segment.cache_indexes { + // true => Some(Vec::new()), + // false => None, + // }; Segment { stream_id, @@ -90,8 +90,8 @@ impl Segment { last_index_position: 0, max_size_bytes: config.segment.size, message_expiry, - indexes, - unsaved_messages: None, + indexes: Vec::new(), // TODO add capacity + accumulator: MessagesAccumulator::default(), is_closed: false, messages_writer: None, messages_reader: None, @@ -128,35 +128,30 @@ impl Segment { self.size_bytes = IggyByteSize::from(log_size_bytes); self.last_index_position = log_size_bytes as _; - self.indexes = Some( - self.index_reader - .as_ref() - .unwrap() - .load_all_indexes_impl() - .await - .with_error_context(|error| format!("Failed to load indexes for {self}. {error}")) - .map_err(|_| IggyError::CannotReadFile)?, - ); + self.indexes = self + .index_reader + .as_ref() + .unwrap() + .load_all_indexes_impl() + .await + .with_error_context(|error| format!("Failed to load indexes for {self}. {error}")) + .map_err(|_| IggyError::CannotReadFile)?; - let last_index_offset = if self.indexes.as_ref().unwrap().is_empty() { + let last_index_offset = if self.indexes.is_empty() { 0_u64 } else { - self.indexes.as_ref().unwrap().last().unwrap().offset as u64 + self.indexes.last().unwrap().offset as u64 }; self.current_offset = self.start_offset + last_index_offset; info!("Loaded {} indexes for segment with start offset: {} and partition with ID: {} for topic with ID: {} and stream with ID: {}.", - self.indexes.as_ref().unwrap().len(), + self.indexes.len(), self.start_offset, self.partition_id, self.topic_id, self.stream_id); - if !self.config.segment.cache_indexes { - self.indexes = None; - } - if self.is_full().await { self.is_closed = true; } @@ -221,8 +216,7 @@ impl Segment { .await?; let index_writer = - SegmentIndexWriter::new(&self.index_path, self.index_size_bytes.clone(), index_fsync) - .await?; + IndexWriter::new(&self.index_path, self.index_size_bytes.clone(), index_fsync).await?; self.messages_writer = Some(log_writer); self.index_writer = Some(index_writer); @@ -233,7 +227,7 @@ impl Segment { let log_reader = MessagesReader::new(&self.log_path, self.log_size_bytes.clone()).await?; // TODO(hubcio): there is no need to store open fd for reader if we have index cache enabled let index_reader = - SegmentIndexReader::new(&self.index_path, self.index_size_bytes.clone()).await?; + IndexReader::new(&self.index_path, self.index_size_bytes.clone()).await?; self.messages_reader = Some(log_reader); self.index_reader = Some(index_reader); @@ -434,8 +428,7 @@ mod tests { assert_eq!(segment.log_path, log_path); assert_eq!(segment.index_path, index_path); assert_eq!(segment.message_expiry, message_expiry); - assert!(segment.unsaved_messages.is_none()); - assert!(segment.indexes.is_some()); + assert!(segment.indexes.is_empty()); assert!(!segment.is_closed); assert!(!segment.is_full().await); } @@ -476,6 +469,6 @@ mod tests { messages_count_of_parent_partition, ); - assert!(segment.indexes.is_none()); + assert!(segment.indexes.is_empty()); } } diff --git a/server/src/streaming/segments/types/message_view_mut.rs b/server/src/streaming/segments/types/message_view_mut.rs index e83ae16d..4b363a76 100644 --- a/server/src/streaming/segments/types/message_view_mut.rs +++ b/server/src/streaming/segments/types/message_view_mut.rs @@ -9,28 +9,22 @@ use std::ops::Range; pub struct IggyMessageViewMut<'a> { /// The buffer containing the message buffer: &'a mut [u8], - /// Payload offset payload_offset: usize, } impl<'a> IggyMessageViewMut<'a> { /// Create a new mutable message view from a buffer - pub fn new(buffer: &'a mut [u8]) -> Result<Self, IggyError> { + pub fn new(buffer: &'a mut [u8]) -> Self { let (payload_len, headers_len) = { let hdr_slice = &buffer[0..IGGY_MESSAGE_HEADER_SIZE as usize]; let hdr_view = IggyMessageHeaderView::new(hdr_slice); (hdr_view.payload_length(), hdr_view.headers_length()) }; - let total_size = IGGY_MESSAGE_HEADER_SIZE + payload_len + headers_len; - if buffer.len() < total_size as usize { - return Err(IggyError::InvalidMessagePayloadLength); - } - - Ok(Self { + Self { buffer, payload_offset: IGGY_MESSAGE_HEADER_SIZE as usize, - }) + } } /// Get an immutable header view @@ -60,14 +54,14 @@ impl<'a> IggyMessageViewMut<'a> { /// Convenience to update the checksum field in the header pub fn update_checksum(&mut self) { - // let start = 8; // Skip checksum field for checksum calculation - // let size = self.size() - 8; - // let data = &self.buffer[start..start + size]; + let start = 8; // Skip checksum field for checksum calculation + let size = self.size() - 8; + let data = &self.buffer[start..start + size]; - // let checksum = gxhash64(data, 0); + let checksum = gxhash64(data, 0); - // let mut hdr_view = self.msg_header_mut(); - // hdr_view.set_checksum(checksum); + let mut hdr_view = self.msg_header_mut(); + hdr_view.set_checksum(checksum); } } @@ -88,30 +82,40 @@ impl<'a> IggyMessageViewMutIterator<'a> { #[gat] impl LendingIterator for IggyMessageViewMutIterator<'_> { - type Item<'next> = Result<IggyMessageViewMut<'next>, IggyError>; + type Item<'next> = IggyMessageViewMut<'next>; fn next(&mut self) -> Option<Self::Item<'_>> { - if self.position >= self.buffer.len() { + let buffer_len = self.buffer.len(); + if self.position >= buffer_len { + return None; + } + + // Make sure we have enough bytes for at least a header + if self.position + IGGY_MESSAGE_HEADER_SIZE as usize > self.buffer.len() { + tracing::error!( + "Buffer too small for message header at position {}, buffer len: {}", + self.position, + self.buffer.len() + ); + self.position = self.buffer.len(); // Move to end to prevent infinite loops return None; } let buffer_slice = &mut self.buffer[self.position..]; - let result = IggyMessageViewMut::new(buffer_slice); - - match result { - Ok(view) => { - self.position += view.size(); - Some(Ok(view)) - } - Err(e) => { - tracing::error!( - "Failed to create message view at position {}, error: {}", - self.position, - e - ); - self.position += 1; - Some(Err(e)) - } + let view = IggyMessageViewMut::new(buffer_slice); + + // Safety check: Make sure we're advancing + let message_size = view.size(); + if message_size == 0 { + tracing::error!( + "Message size is 0 at position {}, preventing infinite loop", + self.position + ); + self.position = buffer_len; // Move to end to prevent infinite loops + return None; } + + self.position += message_size; + Some(view) } } diff --git a/server/src/streaming/segments/types/messages.rs b/server/src/streaming/segments/types/messages.rs new file mode 100644 index 00000000..322d6a5f --- /dev/null +++ b/server/src/streaming/segments/types/messages.rs @@ -0,0 +1,57 @@ +use bytes::{Bytes, BytesMut}; +use iggy::prelude::*; +use std::ops::Deref; + +/// An immutable messages container that holds a buffer of messages +#[derive(Debug, PartialEq)] +pub struct IggyMessages { + /// The number of messages in the buffer + count: u32, + /// The buffer containing the messages + buffer: Bytes, +} + +impl IggyMessages { + /// Create a new messages container from a buffer + pub fn new(buffer: Bytes, count: u32) -> Self { + Self { buffer, count } + } + + /// Creates a empty messages container + pub fn empty() -> Self { + Self::new(BytesMut::new().freeze(), 0) + } + + /// Create iterator over messages + pub fn iter(&self) -> IggyMessageViewIterator { + IggyMessageViewIterator::new(&self.buffer) + } + + /// Get the number of messages + pub fn count(&self) -> u32 { + self.count + } + + /// Get the total size of all messages in bytes + pub fn size(&self) -> u32 { + self.buffer.len() as u32 + } + + /// Get access to the underlying buffer + pub fn buffer(&self) -> &[u8] { + &self.buffer + } +} + +impl Sizeable for IggyMessages { + fn get_size_bytes(&self) -> IggyByteSize { + IggyByteSize::from(self.buffer.len() as u64) + } +} + +impl Deref for IggyMessages { + type Target = [u8]; + fn deref(&self) -> &Self::Target { + self.buffer() + } +} diff --git a/server/src/streaming/segments/types/messages_batch.rs b/server/src/streaming/segments/types/messages_batch.rs new file mode 100644 index 00000000..c3bf1531 --- /dev/null +++ b/server/src/streaming/segments/types/messages_batch.rs @@ -0,0 +1,109 @@ +use super::messages::IggyMessages; +use iggy::prelude::*; + +/// A batch container for multiple IggyMessages objects +#[derive(Debug, Default)] +pub struct IggyMessagesBatch { + /// The collection of message containers + messages: Vec<IggyMessages>, + /// Total number of messages across all containers + count: u32, + /// Total size in bytes across all containers + size: u32, +} + +impl IggyMessagesBatch { + /// Create a new empty batch + pub fn new() -> Self { + Self { + messages: Vec::new(), + count: 0, + size: 0, + } + } + + /// Create a new batch with a specified initial capacity + pub fn with_capacity(capacity: usize) -> Self { + Self { + messages: Vec::with_capacity(capacity), + count: 0, + size: 0, + } + } + + /// Create a batch from an existing vector of IggyMessages + pub fn from_vec(messages: Vec<IggyMessages>) -> Self { + let mut batch = Self::with_capacity(messages.len()); + for msg in messages { + batch.add(msg); + } + batch + } + + /// Add a message container to the batch + pub fn add(&mut self, messages: IggyMessages) { + self.count += messages.count(); + self.size += messages.size(); + self.messages.push(messages); + } + + /// Add another batch of messages to the batch + pub fn add_batch(&mut self, other: IggyMessagesBatch) { + self.count += other.messages_count(); + self.size += other.size(); + self.messages.extend(other.messages); + } + + /// Get the total number of messages in the batch + pub fn messages_count(&self) -> u32 { + self.count + } + + /// Get the total size of all messages in bytes + pub fn size(&self) -> u32 { + self.size + } + + /// Get the number of message containers in the batch + pub fn containers_count(&self) -> usize { + self.messages.len() + } + + /// Check if the batch is empty + pub fn is_empty(&self) -> bool { + self.messages.is_empty() || self.count == 0 + } + + /// Get a reference to the underlying vector of message containers + pub fn inner(&self) -> &Vec<IggyMessages> { + &self.messages + } + + /// Consume the batch, returning the underlying vector of message containers + pub fn into_inner(self) -> Vec<IggyMessages> { + self.messages + } + + /// Iterate over all message containers in the batch + pub fn iter(&self) -> impl Iterator<Item = &IggyMessages> { + self.messages.iter() + } +} + +impl Sizeable for IggyMessagesBatch { + fn get_size_bytes(&self) -> IggyByteSize { + IggyByteSize::from(self.size as u64) + } +} + +impl From<Vec<IggyMessages>> for IggyMessagesBatch { + fn from(messages: Vec<IggyMessages>) -> Self { + Self::from_vec(messages) + } +} + +impl From<IggyMessages> for IggyMessagesBatch { + fn from(messages: IggyMessages) -> Self { + Self::from_vec(vec![messages]) + } +} diff --git a/server/src/streaming/segments/types/messages_mut.rs b/server/src/streaming/segments/types/messages_mut.rs index 7fbbca22..d3c4a6ac 100644 --- a/server/src/streaming/segments/types/messages_mut.rs +++ b/server/src/streaming/segments/types/messages_mut.rs @@ -1,36 +1,19 @@ -use super::message_view_mut::IggyMessageViewMutIterator; -use bytes::{BufMut, Bytes, BytesMut}; +use super::{message_view_mut::IggyMessageViewMutIterator, IggyMessages}; +use crate::streaming::segments::Index; +use bytes::{BufMut, BytesMut}; use iggy::prelude::*; use lending_iterator::prelude::*; -use serde::{Deserialize, Serialize}; +use std::ops::Deref; /// A container for mutable messages -#[derive(Debug, Serialize, Deserialize, PartialEq)] +#[derive(Debug, PartialEq, Default)] pub struct IggyMessagesMut { /// The number of messages in the buffer - #[serde(skip)] count: u32, - /// The buffer containing the messages buffer: BytesMut, } -impl BytesSerializable for IggyMessagesMut { - fn to_bytes(&self) -> Bytes { - self.buffer.clone().freeze() - } - - fn from_bytes(bytes: Bytes) -> Result<Self, IggyError> - where - Self: Sized, - { - Ok(Self { - buffer: BytesMut::from(&bytes[..]), - count: 0, - }) - } -} - impl Sizeable for IggyMessagesMut { fn get_size_bytes(&self) -> IggyByteSize { IggyByteSize::from(self.buffer.len() as u64) @@ -38,93 +21,93 @@ impl Sizeable for IggyMessagesMut { } impl IggyMessagesMut { - /// Create a new messages container from a buffer - pub fn new(buffer: BytesMut) -> Self { - Self { buffer, count: 0 } + /// Create a new messages container with a specified capacity + pub fn with_capacity(capacity: u32) -> Self { + Self { + buffer: BytesMut::with_capacity(capacity as usize), + count: 0, + } + } + + /// Create a new messages container from a existing buffer of bytes + pub fn from_bytes(bytes: BytesMut, messages_count: u32) -> Self { + Self { + buffer: bytes, + count: messages_count, + } } /// Create a new messages container from a slice of messages pub fn from_messages(messages: &[IggyMessage], messages_size: u32) -> Self { - let mut buffer = BytesMut::with_capacity(messages_size as usize); + let mut messages_mut = Self::with_capacity(messages_size); + for message in messages { - buffer.extend_from_slice(&message.to_bytes()); + messages_mut.buffer.extend_from_slice(&message.to_bytes()); } - let mut messages_mut = Self::new(buffer); messages_mut.count = messages.len() as u32; messages_mut } - /// Extends the messages container with another set of mutable messages by consuming them - pub fn extend(&mut self, messages: IggyMessagesMut) { - let count = messages.count(); - let buffer = messages.into_inner(); - self.buffer.put(buffer); - self.count += count; - } - - /// Sets the capacity of the messages container - pub fn set_capacity(&mut self, capacity: u32) { - self.buffer.reserve(capacity as usize); - } - /// Create a lending iterator over mutable messages pub fn iter_mut(&mut self) -> IggyMessageViewMutIterator { IggyMessageViewMutIterator::new(&mut self.buffer) } - /// Create iterator over messages - pub fn iter(&self) -> IggyMessageViewIterator { - IggyMessageViewIterator::new(&self.buffer) - } - /// Get the number of messages pub fn count(&self) -> u32 { self.count } - /// Set the message count - pub fn set_count(&mut self, count: u32) { - self.count = count; - } - /// Get the total size of all messages in bytes pub fn size(&self) -> u32 { self.buffer.len() as u32 } - /// Get the buffer - pub fn into_inner(self) -> BytesMut { - self.buffer - } - /// Prepares all messages in the batch for persistence by setting their offsets, timestamps, /// and other necessary fields. This method should be called before the messages are written to disk. - /// Returns the number of messages processed. - pub fn prepare_for_persistence(&mut self, base_offset: u64, timestamp: u64) -> u32 { - let mut processed_count = 0; + /// Returns the prepared, immutable messages. + pub fn prepare_for_persistence( + mut self, + base_offset: u64, + timestamp: u64, + current_position: u32, + indexes: &mut Vec<Index>, + ) -> IggyMessages { + let max_messages = self.count; let mut current_offset = base_offset; + let mut current_position = current_position; + let mut iter = self.iter_mut(); + let mut processed_count = 0; - let mut iterator = self.iter_mut(); - while let Some(message_result) = LendingIterator::next(&mut iterator) { - if let Ok(mut message) = message_result { - message.msg_header_mut().set_offset(current_offset); + while let Some(mut message) = iter.next() { + debug_assert!(processed_count <= max_messages); + processed_count += 1; - if message.msg_header().timestamp() == 0 { - message.msg_header_mut().set_timestamp(timestamp); - } + message.msg_header_mut().set_offset(current_offset); + message.msg_header_mut().set_timestamp(timestamp); - message.update_checksum(); + message.update_checksum(); - current_offset += 1; - processed_count += 1; - } + current_offset += 1; + current_position += message.size() as u32; + + indexes.push(Index { + offset: (current_offset - base_offset) as u32, + position: current_position, + timestamp, + }); } - processed_count + let buffer = self.buffer.freeze(); + IggyMessages::new(buffer, self.count) } +} + +impl Deref for IggyMessagesMut { + type Target = BytesMut; - pub fn make_immutable(self) -> IggyMessages { - IggyMessages::new(self.buffer.freeze(), self.count) + fn deref(&self) -> &Self::Target { + &self.buffer } } diff --git a/server/src/streaming/segments/types/messages_slice.rs b/server/src/streaming/segments/types/messages_slice.rs deleted file mode 100644 index 740f065b..00000000 --- a/server/src/streaming/segments/types/messages_slice.rs +++ /dev/null @@ -1,70 +0,0 @@ -use bytes::{Bytes, BytesMut}; -use iggy::prelude::*; - -#[derive(Debug, Clone)] -pub struct IggyMessagesSlice { - buffers: Vec<Bytes>, - count: u32, -} - -impl IggyMessagesSlice { - pub fn new(buffer: Bytes, start: usize, end: usize, count: u32) -> Self { - Self { - buffers: vec![buffer.slice(start..end)], - count, - } - } - - pub fn from_buffers(buffers: Vec<Bytes>, count: u32) -> Self { - Self { buffers, count } - } - - pub fn combine(slices: Vec<IggyMessagesSlice>) -> Self { - let mut buffers = Vec::new(); - let mut total_count = 0; - for slice in slices { - buffers.extend(slice.buffers); - total_count += slice.count; - } - Self { - buffers, - count: total_count, - } - } - - pub fn chunks(&self) -> impl Iterator<Item = &[u8]> + '_ { - self.buffers.iter().map(|b| b.as_ref()) - } - - pub fn chunks_count(&self) -> usize { - self.buffers.len() - } - - pub fn to_messages(&self) -> IggyMessages { - let total_size: usize = self.buffers.iter().map(|b| b.len()).sum(); - let mut combined = BytesMut::with_capacity(total_size); - for chunk in &self.buffers { - combined.extend_from_slice(chunk); - } - IggyMessages::new(combined.freeze(), self.count) - } - - pub fn count(&self) -> u32 { - self.count - } - - pub fn size(&self) -> u32 { - self.buffers.iter().map(|b| b.len() as u32).sum() - } - - pub fn is_empty(&self) -> bool { - self.count == 0 || self.buffers.is_empty() - } - - pub fn empty() -> Self { - Self { - buffers: Vec::new(), - count: 0, - } - } -} diff --git a/server/src/streaming/segments/types/messages_slice_better.rs b/server/src/streaming/segments/types/messages_slice_better.rs deleted file mode 100644 index 8b137891..00000000 --- a/server/src/streaming/segments/types/messages_slice_better.rs +++ /dev/null @@ -1 +0,0 @@ - diff --git a/server/src/streaming/segments/types/mod.rs b/server/src/streaming/segments/types/mod.rs index 016bc7b2..a39579dd 100644 --- a/server/src/streaming/segments/types/mod.rs +++ b/server/src/streaming/segments/types/mod.rs @@ -1,10 +1,11 @@ mod message_header_view_mut; mod message_view_mut; +mod messages; +mod messages_batch; mod messages_mut; -mod messages_slice; -mod messages_slice_better; pub use message_header_view_mut::IggyMessageHeaderViewMut; pub use message_view_mut::IggyMessageViewMut; +pub use messages::IggyMessages; +pub use messages_batch::IggyMessagesBatch; pub use messages_mut::IggyMessagesMut; -pub use messages_slice::IggyMessagesSlice; diff --git a/server/src/streaming/segments/writing_messages.rs b/server/src/streaming/segments/writing_messages.rs index f9c3d303..4e5a8a0c 100644 --- a/server/src/streaming/segments/writing_messages.rs +++ b/server/src/streaming/segments/writing_messages.rs @@ -1,7 +1,5 @@ -use super::indexes::*; use super::IggyMessagesMut; use crate::streaming::segments::segment::Segment; -use bytes::BufMut; use error_set::ErrContext; use iggy::confirmation::Confirmation; use iggy::prelude::*; @@ -21,15 +19,20 @@ impl Segment { )); } let messages_size = messages.size(); - let messages_accumulator = self.unsaved_messages.get_or_insert_with(Default::default); - let messages_count = - messages_accumulator.coalesce_batch(current_offset, self.last_index_position, messages); + let messages_accumulator = &mut self.accumulator; + let messages_count = messages_accumulator.coalesce_batch( + current_offset, + self.last_index_position, + &mut self.indexes, + messages, + ); if self.current_offset == 0 { self.start_timestamp = messages_accumulator.batch_base_timestamp(); } self.end_timestamp = messages_accumulator.batch_max_timestamp(); self.current_offset = messages_accumulator.max_offset(); + self.size_bytes += IggyByteSize::from(messages_size as u64); self.size_of_parent_stream @@ -48,45 +51,15 @@ impl Segment { Ok(messages_count) } - fn store_offset_and_timestamp_index_for_batch( - &mut self, - batch_last_offset: u64, - batch_max_timestamp: u64, - ) -> Index { - let relative_offset = (batch_last_offset - self.start_offset) as u32; - trace!( - "Storing index for relative_offset: {relative_offset}, start_offset: {}", - self.start_offset - ); - let index = Index { - offset: relative_offset, - position: self.last_index_position, - timestamp: batch_max_timestamp, - }; - if let Some(indexes) = &mut self.indexes { - indexes.push(index); - } - index - } - pub async fn persist_messages( &mut self, confirmation: Option<Confirmation>, ) -> Result<usize, IggyError> { - if self.unsaved_messages.is_none() { + if self.accumulator.is_empty() { return Ok(0); } - let messages_accumulator = self.unsaved_messages.take().unwrap(); - if messages_accumulator.is_empty() { - return Ok(0); - } - // let batch_max_offset = messages_accumulator.max_offset(); - // let batch_max_timestamp = messages_accumulator.batch_max_timestamp(); - // let index = - // self.store_offset_and_timestamp_index_for_batch(batch_max_offset, batch_max_timestamp); - - let unsaved_messages_number = messages_accumulator.unsaved_messages_count(); + let unsaved_messages_number = self.accumulator.unsaved_messages_count(); trace!( "Saving {} messages on disk in segment with start offset: {} for partition with ID: {}...", unsaved_messages_number, @@ -94,7 +67,8 @@ impl Segment { self.partition_id ); - let (messages, indexes) = messages_accumulator.materialize(); + let accumulator = std::mem::take(&mut self.accumulator); + let batches = accumulator.materialize(); let confirmation = match confirmation { Some(val) => val, None => self.config.segment.server_confirmation, @@ -104,28 +78,22 @@ impl Segment { .messages_writer .as_mut() .unwrap() - .save_batches(messages, confirmation) + .save_batches(batches, confirmation) .await .with_error_context(|error| format!("Failed to save batch for {self}. {error}",))?; self.last_index_position += saved_bytes.as_bytes_u64() as u32; + debug_assert!(self.last_index_position == self.indexes.last().unwrap().position); + self.index_writer .as_mut() .unwrap() - .save_indexes(&indexes) + .save_indexes(&self.indexes) .await .with_error_context(|error| format!("Failed to save index for {self}. {error}"))?; - // TODO(hubcio): fix this - // self.size_bytes += IggyByteSize::from(IGGY_BATCH_OVERHEAD); - // self.size_of_parent_stream - // .fetch_add(IGGY_BATCH_OVERHEAD, Ordering::AcqRel); - // self.size_of_parent_topic - // .fetch_add(IGGY_BATCH_OVERHEAD, Ordering::AcqRel); - // self.size_of_parent_partition - // .fetch_add(IGGY_BATCH_OVERHEAD, Ordering::AcqRel); - + self.indexes.clear(); trace!( "Saved {} messages on disk in segment with start offset: {} for partition with ID: {}, total bytes written: {}.", unsaved_messages_number, @@ -137,7 +105,6 @@ impl Segment { if self.is_full().await { self.end_offset = self.current_offset; self.is_closed = true; - self.unsaved_messages = None; self.shutdown_writing().await; info!( "Closed segment with start offset: {}, end offset: {} for partition with ID: {}.", diff --git a/server/src/streaming/systems/messages.rs b/server/src/streaming/systems/messages.rs index 0eff2f1b..6dac1524 100644 --- a/server/src/streaming/systems/messages.rs +++ b/server/src/streaming/systems/messages.rs @@ -1,4 +1,4 @@ -use crate::streaming::segments::{IggyMessagesMut, IggyMessagesSlice}; +use crate::streaming::segments::{IggyMessages, IggyMessagesBatch, IggyMessagesMut}; use crate::streaming::session::Session; use crate::streaming::systems::system::System; use crate::streaming::systems::COMPONENT; @@ -19,7 +19,7 @@ impl System { topic_id: &Identifier, partition_id: Option<u32>, args: PollingArgs, - ) -> Result<IggyMessagesSlice, IggyError> { + ) -> Result<IggyMessagesBatch, IggyError> { self.ensure_authenticated(session)?; if args.count == 0 { return Err(IggyError::InvalidMessagesCount); @@ -60,7 +60,7 @@ impl System { .get_messages(polling_consumer, partition_id, args.strategy, args.count) .await?; - return Ok(result); + Ok(result) // let offset = polled_messages.messages.last().unwrap().offset; // if args.auto_commit { diff --git a/server/src/streaming/topics/messages.rs b/server/src/streaming/topics/messages.rs index 8da9adaf..39eacd93 100644 --- a/server/src/streaming/topics/messages.rs +++ b/server/src/streaming/topics/messages.rs @@ -1,5 +1,5 @@ use crate::streaming::polling_consumer::PollingConsumer; -use crate::streaming::segments::{IggyMessagesMut, IggyMessagesSlice}; +use crate::streaming::segments::{IggyMessages, IggyMessagesBatch, IggyMessagesMut}; use crate::streaming::topics::topic::Topic; use crate::streaming::topics::COMPONENT; use crate::streaming::utils::file::folder_size; @@ -9,7 +9,6 @@ use error_set::ErrContext; use iggy::error::IggyError; use iggy::locking::IggySharedMutFn; use iggy::messages::{PartitioningKind, PollingKind}; -use iggy::prelude::IggyMessages; use iggy::prelude::Partitioning; use iggy::utils::byte_size::IggyByteSize; use iggy::utils::expiry::IggyExpiry; @@ -31,7 +30,7 @@ impl Topic { partition_id: u32, strategy: PollingStrategy, count: u32, - ) -> Result<IggyMessagesSlice, IggyError> { + ) -> Result<IggyMessagesBatch, IggyError> { if !self.has_partitions() { return Err(IggyError::NoPartitions(self.topic_id, self.stream_id)); } @@ -176,123 +175,6 @@ impl Topic { partition_id } - pub(crate) async fn load_messages_from_disk_to_cache(&mut self) -> Result<(), IggyError> { - //TODO: Fix me - /* - if !self.config.cache.enabled { - return Ok(()); - } - let path = self.config.get_system_path(); - - // TODO: load data from database instead of calculating the size on disk - let total_size_on_disk_bytes = folder_size(&path) - .await - .with_error_context(|error| { - format!("{COMPONENT} (error: {error}) - failed to get folder size, path: {path}") - }) - .map_err(|_| IggyError::InvalidSizeBytes)?; - - for partition_lock in self.partitions.values_mut() { - let mut partition = partition_lock.write().await; - - let end_offset = match partition.segments.last() { - Some(segment) => segment.current_offset, - None => { - warn!( - "No segments found for partition ID: {}, topic ID: {}, stream ID: {}", - partition.partition_id, partition.topic_id, partition.stream_id - ); - continue; - } - }; - - trace!( - "Loading messages to cache for partition ID: {}, topic ID: {}, stream ID: {}, offset: 0 to {}...", - partition.partition_id, - partition.topic_id, - partition.stream_id, - end_offset - ); - - let partition_size_bytes = partition.get_size_bytes(); - let cache_limit_bytes = self.config.cache.size.clone().into(); - - // Fetch data from disk proportional to the partition size - // eg. 12 partitions, each has 300 MB, cache limit is 500 MB, so there is total 3600 MB of data on SSD. - // 500 MB * (300 / 3600 MB) ~= 41.6 MB to load from cache (assuming all partitions have the same size on disk) - let size_to_fetch_from_disk = (cache_limit_bytes.as_bytes_u64() as f64 - * (partition_size_bytes.as_bytes_u64() as f64 - / total_size_on_disk_bytes.as_bytes_u64() as f64)) - as u64; - let messages = partition - .get_newest_messages_by_size(size_to_fetch_from_disk as u64) - .await - .with_error_context(|error| format!("{COMPONENT} (error: {error}) - failed to get newest messages by size: {size_to_fetch_from_disk}"))?; - - let sum = messages - .iter() - .map(|m| m.get_size_bytes()) - .sum::<IggyByteSize>(); - if !Self::cache_integrity_check(&messages) { - warn!( - "Cache integrity check failed for partition ID: {}, topic ID: {}, stream ID: {}, offset: 0 to {}. Emptying cache...", - partition.partition_id, partition.topic_id, partition.stream_id, end_offset - ); - } else if let Some(cache) = &mut partition.cache { - for message in &messages { - cache.push_safe(message.clone()); - } - - info!( - "Loaded {} messages ({} bytes) to cache for partition ID: {}, topic ID: {}, stream ID: {}, offset: 0 to {}.", - messages.len(), sum, partition.partition_id, partition.topic_id, partition.stream_id, end_offset - ); - } else { - warn!( - "Cache is invalid for ID: {}, topic ID: {}, stream ID: {}, offset: 0 to {}", - partition.partition_id, partition.topic_id, partition.stream_id, end_offset - ); - } - } - - Ok(()) - */ - todo!() - } - - fn cache_integrity_check(cache: ()) -> bool { - //TODO: Fix me - /* - if cache.is_empty() { - warn!("Cache is empty!"); - return false; - } - - let first_offset = cache[0].offset; - let last_offset = cache[cache.len() - 1].offset; - - for i in 1..cache.len() { - if cache[i].offset != cache[i - 1].offset + 1 { - warn!("Offsets are not subsequent at index {} offset {}, for previous index {} offset is {}", i, cache[i].offset, i-1, cache[i-1].offset); - return false; - } - } - - let expected_messages_count: u64 = last_offset - first_offset + 1; - if cache.len() != expected_messages_count as usize { - warn!( - "Messages count is in cache ({}) not equal to expected messages count ({})", - cache.len(), - expected_messages_count - ); - return false; - } - - true - */ - todo!() - } - pub async fn get_expired_segments_start_offsets_per_partition( &self, now: IggyTimestamp, diff --git a/server/src/streaming/topics/storage.rs b/server/src/streaming/topics/storage.rs index b1bd98f6..9b40dcc3 100644 --- a/server/src/streaming/topics/storage.rs +++ b/server/src/streaming/topics/storage.rs @@ -205,12 +205,6 @@ impl TopicStorage for FileTopicStorage { .insert(consumer_group.group_id, RwLock::new(consumer_group)); } - topic - .load_messages_from_disk_to_cache() - .await - .with_error_context(|error| { - format!("{COMPONENT} (error: {error}) - failed to load messages from disk to cache, topic: {topic}") - })?; info!("Loaded topic {topic}"); Ok(())
