This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch zero-copy-no-batching
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit cb7034185f4515d54c35791fe516ff59f8f4a02d
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Mon Mar 10 02:46:12 2025 +0100

    vectored write
---
 cli/src/args/message.rs                            |   7 +-
 integration/tests/mod.rs                           |   6 +-
 integration/tests/server/mod.rs                    |   8 +-
 integration/tests/streaming/get_by_offset.rs       | 118 ++++-----
 integration/tests/streaming/mod.rs                 |  33 ++-
 sdk/src/bytes_serializable.rs                      |  12 +-
 sdk/src/cli/message/poll_messages.rs               |   4 +-
 sdk/src/cli/message/send_messages.rs               |   5 +-
 sdk/src/identifier.rs                              |  10 +
 sdk/src/messages/partitioning.rs                   |  10 +
 sdk/src/messages/polling_kind.rs                   |   9 -
 sdk/src/messages/send_messages.rs                  | 211 ++++++++--------
 sdk/src/models/{ => messaging}/header.rs           |   0
 sdk/src/models/messaging/message.rs                |  33 +--
 sdk/src/models/messaging/message_header.rs         |   1 +
 sdk/src/models/messaging/message_header_view.rs    |  13 +-
 sdk/src/models/messaging/message_view.rs           |  64 ++---
 sdk/src/models/messaging/messages.rs               |  73 ++++--
 sdk/src/models/messaging/mod.rs                    |   4 +-
 sdk/src/models/mod.rs                              |   1 -
 sdk/src/prelude.rs                                 |   5 +-
 .../handlers/messages/poll_messages_handler.rs     |  10 +-
 .../handlers/messages/send_messages_handler.rs     |   3 -
 server/src/streaming/partitions/messages.rs        |  90 ++++---
 server/src/streaming/partitions/partition.rs       |  35 +--
 server/src/streaming/segments/indexes/index.rs     | 193 ---------------
 .../src/streaming/segments/indexes/index_reader.rs | 133 ++++------
 .../src/streaming/segments/indexes/index_writer.rs |  24 --
 server/src/streaming/segments/indexes/mod.rs       |   1 -
 .../streaming/segments/messages/messages_reader.rs |  16 +-
 .../streaming/segments/messages/messages_writer.rs |  23 +-
 server/src/streaming/segments/messages/mod.rs      |  42 ++++
 .../streaming/segments/messages/persister_task.rs  |  80 +++---
 .../segments/messages_accumulator copy 2.rs        | 141 -----------
 .../segments/messages_accumulator copy.rs          | 106 --------
 .../src/streaming/segments/messages_accumulator.rs |  91 ++++---
 server/src/streaming/segments/mod.rs               |   1 +
 server/src/streaming/segments/reading_messages.rs  | 273 +++++++++------------
 .../segments/types/message_header_view_mut.rs      |   5 +-
 .../streaming/segments/types/message_view_mut.rs   |  24 +-
 .../src/streaming/segments/types/messages_mut.rs   |  32 +--
 .../src/streaming/segments/types/messages_slice.rs |  70 ++++++
 .../segments/types/messages_slice_better.rs        |   1 +
 server/src/streaming/segments/types/mod.rs         |   3 +
 server/src/streaming/segments/writing_messages.rs  |   4 +-
 server/src/streaming/systems/messages.rs           |  89 +++----
 server/src/streaming/systems/stats.rs              |  19 +-
 server/src/streaming/topics/messages.rs            |   4 +-
 48 files changed, 881 insertions(+), 1259 deletions(-)

diff --git a/cli/src/args/message.rs b/cli/src/args/message.rs
index d334de05..917e2873 100644
--- a/cli/src/args/message.rs
+++ b/cli/src/args/message.rs
@@ -1,9 +1,6 @@
 use clap::builder::NonEmptyStringValueParser;
 use clap::{ArgGroup, Args, Subcommand};
-use iggy::error::IggyError;
-use iggy::error::IggyError::InvalidFormat;
-use iggy::identifier::Identifier;
-use iggy::models::header::{HeaderKey, HeaderValue};
+use iggy::prelude::*;
 use std::str::FromStr;
 
 #[derive(Debug, Clone, Subcommand)]
@@ -109,7 +106,7 @@ fn parse_key_val(s: &str) -> Result<(HeaderKey, 
HeaderValue), IggyError> {
     let parts = lower.split(':').collect::<Vec<_>>();
 
     if parts.len() != 3 {
-        Err(InvalidFormat)?;
+        return Err(IggyError::InvalidFormat);
     }
 
     let key = HeaderKey::from_str(parts[0])?;
diff --git a/integration/tests/mod.rs b/integration/tests/mod.rs
index dd153591..56fb66b4 100644
--- a/integration/tests/mod.rs
+++ b/integration/tests/mod.rs
@@ -9,12 +9,12 @@ use std::{panic, thread};
 
 mod archiver;
 mod bench;
-mod cli;
+// mod cli;
 mod config_provider;
 mod data_integrity;
 mod examples;
 mod server;
-mod state;
+// mod state;
 mod streaming;
 
 lazy_static! {
@@ -32,7 +32,7 @@ fn setup() {
 
     let mut logger = env_logger::builder();
     logger.is_test(true);
-    logger.filter(None, log::LevelFilter::Debug);
+    logger.filter(None, log::LevelFilter::Trace);
     logger.target(env_logger::Target::Pipe(Box::new(LogWriter(log_buffer))));
     logger.format(move |buf, record| {
         let timestamp = chrono::Local::now().format("%Y-%m-%d %H:%M:%S%.6f");
diff --git a/integration/tests/server/mod.rs b/integration/tests/server/mod.rs
index e049a0d3..6d0f60c1 100644
--- a/integration/tests/server/mod.rs
+++ b/integration/tests/server/mod.rs
@@ -1,4 +1,4 @@
-mod http_server;
-mod quic_server;
-mod scenarios;
-mod tcp_server;
+// mod http_server;
+// mod quic_server;
+// mod scenarios;
+// mod tcp_server;
diff --git a/integration/tests/streaming/get_by_offset.rs 
b/integration/tests/streaming/get_by_offset.rs
index 7bafca7b..bfaec404 100644
--- a/integration/tests/streaming/get_by_offset.rs
+++ b/integration/tests/streaming/get_by_offset.rs
@@ -1,16 +1,10 @@
 use crate::streaming::common::test_setup::TestSetup;
-use bytes::BytesMut;
-use iggy::bytes_serializable::BytesSerializable;
-use iggy::messages::send_messages::Message;
-use iggy::models::header::{HeaderKey, HeaderValue};
-use iggy::utils::byte_size::IggyByteSize;
-use iggy::utils::expiry::IggyExpiry;
-use iggy::utils::sizeable::Sizeable;
-use iggy::utils::timestamp::IggyTimestamp;
+use bytes::{Bytes, BytesMut};
+use iggy::prelude::*;
 use server::configs::resource_quota::MemoryResourceQuota;
 use server::configs::system::{CacheConfig, PartitionConfig, SegmentConfig, 
SystemConfig};
-use server::streaming::batching::appendable_batch_info::AppendableBatchInfo;
 use server::streaming::partitions::partition::Partition;
+use server::streaming::segments::IggyMessagesMut;
 use std::collections::HashMap;
 use std::str::FromStr;
 use std::sync::atomic::{AtomicU32, AtomicU64};
@@ -144,12 +138,11 @@ async fn test_get_messages_by_offset(
             HeaderKey::new("key-3").unwrap(),
             HeaderValue::from_uint64(123456).unwrap(),
         );
-        let message = Message {
-            id,
-            length: payload.len() as u32,
-            payload: payload.clone(),
-            headers: Some(headers),
-        };
+        let message = IggyMessage::builder()
+            .id(id)
+            .payload(payload)
+            .headers(headers)
+            .build();
         all_messages.push(message);
     }
 
@@ -159,18 +152,11 @@ async fn test_get_messages_by_offset(
 
     // Append all batches
     for batch_size in batch_sizes {
-        let batch = all_messages[current_pos..current_pos + batch_size as 
usize].to_vec();
-        let batch_info = AppendableBatchInfo::new(
-            batch
-                .iter()
-                .map(|msg| msg.get_size_bytes())
-                .sum::<IggyByteSize>(),
-            partition.partition_id,
-        );
-        partition
-            .append_messages(batch_info, batch.clone(), None)
-            .await
-            .unwrap();
+        println!("batch_size: {}, current_pos: {}", batch_size, current_pos);
+        let batch =
+            IggyMessagesMut::from(&all_messages[current_pos..current_pos + 
batch_size as usize]);
+        assert_eq!(batch.count(), batch_size);
+        partition.append_messages(batch, None).await.unwrap();
 
         batch_offsets.push(partition.current_offset);
         current_pos += batch_size as usize;
@@ -182,11 +168,11 @@ async fn test_get_messages_by_offset(
         .await
         .unwrap();
     assert_eq!(
-        all_loaded_messages.len(),
-        total_messages as usize,
+        all_loaded_messages.count(),
+        total_messages,
         "Expected {} messages from start, but got {}",
         total_messages,
-        all_loaded_messages.len()
+        all_loaded_messages.count()
     );
 
     // Test 2: Get messages from middle (after 3rd batch)
@@ -197,11 +183,11 @@ async fn test_get_messages_by_offset(
         .await
         .unwrap();
     assert_eq!(
-        middle_messages.len(),
-        remaining_messages as usize,
+        middle_messages.count(),
+        remaining_messages,
         "Expected {} messages from middle offset, but got {}",
         remaining_messages,
-        middle_messages.len()
+        middle_messages.count()
     );
 
     // Test 3: No messages beyond final offset
@@ -211,10 +197,10 @@ async fn test_get_messages_by_offset(
         .await
         .unwrap();
     assert_eq!(
-        no_messages.len(),
+        no_messages.count(),
         0,
         "Expected no messages beyond final offset, but got {}",
-        no_messages.len()
+        no_messages.count()
     );
 
     // Test 4: Small subset from start
@@ -224,63 +210,77 @@ async fn test_get_messages_by_offset(
         .await
         .unwrap();
     assert_eq!(
-        subset_messages.len(),
-        subset_size as usize,
+        subset_messages.count(),
+        subset_size,
         "Expected {} messages in subset from start, but got {}",
         subset_size,
-        subset_messages.len()
+        subset_messages.count()
     );
 
     // Test 5: Messages spanning multiple batches
     let span_offset = batch_offsets[1] + 1; // Start from middle of 2nd batch
     let span_size = 8; // Should span across 2nd, 3rd, and into 4th batch
-    let spanning_messages = partition
+    let messages = partition
         .get_messages_by_offset(span_offset, span_size)
         .await
         .unwrap();
     assert_eq!(
-        spanning_messages.len(),
-        span_size as usize,
+        messages.count(),
+        span_size,
         "Expected {} messages spanning multiple batches, but got {}",
         span_size,
-        spanning_messages.len()
+        messages.count()
     );
 
     // Test 6: Validate message content and ordering
-    for (i, msg) in spanning_messages.iter().enumerate() {
+    // Convert to a Vec of IggyMessage to avoid lifetime issues
+    let message_vec = messages.to_messages();
+
+    for (i, msg) in message_vec.iter().enumerate() {
         let expected_offset = span_offset + i as u64;
         assert!(
-            msg.offset >= expected_offset,
+            msg.msg_header().offset() >= expected_offset,
             "Message offset {} at position {} should be >= expected offset {}",
-            msg.offset,
+            msg.msg_header().offset(),
             i,
             expected_offset
         );
 
         // Verify message contents match original
-        let original_idx = msg.offset as usize;
+        let original_idx = msg.msg_header().offset() as usize;
+
         let original_message = &all_messages[original_idx];
+        let original_msg_header = &original_message.header;
+        let original_headers = 
&original_message.headers.as_ref().unwrap().to_bytes();
+        let original_payload = &original_message.payload;
+
+        let msg_header = msg.msg_header();
+        let msg_payload = msg.payload();
+        let msg_headers = msg.headers();
+
         assert_eq!(
-            msg.id, original_message.id,
+            msg_header.id(),
+            original_msg_header.id,
             "Message ID mismatch at offset {}: expected {}, got {}",
-            msg.offset, original_message.id, msg.id
+            msg_header.offset(),
+            original_msg_header.id,
+            msg.msg_header().id()
         );
         assert_eq!(
-            msg.payload, original_message.payload,
+            msg_payload,
+            original_payload,
             "Payload mismatch at offset {}: expected {:?}, got {:?}",
-            msg.offset, original_message.payload, msg.payload
+            msg_header.offset(),
+            original_payload,
+            msg_payload
         );
         assert_eq!(
-            msg.headers
-                .as_ref()
-                .map(|bytes| HashMap::from_bytes(bytes.clone()).unwrap()),
-            original_message.headers,
+            &msg_headers,
+            &original_headers,
             "Headers mismatch at offset {}: expected {:?}, got {:?}",
-            msg.offset,
-            original_message.headers,
-            msg.headers
-                .as_ref()
-                .map(|bytes| HashMap::from_bytes(bytes.clone()).unwrap())
+            msg_header.offset(),
+            original_headers,
+            msg_headers
         );
     }
 }
diff --git a/integration/tests/streaming/mod.rs 
b/integration/tests/streaming/mod.rs
index d2ad7ec3..194a1782 100644
--- a/integration/tests/streaming/mod.rs
+++ b/integration/tests/streaming/mod.rs
@@ -1,20 +1,20 @@
 use bytes::Bytes;
-use iggy::messages::send_messages::Message;
+use iggy::prelude::IggyMessage;
 
 mod common;
-mod consumer_offset;
+// mod consumer_offset;
 mod get_by_offset;
-mod get_by_timestamp;
-mod messages;
-mod partition;
-mod segment;
-mod snapshot;
-mod stream;
-mod system;
-mod topic;
-mod topic_messages;
+// mod get_by_timestamp;
+// mod messages;
+// mod partition;
+// mod segment;
+// mod snapshot;
+// mod stream;
+// mod system;
+// mod topic;
+// mod topic_messages;
 
-fn create_messages() -> Vec<Message> {
+fn create_messages() -> Vec<IggyMessage> {
     vec![
         create_message(1, "message 1"),
         create_message(2, "message 2"),
@@ -25,12 +25,7 @@ fn create_messages() -> Vec<Message> {
     ]
 }
 
-fn create_message(id: u128, payload: &str) -> Message {
+fn create_message(id: u128, payload: &str) -> IggyMessage {
     let payload = Bytes::from(payload.to_string());
-    Message {
-        id,
-        length: payload.len() as u32,
-        payload,
-        headers: None,
-    }
+    IggyMessage::builder().id(id).payload(payload).build()
 }
diff --git a/sdk/src/bytes_serializable.rs b/sdk/src/bytes_serializable.rs
index 0ba88326..e3158721 100644
--- a/sdk/src/bytes_serializable.rs
+++ b/sdk/src/bytes_serializable.rs
@@ -1,5 +1,5 @@
 use crate::error::IggyError;
-use bytes::Bytes;
+use bytes::{Bytes, BytesMut};
 
 /// The trait represents the logic responsible for serializing and 
deserializing the struct to and from bytes.
 pub trait BytesSerializable {
@@ -10,4 +10,14 @@ pub trait BytesSerializable {
     fn from_bytes(bytes: Bytes) -> Result<Self, IggyError>
     where
         Self: Sized;
+
+    /// Write the struct to a buffer.
+    fn write_to_buffer(&self, _buf: &mut BytesMut) {
+        unimplemented!();
+    }
+
+    /// Get the byte-size of the struct.
+    fn get_buffer_size(&self) -> u32 {
+        unimplemented!();
+    }
 }
diff --git a/sdk/src/cli/message/poll_messages.rs 
b/sdk/src/cli/message/poll_messages.rs
index a42e4a0b..eae02afc 100644
--- a/sdk/src/cli/message/poll_messages.rs
+++ b/sdk/src/cli/message/poll_messages.rs
@@ -4,8 +4,8 @@ use crate::client::Client;
 use crate::consumer::Consumer;
 use crate::identifier::Identifier;
 use crate::messages::{PollMessages, PollingStrategy};
-use crate::models::header::{HeaderKey, HeaderKind};
-use crate::prelude::IggyMessage;
+use crate::models::messaging::HeaderKind;
+use crate::prelude::{HeaderKey, IggyMessage};
 use crate::utils::sizeable::Sizeable;
 use crate::utils::timestamp::IggyTimestamp;
 use crate::utils::{byte_size::IggyByteSize, duration::IggyDuration};
diff --git a/sdk/src/cli/message/send_messages.rs 
b/sdk/src/cli/message/send_messages.rs
index 8860cf03..ca91b3c5 100644
--- a/sdk/src/cli/message/send_messages.rs
+++ b/sdk/src/cli/message/send_messages.rs
@@ -2,9 +2,8 @@ use crate::bytes_serializable::BytesSerializable;
 use crate::cli_command::{CliCommand, PRINT_TARGET};
 use crate::client::Client;
 use crate::identifier::Identifier;
-use crate::models::header::{HeaderKey, HeaderValue};
-use crate::prelude::IggyMessage;
-use crate::prelude::Partitioning;
+use crate::prelude::{HeaderKey, IggyMessage};
+use crate::prelude::{HeaderValue, Partitioning};
 use crate::utils::sizeable::Sizeable;
 use anyhow::Context;
 use async_trait::async_trait;
diff --git a/sdk/src/identifier.rs b/sdk/src/identifier.rs
index b6abadc4..e813c90d 100644
--- a/sdk/src/identifier.rs
+++ b/sdk/src/identifier.rs
@@ -222,6 +222,16 @@ impl BytesSerializable for Identifier {
         identifier.validate()?;
         Ok(identifier)
     }
+
+    fn write_to_buffer(&self, bytes: &mut BytesMut) {
+        bytes.put_u8(self.kind.as_code());
+        bytes.put_u8(self.length);
+        bytes.put_slice(&self.value);
+    }
+
+    fn get_buffer_size(&self) -> u32 {
+        2 + self.length as u32
+    }
 }
 
 impl IdKind {
diff --git a/sdk/src/messages/partitioning.rs b/sdk/src/messages/partitioning.rs
index 74a8924f..892095c0 100644
--- a/sdk/src/messages/partitioning.rs
+++ b/sdk/src/messages/partitioning.rs
@@ -189,4 +189,14 @@ impl BytesSerializable for Partitioning {
             value,
         })
     }
+
+    fn write_to_buffer(&self, bytes: &mut BytesMut) {
+        bytes.put_u8(self.kind.as_code());
+        bytes.put_u8(self.length);
+        bytes.put_slice(&self.value);
+    }
+
+    fn get_buffer_size(&self) -> u32 {
+        2 + self.length as u32
+    }
 }
diff --git a/sdk/src/messages/polling_kind.rs b/sdk/src/messages/polling_kind.rs
index edb6d7f5..2ee61d8a 100644
--- a/sdk/src/messages/polling_kind.rs
+++ b/sdk/src/messages/polling_kind.rs
@@ -1,14 +1,5 @@
-use crate::bytes_serializable::BytesSerializable;
-use crate::command::{Command, POLL_MESSAGES_CODE};
-use crate::consumer::{Consumer, ConsumerKind};
 use crate::error::IggyError;
-use crate::identifier::Identifier;
-use crate::utils::sizeable::Sizeable;
-use crate::utils::timestamp::IggyTimestamp;
-use crate::validatable::Validatable;
-use bytes::{BufMut, Bytes, BytesMut};
 use serde::{Deserialize, Serialize};
-use serde_with::{serde_as, DisplayFromStr};
 use std::fmt::Display;
 use std::str::FromStr;
 
diff --git a/sdk/src/messages/send_messages.rs 
b/sdk/src/messages/send_messages.rs
index 139c4aac..04dd294c 100644
--- a/sdk/src/messages/send_messages.rs
+++ b/sdk/src/messages/send_messages.rs
@@ -4,8 +4,9 @@ use crate::command::{Command, SEND_MESSAGES_CODE};
 use crate::error::IggyError;
 use crate::identifier::Identifier;
 use crate::messages::{MAX_HEADERS_SIZE, MAX_PAYLOAD_SIZE};
-use crate::models::header::{HeaderKey, HeaderValue};
+use crate::models::messaging::{HeaderKey, HeaderValue};
 use crate::models::messaging::{IggyMessage, IggyMessageHeader, 
IggyMessageViewIterator};
+use crate::prelude::IGGY_MESSAGE_HEADER_SIZE;
 use crate::utils::byte_size::IggyByteSize;
 use crate::utils::sizeable::Sizeable;
 use crate::utils::varint::IggyVarInt;
@@ -49,32 +50,42 @@ impl SendMessages {
         partitioning: &Partitioning,
         messages: &[IggyMessage],
     ) -> Bytes {
-        let stream_id_bytes = stream_id.to_bytes();
-        let topic_id_bytes = topic_id.to_bytes();
-        let partitioning_bytes = partitioning.to_bytes();
+        let stream_id_size = stream_id.get_buffer_size();
+        let topic_id_size = topic_id.get_buffer_size();
+        let partitioning_size = partitioning.get_buffer_size();
         let messages_count = messages.len() as u32;
 
-        let total_size = stream_id_bytes.len()
-        + topic_id_bytes.len()
-        + partitioning_bytes.len()
+        let total_size = stream_id_size
+        + topic_id_size
+        + partitioning_size
         + 4  // For messages_count (u32)
         + messages
             .iter()
-            .map(|m| m.get_size_bytes().as_bytes_usize())
-            .sum::<usize>();
+            .map(|m| m.get_size_bytes().as_bytes_u64() as u32)
+            .sum::<u32>();
 
-        let mut bytes = BytesMut::with_capacity(total_size);
+        let mut bytes = BytesMut::with_capacity(total_size as usize);
 
-        bytes.put_slice(&stream_id_bytes);
-        bytes.put_slice(&topic_id_bytes);
-        bytes.put_slice(&partitioning_bytes);
+        stream_id.write_to_buffer(&mut bytes);
+        topic_id.write_to_buffer(&mut bytes);
+        partitioning.write_to_buffer(&mut bytes);
         bytes.put_u32_le(messages_count);
 
         for message in messages {
-            message.write_to_bytes_mut(&mut bytes);
+            message.write_to_buffer(&mut bytes);
         }
 
-        bytes.freeze()
+        let result = bytes.freeze();
+
+        debug_assert_eq!(
+            total_size as usize,
+            result.len(),
+            "Calculated size ({}) doesn't match actual bytes length ({})",
+            total_size,
+            result.len()
+        );
+
+        result
     }
 }
 
@@ -109,37 +120,30 @@ impl Validatable<IggyError> for SendMessages {
             return Err(IggyError::InvalidKeyValueLength);
         }
 
-        let mut headers_size = 0;
         let mut payload_size = 0;
         let mut message_count = 0;
 
-        for message_result in IggyMessageViewIterator::new(&self.messages) {
-            match message_result {
-                Ok(message_view) => {
-                    message_count += 1;
-
-                    // // TODO: fix this
-                    // if let Ok(Some(headers)) = message_view.headers() {
-                    //     for value in headers.values() {
-                    //         headers_size += value.value.len() as u32;
-                    //         if headers_size > MAX_HEADERS_SIZE {
-                    //             return Err(IggyError::TooBigHeadersPayload);
-                    //         }
-                    //     }
-                    // }
-
-                    let message_payload = message_view.payload();
-                    payload_size += message_payload.len() as u32;
-                    if payload_size > MAX_PAYLOAD_SIZE {
-                        return Err(IggyError::TooBigMessagePayload);
-                    }
-
-                    // todo(hubcio): make it use IGGY_MESSAGE_HEADER_SIZE
-                    if message_payload.len() < 56 {
-                        return Err(IggyError::InvalidMessagePayloadLength);
-                    }
-                }
-                Err(e) => return Err(e),
+        for message in IggyMessageViewIterator::new(&self.messages) {
+            message_count += 1;
+
+            // TODO(hubcio): IMHO validation of headers should be purely on 
SDK side
+            // if let Some(headers) = message.headers() {
+            //     for value in headers.values() {
+            //         headers_size += value.value.len() as u32;
+            //         if headers_size > MAX_HEADERS_SIZE {
+            //             return Err(IggyError::TooBigHeadersPayload);
+            //         }
+            //     }
+            // }
+
+            let message_payload = message.payload();
+            payload_size += message_payload.len() as u32;
+            if payload_size > MAX_PAYLOAD_SIZE {
+                return Err(IggyError::TooBigMessagePayload);
+            }
+
+            if message_payload.len() < IGGY_MESSAGE_HEADER_SIZE as usize {
+                return Err(IggyError::InvalidMessagePayloadLength);
             }
         }
 
@@ -157,73 +161,76 @@ impl Validatable<IggyError> for SendMessages {
 
 impl BytesSerializable for SendMessages {
     fn to_bytes(&self) -> Bytes {
-        let stream_id_bytes = self.stream_id.to_bytes();
-        let topic_id_bytes = self.topic_id.to_bytes();
-        let partitioning_bytes = self.partitioning.to_bytes();
+        panic!("should not be used")
 
-        let metadata_len = stream_id_bytes.len()
-            + topic_id_bytes.len()
-            + partitioning_bytes.len()
-            + std::mem::size_of::<u32>();
+        // let stream_id_bytes = self.stream_id.to_bytes();
+        // let topic_id_bytes = self.topic_id.to_bytes();
+        // let partitioning_bytes = self.partitioning.to_bytes();
 
-        let total_len = metadata_len + self.messages.len();
+        // let metadata_len = stream_id_bytes.len()
+        //     + topic_id_bytes.len()
+        //     + partitioning_bytes.len()
+        //     + std::mem::size_of::<u32>();
 
-        let mut bytes = BytesMut::with_capacity(total_len);
+        // let total_len = metadata_len + self.messages.len();
 
-        bytes.put_slice(&stream_id_bytes);
-        bytes.put_slice(&topic_id_bytes);
-        bytes.put_slice(&partitioning_bytes);
-        bytes.put_u32_le(self.messages_count);
-        bytes.put_slice(&self.messages);
+        // let mut bytes = BytesMut::with_capacity(total_len);
 
-        bytes.freeze()
+        // bytes.put_slice(&stream_id_bytes);
+        // bytes.put_slice(&topic_id_bytes);
+        // bytes.put_slice(&partitioning_bytes);
+        // bytes.put_u32_le(self.messages_count);
+        // bytes.put_slice(&self.messages);
+
+        // bytes.freeze()
     }
 
     fn from_bytes(bytes: Bytes) -> Result<SendMessages, IggyError> {
-        if bytes.is_empty() {
-            return Err(IggyError::InvalidCommand);
-        }
-
-        let mut position = 0;
-        let stream_id = Identifier::from_bytes(bytes.clone())?;
-        position += stream_id.get_size_bytes().as_bytes_usize();
-
-        if bytes.len() <= position {
-            return Err(IggyError::InvalidCommand);
-        }
-
-        let topic_id = Identifier::from_bytes(bytes.slice(position..))?;
-        position += topic_id.get_size_bytes().as_bytes_usize();
-
-        if bytes.len() <= position {
-            return Err(IggyError::InvalidCommand);
-        }
-
-        let partitioning = Partitioning::from_bytes(bytes.slice(position..))?;
-        position += partitioning.get_size_bytes().as_bytes_usize();
-
-        if bytes.len() < position + 4 {
-            return Err(IggyError::InvalidCommand);
-        }
-
-        let messages_count = u32::from_le_bytes(
-            bytes
-                .slice(position..position + 4)
-                .as_ref()
-                .try_into()
-                .unwrap(),
-        );
-        position += 4;
-
-        let messages = bytes.slice(position..);
-
-        Ok(SendMessages {
-            stream_id,
-            topic_id,
-            partitioning,
-            messages_count,
-            messages,
-        })
+        panic!("should not be used")
+        // if bytes.is_empty() {
+        //     return Err(IggyError::InvalidCommand);
+        // }
+
+        // let mut position = 0;
+        // let stream_id = Identifier::from_bytes(bytes.clone())?;
+        // position += stream_id.get_size_bytes().as_bytes_usize();
+
+        // if bytes.len() <= position {
+        //     return Err(IggyError::InvalidCommand);
+        // }
+
+        // let topic_id = Identifier::from_bytes(bytes.slice(position..))?;
+        // position += topic_id.get_size_bytes().as_bytes_usize();
+
+        // if bytes.len() <= position {
+        //     return Err(IggyError::InvalidCommand);
+        // }
+
+        // let partitioning = 
Partitioning::from_bytes(bytes.slice(position..))?;
+        // position += partitioning.get_size_bytes().as_bytes_usize();
+
+        // if bytes.len() < position + 4 {
+        //     return Err(IggyError::InvalidCommand);
+        // }
+
+        // let messages_count = u32::from_le_bytes(
+        //     bytes
+        //         .slice(position..position + 4)
+        //         .as_ref()
+        //         .try_into()
+        //         .unwrap(),
+        // );
+        // position += 4;
+
+        // let messages = bytes.slice(position..);
+
+        // Ok(SendMessages {
+        //     stream_id,
+        //     topic_id,
+        //     partitioning,
+        //     messages_count,
+        //     messages,
+        // })
     }
 }
 
diff --git a/sdk/src/models/header.rs b/sdk/src/models/messaging/header.rs
similarity index 100%
rename from sdk/src/models/header.rs
rename to sdk/src/models/messaging/header.rs
diff --git a/sdk/src/models/messaging/message.rs 
b/sdk/src/models/messaging/message.rs
index 662cefb5..d60344fe 100644
--- a/sdk/src/models/messaging/message.rs
+++ b/sdk/src/models/messaging/message.rs
@@ -1,8 +1,8 @@
+use super::header::get_headers_size_bytes;
 use super::message_header::{IggyMessageHeader, IGGY_MESSAGE_HEADER_SIZE};
 use crate::bytes_serializable::BytesSerializable;
 use crate::error::IggyError;
-use crate::models::header;
-use crate::models::header::{HeaderKey, HeaderValue};
+use crate::models::messaging::header::{HeaderKey, HeaderValue};
 use crate::utils::byte_size::IggyByteSize;
 use crate::utils::sizeable::Sizeable;
 use crate::utils::timestamp::IggyTimestamp;
@@ -37,15 +37,6 @@ impl IggyMessage {
     pub fn builder() -> IggyMessageBuilder {
         IggyMessageBuilder::new()
     }
-
-    /// Write message to bytes mut
-    pub fn write_to_bytes_mut(&self, buf: &mut BytesMut) {
-        buf.put_slice(&self.header.to_bytes());
-        buf.put_slice(&self.payload);
-        if let Some(headers) = &self.headers {
-            buf.put_slice(&headers.to_bytes());
-        }
-    }
 }
 
 impl FromStr for IggyMessage {
@@ -83,7 +74,7 @@ impl std::fmt::Display for IggyMessage {
 impl Sizeable for IggyMessage {
     fn get_size_bytes(&self) -> IggyByteSize {
         let payload_len = IggyByteSize::from(self.payload.len() as u64);
-        let headers_len = header::get_headers_size_bytes(&self.headers);
+        let headers_len = get_headers_size_bytes(&self.headers);
         let message_header_len = IggyByteSize::from(IGGY_MESSAGE_HEADER_SIZE 
as u64);
 
         payload_len + headers_len + message_header_len
@@ -131,6 +122,15 @@ impl BytesSerializable for IggyMessage {
             headers,
         })
     }
+
+    /// Write message to bytes mut
+    fn write_to_buffer(&self, buf: &mut BytesMut) {
+        buf.put_slice(&self.header.to_bytes());
+        buf.put_slice(&self.payload);
+        if let Some(headers) = &self.headers {
+            buf.put_slice(&headers.to_bytes());
+        }
+    }
 }
 
 #[derive(Debug, Default)]
@@ -172,20 +172,21 @@ impl IggyMessageBuilder {
 
     pub fn build(self) -> IggyMessage {
         let payload = self.payload.unwrap_or_default();
-        let id = self.id.unwrap_or_else(|| uuid::Uuid::new_v4().as_u128());
+        let id = self.id.unwrap_or(0);
+        let headers_length = 
get_headers_size_bytes(&self.headers).as_bytes_u64() as u32;
 
-        let header = IggyMessageHeader {
+        let msg_header = IggyMessageHeader {
             checksum: 0, // Checksum is calculated on server side
             id,
             offset: 0,
             timestamp: 0,
             origin_timestamp: IggyTimestamp::now().as_micros(),
-            headers_length: 0,
+            headers_length,
             payload_length: payload.len() as u32,
         };
 
         IggyMessage {
-            header,
+            header: msg_header,
             payload,
             headers: self.headers,
         }
diff --git a/sdk/src/models/messaging/message_header.rs 
b/sdk/src/models/messaging/message_header.rs
index adc2ff85..807bd371 100644
--- a/sdk/src/models/messaging/message_header.rs
+++ b/sdk/src/models/messaging/message_header.rs
@@ -7,6 +7,7 @@ use bytes::{BufMut, Bytes, BytesMut};
 use std::ops::Range;
 
 pub const IGGY_MESSAGE_HEADER_SIZE: u32 = 8 + 16 + 8 + 8 + 8 + 4 + 4;
+pub const IGGY_MESSAGE_HEADER_RANGE: Range<usize> = 
0..(IGGY_MESSAGE_HEADER_SIZE as usize);
 
 pub const IGGY_MESSAGE_CHECKSUM_OFFSET_RANGE: Range<usize> = 0..8;
 pub const IGGY_MESSAGE_ID_OFFSET_RANGE: Range<usize> = 8..24;
diff --git a/sdk/src/models/messaging/message_header_view.rs 
b/sdk/src/models/messaging/message_header_view.rs
index 21a9f753..3846d450 100644
--- a/sdk/src/models/messaging/message_header_view.rs
+++ b/sdk/src/models/messaging/message_header_view.rs
@@ -1,5 +1,4 @@
 use super::message_header::*;
-use crate::error::IggyError;
 
 /// A read-only, typed view into a message header in a raw buffer.
 ///
@@ -14,11 +13,13 @@ impl<'a> IggyMessageHeaderView<'a> {
     /// Creates a new `IggyMessageHeaderView` over `data`.
     ///
     /// Returns an error if `data.len() < IGGY_MESSAGE_HEADER_SIZE`.
-    pub fn new(data: &'a [u8]) -> Result<Self, IggyError> {
-        if data.len() < IGGY_MESSAGE_HEADER_SIZE as usize {
-            return Err(IggyError::InvalidCommand);
-        }
-        Ok(Self { data })
+    pub fn new(data: &'a [u8]) -> Self {
+        debug_assert!(
+            data.len() >= IGGY_MESSAGE_HEADER_SIZE as usize,
+            "Header view requires at least {} bytes",
+            IGGY_MESSAGE_HEADER_SIZE
+        );
+        Self { data }
     }
 
     /// The stored checksum at the start of the header
diff --git a/sdk/src/models/messaging/message_view.rs 
b/sdk/src/models/messaging/message_view.rs
index 4acd326b..40b20f77 100644
--- a/sdk/src/models/messaging/message_view.rs
+++ b/sdk/src/models/messaging/message_view.rs
@@ -1,6 +1,10 @@
 use super::message_header::*;
 use super::IggyMessageHeaderView;
 use crate::error::IggyError;
+use crate::models::messaging::header::HeaderKey;
+use crate::models::messaging::header::HeaderValue;
+use crate::prelude::BytesSerializable;
+use std::collections::HashMap;
 use std::iter::Iterator;
 
 /// A immutable view of a message.
@@ -13,30 +17,21 @@ pub struct IggyMessageView<'a> {
 
 impl<'a> IggyMessageView<'a> {
     /// Creates a new immutable message view from a buffer.
-    pub fn new(buffer: &'a [u8]) -> Result<Self, IggyError> {
-        if buffer.len() < IGGY_MESSAGE_HEADER_SIZE as usize {
-            return Err(IggyError::InvalidCommand);
-        }
-        // Create a header view from the header slice.
-        let header_view =
-            IggyMessageHeaderView::new(&buffer[0..IGGY_MESSAGE_HEADER_SIZE as 
usize])?;
+    pub fn new(buffer: &'a [u8]) -> Self {
+        let header_view = 
IggyMessageHeaderView::new(&buffer[IGGY_MESSAGE_HEADER_RANGE]);
         let payload_len = header_view.payload_length() as usize;
-        let headers_len = header_view.headers_length() as usize;
-        let total_size = IGGY_MESSAGE_HEADER_SIZE as usize + payload_len + 
headers_len;
+        let payload_offset = IGGY_MESSAGE_HEADER_SIZE as usize;
+        let headers_offset = IGGY_MESSAGE_HEADER_SIZE as usize + payload_len;
 
-        if buffer.len() < total_size {
-            return Err(IggyError::InvalidMessagePayloadLength);
-        }
-
-        Ok(Self {
+        Self {
             buffer,
-            payload_offset: IGGY_MESSAGE_HEADER_SIZE as usize,
-            headers_offset: IGGY_MESSAGE_HEADER_SIZE as usize + payload_len,
-        })
+            payload_offset,
+            headers_offset,
+        }
     }
 
     /// Returns an immutable header view.
-    pub fn msg_header(&self) -> Result<IggyMessageHeaderView<'_>, IggyError> {
+    pub fn msg_header(&self) -> IggyMessageHeaderView<'_> {
         IggyMessageHeaderView::new(&self.buffer[0..IGGY_MESSAGE_HEADER_SIZE as 
usize])
     }
 
@@ -47,7 +42,7 @@ impl<'a> IggyMessageView<'a> {
 
     /// Returns the size of the entire message.
     pub fn size(&self) -> usize {
-        let header_view = self.msg_header().expect("header must be valid");
+        let header_view = self.msg_header();
         IGGY_MESSAGE_HEADER_SIZE as usize
             + header_view.payload_length() as usize
             + header_view.headers_length() as usize
@@ -55,10 +50,27 @@ impl<'a> IggyMessageView<'a> {
 
     /// Returns a reference to the payload portion.
     pub fn payload(&self) -> &[u8] {
-        let header_view = self.msg_header().expect("header must be valid");
+        let header_view = self.msg_header();
         let payload_len = header_view.payload_length() as usize;
         &self.buffer[self.payload_offset..self.payload_offset + payload_len]
     }
+
+    /// Validates that the message view is properly formatted and has valid 
data.
+    pub fn validate(&self) -> Result<(), IggyError> {
+        if self.buffer.len() < IGGY_MESSAGE_HEADER_SIZE as usize {
+            return Err(IggyError::InvalidMessagePayloadLength);
+        }
+
+        let header = self.msg_header();
+        let payload_len = header.payload_length() as usize;
+        let headers_len = header.headers_length() as usize;
+        let total_size = IGGY_MESSAGE_HEADER_SIZE as usize + payload_len + 
headers_len;
+
+        if self.buffer.len() < total_size {
+            return Err(IggyError::InvalidMessagePayloadLength);
+        }
+        Ok(())
+    }
 }
 
 /// Iterator over immutable message views in a buffer.
@@ -77,7 +89,7 @@ impl<'a> IggyMessageViewIterator<'a> {
 }
 
 impl<'a> Iterator for IggyMessageViewIterator<'a> {
-    type Item = Result<IggyMessageView<'a>, IggyError>;
+    type Item = IggyMessageView<'a>;
 
     fn next(&mut self) -> Option<Self::Item> {
         if self.position >= self.buffer.len() {
@@ -85,12 +97,8 @@ impl<'a> Iterator for IggyMessageViewIterator<'a> {
         }
 
         let remaining = &self.buffer[self.position..];
-        match IggyMessageView::new(remaining) {
-            Ok(view) => {
-                self.position += view.size();
-                Some(Ok(view))
-            }
-            Err(e) => Some(Err(e)),
-        }
+        let view = IggyMessageView::new(remaining);
+        self.position += view.size();
+        Some(view)
     }
 }
diff --git a/sdk/src/models/messaging/messages.rs 
b/sdk/src/models/messaging/messages.rs
index dd72bc5a..cb2f4a54 100644
--- a/sdk/src/models/messaging/messages.rs
+++ b/sdk/src/models/messaging/messages.rs
@@ -1,4 +1,8 @@
-use super::{message_view::IggyMessageViewIterator, IggyMessage};
+use std::collections::HashMap;
+
+use super::{
+    message_view::IggyMessageViewIterator, IggyMessage, IggyMessageHeader, 
IGGY_MESSAGE_HEADER_SIZE,
+};
 use crate::bytes_serializable::BytesSerializable;
 use crate::error::IggyError;
 use crate::utils::byte_size::IggyByteSize;
@@ -27,15 +31,6 @@ impl IggyMessages {
         Self::new(BytesMut::with_capacity(capacity as usize).freeze(), 0)
     }
 
-    /// Create a new messages container from a slice of messages
-    pub fn from_messages(messages: &[IggyMessage]) -> Self {
-        let mut buffer = BytesMut::new();
-        for message in messages {
-            buffer.extend_from_slice(&message.to_bytes());
-        }
-        Self::new(buffer.freeze(), messages.len() as u32)
-    }
-
     /// Create iterator over messages
     pub fn iter(&self) -> IggyMessageViewIterator {
         IggyMessageViewIterator::new(&self.buffer)
@@ -56,24 +51,61 @@ impl IggyMessages {
         &self.buffer
     }
 
+    /// Get access to the underlying buffer shallow  copy
+    pub fn shallow_copy(&self) -> Bytes {
+        self.buffer.clone()
+    }
+
     pub fn into_inner(self) -> Bytes {
         self.buffer
     }
 
-    /// Convert to messages
     pub fn to_messages(self) -> Vec<IggyMessage> {
         let mut messages = Vec::with_capacity(self.count as usize);
         let mut position = 0;
+        let buf_len = self.buffer.len();
 
-        while position < self.buffer.len() {
-            let remaining = self.buffer.slice(position..);
-            if let Ok(message) = IggyMessage::from_bytes(remaining) {
-                let message_size = message.get_size_bytes().as_bytes_usize();
-                position += message_size;
-                messages.push(message);
-            } else {
+        while position < buf_len {
+            if position + IGGY_MESSAGE_HEADER_SIZE as usize > buf_len {
                 break;
             }
+            let header_bytes = self
+                .buffer
+                .slice(position..position + IGGY_MESSAGE_HEADER_SIZE as usize);
+            let header = match IggyMessageHeader::from_bytes(header_bytes) {
+                Ok(h) => h,
+                Err(_) => break,
+            };
+            position += IGGY_MESSAGE_HEADER_SIZE as usize;
+
+            let payload_end = position + header.payload_length as usize;
+            if payload_end > buf_len {
+                break;
+            }
+            let payload = self.buffer.slice(position..payload_end);
+            position = payload_end;
+
+            let headers: Option<HashMap<super::HeaderKey, super::HeaderValue>> 
= if header.headers_length > 0 {
+                let headers_end = position + header.headers_length as usize;
+                if headers_end > buf_len {
+                    break;
+                }
+                let headers_bytes = self.buffer.slice(position..headers_end);
+                position = headers_end;
+
+                match HashMap::from_bytes(headers_bytes) {
+                    Ok(map) => Some(map),
+                    Err(_) => break,
+                }
+            } else {
+                None
+            };
+
+            messages.push(IggyMessage {
+                header,
+                payload,
+                headers,
+            });
         }
 
         messages
@@ -90,10 +122,9 @@ impl BytesSerializable for IggyMessages {
         Self: Sized,
     {
         let mut messages_count = 0;
-        let iterator = IggyMessageViewIterator::new(&bytes);
+        let iterator: IggyMessageViewIterator<'_> = 
IggyMessageViewIterator::new(&bytes);
 
-        for result in iterator {
-            result?;
+        for _ in iterator {
             messages_count += 1;
         }
 
diff --git a/sdk/src/models/messaging/mod.rs b/sdk/src/models/messaging/mod.rs
index 056e2ffc..2605a4af 100644
--- a/sdk/src/models/messaging/mod.rs
+++ b/sdk/src/models/messaging/mod.rs
@@ -1,13 +1,15 @@
+mod header;
 mod message;
 mod message_header;
 mod message_header_view;
 mod message_view;
 mod messages;
 
+pub use header::{HeaderKey, HeaderKind, HeaderValue};
 pub use message::IggyMessage;
 pub use message_header::{
     IggyMessageHeader, IGGY_MESSAGE_CHECKSUM_OFFSET_RANGE,
-    IGGY_MESSAGE_HEADERS_LENGTH_OFFSET_RANGE, IGGY_MESSAGE_HEADER_SIZE,
+    IGGY_MESSAGE_HEADERS_LENGTH_OFFSET_RANGE, IGGY_MESSAGE_HEADER_RANGE, 
IGGY_MESSAGE_HEADER_SIZE,
     IGGY_MESSAGE_ID_OFFSET_RANGE, IGGY_MESSAGE_OFFSET_OFFSET_RANGE,
     IGGY_MESSAGE_ORIGIN_TIMESTAMP_OFFSET_RANGE, 
IGGY_MESSAGE_PAYLOAD_LENGTH_OFFSET_RANGE,
     IGGY_MESSAGE_TIMESTAMP_OFFSET_RANGE,
diff --git a/sdk/src/models/mod.rs b/sdk/src/models/mod.rs
index 5c38549d..892b1e2e 100644
--- a/sdk/src/models/mod.rs
+++ b/sdk/src/models/mod.rs
@@ -2,7 +2,6 @@
 pub mod client_info;
 pub mod consumer_group;
 pub mod consumer_offset_info;
-pub mod header;
 pub mod identity_info;
 pub mod messaging;
 pub mod partition;
diff --git a/sdk/src/prelude.rs b/sdk/src/prelude.rs
index 1f2931aa..9172ead2 100644
--- a/sdk/src/prelude.rs
+++ b/sdk/src/prelude.rs
@@ -19,7 +19,7 @@ pub use crate::messages::{
     FlushUnsavedBuffer, Partitioning, PollMessages, PollingKind, 
PollingStrategy, SendMessages,
 };
 pub use crate::models::messaging::{
-    IggyMessage, IggyMessageHeader, IggyMessageHeaderView, IggyMessageView,
+    HeaderKey, HeaderValue, IggyMessage, IggyMessageHeader, 
IggyMessageHeaderView, IggyMessageView,
     IggyMessageViewIterator, IggyMessages,
 };
 pub use crate::models::messaging::{
@@ -31,7 +31,10 @@ pub use crate::models::messaging::{
 pub use crate::models::partition::Partition;
 pub use crate::models::stream::Stream;
 pub use crate::models::topic::Topic;
+
 pub use crate::utils::byte_size::IggyByteSize;
+pub use crate::utils::expiry::IggyExpiry;
 pub use crate::utils::sizeable::Sizeable;
 pub use crate::utils::timestamp::IggyTimestamp;
+
 pub use crate::validatable::Validatable;
diff --git a/server/src/binary/handlers/messages/poll_messages_handler.rs 
b/server/src/binary/handlers/messages/poll_messages_handler.rs
index 846ca909..85527060 100644
--- a/server/src/binary/handlers/messages/poll_messages_handler.rs
+++ b/server/src/binary/handlers/messages/poll_messages_handler.rs
@@ -41,7 +41,15 @@ impl ServerCommandHandler for PollMessages {
                 self.consumer, self.stream_id, self.topic_id, self.partition_id
             ))?;
 
-        sender.send_ok_response(messages.buffer()).await?;
+        // Collect all chunks first into a Vec to extend their lifetimes.
+        // This ensures the Arc<[u8]> references from each ByteSliceView stay 
alive
+        // throughout the async vectored I/O operation, preventing "borrowed 
value does not live
+        // long enough" errors while optimizing transmission by using larger 
chunks.
+
+        let length = messages.size().to_le_bytes();
+        let slices: Vec<IoSlice> = 
messages.chunks().map(IoSlice::new).collect();
+
+        sender.send_ok_response_vectored(&length, slices).await?;
         Ok(())
     }
 }
diff --git a/server/src/binary/handlers/messages/send_messages_handler.rs 
b/server/src/binary/handlers/messages/send_messages_handler.rs
index 5c3f20e2..60d9735a 100644
--- a/server/src/binary/handlers/messages/send_messages_handler.rs
+++ b/server/src/binary/handlers/messages/send_messages_handler.rs
@@ -9,7 +9,6 @@ use iggy::error::IggyError;
 use iggy::identifier::Identifier;
 use iggy::prelude::*;
 use iggy::utils::sizeable::Sizeable;
-use std::time::Instant;
 use tracing::instrument;
 
 impl ServerCommandHandler for SendMessages {
@@ -68,8 +67,6 @@ impl ServerCommandHandler for SendMessages {
                 None,
             )
             .await?;
-
-        // TODO(hubcio): is it legal to drop here?
         drop(system);
 
         self.stream_id = stream_id;
diff --git a/server/src/streaming/partitions/messages.rs 
b/server/src/streaming/partitions/messages.rs
index c0fef4fc..d4e6fc1f 100644
--- a/server/src/streaming/partitions/messages.rs
+++ b/server/src/streaming/partitions/messages.rs
@@ -66,7 +66,7 @@ impl Partition {
         &self,
         start_offset: u64,
         count: u32,
-    ) -> Result<IggyMessages, IggyError> {
+    ) -> Result<IggyMessagesSlice, IggyError> {
         trace!(
             "Getting messages for start offset: {start_offset} for partition: 
{}, current offset: {}...",
             self.partition_id,
@@ -85,27 +85,20 @@ impl Partition {
         let segments = self.filter_segments_by_offsets(start_offset, 
end_offset);
         match segments.len() {
             0 => panic!("TODO"),
-            1 => {
-                let messages = segments[0]
-                    .get_messages_by_offset(start_offset, count)
-                    .await?;
-                Ok(messages)
-            }
-            _ => {
-                let messages =
-                    Self::get_messages_from_segments(segments, start_offset, 
count).await?;
-                Ok(messages)
-            }
+            1 => Ok(segments[0]
+                .get_messages_by_offset(start_offset, count)
+                .await?),
+            _ => Ok(Self::get_messages_from_segments(segments, start_offset, 
count).await?),
         }
     }
 
     // Retrieves the first messages (up to a specified count).
-    pub async fn get_first_messages(&self, count: u32) -> Result<IggyMessages, 
IggyError> {
+    pub async fn get_first_messages(&self, count: u32) -> 
Result<IggyMessagesSlice, IggyError> {
         self.get_messages_by_offset(0, count).await
     }
 
     // Retrieves the last messages (up to a specified count).
-    pub async fn get_last_messages(&self, count: u32) -> Result<IggyMessages, 
IggyError> {
+    pub async fn get_last_messages(&self, count: u32) -> 
Result<IggyMessagesSlice, IggyError> {
         let mut requested_count = count as u64;
         if requested_count > self.current_offset + 1 {
             requested_count = self.current_offset + 1
@@ -120,7 +113,7 @@ impl Partition {
         &self,
         consumer: PollingConsumer,
         count: u32,
-    ) -> Result<IggyMessages, IggyError> {
+    ) -> Result<IggyMessagesSlice, IggyError> {
         let (consumer_offsets, consumer_id) = match consumer {
             PollingConsumer::Consumer(consumer_id, _) => 
(&self.consumer_offsets, consumer_id),
             PollingConsumer::ConsumerGroup(group_id, _) => 
(&self.consumer_group_offsets, group_id),
@@ -183,13 +176,49 @@ impl Partition {
             .collect()
     }
 
-    // Retrieves messages from multiple segments.
+    /// Retrieves messages from multiple segments.
     async fn get_messages_from_segments(
         segments: Vec<&Segment>,
         offset: u64,
         count: u32,
-    ) -> Result<IggyMessages, IggyError> {
-        todo!()
+    ) -> Result<IggyMessagesSlice, IggyError> {
+        let mut slices = Vec::new();
+        let mut remaining_count = count;
+        let mut current_offset = offset;
+
+        for segment in segments {
+            if remaining_count == 0 {
+                break;
+            }
+
+            let messages = segment
+                .get_messages_by_offset(current_offset, remaining_count)
+                .await
+                .with_error_context(|error| {
+                    format!(
+                        "{COMPONENT} (error: {error}) - failed to get messages 
from segment, segment: {}, \
+                         offset: {}, count: {}",
+                        segment, current_offset, remaining_count
+                    )
+                })?;
+
+            // Update remaining count and offset for next segment
+            let messages_count = messages.count();
+            remaining_count = remaining_count.saturating_sub(messages_count);
+
+            // Update the offset for the next segment if needed
+            if messages_count > 0 {
+                // If we got messages, the next offset should be after the 
last message
+                // from this segment
+                current_offset += messages_count as u64;
+            }
+
+            slices.push(messages);
+        }
+
+        // Combine all segment slices into a single composite slice
+        // This avoids copying the data
+        Ok(IggyMessagesSlice::combine(slices))
         // let mut results = Vec::new();
         // let mut remaining_count = count;
         // for segment in segments {
@@ -221,6 +250,12 @@ impl Partition {
         messages: IggyMessagesMut,
         confirmation: Option<Confirmation>,
     ) -> Result<(), IggyError> {
+        trace!(
+            "Appending {} messages of size {} to partition with ID: {}...",
+            self.partition_id,
+            messages.count(),
+            messages.size()
+        );
         {
             let last_segment = 
self.segments.last_mut().ok_or(IggyError::SegmentNotFound)?;
             if last_segment.is_closed {
@@ -232,11 +267,10 @@ impl Partition {
                 
self.add_persisted_segment(start_offset).await.with_error_context(|error| 
format!(
                     "{COMPONENT} (error: {error}) - failed to add persisted 
segment, partition: {}, start offset: {}",
                     self, start_offset,
-                ))?;
+                ))?
             }
         }
 
-        let messages_size = messages.size();
         let current_offset = if !self.should_increment_offset {
             0
         } else {
@@ -276,8 +310,6 @@ impl Partition {
         }
         */
 
-        // Why are we even doing it this way ? XD
-        // What are those scope
         let messages_count = {
             let last_segment = 
self.segments.last_mut().ok_or(IggyError::SegmentNotFound)?;
             last_segment
@@ -289,18 +321,20 @@ impl Partition {
                      )
                  })?
         };
-        let last_offset = current_offset + messages_count as u64 - 1;
+
+        // Handle the case when messages_count is 0 to avoid integer underflow
+        let last_offset = if messages_count == 0 {
+            current_offset
+        } else {
+            current_offset + messages_count as u64 - 1
+        };
+
         if self.should_increment_offset {
             self.current_offset = last_offset;
         } else {
             self.should_increment_offset = true;
             self.current_offset = last_offset;
         }
-        /*
-        if let Some(cache) = &mut self.cache {
-            cache.extend(retained_messages);
-        }
-        */
 
         self.unsaved_messages_count += messages_count;
         {
diff --git a/server/src/streaming/partitions/partition.rs 
b/server/src/streaming/partitions/partition.rs
index 75822f10..2a44ee88 100644
--- a/server/src/streaming/partitions/partition.rs
+++ b/server/src/streaming/partitions/partition.rs
@@ -88,16 +88,6 @@ impl Partition {
             config.get_consumer_offsets_path(stream_id, topic_id, 
partition_id);
         let consumer_group_offsets_path =
             config.get_consumer_group_offsets_path(stream_id, topic_id, 
partition_id);
-        //TODO: Fix me
-        /*
-        let (cached_memory_tracker, messages) = match config.cache.enabled {
-            false => (None, None),
-            true => (
-                CacheMemoryTracker::initialize(&config.cache),
-                Some(SmartCache::new()),
-            ),
-        };
-        */
 
         let mut partition = Partition {
             stream_id,
@@ -108,8 +98,6 @@ impl Partition {
             consumer_offsets_path,
             consumer_group_offsets_path,
             message_expiry,
-            //cache: messages,
-            //cached_memory_tracker,
             message_deduplicator: match config.message_deduplication.enabled {
                 true => Some(MessageDeduplicator::new(
                     if config.message_deduplication.max_entries > 0 {
@@ -169,27 +157,6 @@ impl Partition {
 
         partition
     }
-
-    pub fn get_cache_metrics(&self) -> CacheMetrics {
-        //TODO: Fix me
-        /*
-        if let Some(cache) = self.cache.as_ref() {
-            let cache_metrics = cache.get_metrics();
-            CacheMetrics {
-                hits: cache_metrics.hits,
-                misses: cache_metrics.misses,
-                hit_ratio: cache_metrics.hit_ratio,
-            }
-        } else {
-            CacheMetrics {
-                hits: 0,
-                misses: 0,
-                hit_ratio: 0.0,
-            }
-        }
-        */
-        Default::default()
-    }
 }
 
 impl Sizeable for Partition {
@@ -214,7 +181,7 @@ impl fmt::Display for Partition {
 
 #[cfg(test)]
 mod tests {
-    use crate::configs::system::{CacheConfig, SystemConfig};
+    use crate::configs::system::SystemConfig;
     use crate::streaming::partitions::partition::Partition;
     use crate::streaming::persistence::persister::{FileWithSyncPersister, 
PersisterKind};
     use crate::streaming::storage::SystemStorage;
diff --git a/server/src/streaming/segments/indexes/index.rs 
b/server/src/streaming/segments/indexes/index.rs
index fd1f5ea0..c6f9111b 100644
--- a/server/src/streaming/segments/indexes/index.rs
+++ b/server/src/streaming/segments/indexes/index.rs
@@ -1,7 +1,3 @@
-use crate::streaming::segments::segment::Segment;
-use iggy::error::IggyError;
-use iggy::error::IggyError::InvalidOffset;
-
 #[derive(Debug, Eq, Clone, Copy, Default)]
 pub struct Index {
     pub offset: u32,
@@ -16,192 +12,3 @@ impl PartialEq<Self> for Index {
             && self.timestamp == other.timestamp
     }
 }
-
-#[derive(Debug, Eq, PartialEq, Clone, Copy, Default)]
-pub struct FetchIndex {
-    pub offset: u32,
-    pub position: u32,
-    pub timestamp: u64,
-    pub messages_count: u32,
-}
-
-#[derive(Debug, Clone, Copy, Default)]
-pub struct IndexRange {
-    pub start: Index,
-    pub end: Index,
-}
-
-impl Segment {
-    pub fn load_highest_lower_bound_index(
-        &self,
-        indices: &[Index],
-        start_offset: u32,
-        end_offset: u32,
-    ) -> Result<IndexRange, IggyError> {
-        let starting_offset_idx = binary_search_index(indices, start_offset);
-        let ending_offset_idx = binary_search_index(indices, end_offset);
-
-        match (starting_offset_idx, ending_offset_idx) {
-            (Some(starting_offset_idx), Some(ending_offset_idx)) => 
Ok(IndexRange {
-                start: indices[starting_offset_idx],
-                end: indices[ending_offset_idx],
-            }),
-            (Some(starting_offset_idx), None) => Ok(IndexRange {
-                start: indices[starting_offset_idx],
-                end: *indices.last().unwrap(),
-            }),
-            (None, _) => Err(InvalidOffset(start_offset as u64 + 
self.start_offset)),
-        }
-    }
-}
-
-fn binary_search_index(indices: &[Index], offset: u32) -> Option<usize> {
-    match indices.binary_search_by(|index| index.offset.cmp(&offset)) {
-        Ok(index) => Some(index),
-        Err(index) => {
-            if index < indices.len() {
-                Some(index)
-            } else {
-                None
-            }
-        }
-    }
-}
-
-impl IndexRange {
-    pub fn max_range() -> Self {
-        Self {
-            start: Index {
-                offset: 0,
-                position: 0,
-                timestamp: 0,
-            },
-            end: Index {
-                offset: u32::MAX - 1,
-                position: u32::MAX,
-                timestamp: u64::MAX,
-            },
-        }
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use crate::configs::system::{SegmentConfig, SystemConfig};
-    use iggy::utils::expiry::IggyExpiry;
-    use std::sync::atomic::AtomicU64;
-    use std::sync::Arc;
-
-    async fn create_segment() -> Segment {
-        let stream_id = 1;
-        let topic_id = 2;
-        let partition_id = 3;
-        let start_offset = 0;
-        let config = Arc::new(SystemConfig {
-            segment: SegmentConfig {
-                cache_indexes: true,
-                ..Default::default()
-            },
-            ..Default::default()
-        });
-
-        Segment::create(
-            stream_id,
-            topic_id,
-            partition_id,
-            start_offset,
-            config,
-            IggyExpiry::NeverExpire,
-            Arc::new(AtomicU64::new(0)),
-            Arc::new(AtomicU64::new(0)),
-            Arc::new(AtomicU64::new(0)),
-            Arc::new(AtomicU64::new(0)),
-            Arc::new(AtomicU64::new(0)),
-            Arc::new(AtomicU64::new(0)),
-        )
-    }
-
-    fn create_test_indices(segment: &mut Segment) {
-        let indexes = vec![
-            Index {
-                offset: 5,
-                position: 0,
-                timestamp: 1000,
-            },
-            Index {
-                offset: 20,
-                position: 100,
-                timestamp: 2000,
-            },
-            Index {
-                offset: 35,
-                position: 200,
-                timestamp: 3000,
-            },
-            Index {
-                offset: 50,
-                position: 300,
-                timestamp: 4000,
-            },
-            Index {
-                offset: 65,
-                position: 400,
-                timestamp: 5000,
-            },
-        ];
-        segment.indexes.as_mut().unwrap().extend(indexes);
-    }
-
-    #[tokio::test]
-    async fn should_find_both_indices() {
-        let mut segment = create_segment().await;
-        create_test_indices(&mut segment);
-        let result = segment
-            .load_highest_lower_bound_index(segment.indexes.as_ref().unwrap(), 
15, 45)
-            .unwrap();
-
-        assert_eq!(result.start.offset, 20);
-        assert_eq!(result.end.offset, 50);
-    }
-
-    #[tokio::test]
-    async fn start_and_end_index_should_be_equal() {
-        let mut segment = create_segment().await;
-        create_test_indices(&mut segment);
-        let result_end_range = segment
-            .load_highest_lower_bound_index(segment.indexes.as_ref().unwrap(), 
65, 100)
-            .unwrap();
-
-        assert_eq!(result_end_range.start.offset, 65);
-        assert_eq!(result_end_range.end.offset, 65);
-
-        let result_start_range = segment
-            .load_highest_lower_bound_index(segment.indexes.as_ref().unwrap(), 
0, 5)
-            .unwrap();
-        assert_eq!(result_start_range.start.offset, 5);
-        assert_eq!(result_start_range.end.offset, 5);
-    }
-
-    #[tokio::test]
-    async fn should_clamp_last_index_when_out_of_range() {
-        let mut segment = create_segment().await;
-        create_test_indices(&mut segment);
-        let result = segment
-            .load_highest_lower_bound_index(segment.indexes.as_ref().unwrap(), 
5, 100)
-            .unwrap();
-
-        assert_eq!(result.start.offset, 5);
-        assert_eq!(result.end.offset, 65);
-    }
-
-    #[tokio::test]
-    async fn should_return_err_when_both_indices_out_of_range() {
-        let mut segment = create_segment().await;
-        create_test_indices(&mut segment);
-
-        let result =
-            
segment.load_highest_lower_bound_index(segment.indexes.as_ref().unwrap(), 100, 
200);
-        assert!(result.is_err());
-    }
-}
diff --git a/server/src/streaming/segments/indexes/index_reader.rs 
b/server/src/streaming/segments/indexes/index_reader.rs
index 2ed09640..83a8eb3b 100644
--- a/server/src/streaming/segments/indexes/index_reader.rs
+++ b/server/src/streaming/segments/indexes/index_reader.rs
@@ -1,7 +1,4 @@
-use super::{
-    index::{Index, IndexRange},
-    INDEX_SIZE,
-};
+use super::{index::Index, INDEX_SIZE};
 use error_set::ErrContext;
 use iggy::error::IggyError;
 use std::{
@@ -92,66 +89,6 @@ impl SegmentIndexReader {
         Ok(indexes)
     }
 
-    /// Loads an index range from the index file given a start/end offset.
-    pub async fn load_index_range_impl(
-        &self,
-        index_start_offset: u64,
-        index_end_offset: u64,
-        segment_start_offset: u64,
-    ) -> Result<Option<IndexRange>, IggyError> {
-        if index_start_offset > index_end_offset {
-            trace!(
-                "Index start offset {} is greater than index end offset {} for 
file {}.",
-                index_start_offset,
-                index_end_offset,
-                self.file_path
-            );
-            return Ok(None);
-        }
-        let file_size = self.file_size();
-        if file_size == 0 {
-            trace!("Index file {} is empty.", self.file_path);
-            return Ok(None);
-        }
-        trace!("Index file length: {} bytes.", file_size);
-
-        let relative_start_offset = (index_start_offset - 
segment_start_offset) as u32;
-        let relative_end_offset = (index_end_offset - segment_start_offset) as 
u32;
-        let mut index_range = IndexRange::default();
-
-        let buf = match self.read_at(0, file_size).await {
-            Ok(buf) => buf,
-            Err(error) if error.kind() == ErrorKind::UnexpectedEof => return 
Ok(None),
-            Err(error) => {
-                error!(
-                    "Error reading batch header at offset 0 in file {}: 
{error}",
-                    self.file_path
-                );
-                return Err(IggyError::CannotReadFile);
-            }
-        };
-        let mut last_index = Index::default();
-        for chunk in buf.chunks_exact(INDEX_SIZE as usize) {
-            let current_index = parse_index(chunk).with_error_context(|error| {
-                format!("Failed to parse index {}: {error}", self.file_path)
-            })?;
-            if current_index.offset >= relative_start_offset
-                && index_range.start == Index::default()
-            {
-                index_range.start = current_index;
-            }
-            if current_index.offset >= relative_end_offset {
-                index_range.end = current_index;
-                break;
-            }
-            last_index = current_index;
-        }
-        if index_range.end == Index::default() {
-            index_range.end = last_index;
-        }
-        Ok(Some(index_range))
-    }
-
     pub async fn load_index_for_timestamp_impl(
         &self,
         timestamp: u64,
@@ -162,38 +99,57 @@ impl SegmentIndexReader {
             return Ok(Some(Index::default()));
         }
 
-        let buf = match self.read_at(0, file_size).await {
-            Ok(buf) => buf,
-            Err(error) if error.kind() == ErrorKind::UnexpectedEof => return 
Ok(None),
-            Err(error) => {
-                error!(
-                    "Error reading batch header at offset 0 in file {}: 
{error}",
-                    self.file_path
-                );
-                return Err(IggyError::CannotReadFile);
-            }
-        };
-        let mut last_index: Option<Index> = None;
-        for chunk in buf.chunks_exact(INDEX_SIZE as usize) {
-            let current = parse_index(chunk)?;
-            if current.timestamp >= timestamp {
-                return Ok(Some(last_index.unwrap_or_default()));
+        let total_indexes = file_size / INDEX_SIZE;
+        trace!(
+            "Searching for timestamp {} in {} indexes",
+            timestamp,
+            total_indexes
+        );
+
+        let mut left = 0;
+        let mut right = total_indexes - 1;
+        let mut result: Option<Index> = None;
+
+        while left <= right {
+            let mid = left + (right - left) / 2;
+            let buf = match self.read_at(mid * INDEX_SIZE, INDEX_SIZE).await {
+                Ok(buf) => buf,
+                Err(error) if error.kind() == ErrorKind::UnexpectedEof => 
return Ok(None),
+                Err(error) => {
+                    error!(
+                        "Error reading index at position {} in file {}: 
{error}",
+                        mid, self.file_path
+                    );
+                    return Err(IggyError::CannotReadFile);
+                }
+            };
+            let current = parse_index(&buf)?;
+            match current.timestamp.cmp(&timestamp) {
+                std::cmp::Ordering::Equal => {
+                    return Ok(Some(current));
+                }
+                std::cmp::Ordering::Less => {
+                    result = Some(current);
+                    left = mid + 1;
+                }
+                std::cmp::Ordering::Greater => {
+                    if mid == 0 {
+                        return Ok(result.or(Some(Index::default())));
+                    }
+                    right = mid - 1;
+                }
             }
-            last_index = Some(current);
         }
-        Ok(None)
+        trace!("Index for timestamp {timestamp}: {result:?}");
+        Ok(result)
     }
 
+    /// Returns the size of the index file in bytes.
     fn file_size(&self) -> u32 {
         self.index_size_bytes.load(Ordering::Acquire) as u32
     }
 
-    /// Returns the total number of indexes stored in the file.
-    pub fn get_indexes_count(&self) -> u32 {
-        let file_size = self.file_size();
-        file_size / INDEX_SIZE
-    }
-
+    /// Reads a specified number of bytes from the index file at a given 
offset.
     async fn read_at(&self, offset: u32, len: u32) -> Result<Vec<u8>, 
std::io::Error> {
         let file = self.file.clone();
         spawn_blocking(move || {
@@ -246,6 +202,7 @@ impl SegmentIndexReader {
     }
 }
 
+/// Parses an index from a byte slice.
 fn parse_index(chunk: &[u8]) -> Result<Index, IggyError> {
     let offset = u32::from_le_bytes(
         chunk[0..4]
diff --git a/server/src/streaming/segments/indexes/index_writer.rs 
b/server/src/streaming/segments/indexes/index_writer.rs
index c8fa5739..306e7350 100644
--- a/server/src/streaming/segments/indexes/index_writer.rs
+++ b/server/src/streaming/segments/indexes/index_writer.rs
@@ -61,30 +61,6 @@ impl SegmentIndexWriter {
         })
     }
 
-    /// Append the given index record to the index file.
-    pub async fn save_index(&mut self, index: Index) -> Result<(), IggyError> {
-        let mut buf = [0u8; INDEX_SIZE as usize];
-        buf[0..4].copy_from_slice(&index.offset.to_le_bytes());
-        buf[4..8].copy_from_slice(&index.position.to_le_bytes());
-        buf[8..16].copy_from_slice(&index.timestamp.to_le_bytes());
-
-        {
-            self.file
-                .write_all(&buf)
-                .await
-                .with_error_context(|error| {
-                    format!("Failed to write index to file: {}. {error}", 
self.file_path)
-                })
-                .map_err(|_| IggyError::CannotSaveIndexToSegment)?;
-        }
-        if self.fsync {
-            let _ = self.fsync().await;
-        }
-        self.index_size_bytes
-            .fetch_add(INDEX_SIZE as u64, Ordering::Release);
-        Ok(())
-    }
-
     /// Append multiple index records to the index file in a single operation.
     pub async fn save_indexes(&mut self, indexes: &[Index]) -> Result<(), 
IggyError> {
         if indexes.is_empty() {
diff --git a/server/src/streaming/segments/indexes/mod.rs 
b/server/src/streaming/segments/indexes/mod.rs
index e17de088..5ad84180 100644
--- a/server/src/streaming/segments/indexes/mod.rs
+++ b/server/src/streaming/segments/indexes/mod.rs
@@ -6,6 +6,5 @@ mod index_writer;
 pub const INDEX_SIZE: u32 = 16;
 
 pub use index::Index;
-pub use index::IndexRange;
 pub use index_reader::SegmentIndexReader;
 pub use index_writer::SegmentIndexWriter;
diff --git a/server/src/streaming/segments/messages/messages_reader.rs 
b/server/src/streaming/segments/messages/messages_reader.rs
index ee5cbb02..caff84d2 100644
--- a/server/src/streaming/segments/messages/messages_reader.rs
+++ b/server/src/streaming/segments/messages/messages_reader.rs
@@ -1,10 +1,8 @@
-use crate::streaming::segments::indexes::IndexRange;
 use bytes::{Bytes, BytesMut};
 use error_set::ErrContext;
 use iggy::{
     error::IggyError,
     prelude::{BytesSerializable, IggyMessages},
-    utils::{byte_size::IggyByteSize, timestamp::IggyTimestamp},
 };
 use std::{
     fmt,
@@ -64,6 +62,8 @@ impl MessagesReader {
 
         log_size_bytes.store(actual_log_size, Ordering::Release);
 
+        trace!("Opened messages file for reading: {file_path}, size: 
{actual_log_size}");
+
         Ok(Self {
             file_path: file_path.to_string(),
             file: Arc::new(file),
@@ -71,10 +71,9 @@ impl MessagesReader {
         })
     }
 
-    // TODO: This one is most likely not needed anymore.
     /// Loads and returns all message IDs from the log file.
     pub async fn load_message_ids_impl(&self) -> Result<Vec<u128>, IggyError> {
-        let mut file_size = self.file_size();
+        let file_size = self.file_size();
         if file_size == 0 {
             trace!("Log file {} is empty.", self.file_path);
             return Ok(Vec::new());
@@ -118,10 +117,7 @@ impl MessagesReader {
             trace!("Messages file {} is empty.", self.file_path);
             return Ok(IggyMessages::default());
         }
-        let messages_bytes = match self
-            .read_bytes_at(start_pos as u64, count_bytes as u64)
-            .await
-        {
+        let messages_bytes = match self.read_at(start_pos as u64, count_bytes 
as u64).await {
             Ok(buf) => buf,
             Err(error) if error.kind() == ErrorKind::UnexpectedEof => {
                 return Ok(IggyMessages::default())
@@ -135,14 +131,14 @@ impl MessagesReader {
             }
         };
 
-        IggyMessages::from_bytes(messages_bytes)
+        Ok(IggyMessages::new(messages_bytes, messages_count))
     }
 
     fn file_size(&self) -> u64 {
         self.log_size_bytes.load(Ordering::Acquire)
     }
 
-    async fn read_bytes_at(&self, offset: u64, len: u64) -> Result<Bytes, 
std::io::Error> {
+    async fn read_at(&self, offset: u64, len: u64) -> Result<Bytes, 
std::io::Error> {
         let file = self.file.clone();
         spawn_blocking(move || {
             let mut buf = BytesMut::with_capacity(len as usize);
diff --git a/server/src/streaming/segments/messages/messages_writer.rs 
b/server/src/streaming/segments/messages/messages_writer.rs
index 4bd8523f..bc46040b 100644
--- a/server/src/streaming/segments/messages/messages_writer.rs
+++ b/server/src/streaming/segments/messages/messages_writer.rs
@@ -12,7 +12,6 @@ use std::{
         atomic::{AtomicU64, Ordering},
         Arc,
     },
-    time::Duration,
 };
 use tokio::{
     fs::{File, OpenOptions},
@@ -95,13 +94,17 @@ impl MessagesWriter {
     /// Append a messages to the messages file.
     pub async fn save_batches(
         &mut self,
-        messages: IggyMessages,
+        messages: Vec<IggyMessages>,
         confirmation: Confirmation,
     ) -> Result<IggyByteSize, IggyError> {
-        let messages_size = messages.size();
+        let messages_size: usize = messages.iter().map(|m| m.size() as 
usize).sum();
+        trace!(
+            "Saving batch of size {messages_size} bytes to messages file: {}",
+            self.file_path
+        );
         match confirmation {
             Confirmation::Wait => {
-                self.write_batch(messages).await?;
+                self.write_batch_vectored(messages).await?;
                 self.messages_size_bytes
                     .fetch_add(messages_size as u64, Ordering::AcqRel);
                 trace!(
@@ -127,16 +130,10 @@ impl MessagesWriter {
         Ok(IggyByteSize::from(messages_size as u64))
     }
 
-    /// Write a batch of bytes to the messages file and return the new file 
position.
-    async fn write_batch(&mut self, messages: IggyMessages) -> Result<(), 
IggyError> {
+    /// Write a batch of bytes to the log file.
+    async fn write_batch_vectored(&mut self, batches: Vec<IggyMessages>) -> 
Result<(), IggyError> {
         if let Some(ref mut file) = self.file {
-            file.write_all(messages.buffer())
-                .await
-                .with_error_context(|error| {
-                    format!("Failed to messages to file: {}. {error}", 
self.file_path)
-                })
-                .map_err(|_| IggyError::CannotWriteToFile)?;
-
+            super::write_batch_vectored(file, &self.file_path, batches).await?;
             Ok(())
         } else {
             error!("File handle is not available for synchronous write.");
diff --git a/server/src/streaming/segments/messages/mod.rs 
b/server/src/streaming/segments/messages/mod.rs
index f5cd8fd6..e4bff590 100644
--- a/server/src/streaming/segments/messages/mod.rs
+++ b/server/src/streaming/segments/messages/mod.rs
@@ -5,3 +5,45 @@ mod persister_task;
 pub use messages_reader::MessagesReader;
 pub use messages_writer::MessagesWriter;
 pub use persister_task::PersisterTask;
+
+use error_set::ErrContext;
+use iggy::{error::IggyError, prelude::IggyMessages};
+use std::io::IoSlice;
+use tokio::{fs::File, io::AsyncWriteExt};
+use tracing::error;
+
+/// Write a batch of messages to a file using vectored I/O
+///
+/// This function writes all the messages in the batch to the file using 
vectored I/O,
+/// which can be more efficient than writing each message individually.
+pub async fn write_batch_vectored(
+    file: &mut File,
+    file_path: &str,
+    batches: Vec<IggyMessages>,
+) -> Result<u32, IggyError> {
+    let mut total_bytes_written = 0;
+    let mut slices_vec: Vec<IoSlice<'_>> =
+        batches.iter().map(|b| IoSlice::new(b.buffer())).collect();
+
+    let mut slices = slices_vec.as_mut_slice();
+
+    while !slices.is_empty() {
+        let written = file
+            .write_vectored(slices)
+            .await
+            .with_error_context(|error| {
+                format!("Failed to write vectored to file: {file_path}. 
{error}")
+            })
+            .map_err(|_| IggyError::CannotWriteToFile)? as u32;
+
+        if written == 0 {
+            error!("Failed to write batch of messages to file: {file_path}");
+            return Err(IggyError::CannotWriteToFile);
+        }
+
+        total_bytes_written += written;
+        IoSlice::advance_slices(&mut slices, written as usize);
+    }
+
+    Ok(total_bytes_written)
+}
diff --git a/server/src/streaming/segments/messages/persister_task.rs 
b/server/src/streaming/segments/messages/persister_task.rs
index 37728364..f76000e8 100644
--- a/server/src/streaming/segments/messages/persister_task.rs
+++ b/server/src/streaming/segments/messages/persister_task.rs
@@ -14,7 +14,7 @@ use tracing::{error, trace, warn};
 #[derive(Debug)]
 /// A command to the persister task.
 enum PersisterTaskCommand {
-    WriteRequest(IggyMessages),
+    WriteRequest(Vec<IggyMessages>),
     Shutdown,
 }
 
@@ -59,7 +59,7 @@ impl PersisterTask {
     }
 
     /// Sends the batch bytes to the persister task (fire-and-forget).
-    pub async fn persist(&self, messages: IggyMessages) {
+    pub async fn persist(&self, messages: Vec<IggyMessages>) {
         if let Err(e) = self
             .sender
             .send_async(PersisterTaskCommand::WriteRequest(messages))
@@ -213,7 +213,7 @@ impl PersisterTask {
     async fn write_with_retries(
         file: &mut File,
         file_path: &str,
-        messages: IggyMessages,
+        messages: Vec<IggyMessages>,
         fsync: bool,
         max_retries: u32,
         retry_delay: IggyDuration,
@@ -225,41 +225,43 @@ impl PersisterTask {
         // There are few errors which are worth retrying such as LSE (Latent 
sector error), as the file system
         // might be able to reallocate a new sector for the data and recover, 
but not every file system supports that.
         // In general this topic should be furthered researched rather than 
just naively retry when the write fails.
-        let messages_size = messages.size();
-        loop {
-            // TODO: use vectored API
-            match file.write_all(messages.buffer()).await {
-                Ok(_) => {
-                    if fsync {
-                        match file.sync_all().await {
-                            Ok(_) => return Ok(messages_size),
-                            Err(e) => {
-                                attempts += 1;
-                                error!(
-                                    "Error syncing file {file_path}: {:?} 
(attempt {attempts}/{max_retries})",
-                                     e,
-                                );
-                            }
-                        }
-                    } else {
-                        return Ok(messages_size);
-                    }
-                }
-                Err(e) => {
-                    attempts += 1;
-                    error!(
-                        "Error writing to file {file_path}: {:?} (attempt 
{attempts}/{max_retries})",
-                        e,
-                    );
-                }
-            }
-            if attempts >= max_retries {
-                error!(
-                    "Failed to write to file {file_path} after {max_retries} 
attempts, something's terribly wrong",
-                );
-                return Err(IggyError::CannotWriteToFile);
-            }
-            sleep(retry_delay.get_duration()).await;
-        }
+        super::write_batch_vectored(file, file_path, messages).await
+
+        // let messages_size = messages.iter().map(|m| m.size()).sum();
+        // loop {
+        //     // TODO: use vectored API
+        //     match super::write_batch_vectored(file, file_path, 
messages).await {
+        //         Ok(_) => {
+        //             if fsync {
+        //                 match file.sync_all().await {
+        //                     Ok(_) => return Ok(messages_size),
+        //                     Err(e) => {
+        //                         attempts += 1;
+        //                         error!(
+        //                             "Error syncing file {file_path}: {:?} 
(attempt {attempts}/{max_retries})",
+        //                              e,
+        //                         );
+        //                     }
+        //                 }
+        //             } else {
+        //                 return Ok(messages_size);
+        //             }
+        //         }
+        //         Err(e) => {
+        //             attempts += 1;
+        //             error!(
+        //                 "Error writing to file {file_path}: {:?} (attempt 
{attempts}/{max_retries})",
+        //                 e,
+        //             );
+        //         }
+        //     }
+        //     if attempts >= max_retries {
+        //         error!(
+        //             "Failed to write to file {file_path} after 
{max_retries} attempts, something's terribly wrong",
+        //         );
+        //         return Err(IggyError::CannotWriteToFile);
+        //     }
+        //     sleep(retry_delay.get_duration()).await;
+        // }
     }
 }
diff --git a/server/src/streaming/segments/messages_accumulator copy 2.rs 
b/server/src/streaming/segments/messages_accumulator copy 2.rs
deleted file mode 100644
index 28cec981..00000000
--- a/server/src/streaming/segments/messages_accumulator copy 2.rs      
+++ /dev/null
@@ -1,141 +0,0 @@
-use super::{IggyMessagesMut, Index};
-use bytes::BytesMut;
-use iggy::prelude::IggyMessages;
-use iggy::utils::byte_size::IggyByteSize;
-use iggy::utils::timestamp::IggyTimestamp;
-use std::mem;
-
-/// A container that accumulates messages before they are written to disk
-#[derive(Debug, Default)]
-pub struct MessagesAccumulator {
-    /// Base offset of the first message
-    base_offset: u64,
-
-    /// Base timestamp of the first message
-    base_timestamp: u64,
-
-    /// Current size of all accumulated messages
-    current_size: IggyByteSize,
-
-    /// Current maximum offset
-    current_offset: u64,
-
-    /// Current maximum timestamp
-    current_timestamp: u64,
-
-    /// A single buffer containing all accumulated messages
-    messages: Vec<u8>,
-
-    /// Indexes
-    indexes: Vec<Index>,
-
-    /// Number of messages in the accumulator
-    messages_count: u32,
-}
-
-impl MessagesAccumulator {
-    /// Coalesces a batch of messages into the accumulator
-    /// This method also prepares the messages for persistence by setting their
-    /// offset, timestamp, record size, and checksum fields
-    /// Returns the number of messages in the batch
-    pub fn coalesce_batch(
-        &mut self,
-        current_offset: u64,
-        current_position: u32,
-        mut batch: IggyMessagesMut,
-    ) -> u32 {
-        // TODO(hubcio): add capacity
-        let batch_messages_count = batch.count();
-
-        if batch_messages_count == 0 {
-            return 0;
-        }
-
-        if self.messages.is_empty() {
-            self.base_offset = current_offset;
-            self.base_timestamp = IggyTimestamp::now().as_micros();
-            self.current_offset = current_offset;
-            self.current_timestamp = self.base_timestamp;
-        }
-
-        batch.prepare_for_persistence(self.base_offset, 
self.current_timestamp);
-
-        let batch_size = batch.size();
-        let batch = batch.make_immutable();
-        let mut current_position = current_position;
-        for message in batch.iter() {
-            let msg_size = message.as_ref().unwrap().size() as u32;
-
-            let offset = 
message.as_ref().unwrap().msg_header().unwrap().offset();
-            let relative_offset = (offset - self.base_offset) as u32;
-            let position = current_position;
-            let timestamp = 
message.as_ref().unwrap().msg_header().unwrap().timestamp();
-
-            self.indexes.push(Index {
-                offset: relative_offset,
-                position,
-                timestamp,
-            });
-
-            current_position += msg_size;
-        }
-        let mut messages: Vec<u8> = batch.into_inner().into();
-
-        self.messages.push(batch);
-
-        self.messages_count += batch_messages_count;
-        self.current_size += IggyByteSize::from(batch_size as u64);
-        self.current_offset = self.base_offset + self.messages_count as u64 - 
1;
-        self.current_timestamp = IggyTimestamp::now().as_micros();
-
-        batch_messages_count
-    }
-
-    pub fn get_messages_by_offset(&self, start_offset: u64, end_offset: u64) 
-> IggyMessages {
-        let mut messages = IggyMessages::default();
-
-        for message in self.messages.iter() {
-            if message.msg_header().unwrap().offset() >= start_offset
-                && message.msg_header().unwrap().offset() <= end_offset
-            {
-                messages.push(message.clone());
-            }
-        }
-        messages
-    }
-
-    /// Checks if the accumulator is empty
-    pub fn is_empty(&self) -> bool {
-        self.messages.is_empty() || self.messages_count == 0
-    }
-
-    /// Returns the number of unsaved messages
-    pub fn unsaved_messages_count(&self) -> usize {
-        self.messages_count as usize
-    }
-
-    /// Returns the maximum offset in the accumulator
-    pub fn max_offset(&self) -> u64 {
-        self.current_offset
-    }
-
-    /// Returns the maximum timestamp in the accumulator
-    pub fn batch_max_timestamp(&self) -> u64 {
-        self.current_timestamp
-    }
-
-    /// Returns the base offset of the accumulator
-    pub fn base_offset(&self) -> u64 {
-        self.base_offset
-    }
-
-    /// Returns the base timestamp of the accumulator
-    pub fn batch_base_timestamp(&self) -> u64 {
-        self.base_timestamp
-    }
-
-    /// Takes ownership of the accumulator and returns the messages
-    pub fn materialize(self) -> (Vec<IggyMessages>, Vec<Index>) {
-        (self.messages, self.indexes)
-    }
-}
diff --git a/server/src/streaming/segments/messages_accumulator copy.rs 
b/server/src/streaming/segments/messages_accumulator copy.rs
deleted file mode 100644
index 5a5f9ffc..00000000
--- a/server/src/streaming/segments/messages_accumulator copy.rs        
+++ /dev/null
@@ -1,106 +0,0 @@
-use bytes::BytesMut;
-use iggy::models::{IggyMessages, IggyMessagesMut};
-use iggy::utils::byte_size::IggyByteSize;
-use iggy::utils::timestamp::IggyTimestamp;
-use std::mem;
-
-/// A container that accumulates messages before they are written to disk
-#[derive(Debug, Default)]
-pub struct MessagesAccumulator {
-    /// Base offset of the first message
-    base_offset: u64,
-
-    /// Base timestamp of the first message
-    base_timestamp: u64,
-
-    /// Current size of all accumulated messages
-    current_size: IggyByteSize,
-
-    /// Current maximum offset
-    current_offset: u64,
-
-    /// Current maximum timestamp
-    current_timestamp: u64,
-
-    /// A single IggyMessagesMut instance containing all messages
-    messages: Option<IggyMessagesMut>,
-
-    /// Number of messages in the accumulator
-    messages_count: u32,
-}
-
-impl MessagesAccumulator {
-    /// Coalesces a batch of messages into the accumulator
-    /// This method also prepares the messages for persistence by setting their
-    /// offset, timestamp, record size, and checksum fields
-    /// Returns the number of messages in the batch
-    pub fn coalesce_batch(&mut self, current_offset: u64, mut batch: 
IggyMessagesMut) -> u32 {
-        // TODO(hubcio): add capacity
-        let batch_messages_count = batch.count();
-
-        if batch_messages_count == 0 {
-            return 0;
-        }
-
-        if self.messages.is_none() {
-            self.base_offset = current_offset;
-            self.base_timestamp = IggyTimestamp::now().as_micros();
-            self.current_offset = current_offset;
-            self.current_timestamp = self.base_timestamp;
-        }
-
-        batch.prepare_for_persistence(self.base_offset, 
Some(self.current_timestamp));
-
-        let batch_size = batch.size();
-
-        // Add the batch to our messages
-        if let Some(messages) = &mut self.messages {
-            messages.extend(batch);
-        } else {
-            self.messages = Some(batch);
-            // self.messages.as_mut().unwrap().set_capacity(1_200_000); // 
TODO(hubcio): Add new config and multiply this by k factor
-        }
-
-        self.messages_count += batch_messages_count;
-        self.current_size += IggyByteSize::from(batch_size as u64);
-        self.current_offset = self.base_offset + self.messages_count as u64 - 
1;
-        self.current_timestamp = IggyTimestamp::now().as_micros();
-
-        batch_messages_count
-    }
-
-    /// Checks if the accumulator is empty
-    pub fn is_empty(&self) -> bool {
-        self.messages.is_none() || self.messages_count == 0
-    }
-
-    /// Returns the number of unsaved messages
-    pub fn unsaved_messages_count(&self) -> usize {
-        self.messages_count as usize
-    }
-
-    /// Returns the maximum offset in the accumulator
-    pub fn max_offset(&self) -> u64 {
-        self.current_offset
-    }
-
-    /// Returns the maximum timestamp in the accumulator
-    pub fn batch_max_timestamp(&self) -> u64 {
-        self.current_timestamp
-    }
-
-    /// Returns the base offset of the accumulator
-    pub fn base_offset(&self) -> u64 {
-        self.base_offset
-    }
-
-    /// Returns the base timestamp of the accumulator
-    pub fn batch_base_timestamp(&self) -> u64 {
-        self.base_timestamp
-    }
-
-    /// Takes ownership of the accumulator and returns the messages
-    pub fn materialize(self) -> IggyMessages {
-        self.messages.unwrap().into()
-    }
-}
diff --git a/server/src/streaming/segments/messages_accumulator.rs 
b/server/src/streaming/segments/messages_accumulator.rs
index 4d9ea197..45bb2f35 100644
--- a/server/src/streaming/segments/messages_accumulator.rs
+++ b/server/src/streaming/segments/messages_accumulator.rs
@@ -1,9 +1,8 @@
 use super::{IggyMessagesMut, Index};
-use bytes::{Bytes, BytesMut};
-use iggy::prelude::{IggyMessageView, IggyMessages};
+use crate::streaming::segments::types::IggyMessagesSlice;
+use iggy::prelude::IggyMessages;
 use iggy::utils::byte_size::IggyByteSize;
 use iggy::utils::timestamp::IggyTimestamp;
-use std::mem;
 
 /// A container that accumulates messages before they are written to disk
 #[derive(Debug, Default)]
@@ -23,8 +22,8 @@ pub struct MessagesAccumulator {
     /// Current maximum timestamp
     current_timestamp: u64,
 
-    /// A single buffer containing all accumulated messages
-    messages: Vec<u8>,
+    /// A multiple buffers containing all accumulated messages
+    messages: Vec<IggyMessages>,
 
     /// Indexes
     indexes: Vec<Index>,
@@ -58,18 +57,19 @@ impl MessagesAccumulator {
             self.current_timestamp = self.base_timestamp;
         }
 
+        // TODO(hubcio): pass indexes vec and modify
         batch.prepare_for_persistence(self.base_offset, 
self.current_timestamp);
 
         let batch_size = batch.size();
         let batch = batch.make_immutable();
         let mut current_position = current_position;
         for message in batch.iter() {
-            let msg_size = message.as_ref().unwrap().size() as u32;
+            let msg_size = message.size() as u32;
 
-            let offset = 
message.as_ref().unwrap().msg_header().unwrap().offset();
+            let offset = message.msg_header().offset();
             let relative_offset = (offset - self.base_offset) as u32;
             let position = current_position;
-            let timestamp = 
message.as_ref().unwrap().msg_header().unwrap().timestamp();
+            let timestamp = message.msg_header().timestamp();
 
             self.indexes.push(Index {
                 offset: relative_offset,
@@ -79,9 +79,7 @@ impl MessagesAccumulator {
 
             current_position += msg_size;
         }
-        let messages: Vec<u8> = batch.into_inner().into();
-
-        self.messages.extend(messages);
+        self.messages.push(batch);
 
         self.messages_count += batch_messages_count;
         self.current_size += IggyByteSize::from(batch_size as u64);
@@ -90,35 +88,56 @@ impl MessagesAccumulator {
 
         batch_messages_count
     }
+    pub fn get_messages_by_offset(&self, start_offset: u64, end_offset: u64) 
-> IggyMessagesSlice {
+        let start_idx = (start_offset - self.base_offset) as usize;
+        let end_idx = (end_offset - self.base_offset) as usize; // inclusive
+        if self.indexes.is_empty() || start_idx >= self.indexes.len() {
+            return IggyMessagesSlice::empty();
+        }
+        let start_byte = self.indexes[start_idx].position as usize;
+        let end_byte = if end_idx + 1 < self.indexes.len() {
+            self.indexes[end_idx + 1].position as usize
+        } else {
+            self.current_size.as_bytes_usize()
+        };
 
-    pub fn get_messages_by_offset(&self, start_offset: u64, end_offset: u64) 
-> IggyMessages {
-        let relative_start = start_offset - self.base_offset;
-        let relative_end = end_offset - self.base_offset;
-        let mut pos = 0;
-        let mut msg_index = 0;
-        let mut start_byte = None;
-        let mut end_byte = None;
-        while pos < self.messages.len() {
-            let view = IggyMessageView::new(&self.messages[pos..]).unwrap();
-            if msg_index == relative_start {
-                start_byte = Some(pos);
+        let mut cumulative = Vec::with_capacity(self.messages.len());
+        let mut sum = 0usize;
+        for msg in &self.messages {
+            sum += msg.buffer().len();
+            cumulative.push(sum);
+        }
+
+        let mut buffers = Vec::new();
+        for (i, &cum) in cumulative.iter().enumerate() {
+            let batch_start = if i == 0 { 0 } else { cumulative[i - 1] };
+            let batch_end = cum;
+            if batch_end <= start_byte {
+                continue;
             }
-            pos += view.size();
-            msg_index += 1;
-            if msg_index == relative_end + 1 {
-                end_byte = Some(pos);
+            if batch_start >= end_byte {
                 break;
             }
+            let slice_start = start_byte.saturating_sub(batch_start);
+            let slice_end = if end_byte < batch_end {
+                end_byte - batch_start
+            } else {
+                batch_end - batch_start
+            };
+            buffers.push(
+                self.messages[i]
+                    .shallow_copy()
+                    .slice(slice_start..slice_end),
+            );
         }
-        let start_byte = start_byte.unwrap_or(0);
-        let end_byte = end_byte.unwrap_or(self.messages.len());
-        let slice = &self.messages[start_byte..end_byte];
-        let count = if relative_end >= relative_start {
-            relative_end - relative_start + 1
+
+        let count = if end_idx >= start_idx {
+            (end_idx - start_idx + 1) as u32
         } else {
             0
         };
-        IggyMessages::new(Bytes::copy_from_slice(slice), count as u32)
+
+        IggyMessagesSlice::from_buffers(buffers, count)
     }
 
     /// Checks if the accumulator is empty
@@ -151,10 +170,8 @@ impl MessagesAccumulator {
         self.base_timestamp
     }
 
-    /// Takes ownership of the accumulator and returns the messages
-    pub fn materialize(self) -> (IggyMessages, Vec<Index>) {
-        let messages = IggyMessages::new(Bytes::from(self.messages), 
self.messages_count);
-        let indexes = self.indexes;
-        (messages, indexes)
+    /// Takes ownership of the accumulator and returns the messages and indexes
+    pub fn materialize(self) -> (Vec<IggyMessages>, Vec<Index>) {
+        (self.messages, self.indexes)
     }
 }
diff --git a/server/src/streaming/segments/mod.rs 
b/server/src/streaming/segments/mod.rs
index ffbde13e..524d9507 100644
--- a/server/src/streaming/segments/mod.rs
+++ b/server/src/streaming/segments/mod.rs
@@ -11,6 +11,7 @@ pub use segment::Segment;
 pub use types::IggyMessageHeaderViewMut;
 pub use types::IggyMessageViewMut;
 pub use types::IggyMessagesMut;
+pub use types::IggyMessagesSlice;
 
 pub const LOG_EXTENSION: &str = "log";
 pub const INDEX_EXTENSION: &str = "index";
diff --git a/server/src/streaming/segments/reading_messages.rs 
b/server/src/streaming/segments/reading_messages.rs
index 379200e6..e680fec4 100644
--- a/server/src/streaming/segments/reading_messages.rs
+++ b/server/src/streaming/segments/reading_messages.rs
@@ -1,6 +1,7 @@
 use super::Index;
 use crate::streaming::segments::segment::Segment;
-use bytes::BytesMut;
+use crate::streaming::segments::types::IggyMessagesSlice;
+use bytes::{Bytes, BytesMut};
 use error_set::ErrContext;
 use iggy::prelude::*;
 use std::sync::Arc;
@@ -20,43 +21,32 @@ impl Segment {
     pub async fn get_messages_by_timestamp(
         &self,
         start_timestamp: u64,
-        count: usize,
-    ) -> Result<Vec<Arc<()>>, IggyError> {
-        todo!();
-        // if count == 0 {
-        //     return Ok(Vec::new());
-        // }
-
-        // let mut messages = Vec::with_capacity(count);
-        // let mut remaining = count;
-
-        // let disk_messages = self
-        //     .load_messages_from_disk_by_timestamp(start_timestamp, 
remaining)
-        //     .await?;
-        // let disk_count = disk_messages.len();
-        // messages.extend(disk_messages);
-        // remaining -= disk_count;
-
-        // if remaining > 0 {
-        //     if let Some(messages_accumulator) = &self.unsaved_messages {
-        //         let buffer_messages =
-        //             
messages_accumulator.get_messages_by_timestamp(start_timestamp, remaining);
-        //         messages.extend(buffer_messages);
-        //     }
-        // }
-
-        // // Ensure we return exactly requested count (truncate if buffer had 
more)
-        // messages.truncate(count);
-        // Ok(messages)
+        count: u32,
+    ) -> Result<IggyMessagesSlice, IggyError> {
+        if count == 0 {
+            return Ok(IggyMessagesSlice::empty());
+        }
+
+        let index_opt = self.load_index_for_timestamp(start_timestamp).await?;
+
+        let Some(index) = index_opt else {
+            trace!("No messages found for timestamp: {}", start_timestamp);
+            return Ok(IggyMessagesSlice::empty());
+        };
+
+        let offset = self.start_offset + index.offset as u64;
+        trace!("Found offset {} for timestamp {}", offset, start_timestamp);
+
+        self.get_messages_by_offset(offset, count).await
     }
 
     pub async fn get_messages_by_offset(
         &self,
         mut offset: u64,
         count: u32,
-    ) -> Result<IggyMessages, IggyError> {
+    ) -> Result<IggyMessagesSlice, IggyError> {
         if count == 0 {
-            return Ok(IggyMessages::default());
+            return Ok(IggyMessagesSlice::empty());
         }
 
         if offset < self.start_offset {
@@ -74,12 +64,12 @@ impl Segment {
 
         // In case that the partition messages buffer is disabled, we need to 
check the unsaved messages buffer
         if self.unsaved_messages.is_none() {
-            return self.load_messages_from_disk(offset, count).await;
+            return Ok(self.load_messages_from_disk(offset, count).await?);
         }
 
         let messages_accumulator = self.unsaved_messages.as_ref().unwrap();
         if messages_accumulator.is_empty() {
-            return self.load_messages_from_disk(offset, count).await;
+            return Ok(self.load_messages_from_disk(offset, count).await?);
         }
 
         let accumulator_first_msg_offset = messages_accumulator.base_offset();
@@ -92,11 +82,12 @@ impl Segment {
 
         // Case 2: All messages are on disk
         if end_offset < accumulator_first_msg_offset {
-            return self.load_messages_from_disk(offset, count).await;
+            return Ok(self.load_messages_from_disk(offset, count).await?);
         }
 
         // Case 3: Messages span disk and messages_require_to_save buffer 
boundary
-        let mut buffer = BytesMut::new();
+
+        let mut slices = Vec::new();
 
         // Load messages from disk up to the messages_require_to_save buffer 
boundary
         if offset < accumulator_first_msg_offset {
@@ -106,37 +97,22 @@ impl Segment {
             "{COMPONENT} (error: {error}) - failed to load messages from disk, 
stream ID: {}, topic ID: {}, partition ID: {}, start offset: {offset}, end 
offset :{}",
             self.stream_id, self.topic_id, self.partition_id, 
accumulator_first_msg_offset - 1
         ))?;
-            buffer.extend_from_slice(disk_messages.buffer());
+            slices.push(disk_messages);
         }
 
         // Load remaining messages from messages_require_to_save buffer
         let buffer_start = std::cmp::max(offset, accumulator_first_msg_offset);
         let buffer_messages = 
self.load_messages_from_unsaved_buffer(buffer_start, end_offset);
-        buffer.extend_from_slice(buffer_messages.buffer());
-
-        let total_count = if buffer.is_empty() {
-            0
-        } else {
-            IggyMessageViewIterator::new(&buffer).count() as u32
-        };
-
-        Ok(IggyMessages::new(buffer.freeze(), total_count))
-    }
+        slices.push(buffer_messages);
 
-    pub async fn get_all_messages(&self) -> Result<IggyMessages, IggyError> {
-        //TODO: Fix me
-        /*
-        self.get_messages_by_offset(self.start_offset, 
self.get_messages_count() as u32)
-            .await
-            */
-        todo!()
+        Ok(IggyMessagesSlice::combine(slices))
     }
 
     fn load_messages_from_unsaved_buffer(
         &self,
         start_offset: u64,
         end_offset: u64,
-    ) -> IggyMessages {
+    ) -> IggyMessagesSlice {
         let messages_accumulator = self.unsaved_messages.as_ref().unwrap();
         messages_accumulator.get_messages_by_offset(start_offset, end_offset)
     }
@@ -145,127 +121,97 @@ impl Segment {
         &self,
         timestamp: u64,
     ) -> Result<Option<Index>, IggyError> {
-        todo!()
-
-        // trace!("Loading index for timestamp: {}", timestamp);
-        // let index = self
-        //     .index_reader
-        //     .as_ref()
-        //     .unwrap()
-        //     .load_index_for_timestamp_impl(timestamp)
-        //     .await
-        //     .with_error_context(|error| {
-        //         format!(
-        //             "Failed to load index for timestamp: {timestamp} for 
{}. {error}",
-        //             self
-        //         )
-        //     })?;
-
-        // trace!("Loaded index: {:?}", index);
-        // Ok(index)
-    }
-
-    async fn load_messages_from_disk_by_timestamp(
-        &self,
-        start_timestamp: u64,
-        count: usize,
-    ) -> Result<Vec<Arc<()>>, IggyError> {
-        //TODO Fix me
-        /*
-        let index = self.load_index_for_timestamp(start_timestamp).await?;
-        let Some(index) = index else {
-            return Ok(Vec::new());
-        };
-
-        let index_range = IndexRange {
-            start: index,
-            end: Index {
-                offset: u32::MAX,
-                position: u32::MAX,
-                timestamp: u64::MAX,
-            },
-        };
-        let batches = self.load_batches_by_range(&index_range).await?;
-
-        let mut messages = Vec::with_capacity(count);
-        for batch in batches {
-            for msg in batch.into_messages_iter() {
-                if msg.timestamp >= start_timestamp {
-                    messages.push(Arc::new(msg));
-                    if messages.len() >= count {
-                        break;
-                    }
-                }
-            }
-            if messages.len() >= count {
-                break;
-            }
+        if timestamp < self.start_timestamp {
+            trace!(
+                "Timestamp {} is earlier than segment start timestamp {}",
+                timestamp,
+                self.start_timestamp
+            );
+            return Ok(Some(Index::default()));
         }
 
-        Ok(messages)
-        */
-        todo!()
-    }
+        if timestamp > self.end_timestamp {
+            trace!(
+                "Timestamp {} is later than segment end timestamp {}",
+                timestamp,
+                self.end_timestamp
+            );
+            return Ok(None);
+        }
 
-    /// Loads and verifies message checksums from the log file.
-    pub async fn load_message_checksums(&self) -> Result<(), IggyError> {
-        //TODO: Fix me
-        /*
-        self.log_reader
+        trace!("Loading index for timestamp: {}", timestamp);
+        let index = self
+            .index_reader
             .as_ref()
             .unwrap()
-            .load_batches_by_range_with_callback(&IndexRange::max_range(), 
|batch| {
-                for message in batch.into_messages_iter() {
-                    let calculated_checksum = 
checksum::calculate(&message.payload);
-                    trace!(
-                        "Loaded message for offset: {}, checksum: {}, 
expected: {}",
-                        message.offset,
-                        calculated_checksum,
-                        message.checksum
-                    );
-                    if calculated_checksum != message.checksum {
-                        return Err(IggyError::InvalidMessageChecksum(
-                            calculated_checksum,
-                            message.checksum,
-                            message.offset,
-                        ));
-                    }
-                }
-                Ok(())
-            })
+            .load_index_for_timestamp_impl(timestamp)
             .await
             .with_error_context(|error| {
-                format!("Failed to load batches by max range for {}. {error}", 
self)
+                format!(
+                    "Failed to load index for timestamp: {timestamp} for {}. 
{error}",
+                    self
+                )
             })?;
-        Ok(())
-        */
-        todo!()
-    }
 
-    /// Loads and returns all message IDs from the log file.
-    pub async fn load_message_ids(&self) -> Result<Vec<u128>, IggyError> {
-        todo!()
+        trace!("Loaded index: {:?}", index);
+        Ok(index)
+    }
 
-        // trace!("Loading message IDs from log file: {}", self.log_path);
-        // let ids = self
-        //     .log_reader
+    /// Loads and verifies message checksums from the log file.
+    pub async fn load_message_checksums(&self) -> Result<(), IggyError> {
+        // self.log_reader
         //     .as_ref()
         //     .unwrap()
-        //     .load_message_ids_impl()
+        //     .load_batches_by_range_with_callback(&IndexRange::max_range(), 
|batch| {
+        //         for message in batch.into_messages_iter() {
+        //             let calculated_checksum = 
checksum::calculate(&message.payload);
+        //             trace!(
+        //                 "Loaded message for offset: {}, checksum: {}, 
expected: {}",
+        //                 message.offset,
+        //                 calculated_checksum,
+        //                 message.checksum
+        //             );
+        //             if calculated_checksum != message.checksum {
+        //                 return Err(IggyError::InvalidMessageChecksum(
+        //                     calculated_checksum,
+        //                     message.checksum,
+        //                     message.offset,
+        //                 ));
+        //             }
+        //         }
+        //         Ok(())
+        //     })
         //     .await
         //     .with_error_context(|error| {
-        //         format!("Failed to load message IDs, error: {error} for 
{self}")
+        //         format!("Failed to load batches by max range for {}. 
{error}", self)
         //     })?;
-        // trace!("Loaded {} message IDs from log file.", ids.len());
-        // Ok(ids)
+        // Ok(())
+
+        todo!()
+    }
+
+    /// Loads and returns all message IDs from the log file.
+    pub async fn load_message_ids(&self) -> Result<Vec<u128>, IggyError> {
+        trace!("Loading message IDs from log file: {}", self.log_path);
+        let ids = self
+            .messages_reader
+            .as_ref()
+            .unwrap()
+            .load_message_ids_impl()
+            .await
+            .with_error_context(|error| {
+                format!("Failed to load message IDs, error: {error} for 
{self}")
+            })?;
+        trace!("Loaded {} message IDs from log file.", ids.len());
+        Ok(ids)
     }
 
     async fn load_messages_from_disk(
         &self,
         start_offset: u64,
         count: u32,
-    ) -> Result<IggyMessages, IggyError> {
-        trace!(
+    ) -> Result<IggyMessagesSlice, IggyError> {
+        tracing::trace!(
                 "Loading {count} messages from disk, start_offset: 
{start_offset}, current_offset: {}...",
                 self.current_offset
             );
@@ -278,8 +224,13 @@ impl Segment {
             .as_ref()
             .unwrap()
             .load_nth_index(relative_start_offset)
-            .await?
-            .unwrap();
+            .await?;
+
+        if first_index.is_none() {
+            return Ok(IggyMessagesSlice::empty());
+        }
+
+        let first_index = first_index.unwrap();
 
         let last_index = self
             .index_reader
@@ -301,6 +252,14 @@ impl Segment {
             .with_error_context(|error| {
                 format!("Failed to load messages from segment file: {self}. 
{error}")
             })?;
-        Ok(messages)
+        let end = messages.size();
+        let count = messages.count();
+
+        Ok(IggyMessagesSlice::new(
+            messages.into_inner(),
+            0,
+            end as usize,
+            count,
+        ))
     }
 }
diff --git a/server/src/streaming/segments/types/message_header_view_mut.rs 
b/server/src/streaming/segments/types/message_header_view_mut.rs
index 5525a904..d4cfc067 100644
--- a/server/src/streaming/segments/types/message_header_view_mut.rs
+++ b/server/src/streaming/segments/types/message_header_view_mut.rs
@@ -1,4 +1,3 @@
-use iggy::error::IggyError;
 use iggy::prelude::*;
 
 /// A typed, in-place view of a raw header in a buffer
@@ -10,8 +9,8 @@ pub struct IggyMessageHeaderViewMut<'a> {
 
 impl<'a> IggyMessageHeaderViewMut<'a> {
     /// Construct a mutable view over the header slice.
-    pub fn new(data: &'a mut [u8]) -> Result<Self, IggyError> {
-        Ok(Self { data })
+    pub fn new(data: &'a mut [u8]) -> Self {
+        Self { data }
     }
 
     pub fn checksum(&self) -> u64 {
diff --git a/server/src/streaming/segments/types/message_view_mut.rs 
b/server/src/streaming/segments/types/message_view_mut.rs
index 70ea2c2f..e83ae16d 100644
--- a/server/src/streaming/segments/types/message_view_mut.rs
+++ b/server/src/streaming/segments/types/message_view_mut.rs
@@ -9,10 +9,9 @@ use std::ops::Range;
 pub struct IggyMessageViewMut<'a> {
     /// The buffer containing the message
     buffer: &'a mut [u8],
+
     /// Payload offset
     payload_offset: usize,
-    /// User headers offset
-    headers_offset: usize,
 }
 
 impl<'a> IggyMessageViewMut<'a> {
@@ -20,7 +19,7 @@ impl<'a> IggyMessageViewMut<'a> {
     pub fn new(buffer: &'a mut [u8]) -> Result<Self, IggyError> {
         let (payload_len, headers_len) = {
             let hdr_slice = &buffer[0..IGGY_MESSAGE_HEADER_SIZE as usize];
-            let hdr_view = IggyMessageHeaderView::new(hdr_slice)?;
+            let hdr_view = IggyMessageHeaderView::new(hdr_slice);
             (hdr_view.payload_length(), hdr_view.headers_length())
         };
         let total_size = IGGY_MESSAGE_HEADER_SIZE + payload_len + headers_len;
@@ -31,18 +30,17 @@ impl<'a> IggyMessageViewMut<'a> {
         Ok(Self {
             buffer,
             payload_offset: IGGY_MESSAGE_HEADER_SIZE as usize,
-            headers_offset: IGGY_MESSAGE_HEADER_SIZE as usize + payload_len as 
usize,
         })
     }
 
     /// Get an immutable header view
-    pub fn msg_header(&self) -> Result<IggyMessageHeaderView<'_>, IggyError> {
+    pub fn msg_header(&self) -> IggyMessageHeaderView<'_> {
         let hdr_slice = &self.buffer[0..IGGY_MESSAGE_HEADER_SIZE as usize];
         IggyMessageHeaderView::new(hdr_slice)
     }
 
     /// Get an ephemeral mutable header view for reading/writing
-    pub fn msg_header_mut(&mut self) -> Result<IggyMessageHeaderViewMut<'_>, 
IggyError> {
+    pub fn msg_header_mut(&mut self) -> IggyMessageHeaderViewMut<'_> {
         let hdr_slice = &mut self.buffer[0..IGGY_MESSAGE_HEADER_SIZE as usize];
         IggyMessageHeaderViewMut::new(hdr_slice)
     }
@@ -50,7 +48,7 @@ impl<'a> IggyMessageViewMut<'a> {
     /// Returns the size of the entire message (header + payload + user 
headers).
     pub fn size(&self) -> usize {
         // TODO(hubcio): remove unwraps()
-        let hdr_view = self.msg_header().expect("header must be valid");
+        let hdr_view = self.msg_header();
         (IGGY_MESSAGE_HEADER_SIZE + hdr_view.payload_length() + 
hdr_view.headers_length()) as usize
     }
 
@@ -62,14 +60,14 @@ impl<'a> IggyMessageViewMut<'a> {
 
     /// Convenience to update the checksum field in the header
     pub fn update_checksum(&mut self) {
-        let start = 8; // Skip checksum field for checksum calculation
-        let size = self.size() - 8;
-        let data = &self.buffer[start..start + size];
+        // let start = 8; // Skip checksum field for checksum calculation
+        // let size = self.size() - 8;
+        // let data = &self.buffer[start..start + size];
 
-        let checksum = gxhash64(data, 0);
+        // let checksum = gxhash64(data, 0);
 
-        let mut hdr_view = self.msg_header_mut().expect("header must be 
valid");
-        hdr_view.set_checksum(checksum);
+        // let mut hdr_view = self.msg_header_mut();
+        // hdr_view.set_checksum(checksum);
     }
 }
 
diff --git a/server/src/streaming/segments/types/messages_mut.rs 
b/server/src/streaming/segments/types/messages_mut.rs
index a86d3c24..7fbbca22 100644
--- a/server/src/streaming/segments/types/messages_mut.rs
+++ b/server/src/streaming/segments/types/messages_mut.rs
@@ -10,6 +10,7 @@ pub struct IggyMessagesMut {
     /// The number of messages in the buffer
     #[serde(skip)]
     count: u32,
+
     /// The buffer containing the messages
     buffer: BytesMut,
 }
@@ -48,7 +49,9 @@ impl IggyMessagesMut {
         for message in messages {
             buffer.extend_from_slice(&message.to_bytes());
         }
-        Self::new(buffer)
+        let mut messages_mut = Self::new(buffer);
+        messages_mut.count = messages.len() as u32;
+        messages_mut
     }
 
     /// Extends the messages container with another set of mutable messages by 
consuming them
@@ -96,15 +99,7 @@ impl IggyMessagesMut {
 
     /// Prepares all messages in the batch for persistence by setting their 
offsets, timestamps,
     /// and other necessary fields. This method should be called before the 
messages are written to disk.
-    ///
-    /// # Arguments
-    ///
-    /// * `base_offset` - The offset to start assigning from
-    /// * `timestamp` - The timestamp to use (if None, current timestamp will 
be used)
-    ///
-    /// # Returns
-    ///
-    /// The number of messages processed
+    /// Returns the number of messages processed.
     pub fn prepare_for_persistence(&mut self, base_offset: u64, timestamp: 
u64) -> u32 {
         let mut processed_count = 0;
         let mut current_offset = base_offset;
@@ -112,11 +107,10 @@ impl IggyMessagesMut {
         let mut iterator = self.iter_mut();
         while let Some(message_result) = LendingIterator::next(&mut iterator) {
             if let Ok(mut message) = message_result {
-                // TODO(hubcio): remove unwraps
-                message.msg_header_mut().unwrap().set_offset(current_offset);
+                message.msg_header_mut().set_offset(current_offset);
 
-                if message.msg_header().unwrap().timestamp() == 0 {
-                    message.msg_header_mut().unwrap().set_timestamp(timestamp);
+                if message.msg_header().timestamp() == 0 {
+                    message.msg_header_mut().set_timestamp(timestamp);
                 }
 
                 message.update_checksum();
@@ -133,3 +127,13 @@ impl IggyMessagesMut {
         IggyMessages::new(self.buffer.freeze(), self.count)
     }
 }
+
+impl From<&[IggyMessage]> for IggyMessagesMut {
+    fn from(messages: &[IggyMessage]) -> Self {
+        let messages_size: u32 = messages
+            .iter()
+            .map(|m| m.get_size_bytes().as_bytes_u64() as u32)
+            .sum();
+        Self::from_messages(messages, messages_size)
+    }
+}
diff --git a/server/src/streaming/segments/types/messages_slice.rs 
b/server/src/streaming/segments/types/messages_slice.rs
new file mode 100644
index 00000000..740f065b
--- /dev/null
+++ b/server/src/streaming/segments/types/messages_slice.rs
@@ -0,0 +1,70 @@
+use bytes::{Bytes, BytesMut};
+use iggy::prelude::*;
+
+#[derive(Debug, Clone)]
+pub struct IggyMessagesSlice {
+    buffers: Vec<Bytes>,
+    count: u32,
+}
+
+impl IggyMessagesSlice {
+    pub fn new(buffer: Bytes, start: usize, end: usize, count: u32) -> Self {
+        Self {
+            buffers: vec![buffer.slice(start..end)],
+            count,
+        }
+    }
+
+    pub fn from_buffers(buffers: Vec<Bytes>, count: u32) -> Self {
+        Self { buffers, count }
+    }
+
+    pub fn combine(slices: Vec<IggyMessagesSlice>) -> Self {
+        let mut buffers = Vec::new();
+        let mut total_count = 0;
+        for slice in slices {
+            buffers.extend(slice.buffers);
+            total_count += slice.count;
+        }
+        Self {
+            buffers,
+            count: total_count,
+        }
+    }
+
+    pub fn chunks(&self) -> impl Iterator<Item = &[u8]> + '_ {
+        self.buffers.iter().map(|b| b.as_ref())
+    }
+
+    pub fn chunks_count(&self) -> usize {
+        self.buffers.len()
+    }
+
+    pub fn to_messages(&self) -> IggyMessages {
+        let total_size: usize = self.buffers.iter().map(|b| b.len()).sum();
+        let mut combined = BytesMut::with_capacity(total_size);
+        for chunk in &self.buffers {
+            combined.extend_from_slice(chunk);
+        }
+        IggyMessages::new(combined.freeze(), self.count)
+    }
+
+    pub fn count(&self) -> u32 {
+        self.count
+    }
+
+    pub fn size(&self) -> u32 {
+        self.buffers.iter().map(|b| b.len() as u32).sum()
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.count == 0 || self.buffers.is_empty()
+    }
+
+    pub fn empty() -> Self {
+        Self {
+            buffers: Vec::new(),
+            count: 0,
+        }
+    }
+}
diff --git a/server/src/streaming/segments/types/messages_slice_better.rs 
b/server/src/streaming/segments/types/messages_slice_better.rs
new file mode 100644
index 00000000..8b137891
--- /dev/null
+++ b/server/src/streaming/segments/types/messages_slice_better.rs
@@ -0,0 +1 @@
+
diff --git a/server/src/streaming/segments/types/mod.rs 
b/server/src/streaming/segments/types/mod.rs
index ecb07a11..016bc7b2 100644
--- a/server/src/streaming/segments/types/mod.rs
+++ b/server/src/streaming/segments/types/mod.rs
@@ -1,7 +1,10 @@
 mod message_header_view_mut;
 mod message_view_mut;
 mod messages_mut;
+mod messages_slice;
+mod messages_slice_better;
 
 pub use message_header_view_mut::IggyMessageHeaderViewMut;
 pub use message_view_mut::IggyMessageViewMut;
 pub use messages_mut::IggyMessagesMut;
+pub use messages_slice::IggyMessagesSlice;
diff --git a/server/src/streaming/segments/writing_messages.rs 
b/server/src/streaming/segments/writing_messages.rs
index bc81192f..f9c3d303 100644
--- a/server/src/streaming/segments/writing_messages.rs
+++ b/server/src/streaming/segments/writing_messages.rs
@@ -106,9 +106,7 @@ impl Segment {
             .unwrap()
             .save_batches(messages, confirmation)
             .await
-            .with_error_context(
-                |error| format!("Failed to save batch for seg: {self}. 
{error}",),
-            )?;
+            .with_error_context(|error| format!("Failed to save batch for 
{self}. {error}",))?;
 
         self.last_index_position += saved_bytes.as_bytes_u64() as u32;
 
diff --git a/server/src/streaming/systems/messages.rs 
b/server/src/streaming/systems/messages.rs
index 77c50111..0eff2f1b 100644
--- a/server/src/streaming/systems/messages.rs
+++ b/server/src/streaming/systems/messages.rs
@@ -1,4 +1,4 @@
-use crate::streaming::segments::IggyMessagesMut;
+use crate::streaming::segments::{IggyMessagesMut, IggyMessagesSlice};
 use crate::streaming::session::Session;
 use crate::streaming::systems::system::System;
 use crate::streaming::systems::COMPONENT;
@@ -19,7 +19,7 @@ impl System {
         topic_id: &Identifier,
         partition_id: Option<u32>,
         args: PollingArgs,
-    ) -> Result<IggyMessages, IggyError> {
+    ) -> Result<IggyMessagesSlice, IggyError> {
         self.ensure_authenticated(session)?;
         if args.count == 0 {
             return Err(IggyError::InvalidMessagesCount);
@@ -60,57 +60,46 @@ impl System {
             .get_messages(polling_consumer, partition_id, args.strategy, 
args.count)
             .await?;
 
-        // TODO: Fix me
-        /*
-        if polled_messages.messages.is_empty() {
-            return Ok(polled_messages);
-        }
-        */
+        return Ok(result);
 
-        // TODO: Fix me
-        /*
-        let offset = polled_messages.messages.last().unwrap().offset;
-        if args.auto_commit {
-            trace!("Last offset: {} will be automatically stored for {}, 
stream: {}, topic: {}, partition: {}", offset, consumer, stream_id, topic_id, 
partition_id);
-            topic
-                .store_consumer_offset_internal(polling_consumer, offset, 
partition_id)
-                .await
-                .with_error_context(|error| format!("{COMPONENT} (error: 
{error}) - failed to store consumer offset internal, polling consumer: {}, 
offset: {}, partition ID: {}", polling_consumer, offset, partition_id)) ?;
-        }
-        */
+        // let offset = polled_messages.messages.last().unwrap().offset;
+        // if args.auto_commit {
+        //     trace!("Last offset: {} will be automatically stored for {}, 
stream: {}, topic: {}, partition: {}", offset, consumer, stream_id, topic_id, 
partition_id);
+        //     topic
+        //         .store_consumer_offset_internal(polling_consumer, offset, 
partition_id)
+        //         .await
+        //         .with_error_context(|error| format!("{COMPONENT} (error: 
{error}) - failed to store consumer offset internal, polling consumer: {}, 
offset: {}, partition ID: {}", polling_consumer, offset, partition_id)) ?;
+        // }
 
-        if self.encryptor.is_none() {
-            return Ok(result);
-        }
+        // if self.encryptor.is_none() {
+        //     return Ok(result);
+        // }
         // TODO: Fix me
-        /*
-        let encryptor = self.encryptor.as_ref().unwrap();
-        let mut decrypted_messages = 
Vec::with_capacity(polled_messages.messages.len());
-        for message in polled_messages.messages.iter() {
-            let payload = encryptor.decrypt(&message.payload);
-            match payload {
-                Ok(payload) => {
-                    decrypted_messages.push(PolledMessage {
-                        id: message.id,
-                        state: message.state,
-                        offset: message.offset,
-                        timestamp: message.timestamp,
-                        checksum: message.checksum,
-                        length: IggyByteSize::from(payload.len() as u64),
-                        payload: Bytes::from(payload),
-                        headers: message.headers.clone(),
-                    });
-                }
-                Err(error) => {
-                    error!("Cannot decrypt the message. Error: {}", error);
-                    return Err(IggyError::CannotDecryptData);
-                }
-            }
-        }
-        polled_messages.messages = decrypted_messages;
-        Ok(polled_messages)
-        */
-        todo!()
+        // let encryptor = self.encryptor.as_ref().unwrap();
+        // let mut decrypted_messages = 
Vec::with_capacity(polled_messages.messages.len());
+        // for message in polled_messages.messages.iter() {
+        //     let payload = encryptor.decrypt(&message.payload);
+        //     match payload {
+        //         Ok(payload) => {
+        //             decrypted_messages.push(PolledMessage {
+        //                 id: message.id,
+        //                 state: message.state,
+        //                 offset: message.offset,
+        //                 timestamp: message.timestamp,
+        //                 checksum: message.checksum,
+        //                 length: IggyByteSize::from(payload.len() as u64),
+        //                 payload: Bytes::from(payload),
+        //                 headers: message.headers.clone(),
+        //             });
+        //         }
+        //         Err(error) => {
+        //             error!("Cannot decrypt the message. Error: {}", error);
+        //             return Err(IggyError::CannotDecryptData);
+        //         }
+        //     }
+        // }
+        // polled_messages.messages = decrypted_messages;
+        // Ok(polled_messages)
     }
 
     pub async fn append_messages(
diff --git a/server/src/streaming/systems/stats.rs 
b/server/src/streaming/systems/stats.rs
index 5f2cb819..ba268b4c 100644
--- a/server/src/streaming/systems/stats.rs
+++ b/server/src/streaming/systems/stats.rs
@@ -3,9 +3,8 @@ use crate::versioning::SemanticVersion;
 use crate::VERSION;
 use iggy::error::IggyError;
 use iggy::locking::IggySharedMutFn;
-use iggy::models::stats::{CacheMetricsKey, Stats};
+use iggy::models::stats::Stats;
 use iggy::utils::duration::IggyDuration;
-use std::collections::HashMap;
 use std::sync::OnceLock;
 use sysinfo::{Pid, ProcessesToUpdate, System as SysinfoSystem};
 use tokio::sync::Mutex;
@@ -38,21 +37,6 @@ impl System {
         let kernel_version =
             
sysinfo::System::kernel_version().unwrap_or("unknown_kernel_version".to_string());
 
-        let mut cache_metrics = HashMap::new();
-        for stream in self.streams.values() {
-            for topic in stream.topics.values() {
-                for partition in topic.partitions.values() {
-                    let key = CacheMetricsKey {
-                        stream_id: stream.stream_id,
-                        topic_id: topic.topic_id,
-                        partition_id: partition.read().await.partition_id,
-                    };
-                    let metrics = partition.read().await.get_cache_metrics();
-                    cache_metrics.insert(key, metrics);
-                }
-            }
-        }
-
         let mut stats = Stats {
             process_id,
             total_cpu_usage,
@@ -67,7 +51,6 @@ impl System {
             iggy_server_semver: SemanticVersion::current()
                 .ok()
                 .and_then(|v| v.get_numeric_version().ok()),
-            cache_metrics,
             ..Default::default()
         };
 
diff --git a/server/src/streaming/topics/messages.rs 
b/server/src/streaming/topics/messages.rs
index 4c377788..8da9adaf 100644
--- a/server/src/streaming/topics/messages.rs
+++ b/server/src/streaming/topics/messages.rs
@@ -1,5 +1,5 @@
 use crate::streaming::polling_consumer::PollingConsumer;
-use crate::streaming::segments::IggyMessagesMut;
+use crate::streaming::segments::{IggyMessagesMut, IggyMessagesSlice};
 use crate::streaming::topics::topic::Topic;
 use crate::streaming::topics::COMPONENT;
 use crate::streaming::utils::file::folder_size;
@@ -31,7 +31,7 @@ impl Topic {
         partition_id: u32,
         strategy: PollingStrategy,
         count: u32,
-    ) -> Result<IggyMessages, IggyError> {
+    ) -> Result<IggyMessagesSlice, IggyError> {
         if !self.has_partitions() {
             return Err(IggyError::NoPartitions(self.topic_id, self.stream_id));
         }

Reply via email to