This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch zero-copy-no-batching
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 13dee5d7affdb8910d3f0816ec9c95e60f2ce3f9
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Tue Mar 11 23:23:37 2025 +0100

    more improvements
---
 bench/src/actors/consumer.rs                       |   2 +-
 configs/server.toml                                |   7 +
 integration/tests/streaming/get_by_offset.rs       |  20 +--
 sdk/src/binary/messages.rs                         |   9 +-
 sdk/src/messages/send_messages.rs                  |   4 +-
 sdk/src/models/messaging/message.rs                |  51 +++++++
 sdk/src/models/messaging/message_view.rs           |   2 +-
 sdk/src/models/messaging/messages.rs               | 148 ---------------------
 sdk/src/models/messaging/mod.rs                    |   5 +-
 sdk/src/prelude.rs                                 |   2 +-
 .../handlers/messages/poll_messages_handler.rs     |  15 ++-
 .../handlers/messages/send_messages_handler.rs     |   4 +-
 server/src/streaming/partitions/messages.rs        |  30 ++---
 server/src/streaming/partitions/storage.rs         |   7 +-
 .../src/streaming/segments/indexes/index_reader.rs |   4 +-
 .../src/streaming/segments/indexes/index_writer.rs |   4 +-
 server/src/streaming/segments/indexes/mod.rs       |   4 +-
 .../streaming/segments/messages/messages_reader.rs |  21 +--
 .../streaming/segments/messages/messages_writer.rs |  49 ++++---
 server/src/streaming/segments/messages/mod.rs      |  46 +++----
 .../streaming/segments/messages/persister_task.rs  | 131 +++++++++---------
 .../src/streaming/segments/messages_accumulator.rs | 107 +++------------
 server/src/streaming/segments/mod.rs               |   3 +-
 server/src/streaming/segments/reading_messages.rs  | 105 ++++++---------
 server/src/streaming/segments/segment.rs           |  57 ++++----
 .../streaming/segments/types/message_view_mut.rs   |  70 +++++-----
 server/src/streaming/segments/types/messages.rs    |  57 ++++++++
 .../src/streaming/segments/types/messages_batch.rs | 109 +++++++++++++++
 .../src/streaming/segments/types/messages_mut.rs   | 129 ++++++++----------
 .../src/streaming/segments/types/messages_slice.rs |  70 ----------
 .../segments/types/messages_slice_better.rs        |   1 -
 server/src/streaming/segments/types/mod.rs         |   7 +-
 server/src/streaming/segments/writing_messages.rs  |  67 +++-------
 server/src/streaming/systems/messages.rs           |   6 +-
 server/src/streaming/topics/messages.rs            | 122 +----------------
 server/src/streaming/topics/storage.rs             |   6 -
 36 files changed, 608 insertions(+), 873 deletions(-)

diff --git a/bench/src/actors/consumer.rs b/bench/src/actors/consumer.rs
index e83b34d8..0c451009 100644
--- a/bench/src/actors/consumer.rs
+++ b/bench/src/actors/consumer.rs
@@ -27,7 +27,7 @@ pub struct Consumer {
     client_factory: Arc<dyn ClientFactory>,
     benchmark_kind: BenchmarkKind,
     consumer_id: u32,
-    consumer_group_id: Option<u32>,
+    consumer_group_id: Option<u32>, 
     stream_id: u32,
     messages_per_batch: u32,
     message_batches: u32,
diff --git a/configs/server.toml b/configs/server.toml
index 470c64bb..076aea1b 100644
--- a/configs/server.toml
+++ b/configs/server.toml
@@ -447,8 +447,15 @@ validate_checksum = false
 # The threshold of buffered messages before triggering a save to disk 
(integer).
 # Specifies how many messages accumulate before persisting to storage.
 # Adjusting this can balance between write performance and data durability.
+# Together with `size_of_messages_required_to_save` it defines the threshold 
of buffered messages.
 messages_required_to_save = 1000
 
+# The threshold of buffered messages size before triggering a save to disk 
(string).
+# Specifies how much messages accumulate before persisting to storage.
+# Adjusting this can balance between write performance and data durability.
+# Together with `messages_required_to_save` it defines the threshold of 
buffered messages.
+size_of_messages_required_to_save = "1 MB"
+
 # Segment configuration
 [system.segment]
 # Defines the soft limit for the size of a storage segment.
diff --git a/integration/tests/streaming/get_by_offset.rs 
b/integration/tests/streaming/get_by_offset.rs
index bfaec404..5c1f8cb7 100644
--- a/integration/tests/streaming/get_by_offset.rs
+++ b/integration/tests/streaming/get_by_offset.rs
@@ -168,11 +168,11 @@ async fn test_get_messages_by_offset(
         .await
         .unwrap();
     assert_eq!(
-        all_loaded_messages.count(),
+        all_loaded_messages.messages_count(),
         total_messages,
         "Expected {} messages from start, but got {}",
         total_messages,
-        all_loaded_messages.count()
+        all_loaded_messages.messages_count()
     );
 
     // Test 2: Get messages from middle (after 3rd batch)
@@ -183,11 +183,11 @@ async fn test_get_messages_by_offset(
         .await
         .unwrap();
     assert_eq!(
-        middle_messages.count(),
+        middle_messages.messages_count(),
         remaining_messages,
         "Expected {} messages from middle offset, but got {}",
         remaining_messages,
-        middle_messages.count()
+        middle_messages.messages_count()
     );
 
     // Test 3: No messages beyond final offset
@@ -197,10 +197,10 @@ async fn test_get_messages_by_offset(
         .await
         .unwrap();
     assert_eq!(
-        no_messages.count(),
+        no_messages.messages_count(),
         0,
         "Expected no messages beyond final offset, but got {}",
-        no_messages.count()
+        no_messages.messages_count()
     );
 
     // Test 4: Small subset from start
@@ -210,11 +210,11 @@ async fn test_get_messages_by_offset(
         .await
         .unwrap();
     assert_eq!(
-        subset_messages.count(),
+        subset_messages.messages_count(),
         subset_size,
         "Expected {} messages in subset from start, but got {}",
         subset_size,
-        subset_messages.count()
+        subset_messages.messages_count()
     );
 
     // Test 5: Messages spanning multiple batches
@@ -225,11 +225,11 @@ async fn test_get_messages_by_offset(
         .await
         .unwrap();
     assert_eq!(
-        messages.count(),
+        messages.messages_count(),
         span_size,
         "Expected {} messages spanning multiple batches, but got {}",
         span_size,
-        messages.count()
+        messages.messages_count()
     );
 
     // Test 6: Validate message content and ordering
diff --git a/sdk/src/binary/messages.rs b/sdk/src/binary/messages.rs
index cb1d66ae..b9d9709b 100644
--- a/sdk/src/binary/messages.rs
+++ b/sdk/src/binary/messages.rs
@@ -7,7 +7,7 @@ use crate::error::IggyError;
 use crate::identifier::Identifier;
 use crate::messages::{FlushUnsavedBuffer, PollingStrategy};
 use crate::models::messaging::IggyMessage;
-use crate::prelude::{BytesSerializable, IggyMessages, Partitioning, 
PollMessages, SendMessages};
+use crate::prelude::{Partitioning, PollMessages, SendMessages};
 
 #[async_trait::async_trait]
 impl<B: BinaryClient> MessageClient for B {
@@ -36,7 +36,12 @@ impl<B: BinaryClient> MessageClient for B {
                 ),
             )
             .await?;
-        Ok(IggyMessages::from_bytes(response).unwrap().to_messages())
+
+        // First 4 bytes contain count of received messages
+        let count = 
u32::from_le_bytes(response.slice(0..4).as_ref().try_into().unwrap());
+        let messages = response.slice(4..);
+
+        Ok(IggyMessage::from_raw_bytes(messages, count))
     }
 
     async fn send_messages(
diff --git a/sdk/src/messages/send_messages.rs 
b/sdk/src/messages/send_messages.rs
index 04dd294c..30ecfaf1 100644
--- a/sdk/src/messages/send_messages.rs
+++ b/sdk/src/messages/send_messages.rs
@@ -5,8 +5,8 @@ use crate::error::IggyError;
 use crate::identifier::Identifier;
 use crate::messages::{MAX_HEADERS_SIZE, MAX_PAYLOAD_SIZE};
 use crate::models::messaging::{HeaderKey, HeaderValue};
-use crate::models::messaging::{IggyMessage, IggyMessageHeader, 
IggyMessageViewIterator};
-use crate::prelude::IGGY_MESSAGE_HEADER_SIZE;
+use crate::models::messaging::{IggyMessage, IggyMessageHeader};
+use crate::prelude::{IggyMessageViewIterator, IGGY_MESSAGE_HEADER_SIZE};
 use crate::utils::byte_size::IggyByteSize;
 use crate::utils::sizeable::Sizeable;
 use crate::utils::varint::IggyVarInt;
diff --git a/sdk/src/models/messaging/message.rs 
b/sdk/src/models/messaging/message.rs
index d60344fe..e85fb13e 100644
--- a/sdk/src/models/messaging/message.rs
+++ b/sdk/src/models/messaging/message.rs
@@ -37,6 +37,57 @@ impl IggyMessage {
     pub fn builder() -> IggyMessageBuilder {
         IggyMessageBuilder::new()
     }
+
+    /// Convert bytes to messages
+    pub(crate) fn from_raw_bytes(buffer: Bytes, count: u32) -> 
Vec<IggyMessage> {
+        let mut messages = Vec::with_capacity(count as usize);
+        let mut position = 0;
+        let buf_len = buffer.len();
+
+        while position < buf_len {
+            if position + IGGY_MESSAGE_HEADER_SIZE as usize > buf_len {
+                break;
+            }
+            let header_bytes = buffer.slice(position..position + 
IGGY_MESSAGE_HEADER_SIZE as usize);
+            let header = match IggyMessageHeader::from_bytes(header_bytes) {
+                Ok(h) => h,
+                Err(_) => break,
+            };
+            position += IGGY_MESSAGE_HEADER_SIZE as usize;
+
+            let payload_end = position + header.payload_length as usize;
+            if payload_end > buf_len {
+                break;
+            }
+            let payload = buffer.slice(position..payload_end);
+            position = payload_end;
+
+            let headers: Option<HashMap<super::HeaderKey, super::HeaderValue>> 
=
+                if header.headers_length > 0 {
+                    let headers_end = position + header.headers_length as 
usize;
+                    if headers_end > buf_len {
+                        break;
+                    }
+                    let headers_bytes = buffer.slice(position..headers_end);
+                    position = headers_end;
+
+                    match HashMap::from_bytes(headers_bytes) {
+                        Ok(map) => Some(map),
+                        Err(_) => break,
+                    }
+                } else {
+                    None
+                };
+
+            messages.push(IggyMessage {
+                header,
+                payload,
+                headers,
+            });
+        }
+
+        messages
+    }
 }
 
 impl FromStr for IggyMessage {
diff --git a/sdk/src/models/messaging/message_view.rs 
b/sdk/src/models/messaging/message_view.rs
index 40b20f77..ba51d1f4 100644
--- a/sdk/src/models/messaging/message_view.rs
+++ b/sdk/src/models/messaging/message_view.rs
@@ -1,5 +1,5 @@
 use super::message_header::*;
-use super::IggyMessageHeaderView;
+use super::message_header_view::IggyMessageHeaderView;
 use crate::error::IggyError;
 use crate::models::messaging::header::HeaderKey;
 use crate::models::messaging::header::HeaderValue;
diff --git a/sdk/src/models/messaging/messages.rs 
b/sdk/src/models/messaging/messages.rs
deleted file mode 100644
index cb2f4a54..00000000
--- a/sdk/src/models/messaging/messages.rs
+++ /dev/null
@@ -1,148 +0,0 @@
-use std::collections::HashMap;
-
-use super::{
-    message_view::IggyMessageViewIterator, IggyMessage, IggyMessageHeader, 
IGGY_MESSAGE_HEADER_SIZE,
-};
-use crate::bytes_serializable::BytesSerializable;
-use crate::error::IggyError;
-use crate::utils::byte_size::IggyByteSize;
-use crate::utils::sizeable::Sizeable;
-use bytes::{Bytes, BytesMut};
-use serde::{Deserialize, Serialize};
-
-/// An immutable messages container that holds a buffer of messages
-#[derive(Debug, Serialize, Deserialize, PartialEq)]
-pub struct IggyMessages {
-    /// The number of messages in the buffer
-    #[serde(skip)]
-    count: u32,
-    /// The buffer containing the messages
-    buffer: Bytes,
-}
-
-impl IggyMessages {
-    /// Create a new messages container from a buffer
-    pub fn new(buffer: Bytes, count: u32) -> Self {
-        Self { buffer, count }
-    }
-
-    /// Creates a empty messages container with capacity
-    pub fn with_capacity(capacity: u32) -> Self {
-        Self::new(BytesMut::with_capacity(capacity as usize).freeze(), 0)
-    }
-
-    /// Create iterator over messages
-    pub fn iter(&self) -> IggyMessageViewIterator {
-        IggyMessageViewIterator::new(&self.buffer)
-    }
-
-    /// Get the number of messages
-    pub fn count(&self) -> u32 {
-        self.count
-    }
-
-    /// Get the total size of all messages in bytes
-    pub fn size(&self) -> u32 {
-        self.buffer.len() as u32
-    }
-
-    /// Get access to the underlying buffer
-    pub fn buffer(&self) -> &[u8] {
-        &self.buffer
-    }
-
-    /// Get access to the underlying buffer shallow  copy
-    pub fn shallow_copy(&self) -> Bytes {
-        self.buffer.clone()
-    }
-
-    pub fn into_inner(self) -> Bytes {
-        self.buffer
-    }
-
-    pub fn to_messages(self) -> Vec<IggyMessage> {
-        let mut messages = Vec::with_capacity(self.count as usize);
-        let mut position = 0;
-        let buf_len = self.buffer.len();
-
-        while position < buf_len {
-            if position + IGGY_MESSAGE_HEADER_SIZE as usize > buf_len {
-                break;
-            }
-            let header_bytes = self
-                .buffer
-                .slice(position..position + IGGY_MESSAGE_HEADER_SIZE as usize);
-            let header = match IggyMessageHeader::from_bytes(header_bytes) {
-                Ok(h) => h,
-                Err(_) => break,
-            };
-            position += IGGY_MESSAGE_HEADER_SIZE as usize;
-
-            let payload_end = position + header.payload_length as usize;
-            if payload_end > buf_len {
-                break;
-            }
-            let payload = self.buffer.slice(position..payload_end);
-            position = payload_end;
-
-            let headers: Option<HashMap<super::HeaderKey, super::HeaderValue>> 
= if header.headers_length > 0 {
-                let headers_end = position + header.headers_length as usize;
-                if headers_end > buf_len {
-                    break;
-                }
-                let headers_bytes = self.buffer.slice(position..headers_end);
-                position = headers_end;
-
-                match HashMap::from_bytes(headers_bytes) {
-                    Ok(map) => Some(map),
-                    Err(_) => break,
-                }
-            } else {
-                None
-            };
-
-            messages.push(IggyMessage {
-                header,
-                payload,
-                headers,
-            });
-        }
-
-        messages
-    }
-}
-
-impl BytesSerializable for IggyMessages {
-    fn to_bytes(&self) -> Bytes {
-        self.buffer.clone()
-    }
-
-    fn from_bytes(bytes: Bytes) -> Result<Self, IggyError>
-    where
-        Self: Sized,
-    {
-        let mut messages_count = 0;
-        let iterator: IggyMessageViewIterator<'_> = 
IggyMessageViewIterator::new(&bytes);
-
-        for _ in iterator {
-            messages_count += 1;
-        }
-
-        Ok(Self {
-            buffer: bytes,
-            count: messages_count,
-        })
-    }
-}
-
-impl Sizeable for IggyMessages {
-    fn get_size_bytes(&self) -> IggyByteSize {
-        IggyByteSize::from(self.buffer.len() as u64)
-    }
-}
-
-impl Default for IggyMessages {
-    fn default() -> Self {
-        Self::new(BytesMut::new().freeze(), 0)
-    }
-}
diff --git a/sdk/src/models/messaging/mod.rs b/sdk/src/models/messaging/mod.rs
index 2605a4af..df0fa89c 100644
--- a/sdk/src/models/messaging/mod.rs
+++ b/sdk/src/models/messaging/mod.rs
@@ -3,7 +3,6 @@ mod message;
 mod message_header;
 mod message_header_view;
 mod message_view;
-mod messages;
 
 pub use header::{HeaderKey, HeaderKind, HeaderValue};
 pub use message::IggyMessage;
@@ -15,5 +14,5 @@ pub use message_header::{
     IGGY_MESSAGE_TIMESTAMP_OFFSET_RANGE,
 };
 pub use message_header_view::IggyMessageHeaderView;
-pub use message_view::{IggyMessageView, IggyMessageViewIterator};
-pub use messages::IggyMessages;
+pub use message_view::IggyMessageView;
+pub use message_view::IggyMessageViewIterator;
diff --git a/sdk/src/prelude.rs b/sdk/src/prelude.rs
index 9172ead2..ef4354a7 100644
--- a/sdk/src/prelude.rs
+++ b/sdk/src/prelude.rs
@@ -20,7 +20,7 @@ pub use crate::messages::{
 };
 pub use crate::models::messaging::{
     HeaderKey, HeaderValue, IggyMessage, IggyMessageHeader, 
IggyMessageHeaderView, IggyMessageView,
-    IggyMessageViewIterator, IggyMessages,
+    IggyMessageViewIterator,
 };
 pub use crate::models::messaging::{
     IGGY_MESSAGE_CHECKSUM_OFFSET_RANGE, 
IGGY_MESSAGE_HEADERS_LENGTH_OFFSET_RANGE,
diff --git a/server/src/binary/handlers/messages/poll_messages_handler.rs 
b/server/src/binary/handlers/messages/poll_messages_handler.rs
index 85527060..c8a3bfde 100644
--- a/server/src/binary/handlers/messages/poll_messages_handler.rs
+++ b/server/src/binary/handlers/messages/poll_messages_handler.rs
@@ -26,7 +26,7 @@ impl ServerCommandHandler for PollMessages {
         debug!("session: {session}, command: {self}");
 
         let system = system.read().await;
-        let messages = system
+        let batches = system
             .poll_messages(
                 session,
                 &self.consumer,
@@ -40,16 +40,23 @@ impl ServerCommandHandler for PollMessages {
                 "{COMPONENT} (error: {error}) - failed to poll messages for 
consumer: {}, stream_id: {}, topic_id: {}, partition_id: {:?}, session: 
{session}.",
                 self.consumer, self.stream_id, self.topic_id, self.partition_id
             ))?;
+        drop(system);
 
         // Collect all chunks first into a Vec to extend their lifetimes.
         // This ensures the Arc<[u8]> references from each ByteSliceView stay 
alive
         // throughout the async vectored I/O operation, preventing "borrowed 
value does not live
         // long enough" errors while optimizing transmission by using larger 
chunks.
 
-        let length = messages.size().to_le_bytes();
-        let slices: Vec<IoSlice> = 
messages.chunks().map(IoSlice::new).collect();
+        let response_length = (batches.size() + 4).to_le_bytes();
+        let messages_count = batches.messages_count().to_le_bytes();
 
-        sender.send_ok_response_vectored(&length, slices).await?;
+        let mut io_slices = Vec::with_capacity(batches.containers_count() + 1);
+        io_slices.push(IoSlice::new(&messages_count));
+        io_slices.extend(batches.iter().map(|m| IoSlice::new(m)));
+
+        sender
+            .send_ok_response_vectored(&response_length, io_slices)
+            .await?;
         Ok(())
     }
 }
diff --git a/server/src/binary/handlers/messages/send_messages_handler.rs 
b/server/src/binary/handlers/messages/send_messages_handler.rs
index 60d9735a..aac97eb8 100644
--- a/server/src/binary/handlers/messages/send_messages_handler.rs
+++ b/server/src/binary/handlers/messages/send_messages_handler.rs
@@ -53,8 +53,8 @@ impl ServerCommandHandler for SendMessages {
 
         let messages_count = buffer.get_u32_le();
         let messages = buffer.split();
-        let mut messages = IggyMessagesMut::new(messages);
-        messages.set_count(messages_count);
+
+        let messages = IggyMessagesMut::from_bytes(messages, messages_count);
 
         let system = system.read().await;
         system
diff --git a/server/src/streaming/partitions/messages.rs 
b/server/src/streaming/partitions/messages.rs
index d4e6fc1f..51819388 100644
--- a/server/src/streaming/partitions/messages.rs
+++ b/server/src/streaming/partitions/messages.rs
@@ -66,7 +66,7 @@ impl Partition {
         &self,
         start_offset: u64,
         count: u32,
-    ) -> Result<IggyMessagesSlice, IggyError> {
+    ) -> Result<IggyMessagesBatch, IggyError> {
         trace!(
             "Getting messages for start offset: {start_offset} for partition: 
{}, current offset: {}...",
             self.partition_id,
@@ -80,6 +80,7 @@ impl Partition {
         */
 
         let end_offset = self.get_end_offset(start_offset, count);
+
         // TODO: Most likely don't need to find the specific range of 
segments, just find the first segment containing the first offset
         // and during reads roll to the next one, when the first is exhausted.
         let segments = self.filter_segments_by_offsets(start_offset, 
end_offset);
@@ -93,12 +94,12 @@ impl Partition {
     }
 
     // Retrieves the first messages (up to a specified count).
-    pub async fn get_first_messages(&self, count: u32) -> 
Result<IggyMessagesSlice, IggyError> {
+    pub async fn get_first_messages(&self, count: u32) -> 
Result<IggyMessagesBatch, IggyError> {
         self.get_messages_by_offset(0, count).await
     }
 
     // Retrieves the last messages (up to a specified count).
-    pub async fn get_last_messages(&self, count: u32) -> 
Result<IggyMessagesSlice, IggyError> {
+    pub async fn get_last_messages(&self, count: u32) -> 
Result<IggyMessagesBatch, IggyError> {
         let mut requested_count = count as u64;
         if requested_count > self.current_offset + 1 {
             requested_count = self.current_offset + 1
@@ -113,7 +114,7 @@ impl Partition {
         &self,
         consumer: PollingConsumer,
         count: u32,
-    ) -> Result<IggyMessagesSlice, IggyError> {
+    ) -> Result<IggyMessagesBatch, IggyError> {
         let (consumer_offsets, consumer_id) = match consumer {
             PollingConsumer::Consumer(consumer_id, _) => 
(&self.consumer_offsets, consumer_id),
             PollingConsumer::ConsumerGroup(group_id, _) => 
(&self.consumer_group_offsets, group_id),
@@ -181,11 +182,10 @@ impl Partition {
         segments: Vec<&Segment>,
         offset: u64,
         count: u32,
-    ) -> Result<IggyMessagesSlice, IggyError> {
-        let mut slices = Vec::new();
+    ) -> Result<IggyMessagesBatch, IggyError> {
         let mut remaining_count = count;
         let mut current_offset = offset;
-
+        let mut batches = IggyMessagesBatch::new();
         for segment in segments {
             if remaining_count == 0 {
                 break;
@@ -203,7 +203,7 @@ impl Partition {
                 })?;
 
             // Update remaining count and offset for next segment
-            let messages_count = messages.count();
+            let messages_count = messages.messages_count();
             remaining_count = remaining_count.saturating_sub(messages_count);
 
             // Update the offset for the next segment if needed
@@ -213,12 +213,10 @@ impl Partition {
                 current_offset += messages_count as u64;
             }
 
-            slices.push(messages);
+            batches.add_batch(messages);
         }
 
-        // Combine all segment slices into a single composite slice
-        // This avoids copying the data
-        Ok(IggyMessagesSlice::combine(slices))
+        Ok(batches)
         // let mut results = Vec::new();
         // let mut remaining_count = count;
         // for segment in segments {
@@ -252,9 +250,9 @@ impl Partition {
     ) -> Result<(), IggyError> {
         trace!(
             "Appending {} messages of size {} to partition with ID: {}...",
-            self.partition_id,
             messages.count(),
-            messages.size()
+            messages.size(),
+            self.partition_id
         );
         {
             let last_segment = 
self.segments.last_mut().ok_or(IggyError::SegmentNotFound)?;
@@ -373,9 +371,7 @@ impl Partition {
             self.partition_id
         );
 
-        // Make sure all of the messages from the accumulator are persisted
-        // no leftover from one round trip.
-        while last_segment.unsaved_messages.is_some() {
+        if !last_segment.accumulator.is_empty() {
             last_segment.persist_messages(None).await.unwrap();
         }
         self.unsaved_messages_count = 0;
diff --git a/server/src/streaming/partitions/storage.rs 
b/server/src/streaming/partitions/storage.rs
index a251be85..573c9937 100644
--- a/server/src/streaming/partitions/storage.rs
+++ b/server/src/streaming/partitions/storage.rs
@@ -133,9 +133,10 @@ impl PartitionStorage for FilePartitionStorage {
                 format!("{COMPONENT} (error: {error}) - failed to load 
segment: {segment}",)
             })?;
             let capacity = 
partition.config.partition.messages_required_to_save;
-            if !segment.is_closed {
-                segment.unsaved_messages = Some(Default::default());
-            }
+            // TODO(hubcio)
+            // if !segment.is_closed {
+            //     segment.unsaved_messages = Some(Default::default());
+            // }
 
             // If the first segment has at least a single message, we should 
increment the offset.
             if !partition.should_increment_offset {
diff --git a/server/src/streaming/segments/indexes/index_reader.rs 
b/server/src/streaming/segments/indexes/index_reader.rs
index 83a8eb3b..87d79ad3 100644
--- a/server/src/streaming/segments/indexes/index_reader.rs
+++ b/server/src/streaming/segments/indexes/index_reader.rs
@@ -15,13 +15,13 @@ use tracing::{error, trace};
 
 /// A dedicated struct for reading from the index file.
 #[derive(Debug)]
-pub struct SegmentIndexReader {
+pub struct IndexReader {
     file_path: String,
     file: Arc<File>,
     index_size_bytes: Arc<AtomicU64>,
 }
 
-impl SegmentIndexReader {
+impl IndexReader {
     /// Opens the index file in read-only mode.
     pub async fn new(file_path: &str, index_size_bytes: Arc<AtomicU64>) -> 
Result<Self, IggyError> {
         let file = OpenOptions::new()
diff --git a/server/src/streaming/segments/indexes/index_writer.rs 
b/server/src/streaming/segments/indexes/index_writer.rs
index 306e7350..7d6df825 100644
--- a/server/src/streaming/segments/indexes/index_writer.rs
+++ b/server/src/streaming/segments/indexes/index_writer.rs
@@ -13,14 +13,14 @@ use tracing::trace;
 
 /// A dedicated struct for writing to the index file.
 #[derive(Debug)]
-pub struct SegmentIndexWriter {
+pub struct IndexWriter {
     file_path: String,
     file: File,
     index_size_bytes: Arc<AtomicU64>,
     fsync: bool,
 }
 
-impl SegmentIndexWriter {
+impl IndexWriter {
     /// Opens the index file in write mode.
     pub async fn new(
         file_path: &str,
diff --git a/server/src/streaming/segments/indexes/mod.rs 
b/server/src/streaming/segments/indexes/mod.rs
index 5ad84180..2a9308c5 100644
--- a/server/src/streaming/segments/indexes/mod.rs
+++ b/server/src/streaming/segments/indexes/mod.rs
@@ -6,5 +6,5 @@ mod index_writer;
 pub const INDEX_SIZE: u32 = 16;
 
 pub use index::Index;
-pub use index_reader::SegmentIndexReader;
-pub use index_writer::SegmentIndexWriter;
+pub use index_reader::IndexReader;
+pub use index_writer::IndexWriter;
diff --git a/server/src/streaming/segments/messages/messages_reader.rs 
b/server/src/streaming/segments/messages/messages_reader.rs
index caff84d2..060d4235 100644
--- a/server/src/streaming/segments/messages/messages_reader.rs
+++ b/server/src/streaming/segments/messages/messages_reader.rs
@@ -1,11 +1,8 @@
+use crate::streaming::segments::IggyMessages;
 use bytes::{Bytes, BytesMut};
 use error_set::ErrContext;
-use iggy::{
-    error::IggyError,
-    prelude::{BytesSerializable, IggyMessages},
-};
+use iggy::error::IggyError;
 use std::{
-    fmt,
     fs::{File, OpenOptions},
     os::unix::prelude::FileExt,
 };
@@ -17,7 +14,7 @@ use std::{
     },
 };
 use tokio::task::spawn_blocking;
-use tracing::{error, trace, warn};
+use tracing::{error, trace};
 
 /// A dedicated struct for reading from the log file.
 #[derive(Debug)]
@@ -115,12 +112,16 @@ impl MessagesReader {
         let file_size = self.file_size();
         if file_size == 0 {
             trace!("Messages file {} is empty.", self.file_path);
-            return Ok(IggyMessages::default());
+            return Ok(IggyMessages::empty());
         }
-        let messages_bytes = match self.read_at(start_pos as u64, count_bytes 
as u64).await {
+
+        let messages_bytes = match self
+            .read_bytes_at(start_pos as u64, count_bytes as u64)
+            .await
+        {
             Ok(buf) => buf,
             Err(error) if error.kind() == ErrorKind::UnexpectedEof => {
-                return Ok(IggyMessages::default())
+                return Ok(IggyMessages::empty())
             }
             Err(error) => {
                 error!(
@@ -138,7 +139,7 @@ impl MessagesReader {
         self.log_size_bytes.load(Ordering::Acquire)
     }
 
-    async fn read_at(&self, offset: u64, len: u64) -> Result<Bytes, 
std::io::Error> {
+    async fn read_bytes_at(&self, offset: u64, len: u64) -> Result<Bytes, 
std::io::Error> {
         let file = self.file.clone();
         spawn_blocking(move || {
             let mut buf = BytesMut::with_capacity(len as usize);
diff --git a/server/src/streaming/segments/messages/messages_writer.rs 
b/server/src/streaming/segments/messages/messages_writer.rs
index bc46040b..5fff02fe 100644
--- a/server/src/streaming/segments/messages/messages_writer.rs
+++ b/server/src/streaming/segments/messages/messages_writer.rs
@@ -1,22 +1,16 @@
 use super::PersisterTask;
+use crate::streaming::segments::{messages::write_batch, IggyMessagesBatch};
 use error_set::ErrContext;
 use iggy::{
     confirmation::Confirmation,
     error::IggyError,
-    prelude::IggyMessages,
     utils::{byte_size::IggyByteSize, duration::IggyDuration},
 };
-use std::{
-    io::IoSlice,
-    sync::{
-        atomic::{AtomicU64, Ordering},
-        Arc,
-    },
-};
-use tokio::{
-    fs::{File, OpenOptions},
-    io::AsyncWriteExt,
+use std::sync::{
+    atomic::{AtomicU64, Ordering},
+    Arc,
 };
+use tokio::fs::{File, OpenOptions};
 use tracing::{error, trace};
 
 /// A dedicated struct for writing to the messages file.
@@ -94,17 +88,31 @@ impl MessagesWriter {
     /// Append a messages to the messages file.
     pub async fn save_batches(
         &mut self,
-        messages: Vec<IggyMessages>,
+        batches: IggyMessagesBatch,
         confirmation: Confirmation,
     ) -> Result<IggyByteSize, IggyError> {
-        let messages_size: usize = messages.iter().map(|m| m.size() as 
usize).sum();
+        let messages_size = batches.size();
         trace!(
             "Saving batch of size {messages_size} bytes to messages file: {}",
             self.file_path
         );
         match confirmation {
             Confirmation::Wait => {
-                self.write_batch_vectored(messages).await?;
+                if let Some(ref mut file) = self.file {
+                    write_batch(file, &self.file_path, batches)
+                        .await
+                        .with_error_context(|error| {
+                            format!(
+                                "Failed to write batch to messages file: {}. 
{error}",
+                                self.file_path
+                            )
+                        })
+                        .map_err(|_| IggyError::CannotWriteToFile)?;
+                } else {
+                    error!("File handle is not available for synchronous 
write.");
+                    return Err(IggyError::CannotWriteToFile);
+                }
+
                 self.messages_size_bytes
                     .fetch_add(messages_size as u64, Ordering::AcqRel);
                 trace!(
@@ -117,7 +125,7 @@ impl MessagesWriter {
             }
             Confirmation::NoWait => {
                 if let Some(task) = &self.persister_task {
-                    task.persist(messages).await;
+                    task.persist(batches).await;
                 } else {
                     panic!(
                         "Confirmation::NoWait is used, but 
MessagesPersisterTask is not set for messages file: {}",
@@ -130,17 +138,6 @@ impl MessagesWriter {
         Ok(IggyByteSize::from(messages_size as u64))
     }
 
-    /// Write a batch of bytes to the log file.
-    async fn write_batch_vectored(&mut self, batches: Vec<IggyMessages>) -> 
Result<(), IggyError> {
-        if let Some(ref mut file) = self.file {
-            super::write_batch_vectored(file, &self.file_path, batches).await?;
-            Ok(())
-        } else {
-            error!("File handle is not available for synchronous write.");
-            Err(IggyError::CannotWriteToFile)
-        }
-    }
-
     pub async fn fsync(&self) -> Result<(), IggyError> {
         if let Some(file) = self.file.as_ref() {
             file.sync_all()
diff --git a/server/src/streaming/segments/messages/mod.rs 
b/server/src/streaming/segments/messages/mod.rs
index e4bff590..eaf74a44 100644
--- a/server/src/streaming/segments/messages/mod.rs
+++ b/server/src/streaming/segments/messages/mod.rs
@@ -2,48 +2,36 @@ mod messages_reader;
 mod messages_writer;
 mod persister_task;
 
+use std::io::IoSlice;
+
+use error_set::ErrContext;
 pub use messages_reader::MessagesReader;
 pub use messages_writer::MessagesWriter;
 pub use persister_task::PersisterTask;
 
-use error_set::ErrContext;
-use iggy::{error::IggyError, prelude::IggyMessages};
-use std::io::IoSlice;
+use super::IggyMessagesBatch;
+use iggy::error::IggyError;
 use tokio::{fs::File, io::AsyncWriteExt};
-use tracing::error;
 
-/// Write a batch of messages to a file using vectored I/O
-///
-/// This function writes all the messages in the batch to the file using 
vectored I/O,
-/// which can be more efficient than writing each message individually.
-pub async fn write_batch_vectored(
+/// Vectored write a batches of messages to file
+async fn write_batch(
     file: &mut File,
     file_path: &str,
-    batches: Vec<IggyMessages>,
-) -> Result<u32, IggyError> {
-    let mut total_bytes_written = 0;
-    let mut slices_vec: Vec<IoSlice<'_>> =
-        batches.iter().map(|b| IoSlice::new(b.buffer())).collect();
-
-    let mut slices = slices_vec.as_mut_slice();
+    batches: IggyMessagesBatch,
+) -> Result<usize, IggyError> {
+    let mut slices: Vec<IoSlice> = batches.iter().map(|b| 
IoSlice::new(b)).collect();
 
+    let slices = &mut slices.as_mut_slice();
+    let mut written = 0;
     while !slices.is_empty() {
-        let written = file
+        written += file
             .write_vectored(slices)
             .await
             .with_error_context(|error| {
-                format!("Failed to write vectored to file: {file_path}. 
{error}")
+                format!("Failed to write messages to file: {file_path}, error: 
{error}",)
             })
-            .map_err(|_| IggyError::CannotWriteToFile)? as u32;
-
-        if written == 0 {
-            error!("Failed to write batch of messages to file: {file_path}");
-            return Err(IggyError::CannotWriteToFile);
-        }
-
-        total_bytes_written += written;
-        IoSlice::advance_slices(&mut slices, written as usize);
+            .map_err(|_| IggyError::CannotWriteToFile)?;
+        IoSlice::advance_slices(slices, written);
     }
-
-    Ok(total_bytes_written)
+    Ok(written)
 }
diff --git a/server/src/streaming/segments/messages/persister_task.rs 
b/server/src/streaming/segments/messages/persister_task.rs
index f76000e8..9072a1e7 100644
--- a/server/src/streaming/segments/messages/persister_task.rs
+++ b/server/src/streaming/segments/messages/persister_task.rs
@@ -1,5 +1,7 @@
+use crate::streaming::segments::IggyMessagesBatch;
+use error_set::ErrContext;
 use flume::{unbounded, Receiver};
-use iggy::{error::IggyError, prelude::IggyMessages, 
utils::duration::IggyDuration};
+use iggy::{error::IggyError, utils::duration::IggyDuration};
 use std::{
     io::IoSlice,
     sync::{
@@ -11,10 +13,12 @@ use std::{
 use tokio::{fs::File, io::AsyncWriteExt, select, time::sleep};
 use tracing::{error, trace, warn};
 
+use super::write_batch;
+
 #[derive(Debug)]
 /// A command to the persister task.
 enum PersisterTaskCommand {
-    WriteRequest(Vec<IggyMessages>),
+    WriteRequest(IggyMessagesBatch),
     Shutdown,
 }
 
@@ -59,7 +63,7 @@ impl PersisterTask {
     }
 
     /// Sends the batch bytes to the persister task (fire-and-forget).
-    pub async fn persist(&self, messages: Vec<IggyMessages>) {
+    pub async fn persist(&self, messages: IggyMessagesBatch) {
         if let Err(e) = self
             .sender
             .send_async(PersisterTaskCommand::WriteRequest(messages))
@@ -173,16 +177,7 @@ impl PersisterTask {
         while let Ok(request) = receiver.recv_async().await {
             match request {
                 PersisterTaskCommand::WriteRequest(messages) => {
-                    match Self::write_with_retries(
-                        &mut file,
-                        &file_path,
-                        messages,
-                        fsync,
-                        max_retries,
-                        retry_delay,
-                    )
-                    .await
-                    {
+                    match write_batch(&mut file, &file_path, messages).await {
                         Ok(bytes_written) => {
                             log_file_size.fetch_add(bytes_written as u64, 
Ordering::AcqRel);
                         }
@@ -209,59 +204,61 @@ impl PersisterTask {
         trace!("PersisterTask for file {file_path} has finished processing 
requests");
     }
 
-    /// Writes the provided data to the file using simple retry logic.
-    async fn write_with_retries(
-        file: &mut File,
-        file_path: &str,
-        messages: Vec<IggyMessages>,
-        fsync: bool,
-        max_retries: u32,
-        retry_delay: IggyDuration,
-    ) -> Result<u32, IggyError> {
-        let mut attempts = 0;
-        // TODO: This "retry" logic should be rewritten.
-        // There are certain kind of errors whom retrying is considered 
harmful, for example fsync failure
-        // (https://www.usenix.org/system/files/atc20-rebello.pdf)
-        // There are few errors which are worth retrying such as LSE (Latent 
sector error), as the file system
-        // might be able to reallocate a new sector for the data and recover, 
but not every file system supports that.
-        // In general this topic should be furthered researched rather than 
just naively retry when the write fails.
-        super::write_batch_vectored(file, file_path, messages).await
+    // /// Writes the provided data to the file using simple retry logic.
+    // async fn write_with_retries(
+    //     file: &mut File,
+    //     file_path: &str,
+    //     batches: IggyMessagesBatch,
+    //     fsync: bool,
+    //     max_retries: u32,
+    //     retry_delay: IggyDuration,
+    // ) -> Result<u32, IggyError> {
+    //     let mut attempts = 0;
+    //     // TODO: This "retry" logic should be rewritten.
+    //     // There are certain kind of errors whom retrying is considered 
harmful, for example fsync failure
+    //     // (https://www.usenix.org/system/files/atc20-rebello.pdf)
+    //     // There are few errors which are worth retrying such as LSE 
(Latent sector error), as the file system
+    //     // might be able to reallocate a new sector for the data and 
recover, but not every file system supports that.
+    //     // In general this topic should be furthered researched rather than 
just naively retry when the write fails.
+    //     let mut slices = Vec::new();
+    //     batches.iter().map(|b| {
+    //         slices.push(IoSlice::new(&b));
+    //     });
 
-        // let messages_size = messages.iter().map(|m| m.size()).sum();
-        // loop {
-        //     // TODO: use vectored API
-        //     match super::write_batch_vectored(file, file_path, 
messages).await {
-        //         Ok(_) => {
-        //             if fsync {
-        //                 match file.sync_all().await {
-        //                     Ok(_) => return Ok(messages_size),
-        //                     Err(e) => {
-        //                         attempts += 1;
-        //                         error!(
-        //                             "Error syncing file {file_path}: {:?} 
(attempt {attempts}/{max_retries})",
-        //                              e,
-        //                         );
-        //                     }
-        //                 }
-        //             } else {
-        //                 return Ok(messages_size);
-        //             }
-        //         }
-        //         Err(e) => {
-        //             attempts += 1;
-        //             error!(
-        //                 "Error writing to file {file_path}: {:?} (attempt 
{attempts}/{max_retries})",
-        //                 e,
-        //             );
-        //         }
-        //     }
-        //     if attempts >= max_retries {
-        //         error!(
-        //             "Failed to write to file {file_path} after 
{max_retries} attempts, something's terribly wrong",
-        //         );
-        //         return Err(IggyError::CannotWriteToFile);
-        //     }
-        //     sleep(retry_delay.get_duration()).await;
-        // }
-    }
+    //     let messages_size = batches.size();
+    //     loop {
+    //         match file.write_vectored(&slices).await {
+    //             Ok(written) => {
+    //                 if fsync {
+    //                     match file.sync_all().await {
+    //                         Ok(_) => return Ok(messages_size),
+    //                         Err(e) => {
+    //                             attempts += 1;
+    //                             error!(
+    //                                 "Error syncing file {file_path}: {:?} 
(attempt {attempts}/{max_retries})",
+    //                                  e,
+    //                             );
+    //                         }
+    //                     }
+    //                 } else {
+    //                     return Ok(messages_size);
+    //                 }
+    //             }
+    //             Err(e) => {
+    //                 attempts += 1;
+    //                 error!(
+    //                     "Error writing to file {file_path}: {:?} (attempt 
{attempts}/{max_retries})",
+    //                     e,
+    //                 );
+    //             }
+    //         }
+    //         if attempts >= max_retries {
+    //             error!(
+    //                 "Failed to write to file {file_path} after 
{max_retries} attempts, something's terribly wrong",
+    //             );
+    //             return Err(IggyError::CannotWriteToFile);
+    //         }
+    //         sleep(retry_delay.get_duration()).await;
+    //     }
+    // }
 }
diff --git a/server/src/streaming/segments/messages_accumulator.rs 
b/server/src/streaming/segments/messages_accumulator.rs
index 45bb2f35..37e1775e 100644
--- a/server/src/streaming/segments/messages_accumulator.rs
+++ b/server/src/streaming/segments/messages_accumulator.rs
@@ -1,8 +1,7 @@
-use super::{IggyMessagesMut, Index};
-use crate::streaming::segments::types::IggyMessagesSlice;
-use iggy::prelude::IggyMessages;
+use super::{IggyMessages, IggyMessagesBatch, IggyMessagesMut, Index};
 use iggy::utils::byte_size::IggyByteSize;
 use iggy::utils::timestamp::IggyTimestamp;
+use std::mem::take;
 
 /// A container that accumulates messages before they are written to disk
 #[derive(Debug, Default)]
@@ -22,11 +21,8 @@ pub struct MessagesAccumulator {
     /// Current maximum timestamp
     current_timestamp: u64,
 
-    /// A multiple buffers containing all accumulated messages
-    messages: Vec<IggyMessages>,
-
-    /// Indexes
-    indexes: Vec<Index>,
+    /// A buffer containing all accumulated messages
+    batches: IggyMessagesBatch,
 
     /// Number of messages in the accumulator
     messages_count: u32,
@@ -41,45 +37,31 @@ impl MessagesAccumulator {
         &mut self,
         current_offset: u64,
         current_position: u32,
-        mut batch: IggyMessagesMut,
+        indexes: &mut Vec<Index>,
+        messages: IggyMessagesMut,
     ) -> u32 {
-        // TODO(hubcio): add capacity
-        let batch_messages_count = batch.count();
+        let batch_messages_count = messages.count();
 
         if batch_messages_count == 0 {
             return 0;
         }
 
-        if self.messages.is_empty() {
+        if self.batches.is_empty() {
             self.base_offset = current_offset;
             self.base_timestamp = IggyTimestamp::now().as_micros();
             self.current_offset = current_offset;
             self.current_timestamp = self.base_timestamp;
         }
 
-        // TODO(hubcio): pass indexes vec and modify
-        batch.prepare_for_persistence(self.base_offset, 
self.current_timestamp);
-
+        let batch = messages.prepare_for_persistence(
+            self.base_offset,
+            self.current_timestamp,
+            current_position,
+            indexes,
+        );
         let batch_size = batch.size();
-        let batch = batch.make_immutable();
-        let mut current_position = current_position;
-        for message in batch.iter() {
-            let msg_size = message.size() as u32;
-
-            let offset = message.msg_header().offset();
-            let relative_offset = (offset - self.base_offset) as u32;
-            let position = current_position;
-            let timestamp = message.msg_header().timestamp();
-
-            self.indexes.push(Index {
-                offset: relative_offset,
-                position,
-                timestamp,
-            });
-
-            current_position += msg_size;
-        }
-        self.messages.push(batch);
+
+        self.batches.add(batch);
 
         self.messages_count += batch_messages_count;
         self.current_size += IggyByteSize::from(batch_size as u64);
@@ -88,61 +70,14 @@ impl MessagesAccumulator {
 
         batch_messages_count
     }
-    pub fn get_messages_by_offset(&self, start_offset: u64, end_offset: u64) 
-> IggyMessagesSlice {
-        let start_idx = (start_offset - self.base_offset) as usize;
-        let end_idx = (end_offset - self.base_offset) as usize; // inclusive
-        if self.indexes.is_empty() || start_idx >= self.indexes.len() {
-            return IggyMessagesSlice::empty();
-        }
-        let start_byte = self.indexes[start_idx].position as usize;
-        let end_byte = if end_idx + 1 < self.indexes.len() {
-            self.indexes[end_idx + 1].position as usize
-        } else {
-            self.current_size.as_bytes_usize()
-        };
-
-        let mut cumulative = Vec::with_capacity(self.messages.len());
-        let mut sum = 0usize;
-        for msg in &self.messages {
-            sum += msg.buffer().len();
-            cumulative.push(sum);
-        }
-
-        let mut buffers = Vec::new();
-        for (i, &cum) in cumulative.iter().enumerate() {
-            let batch_start = if i == 0 { 0 } else { cumulative[i - 1] };
-            let batch_end = cum;
-            if batch_end <= start_byte {
-                continue;
-            }
-            if batch_start >= end_byte {
-                break;
-            }
-            let slice_start = start_byte.saturating_sub(batch_start);
-            let slice_end = if end_byte < batch_end {
-                end_byte - batch_start
-            } else {
-                batch_end - batch_start
-            };
-            buffers.push(
-                self.messages[i]
-                    .shallow_copy()
-                    .slice(slice_start..slice_end),
-            );
-        }
-
-        let count = if end_idx >= start_idx {
-            (end_idx - start_idx + 1) as u32
-        } else {
-            0
-        };
 
-        IggyMessagesSlice::from_buffers(buffers, count)
+    pub fn get_messages_by_offset(&self, start_offset: u64, end_offset: u64) 
-> IggyMessages {
+        todo!();
     }
 
     /// Checks if the accumulator is empty
     pub fn is_empty(&self) -> bool {
-        self.messages.is_empty() || self.messages_count == 0
+        self.batches.is_empty() || self.messages_count == 0
     }
 
     /// Returns the number of unsaved messages
@@ -171,7 +106,7 @@ impl MessagesAccumulator {
     }
 
     /// Takes ownership of the accumulator and returns the messages and indexes
-    pub fn materialize(self) -> (Vec<IggyMessages>, Vec<Index>) {
-        (self.messages, self.indexes)
+    pub fn materialize(self) -> IggyMessagesBatch {
+        self.batches
     }
 }
diff --git a/server/src/streaming/segments/mod.rs 
b/server/src/streaming/segments/mod.rs
index 524d9507..2ea22d04 100644
--- a/server/src/streaming/segments/mod.rs
+++ b/server/src/streaming/segments/mod.rs
@@ -10,8 +10,9 @@ pub use indexes::Index;
 pub use segment::Segment;
 pub use types::IggyMessageHeaderViewMut;
 pub use types::IggyMessageViewMut;
+pub use types::IggyMessages;
+pub use types::IggyMessagesBatch;
 pub use types::IggyMessagesMut;
-pub use types::IggyMessagesSlice;
 
 pub const LOG_EXTENSION: &str = "log";
 pub const INDEX_EXTENSION: &str = "index";
diff --git a/server/src/streaming/segments/reading_messages.rs 
b/server/src/streaming/segments/reading_messages.rs
index e680fec4..c230e62c 100644
--- a/server/src/streaming/segments/reading_messages.rs
+++ b/server/src/streaming/segments/reading_messages.rs
@@ -1,10 +1,7 @@
-use super::Index;
+use super::{IggyMessages, IggyMessagesBatch, Index};
 use crate::streaming::segments::segment::Segment;
-use crate::streaming::segments::types::IggyMessagesSlice;
-use bytes::{Bytes, BytesMut};
 use error_set::ErrContext;
 use iggy::prelude::*;
-use std::sync::Arc;
 use tracing::trace;
 
 const COMPONENT: &str = "STREAMING_SEGMENT";
@@ -22,31 +19,33 @@ impl Segment {
         &self,
         start_timestamp: u64,
         count: u32,
-    ) -> Result<IggyMessagesSlice, IggyError> {
-        if count == 0 {
-            return Ok(IggyMessagesSlice::empty());
-        }
+    ) -> Result<IggyMessagesBatch, IggyError> {
+        todo!()
+
+        // if count == 0 {
+        //     return Ok(IggyMessagesMut::empty());
+        // }
 
-        let index_opt = self.load_index_for_timestamp(start_timestamp).await?;
+        // let index_opt = 
self.load_index_for_timestamp(start_timestamp).await?;
 
-        let Some(index) = index_opt else {
-            trace!("No messages found for timestamp: {}", start_timestamp);
-            return Ok(IggyMessagesSlice::empty());
-        };
+        // let Some(index) = index_opt else {
+        //     trace!("No messages found for timestamp: {}", start_timestamp);
+        //     return Ok(IggyMessagesMut::empty());
+        // };
 
-        let offset = self.start_offset + index.offset as u64;
-        trace!("Found offset {} for timestamp {}", offset, start_timestamp);
+        // let offset = self.start_offset + index.offset as u64;
+        // trace!("Found offset {} for timestamp {}", offset, start_timestamp);
 
-        self.get_messages_by_offset(offset, count).await
+        // self.get_messages_by_offset(offset, count).await
     }
 
     pub async fn get_messages_by_offset(
         &self,
         mut offset: u64,
         count: u32,
-    ) -> Result<IggyMessagesSlice, IggyError> {
+    ) -> Result<IggyMessagesBatch, IggyError> {
         if count == 0 {
-            return Ok(IggyMessagesSlice::empty());
+            return Ok(IggyMessagesBatch::default());
         }
 
         if offset < self.start_offset {
@@ -62,59 +61,49 @@ impl Segment {
             offset = self.start_offset;
         }
 
-        // In case that the partition messages buffer is disabled, we need to 
check the unsaved messages buffer
-        if self.unsaved_messages.is_none() {
-            return Ok(self.load_messages_from_disk(offset, count).await?);
+        if self.accumulator.is_empty() {
+            return Ok(IggyMessagesBatch::from(
+                self.load_messages_from_disk(offset, count).await?,
+            ));
         }
 
-        let messages_accumulator = self.unsaved_messages.as_ref().unwrap();
-        if messages_accumulator.is_empty() {
-            return Ok(self.load_messages_from_disk(offset, count).await?);
-        }
-
-        let accumulator_first_msg_offset = messages_accumulator.base_offset();
-        let accumulator_last_msg_offset = messages_accumulator.max_offset();
+        let accumulator_first_msg_offset = self.accumulator.base_offset();
+        let accumulator_last_msg_offset = self.accumulator.max_offset();
 
         // Case 1: All messages are in messages_require_to_save buffer
         if offset >= accumulator_first_msg_offset && end_offset <= 
accumulator_last_msg_offset {
-            return Ok(self.load_messages_from_unsaved_buffer(offset, 
end_offset));
+            return Ok(IggyMessagesBatch::from(
+                self.accumulator.get_messages_by_offset(offset, end_offset),
+            ));
         }
 
         // Case 2: All messages are on disk
         if end_offset < accumulator_first_msg_offset {
-            return Ok(self.load_messages_from_disk(offset, count).await?);
+            return Ok(IggyMessagesBatch::from(
+                self.load_messages_from_disk(offset, count).await?,
+            ));
         }
 
         // Case 3: Messages span disk and messages_require_to_save buffer 
boundary
 
-        let mut slices = Vec::new();
-
         // Load messages from disk up to the messages_require_to_save buffer 
boundary
-        if offset < accumulator_first_msg_offset {
-            let disk_messages = self
+        let disk_messages = self
                 .load_messages_from_disk(offset, (accumulator_first_msg_offset 
- offset) as u32)
                 .await.with_error_context(|error| format!(
             "{COMPONENT} (error: {error}) - failed to load messages from disk, 
stream ID: {}, topic ID: {}, partition ID: {}, start offset: {offset}, end 
offset :{}",
             self.stream_id, self.topic_id, self.partition_id, 
accumulator_first_msg_offset - 1
         ))?;
-            slices.push(disk_messages);
-        }
 
         // Load remaining messages from messages_require_to_save buffer
         let buffer_start = std::cmp::max(offset, accumulator_first_msg_offset);
-        let buffer_messages = 
self.load_messages_from_unsaved_buffer(buffer_start, end_offset);
-        slices.push(buffer_messages);
-
-        Ok(IggyMessagesSlice::combine(slices))
-    }
-
-    fn load_messages_from_unsaved_buffer(
-        &self,
-        start_offset: u64,
-        end_offset: u64,
-    ) -> IggyMessagesSlice {
-        let messages_accumulator = self.unsaved_messages.as_ref().unwrap();
-        messages_accumulator.get_messages_by_offset(start_offset, end_offset)
+        let buffer_messages = self
+            .accumulator
+            .get_messages_by_offset(buffer_start, end_offset);
+
+        Ok(IggyMessagesBatch::from(vec![
+            disk_messages,
+            buffer_messages,
+        ]))
     }
 
     pub async fn load_index_for_timestamp(
@@ -210,7 +199,7 @@ impl Segment {
         &self,
         start_offset: u64,
         count: u32,
-    ) -> Result<IggyMessagesSlice, IggyError> {
+    ) -> Result<IggyMessages, IggyError> {
         tracing::trace!(
                 "Loading {count} messages from disk, start_offset: 
{start_offset}, current_offset: {}...",
                 self.current_offset
@@ -227,7 +216,7 @@ impl Segment {
             .await?;
 
         if first_index.is_none() {
-            return Ok(IggyMessagesSlice::empty());
+            return Ok(IggyMessages::empty());
         }
 
         let first_index = first_index.unwrap();
@@ -243,23 +232,13 @@ impl Segment {
         let start_pos = first_index.position;
         let count_bytes = last_index.position - start_pos;
 
-        let messages = self
-            .messages_reader
+        self.messages_reader
             .as_ref()
             .unwrap()
             .load_messages_impl(start_pos, count_bytes, count)
             .await
             .with_error_context(|error| {
                 format!("Failed to load messages from segment file: {self}. 
{error}")
-            })?;
-        let end = messages.size();
-        let count = messages.count();
-
-        Ok(IggyMessagesSlice::new(
-            messages.into_inner(),
-            0,
-            end as usize,
-            count,
-        ))
+            })
     }
 }
diff --git a/server/src/streaming/segments/segment.rs 
b/server/src/streaming/segments/segment.rs
index 30886fc1..dcf5a3f9 100644
--- a/server/src/streaming/segments/segment.rs
+++ b/server/src/streaming/segments/segment.rs
@@ -37,12 +37,12 @@ pub struct Segment {
     pub is_closed: bool,
     pub(super) messages_writer: Option<MessagesWriter>,
     pub(super) messages_reader: Option<MessagesReader>,
-    pub(super) index_writer: Option<SegmentIndexWriter>,
-    pub(super) index_reader: Option<SegmentIndexReader>,
+    pub(super) index_writer: Option<IndexWriter>,
+    pub(super) index_reader: Option<IndexReader>,
     pub message_expiry: IggyExpiry,
-    pub unsaved_messages: Option<MessagesAccumulator>,
+    pub accumulator: MessagesAccumulator,
     pub config: Arc<SystemConfig>,
-    pub indexes: Option<Vec<Index>>,
+    pub indexes: Vec<Index>,
     pub(super) log_size_bytes: Arc<AtomicU64>,
     pub(super) index_size_bytes: Arc<AtomicU64>,
 }
@@ -70,10 +70,10 @@ impl Segment {
             IggyExpiry::ServerDefault => config.segment.message_expiry,
             _ => message_expiry,
         };
-        let indexes = match config.segment.cache_indexes {
-            true => Some(Vec::new()),
-            false => None,
-        };
+        // let indexes = match config.segment.cache_indexes {
+        //     true => Some(Vec::new()),
+        //     false => None,
+        // };
 
         Segment {
             stream_id,
@@ -90,8 +90,8 @@ impl Segment {
             last_index_position: 0,
             max_size_bytes: config.segment.size,
             message_expiry,
-            indexes,
-            unsaved_messages: None,
+            indexes: Vec::new(), // TODO add capacity
+            accumulator: MessagesAccumulator::default(),
             is_closed: false,
             messages_writer: None,
             messages_reader: None,
@@ -128,35 +128,30 @@ impl Segment {
         self.size_bytes = IggyByteSize::from(log_size_bytes);
         self.last_index_position = log_size_bytes as _;
 
-        self.indexes = Some(
-            self.index_reader
-                .as_ref()
-                .unwrap()
-                .load_all_indexes_impl()
-                .await
-                .with_error_context(|error| format!("Failed to load indexes 
for {self}. {error}"))
-                .map_err(|_| IggyError::CannotReadFile)?,
-        );
+        self.indexes = self
+            .index_reader
+            .as_ref()
+            .unwrap()
+            .load_all_indexes_impl()
+            .await
+            .with_error_context(|error| format!("Failed to load indexes for 
{self}. {error}"))
+            .map_err(|_| IggyError::CannotReadFile)?;
 
-        let last_index_offset = if self.indexes.as_ref().unwrap().is_empty() {
+        let last_index_offset = if self.indexes.is_empty() {
             0_u64
         } else {
-            self.indexes.as_ref().unwrap().last().unwrap().offset as u64
+            self.indexes.last().unwrap().offset as u64
         };
 
         self.current_offset = self.start_offset + last_index_offset;
 
         info!("Loaded {} indexes for segment with start offset: {} and 
partition with ID: {} for topic with ID: {} and stream with ID: {}.",
-              self.indexes.as_ref().unwrap().len(),
+              self.indexes.len(),
               self.start_offset,
               self.partition_id,
               self.topic_id,
               self.stream_id);
 
-        if !self.config.segment.cache_indexes {
-            self.indexes = None;
-        }
-
         if self.is_full().await {
             self.is_closed = true;
         }
@@ -221,8 +216,7 @@ impl Segment {
         .await?;
 
         let index_writer =
-            SegmentIndexWriter::new(&self.index_path, 
self.index_size_bytes.clone(), index_fsync)
-                .await?;
+            IndexWriter::new(&self.index_path, self.index_size_bytes.clone(), 
index_fsync).await?;
 
         self.messages_writer = Some(log_writer);
         self.index_writer = Some(index_writer);
@@ -233,7 +227,7 @@ impl Segment {
         let log_reader = MessagesReader::new(&self.log_path, 
self.log_size_bytes.clone()).await?;
         // TODO(hubcio): there is no need to store open fd for reader if we 
have index cache enabled
         let index_reader =
-            SegmentIndexReader::new(&self.index_path, 
self.index_size_bytes.clone()).await?;
+            IndexReader::new(&self.index_path, 
self.index_size_bytes.clone()).await?;
 
         self.messages_reader = Some(log_reader);
         self.index_reader = Some(index_reader);
@@ -434,8 +428,7 @@ mod tests {
         assert_eq!(segment.log_path, log_path);
         assert_eq!(segment.index_path, index_path);
         assert_eq!(segment.message_expiry, message_expiry);
-        assert!(segment.unsaved_messages.is_none());
-        assert!(segment.indexes.is_some());
+        assert!(segment.indexes.is_empty());
         assert!(!segment.is_closed);
         assert!(!segment.is_full().await);
     }
@@ -476,6 +469,6 @@ mod tests {
             messages_count_of_parent_partition,
         );
 
-        assert!(segment.indexes.is_none());
+        assert!(segment.indexes.is_empty());
     }
 }
diff --git a/server/src/streaming/segments/types/message_view_mut.rs 
b/server/src/streaming/segments/types/message_view_mut.rs
index e83ae16d..4b363a76 100644
--- a/server/src/streaming/segments/types/message_view_mut.rs
+++ b/server/src/streaming/segments/types/message_view_mut.rs
@@ -9,28 +9,22 @@ use std::ops::Range;
 pub struct IggyMessageViewMut<'a> {
     /// The buffer containing the message
     buffer: &'a mut [u8],
-
     /// Payload offset
     payload_offset: usize,
 }
 
 impl<'a> IggyMessageViewMut<'a> {
     /// Create a new mutable message view from a buffer
-    pub fn new(buffer: &'a mut [u8]) -> Result<Self, IggyError> {
+    pub fn new(buffer: &'a mut [u8]) -> Self {
         let (payload_len, headers_len) = {
             let hdr_slice = &buffer[0..IGGY_MESSAGE_HEADER_SIZE as usize];
             let hdr_view = IggyMessageHeaderView::new(hdr_slice);
             (hdr_view.payload_length(), hdr_view.headers_length())
         };
-        let total_size = IGGY_MESSAGE_HEADER_SIZE + payload_len + headers_len;
-        if buffer.len() < total_size as usize {
-            return Err(IggyError::InvalidMessagePayloadLength);
-        }
-
-        Ok(Self {
+        Self {
             buffer,
             payload_offset: IGGY_MESSAGE_HEADER_SIZE as usize,
-        })
+        }
     }
 
     /// Get an immutable header view
@@ -60,14 +54,14 @@ impl<'a> IggyMessageViewMut<'a> {
 
     /// Convenience to update the checksum field in the header
     pub fn update_checksum(&mut self) {
-        // let start = 8; // Skip checksum field for checksum calculation
-        // let size = self.size() - 8;
-        // let data = &self.buffer[start..start + size];
+        let start = 8; // Skip checksum field for checksum calculation
+        let size = self.size() - 8;
+        let data = &self.buffer[start..start + size];
 
-        // let checksum = gxhash64(data, 0);
+        let checksum = gxhash64(data, 0);
 
-        // let mut hdr_view = self.msg_header_mut();
-        // hdr_view.set_checksum(checksum);
+        let mut hdr_view = self.msg_header_mut();
+        hdr_view.set_checksum(checksum);
     }
 }
 
@@ -88,30 +82,40 @@ impl<'a> IggyMessageViewMutIterator<'a> {
 
 #[gat]
 impl LendingIterator for IggyMessageViewMutIterator<'_> {
-    type Item<'next> = Result<IggyMessageViewMut<'next>, IggyError>;
+    type Item<'next> = IggyMessageViewMut<'next>;
 
     fn next(&mut self) -> Option<Self::Item<'_>> {
-        if self.position >= self.buffer.len() {
+        let buffer_len = self.buffer.len();
+        if self.position >= buffer_len {
+            return None;
+        }
+
+        // Make sure we have enough bytes for at least a header
+        if self.position + IGGY_MESSAGE_HEADER_SIZE as usize > 
self.buffer.len() {
+            tracing::error!(
+                "Buffer too small for message header at position {}, buffer 
len: {}",
+                self.position,
+                self.buffer.len()
+            );
+            self.position = self.buffer.len(); // Move to end to prevent 
infinite loops
             return None;
         }
 
         let buffer_slice = &mut self.buffer[self.position..];
-        let result = IggyMessageViewMut::new(buffer_slice);
-
-        match result {
-            Ok(view) => {
-                self.position += view.size();
-                Some(Ok(view))
-            }
-            Err(e) => {
-                tracing::error!(
-                    "Failed to create message view at position {}, error: {}",
-                    self.position,
-                    e
-                );
-                self.position += 1;
-                Some(Err(e))
-            }
+        let view = IggyMessageViewMut::new(buffer_slice);
+
+        // Safety check: Make sure we're advancing
+        let message_size = view.size();
+        if message_size == 0 {
+            tracing::error!(
+                "Message size is 0 at position {}, preventing infinite loop",
+                self.position
+            );
+            self.position = buffer_len; // Move to end to prevent infinite 
loops
+            return None;
         }
+
+        self.position += message_size;
+        Some(view)
     }
 }
diff --git a/server/src/streaming/segments/types/messages.rs 
b/server/src/streaming/segments/types/messages.rs
new file mode 100644
index 00000000..322d6a5f
--- /dev/null
+++ b/server/src/streaming/segments/types/messages.rs
@@ -0,0 +1,57 @@
+use bytes::{Bytes, BytesMut};
+use iggy::prelude::*;
+use std::ops::Deref;
+
+/// An immutable messages container that holds a buffer of messages
+#[derive(Debug, PartialEq)]
+pub struct IggyMessages {
+    /// The number of messages in the buffer
+    count: u32,
+    /// The buffer containing the messages
+    buffer: Bytes,
+}
+
+impl IggyMessages {
+    /// Create a new messages container from a buffer
+    pub fn new(buffer: Bytes, count: u32) -> Self {
+        Self { buffer, count }
+    }
+
+    /// Creates a empty messages container
+    pub fn empty() -> Self {
+        Self::new(BytesMut::new().freeze(), 0)
+    }
+
+    /// Create iterator over messages
+    pub fn iter(&self) -> IggyMessageViewIterator {
+        IggyMessageViewIterator::new(&self.buffer)
+    }
+
+    /// Get the number of messages
+    pub fn count(&self) -> u32 {
+        self.count
+    }
+
+    /// Get the total size of all messages in bytes
+    pub fn size(&self) -> u32 {
+        self.buffer.len() as u32
+    }
+
+    /// Get access to the underlying buffer
+    pub fn buffer(&self) -> &[u8] {
+        &self.buffer
+    }
+}
+
+impl Sizeable for IggyMessages {
+    fn get_size_bytes(&self) -> IggyByteSize {
+        IggyByteSize::from(self.buffer.len() as u64)
+    }
+}
+
+impl Deref for IggyMessages {
+    type Target = [u8];
+    fn deref(&self) -> &Self::Target {
+        self.buffer()
+    }
+}
diff --git a/server/src/streaming/segments/types/messages_batch.rs 
b/server/src/streaming/segments/types/messages_batch.rs
new file mode 100644
index 00000000..c3bf1531
--- /dev/null
+++ b/server/src/streaming/segments/types/messages_batch.rs
@@ -0,0 +1,109 @@
+use super::messages::IggyMessages;
+use iggy::prelude::*;
+
+/// A batch container for multiple IggyMessages objects
+#[derive(Debug, Default)]
+pub struct IggyMessagesBatch {
+    /// The collection of message containers
+    messages: Vec<IggyMessages>,
+    /// Total number of messages across all containers
+    count: u32,
+    /// Total size in bytes across all containers
+    size: u32,
+}
+
+impl IggyMessagesBatch {
+    /// Create a new empty batch
+    pub fn new() -> Self {
+        Self {
+            messages: Vec::new(),
+            count: 0,
+            size: 0,
+        }
+    }
+
+    /// Create a new batch with a specified initial capacity
+    pub fn with_capacity(capacity: usize) -> Self {
+        Self {
+            messages: Vec::with_capacity(capacity),
+            count: 0,
+            size: 0,
+        }
+    }
+
+    /// Create a batch from an existing vector of IggyMessages
+    pub fn from_vec(messages: Vec<IggyMessages>) -> Self {
+        let mut batch = Self::with_capacity(messages.len());
+        for msg in messages {
+            batch.add(msg);
+        }
+        batch
+    }
+
+    /// Add a message container to the batch
+    pub fn add(&mut self, messages: IggyMessages) {
+        self.count += messages.count();
+        self.size += messages.size();
+        self.messages.push(messages);
+    }
+
+    /// Add another batch of messages to the batch
+    pub fn add_batch(&mut self, other: IggyMessagesBatch) {
+        self.count += other.messages_count();
+        self.size += other.size();
+        self.messages.extend(other.messages);
+    }
+
+    /// Get the total number of messages in the batch
+    pub fn messages_count(&self) -> u32 {
+        self.count
+    }
+
+    /// Get the total size of all messages in bytes
+    pub fn size(&self) -> u32 {
+        self.size
+    }
+
+    /// Get the number of message containers in the batch
+    pub fn containers_count(&self) -> usize {
+        self.messages.len()
+    }
+
+    /// Check if the batch is empty
+    pub fn is_empty(&self) -> bool {
+        self.messages.is_empty() || self.count == 0
+    }
+
+    /// Get a reference to the underlying vector of message containers
+    pub fn inner(&self) -> &Vec<IggyMessages> {
+        &self.messages
+    }
+
+    /// Consume the batch, returning the underlying vector of message 
containers
+    pub fn into_inner(self) -> Vec<IggyMessages> {
+        self.messages
+    }
+
+    /// Iterate over all message containers in the batch
+    pub fn iter(&self) -> impl Iterator<Item = &IggyMessages> {
+        self.messages.iter()
+    }
+}
+
+impl Sizeable for IggyMessagesBatch {
+    fn get_size_bytes(&self) -> IggyByteSize {
+        IggyByteSize::from(self.size as u64)
+    }
+}
+
+impl From<Vec<IggyMessages>> for IggyMessagesBatch {
+    fn from(messages: Vec<IggyMessages>) -> Self {
+        Self::from_vec(messages)
+    }
+}
+
+impl From<IggyMessages> for IggyMessagesBatch {
+    fn from(messages: IggyMessages) -> Self {
+        Self::from_vec(vec![messages])
+    }
+}
diff --git a/server/src/streaming/segments/types/messages_mut.rs 
b/server/src/streaming/segments/types/messages_mut.rs
index 7fbbca22..d3c4a6ac 100644
--- a/server/src/streaming/segments/types/messages_mut.rs
+++ b/server/src/streaming/segments/types/messages_mut.rs
@@ -1,36 +1,19 @@
-use super::message_view_mut::IggyMessageViewMutIterator;
-use bytes::{BufMut, Bytes, BytesMut};
+use super::{message_view_mut::IggyMessageViewMutIterator, IggyMessages};
+use crate::streaming::segments::Index;
+use bytes::{BufMut, BytesMut};
 use iggy::prelude::*;
 use lending_iterator::prelude::*;
-use serde::{Deserialize, Serialize};
+use std::ops::Deref;
 
 /// A container for mutable messages
-#[derive(Debug, Serialize, Deserialize, PartialEq)]
+#[derive(Debug, PartialEq, Default)]
 pub struct IggyMessagesMut {
     /// The number of messages in the buffer
-    #[serde(skip)]
     count: u32,
-
     /// The buffer containing the messages
     buffer: BytesMut,
 }
 
-impl BytesSerializable for IggyMessagesMut {
-    fn to_bytes(&self) -> Bytes {
-        self.buffer.clone().freeze()
-    }
-
-    fn from_bytes(bytes: Bytes) -> Result<Self, IggyError>
-    where
-        Self: Sized,
-    {
-        Ok(Self {
-            buffer: BytesMut::from(&bytes[..]),
-            count: 0,
-        })
-    }
-}
-
 impl Sizeable for IggyMessagesMut {
     fn get_size_bytes(&self) -> IggyByteSize {
         IggyByteSize::from(self.buffer.len() as u64)
@@ -38,93 +21,93 @@ impl Sizeable for IggyMessagesMut {
 }
 
 impl IggyMessagesMut {
-    /// Create a new messages container from a buffer
-    pub fn new(buffer: BytesMut) -> Self {
-        Self { buffer, count: 0 }
+    /// Create a new messages container with a specified capacity
+    pub fn with_capacity(capacity: u32) -> Self {
+        Self {
+            buffer: BytesMut::with_capacity(capacity as usize),
+            count: 0,
+        }
+    }
+
+    /// Create a new messages container from a existing buffer of bytes
+    pub fn from_bytes(bytes: BytesMut, messages_count: u32) -> Self {
+        Self {
+            buffer: bytes,
+            count: messages_count,
+        }
     }
 
     /// Create a new messages container from a slice of messages
     pub fn from_messages(messages: &[IggyMessage], messages_size: u32) -> Self 
{
-        let mut buffer = BytesMut::with_capacity(messages_size as usize);
+        let mut messages_mut = Self::with_capacity(messages_size);
+
         for message in messages {
-            buffer.extend_from_slice(&message.to_bytes());
+            messages_mut.buffer.extend_from_slice(&message.to_bytes());
         }
-        let mut messages_mut = Self::new(buffer);
         messages_mut.count = messages.len() as u32;
         messages_mut
     }
 
-    /// Extends the messages container with another set of mutable messages by 
consuming them
-    pub fn extend(&mut self, messages: IggyMessagesMut) {
-        let count = messages.count();
-        let buffer = messages.into_inner();
-        self.buffer.put(buffer);
-        self.count += count;
-    }
-
-    /// Sets the capacity of the messages container
-    pub fn set_capacity(&mut self, capacity: u32) {
-        self.buffer.reserve(capacity as usize);
-    }
-
     /// Create a lending iterator over mutable messages
     pub fn iter_mut(&mut self) -> IggyMessageViewMutIterator {
         IggyMessageViewMutIterator::new(&mut self.buffer)
     }
 
-    /// Create iterator over messages
-    pub fn iter(&self) -> IggyMessageViewIterator {
-        IggyMessageViewIterator::new(&self.buffer)
-    }
-
     /// Get the number of messages
     pub fn count(&self) -> u32 {
         self.count
     }
 
-    /// Set the message count
-    pub fn set_count(&mut self, count: u32) {
-        self.count = count;
-    }
-
     /// Get the total size of all messages in bytes
     pub fn size(&self) -> u32 {
         self.buffer.len() as u32
     }
 
-    /// Get the buffer
-    pub fn into_inner(self) -> BytesMut {
-        self.buffer
-    }
-
     /// Prepares all messages in the batch for persistence by setting their 
offsets, timestamps,
     /// and other necessary fields. This method should be called before the 
messages are written to disk.
-    /// Returns the number of messages processed.
-    pub fn prepare_for_persistence(&mut self, base_offset: u64, timestamp: 
u64) -> u32 {
-        let mut processed_count = 0;
+    /// Returns the prepared, immutable messages.
+    pub fn prepare_for_persistence(
+        mut self,
+        base_offset: u64,
+        timestamp: u64,
+        current_position: u32,
+        indexes: &mut Vec<Index>,
+    ) -> IggyMessages {
+        let max_messages = self.count;
         let mut current_offset = base_offset;
+        let mut current_position = current_position;
+        let mut iter = self.iter_mut();
+        let mut processed_count = 0;
 
-        let mut iterator = self.iter_mut();
-        while let Some(message_result) = LendingIterator::next(&mut iterator) {
-            if let Ok(mut message) = message_result {
-                message.msg_header_mut().set_offset(current_offset);
+        while let Some(mut message) = iter.next() {
+            debug_assert!(processed_count <= max_messages);
+            processed_count += 1;
 
-                if message.msg_header().timestamp() == 0 {
-                    message.msg_header_mut().set_timestamp(timestamp);
-                }
+            message.msg_header_mut().set_offset(current_offset);
+            message.msg_header_mut().set_timestamp(timestamp);
 
-                message.update_checksum();
+            message.update_checksum();
 
-                current_offset += 1;
-                processed_count += 1;
-            }
+            current_offset += 1;
+            current_position += message.size() as u32;
+
+            indexes.push(Index {
+                offset: (current_offset - base_offset) as u32,
+                position: current_position,
+                timestamp,
+            });
         }
 
-        processed_count
+        let buffer = self.buffer.freeze();
+        IggyMessages::new(buffer, self.count)
     }
+}
+
+impl Deref for IggyMessagesMut {
+    type Target = BytesMut;
 
-    pub fn make_immutable(self) -> IggyMessages {
-        IggyMessages::new(self.buffer.freeze(), self.count)
+    fn deref(&self) -> &Self::Target {
+        &self.buffer
     }
 }
 
diff --git a/server/src/streaming/segments/types/messages_slice.rs 
b/server/src/streaming/segments/types/messages_slice.rs
deleted file mode 100644
index 740f065b..00000000
--- a/server/src/streaming/segments/types/messages_slice.rs
+++ /dev/null
@@ -1,70 +0,0 @@
-use bytes::{Bytes, BytesMut};
-use iggy::prelude::*;
-
-#[derive(Debug, Clone)]
-pub struct IggyMessagesSlice {
-    buffers: Vec<Bytes>,
-    count: u32,
-}
-
-impl IggyMessagesSlice {
-    pub fn new(buffer: Bytes, start: usize, end: usize, count: u32) -> Self {
-        Self {
-            buffers: vec![buffer.slice(start..end)],
-            count,
-        }
-    }
-
-    pub fn from_buffers(buffers: Vec<Bytes>, count: u32) -> Self {
-        Self { buffers, count }
-    }
-
-    pub fn combine(slices: Vec<IggyMessagesSlice>) -> Self {
-        let mut buffers = Vec::new();
-        let mut total_count = 0;
-        for slice in slices {
-            buffers.extend(slice.buffers);
-            total_count += slice.count;
-        }
-        Self {
-            buffers,
-            count: total_count,
-        }
-    }
-
-    pub fn chunks(&self) -> impl Iterator<Item = &[u8]> + '_ {
-        self.buffers.iter().map(|b| b.as_ref())
-    }
-
-    pub fn chunks_count(&self) -> usize {
-        self.buffers.len()
-    }
-
-    pub fn to_messages(&self) -> IggyMessages {
-        let total_size: usize = self.buffers.iter().map(|b| b.len()).sum();
-        let mut combined = BytesMut::with_capacity(total_size);
-        for chunk in &self.buffers {
-            combined.extend_from_slice(chunk);
-        }
-        IggyMessages::new(combined.freeze(), self.count)
-    }
-
-    pub fn count(&self) -> u32 {
-        self.count
-    }
-
-    pub fn size(&self) -> u32 {
-        self.buffers.iter().map(|b| b.len() as u32).sum()
-    }
-
-    pub fn is_empty(&self) -> bool {
-        self.count == 0 || self.buffers.is_empty()
-    }
-
-    pub fn empty() -> Self {
-        Self {
-            buffers: Vec::new(),
-            count: 0,
-        }
-    }
-}
diff --git a/server/src/streaming/segments/types/messages_slice_better.rs 
b/server/src/streaming/segments/types/messages_slice_better.rs
deleted file mode 100644
index 8b137891..00000000
--- a/server/src/streaming/segments/types/messages_slice_better.rs
+++ /dev/null
@@ -1 +0,0 @@
-
diff --git a/server/src/streaming/segments/types/mod.rs 
b/server/src/streaming/segments/types/mod.rs
index 016bc7b2..a39579dd 100644
--- a/server/src/streaming/segments/types/mod.rs
+++ b/server/src/streaming/segments/types/mod.rs
@@ -1,10 +1,11 @@
 mod message_header_view_mut;
 mod message_view_mut;
+mod messages;
+mod messages_batch;
 mod messages_mut;
-mod messages_slice;
-mod messages_slice_better;
 
 pub use message_header_view_mut::IggyMessageHeaderViewMut;
 pub use message_view_mut::IggyMessageViewMut;
+pub use messages::IggyMessages;
+pub use messages_batch::IggyMessagesBatch;
 pub use messages_mut::IggyMessagesMut;
-pub use messages_slice::IggyMessagesSlice;
diff --git a/server/src/streaming/segments/writing_messages.rs 
b/server/src/streaming/segments/writing_messages.rs
index f9c3d303..4e5a8a0c 100644
--- a/server/src/streaming/segments/writing_messages.rs
+++ b/server/src/streaming/segments/writing_messages.rs
@@ -1,7 +1,5 @@
-use super::indexes::*;
 use super::IggyMessagesMut;
 use crate::streaming::segments::segment::Segment;
-use bytes::BufMut;
 use error_set::ErrContext;
 use iggy::confirmation::Confirmation;
 use iggy::prelude::*;
@@ -21,15 +19,20 @@ impl Segment {
             ));
         }
         let messages_size = messages.size();
-        let messages_accumulator = 
self.unsaved_messages.get_or_insert_with(Default::default);
-        let messages_count =
-            messages_accumulator.coalesce_batch(current_offset, 
self.last_index_position, messages);
+        let messages_accumulator = &mut self.accumulator;
+        let messages_count = messages_accumulator.coalesce_batch(
+            current_offset,
+            self.last_index_position,
+            &mut self.indexes,
+            messages,
+        );
 
         if self.current_offset == 0 {
             self.start_timestamp = messages_accumulator.batch_base_timestamp();
         }
         self.end_timestamp = messages_accumulator.batch_max_timestamp();
         self.current_offset = messages_accumulator.max_offset();
+
         self.size_bytes += IggyByteSize::from(messages_size as u64);
 
         self.size_of_parent_stream
@@ -48,45 +51,15 @@ impl Segment {
         Ok(messages_count)
     }
 
-    fn store_offset_and_timestamp_index_for_batch(
-        &mut self,
-        batch_last_offset: u64,
-        batch_max_timestamp: u64,
-    ) -> Index {
-        let relative_offset = (batch_last_offset - self.start_offset) as u32;
-        trace!(
-            "Storing index for relative_offset: {relative_offset}, 
start_offset: {}",
-            self.start_offset
-        );
-        let index = Index {
-            offset: relative_offset,
-            position: self.last_index_position,
-            timestamp: batch_max_timestamp,
-        };
-        if let Some(indexes) = &mut self.indexes {
-            indexes.push(index);
-        }
-        index
-    }
-
     pub async fn persist_messages(
         &mut self,
         confirmation: Option<Confirmation>,
     ) -> Result<usize, IggyError> {
-        if self.unsaved_messages.is_none() {
+        if self.accumulator.is_empty() {
             return Ok(0);
         }
 
-        let messages_accumulator = self.unsaved_messages.take().unwrap();
-        if messages_accumulator.is_empty() {
-            return Ok(0);
-        }
-        // let batch_max_offset = messages_accumulator.max_offset();
-        // let batch_max_timestamp = 
messages_accumulator.batch_max_timestamp();
-        // let index =
-        //     
self.store_offset_and_timestamp_index_for_batch(batch_max_offset, 
batch_max_timestamp);
-
-        let unsaved_messages_number = 
messages_accumulator.unsaved_messages_count();
+        let unsaved_messages_number = 
self.accumulator.unsaved_messages_count();
         trace!(
             "Saving {} messages on disk in segment with start offset: {} for 
partition with ID: {}...",
             unsaved_messages_number,
@@ -94,7 +67,8 @@ impl Segment {
             self.partition_id
         );
 
-        let (messages, indexes) = messages_accumulator.materialize();
+        let accumulator = std::mem::take(&mut self.accumulator);
+        let batches = accumulator.materialize();
         let confirmation = match confirmation {
             Some(val) => val,
             None => self.config.segment.server_confirmation,
@@ -104,28 +78,22 @@ impl Segment {
             .messages_writer
             .as_mut()
             .unwrap()
-            .save_batches(messages, confirmation)
+            .save_batches(batches, confirmation)
             .await
             .with_error_context(|error| format!("Failed to save batch for 
{self}. {error}",))?;
 
         self.last_index_position += saved_bytes.as_bytes_u64() as u32;
 
+        debug_assert!(self.last_index_position == 
self.indexes.last().unwrap().position);
+
         self.index_writer
             .as_mut()
             .unwrap()
-            .save_indexes(&indexes)
+            .save_indexes(&self.indexes)
             .await
             .with_error_context(|error| format!("Failed to save index for 
{self}. {error}"))?;
 
-        // TODO(hubcio): fix this
-        // self.size_bytes += IggyByteSize::from(IGGY_BATCH_OVERHEAD);
-        // self.size_of_parent_stream
-        //     .fetch_add(IGGY_BATCH_OVERHEAD, Ordering::AcqRel);
-        // self.size_of_parent_topic
-        //     .fetch_add(IGGY_BATCH_OVERHEAD, Ordering::AcqRel);
-        // self.size_of_parent_partition
-        //     .fetch_add(IGGY_BATCH_OVERHEAD, Ordering::AcqRel);
-
+        self.indexes.clear();
         trace!(
             "Saved {} messages on disk in segment with start offset: {} for 
partition with ID: {}, total bytes written: {}.",
             unsaved_messages_number,
@@ -137,7 +105,6 @@ impl Segment {
         if self.is_full().await {
             self.end_offset = self.current_offset;
             self.is_closed = true;
-            self.unsaved_messages = None;
             self.shutdown_writing().await;
             info!(
                 "Closed segment with start offset: {}, end offset: {} for 
partition with ID: {}.",
diff --git a/server/src/streaming/systems/messages.rs 
b/server/src/streaming/systems/messages.rs
index 0eff2f1b..6dac1524 100644
--- a/server/src/streaming/systems/messages.rs
+++ b/server/src/streaming/systems/messages.rs
@@ -1,4 +1,4 @@
-use crate::streaming::segments::{IggyMessagesMut, IggyMessagesSlice};
+use crate::streaming::segments::{IggyMessages, IggyMessagesBatch, 
IggyMessagesMut};
 use crate::streaming::session::Session;
 use crate::streaming::systems::system::System;
 use crate::streaming::systems::COMPONENT;
@@ -19,7 +19,7 @@ impl System {
         topic_id: &Identifier,
         partition_id: Option<u32>,
         args: PollingArgs,
-    ) -> Result<IggyMessagesSlice, IggyError> {
+    ) -> Result<IggyMessagesBatch, IggyError> {
         self.ensure_authenticated(session)?;
         if args.count == 0 {
             return Err(IggyError::InvalidMessagesCount);
@@ -60,7 +60,7 @@ impl System {
             .get_messages(polling_consumer, partition_id, args.strategy, 
args.count)
             .await?;
 
-        return Ok(result);
+        Ok(result)
 
         // let offset = polled_messages.messages.last().unwrap().offset;
         // if args.auto_commit {
diff --git a/server/src/streaming/topics/messages.rs 
b/server/src/streaming/topics/messages.rs
index 8da9adaf..39eacd93 100644
--- a/server/src/streaming/topics/messages.rs
+++ b/server/src/streaming/topics/messages.rs
@@ -1,5 +1,5 @@
 use crate::streaming::polling_consumer::PollingConsumer;
-use crate::streaming::segments::{IggyMessagesMut, IggyMessagesSlice};
+use crate::streaming::segments::{IggyMessages, IggyMessagesBatch, 
IggyMessagesMut};
 use crate::streaming::topics::topic::Topic;
 use crate::streaming::topics::COMPONENT;
 use crate::streaming::utils::file::folder_size;
@@ -9,7 +9,6 @@ use error_set::ErrContext;
 use iggy::error::IggyError;
 use iggy::locking::IggySharedMutFn;
 use iggy::messages::{PartitioningKind, PollingKind};
-use iggy::prelude::IggyMessages;
 use iggy::prelude::Partitioning;
 use iggy::utils::byte_size::IggyByteSize;
 use iggy::utils::expiry::IggyExpiry;
@@ -31,7 +30,7 @@ impl Topic {
         partition_id: u32,
         strategy: PollingStrategy,
         count: u32,
-    ) -> Result<IggyMessagesSlice, IggyError> {
+    ) -> Result<IggyMessagesBatch, IggyError> {
         if !self.has_partitions() {
             return Err(IggyError::NoPartitions(self.topic_id, self.stream_id));
         }
@@ -176,123 +175,6 @@ impl Topic {
         partition_id
     }
 
-    pub(crate) async fn load_messages_from_disk_to_cache(&mut self) -> 
Result<(), IggyError> {
-        //TODO: Fix me
-        /*
-        if !self.config.cache.enabled {
-            return Ok(());
-        }
-        let path = self.config.get_system_path();
-
-        // TODO: load data from database instead of calculating the size on 
disk
-        let total_size_on_disk_bytes = folder_size(&path)
-            .await
-            .with_error_context(|error| {
-                format!("{COMPONENT} (error: {error}) - failed to get folder 
size, path: {path}")
-            })
-            .map_err(|_| IggyError::InvalidSizeBytes)?;
-
-        for partition_lock in self.partitions.values_mut() {
-            let mut partition = partition_lock.write().await;
-
-            let end_offset = match partition.segments.last() {
-                Some(segment) => segment.current_offset,
-                None => {
-                    warn!(
-                        "No segments found for partition ID: {}, topic ID: {}, 
stream ID: {}",
-                        partition.partition_id, partition.topic_id, 
partition.stream_id
-                    );
-                    continue;
-                }
-            };
-
-            trace!(
-               "Loading messages to cache for partition ID: {}, topic ID: {}, 
stream ID: {}, offset: 0 to {}...",
-               partition.partition_id,
-               partition.topic_id,
-               partition.stream_id,
-               end_offset
-           );
-
-            let partition_size_bytes = partition.get_size_bytes();
-            let cache_limit_bytes = self.config.cache.size.clone().into();
-
-            // Fetch data from disk proportional to the partition size
-            // eg. 12 partitions, each has 300 MB, cache limit is 500 MB, so 
there is total 3600 MB of data on SSD.
-            // 500 MB * (300 / 3600 MB) ~= 41.6 MB to load from cache 
(assuming all partitions have the same size on disk)
-            let size_to_fetch_from_disk = (cache_limit_bytes.as_bytes_u64() as 
f64
-                * (partition_size_bytes.as_bytes_u64() as f64
-                    / total_size_on_disk_bytes.as_bytes_u64() as f64))
-                as u64;
-            let messages = partition
-                .get_newest_messages_by_size(size_to_fetch_from_disk as u64)
-                .await
-                .with_error_context(|error| format!("{COMPONENT} (error: 
{error}) - failed to get newest messages by size: {size_to_fetch_from_disk}"))?;
-
-            let sum = messages
-                .iter()
-                .map(|m| m.get_size_bytes())
-                .sum::<IggyByteSize>();
-            if !Self::cache_integrity_check(&messages) {
-                warn!(
-                   "Cache integrity check failed for partition ID: {}, topic 
ID: {}, stream ID: {}, offset: 0 to {}. Emptying cache...",
-                   partition.partition_id, partition.topic_id, 
partition.stream_id, end_offset
-               );
-            } else if let Some(cache) = &mut partition.cache {
-                for message in &messages {
-                    cache.push_safe(message.clone());
-                }
-
-                info!(
-                   "Loaded {} messages ({} bytes) to cache for partition ID: 
{}, topic ID: {}, stream ID: {}, offset: 0 to {}.",
-                   messages.len(), sum, partition.partition_id, 
partition.topic_id, partition.stream_id, end_offset
-               );
-            } else {
-                warn!(
-                    "Cache is invalid for ID: {}, topic ID: {}, stream ID: {}, 
offset: 0 to {}",
-                    partition.partition_id, partition.topic_id, 
partition.stream_id, end_offset
-                );
-            }
-        }
-
-        Ok(())
-        */
-        todo!()
-    }
-
-    fn cache_integrity_check(cache: ()) -> bool {
-        //TODO: Fix me
-        /*
-        if cache.is_empty() {
-            warn!("Cache is empty!");
-            return false;
-        }
-
-        let first_offset = cache[0].offset;
-        let last_offset = cache[cache.len() - 1].offset;
-
-        for i in 1..cache.len() {
-            if cache[i].offset != cache[i - 1].offset + 1 {
-                warn!("Offsets are not subsequent at index {} offset {}, for 
previous index {} offset is {}", i, cache[i].offset, i-1, cache[i-1].offset);
-                return false;
-            }
-        }
-
-        let expected_messages_count: u64 = last_offset - first_offset + 1;
-        if cache.len() != expected_messages_count as usize {
-            warn!(
-                "Messages count is in cache ({}) not equal to expected 
messages count ({})",
-                cache.len(),
-                expected_messages_count
-            );
-            return false;
-        }
-
-        true
-        */
-        todo!()
-    }
-
     pub async fn get_expired_segments_start_offsets_per_partition(
         &self,
         now: IggyTimestamp,
diff --git a/server/src/streaming/topics/storage.rs 
b/server/src/streaming/topics/storage.rs
index b1bd98f6..9b40dcc3 100644
--- a/server/src/streaming/topics/storage.rs
+++ b/server/src/streaming/topics/storage.rs
@@ -205,12 +205,6 @@ impl TopicStorage for FileTopicStorage {
                 .insert(consumer_group.group_id, RwLock::new(consumer_group));
         }
 
-        topic
-            .load_messages_from_disk_to_cache()
-            .await
-            .with_error_context(|error| {
-                format!("{COMPONENT} (error: {error}) - failed to load 
messages from disk to cache, topic: {topic}")
-            })?;
         info!("Loaded topic {topic}");
 
         Ok(())


Reply via email to