This is an automated email from the ASF dual-hosted git repository. hgruszecki pushed a commit to branch zero-copy-no-batching in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 5a248f09d8faefd310e1d634d1cd913ae8548fca Author: Hubert Gruszecki <[email protected]> AuthorDate: Sun Mar 23 22:42:15 2025 +0100 implement http (sadly with copy) --- integration/tests/streaming/get_by_offset.rs | 9 +- integration/tests/streaming/get_by_timestamp.rs | 10 +- integration/tests/streaming/mod.rs | 5 +- sdk/src/cli/message/send_messages.rs | 15 +- sdk/src/http/messages.rs | 18 +- sdk/src/messages/polled_messages.rs | 55 ++- sdk/src/messages/send_messages.rs | 103 +--- sdk/src/models/messaging/indexes.rs | 7 + sdk/src/models/messaging/message.rs | 524 ++++++++++++++++----- sdk/src/models/messaging/message_header_view.rs | 2 +- sdk/src/models/messaging/messages_batch.rs | 72 ++- .../handlers/messages/poll_messages_handler.rs | 4 +- server/src/http/messages.rs | 86 ++-- server/src/streaming/cache/buffer.rs | 177 ------- server/src/streaming/cache/memory_tracker.rs | 101 ---- server/src/streaming/cache/mod.rs | 3 - server/src/streaming/partitions/storage.rs | 15 +- .../src/streaming/segments/indexes/indexes_mut.rs | 2 +- server/src/streaming/segments/segment.rs | 17 +- .../streaming/segments/types/messages_batch_set.rs | 57 +++ server/src/streaming/topics/messages.rs | 15 +- 21 files changed, 665 insertions(+), 632 deletions(-) diff --git a/integration/tests/streaming/get_by_offset.rs b/integration/tests/streaming/get_by_offset.rs index 7a67e6d9..94f5a854 100644 --- a/integration/tests/streaming/get_by_offset.rs +++ b/integration/tests/streaming/get_by_offset.rs @@ -141,10 +141,11 @@ async fn test_get_messages_by_offset( ); let message = IggyMessage::builder() - .with_id(id) - .with_payload(payload) - .with_user_headers_map(headers) - .build(); + .id(id) + .payload(payload) + .headers(headers) + .build() + .expect("Failed to create message with valid payload and headers"); all_messages.push(message); } diff --git a/integration/tests/streaming/get_by_timestamp.rs b/integration/tests/streaming/get_by_timestamp.rs index d89e38ef..4210826a 100644 --- 
a/integration/tests/streaming/get_by_timestamp.rs +++ b/integration/tests/streaming/get_by_timestamp.rs @@ -143,10 +143,12 @@ async fn test_get_messages_by_timestamp( ); let message = IggyMessage::builder() - .with_id(id) - .with_payload(payload) - .with_user_headers_map(headers) - .build(); + .id(id) + .payload(payload) + .headers(headers) + .build() + .expect("Failed to create message with valid payload and headers"); + all_messages.push(message); } diff --git a/integration/tests/streaming/mod.rs b/integration/tests/streaming/mod.rs index 1799e9c3..3d4d1e48 100644 --- a/integration/tests/streaming/mod.rs +++ b/integration/tests/streaming/mod.rs @@ -28,7 +28,8 @@ fn create_messages() -> Vec<IggyMessage> { fn create_message(id: u128, payload: &str) -> IggyMessage { let payload = Bytes::from(payload.to_string()); IggyMessage::builder() - .with_id(id) - .with_payload(payload) + .id(id) + .payload(payload) .build() + .expect("Failed to create message with valid payload and headers") } diff --git a/sdk/src/cli/message/send_messages.rs b/sdk/src/cli/message/send_messages.rs index dba2bd63..e6f08140 100644 --- a/sdk/src/cli/message/send_messages.rs +++ b/sdk/src/cli/message/send_messages.rs @@ -122,16 +122,18 @@ impl CliCommand for SendMessagesCmd { messages } else { + let headers = self.get_headers(); match &self.messages { Some(messages) => messages .iter() .map(|s| { IggyMessage::builder() - .with_payload(Bytes::from(s.clone())) - .with_user_headers_map(self.get_headers().clone()) + .payload(Bytes::from(s.clone())) + .headers(headers.clone().unwrap_or_default()) .build() }) - .collect::<Vec<_>>(), + .collect::<Result<Vec<_>, _>>() + .with_context(|| "Failed to create messages from provided strings")?, None => { let input = self.read_message_from_stdin()?; @@ -139,11 +141,12 @@ impl CliCommand for SendMessagesCmd { .lines() .map(|m| { IggyMessage::builder() - .with_payload(String::from(m).into()) - .with_user_headers_map(self.get_headers().clone()) + 
.payload(m.to_owned()) + .headers(headers.clone().unwrap_or_default()) .build() }) - .collect() + .collect::<Result<Vec<_>, _>>() + .with_context(|| "Failed to create messages from stdin")? } } }; diff --git a/sdk/src/http/messages.rs b/sdk/src/http/messages.rs index bb1bd2e1..bf42f604 100644 --- a/sdk/src/http/messages.rs +++ b/sdk/src/http/messages.rs @@ -5,7 +5,7 @@ use crate::http::client::HttpClient; use crate::http::HttpTransport; use crate::identifier::Identifier; use crate::messages::{Partitioning, PollingStrategy}; -use crate::prelude::{IggyMessage, PolledMessages}; +use crate::prelude::{FlushUnsavedBuffer, IggyMessage, PollMessages, PolledMessages, SendMessages}; use async_trait::async_trait; #[async_trait] @@ -20,7 +20,6 @@ impl MessageClient for HttpClient { count: u32, auto_commit: bool, ) -> Result<PolledMessages, IggyError> { - /* let response = self .get_with_query( &get_path(&stream_id.as_cow_str(), &topic_id.as_cow_str()), @@ -40,8 +39,6 @@ impl MessageClient for HttpClient { .await .map_err(|_| IggyError::InvalidJsonResponse)?; Ok(messages) - */ - todo!() } async fn send_messages( @@ -51,20 +48,12 @@ impl MessageClient for HttpClient { partitioning: &Partitioning, messages: &mut [IggyMessage], ) -> Result<(), IggyError> { - /* self.post( &get_path(&stream_id.as_cow_str(), &topic_id.as_cow_str()), - &SendMessages { - stream_id: stream_id.clone(), - topic_id: topic_id.clone(), - partitioning: partitioning.clone(), - messages: messages.to_vec(), - }, + &SendMessages::as_bytes(stream_id, topic_id, partitioning, messages), ) .await?; Ok(()) - */ - todo!() } async fn flush_unsaved_buffer( @@ -74,7 +63,6 @@ impl MessageClient for HttpClient { partition_id: u32, fsync: bool, ) -> Result<(), IggyError> { - /* let _ = self .get_with_query( &get_path_flush_unsaved_buffer( @@ -92,8 +80,6 @@ impl MessageClient for HttpClient { ) .await?; Ok(()) - */ - todo!() } } diff --git a/sdk/src/messages/polled_messages.rs b/sdk/src/messages/polled_messages.rs index 
4d7fda47..3f4e929d 100644 --- a/sdk/src/messages/polled_messages.rs +++ b/sdk/src/messages/polled_messages.rs @@ -1,9 +1,10 @@ use crate::{ error::IggyError, - prelude::{BytesSerializable, IggyMessage}, + prelude::{BytesSerializable, IggyMessage, IggyMessageHeader, IGGY_MESSAGE_HEADER_SIZE}, }; -use bytes::{Bytes, BytesMut}; +use bytes::Bytes; use serde::{Deserialize, Serialize}; +use tracing::error; /// The wrapper on top of the collection of messages that are polled from the partition. /// It consists of the following fields: @@ -58,7 +59,9 @@ impl BytesSerializable for PolledMessages { .try_into() .map_err(|_| IggyError::InvalidNumberEncoding)?, ); - let messages = IggyMessage::from_raw_bytes(bytes.slice(16..), count)?; + + let messages = messages_vec(bytes.slice(16..), count)?; + Ok(Self { partition_id, current_offset, @@ -66,12 +69,48 @@ impl BytesSerializable for PolledMessages { messages, }) } +} - fn write_to_buffer(&self, buf: &mut BytesMut) { - todo!() - } +/// Convert Bytes to messages +fn messages_vec(buffer: Bytes, count: u32) -> Result<Vec<IggyMessage>, IggyError> { + let mut messages = Vec::with_capacity(count as usize); + let mut position = 0; + let buf_len = buffer.len(); - fn get_buffer_size(&self) -> u32 { - unimplemented!(); + while position < buf_len { + if position + IGGY_MESSAGE_HEADER_SIZE as usize > buf_len { + break; + } + let header_bytes = buffer.slice(position..position + IGGY_MESSAGE_HEADER_SIZE as usize); + let header = match IggyMessageHeader::from_bytes(header_bytes) { + Ok(h) => h, + Err(e) => { + error!("Failed to deserialize message header: {}", e); + return Err(e); + } + }; + position += IGGY_MESSAGE_HEADER_SIZE as usize; + + let payload_end = position + header.payload_length as usize; + if payload_end > buf_len { + break; + } + let payload = buffer.slice(position..payload_end); + position = payload_end; + + let headers: Option<Bytes> = if header.user_headers_length > 0 { + Some(buffer.slice(position..position + 
header.user_headers_length as usize)) + } else { + None + }; + position += header.user_headers_length as usize; + + messages.push(IggyMessage { + header, + payload, + user_headers: headers, + }); } + + Ok(messages) } diff --git a/sdk/src/messages/send_messages.rs b/sdk/src/messages/send_messages.rs index 6c4b92da..8ceee3b5 100644 --- a/sdk/src/messages/send_messages.rs +++ b/sdk/src/messages/send_messages.rs @@ -133,16 +133,6 @@ impl Validatable<IggyError> for SendMessages { for message in IggyMessageViewIterator::new(&self.batch) { message_count += 1; - // TODO(hubcio): IMHO validation of headers should be purely on SDK side - // if let Some(headers) = message.headers() { - // for value in headers.values() { - // headers_size += value.value.len() as u32; - // if headers_size > MAX_HEADERS_SIZE { - // return Err(IggyError::TooBigHeadersPayload); - // } - // } - // } - let message_payload = message.payload(); payload_size += message_payload.len() as u32; if payload_size > MAX_PAYLOAD_SIZE { @@ -171,7 +161,7 @@ impl BytesSerializable for SendMessages { panic!("should not be used") } - fn from_bytes(bytes: Bytes) -> Result<SendMessages, IggyError> { + fn from_bytes(_bytes: Bytes) -> Result<SendMessages, IggyError> { panic!("should not be used") } } @@ -190,99 +180,8 @@ impl Display for SendMessages { #[cfg(test)] mod tests { - use std::str::FromStr; - use super::*; - #[test] - fn should_be_serialized_as_bytes() { - let message_1 = IggyMessage::from_str("hello 1").unwrap(); - let message_2 = IggyMessage::new(Some(2), "hello 2".into(), None); - let message_3 = IggyMessage::new(Some(3), "hello 3".into(), None); - let messages = vec![message_1, message_2, message_3]; - let command = SendMessages { - stream_id: Identifier::numeric(1).unwrap(), - topic_id: Identifier::numeric(2).unwrap(), - partitioning: Partitioning::partition_id(4), - messages, - }; - - let bytes = command.to_bytes(); - - let mut position = 0; - let stream_id = 
Identifier::from_bytes(bytes.clone()).unwrap(); - position += stream_id.get_size_bytes().as_bytes_usize(); - let topic_id = Identifier::from_bytes(bytes.slice(position..)).unwrap(); - position += topic_id.get_size_bytes().as_bytes_usize(); - let key = Partitioning::from_bytes(bytes.slice(position..)).unwrap(); - position += key.get_size_bytes().as_bytes_usize(); - let messages = bytes.slice(position..); - let command_messages = command - .messages - .iter() - .fold(BytesMut::new(), |mut bytes_mut, message| { - bytes_mut.put(message.to_bytes()); - bytes_mut - }) - .freeze(); - - assert!(!bytes.is_empty()); - assert_eq!(stream_id, command.stream_id); - assert_eq!(topic_id, command.topic_id); - assert_eq!(key, command.partitioning); - assert_eq!(messages, command_messages); - } - - #[test] - fn should_be_deserialized_from_bytes() { - let stream_id = Identifier::numeric(1).unwrap(); - let topic_id = Identifier::numeric(2).unwrap(); - let key = Partitioning::partition_id(4); - - let message_1 = IggyMessage::from_str("hello 1").unwrap(); - let message_2 = IggyMessage::new(Some(2), "hello 2".into(), None); - let message_3 = IggyMessage::new(Some(3), "hello 3".into(), None); - let messages = [ - message_1.to_bytes(), - message_2.to_bytes(), - message_3.to_bytes(), - ] - .concat(); - - let key_bytes = key.to_bytes(); - let stream_id_bytes = stream_id.to_bytes(); - let topic_id_bytes = topic_id.to_bytes(); - let current_position = stream_id_bytes.len() + topic_id_bytes.len() + key_bytes.len(); - let mut bytes = BytesMut::with_capacity(current_position); - bytes.put_slice(&stream_id_bytes); - bytes.put_slice(&topic_id_bytes); - bytes.put_slice(&key_bytes); - bytes.put_slice(&messages); - let bytes = bytes.freeze(); - let command = SendMessages::from_bytes(bytes.clone()); - assert!(command.is_ok()); - - let messages_payloads = bytes.slice(current_position..); - let mut position = 0; - let mut messages = Vec::new(); - while position < messages_payloads.len() { - let message = 
IggyMessage::from_bytes(messages_payloads.slice(position..)).unwrap(); - position += message.get_size_bytes().as_bytes_usize(); - messages.push(message); - } - - let command = command.unwrap(); - assert_eq!(command.stream_id, stream_id); - assert_eq!(command.topic_id, topic_id); - assert_eq!(command.partitioning, key); - for (index, message) in command.messages.iter().enumerate() { - let command_message = &command.messages[index]; - assert_eq!(command_message.id, message.id); - assert_eq!(command_message.length, message.length); - assert_eq!(command_message.payload, message.payload); - } - } - #[test] fn key_of_type_balanced_should_have_empty_value() { let key = Partitioning::balanced(); diff --git a/sdk/src/models/messaging/indexes.rs b/sdk/src/models/messaging/indexes.rs index 56c63055..95eda691 100644 --- a/sdk/src/models/messaging/indexes.rs +++ b/sdk/src/models/messaging/indexes.rs @@ -20,6 +20,7 @@ impl IggyIndexes { } } + /// Creates a new empty container pub fn empty() -> Self { Self { buffer: Bytes::new(), @@ -137,6 +138,7 @@ impl IggyIndexes { result } + /// Gets the base position of the container pub fn base_position(&self) -> u32 { self.base_position } @@ -147,6 +149,11 @@ impl IggyIndexes { .map(|idx| idx.position()) .unwrap_or(0) } + + /// Decompose the container into its components + pub fn decompose(self) -> (u32, Bytes) { + (self.base_position, self.buffer) + } } impl StdIndex<usize> for IggyIndexes { diff --git a/sdk/src/models/messaging/message.rs b/sdk/src/models/messaging/message.rs index ce12c14e..789a9af9 100644 --- a/sdk/src/models/messaging/message.rs +++ b/sdk/src/models/messaging/message.rs @@ -11,114 +11,262 @@ use serde::{Deserialize, Serialize}; use serde_with::base64::Base64; use serde_with::serde_as; use std::collections::HashMap; +use std::convert::TryFrom; use std::str::FromStr; -use tracing::error; - -/// The single message. It is exact format in which message is saved to / retrieved from the disk. 
+use tracing::warn; + +/// A message stored in the Iggy messaging system. +/// +/// `IggyMessage` represents a single message that can be sent to or received from +/// a stream. Each message consists of: +/// * A header with message metadata +/// * A payload (the actual content) +/// * Optional user-defined headers for additional context +/// +/// # Examples +/// +/// ``` +/// // Create a simple text message +/// let message = IggyMessage::create("Hello world!"); +/// +/// // Create a message with custom ID +/// let message = IggyMessage::with_id(42, "Custom message".into()); +/// +/// // Create a message with headers +/// let mut headers = HashMap::new(); +/// headers.insert(HeaderKey::new("content-type")?, HeaderValue::from_str("text/plain")?); +/// let message = IggyMessage::with_headers("Message with metadata".into(), headers); +/// ``` #[serde_as] -#[derive(Default, Debug, Serialize, Deserialize, PartialEq)] +#[derive(Debug, Serialize, Deserialize, PartialEq)] pub struct IggyMessage { - /// Message header + /// Message metadata pub header: IggyMessageHeader, - /// Message payload + + /// Message content #[serde_as(as = "Base64")] pub payload: Bytes, + + /// Optional user-defined headers pub user_headers: Option<Bytes>, } impl IggyMessage { - /// Create a message with payload - pub fn new(payload: Bytes) -> Self { - Self::builder().with_payload(payload).build() + /// Creates a new message with the given payload. + /// + /// This is the simplest way to create a message when you don't need + /// custom IDs or headers. 
+ /// + /// # Arguments + /// + /// * `payload` - The message content + /// + /// # Returns + /// + /// A new `IggyMessage` with the provided payload + /// + /// # Examples + /// + /// ``` + /// let message = IggyMessage::create("Hello world!"); + /// ``` + pub fn create<T: Into<Bytes>>(payload: T) -> Self { + Self::builder() + .payload(payload) + .build() + .expect("Failed to create message with valid payload") + } + + /// Creates a new message with a specific ID and payload. + /// + /// Use this when you need to control the message ID. + /// + /// # Arguments + /// + /// * `id` - Custom message ID + /// * `payload` - The message content + /// + /// # Returns + /// + /// A new `IggyMessage` with the provided ID and payload + /// + /// # Examples + /// + /// ``` + /// let message = IggyMessage::with_id(42, "My message".into()); + /// ``` + pub fn with_id<T: Into<Bytes>>(id: u128, payload: T) -> Self { + Self::builder() + .id(id) + .payload(payload) + .build() + .expect("Failed to create message with valid payload") } - /// Create a message with ID and payload - pub fn with_id(id: u128, payload: Bytes) -> Self { - Self::builder().with_id(id).with_payload(payload).build() + /// Creates a new message with payload and user-defined headers. 
+ /// + /// # Arguments + /// + /// * `payload` - The message content + /// * `headers` - Key-value headers to attach to the message + /// + /// # Returns + /// + /// A new `IggyMessage` with the provided payload and headers + /// + /// # Examples + /// + /// ``` + /// let mut headers = HashMap::new(); + /// headers.insert(HeaderKey::new("content-type")?, HeaderValue::from_str("text/plain")?); + /// + /// let message = IggyMessage::with_headers("My message".into(), headers); + /// ``` + pub fn with_headers<T: Into<Bytes>>( + payload: T, + headers: HashMap<HeaderKey, HeaderValue>, + ) -> Self { + Self::builder() + .payload(payload) + .headers(headers) + .build() + .expect("Failed to create message with valid payload and headers") } - /// Create a message with ID, payload and user headers - pub fn with_id_and_headers( + /// Creates a new message with a specific ID, payload, and user-defined headers. + /// + /// This is the most flexible way to create a message with full control. + /// + /// # Arguments + /// + /// * `id` - Custom message ID + /// * `payload` - The message content + /// * `headers` - Key-value headers to attach to the message + /// + /// # Returns + /// + /// A new `IggyMessage` with all provided parameters + /// + /// # Examples + /// + /// ``` + /// let mut headers = HashMap::new(); + /// headers.insert(HeaderKey::new("content-type")?, HeaderValue::from_str("text/plain")?); + /// + /// let message = IggyMessage::with_id_and_headers(42, "My message".into(), headers); + /// ``` + pub fn with_id_and_headers<T: Into<Bytes>>( id: u128, - payload: Bytes, + payload: T, headers: HashMap<HeaderKey, HeaderValue>, ) -> Self { Self::builder() - .with_id(id) - .with_payload(payload) - .with_user_headers_map(headers) + .id(id) + .payload(payload) + .headers(headers) .build() + .expect("Failed to create message with valid payload and headers") } - /// Start a builder for more complex configuration + /// Creates a message builder for advanced configuration. 
+ /// + /// Use the builder when you need fine-grained control over message creation. + /// + /// # Returns + /// + /// A new `IggyMessageBuilder` instance + /// + /// # Examples + /// + /// ``` + /// let message = IggyMessage::builder() + /// .id(123) + /// .payload("Hello") + /// .header("content-type", "text/plain") + /// .build()?; + /// ``` pub fn builder() -> IggyMessageBuilder { IggyMessageBuilder::new() } - /// Return instantiated user headers map + /// Gets the user headers as a typed HashMap. + /// + /// This method parses the binary header data into a typed HashMap for easy access. + /// + /// # Returns + /// + /// * `Ok(Some(HashMap))` - Successfully parsed headers + /// * `Ok(None)` - No headers present + /// * `Err(IggyError)` - Error parsing headers pub fn user_headers_map(&self) -> Result<Option<HashMap<HeaderKey, HeaderValue>>, IggyError> { - if let Some(headers) = &self.user_headers { - let headers_bytes = Bytes::copy_from_slice(headers); - - match HashMap::<HeaderKey, HeaderValue>::from_bytes(headers_bytes) { - Ok(h) => Ok(Some(h)), - Err(e) => { - tracing::error!( - "Error parsing headers: {}, header_length={}", - e, - self.header.user_headers_length - ); - - Ok(None) + match &self.user_headers { + Some(headers) => { + let headers_bytes = Bytes::copy_from_slice(headers); + match HashMap::<HeaderKey, HeaderValue>::from_bytes(headers_bytes) { + Ok(h) => Ok(Some(h)), + Err(e) => { + warn!( + "Failed to deserialize user headers: {e}, user_headers_length: {}, skipping field...", + self.header.user_headers_length + ); + Ok(None) + } } } - } else { - Ok(None) + None => Ok(None), } } - /// Convert Bytes to messages - pub(crate) fn from_raw_bytes(buffer: Bytes, count: u32) -> Result<Vec<IggyMessage>, IggyError> { - let mut messages = Vec::with_capacity(count as usize); - let mut position = 0; - let buf_len = buffer.len(); + /// Retrieves a specific user header value by key. 
+ /// + /// This is a convenience method to get a specific header without handling the full map. + /// + /// # Arguments + /// + /// * `key` - The header key to look up + /// + /// # Returns + /// + /// * `Ok(Some(HeaderValue))` - Header found with its value + /// * `Ok(None)` - Header not found or headers couldn't be parsed + /// * `Err(IggyError)` - Error accessing headers + pub fn get_header(&self, key: &HeaderKey) -> Result<Option<HeaderValue>, IggyError> { + Ok(self + .user_headers_map()? + .and_then(|map| map.get(key).cloned())) + } - while position < buf_len { - if position + IGGY_MESSAGE_HEADER_SIZE as usize > buf_len { - break; - } - let header_bytes = buffer.slice(position..position + IGGY_MESSAGE_HEADER_SIZE as usize); - let header = match IggyMessageHeader::from_bytes(header_bytes) { - Ok(h) => h, - Err(e) => { - error!("Failed to parse message header: {}", e); - return Err(e); - } - }; - position += IGGY_MESSAGE_HEADER_SIZE as usize; + /// Checks if this message contains a specific header key. + /// + /// # Arguments + /// + /// * `key` - The header key to check for + /// + /// # Returns + /// + /// * `Ok(true)` - Header exists + /// * `Ok(false)` - Header doesn't exist or headers couldn't be parsed + /// * `Err(IggyError)` - Error accessing headers + pub fn has_header(&self, key: &HeaderKey) -> Result<bool, IggyError> { + Ok(self + .user_headers_map()? + .is_some_and(|map| map.contains_key(key))) + } - let payload_end = position + header.payload_length as usize; - if payload_end > buf_len { - break; - } - let payload = buffer.slice(position..payload_end); - position = payload_end; - - let headers: Option<Bytes> = if header.user_headers_length > 0 { - Some(buffer.slice(position..position + header.user_headers_length as usize)) - } else { - None - }; - position += header.user_headers_length as usize; - - messages.push(IggyMessage { - header, - payload, - user_headers: headers, - }); - } + /// Gets the payload as a UTF-8 string, if valid. 
+ /// + /// # Returns + /// + /// * `Ok(String)` - Successfully converted payload to string + /// * `Err(IggyError)` - Payload is not valid UTF-8 + pub fn payload_as_string(&self) -> Result<String, IggyError> { + String::from_utf8(self.payload.to_vec()).map_err(|_| IggyError::InvalidUtf8) + } +} - Ok(messages) +impl Default for IggyMessage { + fn default() -> Self { + Self::create("hello world") } } @@ -126,45 +274,45 @@ impl FromStr for IggyMessage { type Err = IggyError; fn from_str(s: &str) -> Result<Self, Self::Err> { - let payload = Bytes::from(s.as_bytes().to_vec()); - Ok(IggyMessage::new(payload)) + Ok(Self::create(Bytes::from(s.to_owned()))) } } impl std::fmt::Display for IggyMessage { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let len = self.payload.len(); - - if len > 40 { - write!( - f, - "{}|{}...{}", - self.header.id, - String::from_utf8_lossy(&self.payload[..20]), - String::from_utf8_lossy(&self.payload[len - 20..]) - ) - } else { - write!( - f, - "{}|{}", - self.header.id, - String::from_utf8_lossy(&self.payload) - ) + match String::from_utf8(self.payload.to_vec()) { + Ok(payload) => { + let preview = if payload.len() > 50 { + format!("{}... 
({}B)", &payload[..47], self.payload.len()) + } else { + payload + }; + write!( + f, + "[{}] ID:{} '{}'", + self.header.offset, self.header.id, preview + ) + } + Err(_) => { + write!( + f, + "[{}] ID:{} <binary {}B>", + self.header.offset, + self.header.id, + self.payload.len() + ) + } } } } impl Sizeable for IggyMessage { fn get_size_bytes(&self) -> IggyByteSize { - let payload_len = IggyByteSize::from(self.payload.len() as u64); - let headers_len = if let Some(headers) = &self.user_headers { - IggyByteSize::from(headers.len() as u64) - } else { - IggyByteSize::from(0) - }; - let message_header_len = IggyByteSize::from(IGGY_MESSAGE_HEADER_SIZE as u64); + let payload_len = self.payload.len() as u64; + let headers_len = self.user_headers.as_ref().map_or(0, |h| h.len() as u64); + let message_header_len = IGGY_MESSAGE_HEADER_SIZE as u64; - payload_len + headers_len + message_header_len + IggyByteSize::from(payload_len + headers_len + message_header_len) } } @@ -184,19 +332,27 @@ impl BytesSerializable for IggyMessage { if bytes.len() < IGGY_MESSAGE_HEADER_SIZE as usize { return Err(IggyError::InvalidCommand); } + let mut position = 0; let header = IggyMessageHeader::from_bytes(bytes.slice(0..IGGY_MESSAGE_HEADER_SIZE as usize))?; position += IGGY_MESSAGE_HEADER_SIZE as usize; - let payload = bytes.slice(position..position + header.payload_length as usize); - if payload.len() != header.payload_length as usize { + let payload_end = position + header.payload_length as usize; + + if payload_end > bytes.len() { return Err(IggyError::InvalidMessagePayloadLength); } - position += header.payload_length as usize; + let payload = bytes.slice(position..payload_end); + position = payload_end; + let user_headers = if header.user_headers_length > 0 { - Some(bytes.slice(position..position + header.user_headers_length as usize)) + let headers_end = position + header.user_headers_length as usize; + if headers_end > bytes.len() { + return Err(IggyError::InvalidHeaderValue); + } + 
Some(bytes.slice(position..headers_end)) } else { None }; @@ -208,7 +364,6 @@ impl BytesSerializable for IggyMessage { }) } - /// Write message to bytes mut fn write_to_buffer(&self, buf: &mut BytesMut) { buf.put_slice(&self.header.to_bytes()); buf.put_slice(&self.payload); @@ -218,6 +373,10 @@ impl BytesSerializable for IggyMessage { } } +/// Builder for creating `IggyMessage` instances with flexible configuration. +/// +/// The builder pattern allows for clear, step-by-step construction of complex +/// message configurations, with better error handling than chained constructors. #[derive(Debug, Default)] pub struct IggyMessageBuilder { id: Option<u128>, @@ -226,39 +385,64 @@ pub struct IggyMessageBuilder { } impl IggyMessageBuilder { + /// Creates a new empty message builder. pub fn new() -> Self { - Self { - id: None, - payload: None, - headers: None, - } + Self::default() } - pub fn with_id(mut self, id: u128) -> Self { + /// Sets the message ID. + /// + /// If not specified, a default ID (0) will be used. + pub fn id(mut self, id: u128) -> Self { self.id = Some(id); self } - pub fn with_payload(mut self, payload: Bytes) -> Self { - self.payload = Some(payload); + /// Sets the message payload. + /// + /// This method accepts any type that can be converted into `Bytes`, + /// including strings, byte slices, and vectors. + pub fn payload<T: Into<Bytes>>(mut self, payload: T) -> Self { + self.payload = Some(payload.into()); self } - pub fn with_user_key_value_header(mut self, key: HeaderKey, value: HeaderValue) -> Self { + /// Adds a single header key-value pair to the message. + /// + /// Multiple calls will add multiple headers. 
+ pub fn header(mut self, key: HeaderKey, value: HeaderValue) -> Self { let headers = self.headers.get_or_insert_with(HashMap::new); headers.insert(key, value); self } - pub fn with_user_headers_map( - mut self, - headers: impl Into<Option<HashMap<HeaderKey, HeaderValue>>>, - ) -> Self { - self.headers = headers.into(); + /// Adds a string header with the given key and value. + /// + /// This is a convenience method for adding string headers without + /// manually creating HeaderValue objects. + pub fn string_header(mut self, key: &str, value: &str) -> Result<Self, IggyError> { + let key = HeaderKey::new(key)?; + let value = HeaderValue::from_str(value)?; + let headers = self.headers.get_or_insert_with(HashMap::new); + headers.insert(key, value); + Ok(self) + } + + /// Sets all headers at once from a HashMap. + /// + /// This replaces any headers previously added. + pub fn headers(mut self, headers: HashMap<HeaderKey, HeaderValue>) -> Self { + self.headers = Some(headers); self } - pub fn build(self) -> IggyMessage { + /// Builds the final IggyMessage from the configured parameters. 
+ /// + /// # Returns + /// + /// * `Ok(IggyMessage)` - Successfully built message + /// * `Err(IggyError)` - Error during message construction + pub fn build(self) -> Result<IggyMessage, IggyError> { let payload = self.payload.unwrap_or_default(); let id = self.id.unwrap_or(0); let headers_length = get_headers_size_bytes(&self.headers).as_bytes_u64() as u32; @@ -275,28 +459,116 @@ impl IggyMessageBuilder { let user_headers = self.headers.map(|headers| headers.to_bytes()); - IggyMessage { + Ok(IggyMessage { header, payload, user_headers, - } + }) + } +} + +// Clean implementations of conversion traits + +impl From<IggyMessage> for Bytes { + fn from(message: IggyMessage) -> Self { + message.to_bytes() } } impl From<String> for IggyMessage { fn from(s: String) -> Self { - Self::new(Bytes::from(s)) + Self::create(s) } } impl From<&str> for IggyMessage { fn from(s: &str) -> Self { - Self::new(Bytes::from(s.to_owned())) + Self::create(Bytes::from(s.to_owned())) } } impl From<Vec<u8>> for IggyMessage { fn from(v: Vec<u8>) -> Self { - Self::new(Bytes::from(v)) + Self::create(v) + } +} + +impl From<&[u8]> for IggyMessage { + fn from(bytes: &[u8]) -> Self { + Self::create(Bytes::copy_from_slice(bytes)) + } +} + +impl TryFrom<Bytes> for IggyMessage { + type Error = IggyError; + + fn try_from(bytes: Bytes) -> Result<Self, Self::Error> { + Self::from_bytes(bytes) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_create_simple_message() { + let message = IggyMessage::create("test message"); + assert_eq!(message.payload, Bytes::from("test message")); + assert_eq!(message.header.id, 0); + assert!(message.user_headers.is_none()); + } + + #[test] + fn test_create_with_id() { + let message = IggyMessage::with_id(42, "test with id"); + assert_eq!(message.payload, Bytes::from("test with id")); + assert_eq!(message.header.id, 42); + } + + #[test] + fn test_create_with_headers() { + let mut headers = HashMap::new(); + headers.insert( + 
HeaderKey::new("content-type").unwrap(), + HeaderValue::from_str("text/plain").unwrap(), + ); + + let message = IggyMessage::with_headers("test with headers", headers); + assert_eq!(message.payload, Bytes::from("test with headers")); + assert!(message.user_headers.is_some()); + + let headers_map = message.user_headers_map().unwrap().unwrap(); + assert_eq!(headers_map.len(), 1); + assert!(headers_map.contains_key(&HeaderKey::new("content-type").unwrap())); + } + + #[test] + fn test_empty_payload() { + let message = IggyMessage::create(Bytes::new()); + assert_eq!(message.payload.len(), 0); + assert_eq!(message.header.payload_length, 0); + } + + #[test] + fn test_from_string() { + let message: IggyMessage = "simple message".into(); + assert_eq!(message.payload, Bytes::from("simple message")); + } + + #[test] + fn test_payload_as_string() { + let message = IggyMessage::create("test message"); + assert_eq!(message.payload_as_string().unwrap(), "test message"); + } + + #[test] + fn test_serialization_roundtrip() { + let original = IggyMessage::with_id(123, "serialization test"); + let bytes = original.to_bytes(); + let decoded = IggyMessage::from_bytes(bytes).unwrap(); + + assert_eq!(original.header.id, decoded.header.id); + assert_eq!(original.payload, decoded.payload); } } diff --git a/sdk/src/models/messaging/message_header_view.rs b/sdk/src/models/messaging/message_header_view.rs index 7c279d1a..1c35eb60 100644 --- a/sdk/src/models/messaging/message_header_view.rs +++ b/sdk/src/models/messaging/message_header_view.rs @@ -85,7 +85,7 @@ impl BytesSerializable for IggyMessageHeaderView<'_> { panic!("should not be used") } - fn from_bytes(bytes: Bytes) -> Result<Self, IggyError> { + fn from_bytes(_bytes: Bytes) -> Result<Self, IggyError> { panic!("should not be used") } diff --git a/sdk/src/models/messaging/messages_batch.rs b/sdk/src/models/messaging/messages_batch.rs index e8c044a4..7bddf2cd 100644 --- a/sdk/src/models/messaging/messages_batch.rs +++ 
b/sdk/src/models/messaging/messages_batch.rs @@ -22,7 +22,7 @@ pub struct IggyMessagesBatch { } impl IggyMessagesBatch { - /// Create a new messages container from a buffer + /// Create a batch from indexes buffer and messages buffer pub fn new(indexes: IggyIndexes, messages: Bytes, count: u32) -> Self { #[cfg(debug_assertions)] { @@ -57,7 +57,7 @@ impl IggyMessagesBatch { } } - /// Creates a empty messages container + /// Creates an empty messages batch pub fn empty() -> Self { Self::new(IggyIndexes::empty(), BytesMut::new().freeze(), 0) } @@ -104,8 +104,8 @@ impl IggyMessagesBatch { } /// Decompose the batch into its components - pub fn decompose(self) -> (IggyIndexes, Bytes, u32) { - (self.indexes, self.messages, self.count) + pub fn decompose(self) -> (u32, IggyIndexes, Bytes) { + (self.count, self.indexes, self.messages) } /// Get index of first message @@ -140,16 +140,6 @@ impl IggyMessagesBatch { .unwrap_or(0) } - /// Helper method to read a base position (u32) from the byte array at the given index - fn base_position_at(&self, position_index: u32) -> u32 { - tracing::error!("base_position = {}", self.indexes.base_position()); - if let Some(index) = self.indexes.get(position_index) { - index.position() - self.indexes.base_position() - } else { - 0 - } - } - /// Helper method to read a position (u32) from the byte array at the given index fn position_at(&self, position_index: u32) -> u32 { if let Some(index) = self.indexes.get(position_index) { @@ -306,6 +296,60 @@ impl IggyMessagesBatch { &self.messages[start_position..end_position], )) } + + /// Creates a batch from a vector of messages. Note that this implementation is: + /// - Inefficient with multiple buffer allocations and copies + /// - Serializes each message individually rather than in batches + /// - Designed for testing purposes only, not optimized for production + /// - May cause performance issues with large message collections + /// + /// This should be used only for testing purposes! 
+ /// TODO(hubcio): maybe it can be removed + #[cfg(test)] + pub fn from_messages_vec(messages: &[crate::prelude::IggyMessage]) -> Self { + use crate::prelude::BytesSerializable; + use bytes::{BufMut, BytesMut}; + + if messages.is_empty() { + return Self::empty(); + } + + let messages_count = messages.len() as u32; + let mut total_size = 0; + for msg in messages.iter() { + total_size += msg.get_size_bytes().as_bytes_usize(); + } + + let mut messages_buffer = BytesMut::with_capacity(total_size); + let mut indexes_buffer = BytesMut::new(); + let mut current_position = 0; + + for (i, message) in messages.iter().enumerate() { + message.write_to_buffer(&mut messages_buffer); + + let msg_size = message.get_size_bytes().as_bytes_u32(); + current_position += msg_size; + + let offset = i as u32; + + indexes_buffer.put_u32_le(offset); + indexes_buffer.put_u32_le(current_position); + indexes_buffer.put_u64_le(0); // timestamps from indexes are not used + } + + let indexes = crate::models::messaging::IggyIndexes::new(indexes_buffer.freeze(), 0); + + Self { + count: messages_count, + indexes, + messages: messages_buffer.freeze(), + } + } + + #[cfg(test)] + pub fn get_indexes(&self) -> &crate::models::messaging::IggyIndexes { + &self.indexes + } } impl Index<usize> for IggyMessagesBatch { diff --git a/server/src/binary/handlers/messages/poll_messages_handler.rs b/server/src/binary/handlers/messages/poll_messages_handler.rs index 79b43a79..95742586 100644 --- a/server/src/binary/handlers/messages/poll_messages_handler.rs +++ b/server/src/binary/handlers/messages/poll_messages_handler.rs @@ -13,8 +13,8 @@ use tracing::debug; #[derive(Debug)] pub struct IggyPollMetadata { - partition_id: u32, - current_offset: u64, + pub partition_id: u32, + pub current_offset: u64, } impl IggyPollMetadata { diff --git a/server/src/http/messages.rs b/server/src/http/messages.rs index 7d1a3031..70e4a84e 100644 --- a/server/src/http/messages.rs +++ b/server/src/http/messages.rs @@ -2,6 +2,7 @@ use 
crate::http::error::CustomError; use crate::http::jwt::json_web_token::Identity; use crate::http::shared::AppState; use crate::http::COMPONENT; +use crate::streaming::segments::{IggyIndexesMut, IggyMessagesBatchMut}; use crate::streaming::session::Session; use crate::streaming::systems::messages::PollingArgs; use crate::streaming::utils::random_id; @@ -9,9 +10,11 @@ use axum::extract::{Path, Query, State}; use axum::http::StatusCode; use axum::routing::get; use axum::{Extension, Json, Router}; +use bytes::BytesMut; use error_set::ErrContext; use iggy::consumer::Consumer; use iggy::identifier::Identifier; +use iggy::models::messaging::IggyMessagesBatch; use iggy::prelude::*; use iggy::validatable::Validatable; use std::sync::Arc; @@ -35,31 +38,31 @@ async fn poll_messages( Extension(identity): Extension<Identity>, Path((stream_id, topic_id)): Path<(String, String)>, mut query: Query<PollMessages>, -) -> Result<Json<Vec<IggyMessage>>, CustomError> { - // query.stream_id = Identifier::from_str_value(&stream_id)?; - // query.topic_id = Identifier::from_str_value(&topic_id)?; - // query.validate()?; +) -> Result<Json<PolledMessages>, CustomError> { + query.stream_id = Identifier::from_str_value(&stream_id)?; + query.topic_id = Identifier::from_str_value(&topic_id)?; + query.validate()?; - // let consumer = Consumer::new(query.0.consumer.id); - // let system = state.system.read().await; - // let polled_messages = system - // .poll_messages( - // &Session::stateless(identity.user_id, identity.ip_address), - // &consumer, - // &query.0.stream_id, - // &query.0.topic_id, - // query.0.partition_id, - // PollingArgs::new(query.0.strategy, query.0.count, query.0.auto_commit), - // ) - // .await - // .with_error_context(|error| { - // format!( - // "{COMPONENT} (error: {error}) - failed to poll messages, stream ID: {}, topic ID: {}, partition ID: {:?}", - // stream_id, topic_id, query.0.partition_id - // ) - // })?; - // Ok(Json(polled_messages)) - todo!() + let consumer = 
Consumer::new(query.0.consumer.id); + let system = state.system.read().await; + let (metadata, messages) = system + .poll_messages( + &Session::stateless(identity.user_id, identity.ip_address), + &consumer, + &query.0.stream_id, + &query.0.topic_id, + query.0.partition_id, + PollingArgs::new(query.0.strategy, query.0.count, query.0.auto_commit), + ) + .await + .with_error_context(|error| { + format!( + "{COMPONENT} (error: {error}) - failed to poll messages, stream ID: {}, topic ID: {}, partition ID: {:?}", + stream_id, topic_id, query.0.partition_id + ) + })?; + let polled_messages = messages.into_polled_messages(metadata); + Ok(Json(polled_messages)) } async fn send_messages( @@ -68,31 +71,30 @@ async fn send_messages( Path((stream_id, topic_id)): Path<(String, String)>, Json(mut command): Json<SendMessages>, ) -> Result<StatusCode, CustomError> { - //TODO: Fix me - /* command.stream_id = Identifier::from_str_value(&stream_id)?; command.topic_id = Identifier::from_str_value(&topic_id)?; command.partitioning.length = command.partitioning.value.len() as u8; - command.messages.iter_mut().for_each(|msg| { - if msg.id == 0 { - msg.id = random_id::get_uuid(); - } - }); + // TODO(hubcio): Add message ID generation? 
+ // command.messages.iter_mut().for_each(|msg| { + // if msg.id == 0 { + // msg.id = random_id::get_uuid(); + // } + // }); command.validate()?; - let messages = command.messages; + let batch = make_mutable(command.batch); let command_stream_id = command.stream_id; let command_topic_id = command.topic_id; let partitioning = command.partitioning; let system = state.system.read().await; - // TODO(haze): Add confirmation level after testing is complete + system .append_messages( &Session::stateless(identity.user_id, identity.ip_address), - command_stream_id, - command_topic_id, - partitioning, - messages, + &command_stream_id, + &command_topic_id, + &partitioning, + batch, None, ) .await @@ -103,8 +105,6 @@ async fn send_messages( ) })?; Ok(StatusCode::CREATED) - */ - todo!() } #[instrument(skip_all, name = "trace_flush_unsaved_buffer", fields(iggy_user_id = identity.user_id, iggy_stream_id = stream_id, iggy_topic_id = topic_id, iggy_partition_id = partition_id, iggy_fsync = fsync))] @@ -127,3 +127,11 @@ async fn flush_unsaved_buffer( .await?; Ok(StatusCode::OK) } + +fn make_mutable(batch: IggyMessagesBatch) -> IggyMessagesBatchMut { + let (_, indexes, messages) = batch.decompose(); + let (_, indexes_buffer) = indexes.decompose(); + let indexes_buffer_mut = BytesMut::from(indexes_buffer); + let indexes_mut = IggyIndexesMut::from_bytes(indexes_buffer_mut); + IggyMessagesBatchMut::from_indexes_and_messages(indexes_mut, messages.into()) +} diff --git a/server/src/streaming/cache/buffer.rs b/server/src/streaming/cache/buffer.rs deleted file mode 100644 index 67ebba60..00000000 --- a/server/src/streaming/cache/buffer.rs +++ /dev/null @@ -1,177 +0,0 @@ -use crate::streaming::local_sizeable::RealSize; - -use super::memory_tracker::CacheMemoryTracker; -use atone::Vc; -use iggy::utils::byte_size::IggyByteSize; -use std::fmt::Debug; -use std::ops::Index; -use std::sync::{ - atomic::{AtomicU64, Ordering}, - Arc, -}; - -#[derive(Debug)] -pub struct SmartCache<T: RealSize + 
Debug> { - buffer: Vc<T>, - memory_tracker: Arc<CacheMemoryTracker>, - current_size: IggyByteSize, - hits: AtomicU64, - misses: AtomicU64, -} - -impl<T> SmartCache<T> -where - T: RealSize + Clone + Debug, -{ - pub fn new() -> Self { - let current_size = IggyByteSize::default(); - let buffer = Vc::new(); - let memory_tracker = CacheMemoryTracker::get_instance().unwrap(); - - Self { - buffer, - memory_tracker, - current_size, - hits: AtomicU64::new(0), - misses: AtomicU64::new(0), - } - } - - // Used only for cache validation tests - #[cfg(test)] - pub fn to_vec(&self) -> Vec<T> { - let mut vec = Vec::with_capacity(self.buffer.len()); - vec.extend(self.buffer.iter().cloned()); - vec - } - - /// Pushes an element to the buffer, and if adding the element would exceed the memory limit, - /// removes the oldest elements until there's enough space for the new element. - /// It's preferred to use `extend` instead of this method. - pub fn push_safe(&mut self, element: T) { - let element_size = element.real_size(); - - while !self.memory_tracker.will_fit_into_cache(element_size) { - if let Some(oldest_element) = self.buffer.pop_front() { - let oldest_size = oldest_element.real_size(); - self.memory_tracker - .decrement_used_memory(oldest_size.as_bytes_u64()); - self.current_size -= oldest_size; - } - } - - self.memory_tracker - .increment_used_memory(element_size.as_bytes_u64()); - self.current_size += element_size; - self.buffer.push_back(element); - } - - /// Removes the oldest elements until there's enough space for the new element. 
- pub fn evict_by_size(&mut self, size_to_remove: u64) { - let mut removed_size = IggyByteSize::default(); - - while let Some(element) = self.buffer.pop_front() { - if removed_size >= size_to_remove { - break; - } - let elem_size = element.real_size(); - self.memory_tracker - .decrement_used_memory(elem_size.as_bytes_u64()); - self.current_size -= elem_size; - removed_size += elem_size; - } - } - - pub fn purge(&mut self) { - self.buffer.clear(); - self.memory_tracker - .decrement_used_memory(self.current_size.as_bytes_u64()); - self.current_size = IggyByteSize::default(); - } - - pub fn is_empty(&self) -> bool { - self.buffer.is_empty() - } - - pub fn current_size(&self) -> IggyByteSize { - self.current_size - } - - /// Extends the buffer with the given elements, and always adding the elements, - /// even if it exceeds the memory limit. - pub fn extend(&mut self, elements: impl IntoIterator<Item = T>) { - let elements = elements.into_iter().inspect(|element| { - let element_size = element.real_size(); - self.memory_tracker - .increment_used_memory(element_size.as_bytes_u64()); - self.current_size += element_size; - }); - self.buffer.extend(elements); - } - - /// Always appends the element into the buffer, even if it exceeds the memory limit. 
- pub fn append(&mut self, element: T) { - let element_size = element.real_size(); - self.memory_tracker - .increment_used_memory(element_size.as_bytes_u64()); - self.current_size += element_size; - self.buffer.push(element); - } - - pub fn iter(&self) -> impl Iterator<Item = &T> { - self.buffer.iter() - } - - pub fn len(&self) -> usize { - self.buffer.len() - } - - pub fn get_metrics(&self) -> CacheMetrics { - let hits = self.hits.load(Ordering::Relaxed); - let misses = self.misses.load(Ordering::Relaxed); - let total = hits + misses; - let hit_ratio = if total > 0 { - hits as f32 / total as f32 - } else { - 0.0 - }; - - CacheMetrics { - hits, - misses, - hit_ratio, - } - } - - pub fn record_hit(&self) { - self.hits.fetch_add(1, Ordering::Relaxed); - } - - pub fn record_miss(&self) { - self.misses.fetch_add(1, Ordering::Relaxed); - } -} - -impl<T> Index<usize> for SmartCache<T> -where - T: RealSize + Clone + Debug, -{ - type Output = T; - - fn index(&self, index: usize) -> &Self::Output { - &self.buffer[index] - } -} - -impl<T: RealSize + Clone + Debug> Default for SmartCache<T> { - fn default() -> Self { - Self::new() - } -} - -#[derive(Debug)] -pub struct CacheMetrics { - pub hits: u64, - pub misses: u64, - pub hit_ratio: f32, -} diff --git a/server/src/streaming/cache/memory_tracker.rs b/server/src/streaming/cache/memory_tracker.rs deleted file mode 100644 index 835c44ff..00000000 --- a/server/src/streaming/cache/memory_tracker.rs +++ /dev/null @@ -1,101 +0,0 @@ -extern crate sysinfo; - -use crate::configs::resource_quota::MemoryResourceQuota; -use crate::configs::system::CacheConfig; -use iggy::utils::byte_size::IggyByteSize; -use std::sync::atomic::{AtomicU64, Ordering}; -use std::sync::{Arc, OnceLock}; -use sysinfo::System; -use tracing::info; - -static INSTANCE: OnceLock<Option<Arc<CacheMemoryTracker>>> = OnceLock::new(); - -#[derive(Debug)] -pub struct CacheMemoryTracker { - used_memory_bytes: AtomicU64, - limit_bytes: IggyByteSize, -} - -type MessageSize 
= u64; - -impl CacheMemoryTracker { - pub fn initialize(config: &CacheConfig) -> Option<Arc<CacheMemoryTracker>> { - INSTANCE - .get_or_init(|| { - if config.enabled { - Some(Arc::new(CacheMemoryTracker::new(config.size.clone()))) - } else { - info!("Cache memory tracker disabled"); - None - } - }) - .clone() - } - - pub fn get_instance() -> Option<Arc<CacheMemoryTracker>> { - INSTANCE.get().cloned().flatten() - } - - fn new(limit: MemoryResourceQuota) -> Self { - let mut sys = System::new_all(); - sys.refresh_all(); - - let total_memory_bytes = IggyByteSize::from(sys.total_memory()); - let free_memory = IggyByteSize::from(sys.free_memory()); - let free_memory_percentage = - free_memory.as_bytes_u64() as f64 / total_memory_bytes.as_bytes_u64() as f64 * 100.0; - let used_memory_bytes = AtomicU64::new(0); - let limit_bytes = limit.into(); - - info!( - "Cache memory tracker started, cache: {}, total memory: {}, free memory: {}, free memory percentage: {:.2}%", - limit_bytes.as_human_string(), total_memory_bytes.as_human_string(), free_memory, free_memory_percentage - ); - - CacheMemoryTracker { - used_memory_bytes, - limit_bytes, - } - } - - pub fn increment_used_memory(&self, message_size: MessageSize) { - let mut current_cache_size_bytes = self.used_memory_bytes.load(Ordering::SeqCst); - loop { - let new_size = current_cache_size_bytes + message_size; - match self.used_memory_bytes.compare_exchange_weak( - current_cache_size_bytes, - new_size, - Ordering::SeqCst, - Ordering::SeqCst, - ) { - Ok(_) => break, - Err(actual_current) => current_cache_size_bytes = actual_current, - } - } - } - - pub fn decrement_used_memory(&self, message_size: MessageSize) { - let mut current_cache_size_bytes = self.used_memory_bytes.load(Ordering::SeqCst); - loop { - let new_size = current_cache_size_bytes - message_size; - match self.used_memory_bytes.compare_exchange_weak( - current_cache_size_bytes, - new_size, - Ordering::SeqCst, - Ordering::SeqCst, - ) { - Ok(_) => return, - 
Err(actual_current) => current_cache_size_bytes = actual_current, - } - } - } - - pub fn usage_bytes(&self) -> IggyByteSize { - IggyByteSize::from(self.used_memory_bytes.load(Ordering::SeqCst)) - } - - pub fn will_fit_into_cache(&self, requested_size: IggyByteSize) -> bool { - IggyByteSize::from(self.used_memory_bytes.load(Ordering::SeqCst)) + requested_size - <= self.limit_bytes - } -} diff --git a/server/src/streaming/cache/mod.rs b/server/src/streaming/cache/mod.rs deleted file mode 100644 index c055351b..00000000 --- a/server/src/streaming/cache/mod.rs +++ /dev/null @@ -1,3 +0,0 @@ -pub mod buffer; -#[allow(static_mut_refs)] -pub mod memory_tracker; diff --git a/server/src/streaming/partitions/storage.rs b/server/src/streaming/partitions/storage.rs index 80c89b43..f11520d7 100644 --- a/server/src/streaming/partitions/storage.rs +++ b/server/src/streaming/partitions/storage.rs @@ -42,12 +42,6 @@ impl PartitionStorage for FilePartitionStorage { let dir_entries = fs::read_dir(&partition.partition_path).await; if fs::read_dir(&partition.partition_path) .await - .inspect_err(|err| { - error!( - "Failed to read partition with ID: {} for stream with ID: {} and topic with ID: {} and path: {}. 
Error: {}", - partition.partition_id, partition.stream_id, partition.topic_id, partition.partition_path, err - ); - }) .with_error_context(|error| format!( "{COMPONENT} (error: {error}) - failed to read partition with ID: {} for stream with ID: {} and topic with ID: {} and path: {}.", partition.partition_id, partition.stream_id, partition.topic_id, partition.partition_path, @@ -132,11 +126,6 @@ impl PartitionStorage for FilePartitionStorage { segment.load_from_disk().await.with_error_context(|error| { format!("{COMPONENT} (error: {error}) - failed to load segment: {segment}",) })?; - let capacity = partition.config.partition.messages_required_to_save; - // TODO(hubcio) - // if !segment.is_closed() { - // segment.unsaved_messages = Some(Default::default()); - // } // If the first segment has at least a single message, we should increment the offset. if !partition.should_increment_offset { @@ -187,9 +176,7 @@ impl PartitionStorage for FilePartitionStorage { if end_offset_index == segments_count - 1 { break; } - - // TODO(hubcio): fix this - // segment.end_offset() = end_offsets[end_offset_index]; + segment.set_end_offset(end_offsets[end_offset_index]); } if !partition.segments.is_empty() { diff --git a/server/src/streaming/segments/indexes/indexes_mut.rs b/server/src/streaming/segments/indexes/indexes_mut.rs index 535676d7..91bf98b2 100644 --- a/server/src/streaming/segments/indexes/indexes_mut.rs +++ b/server/src/streaming/segments/indexes/indexes_mut.rs @@ -162,7 +162,7 @@ impl IggyIndexesMut { result } - /// Clears the container, removing all indexes + /// Clears the container, removing all indexes but preserving already allocated buffer capacity pub fn clear(&mut self) { self.saved_count = 0; self.buffer.clear(); diff --git a/server/src/streaming/segments/segment.rs b/server/src/streaming/segments/segment.rs index 246b5fe1..9a99bb42 100644 --- a/server/src/streaming/segments/segment.rs +++ b/server/src/streaming/segments/segment.rs @@ -174,6 +174,10 @@ impl 
Segment { self.messages_count_of_parent_partition .fetch_add(messages_count, Ordering::SeqCst); + if !self.config.segment.cache_indexes { + self.indexes = IggyIndexesMut::empty(); + } + Ok(()) } @@ -211,16 +215,15 @@ impl Segment { } pub async fn initialize_reading(&mut self) -> Result<(), IggyError> { + let messages_reader = + MessagesReader::new(&self.messages_path, self.messages_size.clone()).await?; + self.messages_reader = Some(messages_reader); + if !self.config.segment.cache_indexes { let index_reader = IndexReader::new(&self.index_path, self.indexes_size.clone()).await?; self.index_reader = Some(index_reader); } - - let messages_reader = - MessagesReader::new(&self.messages_path, self.messages_size.clone()).await?; - self.messages_reader = Some(messages_reader); - Ok(()) } @@ -372,6 +375,10 @@ impl Segment { pub fn messages_file_path(&self) -> &str { &self.messages_path } + + pub fn set_end_offset(&mut self, end_offset: u64) { + self.end_offset = end_offset; + } } impl std::fmt::Display for Segment { diff --git a/server/src/streaming/segments/types/messages_batch_set.rs b/server/src/streaming/segments/types/messages_batch_set.rs index af52db56..837042c9 100644 --- a/server/src/streaming/segments/types/messages_batch_set.rs +++ b/server/src/streaming/segments/types/messages_batch_set.rs @@ -1,7 +1,10 @@ +use crate::binary::handlers::messages::poll_messages_handler::IggyPollMetadata; +use bytes::Bytes; use iggy::models::messaging::IggyMessageView; use iggy::models::messaging::IggyMessagesBatch; use iggy::prelude::*; use std::ops::Index; +use tracing::trace; /// A container for multiple IggyMessagesBatch objects #[derive(Debug, Clone, Default)] @@ -111,6 +114,60 @@ impl IggyMessagesBatchSet { self.batches.iter() } + /// Convert this batch and poll metadata into a vector of fully-formed IggyMessage objects + /// + /// This method transforms the internal message views into complete IggyMessage objects + /// that can be returned to clients. 
+ /// + /// # Arguments + /// + /// * `poll_metadata` - Metadata about the partition and current offset + /// + /// # Returns + /// + /// A `PolledMessages` holding the converted IggyMessage objects, their count, and the partition ID and current offset from `poll_metadata` (all zero/empty when the set is empty) + pub fn into_polled_messages(&self, poll_metadata: IggyPollMetadata) -> PolledMessages { + if self.is_empty() { + return PolledMessages { + messages: vec![], + partition_id: 0, + current_offset: 0, + count: 0, + }; + } + + let mut messages = Vec::with_capacity(self.count() as usize); + + // TODO(hubcio): this can also be optimized for zero copy, but it's http... + for batch in self.iter() { + for message in batch.iter() { + let header = message.header().to_header(); + let payload = Bytes::copy_from_slice(message.payload()); + let user_headers = message.user_headers().map(Bytes::copy_from_slice); + let message = IggyMessage { + header, + payload, + user_headers, + }; + messages.push(message); + } + } + + trace!( + "Converted batch of {} messages from partition {} with current offset {}", + messages.len(), + poll_metadata.partition_id, + poll_metadata.current_offset + ); + + PolledMessages { + partition_id: poll_metadata.partition_id, + current_offset: poll_metadata.current_offset, + count: messages.len() as u32, + messages, + } + } + /// Returns a new IggyMessagesBatch containing only messages with offsets greater than or equal to the specified offset, /// up to the specified count. 
/// diff --git a/server/src/streaming/topics/messages.rs b/server/src/streaming/topics/messages.rs index 15b3f932..fb6966f9 100644 --- a/server/src/streaming/topics/messages.rs +++ b/server/src/streaming/topics/messages.rs @@ -211,9 +211,10 @@ mod tests { for entity_id in 1..=messages_count { let message = IggyMessage::builder() - .with_id(entity_id as u128) - .with_payload(Bytes::from(entity_id.to_string())) - .build(); + .id(entity_id as u128) + .payload(Bytes::from(entity_id.to_string())) + .build() + .expect("Failed to create message with valid payload and headers"); let messages = IggyMessagesBatchMut::from_messages(&[message], 1); topic .append_messages(&partitioning, messages, None) @@ -243,10 +244,10 @@ mod tests { for entity_id in 1..=messages_count { let partitioning = Partitioning::messages_key_u32(entity_id); let message = IggyMessage::builder() - .with_id(entity_id as u128) - .with_payload(Bytes::new()) - .build(); - eprintln!("Message: {:#?}", message); + .id(entity_id as u128) + .payload(Bytes::new()) + .build() + .expect("Failed to create message with valid payload and headers"); let messages = IggyMessagesBatchMut::from_messages(&[message], 1); topic .append_messages(&partitioning, messages, None)
