This is an automated email from the ASF dual-hosted git repository. hgruszecki pushed a commit to branch zero-copy-no-batching in repository https://gitbox.apache.org/repos/asf/iggy.git
commit cb7034185f4515d54c35791fe516ff59f8f4a02d Author: Hubert Gruszecki <[email protected]> AuthorDate: Mon Mar 10 02:46:12 2025 +0100 vectored write --- cli/src/args/message.rs | 7 +- integration/tests/mod.rs | 6 +- integration/tests/server/mod.rs | 8 +- integration/tests/streaming/get_by_offset.rs | 118 ++++----- integration/tests/streaming/mod.rs | 33 ++- sdk/src/bytes_serializable.rs | 12 +- sdk/src/cli/message/poll_messages.rs | 4 +- sdk/src/cli/message/send_messages.rs | 5 +- sdk/src/identifier.rs | 10 + sdk/src/messages/partitioning.rs | 10 + sdk/src/messages/polling_kind.rs | 9 - sdk/src/messages/send_messages.rs | 211 ++++++++-------- sdk/src/models/{ => messaging}/header.rs | 0 sdk/src/models/messaging/message.rs | 33 +-- sdk/src/models/messaging/message_header.rs | 1 + sdk/src/models/messaging/message_header_view.rs | 13 +- sdk/src/models/messaging/message_view.rs | 64 ++--- sdk/src/models/messaging/messages.rs | 73 ++++-- sdk/src/models/messaging/mod.rs | 4 +- sdk/src/models/mod.rs | 1 - sdk/src/prelude.rs | 5 +- .../handlers/messages/poll_messages_handler.rs | 10 +- .../handlers/messages/send_messages_handler.rs | 3 - server/src/streaming/partitions/messages.rs | 90 ++++--- server/src/streaming/partitions/partition.rs | 35 +-- server/src/streaming/segments/indexes/index.rs | 193 --------------- .../src/streaming/segments/indexes/index_reader.rs | 133 ++++------ .../src/streaming/segments/indexes/index_writer.rs | 24 -- server/src/streaming/segments/indexes/mod.rs | 1 - .../streaming/segments/messages/messages_reader.rs | 16 +- .../streaming/segments/messages/messages_writer.rs | 23 +- server/src/streaming/segments/messages/mod.rs | 42 ++++ .../streaming/segments/messages/persister_task.rs | 80 +++--- .../segments/messages_accumulator copy 2.rs | 141 ----------- .../segments/messages_accumulator copy.rs | 106 -------- .../src/streaming/segments/messages_accumulator.rs | 91 ++++--- server/src/streaming/segments/mod.rs | 1 + server/src/streaming/segments/reading_messages.rs | 273 +++++++++------------ .../segments/types/message_header_view_mut.rs | 5 +- .../streaming/segments/types/message_view_mut.rs | 24 +- .../src/streaming/segments/types/messages_mut.rs | 32 +-- .../src/streaming/segments/types/messages_slice.rs | 70 ++++++ .../segments/types/messages_slice_better.rs | 1 + server/src/streaming/segments/types/mod.rs | 3 + server/src/streaming/segments/writing_messages.rs | 4 +- server/src/streaming/systems/messages.rs | 89 +++---- server/src/streaming/systems/stats.rs | 19 +- server/src/streaming/topics/messages.rs | 4 +- 48 files changed, 881 insertions(+), 1259 deletions(-) diff --git a/cli/src/args/message.rs b/cli/src/args/message.rs index d334de05..917e2873 100644 --- a/cli/src/args/message.rs +++ b/cli/src/args/message.rs @@ -1,9 +1,6 @@ use clap::builder::NonEmptyStringValueParser; use clap::{ArgGroup, Args, Subcommand}; -use iggy::error::IggyError; -use iggy::error::IggyError::InvalidFormat; -use iggy::identifier::Identifier; -use iggy::models::header::{HeaderKey, HeaderValue}; +use iggy::prelude::*; use std::str::FromStr; #[derive(Debug, Clone, Subcommand)] @@ -109,7 +106,7 @@ fn parse_key_val(s: &str) -> Result<(HeaderKey, HeaderValue), IggyError> { let parts = lower.split(':').collect::<Vec<_>>(); if parts.len() != 3 { - Err(InvalidFormat)?; + return Err(IggyError::InvalidFormat); } let key = HeaderKey::from_str(parts[0])?; diff --git a/integration/tests/mod.rs b/integration/tests/mod.rs index dd153591..56fb66b4 100644 --- a/integration/tests/mod.rs +++ b/integration/tests/mod.rs @@ -9,12 +9,12 @@ use std::{panic, thread}; mod archiver; mod bench; -mod cli; +// mod cli; mod config_provider; mod data_integrity; mod examples; mod server; -mod state; +// mod state; mod streaming; lazy_static! { @@ -32,7 +32,7 @@ fn setup() { let mut logger = env_logger::builder(); logger.is_test(true); - logger.filter(None, log::LevelFilter::Debug); + logger.filter(None, log::LevelFilter::Trace); logger.target(env_logger::Target::Pipe(Box::new(LogWriter(log_buffer)))); logger.format(move |buf, record| { let timestamp = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.6f"); diff --git a/integration/tests/server/mod.rs b/integration/tests/server/mod.rs index e049a0d3..6d0f60c1 100644 --- a/integration/tests/server/mod.rs +++ b/integration/tests/server/mod.rs @@ -1,4 +1,4 @@ -mod http_server; -mod quic_server; -mod scenarios; -mod tcp_server; +// mod http_server; +// mod quic_server; +// mod scenarios; +// mod tcp_server; diff --git a/integration/tests/streaming/get_by_offset.rs b/integration/tests/streaming/get_by_offset.rs index 7bafca7b..bfaec404 100644 --- a/integration/tests/streaming/get_by_offset.rs +++ b/integration/tests/streaming/get_by_offset.rs @@ -1,16 +1,10 @@ use crate::streaming::common::test_setup::TestSetup; -use bytes::BytesMut; -use iggy::bytes_serializable::BytesSerializable; -use iggy::messages::send_messages::Message; -use iggy::models::header::{HeaderKey, HeaderValue}; -use iggy::utils::byte_size::IggyByteSize; -use iggy::utils::expiry::IggyExpiry; -use iggy::utils::sizeable::Sizeable; -use iggy::utils::timestamp::IggyTimestamp; +use bytes::{Bytes, BytesMut}; +use iggy::prelude::*; use server::configs::resource_quota::MemoryResourceQuota; use server::configs::system::{CacheConfig, PartitionConfig, SegmentConfig, SystemConfig}; -use server::streaming::batching::appendable_batch_info::AppendableBatchInfo; use server::streaming::partitions::partition::Partition; +use server::streaming::segments::IggyMessagesMut; use std::collections::HashMap; use std::str::FromStr; use std::sync::atomic::{AtomicU32, AtomicU64}; @@ -144,12 +138,11 @@ async fn test_get_messages_by_offset( HeaderKey::new("key-3").unwrap(), HeaderValue::from_uint64(123456).unwrap(), ); - let message = Message { - id, - length: payload.len() as u32, - payload: payload.clone(), - headers: Some(headers), - }; + let message = IggyMessage::builder() + .id(id) + .payload(payload) + .headers(headers) + .build(); all_messages.push(message); } @@ -159,18 +152,11 @@ async fn test_get_messages_by_offset( // Append all batches for batch_size in batch_sizes { - let batch = all_messages[current_pos..current_pos + batch_size as usize].to_vec(); - let batch_info = AppendableBatchInfo::new( - batch - .iter() - .map(|msg| msg.get_size_bytes()) - .sum::<IggyByteSize>(), - partition.partition_id, - ); - partition - .append_messages(batch_info, batch.clone(), None) - .await - .unwrap(); + println!("batch_size: {}, current_pos: {}", batch_size, current_pos); + let batch = + IggyMessagesMut::from(&all_messages[current_pos..current_pos + batch_size as usize]); + assert_eq!(batch.count(), batch_size); + partition.append_messages(batch, None).await.unwrap(); batch_offsets.push(partition.current_offset); current_pos += batch_size as usize; @@ -182,11 +168,11 @@ async fn test_get_messages_by_offset( .await .unwrap(); assert_eq!( - all_loaded_messages.len(), - total_messages as usize, + all_loaded_messages.count(), + total_messages, "Expected {} messages from start, but got {}", total_messages, - all_loaded_messages.len() + all_loaded_messages.count() ); // Test 2: Get messages from middle (after 3rd batch) @@ -197,11 +183,11 @@ async fn test_get_messages_by_offset( .await .unwrap(); assert_eq!( - middle_messages.len(), - remaining_messages as usize, + middle_messages.count(), + remaining_messages, "Expected {} messages from middle offset, but got {}", remaining_messages, - middle_messages.len() + middle_messages.count() ); // Test 3: No messages beyond final offset @@ -211,10 +197,10 @@ async fn test_get_messages_by_offset( .await .unwrap(); assert_eq!( - no_messages.len(), + no_messages.count(), 0, "Expected no messages beyond final offset, but got {}", - no_messages.len() + no_messages.count() ); // Test 4: Small subset from start @@ -224,63 +210,77 @@ async fn test_get_messages_by_offset( .await .unwrap(); assert_eq!( - subset_messages.len(), - subset_size as usize, + subset_messages.count(), + subset_size, "Expected {} messages in subset from start, but got {}", subset_size, - subset_messages.len() + subset_messages.count() ); // Test 5: Messages spanning multiple batches let span_offset = batch_offsets[1] + 1; // Start from middle of 2nd batch let span_size = 8; // Should span across 2nd, 3rd, and into 4th batch - let spanning_messages = partition + let messages = partition .get_messages_by_offset(span_offset, span_size) .await .unwrap(); assert_eq!( - spanning_messages.len(), - span_size as usize, + messages.count(), + span_size, "Expected {} messages spanning multiple batches, but got {}", span_size, - spanning_messages.len() + messages.count() ); // Test 6: Validate message content and ordering - for (i, msg) in spanning_messages.iter().enumerate() { + // Convert to a Vec of IggyMessage to avoid lifetime issues + let message_vec = messages.to_messages(); + + for (i, msg) in message_vec.iter().enumerate() { let expected_offset = span_offset + i as u64; assert!( - msg.offset >= expected_offset, + msg.msg_header().offset() >= expected_offset, "Message offset {} at position {} should be >= expected offset {}", - msg.offset, + msg.msg_header().offset(), i, expected_offset ); // Verify message contents match original - let original_idx = msg.offset as usize; + let original_idx = msg.msg_header().offset() as usize; + let original_message = &all_messages[original_idx]; + let original_msg_header = &original_message.header; + let original_headers = &original_message.headers.as_ref().unwrap().to_bytes(); + let original_payload = &original_message.payload; + + let msg_header = msg.msg_header(); + let msg_payload = msg.payload(); + let msg_headers = msg.headers(); + assert_eq!( - msg.id, original_message.id, + msg_header.id(), + original_msg_header.id, "Message ID mismatch at offset {}: expected {}, got {}", - msg.offset, original_message.id, msg.id + msg_header.offset(), + original_msg_header.id, + msg.msg_header().id() ); assert_eq!( - msg.payload, original_message.payload, + msg_payload, + original_payload, "Payload mismatch at offset {}: expected {:?}, got {:?}", - msg.offset, original_message.payload, msg.payload + msg_header.offset(), + original_payload, + msg_payload ); assert_eq!( - msg.headers - .as_ref() - .map(|bytes| HashMap::from_bytes(bytes.clone()).unwrap()), - original_message.headers, + &msg_headers, + &original_headers, "Headers mismatch at offset {}: expected {:?}, got {:?}", - msg.offset, - original_message.headers, - msg.headers - .as_ref() - .map(|bytes| HashMap::from_bytes(bytes.clone()).unwrap()) + msg_header.offset(), + original_headers, + msg_headers ); } } diff --git a/integration/tests/streaming/mod.rs b/integration/tests/streaming/mod.rs index d2ad7ec3..194a1782 100644 --- a/integration/tests/streaming/mod.rs +++ b/integration/tests/streaming/mod.rs @@ -1,20 +1,20 @@ use bytes::Bytes; -use iggy::messages::send_messages::Message; +use iggy::prelude::IggyMessage; mod common; -mod consumer_offset; +// mod consumer_offset; mod get_by_offset; -mod get_by_timestamp; -mod messages; -mod partition; -mod segment; -mod snapshot; -mod stream; -mod system; -mod topic; -mod topic_messages; +// mod get_by_timestamp; +// mod messages; +// mod partition; +// mod segment; +// mod snapshot; +// mod stream; +// mod system; +// mod topic; +// mod topic_messages; -fn create_messages() -> Vec<Message> { +fn create_messages() -> Vec<IggyMessage> { vec![ create_message(1, "message 1"), create_message(2, "message 2"), @@ -25,12 +25,7 @@ fn create_messages() -> Vec<Message> { ] } -fn create_message(id: u128, payload: &str) -> Message { +fn create_message(id: u128, payload: &str) -> IggyMessage { let payload = Bytes::from(payload.to_string()); - Message { - id, - length: payload.len() as u32, - payload, - headers: None, - } + IggyMessage::builder().id(id).payload(payload).build() } diff --git a/sdk/src/bytes_serializable.rs b/sdk/src/bytes_serializable.rs index 0ba88326..e3158721 100644 --- a/sdk/src/bytes_serializable.rs +++ b/sdk/src/bytes_serializable.rs @@ -1,5 +1,5 @@ use crate::error::IggyError; -use bytes::Bytes; +use bytes::{Bytes, BytesMut}; /// The trait represents the logic responsible for serializing and deserializing the struct to and from bytes. pub trait BytesSerializable { @@ -10,4 +10,14 @@ pub trait BytesSerializable { fn from_bytes(bytes: Bytes) -> Result<Self, IggyError> where Self: Sized; + + /// Write the struct to a buffer. + fn write_to_buffer(&self, _buf: &mut BytesMut) { + unimplemented!(); + } + + /// Get the byte-size of the struct. + fn get_buffer_size(&self) -> u32 { + unimplemented!(); + } } diff --git a/sdk/src/cli/message/poll_messages.rs b/sdk/src/cli/message/poll_messages.rs index a42e4a0b..eae02afc 100644 --- a/sdk/src/cli/message/poll_messages.rs +++ b/sdk/src/cli/message/poll_messages.rs @@ -4,8 +4,8 @@ use crate::client::Client; use crate::consumer::Consumer; use crate::identifier::Identifier; use crate::messages::{PollMessages, PollingStrategy}; -use crate::models::header::{HeaderKey, HeaderKind}; -use crate::prelude::IggyMessage; +use crate::models::messaging::HeaderKind; +use crate::prelude::{HeaderKey, IggyMessage}; use crate::utils::sizeable::Sizeable; use crate::utils::timestamp::IggyTimestamp; use crate::utils::{byte_size::IggyByteSize, duration::IggyDuration}; diff --git a/sdk/src/cli/message/send_messages.rs b/sdk/src/cli/message/send_messages.rs index 8860cf03..ca91b3c5 100644 --- a/sdk/src/cli/message/send_messages.rs +++ b/sdk/src/cli/message/send_messages.rs @@ -2,9 +2,8 @@ use crate::bytes_serializable::BytesSerializable; use crate::cli_command::{CliCommand, PRINT_TARGET}; use crate::client::Client; use crate::identifier::Identifier; -use crate::models::header::{HeaderKey, HeaderValue}; -use crate::prelude::IggyMessage; -use crate::prelude::Partitioning; +use crate::prelude::{HeaderKey, IggyMessage}; +use crate::prelude::{HeaderValue, Partitioning}; use crate::utils::sizeable::Sizeable; use anyhow::Context; use async_trait::async_trait; diff --git a/sdk/src/identifier.rs b/sdk/src/identifier.rs index b6abadc4..e813c90d 100644 --- a/sdk/src/identifier.rs +++ b/sdk/src/identifier.rs @@ -222,6 +222,16 @@ impl BytesSerializable for Identifier { identifier.validate()?; Ok(identifier) } + + fn write_to_buffer(&self, bytes: &mut BytesMut) { + bytes.put_u8(self.kind.as_code()); + bytes.put_u8(self.length); + bytes.put_slice(&self.value); + } + + fn get_buffer_size(&self) -> u32 { + 2 + self.length as u32 + } } impl IdKind { diff --git a/sdk/src/messages/partitioning.rs b/sdk/src/messages/partitioning.rs index 74a8924f..892095c0 100644 --- a/sdk/src/messages/partitioning.rs +++ b/sdk/src/messages/partitioning.rs @@ -189,4 +189,14 @@ impl BytesSerializable for Partitioning { value, }) } + + fn write_to_buffer(&self, bytes: &mut BytesMut) { + bytes.put_u8(self.kind.as_code()); + bytes.put_u8(self.length); + bytes.put_slice(&self.value); + } + + fn get_buffer_size(&self) -> u32 { + 2 + self.length as u32 + } } diff --git a/sdk/src/messages/polling_kind.rs b/sdk/src/messages/polling_kind.rs index edb6d7f5..2ee61d8a 100644 --- a/sdk/src/messages/polling_kind.rs +++ b/sdk/src/messages/polling_kind.rs @@ -1,14 +1,5 @@ -use crate::bytes_serializable::BytesSerializable; -use crate::command::{Command, POLL_MESSAGES_CODE}; -use crate::consumer::{Consumer, ConsumerKind}; use crate::error::IggyError; -use crate::identifier::Identifier; -use crate::utils::sizeable::Sizeable; -use crate::utils::timestamp::IggyTimestamp; -use crate::validatable::Validatable; -use bytes::{BufMut, Bytes, BytesMut}; use serde::{Deserialize, Serialize}; -use serde_with::{serde_as, DisplayFromStr}; use std::fmt::Display; use std::str::FromStr; diff --git a/sdk/src/messages/send_messages.rs b/sdk/src/messages/send_messages.rs index 139c4aac..04dd294c 100644 --- a/sdk/src/messages/send_messages.rs +++ b/sdk/src/messages/send_messages.rs @@ -4,8 +4,9 @@ use crate::command::{Command, SEND_MESSAGES_CODE}; use crate::error::IggyError; use crate::identifier::Identifier; use crate::messages::{MAX_HEADERS_SIZE, MAX_PAYLOAD_SIZE}; -use crate::models::header::{HeaderKey, HeaderValue}; +use crate::models::messaging::{HeaderKey, HeaderValue}; use crate::models::messaging::{IggyMessage, IggyMessageHeader, IggyMessageViewIterator}; +use crate::prelude::IGGY_MESSAGE_HEADER_SIZE; use crate::utils::byte_size::IggyByteSize; use crate::utils::sizeable::Sizeable; use crate::utils::varint::IggyVarInt; @@ -49,32 +50,42 @@ impl SendMessages { partitioning: &Partitioning, messages: &[IggyMessage], ) -> Bytes { - let stream_id_bytes = stream_id.to_bytes(); - let topic_id_bytes = topic_id.to_bytes(); - let partitioning_bytes = partitioning.to_bytes(); + let stream_id_size = stream_id.get_buffer_size(); + let topic_id_size = topic_id.get_buffer_size(); + let partitioning_size = partitioning.get_buffer_size(); let messages_count = messages.len() as u32; - let total_size = stream_id_bytes.len() - + topic_id_bytes.len() - + partitioning_bytes.len() + let total_size = stream_id_size + + topic_id_size + + partitioning_size + 4 // For messages_count (u32) + messages .iter() - .map(|m| m.get_size_bytes().as_bytes_usize()) - .sum::<usize>(); + .map(|m| m.get_size_bytes().as_bytes_u64() as u32) + .sum::<u32>(); - let mut bytes = BytesMut::with_capacity(total_size); + let mut bytes = BytesMut::with_capacity(total_size as usize); - bytes.put_slice(&stream_id_bytes); - bytes.put_slice(&topic_id_bytes); - bytes.put_slice(&partitioning_bytes); + stream_id.write_to_buffer(&mut bytes); + topic_id.write_to_buffer(&mut bytes); + partitioning.write_to_buffer(&mut bytes); bytes.put_u32_le(messages_count); for message in messages { - message.write_to_bytes_mut(&mut bytes); + message.write_to_buffer(&mut bytes); } - bytes.freeze() + let result = bytes.freeze(); + + debug_assert_eq!( + total_size as usize, + result.len(), + "Calculated size ({}) doesn't match actual bytes length ({})", + total_size, + result.len() + ); + + result } } @@ -109,37 +120,30 @@ impl Validatable<IggyError> for SendMessages { return Err(IggyError::InvalidKeyValueLength); } - let mut headers_size = 0; let mut payload_size = 0; let mut message_count = 0; - for message_result in IggyMessageViewIterator::new(&self.messages) { - match message_result { - Ok(message_view) => { - message_count += 1; - - // // TODO: fix this - // if let Ok(Some(headers)) = message_view.headers() { - // for value in headers.values() { - // headers_size += value.value.len() as u32; - // if headers_size > MAX_HEADERS_SIZE { - // return Err(IggyError::TooBigHeadersPayload); - // } - // } - // } - - let message_payload = message_view.payload(); - payload_size += message_payload.len() as u32; - if payload_size > MAX_PAYLOAD_SIZE { - return Err(IggyError::TooBigMessagePayload); - } - - // todo(hubcio): make it use IGGY_MESSAGE_HEADER_SIZE - if message_payload.len() < 56 { - return Err(IggyError::InvalidMessagePayloadLength); - } - } - Err(e) => return Err(e), + for message in IggyMessageViewIterator::new(&self.messages) { + message_count += 1; + + // TODO(hubcio): IMHO validation of headers should be purely on SDK side + // if let Some(headers) = message.headers() { + // for value in headers.values() { + // headers_size += value.value.len() as u32; + // if headers_size > MAX_HEADERS_SIZE { + // return Err(IggyError::TooBigHeadersPayload); + // } + // } + // } + + let message_payload = message.payload(); + payload_size += message_payload.len() as u32; + if payload_size > MAX_PAYLOAD_SIZE { + return Err(IggyError::TooBigMessagePayload); + } + + if message_payload.len() < IGGY_MESSAGE_HEADER_SIZE as usize { + return Err(IggyError::InvalidMessagePayloadLength); } } @@ -157,73 +161,76 @@ impl Validatable<IggyError> for SendMessages { impl BytesSerializable for SendMessages { fn to_bytes(&self) -> Bytes { - let stream_id_bytes = self.stream_id.to_bytes(); - let topic_id_bytes = self.topic_id.to_bytes(); - let partitioning_bytes = self.partitioning.to_bytes(); + panic!("should not be used") - let metadata_len = stream_id_bytes.len() - + topic_id_bytes.len() - + partitioning_bytes.len() - + std::mem::size_of::<u32>(); + // let stream_id_bytes = self.stream_id.to_bytes(); + // let topic_id_bytes = self.topic_id.to_bytes(); + // let partitioning_bytes = self.partitioning.to_bytes(); - let total_len = metadata_len + self.messages.len(); + // let metadata_len = stream_id_bytes.len() + // + topic_id_bytes.len() + // + partitioning_bytes.len() + // + std::mem::size_of::<u32>(); - let mut bytes = BytesMut::with_capacity(total_len); + // let total_len = metadata_len + self.messages.len(); - bytes.put_slice(&stream_id_bytes); - bytes.put_slice(&topic_id_bytes); - bytes.put_slice(&partitioning_bytes); - bytes.put_u32_le(self.messages_count); - bytes.put_slice(&self.messages); + // let mut bytes = BytesMut::with_capacity(total_len); - bytes.freeze() + // bytes.put_slice(&stream_id_bytes); + // bytes.put_slice(&topic_id_bytes); + // bytes.put_slice(&partitioning_bytes); + // bytes.put_u32_le(self.messages_count); + // bytes.put_slice(&self.messages); + + // bytes.freeze() } fn from_bytes(bytes: Bytes) -> Result<SendMessages, IggyError> { - if bytes.is_empty() { - return Err(IggyError::InvalidCommand); - } - - let mut position = 0; - let stream_id = Identifier::from_bytes(bytes.clone())?; - position += stream_id.get_size_bytes().as_bytes_usize(); - - if bytes.len() <= position { - return Err(IggyError::InvalidCommand); - } - - let topic_id = Identifier::from_bytes(bytes.slice(position..))?; - position += topic_id.get_size_bytes().as_bytes_usize(); - - if bytes.len() <= position { - return Err(IggyError::InvalidCommand); - } - - let partitioning = Partitioning::from_bytes(bytes.slice(position..))?; - position += partitioning.get_size_bytes().as_bytes_usize(); - - if bytes.len() < position + 4 { - return Err(IggyError::InvalidCommand); - } - - let messages_count = u32::from_le_bytes( - bytes - .slice(position..position + 4) - .as_ref() - .try_into() - .unwrap(), - ); - position += 4; - - let messages = bytes.slice(position..); - - Ok(SendMessages { - stream_id, - topic_id, - partitioning, - messages_count, - messages, - }) + panic!("should not be used") + // if bytes.is_empty() { + // return Err(IggyError::InvalidCommand); + // } + + // let mut position = 0; + // let stream_id = Identifier::from_bytes(bytes.clone())?; + // position += stream_id.get_size_bytes().as_bytes_usize(); + + // if bytes.len() <= position { + // return Err(IggyError::InvalidCommand); + // } + + // let topic_id = Identifier::from_bytes(bytes.slice(position..))?; + // position += topic_id.get_size_bytes().as_bytes_usize(); + + // if bytes.len() <= position { + // return Err(IggyError::InvalidCommand); + // } + + // let partitioning = Partitioning::from_bytes(bytes.slice(position..))?; + // position += partitioning.get_size_bytes().as_bytes_usize(); + + // if bytes.len() < position + 4 { + // return Err(IggyError::InvalidCommand); + // } + + // let messages_count = u32::from_le_bytes( + // bytes + // .slice(position..position + 4) + // .as_ref() + // .try_into() + // .unwrap(), + // ); + // position += 4; + + // let messages = bytes.slice(position..); + + // Ok(SendMessages { + // stream_id, + // topic_id, + // partitioning, + // messages_count, + // messages, + // }) } } diff --git a/sdk/src/models/header.rs b/sdk/src/models/messaging/header.rs similarity index 100% rename from sdk/src/models/header.rs rename to sdk/src/models/messaging/header.rs diff --git a/sdk/src/models/messaging/message.rs b/sdk/src/models/messaging/message.rs index 662cefb5..d60344fe 100644 --- a/sdk/src/models/messaging/message.rs +++ b/sdk/src/models/messaging/message.rs @@ -1,8 +1,8 @@ +use super::header::get_headers_size_bytes; use super::message_header::{IggyMessageHeader, IGGY_MESSAGE_HEADER_SIZE}; use crate::bytes_serializable::BytesSerializable; use crate::error::IggyError; -use crate::models::header; -use crate::models::header::{HeaderKey, HeaderValue}; +use crate::models::messaging::header::{HeaderKey, HeaderValue}; use crate::utils::byte_size::IggyByteSize; use crate::utils::sizeable::Sizeable; use crate::utils::timestamp::IggyTimestamp; @@ -37,15 +37,6 @@ impl IggyMessage { pub fn builder() -> IggyMessageBuilder { IggyMessageBuilder::new() } - - /// Write message to bytes mut - pub fn write_to_bytes_mut(&self, buf: &mut BytesMut) { - buf.put_slice(&self.header.to_bytes()); - buf.put_slice(&self.payload); - if let Some(headers) = &self.headers { - buf.put_slice(&headers.to_bytes()); - } - } } impl FromStr for IggyMessage { @@ -83,7 +74,7 @@ impl std::fmt::Display for IggyMessage { impl Sizeable for IggyMessage { fn get_size_bytes(&self) -> IggyByteSize { let payload_len = IggyByteSize::from(self.payload.len() as u64); - let headers_len = header::get_headers_size_bytes(&self.headers); + let headers_len = get_headers_size_bytes(&self.headers); let message_header_len = IggyByteSize::from(IGGY_MESSAGE_HEADER_SIZE as u64); payload_len + headers_len + message_header_len @@ -131,6 +122,15 @@ impl BytesSerializable for IggyMessage { headers, }) } + + /// Write message to bytes mut + fn write_to_buffer(&self, buf: &mut BytesMut) { + buf.put_slice(&self.header.to_bytes()); + buf.put_slice(&self.payload); + if let Some(headers) = &self.headers { + buf.put_slice(&headers.to_bytes()); + } + } } #[derive(Debug, Default)] @@ -172,20 +172,21 @@ impl IggyMessageBuilder { pub fn build(self) -> IggyMessage { let payload = self.payload.unwrap_or_default(); - let id = self.id.unwrap_or_else(|| uuid::Uuid::new_v4().as_u128()); + let id = self.id.unwrap_or(0); + let headers_length = get_headers_size_bytes(&self.headers).as_bytes_u64() as u32; - let header = IggyMessageHeader { + let msg_header = IggyMessageHeader { checksum: 0, // Checksum is calculated on server side id, offset: 0, timestamp: 0, origin_timestamp: IggyTimestamp::now().as_micros(), - headers_length: 0, + headers_length, payload_length: payload.len() as u32, }; IggyMessage { - header, + header: msg_header, payload, headers: self.headers, } diff --git a/sdk/src/models/messaging/message_header.rs b/sdk/src/models/messaging/message_header.rs index adc2ff85..807bd371 100644 --- a/sdk/src/models/messaging/message_header.rs +++ b/sdk/src/models/messaging/message_header.rs @@ -7,6 +7,7 @@ use bytes::{BufMut, Bytes, BytesMut}; use std::ops::Range; pub const IGGY_MESSAGE_HEADER_SIZE: u32 = 8 + 16 + 8 + 8 + 8 + 4 + 4; +pub const IGGY_MESSAGE_HEADER_RANGE: Range<usize> = 0..(IGGY_MESSAGE_HEADER_SIZE as usize); pub const IGGY_MESSAGE_CHECKSUM_OFFSET_RANGE: Range<usize> = 0..8; pub const IGGY_MESSAGE_ID_OFFSET_RANGE: Range<usize> = 8..24; diff --git a/sdk/src/models/messaging/message_header_view.rs b/sdk/src/models/messaging/message_header_view.rs index 21a9f753..3846d450 100644 --- a/sdk/src/models/messaging/message_header_view.rs +++ b/sdk/src/models/messaging/message_header_view.rs @@ -1,5 +1,4 @@ use super::message_header::*; -use crate::error::IggyError; /// A read-only, typed view into a message header in a raw buffer. /// @@ -14,11 +13,13 @@ impl<'a> IggyMessageHeaderView<'a> { /// Creates a new `IggyMessageHeaderView` over `data`. /// /// Returns an error if `data.len() < IGGY_MESSAGE_HEADER_SIZE`. - pub fn new(data: &'a [u8]) -> Result<Self, IggyError> { - if data.len() < IGGY_MESSAGE_HEADER_SIZE as usize { - return Err(IggyError::InvalidCommand); - } - Ok(Self { data }) + pub fn new(data: &'a [u8]) -> Self { + debug_assert!( + data.len() >= IGGY_MESSAGE_HEADER_SIZE as usize, + "Header view requires at least {} bytes", + IGGY_MESSAGE_HEADER_SIZE + ); + Self { data } } /// The stored checksum at the start of the header diff --git a/sdk/src/models/messaging/message_view.rs b/sdk/src/models/messaging/message_view.rs index 4acd326b..40b20f77 100644 --- a/sdk/src/models/messaging/message_view.rs +++ b/sdk/src/models/messaging/message_view.rs @@ -1,6 +1,10 @@ use super::message_header::*; use super::IggyMessageHeaderView; use crate::error::IggyError; +use crate::models::messaging::header::HeaderKey; +use crate::models::messaging::header::HeaderValue; +use crate::prelude::BytesSerializable; +use std::collections::HashMap; use std::iter::Iterator; /// A immutable view of a message. @@ -13,30 +17,21 @@ pub struct IggyMessageView<'a> { impl<'a> IggyMessageView<'a> { /// Creates a new immutable message view from a buffer. - pub fn new(buffer: &'a [u8]) -> Result<Self, IggyError> { - if buffer.len() < IGGY_MESSAGE_HEADER_SIZE as usize { - return Err(IggyError::InvalidCommand); - } - // Create a header view from the header slice. - let header_view = - IggyMessageHeaderView::new(&buffer[0..IGGY_MESSAGE_HEADER_SIZE as usize])?; + pub fn new(buffer: &'a [u8]) -> Self { + let header_view = IggyMessageHeaderView::new(&buffer[IGGY_MESSAGE_HEADER_RANGE]); let payload_len = header_view.payload_length() as usize; - let headers_len = header_view.headers_length() as usize; - let total_size = IGGY_MESSAGE_HEADER_SIZE as usize + payload_len + headers_len; + let payload_offset = IGGY_MESSAGE_HEADER_SIZE as usize; + let headers_offset = IGGY_MESSAGE_HEADER_SIZE as usize + payload_len; - if buffer.len() < total_size { - return Err(IggyError::InvalidMessagePayloadLength); - } - - Ok(Self { + Self { buffer, - payload_offset: IGGY_MESSAGE_HEADER_SIZE as usize, - headers_offset: IGGY_MESSAGE_HEADER_SIZE as usize + payload_len, - }) + payload_offset, + headers_offset, + } } /// Returns an immutable header view. - pub fn msg_header(&self) -> Result<IggyMessageHeaderView<'_>, IggyError> { + pub fn msg_header(&self) -> IggyMessageHeaderView<'_> { IggyMessageHeaderView::new(&self.buffer[0..IGGY_MESSAGE_HEADER_SIZE as usize]) } @@ -47,7 +42,7 @@ impl<'a> IggyMessageView<'a> { /// Returns the size of the entire message. pub fn size(&self) -> usize { - let header_view = self.msg_header().expect("header must be valid"); + let header_view = self.msg_header(); IGGY_MESSAGE_HEADER_SIZE as usize + header_view.payload_length() as usize + header_view.headers_length() as usize @@ -55,10 +50,27 @@ impl<'a> IggyMessageView<'a> { /// Returns a reference to the payload portion. pub fn payload(&self) -> &[u8] { - let header_view = self.msg_header().expect("header must be valid"); + let header_view = self.msg_header(); let payload_len = header_view.payload_length() as usize; &self.buffer[self.payload_offset..self.payload_offset + payload_len] } + + /// Validates that the message view is properly formatted and has valid data. + pub fn validate(&self) -> Result<(), IggyError> { + if self.buffer.len() < IGGY_MESSAGE_HEADER_SIZE as usize { + return Err(IggyError::InvalidMessagePayloadLength); + } + + let header = self.msg_header(); + let payload_len = header.payload_length() as usize; + let headers_len = header.headers_length() as usize; + let total_size = IGGY_MESSAGE_HEADER_SIZE as usize + payload_len + headers_len; + + if self.buffer.len() < total_size { + return Err(IggyError::InvalidMessagePayloadLength); + } + Ok(()) + } } /// Iterator over immutable message views in a buffer. @@ -77,7 +89,7 @@ impl<'a> IggyMessageViewIterator<'a> { } impl<'a> Iterator for IggyMessageViewIterator<'a> { - type Item = Result<IggyMessageView<'a>, IggyError>; + type Item = IggyMessageView<'a>; fn next(&mut self) -> Option<Self::Item> { if self.position >= self.buffer.len() { @@ -85,12 +97,8 @@ impl<'a> Iterator for IggyMessageViewIterator<'a> { } let remaining = &self.buffer[self.position..]; - match IggyMessageView::new(remaining) { - Ok(view) => { - self.position += view.size(); - Some(Ok(view)) - } - Err(e) => Some(Err(e)), - } + let view = IggyMessageView::new(remaining); + self.position += view.size(); + Some(view) } } diff --git a/sdk/src/models/messaging/messages.rs b/sdk/src/models/messaging/messages.rs index dd72bc5a..cb2f4a54 100644 --- a/sdk/src/models/messaging/messages.rs +++ b/sdk/src/models/messaging/messages.rs @@ -1,4 +1,8 @@ -use super::{message_view::IggyMessageViewIterator, IggyMessage}; +use std::collections::HashMap; + +use super::{ + message_view::IggyMessageViewIterator, IggyMessage, IggyMessageHeader, IGGY_MESSAGE_HEADER_SIZE, +}; use crate::bytes_serializable::BytesSerializable; use crate::error::IggyError; use crate::utils::byte_size::IggyByteSize; @@ -27,15 +31,6 @@ impl IggyMessages { Self::new(BytesMut::with_capacity(capacity as usize).freeze(), 0) } - /// Create a new messages container from a slice of messages - pub fn from_messages(messages: &[IggyMessage]) -> Self { - let mut buffer = BytesMut::new(); - for message in messages { - buffer.extend_from_slice(&message.to_bytes()); - } - Self::new(buffer.freeze(), messages.len() as u32) - } - /// Create iterator over messages pub fn iter(&self) -> IggyMessageViewIterator { IggyMessageViewIterator::new(&self.buffer) @@ -56,24 +51,61 @@ impl IggyMessages { &self.buffer } + /// Get access to the underlying buffer shallow copy + pub fn shallow_copy(&self) -> Bytes { + self.buffer.clone() + } + pub fn into_inner(self) -> Bytes { self.buffer } - /// Convert to messages pub fn to_messages(self) -> Vec<IggyMessage> { let mut messages = Vec::with_capacity(self.count as usize); let mut position = 0; + let buf_len = self.buffer.len(); - while position < self.buffer.len() { - let remaining = self.buffer.slice(position..); - if let Ok(message) = IggyMessage::from_bytes(remaining) { - let message_size = message.get_size_bytes().as_bytes_usize(); - position += message_size; - messages.push(message); - } else { + while position < buf_len { + if position + IGGY_MESSAGE_HEADER_SIZE as usize > buf_len { break; } + let header_bytes = self + .buffer + .slice(position..position + IGGY_MESSAGE_HEADER_SIZE as usize); + let header = match IggyMessageHeader::from_bytes(header_bytes) { + Ok(h) => h, + Err(_) => break, + }; + position += IGGY_MESSAGE_HEADER_SIZE as usize; + + let payload_end = position + header.payload_length as usize; + if payload_end > buf_len { + break; + } + let payload = self.buffer.slice(position..payload_end); + position = payload_end; + + let headers: Option<HashMap<super::HeaderKey, super::HeaderValue>> = if header.headers_length > 0 { + let headers_end = position + header.headers_length as usize; + if headers_end > buf_len { + break; + } + let headers_bytes = self.buffer.slice(position..headers_end); + position = headers_end; + + match HashMap::from_bytes(headers_bytes) { + Ok(map) => Some(map), + Err(_) => break, + } + } else { + None + }; + + messages.push(IggyMessage { + header, + payload, + headers, + }); } messages @@ -90,10 +122,9 @@ impl BytesSerializable for IggyMessages { Self: Sized, { let mut messages_count = 0; - let iterator = IggyMessageViewIterator::new(&bytes); + let iterator: IggyMessageViewIterator<'_> = IggyMessageViewIterator::new(&bytes); - for result in iterator { - result?; + for _ in iterator { messages_count += 1; } diff --git a/sdk/src/models/messaging/mod.rs b/sdk/src/models/messaging/mod.rs index 056e2ffc..2605a4af 100644 --- a/sdk/src/models/messaging/mod.rs +++ b/sdk/src/models/messaging/mod.rs @@ -1,13 +1,15 @@ +mod header; mod message; mod message_header; mod message_header_view; mod message_view; mod messages; +pub use header::{HeaderKey, HeaderKind, HeaderValue}; pub use message::IggyMessage; pub use message_header::{ IggyMessageHeader, IGGY_MESSAGE_CHECKSUM_OFFSET_RANGE, - IGGY_MESSAGE_HEADERS_LENGTH_OFFSET_RANGE, IGGY_MESSAGE_HEADER_SIZE, + IGGY_MESSAGE_HEADERS_LENGTH_OFFSET_RANGE, IGGY_MESSAGE_HEADER_RANGE, IGGY_MESSAGE_HEADER_SIZE, IGGY_MESSAGE_ID_OFFSET_RANGE, IGGY_MESSAGE_OFFSET_OFFSET_RANGE, IGGY_MESSAGE_ORIGIN_TIMESTAMP_OFFSET_RANGE, IGGY_MESSAGE_PAYLOAD_LENGTH_OFFSET_RANGE, IGGY_MESSAGE_TIMESTAMP_OFFSET_RANGE, diff --git a/sdk/src/models/mod.rs b/sdk/src/models/mod.rs index 5c38549d..892b1e2e 100644 --- a/sdk/src/models/mod.rs +++ b/sdk/src/models/mod.rs @@ -2,7 +2,6 @@ pub mod client_info; pub mod consumer_group; pub mod consumer_offset_info; -pub mod header; pub mod identity_info; pub mod messaging; pub mod partition; diff --git a/sdk/src/prelude.rs b/sdk/src/prelude.rs index 1f2931aa..9172ead2 100644 --- a/sdk/src/prelude.rs +++ b/sdk/src/prelude.rs @@ -19,7 +19,7 @@ pub use crate::messages::{ FlushUnsavedBuffer, Partitioning, PollMessages, PollingKind, PollingStrategy, SendMessages, }; pub use crate::models::messaging::{ - IggyMessage, IggyMessageHeader, IggyMessageHeaderView, IggyMessageView, + HeaderKey, HeaderValue, IggyMessage, IggyMessageHeader, IggyMessageHeaderView, IggyMessageView, IggyMessageViewIterator, IggyMessages, }; pub use crate::models::messaging::{ @@ -31,7 +31,10 @@ pub use crate::models::messaging::{ pub use crate::models::partition::Partition; pub use crate::models::stream::Stream; pub use crate::models::topic::Topic; + pub use crate::utils::byte_size::IggyByteSize; +pub use crate::utils::expiry::IggyExpiry; pub use crate::utils::sizeable::Sizeable; pub use crate::utils::timestamp::IggyTimestamp; + pub use crate::validatable::Validatable; diff --git a/server/src/binary/handlers/messages/poll_messages_handler.rs b/server/src/binary/handlers/messages/poll_messages_handler.rs index 846ca909..85527060 100644 --- a/server/src/binary/handlers/messages/poll_messages_handler.rs +++ b/server/src/binary/handlers/messages/poll_messages_handler.rs @@ -41,7 +41,15 @@ impl ServerCommandHandler for PollMessages { self.consumer, self.stream_id, self.topic_id, self.partition_id ))?; - sender.send_ok_response(messages.buffer()).await?; + // Collect all chunks first into a Vec to extend their lifetimes. + // This ensures the Arc<[u8]> references from each ByteSliceView stay alive + // throughout the async vectored I/O operation, preventing "borrowed value does not live + // long enough" errors while optimizing transmission by using larger chunks. + + let length = messages.size().to_le_bytes(); + let slices: Vec<IoSlice> = messages.chunks().map(IoSlice::new).collect(); + + sender.send_ok_response_vectored(&length, slices).await?; Ok(()) } } diff --git a/server/src/binary/handlers/messages/send_messages_handler.rs b/server/src/binary/handlers/messages/send_messages_handler.rs index 5c3f20e2..60d9735a 100644 --- a/server/src/binary/handlers/messages/send_messages_handler.rs +++ b/server/src/binary/handlers/messages/send_messages_handler.rs @@ -9,7 +9,6 @@ use iggy::error::IggyError; use iggy::identifier::Identifier; use iggy::prelude::*; use iggy::utils::sizeable::Sizeable; -use std::time::Instant; use tracing::instrument; impl ServerCommandHandler for SendMessages { @@ -68,8 +67,6 @@ impl ServerCommandHandler for SendMessages { None, ) .await?; - - // TODO(hubcio): is it legal to drop here? drop(system); self.stream_id = stream_id; diff --git a/server/src/streaming/partitions/messages.rs b/server/src/streaming/partitions/messages.rs index c0fef4fc..d4e6fc1f 100644 --- a/server/src/streaming/partitions/messages.rs +++ b/server/src/streaming/partitions/messages.rs @@ -66,7 +66,7 @@ impl Partition { &self, start_offset: u64, count: u32, - ) -> Result<IggyMessages, IggyError> { + ) -> Result<IggyMessagesSlice, IggyError> { trace!( "Getting messages for start offset: {start_offset} for partition: {}, current offset: {}...", self.partition_id, @@ -85,27 +85,20 @@ impl Partition { let segments = self.filter_segments_by_offsets(start_offset, end_offset); match segments.len() { 0 => panic!("TODO"), - 1 => { - let messages = segments[0] - .get_messages_by_offset(start_offset, count) - .await?; - Ok(messages) - } - _ => { - let messages = - Self::get_messages_from_segments(segments, start_offset, count).await?; - Ok(messages) - } + 1 => Ok(segments[0] + .get_messages_by_offset(start_offset, count) + .await?), + _ => Ok(Self::get_messages_from_segments(segments, start_offset, count).await?), } } // Retrieves the first messages (up to a specified count). - pub async fn get_first_messages(&self, count: u32) -> Result<IggyMessages, IggyError> { + pub async fn get_first_messages(&self, count: u32) -> Result<IggyMessagesSlice, IggyError> { self.get_messages_by_offset(0, count).await } // Retrieves the last messages (up to a specified count). - pub async fn get_last_messages(&self, count: u32) -> Result<IggyMessages, IggyError> { + pub async fn get_last_messages(&self, count: u32) -> Result<IggyMessagesSlice, IggyError> { let mut requested_count = count as u64; if requested_count > self.current_offset + 1 { requested_count = self.current_offset + 1 @@ -120,7 +113,7 @@ impl Partition { &self, consumer: PollingConsumer, count: u32, - ) -> Result<IggyMessages, IggyError> { + ) -> Result<IggyMessagesSlice, IggyError> { let (consumer_offsets, consumer_id) = match consumer { PollingConsumer::Consumer(consumer_id, _) => (&self.consumer_offsets, consumer_id), PollingConsumer::ConsumerGroup(group_id, _) => (&self.consumer_group_offsets, group_id), @@ -183,13 +176,49 @@ impl Partition { .collect() } - // Retrieves messages from multiple segments. + /// Retrieves messages from multiple segments. async fn get_messages_from_segments( segments: Vec<&Segment>, offset: u64, count: u32, - ) -> Result<IggyMessages, IggyError> { - todo!() + ) -> Result<IggyMessagesSlice, IggyError> { + let mut slices = Vec::new(); + let mut remaining_count = count; + let mut current_offset = offset; + + for segment in segments { + if remaining_count == 0 { + break; + } + + let messages = segment + .get_messages_by_offset(current_offset, remaining_count) + .await + .with_error_context(|error| { + format!( + "{COMPONENT} (error: {error}) - failed to get messages from segment, segment: {}, \ + offset: {}, count: {}", + segment, current_offset, remaining_count + ) + })?; + + // Update remaining count and offset for next segment + let messages_count = messages.count(); + remaining_count = remaining_count.saturating_sub(messages_count); + + // Update the offset for the next segment if needed + if messages_count > 0 { + // If we got messages, the next offset should be after the last message + // from this segment + current_offset += messages_count as u64; + } + + slices.push(messages); + } + + // Combine all segment slices into a single composite slice + // This avoids copying the data + Ok(IggyMessagesSlice::combine(slices)) // let mut results = Vec::new(); // let mut remaining_count = count; // for segment in segments { @@ -221,6 +250,12 @@ impl Partition { messages: IggyMessagesMut, confirmation: Option<Confirmation>, ) -> Result<(), IggyError> { + trace!( + "Appending {} messages of size {} to partition with ID: {}...", + self.partition_id, + messages.count(), + messages.size() + ); { let last_segment = self.segments.last_mut().ok_or(IggyError::SegmentNotFound)?; if last_segment.is_closed { @@ -232,11 +267,10 @@ impl Partition { self.add_persisted_segment(start_offset).await.with_error_context(|error| format!( "{COMPONENT} (error: {error}) - failed to add persisted segment, partition: {}, start offset: {}", self, start_offset, - ))?; + ))? } } - let messages_size = messages.size(); let current_offset = if !self.should_increment_offset { 0 } else { @@ -276,8 +310,6 @@ impl Partition { } */ - // Why are we even doing it this way ? XD - // What are those scope let messages_count = { let last_segment = self.segments.last_mut().ok_or(IggyError::SegmentNotFound)?; last_segment @@ -289,18 +321,20 @@ impl Partition { ) })? }; - let last_offset = current_offset + messages_count as u64 - 1; + + // Handle the case when messages_count is 0 to avoid integer underflow + let last_offset = if messages_count == 0 { + current_offset + } else { + current_offset + messages_count as u64 - 1 + }; + if self.should_increment_offset { self.current_offset = last_offset; } else { self.should_increment_offset = true; self.current_offset = last_offset; } - /* - if let Some(cache) = &mut self.cache { - cache.extend(retained_messages); - } - */ self.unsaved_messages_count += messages_count; { diff --git a/server/src/streaming/partitions/partition.rs b/server/src/streaming/partitions/partition.rs index 75822f10..2a44ee88 100644 --- a/server/src/streaming/partitions/partition.rs +++ b/server/src/streaming/partitions/partition.rs @@ -88,16 +88,6 @@ impl Partition { config.get_consumer_offsets_path(stream_id, topic_id, partition_id); let consumer_group_offsets_path = config.get_consumer_group_offsets_path(stream_id, topic_id, partition_id); - //TODO: Fix me - /* - let (cached_memory_tracker, messages) = match config.cache.enabled { - false => (None, None), - true => ( - CacheMemoryTracker::initialize(&config.cache), - Some(SmartCache::new()), - ), - }; - */ let mut partition = Partition { stream_id, @@ -108,8 +98,6 @@ impl Partition { consumer_offsets_path, consumer_group_offsets_path, message_expiry, - //cache: messages, - //cached_memory_tracker, message_deduplicator: match config.message_deduplication.enabled { true => Some(MessageDeduplicator::new( if config.message_deduplication.max_entries > 0 { @@ -169,27 +157,6 @@ impl Partition { partition } - - pub fn get_cache_metrics(&self) -> CacheMetrics { - //TODO: Fix me - /* - if let Some(cache) = self.cache.as_ref() { - let cache_metrics = cache.get_metrics(); - CacheMetrics { - hits: cache_metrics.hits, - misses: cache_metrics.misses, - hit_ratio: cache_metrics.hit_ratio, - } - } else { - CacheMetrics { - hits: 0, - misses: 0, - hit_ratio: 0.0, - } - } - */ - Default::default() - } } impl Sizeable for Partition { @@ -214,7 +181,7 @@ impl fmt::Display for Partition { #[cfg(test)] mod tests { - use crate::configs::system::{CacheConfig, SystemConfig}; + use crate::configs::system::SystemConfig; use crate::streaming::partitions::partition::Partition; use crate::streaming::persistence::persister::{FileWithSyncPersister, PersisterKind}; use crate::streaming::storage::SystemStorage; diff --git a/server/src/streaming/segments/indexes/index.rs b/server/src/streaming/segments/indexes/index.rs index fd1f5ea0..c6f9111b 100644 --- a/server/src/streaming/segments/indexes/index.rs +++ b/server/src/streaming/segments/indexes/index.rs @@ -1,7 +1,3 @@ -use crate::streaming::segments::segment::Segment; -use iggy::error::IggyError; -use iggy::error::IggyError::InvalidOffset; - #[derive(Debug, Eq, Clone, Copy, Default)] pub struct Index { pub offset: u32, @@ -16,192 +12,3 @@ impl PartialEq<Self> for Index { && self.timestamp == other.timestamp } } - -#[derive(Debug, Eq, PartialEq, Clone, Copy, Default)] -pub struct FetchIndex { - pub offset: u32, - pub position: u32, - pub timestamp: u64, - pub messages_count: u32, -} - -#[derive(Debug, Clone, Copy, Default)] -pub struct IndexRange { - pub start: Index, - pub end: Index, -} - -impl Segment { - pub fn load_highest_lower_bound_index( - &self, - indices: &[Index], - start_offset: u32, - end_offset: u32, - ) -> Result<IndexRange, IggyError> { - let starting_offset_idx = binary_search_index(indices, start_offset); - let ending_offset_idx = binary_search_index(indices, end_offset); - - match (starting_offset_idx, ending_offset_idx) { - (Some(starting_offset_idx), Some(ending_offset_idx)) => Ok(IndexRange { - start: indices[starting_offset_idx], - end: indices[ending_offset_idx], - }), - (Some(starting_offset_idx), None) => Ok(IndexRange { - start: indices[starting_offset_idx], - end: *indices.last().unwrap(), - }), - (None, _) => Err(InvalidOffset(start_offset as u64 + self.start_offset)), - } - } -} - -fn binary_search_index(indices: &[Index], offset: u32) -> Option<usize> { - match indices.binary_search_by(|index| index.offset.cmp(&offset)) { - Ok(index) => Some(index), - Err(index) => { - if index < indices.len() { - Some(index) - } else { - None - } - } - } -} - -impl IndexRange { - pub fn max_range() -> Self { - Self { - start: Index { - offset: 0, - position: 0, - timestamp: 0, - }, - end: Index { - offset: u32::MAX - 1, - position: u32::MAX, - timestamp: u64::MAX, - }, - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::configs::system::{SegmentConfig, SystemConfig}; - use iggy::utils::expiry::IggyExpiry; - use std::sync::atomic::AtomicU64; - use std::sync::Arc; - - async fn create_segment() -> Segment { - let stream_id = 1; - let topic_id = 2; - let partition_id = 3; - let start_offset = 0; - let config = Arc::new(SystemConfig { - segment: SegmentConfig { - cache_indexes: true, - ..Default::default() - }, - ..Default::default() - }); - - Segment::create( - stream_id, - topic_id, - partition_id, - start_offset, - config, - IggyExpiry::NeverExpire, - Arc::new(AtomicU64::new(0)), - Arc::new(AtomicU64::new(0)), - Arc::new(AtomicU64::new(0)), - Arc::new(AtomicU64::new(0)), - Arc::new(AtomicU64::new(0)), - Arc::new(AtomicU64::new(0)), - ) - } - - fn create_test_indices(segment: &mut Segment) { - let indexes = vec![ - Index { - offset: 5, - position: 0, - timestamp: 1000, - }, - Index { - offset: 20, - position: 100, - timestamp: 2000, - }, - Index { - offset: 35, - position: 200, - timestamp: 3000, - }, - Index { - offset: 50, - position: 300, - timestamp: 4000, - }, - Index { - offset: 65, - position: 400, - timestamp: 5000, - }, - ]; - segment.indexes.as_mut().unwrap().extend(indexes); - } - - #[tokio::test] - async fn should_find_both_indices() { - let mut segment = create_segment().await; - create_test_indices(&mut segment); - let result = segment - .load_highest_lower_bound_index(segment.indexes.as_ref().unwrap(), 15, 45) - .unwrap(); - - assert_eq!(result.start.offset, 20); - assert_eq!(result.end.offset, 50); - } - - #[tokio::test] - async fn start_and_end_index_should_be_equal() { - let mut segment = create_segment().await; - create_test_indices(&mut segment); - let result_end_range = segment - .load_highest_lower_bound_index(segment.indexes.as_ref().unwrap(), 65, 100) - .unwrap(); - - assert_eq!(result_end_range.start.offset, 65); - assert_eq!(result_end_range.end.offset, 65); - - let result_start_range = segment - .load_highest_lower_bound_index(segment.indexes.as_ref().unwrap(), 0, 5) - .unwrap(); - assert_eq!(result_start_range.start.offset, 5); - assert_eq!(result_start_range.end.offset, 5); - } - - #[tokio::test] - async fn should_clamp_last_index_when_out_of_range() { - let mut segment = create_segment().await; - create_test_indices(&mut segment); - let result = segment - .load_highest_lower_bound_index(segment.indexes.as_ref().unwrap(), 5, 100) - .unwrap(); - - assert_eq!(result.start.offset, 5); - assert_eq!(result.end.offset, 65); - } - - #[tokio::test] - async fn should_return_err_when_both_indices_out_of_range() { - let mut segment = create_segment().await; - create_test_indices(&mut segment); - - let result = - segment.load_highest_lower_bound_index(segment.indexes.as_ref().unwrap(), 100, 200); - assert!(result.is_err()); - } -} diff --git a/server/src/streaming/segments/indexes/index_reader.rs b/server/src/streaming/segments/indexes/index_reader.rs index 2ed09640..83a8eb3b 100644 --- a/server/src/streaming/segments/indexes/index_reader.rs +++ b/server/src/streaming/segments/indexes/index_reader.rs @@ -1,7 +1,4 @@ -use super::{ - index::{Index, IndexRange}, - INDEX_SIZE, -}; +use super::{index::Index, INDEX_SIZE}; use error_set::ErrContext; use iggy::error::IggyError; use std::{ @@ -92,66 +89,6 @@ impl SegmentIndexReader { Ok(indexes) } - /// Loads an index range from the index file given a start/end offset. - pub async fn load_index_range_impl( - &self, - index_start_offset: u64, - index_end_offset: u64, - segment_start_offset: u64, - ) -> Result<Option<IndexRange>, IggyError> { - if index_start_offset > index_end_offset { - trace!( - "Index start offset {} is greater than index end offset {} for file {}.", - index_start_offset, - index_end_offset, - self.file_path - ); - return Ok(None); - } - let file_size = self.file_size(); - if file_size == 0 { - trace!("Index file {} is empty.", self.file_path); - return Ok(None); - } - trace!("Index file length: {} bytes.", file_size); - - let relative_start_offset = (index_start_offset - segment_start_offset) as u32; - let relative_end_offset = (index_end_offset - segment_start_offset) as u32; - let mut index_range = IndexRange::default(); - - let buf = match self.read_at(0, file_size).await { - Ok(buf) => buf, - Err(error) if error.kind() == ErrorKind::UnexpectedEof => return Ok(None), - Err(error) => { - error!( - "Error reading batch header at offset 0 in file {}: {error}", - self.file_path - ); - return Err(IggyError::CannotReadFile); - } - }; - let mut last_index = Index::default(); - for chunk in buf.chunks_exact(INDEX_SIZE as usize) { - let current_index = parse_index(chunk).with_error_context(|error| { - format!("Failed to parse index {}: {error}", self.file_path) - })?; - if current_index.offset >= relative_start_offset - && index_range.start == Index::default() - { - index_range.start = current_index; - } - if current_index.offset >= relative_end_offset { - index_range.end = current_index; - break; - } - last_index = current_index; - } - if index_range.end == Index::default() { - index_range.end = last_index; - } - Ok(Some(index_range)) - } - pub async fn load_index_for_timestamp_impl( &self, timestamp: u64, @@ -162,38 +99,57 @@ impl SegmentIndexReader { return Ok(Some(Index::default())); } - let buf = match self.read_at(0, file_size).await { - Ok(buf) => buf, - Err(error) if error.kind() == ErrorKind::UnexpectedEof => return Ok(None), - Err(error) => { - error!( - "Error reading batch header at offset 0 in file {}: {error}", - self.file_path - ); - return Err(IggyError::CannotReadFile); - } - }; - let mut last_index: Option<Index> = None; - for chunk in buf.chunks_exact(INDEX_SIZE as usize) { - let current = parse_index(chunk)?; - if current.timestamp >= timestamp { - return Ok(Some(last_index.unwrap_or_default())); + let total_indexes = file_size / INDEX_SIZE; + trace!( + "Searching for timestamp {} in {} indexes", + timestamp, + total_indexes + ); + + let mut left = 0; + let mut right = total_indexes - 1; + let mut result: Option<Index> = None; + + while left <= right { + let mid = left + (right - left) / 2; + let buf = match self.read_at(mid * INDEX_SIZE, INDEX_SIZE).await { + Ok(buf) => buf, + Err(error) if error.kind() == ErrorKind::UnexpectedEof => return Ok(None), + Err(error) => { + error!( + "Error reading index at position {} in file {}: {error}", + mid, self.file_path + ); + return Err(IggyError::CannotReadFile); + } + }; + let current = parse_index(&buf)?; + match current.timestamp.cmp(×tamp) { + std::cmp::Ordering::Equal => { + return Ok(Some(current)); + } + std::cmp::Ordering::Less => { + result = Some(current); + left = mid + 1; + } + std::cmp::Ordering::Greater => { + if mid == 0 { + return Ok(result.or(Some(Index::default()))); + } + right = mid - 1; + } } - last_index = Some(current); } - Ok(None) + trace!("Index for timestamp {timestamp}: {result:?}"); + Ok(result) } + /// Returns the size of the index file in bytes. fn file_size(&self) -> u32 { self.index_size_bytes.load(Ordering::Acquire) as u32 } - /// Returns the total number of indexes stored in the file. - pub fn get_indexes_count(&self) -> u32 { - let file_size = self.file_size(); - file_size / INDEX_SIZE - } - + /// Reads a specified number of bytes from the index file at a given offset. async fn read_at(&self, offset: u32, len: u32) -> Result<Vec<u8>, std::io::Error> { let file = self.file.clone(); spawn_blocking(move || { @@ -246,6 +202,7 @@ impl SegmentIndexReader { } } +/// Parses an index from a byte slice. fn parse_index(chunk: &[u8]) -> Result<Index, IggyError> { let offset = u32::from_le_bytes( chunk[0..4] diff --git a/server/src/streaming/segments/indexes/index_writer.rs b/server/src/streaming/segments/indexes/index_writer.rs index c8fa5739..306e7350 100644 --- a/server/src/streaming/segments/indexes/index_writer.rs +++ b/server/src/streaming/segments/indexes/index_writer.rs @@ -61,30 +61,6 @@ impl SegmentIndexWriter { }) } - /// Append the given index record to the index file. - pub async fn save_index(&mut self, index: Index) -> Result<(), IggyError> { - let mut buf = [0u8; INDEX_SIZE as usize]; - buf[0..4].copy_from_slice(&index.offset.to_le_bytes()); - buf[4..8].copy_from_slice(&index.position.to_le_bytes()); - buf[8..16].copy_from_slice(&index.timestamp.to_le_bytes()); - - { - self.file - .write_all(&buf) - .await - .with_error_context(|error| { - format!("Failed to write index to file: {}. {error}", self.file_path) - }) - .map_err(|_| IggyError::CannotSaveIndexToSegment)?; - } - if self.fsync { - let _ = self.fsync().await; - } - self.index_size_bytes - .fetch_add(INDEX_SIZE as u64, Ordering::Release); - Ok(()) - } - /// Append multiple index records to the index file in a single operation. pub async fn save_indexes(&mut self, indexes: &[Index]) -> Result<(), IggyError> { if indexes.is_empty() { diff --git a/server/src/streaming/segments/indexes/mod.rs b/server/src/streaming/segments/indexes/mod.rs index e17de088..5ad84180 100644 --- a/server/src/streaming/segments/indexes/mod.rs +++ b/server/src/streaming/segments/indexes/mod.rs @@ -6,6 +6,5 @@ mod index_writer; pub const INDEX_SIZE: u32 = 16; pub use index::Index; -pub use index::IndexRange; pub use index_reader::SegmentIndexReader; pub use index_writer::SegmentIndexWriter; diff --git a/server/src/streaming/segments/messages/messages_reader.rs b/server/src/streaming/segments/messages/messages_reader.rs index ee5cbb02..caff84d2 100644 --- a/server/src/streaming/segments/messages/messages_reader.rs +++ b/server/src/streaming/segments/messages/messages_reader.rs @@ -1,10 +1,8 @@ -use crate::streaming::segments::indexes::IndexRange; use bytes::{Bytes, BytesMut}; use error_set::ErrContext; use iggy::{ error::IggyError, prelude::{BytesSerializable, IggyMessages}, - utils::{byte_size::IggyByteSize, timestamp::IggyTimestamp}, }; use std::{ fmt, @@ -64,6 +62,8 @@ impl MessagesReader { log_size_bytes.store(actual_log_size, Ordering::Release); + trace!("Opened messages file for reading: {file_path}, size: {actual_log_size}"); + Ok(Self { file_path: file_path.to_string(), file: Arc::new(file), @@ -71,10 +71,9 @@ impl MessagesReader { }) } - // TODO: This one is most likely not needed anymore. /// Loads and returns all message IDs from the log file. pub async fn load_message_ids_impl(&self) -> Result<Vec<u128>, IggyError> { - let mut file_size = self.file_size(); + let file_size = self.file_size(); if file_size == 0 { trace!("Log file {} is empty.", self.file_path); return Ok(Vec::new()); @@ -118,10 +117,7 @@ impl MessagesReader { trace!("Messages file {} is empty.", self.file_path); return Ok(IggyMessages::default()); } - let messages_bytes = match self - .read_bytes_at(start_pos as u64, count_bytes as u64) - .await - { + let messages_bytes = match self.read_at(start_pos as u64, count_bytes as u64).await { Ok(buf) => buf, Err(error) if error.kind() == ErrorKind::UnexpectedEof => { return Ok(IggyMessages::default()) @@ -135,14 +131,14 @@ impl MessagesReader { } }; - IggyMessages::from_bytes(messages_bytes) + Ok(IggyMessages::new(messages_bytes, messages_count)) } fn file_size(&self) -> u64 { self.log_size_bytes.load(Ordering::Acquire) } - async fn read_bytes_at(&self, offset: u64, len: u64) -> Result<Bytes, std::io::Error> { + async fn read_at(&self, offset: u64, len: u64) -> Result<Bytes, std::io::Error> { let file = self.file.clone(); spawn_blocking(move || { let mut buf = BytesMut::with_capacity(len as usize); diff --git a/server/src/streaming/segments/messages/messages_writer.rs b/server/src/streaming/segments/messages/messages_writer.rs index 4bd8523f..bc46040b 100644 --- a/server/src/streaming/segments/messages/messages_writer.rs +++ b/server/src/streaming/segments/messages/messages_writer.rs @@ -12,7 +12,6 @@ use std::{ atomic::{AtomicU64, Ordering}, Arc, }, - time::Duration, }; use tokio::{ fs::{File, OpenOptions}, @@ -95,13 +94,17 @@ impl MessagesWriter { /// Append a messages to the messages file. pub async fn save_batches( &mut self, - messages: IggyMessages, + messages: Vec<IggyMessages>, confirmation: Confirmation, ) -> Result<IggyByteSize, IggyError> { - let messages_size = messages.size(); + let messages_size: usize = messages.iter().map(|m| m.size() as usize).sum(); + trace!( + "Saving batch of size {messages_size} bytes to messages file: {}", + self.file_path + ); match confirmation { Confirmation::Wait => { - self.write_batch(messages).await?; + self.write_batch_vectored(messages).await?; self.messages_size_bytes .fetch_add(messages_size as u64, Ordering::AcqRel); trace!( @@ -127,16 +130,10 @@ impl MessagesWriter { Ok(IggyByteSize::from(messages_size as u64)) } - /// Write a batch of bytes to the messages file and return the new file position. - async fn write_batch(&mut self, messages: IggyMessages) -> Result<(), IggyError> { + /// Write a batch of bytes to the log file. + async fn write_batch_vectored(&mut self, batches: Vec<IggyMessages>) -> Result<(), IggyError> { if let Some(ref mut file) = self.file { - file.write_all(messages.buffer()) - .await - .with_error_context(|error| { - format!("Failed to messages to file: {}. {error}", self.file_path) - }) - .map_err(|_| IggyError::CannotWriteToFile)?; - + super::write_batch_vectored(file, &self.file_path, batches).await?; Ok(()) } else { error!("File handle is not available for synchronous write."); diff --git a/server/src/streaming/segments/messages/mod.rs b/server/src/streaming/segments/messages/mod.rs index f5cd8fd6..e4bff590 100644 --- a/server/src/streaming/segments/messages/mod.rs +++ b/server/src/streaming/segments/messages/mod.rs @@ -5,3 +5,45 @@ mod persister_task; pub use messages_reader::MessagesReader; pub use messages_writer::MessagesWriter; pub use persister_task::PersisterTask; + +use error_set::ErrContext; +use iggy::{error::IggyError, prelude::IggyMessages}; +use std::io::IoSlice; +use tokio::{fs::File, io::AsyncWriteExt}; +use tracing::error; + +/// Write a batch of messages to a file using vectored I/O +/// +/// This function writes all the messages in the batch to the file using vectored I/O, +/// which can be more efficient than writing each message individually. +pub async fn write_batch_vectored( + file: &mut File, + file_path: &str, + batches: Vec<IggyMessages>, +) -> Result<u32, IggyError> { + let mut total_bytes_written = 0; + let mut slices_vec: Vec<IoSlice<'_>> = + batches.iter().map(|b| IoSlice::new(b.buffer())).collect(); + + let mut slices = slices_vec.as_mut_slice(); + + while !slices.is_empty() { + let written = file + .write_vectored(slices) + .await + .with_error_context(|error| { + format!("Failed to write vectored to file: {file_path}. {error}") + }) + .map_err(|_| IggyError::CannotWriteToFile)? as u32; + + if written == 0 { + error!("Failed to write batch of messages to file: {file_path}"); + return Err(IggyError::CannotWriteToFile); + } + + total_bytes_written += written; + IoSlice::advance_slices(&mut slices, written as usize); + } + + Ok(total_bytes_written) +} diff --git a/server/src/streaming/segments/messages/persister_task.rs b/server/src/streaming/segments/messages/persister_task.rs index 37728364..f76000e8 100644 --- a/server/src/streaming/segments/messages/persister_task.rs +++ b/server/src/streaming/segments/messages/persister_task.rs @@ -14,7 +14,7 @@ use tracing::{error, trace, warn}; #[derive(Debug)] /// A command to the persister task. enum PersisterTaskCommand { - WriteRequest(IggyMessages), + WriteRequest(Vec<IggyMessages>), Shutdown, } @@ -59,7 +59,7 @@ impl PersisterTask { } /// Sends the batch bytes to the persister task (fire-and-forget). - pub async fn persist(&self, messages: IggyMessages) { + pub async fn persist(&self, messages: Vec<IggyMessages>) { if let Err(e) = self .sender .send_async(PersisterTaskCommand::WriteRequest(messages)) @@ -213,7 +213,7 @@ impl PersisterTask { async fn write_with_retries( file: &mut File, file_path: &str, - messages: IggyMessages, + messages: Vec<IggyMessages>, fsync: bool, max_retries: u32, retry_delay: IggyDuration, @@ -225,41 +225,43 @@ impl PersisterTask { // There are few errors which are worth retrying such as LSE (Latent sector error), as the file system // might be able to reallocate a new sector for the data and recover, but not every file system supports that. // In general this topic should be furthered researched rather than just naively retry when the write fails. - let messages_size = messages.size(); - loop { - // TODO: use vectored API - match file.write_all(messages.buffer()).await { - Ok(_) => { - if fsync { - match file.sync_all().await { - Ok(_) => return Ok(messages_size), - Err(e) => { - attempts += 1; - error!( - "Error syncing file {file_path}: {:?} (attempt {attempts}/{max_retries})", - e, - ); - } - } - } else { - return Ok(messages_size); - } - } - Err(e) => { - attempts += 1; - error!( - "Error writing to file {file_path}: {:?} (attempt {attempts}/{max_retries})", - e, - ); - } - } - if attempts >= max_retries { - error!( - "Failed to write to file {file_path} after {max_retries} attempts, something's terribly wrong", - ); - return Err(IggyError::CannotWriteToFile); - } - sleep(retry_delay.get_duration()).await; - } + super::write_batch_vectored(file, file_path, messages).await + + // let messages_size = messages.iter().map(|m| m.size()).sum(); + // loop { + // // TODO: use vectored API + // match super::write_batch_vectored(file, file_path, messages).await { + // Ok(_) => { + // if fsync { + // match file.sync_all().await { + // Ok(_) => return Ok(messages_size), + // Err(e) => { + // attempts += 1; + // error!( + // "Error syncing file {file_path}: {:?} (attempt {attempts}/{max_retries})", + // e, + // ); + // } + // } + // } else { + // return Ok(messages_size); + // } + // } + // Err(e) => { + // attempts += 1; + // error!( + // "Error writing to file {file_path}: {:?} (attempt {attempts}/{max_retries})", + // e, + // ); + // } + // } + // if attempts >= max_retries { + // error!( + // "Failed to write to file {file_path} after {max_retries} attempts, something's terribly wrong", + // ); + // return Err(IggyError::CannotWriteToFile); + // } + // sleep(retry_delay.get_duration()).await; + // } } } diff --git a/server/src/streaming/segments/messages_accumulator copy 2.rs b/server/src/streaming/segments/messages_accumulator copy 2.rs deleted file mode 100644 index 28cec981..00000000 --- a/server/src/streaming/segments/messages_accumulator copy 2.rs +++ /dev/null @@ -1,141 +0,0 @@ -use super::{IggyMessagesMut, Index}; -use bytes::BytesMut; -use iggy::prelude::IggyMessages; -use iggy::utils::byte_size::IggyByteSize; -use iggy::utils::timestamp::IggyTimestamp; -use std::mem; - -/// A container that accumulates messages before they are written to disk -#[derive(Debug, Default)] -pub struct MessagesAccumulator { - /// Base offset of the first message - base_offset: u64, - - /// Base timestamp of the first message - base_timestamp: u64, - - /// Current size of all accumulated messages - current_size: IggyByteSize, - - /// Current maximum offset - current_offset: u64, - - /// Current maximum timestamp - current_timestamp: u64, - - /// A single buffer containing all accumulated messages - messages: Vec<u8>, - - /// Indexes - indexes: Vec<Index>, - - /// Number of messages in the accumulator - messages_count: u32, -} - -impl MessagesAccumulator { - /// Coalesces a batch of messages into the accumulator - /// This method also prepares the messages for persistence by setting their - /// offset, timestamp, record size, and checksum fields - /// Returns the number of messages in the batch - pub fn coalesce_batch( - &mut self, - current_offset: u64, - current_position: u32, - mut batch: IggyMessagesMut, - ) -> u32 { - // TODO(hubcio): add capacity - let batch_messages_count = batch.count(); - - if batch_messages_count == 0 { - return 0; - } - - if self.messages.is_empty() { - self.base_offset = current_offset; - self.base_timestamp = IggyTimestamp::now().as_micros(); - self.current_offset = current_offset; - self.current_timestamp = self.base_timestamp; - } - - batch.prepare_for_persistence(self.base_offset, self.current_timestamp); - - let batch_size = batch.size(); - let batch = batch.make_immutable(); - let mut current_position = current_position; - for message in batch.iter() { - let msg_size = message.as_ref().unwrap().size() as u32; - - let offset = message.as_ref().unwrap().msg_header().unwrap().offset(); - let relative_offset = (offset - self.base_offset) as u32; - let position = current_position; - let timestamp = message.as_ref().unwrap().msg_header().unwrap().timestamp(); - - self.indexes.push(Index { - offset: relative_offset, - position, - timestamp, - }); - - current_position += msg_size; - } - let mut messages: Vec<u8> = batch.into_inner().into(); - - self.messages.push(batch); - - self.messages_count += batch_messages_count; - self.current_size += IggyByteSize::from(batch_size as u64); - self.current_offset = self.base_offset + self.messages_count as u64 - 1; - self.current_timestamp = IggyTimestamp::now().as_micros(); - - batch_messages_count - } - - pub fn get_messages_by_offset(&self, start_offset: u64, end_offset: u64) -> IggyMessages { - let mut messages = IggyMessages::default(); - - for message in self.messages.iter() { - if message.msg_header().unwrap().offset() >= start_offset - && message.msg_header().unwrap().offset() <= end_offset - { - messages.push(message.clone()); - } - } - messages - } - - /// Checks if the accumulator is empty - pub fn is_empty(&self) -> bool { - self.messages.is_empty() || self.messages_count == 0 - } - - /// Returns the number of unsaved messages - pub fn unsaved_messages_count(&self) -> usize { - self.messages_count as usize - } - - /// Returns the maximum offset in the accumulator - pub fn max_offset(&self) -> u64 { - self.current_offset - } - - /// Returns the maximum timestamp in the accumulator - pub fn batch_max_timestamp(&self) -> u64 { - self.current_timestamp - } - - /// Returns the base offset of the accumulator - pub fn base_offset(&self) -> u64 { - self.base_offset - } - - /// Returns the base timestamp of the accumulator - pub fn batch_base_timestamp(&self) -> u64 { - self.base_timestamp - } - - /// Takes ownership of the accumulator and returns the messages - pub fn materialize(self) -> (Vec<IggyMessages>, Vec<Index>) { - (self.messages, self.indexes) - } -} diff --git a/server/src/streaming/segments/messages_accumulator copy.rs b/server/src/streaming/segments/messages_accumulator copy.rs deleted file mode 100644 index 5a5f9ffc..00000000 --- a/server/src/streaming/segments/messages_accumulator copy.rs +++ /dev/null @@ -1,106 +0,0 @@ -use bytes::BytesMut; -use iggy::models::{IggyMessages, IggyMessagesMut}; -use iggy::utils::byte_size::IggyByteSize; -use iggy::utils::timestamp::IggyTimestamp; -use std::mem; - -/// A container that accumulates messages before they are written to disk -#[derive(Debug, Default)] -pub struct MessagesAccumulator { - /// Base offset of the first message - base_offset: u64, - - /// Base timestamp of the first message - base_timestamp: u64, - - /// Current size of all accumulated messages - current_size: IggyByteSize, - - /// Current maximum offset - current_offset: u64, - - /// Current maximum timestamp - current_timestamp: u64, - - /// A single IggyMessagesMut instance containing all messages - messages: Option<IggyMessagesMut>, - - /// Number of messages in the accumulator - messages_count: u32, -} - -impl MessagesAccumulator { - /// Coalesces a batch of messages into the accumulator - /// This method also prepares the messages for persistence by setting their - /// offset, timestamp, record size, and checksum fields - /// Returns the number of messages in the batch - pub fn coalesce_batch(&mut self, current_offset: u64, mut batch: IggyMessagesMut) -> u32 { - // TODO(hubcio): add capacity - let batch_messages_count = batch.count(); - - if batch_messages_count == 0 { - return 0; - } - - if self.messages.is_none() { - self.base_offset = current_offset; - self.base_timestamp = IggyTimestamp::now().as_micros(); - self.current_offset = current_offset; - self.current_timestamp = self.base_timestamp; - } - - batch.prepare_for_persistence(self.base_offset, Some(self.current_timestamp)); - - let batch_size = batch.size(); - - // Add the batch to our messages - if let Some(messages) = &mut self.messages { - messages.extend(batch); - } else { - self.messages = Some(batch); - // self.messages.as_mut().unwrap().set_capacity(1_200_000); // TODO(hubcio): Add new config and multiply this by k factor - } - - self.messages_count += batch_messages_count; - self.current_size += IggyByteSize::from(batch_size as u64); - self.current_offset = self.base_offset + self.messages_count as u64 - 1; - self.current_timestamp = IggyTimestamp::now().as_micros(); - - batch_messages_count - } - - /// Checks if the accumulator is empty - pub fn is_empty(&self) -> bool { - self.messages.is_none() || self.messages_count == 0 - } - - /// Returns the number of unsaved messages - pub fn unsaved_messages_count(&self) -> usize { - self.messages_count as usize - } - - /// Returns the maximum offset in the accumulator - pub fn max_offset(&self) -> u64 { - self.current_offset - } - - /// Returns the maximum timestamp in the accumulator - pub fn batch_max_timestamp(&self) -> u64 { - self.current_timestamp - } - - /// Returns the base offset of the accumulator - pub fn base_offset(&self) -> u64 { - self.base_offset - } - - /// Returns the base timestamp of the accumulator - pub fn batch_base_timestamp(&self) -> u64 { - self.base_timestamp - } - - /// Takes ownership of the accumulator and returns the messages - pub fn materialize(self) -> IggyMessages { - self.messages.unwrap().into() - } -} diff --git a/server/src/streaming/segments/messages_accumulator.rs b/server/src/streaming/segments/messages_accumulator.rs index 4d9ea197..45bb2f35 100644 --- a/server/src/streaming/segments/messages_accumulator.rs +++ b/server/src/streaming/segments/messages_accumulator.rs @@ -1,9 +1,8 @@ use super::{IggyMessagesMut, Index}; -use bytes::{Bytes, BytesMut}; -use iggy::prelude::{IggyMessageView, IggyMessages}; +use crate::streaming::segments::types::IggyMessagesSlice; +use iggy::prelude::IggyMessages; use iggy::utils::byte_size::IggyByteSize; use iggy::utils::timestamp::IggyTimestamp; -use std::mem; /// A container that accumulates messages before they are written to disk #[derive(Debug, Default)] @@ -23,8 +22,8 @@ pub struct MessagesAccumulator { /// Current maximum timestamp current_timestamp: u64, - /// A single buffer containing all accumulated messages - messages: Vec<u8>, + /// A multiple buffers containing all accumulated messages + messages: Vec<IggyMessages>, /// Indexes indexes: Vec<Index>, @@ -58,18 +57,19 @@ impl MessagesAccumulator { self.current_timestamp = self.base_timestamp; } + // TODO(hubcio): pass indexes vec and modify batch.prepare_for_persistence(self.base_offset, self.current_timestamp); let batch_size = batch.size(); let batch = batch.make_immutable(); let mut current_position = current_position; for message in batch.iter() { - let msg_size = message.as_ref().unwrap().size() as u32; + let msg_size = message.size() as u32; - let offset = message.as_ref().unwrap().msg_header().unwrap().offset(); + let offset = message.msg_header().offset(); let relative_offset = (offset - self.base_offset) as u32; let position = current_position; - let timestamp = message.as_ref().unwrap().msg_header().unwrap().timestamp(); + let timestamp = message.msg_header().timestamp(); self.indexes.push(Index { offset: relative_offset, @@ -79,9 +79,7 @@ impl MessagesAccumulator { current_position += msg_size; } - let messages: Vec<u8> = batch.into_inner().into(); - - self.messages.extend(messages); + self.messages.push(batch); self.messages_count += batch_messages_count; self.current_size += IggyByteSize::from(batch_size as u64); @@ -90,35 +88,56 @@ impl MessagesAccumulator { batch_messages_count } + pub fn get_messages_by_offset(&self, start_offset: u64, end_offset: u64) -> IggyMessagesSlice { + let start_idx = (start_offset - self.base_offset) as usize; + let end_idx = (end_offset - self.base_offset) as usize; // inclusive + if self.indexes.is_empty() || start_idx >= self.indexes.len() { + return IggyMessagesSlice::empty(); + } + let start_byte = self.indexes[start_idx].position as usize; + let end_byte = if end_idx + 1 < self.indexes.len() { + self.indexes[end_idx + 1].position as usize + } else { + self.current_size.as_bytes_usize() + }; - pub fn get_messages_by_offset(&self, start_offset: u64, end_offset: u64) -> IggyMessages { - let relative_start = start_offset - self.base_offset; - let relative_end = end_offset - self.base_offset; - let mut pos = 0; - let mut msg_index = 0; - let mut start_byte = None; - let mut end_byte = None; - while pos < self.messages.len() { - let view = IggyMessageView::new(&self.messages[pos..]).unwrap(); - if msg_index == relative_start { - start_byte = Some(pos); + let mut cumulative = Vec::with_capacity(self.messages.len()); + let mut sum = 0usize; + for msg in &self.messages { + sum += msg.buffer().len(); + cumulative.push(sum); + } + + let mut buffers = Vec::new(); + for (i, &cum) in cumulative.iter().enumerate() { + let batch_start = if i == 0 { 0 } else { cumulative[i - 1] }; + let batch_end = cum; + if batch_end <= start_byte { + continue; } - pos += view.size(); - msg_index += 1; - if msg_index == relative_end + 1 { - end_byte = Some(pos); + if batch_start >= end_byte { break; } + let slice_start = start_byte.saturating_sub(batch_start); + let slice_end = if end_byte < batch_end { + end_byte - batch_start + } else { + batch_end - batch_start + }; + buffers.push( + self.messages[i] + .shallow_copy() + .slice(slice_start..slice_end), + ); } - let start_byte = start_byte.unwrap_or(0); - let end_byte = end_byte.unwrap_or(self.messages.len()); - let slice = &self.messages[start_byte..end_byte]; - let count = if relative_end >= relative_start { - relative_end - relative_start + 1 + + let count = if end_idx >= start_idx { + (end_idx - start_idx + 1) as u32 } else { 0 }; - IggyMessages::new(Bytes::copy_from_slice(slice), count as u32) + + IggyMessagesSlice::from_buffers(buffers, count) } /// Checks if the accumulator is empty @@ -151,10 +170,8 @@ impl MessagesAccumulator { self.base_timestamp } - /// Takes ownership of the accumulator and returns the messages - pub fn materialize(self) -> (IggyMessages, Vec<Index>) { - let messages = IggyMessages::new(Bytes::from(self.messages), self.messages_count); - let indexes = self.indexes; - (messages, indexes) + /// Takes ownership of the accumulator and returns the messages and indexes + pub fn materialize(self) -> (Vec<IggyMessages>, Vec<Index>) { + (self.messages, self.indexes) } } diff --git a/server/src/streaming/segments/mod.rs b/server/src/streaming/segments/mod.rs index ffbde13e..524d9507 100644 --- a/server/src/streaming/segments/mod.rs +++ b/server/src/streaming/segments/mod.rs @@ -11,6 +11,7 @@ pub use segment::Segment; pub use types::IggyMessageHeaderViewMut; pub use types::IggyMessageViewMut; pub use types::IggyMessagesMut; +pub use types::IggyMessagesSlice; pub const LOG_EXTENSION: &str = "log"; pub const INDEX_EXTENSION: &str = "index"; diff --git a/server/src/streaming/segments/reading_messages.rs b/server/src/streaming/segments/reading_messages.rs index 379200e6..e680fec4 100644 --- a/server/src/streaming/segments/reading_messages.rs +++ b/server/src/streaming/segments/reading_messages.rs @@ -1,6 +1,7 @@ use super::Index; use crate::streaming::segments::segment::Segment; -use bytes::BytesMut; +use crate::streaming::segments::types::IggyMessagesSlice; +use bytes::{Bytes, BytesMut}; use error_set::ErrContext; use iggy::prelude::*; use std::sync::Arc; @@ -20,43 +21,32 @@ impl Segment { pub async fn get_messages_by_timestamp( &self, start_timestamp: u64, - count: usize, - ) -> Result<Vec<Arc<()>>, IggyError> { - todo!(); - // if count == 0 { - // return Ok(Vec::new()); - // } - - // let mut messages = Vec::with_capacity(count); - // let mut remaining = count; - - // let disk_messages = self - // .load_messages_from_disk_by_timestamp(start_timestamp, remaining) - // .await?; - // let disk_count = disk_messages.len(); - // messages.extend(disk_messages); - // remaining -= disk_count; - - // if remaining > 0 { - // if let Some(messages_accumulator) = &self.unsaved_messages { - // let buffer_messages = - // messages_accumulator.get_messages_by_timestamp(start_timestamp, remaining); - // messages.extend(buffer_messages); - // } - // } - - // // Ensure we return exactly requested count (truncate if buffer had more) - // messages.truncate(count); - // Ok(messages) + count: u32, + ) -> Result<IggyMessagesSlice, IggyError> { + if count == 0 { + return Ok(IggyMessagesSlice::empty()); + } + + let index_opt = self.load_index_for_timestamp(start_timestamp).await?; + + let Some(index) = index_opt else { + trace!("No messages found for timestamp: {}", start_timestamp); + return Ok(IggyMessagesSlice::empty()); + }; + + let offset = self.start_offset + index.offset as u64; + trace!("Found offset {} for timestamp {}", offset, start_timestamp); + + self.get_messages_by_offset(offset, count).await } pub async fn get_messages_by_offset( &self, mut offset: u64, count: u32, - ) -> Result<IggyMessages, IggyError> { + ) -> Result<IggyMessagesSlice, IggyError> { if count == 0 { - return Ok(IggyMessages::default()); + return Ok(IggyMessagesSlice::empty()); } if offset < self.start_offset { @@ -74,12 +64,12 @@ impl Segment { // In case that the partition messages buffer is disabled, we need to check the unsaved messages buffer if self.unsaved_messages.is_none() { - return self.load_messages_from_disk(offset, count).await; + return Ok(self.load_messages_from_disk(offset, count).await?); } let messages_accumulator = self.unsaved_messages.as_ref().unwrap(); if messages_accumulator.is_empty() { - return self.load_messages_from_disk(offset, count).await; + return Ok(self.load_messages_from_disk(offset, count).await?); } let accumulator_first_msg_offset = messages_accumulator.base_offset(); @@ -92,11 +82,12 @@ impl Segment { // Case 2: All messages are on disk if end_offset < accumulator_first_msg_offset { - return self.load_messages_from_disk(offset, count).await; + return Ok(self.load_messages_from_disk(offset, count).await?); } // Case 3: Messages span disk and messages_require_to_save buffer boundary - let mut buffer = BytesMut::new(); + + let mut slices = Vec::new(); // Load messages from disk up to the messages_require_to_save buffer boundary if offset < accumulator_first_msg_offset { @@ -106,37 +97,22 @@ impl Segment { "{COMPONENT} (error: {error}) - failed to load messages from disk, stream ID: {}, topic ID: {}, partition ID: {}, start offset: {offset}, end offset :{}", self.stream_id, self.topic_id, self.partition_id, accumulator_first_msg_offset - 1 ))?; - buffer.extend_from_slice(disk_messages.buffer()); + slices.push(disk_messages); } // Load remaining messages from messages_require_to_save buffer let buffer_start = std::cmp::max(offset, accumulator_first_msg_offset); let buffer_messages = self.load_messages_from_unsaved_buffer(buffer_start, end_offset); - buffer.extend_from_slice(buffer_messages.buffer()); - - let total_count = if buffer.is_empty() { - 0 - } else { - IggyMessageViewIterator::new(&buffer).count() as u32 - }; - - Ok(IggyMessages::new(buffer.freeze(), total_count)) - } + slices.push(buffer_messages); - pub async fn get_all_messages(&self) -> Result<IggyMessages, IggyError> { - //TODO: Fix me - /* - self.get_messages_by_offset(self.start_offset, self.get_messages_count() as u32) - .await - */ - todo!() + Ok(IggyMessagesSlice::combine(slices)) } fn load_messages_from_unsaved_buffer( &self, start_offset: u64, end_offset: u64, - ) -> IggyMessages { + ) -> IggyMessagesSlice { let messages_accumulator = self.unsaved_messages.as_ref().unwrap(); messages_accumulator.get_messages_by_offset(start_offset, end_offset) } @@ -145,127 +121,97 @@ impl Segment { &self, timestamp: u64, ) -> Result<Option<Index>, IggyError> { - todo!() - - // trace!("Loading index for timestamp: {}", timestamp); - // let index = self - // .index_reader - // .as_ref() - // .unwrap() - // .load_index_for_timestamp_impl(timestamp) - // .await - // .with_error_context(|error| { - // format!( - // "Failed to load index for timestamp: {timestamp} for {}. {error}", - // self - // ) - // })?; - - // trace!("Loaded index: {:?}", index); - // Ok(index) - } - - async fn load_messages_from_disk_by_timestamp( - &self, - start_timestamp: u64, - count: usize, - ) -> Result<Vec<Arc<()>>, IggyError> { - //TODO Fix me - /* - let index = self.load_index_for_timestamp(start_timestamp).await?; - let Some(index) = index else { - return Ok(Vec::new()); - }; - - let index_range = IndexRange { - start: index, - end: Index { - offset: u32::MAX, - position: u32::MAX, - timestamp: u64::MAX, - }, - }; - let batches = self.load_batches_by_range(&index_range).await?; - - let mut messages = Vec::with_capacity(count); - for batch in batches { - for msg in batch.into_messages_iter() { - if msg.timestamp >= start_timestamp { - messages.push(Arc::new(msg)); - if messages.len() >= count { - break; - } - } - } - if messages.len() >= count { - break; - } + if timestamp < self.start_timestamp { + trace!( + "Timestamp {} is earlier than segment start timestamp {}", + timestamp, + self.start_timestamp + ); + return Ok(Some(Index::default())); } - Ok(messages) - */ - todo!() - } + if timestamp > self.end_timestamp { + trace!( + "Timestamp {} is later than segment end timestamp {}", + timestamp, + self.end_timestamp + ); + return Ok(None); + } - /// Loads and verifies message checksums from the log file. - pub async fn load_message_checksums(&self) -> Result<(), IggyError> { - //TODO: Fix me - /* - self.log_reader + trace!("Loading index for timestamp: {}", timestamp); + let index = self + .index_reader .as_ref() .unwrap() - .load_batches_by_range_with_callback(&IndexRange::max_range(), |batch| { - for message in batch.into_messages_iter() { - let calculated_checksum = checksum::calculate(&message.payload); - trace!( - "Loaded message for offset: {}, checksum: {}, expected: {}", - message.offset, - calculated_checksum, - message.checksum - ); - if calculated_checksum != message.checksum { - return Err(IggyError::InvalidMessageChecksum( - calculated_checksum, - message.checksum, - message.offset, - )); - } - } - Ok(()) - }) + .load_index_for_timestamp_impl(timestamp) .await .with_error_context(|error| { - format!("Failed to load batches by max range for {}. {error}", self) + format!( + "Failed to load index for timestamp: {timestamp} for {}. {error}", + self + ) })?; - Ok(()) - */ - todo!() - } - /// Loads and returns all message IDs from the log file. - pub async fn load_message_ids(&self) -> Result<Vec<u128>, IggyError> { - todo!() + trace!("Loaded index: {:?}", index); + Ok(index) + } - // trace!("Loading message IDs from log file: {}", self.log_path); - // let ids = self - // .log_reader + /// Loads and verifies message checksums from the log file. + pub async fn load_message_checksums(&self) -> Result<(), IggyError> { + // self.log_reader // .as_ref() // .unwrap() - // .load_message_ids_impl() + // .load_batches_by_range_with_callback(&IndexRange::max_range(), |batch| { + // for message in batch.into_messages_iter() { + // let calculated_checksum = checksum::calculate(&message.payload); + // trace!( + // "Loaded message for offset: {}, checksum: {}, expected: {}", + // message.offset, + // calculated_checksum, + // message.checksum + // ); + // if calculated_checksum != message.checksum { + // return Err(IggyError::InvalidMessageChecksum( + // calculated_checksum, + // message.checksum, + // message.offset, + // )); + // } + // } + // Ok(()) + // }) // .await // .with_error_context(|error| { - // format!("Failed to load message IDs, error: {error} for {self}") + // format!("Failed to load batches by max range for {}. {error}", self) // })?; - // trace!("Loaded {} message IDs from log file.", ids.len()); - // Ok(ids) + // Ok(()) + + todo!() + } + + /// Loads and returns all message IDs from the log file. + pub async fn load_message_ids(&self) -> Result<Vec<u128>, IggyError> { + trace!("Loading message IDs from log file: {}", self.log_path); + let ids = self + .messages_reader + .as_ref() + .unwrap() + .load_message_ids_impl() + .await + .with_error_context(|error| { + format!("Failed to load message IDs, error: {error} for {self}") + })?; + trace!("Loaded {} message IDs from log file.", ids.len()); + Ok(ids) } async fn load_messages_from_disk( &self, start_offset: u64, count: u32, - ) -> Result<IggyMessages, IggyError> { - trace!( + ) -> Result<IggyMessagesSlice, IggyError> { + tracing::trace!( "Loading {count} messages from disk, start_offset: {start_offset}, current_offset: {}...", self.current_offset ); @@ -278,8 +224,13 @@ impl Segment { .as_ref() .unwrap() .load_nth_index(relative_start_offset) - .await? - .unwrap(); + .await?; + + if first_index.is_none() { + return Ok(IggyMessagesSlice::empty()); + } + + let first_index = first_index.unwrap(); let last_index = self .index_reader @@ -301,6 +252,14 @@ impl Segment { .with_error_context(|error| { format!("Failed to load messages from segment file: {self}. {error}") })?; - Ok(messages) + let end = messages.size(); + let count = messages.count(); + + Ok(IggyMessagesSlice::new( + messages.into_inner(), + 0, + end as usize, + count, + )) } } diff --git a/server/src/streaming/segments/types/message_header_view_mut.rs b/server/src/streaming/segments/types/message_header_view_mut.rs index 5525a904..d4cfc067 100644 --- a/server/src/streaming/segments/types/message_header_view_mut.rs +++ b/server/src/streaming/segments/types/message_header_view_mut.rs @@ -1,4 +1,3 @@ -use iggy::error::IggyError; use iggy::prelude::*; /// A typed, in-place view of a raw header in a buffer @@ -10,8 +9,8 @@ pub struct IggyMessageHeaderViewMut<'a> { impl<'a> IggyMessageHeaderViewMut<'a> { /// Construct a mutable view over the header slice. - pub fn new(data: &'a mut [u8]) -> Result<Self, IggyError> { - Ok(Self { data }) + pub fn new(data: &'a mut [u8]) -> Self { + Self { data } } pub fn checksum(&self) -> u64 { diff --git a/server/src/streaming/segments/types/message_view_mut.rs b/server/src/streaming/segments/types/message_view_mut.rs index 70ea2c2f..e83ae16d 100644 --- a/server/src/streaming/segments/types/message_view_mut.rs +++ b/server/src/streaming/segments/types/message_view_mut.rs @@ -9,10 +9,9 @@ use std::ops::Range; pub struct IggyMessageViewMut<'a> { /// The buffer containing the message buffer: &'a mut [u8], + /// Payload offset payload_offset: usize, - /// User headers offset - headers_offset: usize, } impl<'a> IggyMessageViewMut<'a> { @@ -20,7 +19,7 @@ impl<'a> IggyMessageViewMut<'a> { pub fn new(buffer: &'a mut [u8]) -> Result<Self, IggyError> { let (payload_len, headers_len) = { let hdr_slice = &buffer[0..IGGY_MESSAGE_HEADER_SIZE as usize]; - let hdr_view = IggyMessageHeaderView::new(hdr_slice)?; + let hdr_view = IggyMessageHeaderView::new(hdr_slice); (hdr_view.payload_length(), hdr_view.headers_length()) }; let total_size = IGGY_MESSAGE_HEADER_SIZE + payload_len + headers_len; @@ -31,18 +30,17 @@ impl<'a> IggyMessageViewMut<'a> { Ok(Self { buffer, payload_offset: IGGY_MESSAGE_HEADER_SIZE as usize, - headers_offset: IGGY_MESSAGE_HEADER_SIZE as usize + payload_len as usize, }) } /// Get an immutable header view - pub fn msg_header(&self) -> Result<IggyMessageHeaderView<'_>, IggyError> { + pub fn msg_header(&self) -> IggyMessageHeaderView<'_> { let hdr_slice = &self.buffer[0..IGGY_MESSAGE_HEADER_SIZE as usize]; IggyMessageHeaderView::new(hdr_slice) } /// Get an ephemeral mutable header view for reading/writing - pub fn msg_header_mut(&mut self) -> Result<IggyMessageHeaderViewMut<'_>, IggyError> { + pub fn msg_header_mut(&mut self) -> IggyMessageHeaderViewMut<'_> { let hdr_slice = &mut self.buffer[0..IGGY_MESSAGE_HEADER_SIZE as usize]; IggyMessageHeaderViewMut::new(hdr_slice) } @@ -50,7 +48,7 @@ impl<'a> IggyMessageViewMut<'a> { /// Returns the size of the entire message (header + payload + user headers). pub fn size(&self) -> usize { // TODO(hubcio): remove unwraps() - let hdr_view = self.msg_header().expect("header must be valid"); + let hdr_view = self.msg_header(); (IGGY_MESSAGE_HEADER_SIZE + hdr_view.payload_length() + hdr_view.headers_length()) as usize } @@ -62,14 +60,14 @@ impl<'a> IggyMessageViewMut<'a> { /// Convenience to update the checksum field in the header pub fn update_checksum(&mut self) { - let start = 8; // Skip checksum field for checksum calculation - let size = self.size() - 8; - let data = &self.buffer[start..start + size]; + // let start = 8; // Skip checksum field for checksum calculation + // let size = self.size() - 8; + // let data = &self.buffer[start..start + size]; - let checksum = gxhash64(data, 0); + // let checksum = gxhash64(data, 0); - let mut hdr_view = self.msg_header_mut().expect("header must be valid"); - hdr_view.set_checksum(checksum); + // let mut hdr_view = self.msg_header_mut(); + // hdr_view.set_checksum(checksum); } } diff --git a/server/src/streaming/segments/types/messages_mut.rs b/server/src/streaming/segments/types/messages_mut.rs index a86d3c24..7fbbca22 100644 --- a/server/src/streaming/segments/types/messages_mut.rs +++ b/server/src/streaming/segments/types/messages_mut.rs @@ -10,6 +10,7 @@ pub struct IggyMessagesMut { /// The number of messages in the buffer #[serde(skip)] count: u32, + /// The buffer containing the messages buffer: BytesMut, } @@ -48,7 +49,9 @@ impl IggyMessagesMut { for message in messages { buffer.extend_from_slice(&message.to_bytes()); } - Self::new(buffer) + let mut messages_mut = Self::new(buffer); + messages_mut.count = messages.len() as u32; + messages_mut } /// Extends the messages container with another set of mutable messages by consuming them @@ -96,15 +99,7 @@ impl IggyMessagesMut { /// Prepares all messages in the batch for persistence by setting their offsets, timestamps, /// and other necessary fields. This method should be called before the messages are written to disk. - /// - /// # Arguments - /// - /// * `base_offset` - The offset to start assigning from - /// * `timestamp` - The timestamp to use (if None, current timestamp will be used) - /// - /// # Returns - /// - /// The number of messages processed + /// Returns the number of messages processed. pub fn prepare_for_persistence(&mut self, base_offset: u64, timestamp: u64) -> u32 { let mut processed_count = 0; let mut current_offset = base_offset; @@ -112,11 +107,10 @@ impl IggyMessagesMut { let mut iterator = self.iter_mut(); while let Some(message_result) = LendingIterator::next(&mut iterator) { if let Ok(mut message) = message_result { - // TODO(hubcio): remove unwraps - message.msg_header_mut().unwrap().set_offset(current_offset); + message.msg_header_mut().set_offset(current_offset); - if message.msg_header().unwrap().timestamp() == 0 { - message.msg_header_mut().unwrap().set_timestamp(timestamp); + if message.msg_header().timestamp() == 0 { + message.msg_header_mut().set_timestamp(timestamp); } message.update_checksum(); @@ -133,3 +127,13 @@ impl IggyMessagesMut { IggyMessages::new(self.buffer.freeze(), self.count) } } + +impl From<&[IggyMessage]> for IggyMessagesMut { + fn from(messages: &[IggyMessage]) -> Self { + let messages_size: u32 = messages + .iter() + .map(|m| m.get_size_bytes().as_bytes_u64() as u32) + .sum(); + Self::from_messages(messages, messages_size) + } +} diff --git a/server/src/streaming/segments/types/messages_slice.rs b/server/src/streaming/segments/types/messages_slice.rs new file mode 100644 index 00000000..740f065b --- /dev/null +++ b/server/src/streaming/segments/types/messages_slice.rs @@ -0,0 +1,70 @@ +use bytes::{Bytes, BytesMut}; +use iggy::prelude::*; + +#[derive(Debug, Clone)] +pub struct IggyMessagesSlice { + buffers: Vec<Bytes>, + count: u32, +} + +impl IggyMessagesSlice { + pub fn new(buffer: Bytes, start: usize, end: usize, count: u32) -> Self { + Self { + buffers: vec![buffer.slice(start..end)], + count, + } + } + + pub fn from_buffers(buffers: Vec<Bytes>, count: u32) -> Self { + Self { buffers, count } + } + + pub fn combine(slices: Vec<IggyMessagesSlice>) -> Self { + let mut buffers = Vec::new(); + let mut total_count = 0; + for slice in slices { + buffers.extend(slice.buffers); + total_count += slice.count; + } + Self { + buffers, + count: total_count, + } + } + + pub fn chunks(&self) -> impl Iterator<Item = &[u8]> + '_ { + self.buffers.iter().map(|b| b.as_ref()) + } + + pub fn chunks_count(&self) -> usize { + self.buffers.len() + } + + pub fn to_messages(&self) -> IggyMessages { + let total_size: usize = self.buffers.iter().map(|b| b.len()).sum(); + let mut combined = BytesMut::with_capacity(total_size); + for chunk in &self.buffers { + combined.extend_from_slice(chunk); + } + IggyMessages::new(combined.freeze(), self.count) + } + + pub fn count(&self) -> u32 { + self.count + } + + pub fn size(&self) -> u32 { + self.buffers.iter().map(|b| b.len() as u32).sum() + } + + pub fn is_empty(&self) -> bool { + self.count == 0 || self.buffers.is_empty() + } + + pub fn empty() -> Self { + Self { + buffers: Vec::new(), + count: 0, + } + } +} diff --git a/server/src/streaming/segments/types/messages_slice_better.rs b/server/src/streaming/segments/types/messages_slice_better.rs new file mode 100644 index 00000000..8b137891 --- /dev/null +++ b/server/src/streaming/segments/types/messages_slice_better.rs @@ -0,0 +1 @@ + diff --git a/server/src/streaming/segments/types/mod.rs b/server/src/streaming/segments/types/mod.rs index ecb07a11..016bc7b2 100644 --- a/server/src/streaming/segments/types/mod.rs +++ b/server/src/streaming/segments/types/mod.rs @@ -1,7 +1,10 @@ mod message_header_view_mut; mod message_view_mut; mod messages_mut; +mod messages_slice; +mod messages_slice_better; pub use message_header_view_mut::IggyMessageHeaderViewMut; pub use message_view_mut::IggyMessageViewMut; pub use messages_mut::IggyMessagesMut; +pub use messages_slice::IggyMessagesSlice; diff --git a/server/src/streaming/segments/writing_messages.rs b/server/src/streaming/segments/writing_messages.rs index bc81192f..f9c3d303 100644 --- a/server/src/streaming/segments/writing_messages.rs +++ b/server/src/streaming/segments/writing_messages.rs @@ -106,9 +106,7 @@ impl Segment { .unwrap() .save_batches(messages, confirmation) .await - .with_error_context( - |error| format!("Failed to save batch for seg: {self}. {error}",), - )?; + .with_error_context(|error| format!("Failed to save batch for {self}. {error}",))?; self.last_index_position += saved_bytes.as_bytes_u64() as u32; diff --git a/server/src/streaming/systems/messages.rs b/server/src/streaming/systems/messages.rs index 77c50111..0eff2f1b 100644 --- a/server/src/streaming/systems/messages.rs +++ b/server/src/streaming/systems/messages.rs @@ -1,4 +1,4 @@ -use crate::streaming::segments::IggyMessagesMut; +use crate::streaming::segments::{IggyMessagesMut, IggyMessagesSlice}; use crate::streaming::session::Session; use crate::streaming::systems::system::System; use crate::streaming::systems::COMPONENT; @@ -19,7 +19,7 @@ impl System { topic_id: &Identifier, partition_id: Option<u32>, args: PollingArgs, - ) -> Result<IggyMessages, IggyError> { + ) -> Result<IggyMessagesSlice, IggyError> { self.ensure_authenticated(session)?; if args.count == 0 { return Err(IggyError::InvalidMessagesCount); @@ -60,57 +60,46 @@ impl System { .get_messages(polling_consumer, partition_id, args.strategy, args.count) .await?; - // TODO: Fix me - /* - if polled_messages.messages.is_empty() { - return Ok(polled_messages); - } - */ + return Ok(result); - // TODO: Fix me - /* - let offset = polled_messages.messages.last().unwrap().offset; - if args.auto_commit { - trace!("Last offset: {} will be automatically stored for {}, stream: {}, topic: {}, partition: {}", offset, consumer, stream_id, topic_id, partition_id); - topic - .store_consumer_offset_internal(polling_consumer, offset, partition_id) - .await - .with_error_context(|error| format!("{COMPONENT} (error: {error}) - failed to store consumer offset internal, polling consumer: {}, offset: {}, partition ID: {}", polling_consumer, offset, partition_id)) ?; - } - */ + // let offset = polled_messages.messages.last().unwrap().offset; + // if args.auto_commit { + // trace!("Last offset: {} will be automatically stored for {}, stream: {}, topic: {}, partition: {}", offset, consumer, stream_id, topic_id, partition_id); + // topic + // .store_consumer_offset_internal(polling_consumer, offset, partition_id) + // .await + // .with_error_context(|error| format!("{COMPONENT} (error: {error}) - failed to store consumer offset internal, polling consumer: {}, offset: {}, partition ID: {}", polling_consumer, offset, partition_id)) ?; + // } - if self.encryptor.is_none() { - return Ok(result); - } + // if self.encryptor.is_none() { + // return Ok(result); + // } // TODO: Fix me - /* - let encryptor = self.encryptor.as_ref().unwrap(); - let mut decrypted_messages = Vec::with_capacity(polled_messages.messages.len()); - for message in polled_messages.messages.iter() { - let payload = encryptor.decrypt(&message.payload); - match payload { - Ok(payload) => { - decrypted_messages.push(PolledMessage { - id: message.id, - state: message.state, - offset: message.offset, - timestamp: message.timestamp, - checksum: message.checksum, - length: IggyByteSize::from(payload.len() as u64), - payload: Bytes::from(payload), - headers: message.headers.clone(), - }); - } - Err(error) => { - error!("Cannot decrypt the message. Error: {}", error); - return Err(IggyError::CannotDecryptData); - } - } - } - polled_messages.messages = decrypted_messages; - Ok(polled_messages) - */ - todo!() + // let encryptor = self.encryptor.as_ref().unwrap(); + // let mut decrypted_messages = Vec::with_capacity(polled_messages.messages.len()); + // for message in polled_messages.messages.iter() { + // let payload = encryptor.decrypt(&message.payload); + // match payload { + // Ok(payload) => { + // decrypted_messages.push(PolledMessage { + // id: message.id, + // state: message.state, + // offset: message.offset, + // timestamp: message.timestamp, + // checksum: message.checksum, + // length: IggyByteSize::from(payload.len() as u64), + // payload: Bytes::from(payload), + // headers: message.headers.clone(), + // }); + // } + // Err(error) => { + // error!("Cannot decrypt the message. Error: {}", error); + // return Err(IggyError::CannotDecryptData); + // } + // } + // } + // polled_messages.messages = decrypted_messages; + // Ok(polled_messages) } pub async fn append_messages( diff --git a/server/src/streaming/systems/stats.rs b/server/src/streaming/systems/stats.rs index 5f2cb819..ba268b4c 100644 --- a/server/src/streaming/systems/stats.rs +++ b/server/src/streaming/systems/stats.rs @@ -3,9 +3,8 @@ use crate::versioning::SemanticVersion; use crate::VERSION; use iggy::error::IggyError; use iggy::locking::IggySharedMutFn; -use iggy::models::stats::{CacheMetricsKey, Stats}; +use iggy::models::stats::Stats; use iggy::utils::duration::IggyDuration; -use std::collections::HashMap; use std::sync::OnceLock; use sysinfo::{Pid, ProcessesToUpdate, System as SysinfoSystem}; use tokio::sync::Mutex; @@ -38,21 +37,6 @@ impl System { let kernel_version = sysinfo::System::kernel_version().unwrap_or("unknown_kernel_version".to_string()); - let mut cache_metrics = HashMap::new(); - for stream in self.streams.values() { - for topic in stream.topics.values() { - for partition in topic.partitions.values() { - let key = CacheMetricsKey { - stream_id: stream.stream_id, - topic_id: topic.topic_id, - partition_id: partition.read().await.partition_id, - }; - let metrics = partition.read().await.get_cache_metrics(); - cache_metrics.insert(key, metrics); - } - } - } - let mut stats = Stats { process_id, total_cpu_usage, @@ -67,7 +51,6 @@ impl System { iggy_server_semver: SemanticVersion::current() .ok() .and_then(|v| v.get_numeric_version().ok()), - cache_metrics, ..Default::default() }; diff --git a/server/src/streaming/topics/messages.rs b/server/src/streaming/topics/messages.rs index 4c377788..8da9adaf 100644 --- a/server/src/streaming/topics/messages.rs +++ b/server/src/streaming/topics/messages.rs @@ -1,5 +1,5 @@ use crate::streaming::polling_consumer::PollingConsumer; -use crate::streaming::segments::IggyMessagesMut; +use crate::streaming::segments::{IggyMessagesMut, IggyMessagesSlice}; use crate::streaming::topics::topic::Topic; use crate::streaming::topics::COMPONENT; use crate::streaming::utils::file::folder_size; @@ -31,7 +31,7 @@ impl Topic { partition_id: u32, strategy: PollingStrategy, count: u32, - ) -> Result<IggyMessages, IggyError> { + ) -> Result<IggyMessagesSlice, IggyError> { if !self.has_partitions() { return Err(IggyError::NoPartitions(self.topic_id, self.stream_id)); }
