This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch zero-copy-no-batching
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 5a248f09d8faefd310e1d634d1cd913ae8548fca
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Sun Mar 23 22:42:15 2025 +0100

    implement http (sadly with copy)
---
 integration/tests/streaming/get_by_offset.rs       |   9 +-
 integration/tests/streaming/get_by_timestamp.rs    |  10 +-
 integration/tests/streaming/mod.rs                 |   5 +-
 sdk/src/cli/message/send_messages.rs               |  15 +-
 sdk/src/http/messages.rs                           |  18 +-
 sdk/src/messages/polled_messages.rs                |  55 ++-
 sdk/src/messages/send_messages.rs                  | 103 +---
 sdk/src/models/messaging/indexes.rs                |   7 +
 sdk/src/models/messaging/message.rs                | 524 ++++++++++++++++-----
 sdk/src/models/messaging/message_header_view.rs    |   2 +-
 sdk/src/models/messaging/messages_batch.rs         |  72 ++-
 .../handlers/messages/poll_messages_handler.rs     |   4 +-
 server/src/http/messages.rs                        |  86 ++--
 server/src/streaming/cache/buffer.rs               | 177 -------
 server/src/streaming/cache/memory_tracker.rs       | 101 ----
 server/src/streaming/cache/mod.rs                  |   3 -
 server/src/streaming/partitions/storage.rs         |  15 +-
 .../src/streaming/segments/indexes/indexes_mut.rs  |   2 +-
 server/src/streaming/segments/segment.rs           |  17 +-
 .../streaming/segments/types/messages_batch_set.rs |  57 +++
 server/src/streaming/topics/messages.rs            |  15 +-
 21 files changed, 665 insertions(+), 632 deletions(-)

diff --git a/integration/tests/streaming/get_by_offset.rs b/integration/tests/streaming/get_by_offset.rs
index 7a67e6d9..94f5a854 100644
--- a/integration/tests/streaming/get_by_offset.rs
+++ b/integration/tests/streaming/get_by_offset.rs
@@ -141,10 +141,11 @@ async fn test_get_messages_by_offset(
         );
 
         let message = IggyMessage::builder()
-            .with_id(id)
-            .with_payload(payload)
-            .with_user_headers_map(headers)
-            .build();
+            .id(id)
+            .payload(payload)
+            .headers(headers)
+            .build()
+            .expect("Failed to create message with valid payload and headers");
         all_messages.push(message);
     }
 
diff --git a/integration/tests/streaming/get_by_timestamp.rs b/integration/tests/streaming/get_by_timestamp.rs
index d89e38ef..4210826a 100644
--- a/integration/tests/streaming/get_by_timestamp.rs
+++ b/integration/tests/streaming/get_by_timestamp.rs
@@ -143,10 +143,12 @@ async fn test_get_messages_by_timestamp(
         );
 
         let message = IggyMessage::builder()
-            .with_id(id)
-            .with_payload(payload)
-            .with_user_headers_map(headers)
-            .build();
+            .id(id)
+            .payload(payload)
+            .headers(headers)
+            .build()
+            .expect("Failed to create message with valid payload and headers");
+
         all_messages.push(message);
     }
 
diff --git a/integration/tests/streaming/mod.rs b/integration/tests/streaming/mod.rs
index 1799e9c3..3d4d1e48 100644
--- a/integration/tests/streaming/mod.rs
+++ b/integration/tests/streaming/mod.rs
@@ -28,7 +28,8 @@ fn create_messages() -> Vec<IggyMessage> {
 fn create_message(id: u128, payload: &str) -> IggyMessage {
     let payload = Bytes::from(payload.to_string());
     IggyMessage::builder()
-        .with_id(id)
-        .with_payload(payload)
+        .id(id)
+        .payload(payload)
         .build()
+        .expect("Failed to create message with valid payload and headers")
 }
diff --git a/sdk/src/cli/message/send_messages.rs b/sdk/src/cli/message/send_messages.rs
index dba2bd63..e6f08140 100644
--- a/sdk/src/cli/message/send_messages.rs
+++ b/sdk/src/cli/message/send_messages.rs
@@ -122,16 +122,18 @@ impl CliCommand for SendMessagesCmd {
 
             messages
         } else {
+            let headers = self.get_headers();
             match &self.messages {
                 Some(messages) => messages
                     .iter()
                     .map(|s| {
                         IggyMessage::builder()
-                            .with_payload(Bytes::from(s.clone()))
-                            .with_user_headers_map(self.get_headers().clone())
+                            .payload(Bytes::from(s.clone()))
+                            .headers(headers.clone().unwrap_or_default())
                             .build()
                     })
-                    .collect::<Vec<_>>(),
+                    .collect::<Result<Vec<_>, _>>()
+                    .with_context(|| "Failed to create messages from provided strings")?,
                 None => {
                     let input = self.read_message_from_stdin()?;
 
@@ -139,11 +141,12 @@ impl CliCommand for SendMessagesCmd {
                         .lines()
                         .map(|m| {
                             IggyMessage::builder()
-                                .with_payload(String::from(m).into())
-                                .with_user_headers_map(self.get_headers().clone())
+                                .payload(m.to_owned())
+                                .headers(headers.clone().unwrap_or_default())
                                 .build()
                         })
-                        .collect()
+                        .collect::<Result<Vec<_>, _>>()
+                        .with_context(|| "Failed to create messages from stdin")?
                 }
             }
         };
diff --git a/sdk/src/http/messages.rs b/sdk/src/http/messages.rs
index bb1bd2e1..bf42f604 100644
--- a/sdk/src/http/messages.rs
+++ b/sdk/src/http/messages.rs
@@ -5,7 +5,7 @@ use crate::http::client::HttpClient;
 use crate::http::HttpTransport;
 use crate::identifier::Identifier;
 use crate::messages::{Partitioning, PollingStrategy};
-use crate::prelude::{IggyMessage, PolledMessages};
+use crate::prelude::{FlushUnsavedBuffer, IggyMessage, PollMessages, PolledMessages, SendMessages};
 use async_trait::async_trait;
 
 #[async_trait]
@@ -20,7 +20,6 @@ impl MessageClient for HttpClient {
         count: u32,
         auto_commit: bool,
     ) -> Result<PolledMessages, IggyError> {
-        /*
         let response = self
             .get_with_query(
                 &get_path(&stream_id.as_cow_str(), &topic_id.as_cow_str()),
@@ -40,8 +39,6 @@ impl MessageClient for HttpClient {
             .await
             .map_err(|_| IggyError::InvalidJsonResponse)?;
         Ok(messages)
-        */
-        todo!()
     }
 
     async fn send_messages(
@@ -51,20 +48,12 @@ impl MessageClient for HttpClient {
         partitioning: &Partitioning,
         messages: &mut [IggyMessage],
     ) -> Result<(), IggyError> {
-        /*
         self.post(
             &get_path(&stream_id.as_cow_str(), &topic_id.as_cow_str()),
-            &SendMessages {
-                stream_id: stream_id.clone(),
-                topic_id: topic_id.clone(),
-                partitioning: partitioning.clone(),
-                messages: messages.to_vec(),
-            },
+            &SendMessages::as_bytes(stream_id, topic_id, partitioning, messages),
         )
         .await?;
         Ok(())
-        */
-        todo!()
     }
 
     async fn flush_unsaved_buffer(
@@ -74,7 +63,6 @@ impl MessageClient for HttpClient {
         partition_id: u32,
         fsync: bool,
     ) -> Result<(), IggyError> {
-        /*
         let _ = self
             .get_with_query(
                 &get_path_flush_unsaved_buffer(
@@ -92,8 +80,6 @@ impl MessageClient for HttpClient {
             )
             .await?;
         Ok(())
-        */
-        todo!()
     }
 }
 
diff --git a/sdk/src/messages/polled_messages.rs b/sdk/src/messages/polled_messages.rs
index 4d7fda47..3f4e929d 100644
--- a/sdk/src/messages/polled_messages.rs
+++ b/sdk/src/messages/polled_messages.rs
@@ -1,9 +1,10 @@
 use crate::{
     error::IggyError,
-    prelude::{BytesSerializable, IggyMessage},
+    prelude::{BytesSerializable, IggyMessage, IggyMessageHeader, IGGY_MESSAGE_HEADER_SIZE},
 };
-use bytes::{Bytes, BytesMut};
+use bytes::Bytes;
 use serde::{Deserialize, Serialize};
+use tracing::error;
 
 /// The wrapper on top of the collection of messages that are polled from the partition.
 /// It consists of the following fields:
@@ -58,7 +59,9 @@ impl BytesSerializable for PolledMessages {
                 .try_into()
                 .map_err(|_| IggyError::InvalidNumberEncoding)?,
         );
-        let messages = IggyMessage::from_raw_bytes(bytes.slice(16..), count)?;
+
+        let messages = messages_vec(bytes.slice(16..), count)?;
+
         Ok(Self {
             partition_id,
             current_offset,
@@ -66,12 +69,48 @@ impl BytesSerializable for PolledMessages {
             messages,
         })
     }
+}
 
-    fn write_to_buffer(&self, buf: &mut BytesMut) {
-        todo!()
-    }
+/// Convert Bytes to messages
+fn messages_vec(buffer: Bytes, count: u32) -> Result<Vec<IggyMessage>, IggyError> {
+    let mut messages = Vec::with_capacity(count as usize);
+    let mut position = 0;
+    let buf_len = buffer.len();
 
-    fn get_buffer_size(&self) -> u32 {
-        unimplemented!();
+    while position < buf_len {
+        if position + IGGY_MESSAGE_HEADER_SIZE as usize > buf_len {
+            break;
+        }
+        let header_bytes = buffer.slice(position..position + IGGY_MESSAGE_HEADER_SIZE as usize);
+        let header = match IggyMessageHeader::from_bytes(header_bytes) {
+            Ok(h) => h,
+            Err(e) => {
+                error!("Failed to deserialize message header: {}", e);
+                return Err(e);
+            }
+        };
+        position += IGGY_MESSAGE_HEADER_SIZE as usize;
+
+        let payload_end = position + header.payload_length as usize;
+        if payload_end > buf_len {
+            break;
+        }
+        let payload = buffer.slice(position..payload_end);
+        position = payload_end;
+
+        let headers: Option<Bytes> = if header.user_headers_length > 0 {
+            Some(buffer.slice(position..position + header.user_headers_length as usize))
+        } else {
+            None
+        };
+        position += header.user_headers_length as usize;
+
+        messages.push(IggyMessage {
+            header,
+            payload,
+            user_headers: headers,
+        });
     }
+
+    Ok(messages)
 }
diff --git a/sdk/src/messages/send_messages.rs b/sdk/src/messages/send_messages.rs
index 6c4b92da..8ceee3b5 100644
--- a/sdk/src/messages/send_messages.rs
+++ b/sdk/src/messages/send_messages.rs
@@ -133,16 +133,6 @@ impl Validatable<IggyError> for SendMessages {
         for message in IggyMessageViewIterator::new(&self.batch) {
             message_count += 1;
 
-            // TODO(hubcio): IMHO validation of headers should be purely on SDK side
-            // if let Some(headers) = message.headers() {
-            //     for value in headers.values() {
-            //         headers_size += value.value.len() as u32;
-            //         if headers_size > MAX_HEADERS_SIZE {
-            //             return Err(IggyError::TooBigHeadersPayload);
-            //         }
-            //     }
-            // }
-
             let message_payload = message.payload();
             payload_size += message_payload.len() as u32;
             if payload_size > MAX_PAYLOAD_SIZE {
@@ -171,7 +161,7 @@ impl BytesSerializable for SendMessages {
         panic!("should not be used")
     }
 
-    fn from_bytes(bytes: Bytes) -> Result<SendMessages, IggyError> {
+    fn from_bytes(_bytes: Bytes) -> Result<SendMessages, IggyError> {
         panic!("should not be used")
     }
 }
@@ -190,99 +180,8 @@ impl Display for SendMessages {
 
 #[cfg(test)]
 mod tests {
-    use std::str::FromStr;
-
     use super::*;
 
-    #[test]
-    fn should_be_serialized_as_bytes() {
-        let message_1 = IggyMessage::from_str("hello 1").unwrap();
-        let message_2 = IggyMessage::new(Some(2), "hello 2".into(), None);
-        let message_3 = IggyMessage::new(Some(3), "hello 3".into(), None);
-        let messages = vec![message_1, message_2, message_3];
-        let command = SendMessages {
-            stream_id: Identifier::numeric(1).unwrap(),
-            topic_id: Identifier::numeric(2).unwrap(),
-            partitioning: Partitioning::partition_id(4),
-            messages,
-        };
-
-        let bytes = command.to_bytes();
-
-        let mut position = 0;
-        let stream_id = Identifier::from_bytes(bytes.clone()).unwrap();
-        position += stream_id.get_size_bytes().as_bytes_usize();
-        let topic_id = Identifier::from_bytes(bytes.slice(position..)).unwrap();
-        position += topic_id.get_size_bytes().as_bytes_usize();
-        let key = Partitioning::from_bytes(bytes.slice(position..)).unwrap();
-        position += key.get_size_bytes().as_bytes_usize();
-        let messages = bytes.slice(position..);
-        let command_messages = command
-            .messages
-            .iter()
-            .fold(BytesMut::new(), |mut bytes_mut, message| {
-                bytes_mut.put(message.to_bytes());
-                bytes_mut
-            })
-            .freeze();
-
-        assert!(!bytes.is_empty());
-        assert_eq!(stream_id, command.stream_id);
-        assert_eq!(topic_id, command.topic_id);
-        assert_eq!(key, command.partitioning);
-        assert_eq!(messages, command_messages);
-    }
-
-    #[test]
-    fn should_be_deserialized_from_bytes() {
-        let stream_id = Identifier::numeric(1).unwrap();
-        let topic_id = Identifier::numeric(2).unwrap();
-        let key = Partitioning::partition_id(4);
-
-        let message_1 = IggyMessage::from_str("hello 1").unwrap();
-        let message_2 = IggyMessage::new(Some(2), "hello 2".into(), None);
-        let message_3 = IggyMessage::new(Some(3), "hello 3".into(), None);
-        let messages = [
-            message_1.to_bytes(),
-            message_2.to_bytes(),
-            message_3.to_bytes(),
-        ]
-        .concat();
-
-        let key_bytes = key.to_bytes();
-        let stream_id_bytes = stream_id.to_bytes();
-        let topic_id_bytes = topic_id.to_bytes();
-        let current_position = stream_id_bytes.len() + topic_id_bytes.len() + key_bytes.len();
-        let mut bytes = BytesMut::with_capacity(current_position);
-        bytes.put_slice(&stream_id_bytes);
-        bytes.put_slice(&topic_id_bytes);
-        bytes.put_slice(&key_bytes);
-        bytes.put_slice(&messages);
-        let bytes = bytes.freeze();
-        let command = SendMessages::from_bytes(bytes.clone());
-        assert!(command.is_ok());
-
-        let messages_payloads = bytes.slice(current_position..);
-        let mut position = 0;
-        let mut messages = Vec::new();
-        while position < messages_payloads.len() {
-            let message = IggyMessage::from_bytes(messages_payloads.slice(position..)).unwrap();
-            position += message.get_size_bytes().as_bytes_usize();
-            messages.push(message);
-        }
-
-        let command = command.unwrap();
-        assert_eq!(command.stream_id, stream_id);
-        assert_eq!(command.topic_id, topic_id);
-        assert_eq!(command.partitioning, key);
-        for (index, message) in command.messages.iter().enumerate() {
-            let command_message = &command.messages[index];
-            assert_eq!(command_message.id, message.id);
-            assert_eq!(command_message.length, message.length);
-            assert_eq!(command_message.payload, message.payload);
-        }
-    }
-
     #[test]
     fn key_of_type_balanced_should_have_empty_value() {
         let key = Partitioning::balanced();
diff --git a/sdk/src/models/messaging/indexes.rs b/sdk/src/models/messaging/indexes.rs
index 56c63055..95eda691 100644
--- a/sdk/src/models/messaging/indexes.rs
+++ b/sdk/src/models/messaging/indexes.rs
@@ -20,6 +20,7 @@ impl IggyIndexes {
         }
     }
 
+    /// Creates a new empty container
     pub fn empty() -> Self {
         Self {
             buffer: Bytes::new(),
@@ -137,6 +138,7 @@ impl IggyIndexes {
         result
     }
 
+    /// Gets the base position of the container
     pub fn base_position(&self) -> u32 {
         self.base_position
     }
@@ -147,6 +149,11 @@ impl IggyIndexes {
             .map(|idx| idx.position())
             .unwrap_or(0)
     }
+
+    /// Decompose the container into its components
+    pub fn decompose(self) -> (u32, Bytes) {
+        (self.base_position, self.buffer)
+    }
 }
 
 impl StdIndex<usize> for IggyIndexes {
diff --git a/sdk/src/models/messaging/message.rs b/sdk/src/models/messaging/message.rs
index ce12c14e..789a9af9 100644
--- a/sdk/src/models/messaging/message.rs
+++ b/sdk/src/models/messaging/message.rs
@@ -11,114 +11,262 @@ use serde::{Deserialize, Serialize};
 use serde_with::base64::Base64;
 use serde_with::serde_as;
 use std::collections::HashMap;
+use std::convert::TryFrom;
 use std::str::FromStr;
-use tracing::error;
-
-/// The single message. It is exact format in which message is saved to / retrieved from the disk.
+use tracing::warn;
+
+/// A message stored in the Iggy messaging system.
+///
+/// `IggyMessage` represents a single message that can be sent to or received from
+/// a stream. Each message consists of:
+/// * A header with message metadata
+/// * A payload (the actual content)
+/// * Optional user-defined headers for additional context
+///
+/// # Examples
+///
+/// ```
+/// // Create a simple text message
+/// let message = IggyMessage::create("Hello world!");
+///
+/// // Create a message with custom ID
+/// let message = IggyMessage::with_id(42, "Custom message".into());
+///
+/// // Create a message with headers
+/// let mut headers = HashMap::new();
+/// headers.insert(HeaderKey::new("content-type")?, HeaderValue::from_str("text/plain")?);
+/// let message = IggyMessage::with_headers("Message with metadata".into(), headers);
+/// ```
 #[serde_as]
-#[derive(Default, Debug, Serialize, Deserialize, PartialEq)]
+#[derive(Debug, Serialize, Deserialize, PartialEq)]
 pub struct IggyMessage {
-    /// Message header
+    /// Message metadata
     pub header: IggyMessageHeader,
-    /// Message payload
+
+    /// Message content
     #[serde_as(as = "Base64")]
     pub payload: Bytes,
+
+    /// Optional user-defined headers
     pub user_headers: Option<Bytes>,
 }
 
 impl IggyMessage {
-    /// Create a message with payload
-    pub fn new(payload: Bytes) -> Self {
-        Self::builder().with_payload(payload).build()
+    /// Creates a new message with the given payload.
+    ///
+    /// This is the simplest way to create a message when you don't need
+    /// custom IDs or headers.
+    ///
+    /// # Arguments
+    ///
+    /// * `payload` - The message content
+    ///
+    /// # Returns
+    ///
+    /// A new `IggyMessage` with the provided payload
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// let message = IggyMessage::create("Hello world!");
+    /// ```
+    pub fn create<T: Into<Bytes>>(payload: T) -> Self {
+        Self::builder()
+            .payload(payload)
+            .build()
+            .expect("Failed to create message with valid payload")
+    }
+
+    /// Creates a new message with a specific ID and payload.
+    ///
+    /// Use this when you need to control the message ID.
+    ///
+    /// # Arguments
+    ///
+    /// * `id` - Custom message ID
+    /// * `payload` - The message content
+    ///
+    /// # Returns
+    ///
+    /// A new `IggyMessage` with the provided ID and payload
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// let message = IggyMessage::with_id(42, "My message".into());
+    /// ```
+    pub fn with_id<T: Into<Bytes>>(id: u128, payload: T) -> Self {
+        Self::builder()
+            .id(id)
+            .payload(payload)
+            .build()
+            .expect("Failed to create message with valid payload")
     }
 
-    /// Create a message with ID and payload
-    pub fn with_id(id: u128, payload: Bytes) -> Self {
-        Self::builder().with_id(id).with_payload(payload).build()
+    /// Creates a new message with payload and user-defined headers.
+    ///
+    /// # Arguments
+    ///
+    /// * `payload` - The message content
+    /// * `headers` - Key-value headers to attach to the message
+    ///
+    /// # Returns
+    ///
+    /// A new `IggyMessage` with the provided payload and headers
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// let mut headers = HashMap::new();
+    /// headers.insert(HeaderKey::new("content-type")?, HeaderValue::from_str("text/plain")?);
+    ///
+    /// let message = IggyMessage::with_headers("My message".into(), headers);
+    /// ```
+    pub fn with_headers<T: Into<Bytes>>(
+        payload: T,
+        headers: HashMap<HeaderKey, HeaderValue>,
+    ) -> Self {
+        Self::builder()
+            .payload(payload)
+            .headers(headers)
+            .build()
+            .expect("Failed to create message with valid payload and headers")
     }
 
-    /// Create a message with ID, payload and user headers
-    pub fn with_id_and_headers(
+    /// Creates a new message with a specific ID, payload, and user-defined headers.
+    ///
+    /// This is the most flexible way to create a message with full control.
+    ///
+    /// # Arguments
+    ///
+    /// * `id` - Custom message ID
+    /// * `payload` - The message content
+    /// * `headers` - Key-value headers to attach to the message
+    ///
+    /// # Returns
+    ///
+    /// A new `IggyMessage` with all provided parameters
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// let mut headers = HashMap::new();
+    /// headers.insert(HeaderKey::new("content-type")?, HeaderValue::from_str("text/plain")?);
+    ///
+    /// let message = IggyMessage::with_id_and_headers(42, "My message".into(), headers);
+    /// ```
+    pub fn with_id_and_headers<T: Into<Bytes>>(
         id: u128,
-        payload: Bytes,
+        payload: T,
         headers: HashMap<HeaderKey, HeaderValue>,
     ) -> Self {
         Self::builder()
-            .with_id(id)
-            .with_payload(payload)
-            .with_user_headers_map(headers)
+            .id(id)
+            .payload(payload)
+            .headers(headers)
             .build()
+            .expect("Failed to create message with valid payload and headers")
     }
 
-    /// Start a builder for more complex configuration
+    /// Creates a message builder for advanced configuration.
+    ///
+    /// Use the builder when you need fine-grained control over message creation.
+    ///
+    /// # Returns
+    ///
+    /// A new `IggyMessageBuilder` instance
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// let message = IggyMessage::builder()
+    ///     .id(123)
+    ///     .payload("Hello")
+    ///     .header("content-type", "text/plain")
+    ///     .build()?;
+    /// ```
     pub fn builder() -> IggyMessageBuilder {
         IggyMessageBuilder::new()
     }
 
-    /// Return instantiated user headers map
+    /// Gets the user headers as a typed HashMap.
+    ///
+    /// This method parses the binary header data into a typed HashMap for easy access.
+    ///
+    /// # Returns
+    ///
+    /// * `Ok(Some(HashMap))` - Successfully parsed headers
+    /// * `Ok(None)` - No headers present
+    /// * `Err(IggyError)` - Error parsing headers
     pub fn user_headers_map(&self) -> Result<Option<HashMap<HeaderKey, HeaderValue>>, IggyError> {
-        if let Some(headers) = &self.user_headers {
-            let headers_bytes = Bytes::copy_from_slice(headers);
-
-            match HashMap::<HeaderKey, HeaderValue>::from_bytes(headers_bytes) {
-                Ok(h) => Ok(Some(h)),
-                Err(e) => {
-                    tracing::error!(
-                        "Error parsing headers: {}, header_length={}",
-                        e,
-                        self.header.user_headers_length
-                    );
-
-                    Ok(None)
+        match &self.user_headers {
+            Some(headers) => {
+                let headers_bytes = Bytes::copy_from_slice(headers);
+                match HashMap::<HeaderKey, HeaderValue>::from_bytes(headers_bytes) {
+                    Ok(h) => Ok(Some(h)),
+                    Err(e) => {
+                        warn!(
+                            "Failed to deserialize user headers: {e}, user_headers_length: {}, skipping field...",
+                            self.header.user_headers_length
+                        );
+                        Ok(None)
+                    }
                 }
             }
-        } else {
-            Ok(None)
+            None => Ok(None),
         }
     }
 
-    /// Convert Bytes to messages
-    pub(crate) fn from_raw_bytes(buffer: Bytes, count: u32) -> Result<Vec<IggyMessage>, IggyError> {
-        let mut messages = Vec::with_capacity(count as usize);
-        let mut position = 0;
-        let buf_len = buffer.len();
+    /// Retrieves a specific user header value by key.
+    ///
+    /// This is a convenience method to get a specific header without handling the full map.
+    ///
+    /// # Arguments
+    ///
+    /// * `key` - The header key to look up
+    ///
+    /// # Returns
+    ///
+    /// * `Ok(Some(HeaderValue))` - Header found with its value
+    /// * `Ok(None)` - Header not found or headers couldn't be parsed
+    /// * `Err(IggyError)` - Error accessing headers
+    pub fn get_header(&self, key: &HeaderKey) -> Result<Option<HeaderValue>, IggyError> {
+        Ok(self
+            .user_headers_map()?
+            .and_then(|map| map.get(key).cloned()))
+    }
 
-        while position < buf_len {
-            if position + IGGY_MESSAGE_HEADER_SIZE as usize > buf_len {
-                break;
-            }
-            let header_bytes = buffer.slice(position..position + IGGY_MESSAGE_HEADER_SIZE as usize);
-            let header = match IggyMessageHeader::from_bytes(header_bytes) {
-                Ok(h) => h,
-                Err(e) => {
-                    error!("Failed to parse message header: {}", e);
-                    return Err(e);
-                }
-            };
-            position += IGGY_MESSAGE_HEADER_SIZE as usize;
+    /// Checks if this message contains a specific header key.
+    ///
+    /// # Arguments
+    ///
+    /// * `key` - The header key to check for
+    ///
+    /// # Returns
+    ///
+    /// * `Ok(true)` - Header exists
+    /// * `Ok(false)` - Header doesn't exist or headers couldn't be parsed
+    /// * `Err(IggyError)` - Error accessing headers
+    pub fn has_header(&self, key: &HeaderKey) -> Result<bool, IggyError> {
+        Ok(self
+            .user_headers_map()?
+            .is_some_and(|map| map.contains_key(key)))
+    }
 
-            let payload_end = position + header.payload_length as usize;
-            if payload_end > buf_len {
-                break;
-            }
-            let payload = buffer.slice(position..payload_end);
-            position = payload_end;
-
-            let headers: Option<Bytes> = if header.user_headers_length > 0 {
-                Some(buffer.slice(position..position + header.user_headers_length as usize))
-            } else {
-                None
-            };
-            position += header.user_headers_length as usize;
-
-            messages.push(IggyMessage {
-                header,
-                payload,
-                user_headers: headers,
-            });
-        }
+    /// Gets the payload as a UTF-8 string, if valid.
+    ///
+    /// # Returns
+    ///
+    /// * `Ok(String)` - Successfully converted payload to string
+    /// * `Err(IggyError)` - Payload is not valid UTF-8
+    pub fn payload_as_string(&self) -> Result<String, IggyError> {
+        String::from_utf8(self.payload.to_vec()).map_err(|_| IggyError::InvalidUtf8)
+    }
+}
 
-        Ok(messages)
+impl Default for IggyMessage {
+    fn default() -> Self {
+        Self::create("hello world")
     }
 }
 
@@ -126,45 +274,45 @@ impl FromStr for IggyMessage {
     type Err = IggyError;
 
     fn from_str(s: &str) -> Result<Self, Self::Err> {
-        let payload = Bytes::from(s.as_bytes().to_vec());
-        Ok(IggyMessage::new(payload))
+        Ok(Self::create(Bytes::from(s.to_owned())))
     }
 }
 
 impl std::fmt::Display for IggyMessage {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        let len = self.payload.len();
-
-        if len > 40 {
-            write!(
-                f,
-                "{}|{}...{}",
-                self.header.id,
-                String::from_utf8_lossy(&self.payload[..20]),
-                String::from_utf8_lossy(&self.payload[len - 20..])
-            )
-        } else {
-            write!(
-                f,
-                "{}|{}",
-                self.header.id,
-                String::from_utf8_lossy(&self.payload)
-            )
+        match String::from_utf8(self.payload.to_vec()) {
+            Ok(payload) => {
+                let preview = if payload.len() > 50 {
+                    format!("{}... ({}B)", &payload[..47], self.payload.len())
+                } else {
+                    payload
+                };
+                write!(
+                    f,
+                    "[{}] ID:{} '{}'",
+                    self.header.offset, self.header.id, preview
+                )
+            }
+            Err(_) => {
+                write!(
+                    f,
+                    "[{}] ID:{} <binary {}B>",
+                    self.header.offset,
+                    self.header.id,
+                    self.payload.len()
+                )
+            }
         }
     }
 }
 
 impl Sizeable for IggyMessage {
     fn get_size_bytes(&self) -> IggyByteSize {
-        let payload_len = IggyByteSize::from(self.payload.len() as u64);
-        let headers_len = if let Some(headers) = &self.user_headers {
-            IggyByteSize::from(headers.len() as u64)
-        } else {
-            IggyByteSize::from(0)
-        };
-        let message_header_len = IggyByteSize::from(IGGY_MESSAGE_HEADER_SIZE as u64);
+        let payload_len = self.payload.len() as u64;
+        let headers_len = self.user_headers.as_ref().map_or(0, |h| h.len() as u64);
+        let message_header_len = IGGY_MESSAGE_HEADER_SIZE as u64;
 
-        payload_len + headers_len + message_header_len
+        IggyByteSize::from(payload_len + headers_len + message_header_len)
     }
 }
 
@@ -184,19 +332,27 @@ impl BytesSerializable for IggyMessage {
         if bytes.len() < IGGY_MESSAGE_HEADER_SIZE as usize {
             return Err(IggyError::InvalidCommand);
         }
+
         let mut position = 0;
         let header =
             IggyMessageHeader::from_bytes(bytes.slice(0..IGGY_MESSAGE_HEADER_SIZE as usize))?;
 
         position += IGGY_MESSAGE_HEADER_SIZE as usize;
-        let payload = bytes.slice(position..position + header.payload_length as usize);
-        if payload.len() != header.payload_length as usize {
+        let payload_end = position + header.payload_length as usize;
+
+        if payload_end > bytes.len() {
             return Err(IggyError::InvalidMessagePayloadLength);
         }
 
-        position += header.payload_length as usize;
+        let payload = bytes.slice(position..payload_end);
+        position = payload_end;
+
         let user_headers = if header.user_headers_length > 0 {
-            Some(bytes.slice(position..position + header.user_headers_length as usize))
+            let headers_end = position + header.user_headers_length as usize;
+            if headers_end > bytes.len() {
+                return Err(IggyError::InvalidHeaderValue);
+            }
+            Some(bytes.slice(position..headers_end))
         } else {
             None
         };
@@ -208,7 +364,6 @@ impl BytesSerializable for IggyMessage {
         })
     }
 
-    /// Write message to bytes mut
     fn write_to_buffer(&self, buf: &mut BytesMut) {
         buf.put_slice(&self.header.to_bytes());
         buf.put_slice(&self.payload);
@@ -218,6 +373,10 @@ impl BytesSerializable for IggyMessage {
     }
 }
 
+/// Builder for creating `IggyMessage` instances with flexible configuration.
+///
+/// The builder pattern allows for clear, step-by-step construction of complex
+/// message configurations, with better error handling than chained 
constructors.
 #[derive(Debug, Default)]
 pub struct IggyMessageBuilder {
     id: Option<u128>,
@@ -226,39 +385,64 @@ pub struct IggyMessageBuilder {
 }
 
 impl IggyMessageBuilder {
+    /// Creates a new empty message builder.
     pub fn new() -> Self {
-        Self {
-            id: None,
-            payload: None,
-            headers: None,
-        }
+        Self::default()
     }
 
-    pub fn with_id(mut self, id: u128) -> Self {
+    /// Sets the message ID.
+    ///
+    /// If not specified, a default ID (0) will be used.
+    pub fn id(mut self, id: u128) -> Self {
         self.id = Some(id);
         self
     }
 
-    pub fn with_payload(mut self, payload: Bytes) -> Self {
-        self.payload = Some(payload);
+    /// Sets the message payload.
+    ///
+    /// This method accepts any type that can be converted into `Bytes`,
+    /// including strings, byte slices, and vectors.
+    pub fn payload<T: Into<Bytes>>(mut self, payload: T) -> Self {
+        self.payload = Some(payload.into());
         self
     }
 
-    pub fn with_user_key_value_header(mut self, key: HeaderKey, value: 
HeaderValue) -> Self {
+    /// Adds a single header key-value pair to the message.
+    ///
+    /// Multiple calls will add multiple headers.
+    pub fn header(mut self, key: HeaderKey, value: HeaderValue) -> Self {
         let headers = self.headers.get_or_insert_with(HashMap::new);
         headers.insert(key, value);
         self
     }
 
-    pub fn with_user_headers_map(
-        mut self,
-        headers: impl Into<Option<HashMap<HeaderKey, HeaderValue>>>,
-    ) -> Self {
-        self.headers = headers.into();
+    /// Adds a string header with the given key and value.
+    ///
+    /// This is a convenience method for adding string headers without
+    /// manually creating HeaderValue objects.
+    pub fn string_header(mut self, key: &str, value: &str) -> Result<Self, 
IggyError> {
+        let key = HeaderKey::new(key)?;
+        let value = HeaderValue::from_str(value)?;
+        let headers = self.headers.get_or_insert_with(HashMap::new);
+        headers.insert(key, value);
+        Ok(self)
+    }
+
+    /// Sets all headers at once from a HashMap.
+    ///
+    /// This replaces any headers previously added.
+    pub fn headers(mut self, headers: HashMap<HeaderKey, HeaderValue>) -> Self 
{
+        self.headers = Some(headers);
         self
     }
 
-    pub fn build(self) -> IggyMessage {
+    /// Builds the final IggyMessage from the configured parameters.
+    ///
+    /// # Returns
+    ///
+    /// * `Ok(IggyMessage)` - Successfully built message
+    /// * `Err(IggyError)` - Error during message construction
+    pub fn build(self) -> Result<IggyMessage, IggyError> {
         let payload = self.payload.unwrap_or_default();
         let id = self.id.unwrap_or(0);
         let headers_length = 
get_headers_size_bytes(&self.headers).as_bytes_u64() as u32;
@@ -275,28 +459,116 @@ impl IggyMessageBuilder {
 
         let user_headers = self.headers.map(|headers| headers.to_bytes());
 
-        IggyMessage {
+        Ok(IggyMessage {
             header,
             payload,
             user_headers,
-        }
+        })
+    }
+}
+
+// Clean implementations of conversion traits
+
+impl From<IggyMessage> for Bytes {
+    fn from(message: IggyMessage) -> Self {
+        message.to_bytes()
     }
 }
 
 impl From<String> for IggyMessage {
     fn from(s: String) -> Self {
-        Self::new(Bytes::from(s))
+        Self::create(s)
     }
 }
 
 impl From<&str> for IggyMessage {
     fn from(s: &str) -> Self {
-        Self::new(Bytes::from(s.to_owned()))
+        Self::create(Bytes::from(s.to_owned()))
     }
 }
 
 impl From<Vec<u8>> for IggyMessage {
     fn from(v: Vec<u8>) -> Self {
-        Self::new(Bytes::from(v))
+        Self::create(v)
+    }
+}
+
+impl From<&[u8]> for IggyMessage {
+    fn from(bytes: &[u8]) -> Self {
+        Self::create(Bytes::copy_from_slice(bytes))
+    }
+}
+
+impl TryFrom<Bytes> for IggyMessage {
+    type Error = IggyError;
+
+    fn try_from(bytes: Bytes) -> Result<Self, Self::Error> {
+        Self::from_bytes(bytes)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_create_simple_message() {
+        let message = IggyMessage::create("test message");
+        assert_eq!(message.payload, Bytes::from("test message"));
+        assert_eq!(message.header.id, 0);
+        assert!(message.user_headers.is_none());
+    }
+
+    #[test]
+    fn test_create_with_id() {
+        let message = IggyMessage::with_id(42, "test with id");
+        assert_eq!(message.payload, Bytes::from("test with id"));
+        assert_eq!(message.header.id, 42);
+    }
+
+    #[test]
+    fn test_create_with_headers() {
+        let mut headers = HashMap::new();
+        headers.insert(
+            HeaderKey::new("content-type").unwrap(),
+            HeaderValue::from_str("text/plain").unwrap(),
+        );
+
+        let message = IggyMessage::with_headers("test with headers", headers);
+        assert_eq!(message.payload, Bytes::from("test with headers"));
+        assert!(message.user_headers.is_some());
+
+        let headers_map = message.user_headers_map().unwrap().unwrap();
+        assert_eq!(headers_map.len(), 1);
+        
assert!(headers_map.contains_key(&HeaderKey::new("content-type").unwrap()));
+    }
+
+    #[test]
+    fn test_empty_payload() {
+        let message = IggyMessage::create(Bytes::new());
+        assert_eq!(message.payload.len(), 0);
+        assert_eq!(message.header.payload_length, 0);
+    }
+
+    #[test]
+    fn test_from_string() {
+        let message: IggyMessage = "simple message".into();
+        assert_eq!(message.payload, Bytes::from("simple message"));
+    }
+
+    #[test]
+    fn test_payload_as_string() {
+        let message = IggyMessage::create("test message");
+        assert_eq!(message.payload_as_string().unwrap(), "test message");
+    }
+
+    #[test]
+    fn test_serialization_roundtrip() {
+        let original = IggyMessage::with_id(123, "serialization test");
+        let bytes = original.to_bytes();
+        let decoded = IggyMessage::from_bytes(bytes).unwrap();
+
+        assert_eq!(original.header.id, decoded.header.id);
+        assert_eq!(original.payload, decoded.payload);
     }
 }
diff --git a/sdk/src/models/messaging/message_header_view.rs 
b/sdk/src/models/messaging/message_header_view.rs
index 7c279d1a..1c35eb60 100644
--- a/sdk/src/models/messaging/message_header_view.rs
+++ b/sdk/src/models/messaging/message_header_view.rs
@@ -85,7 +85,7 @@ impl BytesSerializable for IggyMessageHeaderView<'_> {
         panic!("should not be used")
     }
 
-    fn from_bytes(bytes: Bytes) -> Result<Self, IggyError> {
+    fn from_bytes(_bytes: Bytes) -> Result<Self, IggyError> {
         panic!("should not be used")
     }
 
diff --git a/sdk/src/models/messaging/messages_batch.rs 
b/sdk/src/models/messaging/messages_batch.rs
index e8c044a4..7bddf2cd 100644
--- a/sdk/src/models/messaging/messages_batch.rs
+++ b/sdk/src/models/messaging/messages_batch.rs
@@ -22,7 +22,7 @@ pub struct IggyMessagesBatch {
 }
 
 impl IggyMessagesBatch {
-    /// Create a new messages container from a buffer
+    /// Create a batch from indexes buffer and messages buffer
     pub fn new(indexes: IggyIndexes, messages: Bytes, count: u32) -> Self {
         #[cfg(debug_assertions)]
         {
@@ -57,7 +57,7 @@ impl IggyMessagesBatch {
         }
     }
 
-    /// Creates a empty messages container
+    /// Creates an empty messages batch
     pub fn empty() -> Self {
         Self::new(IggyIndexes::empty(), BytesMut::new().freeze(), 0)
     }
@@ -104,8 +104,8 @@ impl IggyMessagesBatch {
     }
 
     /// Decompose the batch into its components
-    pub fn decompose(self) -> (IggyIndexes, Bytes, u32) {
-        (self.indexes, self.messages, self.count)
+    pub fn decompose(self) -> (u32, IggyIndexes, Bytes) {
+        (self.count, self.indexes, self.messages)
     }
 
     /// Get index of first message
@@ -140,16 +140,6 @@ impl IggyMessagesBatch {
             .unwrap_or(0)
     }
 
-    /// Helper method to read a base position (u32) from the byte array at the 
given index
-    fn base_position_at(&self, position_index: u32) -> u32 {
-        tracing::error!("base_position = {}", self.indexes.base_position());
-        if let Some(index) = self.indexes.get(position_index) {
-            index.position() - self.indexes.base_position()
-        } else {
-            0
-        }
-    }
-
     /// Helper method to read a position (u32) from the byte array at the 
given index
     fn position_at(&self, position_index: u32) -> u32 {
         if let Some(index) = self.indexes.get(position_index) {
@@ -306,6 +296,60 @@ impl IggyMessagesBatch {
             &self.messages[start_position..end_position],
         ))
     }
+
+    /// Creates a batch from a vector of messages. Note that this 
implementation is:
+    /// - Inefficient with multiple buffer allocations and copies
+    /// - Serializes each message individually rather than in batches
+    /// - Designed for testing purposes only, not optimized for production
+    /// - May cause performance issues with large message collections
+    ///
+    /// This should be used only for testing purposes!
+    /// TODO(hubcio): maybe it can be removed
+    #[cfg(test)]
+    pub fn from_messages_vec(messages: &[crate::prelude::IggyMessage]) -> Self 
{
+        use crate::prelude::BytesSerializable;
+        use bytes::{BufMut, BytesMut};
+
+        if messages.is_empty() {
+            return Self::empty();
+        }
+
+        let messages_count = messages.len() as u32;
+        let mut total_size = 0;
+        for msg in messages.iter() {
+            total_size += msg.get_size_bytes().as_bytes_usize();
+        }
+
+        let mut messages_buffer = BytesMut::with_capacity(total_size);
+        let mut indexes_buffer = BytesMut::new();
+        let mut current_position = 0;
+
+        for (i, message) in messages.iter().enumerate() {
+            message.write_to_buffer(&mut messages_buffer);
+
+            let msg_size = message.get_size_bytes().as_bytes_u32();
+            current_position += msg_size;
+
+            let offset = i as u32;
+
+            indexes_buffer.put_u32_le(offset);
+            indexes_buffer.put_u32_le(current_position);
+            indexes_buffer.put_u64_le(0); // timestamps from indexes are not 
used
+        }
+
+        let indexes = 
crate::models::messaging::IggyIndexes::new(indexes_buffer.freeze(), 0);
+
+        Self {
+            count: messages_count,
+            indexes,
+            messages: messages_buffer.freeze(),
+        }
+    }
+
+    #[cfg(test)]
+    pub fn get_indexes(&self) -> &crate::models::messaging::IggyIndexes {
+        &self.indexes
+    }
 }
 
 impl Index<usize> for IggyMessagesBatch {
diff --git a/server/src/binary/handlers/messages/poll_messages_handler.rs 
b/server/src/binary/handlers/messages/poll_messages_handler.rs
index 79b43a79..95742586 100644
--- a/server/src/binary/handlers/messages/poll_messages_handler.rs
+++ b/server/src/binary/handlers/messages/poll_messages_handler.rs
@@ -13,8 +13,8 @@ use tracing::debug;
 
 #[derive(Debug)]
 pub struct IggyPollMetadata {
-    partition_id: u32,
-    current_offset: u64,
+    pub partition_id: u32,
+    pub current_offset: u64,
 }
 
 impl IggyPollMetadata {
diff --git a/server/src/http/messages.rs b/server/src/http/messages.rs
index 7d1a3031..70e4a84e 100644
--- a/server/src/http/messages.rs
+++ b/server/src/http/messages.rs
@@ -2,6 +2,7 @@ use crate::http::error::CustomError;
 use crate::http::jwt::json_web_token::Identity;
 use crate::http::shared::AppState;
 use crate::http::COMPONENT;
+use crate::streaming::segments::{IggyIndexesMut, IggyMessagesBatchMut};
 use crate::streaming::session::Session;
 use crate::streaming::systems::messages::PollingArgs;
 use crate::streaming::utils::random_id;
@@ -9,9 +10,11 @@ use axum::extract::{Path, Query, State};
 use axum::http::StatusCode;
 use axum::routing::get;
 use axum::{Extension, Json, Router};
+use bytes::BytesMut;
 use error_set::ErrContext;
 use iggy::consumer::Consumer;
 use iggy::identifier::Identifier;
+use iggy::models::messaging::IggyMessagesBatch;
 use iggy::prelude::*;
 use iggy::validatable::Validatable;
 use std::sync::Arc;
@@ -35,31 +38,31 @@ async fn poll_messages(
     Extension(identity): Extension<Identity>,
     Path((stream_id, topic_id)): Path<(String, String)>,
     mut query: Query<PollMessages>,
-) -> Result<Json<Vec<IggyMessage>>, CustomError> {
-    // query.stream_id = Identifier::from_str_value(&stream_id)?;
-    // query.topic_id = Identifier::from_str_value(&topic_id)?;
-    // query.validate()?;
+) -> Result<Json<PolledMessages>, CustomError> {
+    query.stream_id = Identifier::from_str_value(&stream_id)?;
+    query.topic_id = Identifier::from_str_value(&topic_id)?;
+    query.validate()?;
 
-    // let consumer = Consumer::new(query.0.consumer.id);
-    // let system = state.system.read().await;
-    // let polled_messages = system
-    //     .poll_messages(
-    //         &Session::stateless(identity.user_id, identity.ip_address),
-    //         &consumer,
-    //         &query.0.stream_id,
-    //         &query.0.topic_id,
-    //         query.0.partition_id,
-    //         PollingArgs::new(query.0.strategy, query.0.count, 
query.0.auto_commit),
-    //     )
-    //     .await
-    //     .with_error_context(|error| {
-    //         format!(
-    //             "{COMPONENT} (error: {error}) - failed to poll messages, 
stream ID: {}, topic ID: {}, partition ID: {:?}",
-    //             stream_id, topic_id, query.0.partition_id
-    //         )
-    //     })?;
-    // Ok(Json(polled_messages))
-    todo!()
+    let consumer = Consumer::new(query.0.consumer.id);
+    let system = state.system.read().await;
+    let (metadata, messages) = system
+        .poll_messages(
+            &Session::stateless(identity.user_id, identity.ip_address),
+            &consumer,
+            &query.0.stream_id,
+            &query.0.topic_id,
+            query.0.partition_id,
+            PollingArgs::new(query.0.strategy, query.0.count, 
query.0.auto_commit),
+        )
+        .await
+        .with_error_context(|error| {
+            format!(
+                "{COMPONENT} (error: {error}) - failed to poll messages, 
stream ID: {}, topic ID: {}, partition ID: {:?}",
+                stream_id, topic_id, query.0.partition_id
+            )
+        })?;
+    let polled_messages = messages.into_polled_messages(metadata);
+    Ok(Json(polled_messages))
 }
 
 async fn send_messages(
@@ -68,31 +71,30 @@ async fn send_messages(
     Path((stream_id, topic_id)): Path<(String, String)>,
     Json(mut command): Json<SendMessages>,
 ) -> Result<StatusCode, CustomError> {
-    //TODO: Fix me
-    /*
     command.stream_id = Identifier::from_str_value(&stream_id)?;
     command.topic_id = Identifier::from_str_value(&topic_id)?;
     command.partitioning.length = command.partitioning.value.len() as u8;
-    command.messages.iter_mut().for_each(|msg| {
-        if msg.id == 0 {
-            msg.id = random_id::get_uuid();
-        }
-    });
+    // TODO(hubcio): Add message ID generation?
+    // command.messages.iter_mut().for_each(|msg| {
+    //     if msg.id == 0 {
+    //         msg.id = random_id::get_uuid();
+    //     }
+    // });
     command.validate()?;
 
-    let messages = command.messages;
+    let batch = make_mutable(command.batch);
     let command_stream_id = command.stream_id;
     let command_topic_id = command.topic_id;
     let partitioning = command.partitioning;
     let system = state.system.read().await;
-    // TODO(haze): Add confirmation level after testing is complete
+
     system
         .append_messages(
             &Session::stateless(identity.user_id, identity.ip_address),
-            command_stream_id,
-            command_topic_id,
-            partitioning,
-            messages,
+            &command_stream_id,
+            &command_topic_id,
+            &partitioning,
+            batch,
             None,
         )
         .await
@@ -103,8 +105,6 @@ async fn send_messages(
             )
         })?;
     Ok(StatusCode::CREATED)
-    */
-    todo!()
 }
 
 #[instrument(skip_all, name = "trace_flush_unsaved_buffer", 
fields(iggy_user_id = identity.user_id, iggy_stream_id = stream_id, 
iggy_topic_id = topic_id, iggy_partition_id = partition_id, iggy_fsync = 
fsync))]
@@ -127,3 +127,11 @@ async fn flush_unsaved_buffer(
         .await?;
     Ok(StatusCode::OK)
 }
+
+fn make_mutable(batch: IggyMessagesBatch) -> IggyMessagesBatchMut {
+    let (_, indexes, messages) = batch.decompose();
+    let (_, indexes_buffer) = indexes.decompose();
+    let indexes_buffer_mut = BytesMut::from(indexes_buffer);
+    let indexes_mut = IggyIndexesMut::from_bytes(indexes_buffer_mut);
+    IggyMessagesBatchMut::from_indexes_and_messages(indexes_mut, 
messages.into())
+}
diff --git a/server/src/streaming/cache/buffer.rs 
b/server/src/streaming/cache/buffer.rs
deleted file mode 100644
index 67ebba60..00000000
--- a/server/src/streaming/cache/buffer.rs
+++ /dev/null
@@ -1,177 +0,0 @@
-use crate::streaming::local_sizeable::RealSize;
-
-use super::memory_tracker::CacheMemoryTracker;
-use atone::Vc;
-use iggy::utils::byte_size::IggyByteSize;
-use std::fmt::Debug;
-use std::ops::Index;
-use std::sync::{
-    atomic::{AtomicU64, Ordering},
-    Arc,
-};
-
-#[derive(Debug)]
-pub struct SmartCache<T: RealSize + Debug> {
-    buffer: Vc<T>,
-    memory_tracker: Arc<CacheMemoryTracker>,
-    current_size: IggyByteSize,
-    hits: AtomicU64,
-    misses: AtomicU64,
-}
-
-impl<T> SmartCache<T>
-where
-    T: RealSize + Clone + Debug,
-{
-    pub fn new() -> Self {
-        let current_size = IggyByteSize::default();
-        let buffer = Vc::new();
-        let memory_tracker = CacheMemoryTracker::get_instance().unwrap();
-
-        Self {
-            buffer,
-            memory_tracker,
-            current_size,
-            hits: AtomicU64::new(0),
-            misses: AtomicU64::new(0),
-        }
-    }
-
-    // Used only for cache validation tests
-    #[cfg(test)]
-    pub fn to_vec(&self) -> Vec<T> {
-        let mut vec = Vec::with_capacity(self.buffer.len());
-        vec.extend(self.buffer.iter().cloned());
-        vec
-    }
-
-    /// Pushes an element to the buffer, and if adding the element would 
exceed the memory limit,
-    /// removes the oldest elements until there's enough space for the new 
element.
-    /// It's preferred to use `extend` instead of this method.
-    pub fn push_safe(&mut self, element: T) {
-        let element_size = element.real_size();
-
-        while !self.memory_tracker.will_fit_into_cache(element_size) {
-            if let Some(oldest_element) = self.buffer.pop_front() {
-                let oldest_size = oldest_element.real_size();
-                self.memory_tracker
-                    .decrement_used_memory(oldest_size.as_bytes_u64());
-                self.current_size -= oldest_size;
-            }
-        }
-
-        self.memory_tracker
-            .increment_used_memory(element_size.as_bytes_u64());
-        self.current_size += element_size;
-        self.buffer.push_back(element);
-    }
-
-    /// Removes the oldest elements until there's enough space for the new 
element.
-    pub fn evict_by_size(&mut self, size_to_remove: u64) {
-        let mut removed_size = IggyByteSize::default();
-
-        while let Some(element) = self.buffer.pop_front() {
-            if removed_size >= size_to_remove {
-                break;
-            }
-            let elem_size = element.real_size();
-            self.memory_tracker
-                .decrement_used_memory(elem_size.as_bytes_u64());
-            self.current_size -= elem_size;
-            removed_size += elem_size;
-        }
-    }
-
-    pub fn purge(&mut self) {
-        self.buffer.clear();
-        self.memory_tracker
-            .decrement_used_memory(self.current_size.as_bytes_u64());
-        self.current_size = IggyByteSize::default();
-    }
-
-    pub fn is_empty(&self) -> bool {
-        self.buffer.is_empty()
-    }
-
-    pub fn current_size(&self) -> IggyByteSize {
-        self.current_size
-    }
-
-    /// Extends the buffer with the given elements, and always adding the 
elements,
-    /// even if it exceeds the memory limit.
-    pub fn extend(&mut self, elements: impl IntoIterator<Item = T>) {
-        let elements = elements.into_iter().inspect(|element| {
-            let element_size = element.real_size();
-            self.memory_tracker
-                .increment_used_memory(element_size.as_bytes_u64());
-            self.current_size += element_size;
-        });
-        self.buffer.extend(elements);
-    }
-
-    /// Always appends the element into the buffer, even if it exceeds the 
memory limit.
-    pub fn append(&mut self, element: T) {
-        let element_size = element.real_size();
-        self.memory_tracker
-            .increment_used_memory(element_size.as_bytes_u64());
-        self.current_size += element_size;
-        self.buffer.push(element);
-    }
-
-    pub fn iter(&self) -> impl Iterator<Item = &T> {
-        self.buffer.iter()
-    }
-
-    pub fn len(&self) -> usize {
-        self.buffer.len()
-    }
-
-    pub fn get_metrics(&self) -> CacheMetrics {
-        let hits = self.hits.load(Ordering::Relaxed);
-        let misses = self.misses.load(Ordering::Relaxed);
-        let total = hits + misses;
-        let hit_ratio = if total > 0 {
-            hits as f32 / total as f32
-        } else {
-            0.0
-        };
-
-        CacheMetrics {
-            hits,
-            misses,
-            hit_ratio,
-        }
-    }
-
-    pub fn record_hit(&self) {
-        self.hits.fetch_add(1, Ordering::Relaxed);
-    }
-
-    pub fn record_miss(&self) {
-        self.misses.fetch_add(1, Ordering::Relaxed);
-    }
-}
-
-impl<T> Index<usize> for SmartCache<T>
-where
-    T: RealSize + Clone + Debug,
-{
-    type Output = T;
-
-    fn index(&self, index: usize) -> &Self::Output {
-        &self.buffer[index]
-    }
-}
-
-impl<T: RealSize + Clone + Debug> Default for SmartCache<T> {
-    fn default() -> Self {
-        Self::new()
-    }
-}
-
-#[derive(Debug)]
-pub struct CacheMetrics {
-    pub hits: u64,
-    pub misses: u64,
-    pub hit_ratio: f32,
-}
diff --git a/server/src/streaming/cache/memory_tracker.rs 
b/server/src/streaming/cache/memory_tracker.rs
deleted file mode 100644
index 835c44ff..00000000
--- a/server/src/streaming/cache/memory_tracker.rs
+++ /dev/null
@@ -1,101 +0,0 @@
-extern crate sysinfo;
-
-use crate::configs::resource_quota::MemoryResourceQuota;
-use crate::configs::system::CacheConfig;
-use iggy::utils::byte_size::IggyByteSize;
-use std::sync::atomic::{AtomicU64, Ordering};
-use std::sync::{Arc, OnceLock};
-use sysinfo::System;
-use tracing::info;
-
-static INSTANCE: OnceLock<Option<Arc<CacheMemoryTracker>>> = OnceLock::new();
-
-#[derive(Debug)]
-pub struct CacheMemoryTracker {
-    used_memory_bytes: AtomicU64,
-    limit_bytes: IggyByteSize,
-}
-
-type MessageSize = u64;
-
-impl CacheMemoryTracker {
-    pub fn initialize(config: &CacheConfig) -> Option<Arc<CacheMemoryTracker>> 
{
-        INSTANCE
-            .get_or_init(|| {
-                if config.enabled {
-                    
Some(Arc::new(CacheMemoryTracker::new(config.size.clone())))
-                } else {
-                    info!("Cache memory tracker disabled");
-                    None
-                }
-            })
-            .clone()
-    }
-
-    pub fn get_instance() -> Option<Arc<CacheMemoryTracker>> {
-        INSTANCE.get().cloned().flatten()
-    }
-
-    fn new(limit: MemoryResourceQuota) -> Self {
-        let mut sys = System::new_all();
-        sys.refresh_all();
-
-        let total_memory_bytes = IggyByteSize::from(sys.total_memory());
-        let free_memory = IggyByteSize::from(sys.free_memory());
-        let free_memory_percentage =
-            free_memory.as_bytes_u64() as f64 / 
total_memory_bytes.as_bytes_u64() as f64 * 100.0;
-        let used_memory_bytes = AtomicU64::new(0);
-        let limit_bytes = limit.into();
-
-        info!(
-            "Cache memory tracker started, cache: {}, total memory: {}, free 
memory: {}, free memory percentage: {:.2}%",
-            limit_bytes.as_human_string(), 
total_memory_bytes.as_human_string(), free_memory, free_memory_percentage
-        );
-
-        CacheMemoryTracker {
-            used_memory_bytes,
-            limit_bytes,
-        }
-    }
-
-    pub fn increment_used_memory(&self, message_size: MessageSize) {
-        let mut current_cache_size_bytes = 
self.used_memory_bytes.load(Ordering::SeqCst);
-        loop {
-            let new_size = current_cache_size_bytes + message_size;
-            match self.used_memory_bytes.compare_exchange_weak(
-                current_cache_size_bytes,
-                new_size,
-                Ordering::SeqCst,
-                Ordering::SeqCst,
-            ) {
-                Ok(_) => break,
-                Err(actual_current) => current_cache_size_bytes = 
actual_current,
-            }
-        }
-    }
-
-    pub fn decrement_used_memory(&self, message_size: MessageSize) {
-        let mut current_cache_size_bytes = 
self.used_memory_bytes.load(Ordering::SeqCst);
-        loop {
-            let new_size = current_cache_size_bytes - message_size;
-            match self.used_memory_bytes.compare_exchange_weak(
-                current_cache_size_bytes,
-                new_size,
-                Ordering::SeqCst,
-                Ordering::SeqCst,
-            ) {
-                Ok(_) => return,
-                Err(actual_current) => current_cache_size_bytes = 
actual_current,
-            }
-        }
-    }
-
-    pub fn usage_bytes(&self) -> IggyByteSize {
-        IggyByteSize::from(self.used_memory_bytes.load(Ordering::SeqCst))
-    }
-
-    pub fn will_fit_into_cache(&self, requested_size: IggyByteSize) -> bool {
-        IggyByteSize::from(self.used_memory_bytes.load(Ordering::SeqCst)) + 
requested_size
-            <= self.limit_bytes
-    }
-}
diff --git a/server/src/streaming/cache/mod.rs 
b/server/src/streaming/cache/mod.rs
deleted file mode 100644
index c055351b..00000000
--- a/server/src/streaming/cache/mod.rs
+++ /dev/null
@@ -1,3 +0,0 @@
-pub mod buffer;
-#[allow(static_mut_refs)]
-pub mod memory_tracker;
diff --git a/server/src/streaming/partitions/storage.rs 
b/server/src/streaming/partitions/storage.rs
index 80c89b43..f11520d7 100644
--- a/server/src/streaming/partitions/storage.rs
+++ b/server/src/streaming/partitions/storage.rs
@@ -42,12 +42,6 @@ impl PartitionStorage for FilePartitionStorage {
         let dir_entries = fs::read_dir(&partition.partition_path).await;
         if fs::read_dir(&partition.partition_path)
                 .await
-                .inspect_err(|err| {
-                    error!(
-                            "Failed to read partition with ID: {} for stream 
with ID: {} and topic with ID: {} and path: {}. Error: {}",
-                            partition.partition_id, partition.stream_id, 
partition.topic_id, partition.partition_path, err
-                        );
-                })
                 .with_error_context(|error| format!(
                     "{COMPONENT} (error: {error}) - failed to read partition 
with ID: {} for stream with ID: {} and topic with ID: {} and path: {}.",
                     partition.partition_id, partition.stream_id, 
partition.topic_id, partition.partition_path,
@@ -132,11 +126,6 @@ impl PartitionStorage for FilePartitionStorage {
             segment.load_from_disk().await.with_error_context(|error| {
                 format!("{COMPONENT} (error: {error}) - failed to load 
segment: {segment}",)
             })?;
-            let capacity = 
partition.config.partition.messages_required_to_save;
-            // TODO(hubcio)
-            // if !segment.is_closed() {
-            //     segment.unsaved_messages = Some(Default::default());
-            // }
 
             // If the first segment has at least a single message, we should 
increment the offset.
             if !partition.should_increment_offset {
@@ -187,9 +176,7 @@ impl PartitionStorage for FilePartitionStorage {
             if end_offset_index == segments_count - 1 {
                 break;
             }
-
-            // TODO(hubcio): fix this
-            // segment.end_offset() = end_offsets[end_offset_index];
+            segment.set_end_offset(end_offsets[end_offset_index]);
         }
 
         if !partition.segments.is_empty() {
diff --git a/server/src/streaming/segments/indexes/indexes_mut.rs 
b/server/src/streaming/segments/indexes/indexes_mut.rs
index 535676d7..91bf98b2 100644
--- a/server/src/streaming/segments/indexes/indexes_mut.rs
+++ b/server/src/streaming/segments/indexes/indexes_mut.rs
@@ -162,7 +162,7 @@ impl IggyIndexesMut {
         result
     }
 
-    /// Clears the container, removing all indexes
+    /// Clears the container, removing all indexes but preserving already 
allocated buffer capacity
     pub fn clear(&mut self) {
         self.saved_count = 0;
         self.buffer.clear();
diff --git a/server/src/streaming/segments/segment.rs 
b/server/src/streaming/segments/segment.rs
index 246b5fe1..9a99bb42 100644
--- a/server/src/streaming/segments/segment.rs
+++ b/server/src/streaming/segments/segment.rs
@@ -174,6 +174,10 @@ impl Segment {
         self.messages_count_of_parent_partition
             .fetch_add(messages_count, Ordering::SeqCst);
 
+        if !self.config.segment.cache_indexes {
+            self.indexes = IggyIndexesMut::empty();
+        }
+
         Ok(())
     }
 
@@ -211,16 +215,15 @@ impl Segment {
     }
 
     pub async fn initialize_reading(&mut self) -> Result<(), IggyError> {
+        let messages_reader =
+            MessagesReader::new(&self.messages_path, 
self.messages_size.clone()).await?;
+        self.messages_reader = Some(messages_reader);
+
         if !self.config.segment.cache_indexes {
             let index_reader =
                 IndexReader::new(&self.index_path, 
self.indexes_size.clone()).await?;
             self.index_reader = Some(index_reader);
         }
-
-        let messages_reader =
-            MessagesReader::new(&self.messages_path, 
self.messages_size.clone()).await?;
-        self.messages_reader = Some(messages_reader);
-
         Ok(())
     }
 
@@ -372,6 +375,10 @@ impl Segment {
     pub fn messages_file_path(&self) -> &str {
         &self.messages_path
     }
+
+    pub fn set_end_offset(&mut self, end_offset: u64) {
+        self.end_offset = end_offset;
+    }
 }
 
 impl std::fmt::Display for Segment {
diff --git a/server/src/streaming/segments/types/messages_batch_set.rs 
b/server/src/streaming/segments/types/messages_batch_set.rs
index af52db56..837042c9 100644
--- a/server/src/streaming/segments/types/messages_batch_set.rs
+++ b/server/src/streaming/segments/types/messages_batch_set.rs
@@ -1,7 +1,10 @@
+use crate::binary::handlers::messages::poll_messages_handler::IggyPollMetadata;
+use bytes::Bytes;
 use iggy::models::messaging::IggyMessageView;
 use iggy::models::messaging::IggyMessagesBatch;
 use iggy::prelude::*;
 use std::ops::Index;
+use tracing::trace;
 
 /// A container for multiple IggyMessagesBatch objects
 #[derive(Debug, Clone, Default)]
@@ -111,6 +114,60 @@ impl IggyMessagesBatchSet {
         self.batches.iter()
     }
 
+    /// Convert this batch set and poll metadata into a `PolledMessages` response
+    ///
+    /// Copies every message view into an owned `IggyMessage` that can be returned to clients.
+    ///
+    /// # Arguments
+    ///
+    /// * `poll_metadata` - Metadata about the partition and current offset
+    ///
+    /// # Returns
+    ///
+    /// A `PolledMessages` carrying the copied messages plus the partition id and current offset
+    /// from `poll_metadata`; an empty set yields zeroed partition id, offset and count.
+    pub fn into_polled_messages(&self, poll_metadata: IggyPollMetadata) -> 
PolledMessages {
+        if self.is_empty() {
+            return PolledMessages {
+                messages: vec![],
+                partition_id: 0,
+                current_offset: 0,
+                count: 0,
+            };
+        }
+
+        let mut messages = Vec::with_capacity(self.count() as usize);
+
+        // TODO(hubcio): this can also be optimized for zero copy, but it's http...
+        for batch in self.iter() {
+            for message in batch.iter() {
+                let header = message.header().to_header();
+                let payload = Bytes::copy_from_slice(message.payload());
+                let user_headers = 
message.user_headers().map(Bytes::copy_from_slice);
+                let message = IggyMessage {
+                    header,
+                    payload,
+                    user_headers,
+                };
+                messages.push(message);
+            }
+        }
+
+        trace!(
+            "Converted batch of {} messages from partition {} with current 
offset {}",
+            messages.len(),
+            poll_metadata.partition_id,
+            poll_metadata.current_offset
+        );
+
+        PolledMessages {
+            partition_id: poll_metadata.partition_id,
+            current_offset: poll_metadata.current_offset,
+            count: messages.len() as u32,
+            messages,
+        }
+    }
+
     /// Returns a new IggyMessagesBatch containing only messages with offsets 
greater than or equal to the specified offset,
     /// up to the specified count.
     ///
diff --git a/server/src/streaming/topics/messages.rs 
b/server/src/streaming/topics/messages.rs
index 15b3f932..fb6966f9 100644
--- a/server/src/streaming/topics/messages.rs
+++ b/server/src/streaming/topics/messages.rs
@@ -211,9 +211,10 @@ mod tests {
 
         for entity_id in 1..=messages_count {
             let message = IggyMessage::builder()
-                .with_id(entity_id as u128)
-                .with_payload(Bytes::from(entity_id.to_string()))
-                .build();
+                .id(entity_id as u128)
+                .payload(Bytes::from(entity_id.to_string()))
+                .build()
+                .expect("Failed to create message with valid payload and 
headers");
             let messages = IggyMessagesBatchMut::from_messages(&[message], 1);
             topic
                 .append_messages(&partitioning, messages, None)
@@ -243,10 +244,10 @@ mod tests {
         for entity_id in 1..=messages_count {
             let partitioning = Partitioning::messages_key_u32(entity_id);
             let message = IggyMessage::builder()
-                .with_id(entity_id as u128)
-                .with_payload(Bytes::new())
-                .build();
-            eprintln!("Message: {:#?}", message);
+                .id(entity_id as u128)
+                .payload(Bytes::new())
+                .build()
+                .expect("Failed to create message with valid payload and 
headers");
             let messages = IggyMessagesBatchMut::from_messages(&[message], 1);
             topic
                 .append_messages(&partitioning, messages, None)

Reply via email to