This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch zero-copy-no-batching
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit ce3cf664e7bb422661f4d5679254e03dee027717
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Sun Mar 23 19:16:31 2025 +0100

    another day, another refactor
---
 integration/tests/streaming/get_by_offset.rs       | 289 +++++++++++------
 integration/tests/streaming/get_by_timestamp.rs    | 350 ++++++++++++++-------
 integration/tests/streaming/mod.rs                 |   2 +-
 sdk/src/models/messaging/indexes.rs                | 179 ++---------
 sdk/src/models/messaging/message.rs                |  46 +--
 sdk/src/models/messaging/messages_batch.rs         | 167 ++++++----
 server/src/streaming/partitions/messages.rs        | 134 +++-----
 .../src/streaming/segments/indexes/index_reader.rs |   6 +-
 .../src/streaming/segments/indexes/index_writer.rs |   2 +-
 .../src/streaming/segments/indexes/indexes_mut.rs  |  62 ++--
 .../streaming/segments/messages/messages_writer.rs |  11 +-
 server/src/streaming/segments/messages/mod.rs      |  13 +-
 .../src/streaming/segments/messages_accumulator.rs | 184 ++++++-----
 server/src/streaming/segments/reading_messages.rs  |  87 +++--
 .../streaming/segments/types/message_view_mut.rs   |   7 -
 .../streaming/segments/types/messages_batch_mut.rs | 140 ++++++---
 .../streaming/segments/types/messages_batch_set.rs |  42 ++-
 server/src/streaming/segments/writing_messages.rs  |  83 ++---
 server/src/streaming/systems/messages.rs           |   2 +-
 19 files changed, 980 insertions(+), 826 deletions(-)

diff --git a/integration/tests/streaming/get_by_offset.rs b/integration/tests/streaming/get_by_offset.rs
index 721a3e8e..7a67e6d9 100644
--- a/integration/tests/streaming/get_by_offset.rs
+++ b/integration/tests/streaming/get_by_offset.rs
@@ -15,11 +15,11 @@ use test_case::test_matrix;
  */
 
 fn msg_size(size: u64) -> IggyByteSize {
-    IggyByteSize::from_str(&format!("{}b", size)).unwrap()
+    IggyByteSize::from_str(&format!("{}B", size)).unwrap()
 }
 
 fn segment_size(size: u64) -> IggyByteSize {
-    IggyByteSize::from_str(&format!("{}b", size)).unwrap()
+    IggyByteSize::from_str(&format!("{}B", size)).unwrap()
 }
 
 fn msgs_req_to_save(count: u32) -> u32 {
@@ -34,23 +34,42 @@ fn index_cache_disabled() -> bool {
     false
 }
 
+fn small_batches() -> Vec<u32> {
+    vec![3, 4, 5, 6, 7]
+}
+
+fn medium_batches() -> Vec<u32> {
+    vec![10, 20, 30, 40]
+}
+
+fn large_batches() -> Vec<u32> {
+    vec![100, 200, 300, 400]
+}
+
+fn very_large_batches() -> Vec<u32> {
+    vec![500, 1000, 1500, 1000]
+}
+
 #[test_matrix(
     [msg_size(50), msg_size(1000), msg_size(20000)],
-    [msgs_req_to_save(10), msgs_req_to_save(24), msgs_req_to_save(1000)],
-    [segment_size(500), segment_size(2000), segment_size(100000)],
+    [small_batches(), medium_batches(), large_batches(), very_large_batches()],
+    [msgs_req_to_save(10), msgs_req_to_save(24), msgs_req_to_save(1000), msgs_req_to_save(4000)],
+    [segment_size(500), segment_size(2000), segment_size(100000), segment_size(10000000)],
     [index_cache_disabled(), index_cache_enabled()])]
 #[tokio::test]
 async fn test_get_messages_by_offset(
     message_size: IggyByteSize,
+    batch_sizes: Vec<u32>,
     messages_required_to_save: u32,
     segment_size: IggyByteSize,
     index_cache_enabled: bool,
 ) {
     println!(
-        "Running test with messages_required_to_save: {}, segment_size: {}, 
message_size: {}, cache_indexes: {}",
+        "Running test with message_size: {}, batches: {:?}, 
messages_required_to_save: {}, segment_size: {}, cache_indexes: {}",
+        message_size,
+        batch_sizes,
         messages_required_to_save,
         segment_size,
-        message_size,
         index_cache_enabled
     );
 
@@ -59,9 +78,7 @@ async fn test_get_messages_by_offset(
     let topic_id = 1;
     let partition_id = 1;
 
-    // Define batch sizes for 5 appends
-    let batch_sizes = [3, 4, 5, 6, 7];
-    let total_messages: u32 = batch_sizes.iter().sum();
+    let total_messages_count = batch_sizes.iter().sum();
 
     let config = Arc::new(SystemConfig {
         path: setup.config.path.to_string(),
@@ -98,8 +115,10 @@ async fn test_get_messages_by_offset(
     setup.create_partitions_directory(stream_id, topic_id).await;
     partition.persist().await.unwrap();
 
-    let mut all_messages = Vec::with_capacity(total_messages as usize);
-    for i in 1..=total_messages {
+    let mut all_messages = Vec::with_capacity(total_messages_count as usize);
+
+    // Generate all messages as defined in the test matrix
+    for i in 1..=total_messages_count {
         let id = i as u128;
         let beginning_of_payload = format!("message {}", i);
         let mut payload = BytesMut::new();
@@ -133,9 +152,22 @@ async fn test_get_messages_by_offset(
     let mut batch_offsets = Vec::with_capacity(batch_sizes.len());
     let mut current_pos = 0;
 
-    // Append all batches
-    for batch_len in batch_sizes {
-        let messages_slice_to_append = &all_messages[current_pos..current_pos + batch_len as usize];
+    // Append all batches as defined in the test matrix
+    for (batch_idx, &batch_len) in batch_sizes.iter().enumerate() {
+        // Stop if the next batch would overrun the generated messages
+        if current_pos + batch_len as usize > all_messages.len() {
+            break;
+        }
+
+        println!(
+            "Appending batch {}/{} with {} messages",
+            batch_idx + 1,
+            batch_sizes.len(),
+            batch_len
+        );
+
+        let batch_end_pos = current_pos + batch_len as usize;
+        let messages_slice_to_append = &all_messages[current_pos..batch_end_pos];
 
         let messages_size = messages_slice_to_append
             .iter()
@@ -150,49 +182,59 @@ async fn test_get_messages_by_offset(
         current_pos += batch_len as usize;
     }
 
+    // Use the exact total messages count from the test matrix
+    let total_sent_messages = total_messages_count;
+
     // Test 1: All messages from start
     let all_loaded_messages = partition
-        .get_messages_by_offset(0, total_messages)
+        .get_messages_by_offset(0, total_sent_messages)
         .await
         .unwrap();
     assert_eq!(
         all_loaded_messages.count(),
-        total_messages,
+        total_sent_messages,
         "Expected {} messages from start, but got {}",
-        total_messages,
+        total_sent_messages,
         all_loaded_messages.count()
     );
 
     // Test 2: Get messages from middle (after 3rd batch)
-    let middle_offset = batch_offsets[2];
-    let remaining_messages = total_messages - (batch_sizes[0] + batch_sizes[1] + batch_sizes[2]);
-    let middle_messages = partition
-        .get_messages_by_offset(middle_offset + 1, remaining_messages)
-        .await
-        .unwrap();
-    assert_eq!(
-        middle_messages.count(),
-        remaining_messages,
-        "Expected {} messages from middle offset, but got {}",
-        remaining_messages,
-        middle_messages.count()
-    );
+    if batch_offsets.len() >= 3 {
+        let middle_offset = batch_offsets[2];
+        let prior_batches_sum: u32 = batch_sizes[..3].iter().sum();
+        let remaining_messages = total_sent_messages - prior_batches_sum;
+
+        let middle_messages = partition
+            .get_messages_by_offset(middle_offset + 1, remaining_messages)
+            .await
+            .unwrap();
+
+        assert_eq!(
+            middle_messages.count(),
+            remaining_messages,
+            "Expected {} messages from middle offset, but got {}",
+            remaining_messages,
+            middle_messages.count()
+        );
+    }
 
     // Test 3: No messages beyond final offset
-    let final_offset = batch_offsets.last().unwrap();
-    let no_messages = partition
-        .get_messages_by_offset(final_offset + 1, 1)
-        .await
-        .unwrap();
-    assert_eq!(
-        no_messages.count(),
-        0,
-        "Expected no messages beyond final offset, but got {}",
-        no_messages.count()
-    );
+    if !batch_offsets.is_empty() {
+        let final_offset = *batch_offsets.last().unwrap();
+        let no_messages = partition
+            .get_messages_by_offset(final_offset + 1, 1)
+            .await
+            .unwrap();
+        assert_eq!(
+            no_messages.count(),
+            0,
+            "Expected no messages beyond final offset, but got {}",
+            no_messages.count()
+        );
+    }
 
     // Test 4: Small subset from start
-    let subset_size = 3;
+    let subset_size = std::cmp::min(3, total_sent_messages);
     let subset_messages = partition
         .get_messages_by_offset(0, subset_size)
         .await
@@ -206,70 +248,109 @@ async fn test_get_messages_by_offset(
     );
 
     // Test 5: Messages spanning multiple batches
-    let span_offset = batch_offsets[1] + 1; // Start from middle of 2nd batch
-    let span_size = 8; // Should span across 2nd, 3rd, and into 4th batch
-    let batches = partition
-        .get_messages_by_offset(span_offset, span_size)
-        .await
-        .unwrap();
-    assert_eq!(
-        batches.count(),
-        span_size,
-        "Expected {} messages spanning multiple batches, but got {}",
-        span_size,
-        batches.count()
-    );
+    if batch_offsets.len() >= 4 {
+        let span_offset = batch_offsets[1] + 1; // Start from middle of 2nd batch
+        let span_size = 8; // Should span across 2nd, 3rd, and into 4th batch
+        let batches = partition
+            .get_messages_by_offset(span_offset, span_size)
+            .await
+            .unwrap();
+        assert_eq!(
+            batches.count(),
+            span_size,
+            "Expected {} messages spanning multiple batches, but got {}",
+            span_size,
+            batches.count()
+        );
 
-    // Test 6: Validate message content and ordering
-    let mut i = 0;
-    for batch in batches.iter() {
-        for msg in batch.iter() {
-            println!(
-                "Message at position {}, offset: {}",
-                i,
-                msg.header().offset()
-            );
-            let expected_offset = span_offset + i as u64;
-            assert!(
-                msg.header().offset() >= expected_offset,
-                "Message offset {} at position {} should be >= expected offset 
{}",
-                msg.header().offset(),
-                i,
-                expected_offset
-            );
-
-            let original_offset = msg.header().offset() as usize;
-            let original_message = &all_messages[original_offset];
-
-            let loaded_id = msg.header().id();
-            let original_id = original_message.header.id;
-            assert_eq!(
-                loaded_id,
-                original_id,
-                "Message ID mismatch at offset {}",
-                msg.header().offset(),
-            );
-
-            let loaded_payload = msg.payload();
-            let original_payload = &original_message.payload;
-            assert_eq!(
-                loaded_payload,
-                original_payload,
-                "Payload mismatch at offset {}",
-                msg.header().offset(),
-            );
-
-            let loaded_headers = msg.user_headers_map().unwrap().unwrap();
-            let original_headers =
-                HashMap::from_bytes(original_message.user_headers.as_ref().unwrap().clone())
+        // Test 6: Validate message content and ordering for all messages
+        let mut i = 0;
+
+        for batch in batches.iter() {
+            for msg in batch.iter() {
+                let expected_offset = span_offset + i as u64;
+                assert!(
+                    msg.header().offset() >= expected_offset,
+                    "Message offset {} at position {} should be >= expected 
offset {}",
+                    msg.header().offset(),
+                    i,
+                    expected_offset
+                );
+
+                let original_offset = msg.header().offset() as usize;
+                if original_offset < all_messages.len() {
+                    let original_message = &all_messages[original_offset];
+
+                    let loaded_id = msg.header().id();
+                    let original_id = original_message.header.id;
+                    assert_eq!(
+                        loaded_id,
+                        original_id,
+                        "Message ID mismatch at offset {}",
+                        msg.header().offset(),
+                    );
+
+                    let loaded_payload = msg.payload();
+                    let original_payload = &original_message.payload;
+                    assert_eq!(
+                        loaded_payload,
+                        original_payload,
+                        "Payload mismatch at offset {}",
+                        msg.header().offset(),
+                    );
+
+                    let loaded_headers = msg.user_headers_map().unwrap().unwrap();
+                    let original_headers = HashMap::from_bytes(
+                        original_message.user_headers.as_ref().unwrap().clone(),
+                    )
                     .unwrap();
-            assert_eq!(
-                loaded_headers,
-                original_headers,
-                "Headers mismatch at offset {}",
-                msg.header().offset(),
-            );
-            i += 1;
+                    assert_eq!(
+                        loaded_headers,
+                        original_headers,
+                        "Headers mismatch at offset {}",
+                        msg.header().offset(),
+                    );
+                }
+                i += 1;
+            }
         }
     }
+
+    // Add sequential read test for all batch sizes
+    println!(
+        "Verifying sequential reads, expecting {} messages",
+        total_sent_messages
+    );
+
+    let chunk_size = 500;
+    let mut verified_count = 0;
+
+    for chunk_start in (0..total_sent_messages).step_by(chunk_size as usize) {
+        let read_size = std::cmp::min(chunk_size, total_sent_messages - chunk_start);
+
+        let chunk = partition
+            .get_messages_by_offset(chunk_start as u64, read_size)
+            .await
+            .unwrap();
+
+        assert_eq!(
+            chunk.count(),
+            read_size,
+            "Failed to read chunk at offset {} with size {}",
+            chunk_start,
+            read_size
+        );
+
+        verified_count += chunk.count();
+
+        println!(
+            "Read chunk at offset {} with size {}, verified count: {}",
+            chunk_start, read_size, verified_count
+        );
+    }
+
+    assert_eq!(
+        verified_count, total_sent_messages,
+        "Sequential chunk reads didn't cover all messages"
+    );
 }
diff --git a/integration/tests/streaming/get_by_timestamp.rs b/integration/tests/streaming/get_by_timestamp.rs
index 8241e819..d89e38ef 100644
--- a/integration/tests/streaming/get_by_timestamp.rs
+++ b/integration/tests/streaming/get_by_timestamp.rs
@@ -1,5 +1,6 @@
 use crate::streaming::common::test_setup::TestSetup;
 use bytes::BytesMut;
+use iggy::confirmation::Confirmation;
 use iggy::prelude::*;
 use server::configs::system::{PartitionConfig, SegmentConfig, SystemConfig};
 use server::streaming::partitions::partition::Partition;
@@ -16,11 +17,11 @@ use test_case::test_matrix;
  */
 
 fn msg_size(size: u64) -> IggyByteSize {
-    IggyByteSize::from_str(&format!("{}b", size)).unwrap()
+    IggyByteSize::from_str(&format!("{}B", size)).unwrap()
 }
 
 fn segment_size(size: u64) -> IggyByteSize {
-    IggyByteSize::from_str(&format!("{}b", size)).unwrap()
+    IggyByteSize::from_str(&format!("{}B", size)).unwrap()
 }
 
 fn msgs_req_to_save(count: u32) -> u32 {
@@ -35,23 +36,42 @@ fn index_cache_disabled() -> bool {
     false
 }
 
+fn small_batches() -> Vec<u32> {
+    vec![3, 4, 5, 6, 7]
+}
+
+fn medium_batches() -> Vec<u32> {
+    vec![10, 20, 30, 40]
+}
+
+fn large_batches() -> Vec<u32> {
+    vec![100, 200, 300, 400]
+}
+
+fn very_large_batches() -> Vec<u32> {
+    vec![500, 1000, 1500, 1000]
+}
+
 #[test_matrix(
     [msg_size(50), msg_size(1000), msg_size(20000)],
-    [msgs_req_to_save(3), msgs_req_to_save(10),  msgs_req_to_save(1000)],
-    [segment_size(500), segment_size(2000), segment_size(100000)],
+    [small_batches(), medium_batches(), large_batches(), very_large_batches()],
+    [msgs_req_to_save(10), msgs_req_to_save(24), msgs_req_to_save(1000), msgs_req_to_save(4000)],
+    [segment_size(500), segment_size(2000), segment_size(100000), segment_size(10000000)],
     [index_cache_disabled(), index_cache_enabled()])]
 #[tokio::test]
 async fn test_get_messages_by_timestamp(
     message_size: IggyByteSize,
+    batch_sizes: Vec<u32>,
     messages_required_to_save: u32,
     segment_size: IggyByteSize,
     index_cache_enabled: bool,
 ) {
     println!(
-        "Running test with messages_required_to_save: {}, segment_size: {}, 
message_size: {}, cache_indexes: {}",
+        "Running test with message_size: {}, batches: {:?}, 
messages_required_to_save: {}, segment_size: {}, cache_indexes: {}",
+        message_size,
+        batch_sizes,
         messages_required_to_save,
         segment_size,
-        message_size,
         index_cache_enabled
     );
 
@@ -60,9 +80,7 @@ async fn test_get_messages_by_timestamp(
     let topic_id = 1;
     let partition_id = 1;
 
-    // Define batch sizes for 5 appends
-    let batch_sizes = [3, 4, 5, 6, 7];
-    let total_messages: u32 = batch_sizes.iter().sum();
+    let total_messages_count = batch_sizes.iter().sum();
 
     let config = Arc::new(SystemConfig {
         path: setup.config.path.to_string(),
@@ -78,6 +96,7 @@ async fn test_get_messages_by_timestamp(
         },
         ..Default::default()
     });
+
     let mut partition = Partition::create(
         stream_id,
         topic_id,
@@ -98,8 +117,10 @@ async fn test_get_messages_by_timestamp(
     setup.create_partitions_directory(stream_id, topic_id).await;
     partition.persist().await.unwrap();
 
-    let mut all_messages = Vec::with_capacity(total_messages as usize);
-    for i in 1..=total_messages {
+    let mut all_messages = Vec::with_capacity(total_messages_count as usize);
+
+    // Generate all messages as defined in the test matrix
+    for i in 1..=total_messages_count {
         let id = i as u128;
         let beginning_of_payload = format!("message {}", i);
         let mut payload = BytesMut::new();
@@ -120,6 +141,7 @@ async fn test_get_messages_by_timestamp(
             HeaderKey::new("key-3").unwrap(),
             HeaderValue::from_uint64(123456).unwrap(),
         );
+
         let message = IggyMessage::builder()
             .with_id(id)
             .with_payload(payload)
@@ -128,65 +150,94 @@ async fn test_get_messages_by_timestamp(
         all_messages.push(message);
     }
 
+    // Timestamp tracking for messages
     let initial_timestamp = IggyTimestamp::now();
     let mut batch_timestamps = Vec::with_capacity(batch_sizes.len());
     let mut current_pos = 0;
 
-    // Append all batches with timestamps
-    for batch_len in batch_sizes {
-        let messages_slice_to_append = &all_messages[current_pos..current_pos + batch_len as usize];
+    // Append all batches as defined in the test matrix with separate timestamps
+    for (batch_idx, &batch_len) in batch_sizes.iter().enumerate() {
+        // Add a small delay between batches to ensure distinct timestamps
+        sleep(std::time::Duration::from_millis(2));
+
+        // Stop if the next batch would overrun the generated messages
+        if current_pos + batch_len as usize > all_messages.len() {
+            break;
+        }
+
+        println!(
+            "Appending batch {}/{} with {} messages",
+            batch_idx + 1,
+            batch_sizes.len(),
+            batch_len
+        );
+
+        let batch_end_pos = current_pos + batch_len as usize;
+        let messages_slice_to_append = &all_messages[current_pos..batch_end_pos];
 
         let messages_size = messages_slice_to_append
             .iter()
-            .map(|m| m.get_size_bytes().as_bytes_u32())
+            .map(|m| m.get_size_bytes().as_bytes_u64() as u32)
             .sum();
 
         let batch = IggyMessagesBatchMut::from_messages(messages_slice_to_append, messages_size);
         assert_eq!(batch.count(), batch_len);
-        partition.append_messages(batch, None).await.unwrap();
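+        // Wait for the append to be confirmed so the timestamp recorded below
+        // is taken only after the batch has actually been persisted.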
+        partition
+            .append_messages(batch, Some(Confirmation::Wait))
+            .await
+            .unwrap();
+
+        // Capture the timestamp of this batch
         batch_timestamps.push(IggyTimestamp::now());
         current_pos += batch_len as usize;
-        sleep(std::time::Duration::from_millis(10));
+
+        // Add a small delay between batches to ensure distinct timestamps
+        sleep(std::time::Duration::from_millis(2));
     }
 
     let final_timestamp = IggyTimestamp::now();
 
+    // Use the exact total messages count from the test matrix
+    let total_sent_messages = total_messages_count;
+
     // Test 1: All messages from initial timestamp
     let all_loaded_messages = partition
-        .get_messages_by_timestamp(initial_timestamp, total_messages)
+        .get_messages_by_timestamp(initial_timestamp, total_sent_messages)
         .await
         .unwrap();
     assert_eq!(
         all_loaded_messages.count(),
-        total_messages,
+        total_sent_messages,
         "Expected {} messages from initial timestamp, but got {}",
-        total_messages,
+        total_sent_messages,
         all_loaded_messages.count()
     );
 
     // Test 2: Get messages from middle timestamp (after 3rd batch)
-    let middle_timestamp = batch_timestamps[2];
-    // We expect to get all messages after the 3rd batch (this is where the test was failing)
-    let expected_messages = batch_sizes[3] + batch_sizes[4]; // Only these two batches remain after batch 2
-    let remaining_count = total_messages - (batch_sizes[0] + batch_sizes[1] + batch_sizes[2]);
-
-    // Use a timestamp that's 50us earlier than the recorded batch timestamp
-    // This ensures we catch the right batch boundary consistently
-    let adjusted_timestamp = (middle_timestamp.as_micros() - 50).into();
-
-    let middle_messages = partition
-        .get_messages_by_timestamp(adjusted_timestamp, remaining_count)
-        .await
-        .unwrap();
-    assert_eq!(
-        middle_messages.count(),
-        expected_messages,
-        "Expected {} messages from middle timestamp, but got {}",
-        expected_messages,
-        middle_messages.count()
-    );
+    if batch_timestamps.len() >= 3 {
+        // Use a timestamp just after the 3rd batch's recorded timestamp so we
+        // only get messages from the batches that follow it
+        let middle_timestamp = IggyTimestamp::from(batch_timestamps[2].as_micros() + 1000);
+
+        // Calculate how many messages should be in batches after the 3rd
+        let prior_batches_sum: u32 = batch_sizes[..3].iter().sum();
+        let remaining_messages = total_sent_messages - prior_batches_sum;
+
+        let middle_messages = partition
+            .get_messages_by_timestamp(middle_timestamp, remaining_messages)
+            .await
+            .unwrap();
+
+        assert_eq!(
+            middle_messages.count(),
+            remaining_messages,
+            "Expected {} messages from middle timestamp, but got {}",
+            remaining_messages,
+            middle_messages.count()
+        );
+    }
 
-    // Test 3: No messages from final timestamp
+    // Test 3: No messages after final timestamp
     let no_messages = partition
         .get_messages_by_timestamp(final_timestamp, 1)
         .await
@@ -194,12 +245,12 @@ async fn test_get_messages_by_timestamp(
     assert_eq!(
         no_messages.count(),
         0,
-        "Expected no messages from final timestamp, but got {}",
+        "Expected no messages after final timestamp, but got {}",
         no_messages.count()
     );
 
     // Test 4: Small subset from initial timestamp
-    let subset_size = 3;
+    let subset_size = std::cmp::min(3, total_sent_messages);
     let subset_messages = partition
         .get_messages_by_timestamp(initial_timestamp, subset_size)
         .await
@@ -212,77 +263,150 @@ async fn test_get_messages_by_timestamp(
         subset_messages.count()
     );
 
-    println!("initial timestamp: {}", initial_timestamp.as_micros());
-
-    let mut i = 0;
-    for batch in subset_messages.iter() {
-        for message in batch.iter() {
-            let loaded_message = subset_messages.get(i).unwrap();
-            let original_message = &all_messages[i];
-
-            let loaded_header = loaded_message.header();
-            let original_header = &original_message.header;
-
-            assert_eq!(
-                loaded_header.id(),
-                original_header.id,
-                "Message ID mismatch at position {}",
-                i
-            );
-            assert_eq!(
-                loaded_message.payload(),
-                original_message.payload,
-                "Payload mismatch at position {}",
-                i
-            );
-            let loaded_headers = msg.user_headers().map(|headers| headers.to_vec());
-            let original_headers = original_message.user_headers.as_ref().unwrap().clone();
-            assert_eq!(
-                HashMap::from_bytes(loaded_headers),
-                HashMap::from_bytes(original_headers),
-                "Headers mismatch at position {}",
-                i
-            );
-            assert!(
-                loaded_message.header().timestamp() >= initial_timestamp.as_micros(),
-                "Message timestamp mismatch at position {}, timestamp {} is less than initial timestamp {}",
-                i,
-                loaded_message.header().timestamp(),
-                initial_timestamp
-            );
-
-            i += 1;
-        }
-    }
-
-    // Test 5: Messages spanning multiple batches (from middle of 2nd batch timestamp)
-    let span_timestamp = batch_timestamps[1];
-    let span_size = 8;
-    let spanning_messages = partition
-        .get_messages_by_timestamp(span_timestamp, span_size)
-        .await
-        .unwrap();
-    assert_eq!(
-        spanning_messages.count(),
-        span_size,
-        "Expected {} messages spanning multiple batches, but got {}",
-        span_size,
-        spanning_messages.count()
-    );
+    // Test 5: Messages spanning multiple batches by timestamp
+    if batch_timestamps.len() >= 4 {
+        // Use a timestamp just after the 2nd batch's recorded timestamp
+        let span_timestamp = IggyTimestamp::from(batch_timestamps[1].as_micros() + 1000);
+        let span_size = 8; // Should span across multiple batches
+
+        let spanning_messages = partition
+            .get_messages_by_timestamp(span_timestamp, span_size)
+            .await
+            .unwrap();
+
+        assert_eq!(
+            spanning_messages.count(),
+            span_size,
+            "Expected {} messages spanning multiple batches, but got {}",
+            span_size,
+            spanning_messages.count()
+        );
 
-    // Make sure the timestamp we're using for comparison is accurate
-    let span_timestamp_micros = span_timestamp.as_micros();
-    println!("Span timestamp in microseconds: {}", span_timestamp_micros);
-
-    for batch in spanning_messages.iter() {
-        for msg in batch.iter() {
-            let msg_timestamp = msg.header().timestamp();
-            assert!(
-                msg_timestamp >= span_timestamp_micros,
-                "Message timestamp {} should be >= span timestamp {}",
-                msg_timestamp,
-                span_timestamp_micros
-            );
+        // Verify that all messages have timestamps >= our reference timestamp
+        let span_timestamp_micros = span_timestamp.as_micros();
+
+        // Test 6: Validate message content and ordering
+        for batch in spanning_messages.iter() {
+            for msg in batch.iter() {
+                let msg_timestamp = msg.header().timestamp();
+                assert!(
+                    msg_timestamp >= span_timestamp_micros,
+                    "Message timestamp {} should be >= span timestamp {}",
+                    msg_timestamp,
+                    span_timestamp_micros
+                );
+
+                // Verify message content
+                let loaded_id = msg.header().id();
+                let original_offset = msg.header().offset() as usize;
+
+                if original_offset < all_messages.len() {
+                    let original_message = &all_messages[original_offset];
+                    let original_id = original_message.header.id;
+
+                    assert_eq!(
+                        loaded_id,
+                        original_id,
+                        "Message ID mismatch at offset {}",
+                        msg.header().offset(),
+                    );
+
+                    let loaded_payload = msg.payload();
+                    let original_payload = &original_message.payload;
+                    assert_eq!(
+                        loaded_payload,
+                        original_payload,
+                        "Payload mismatch at offset {}",
+                        msg.header().offset(),
+                    );
+
+                    let loaded_headers = msg.user_headers_map().unwrap().unwrap();
+                    let original_headers = HashMap::from_bytes(
+                        original_message.user_headers.as_ref().unwrap().clone(),
+                    )
+                    .unwrap();
+                    assert_eq!(
+                        loaded_headers,
+                        original_headers,
+                        "Headers mismatch at offset {}",
+                        msg.header().offset(),
+                    );
+                }
+            }
         }
     }
+
+    // // Add sequential read test by timestamp
+    // println!(
+    //     "Verifying sequential time-based reads, expecting {} messages",
+    //     total_sent_messages
+    // );
+
+    // // For timestamp-based sequential reads, we'll use timestamps from batch_timestamps
+    // // array to retrieve messages in batches (which are part of returned batch_set)
+    // if !batch_timestamps.is_empty() {
+    //     let chunk_size = 500;
+    //     let mut verified_count = 0;
+
+    //     let mut current_timestamp = initial_timestamp;
+
+    //     while verified_count < total_sent_messages {
+    //         let msgs_to_read = std::cmp::min(chunk_size, total_sent_messages - verified_count);
+
+    //         let batch_set = partition
+    //             .get_messages_by_timestamp(current_timestamp, msgs_to_read)
+    //             .await
+    //             .unwrap();
+
+    //         if batch_set.count() == 0 {
+    //             println!(
+    //                 "No messages found for timestamp {}, expected {}",
+    //                 current_timestamp.as_micros(),
+    //                 msgs_to_read
+    //             );
+    //             // We've read all messages, or there's a gap in the timestamps
+    //             break;
+    //         }
+
+    //         verified_count += batch_set.count();
+
+    //         println!(
+    //             "Read chunk from timestamp {} with size {}, verified count: 
{}, first offset {}, last offset {}",
+    //             current_timestamp.as_micros(),
+    //             batch_set.count(),
+    //             verified_count,
+    //             batch_set
+    //                 .iter()
+    //                 .next()
+    //                 .unwrap()
+    //                 .iter()
+    //                 .next()
+    //                 .unwrap()
+    //                 .header()
+    //                 .offset(),
+    //             batch_set
+    //                 .iter()
+    //                 .last()
+    //                 .unwrap()
+    //                 .iter()
+    //                 .last()
+    //                 .unwrap()
+    //                 .header()
+    //                 .offset()
+    //         );
+
+    //         // Get the last message timestamp from the batch set and use it for the next query
+    //         // Add a small offset to avoid getting the same message again
+    //         if let Some(batch) = batch_set.iter().last() {
+    //             if let Some(msg) = batch.iter().last() {
+    //                 current_timestamp = IggyTimestamp::from(msg.header().timestamp() + 1);
+    //             }
+    //         }
+    //     }
+
+    //     assert_eq!(
+    //         verified_count, total_sent_messages,
+    //         "Sequential timestamp batch set reads didn't cover all messages"
+    //     );
+    // }
 }
diff --git a/integration/tests/streaming/mod.rs b/integration/tests/streaming/mod.rs
index 12b37189..1799e9c3 100644
--- a/integration/tests/streaming/mod.rs
+++ b/integration/tests/streaming/mod.rs
@@ -4,7 +4,7 @@ use iggy::prelude::IggyMessage;
 mod common;
 // mod consumer_offset;
 mod get_by_offset;
-// mod get_by_timestamp;
+mod get_by_timestamp;
 // mod messages;
 // mod partition;
 // mod segment;
diff --git a/sdk/src/models/messaging/indexes.rs b/sdk/src/models/messaging/indexes.rs
index 5a6cc179..420c1b78 100644
--- a/sdk/src/models/messaging/indexes.rs
+++ b/sdk/src/models/messaging/indexes.rs
@@ -28,20 +28,11 @@ impl IggyIndexes {
         }
     }
 
-    pub fn into_inner(self) -> Bytes {
-        self.buffer
-    }
-
     /// Gets the number of indexes in the container
     pub fn count(&self) -> u32 {
         self.buffer.len() as u32 / INDEX_SIZE as u32
     }
 
-    /// Checks if the container is empty
-    pub fn is_empty(&self) -> bool {
-        self.count() == 0
-    }
-
     /// Gets the size of all indexes messages
     pub fn messages_size(&self) -> u32 {
         self.last_position() - self.base_position
@@ -63,30 +54,48 @@ impl IggyIndexes {
         }
     }
 
+    /// Gets a slice of up to `count` indexes starting at the given relative offset
+    pub fn slice_by_offset(&self, relative_start_offset: u32, count: u32) -> Option<IggyIndexes> {
-        let available_count = self.count().saturating_sub(relative_start_offset);
+        if self.count() == 0 || relative_start_offset >= self.count() {
+            return None;
+        }
 
-        let required_count = count;
-        let actual_count = std::cmp::min(required_count, available_count);
+        let available_count = self.count().saturating_sub(relative_start_offset);
+        let actual_count = std::cmp::min(count, available_count);
 
-        if actual_count == 0 || relative_start_offset >= self.count() {
+        if actual_count == 0 {
             return None;
         }
 
         let end_pos = relative_start_offset + actual_count;
         let start_byte = relative_start_offset as usize * INDEX_SIZE;
         let end_byte = end_pos as usize * INDEX_SIZE;
+
+        if end_byte > self.buffer.len() {
+            return None;
+        }
+
         let slice = self.buffer.slice(start_byte..end_byte);
 
-        let i = relative_start_offset.saturating_sub(1);
-        if i == 0 {
-            tracing::error!("IDX 0 base_position: 0");
-            Some(IggyIndexes::new(slice, 0))
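+        // Each index stores the end position of its message, so the slice's
+        // base position comes from the index directly preceding the slice
+        // start; a slice starting at offset 0 keeps this container's base.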
+        let base_position = if relative_start_offset > 0 {
+            match self.get(relative_start_offset - 1) {
+                Some(index) => index.position(),
+                None => self.base_position,
+            }
         } else {
-            let base_position = self.get(relative_start_offset - 1).unwrap().position();
-            tracing::error!("base_position: {base_position}");
-            Some(IggyIndexes::new(slice, base_position))
+            self.base_position
+        };
+
+        Some(IggyIndexes::new(slice, base_position))
+    }
+
+    /// Gets the first index
+    pub fn first(&self) -> Option<IggyIndexView> {
+        if self.count() == 0 {
+            return None;
         }
+
+        Some(IggyIndexView::new(&self.buffer[0..INDEX_SIZE]))
     }
 
     /// Gets a last index
@@ -149,136 +158,6 @@ impl IggyIndexes {
         result
     }
 
-    // /// Calculate boundary information for reading messages from disk based on cached indexes.
-    // ///
-    // /// This computes the exact file position, bytes to read, and expected message count
-    // /// based on the provided relative offsets using in-memory index data.
-    // ///
-    // /// Returns None if the requested offset is out of bounds.
-    // pub fn calculate_cached_read_boundary_by_offset(
-    //     &self,
-    //     relative_start_offset: u32,
-    //     relative_end_offset: u32,
-    // ) -> Option<ReadBoundary> {
-    //     if self.count() == 0 {
-    //         trace!("No indexes available in memory");
-    //         return None;
-    //     }
-
-    //     if relative_start_offset >= self.count() {
-    //         trace!(
-    //             "Start offset {} is out of bounds. Total cached indexes: 
{}",
-    //             relative_start_offset,
-    //             self.count(),
-    //         );
-    //         return None;
-    //     }
-
-    //     let effective_end_offset = relative_end_offset.min(self.count() - 1);
-
-    //     tracing::error!(
-    //         "start offset: {}, end offset: {}",
-    //         relative_start_offset,
-    //         effective_end_offset
-    //     );
-
-    //     // With our new index interpretation:
-    //     // - For messages after the first one, the start position comes from the PREVIOUS index
-    //     // - For the first message (offset 0), the start position is implicitly 0
-    //     // - The end position always comes from the current index
-
-    //     let start_position = if relative_start_offset > 0 {
-    //         // For non-first messages, get start position from previous index
-    //         let prev_index = self.get(relative_start_offset - 1)?;
-    //         prev_index.position()
-    //     } else {
-    //         // For the first message, start position is 0
-    //         0
-    //     };
-
-    //     // The end position comes from the last index we want to read
-    //     let last_index = self.get(effective_end_offset)?;
-    //     let end_position = last_index.position();
-
-    //     let bytes = end_position - start_position;
-    //     let messages_count = effective_end_offset - relative_start_offset + 1;
-
-    //     trace!(
-    //         "Calculated read boundary from cached indexes: start_pos={}, 
bytes={}, count={}",
-    //         start_position,
-    //         bytes,
-    //         messages_count
-    //     );
-
-    //     Some(ReadBoundary::new(start_position, bytes, messages_count))
-    // }
-
-    // /// Calculate boundary information for reading messages from disk based on a timestamp.
-    // ///
-    // /// This finds the index with timestamp closest to the requested timestamp and returns
-    // /// the boundary information to read the messages from that point forward.
-    // ///
-    // /// Returns None if the timestamp is not found or there are no indexes.
-    // pub fn calculate_cached_read_boundary_by_timestamp(
-    //     &self,
-    //     timestamp: u64,
-    //     messages_count: u32,
-    // ) -> Option<ReadBoundary> {
-    //     let start_index = self.find_by_timestamp(timestamp)?;
-
-    //     tracing::trace!("Found start_index: {}", start_index);
-
-    //     let start_position_in_array = start_index.offset() - self.first_offset();
-    //     let end_position_in_array =
-    //         (start_position_in_array + messages_count - 1).min(self.count() - 1);
-
-    //     tracing::trace!(
-    //         "Calculated end_position_in_array: {}",
-    //         end_position_in_array
-    //     );
-    //     let end_index = self.get(end_position_in_array)?;
-
-    //     tracing::trace!("Found end_index: {:?}", end_index);
-
-    //     let start_position = start_index.position();
-    //     // Check to prevent overflow in case end_index.position() < start_position
-    //     let bytes = if end_index.position() >= start_position {
-    //         end_index.position() - start_position
-    //     } else {
-    //         tracing::warn!(
-    //             "End index position {} is less than start position {}. 
Using 0 bytes.",
-    //             end_index.position(),
-    //             start_position
-    //         );
-    //         0
-    //     };
-
-    //     // Ensure we don't have underflow when calculating messages count
-    //     let actual_messages_count = if end_position_in_array >= start_position_in_array {
-    //         end_position_in_array - start_position_in_array + 1
-    //     } else {
-    //         tracing::warn!(
-    //             "End position {} is less than start position {}. Using 1 
message.",
-    //             end_position_in_array,
-    //             start_position_in_array
-    //         );
-    //         1
-    //     };
-
-    //     trace!(
-    //         "Calculated read boundary by timestamp: start_pos={}, bytes={}, 
count={}",
-    //         start_position,
-    //         bytes,
-    //         actual_messages_count
-    //     );
-
-    //     Some(ReadBoundary::new(
-    //         start_position,
-    //         bytes,
-    //         actual_messages_count,
-    //     ))
-    // }
-
     pub fn base_position(&self) -> u32 {
         self.base_position
     }
diff --git a/sdk/src/models/messaging/message.rs b/sdk/src/models/messaging/message.rs
index 07c911d9..c18d9c6e 100644
--- a/sdk/src/models/messaging/message.rs
+++ b/sdk/src/models/messaging/message.rs
@@ -42,50 +42,6 @@ impl IggyMessage {
         IggyMessageBuilder::new()
     }
 
-    // /// Convert Bytes to messages
-    // pub(crate) fn from_raw_bytes(buffer: Bytes, count: u32) -> Result<Vec<IggyMessage>, IggyError> {
-    //     let mut messages = Vec::with_capacity(count as usize);
-    //     let mut position = 0;
-    //     let buf_len = buffer.len();
-
-    //     while position < buf_len {
-    //         if position + IGGY_MESSAGE_HEADER_SIZE as usize > buf_len {
-    //             break;
-    //         }
-    //         let header_bytes = buffer.slice(position..position + IGGY_MESSAGE_HEADER_SIZE as usize);
-    //         let header = match IggyMessageHeader::from_bytes(header_bytes) {
-    //             Ok(h) => h,
-    //             Err(e) => {
-    //                 error!("Failed to parse message header: {}", e);
-    //                 return Err(e);
-    //             }
-    //         };
-    //         position += IGGY_MESSAGE_HEADER_SIZE as usize;
-
-    //         let payload_end = position + header.payload_length as usize;
-    //         if payload_end > buf_len {
-    //             break;
-    //         }
-    //         let payload = buffer.slice(position..payload_end);
-    //         position = payload_end;
-
-    //         let headers: Option<Bytes> = if header.user_headers_length > 0 {
-    //             Some(buffer.slice(position..position + header.user_headers_length as usize))
-    //         } else {
-    //             None
-    //         };
-    //         position += header.user_headers_length as usize;
-
-    //         messages.push(IggyMessage {
-    //             header,
-    //             payload,
-    //             user_headers: headers,
-    //         });
-    //     }
-
-    //     Ok(messages)
-    // }
-
     /// Convert Bytes to messages
     pub(crate) fn from_raw_bytes(buffer: Bytes, count: u32) -> Result<Vec<IggyMessage>, IggyError> {
         let mut messages = Vec::with_capacity(count as usize);
@@ -184,7 +140,7 @@ impl BytesSerializable for IggyMessage {
         bytes.put_slice(&message_header);
         bytes.put_slice(&self.payload);
         if let Some(headers) = &self.user_headers {
-            bytes.put_slice(&headers);
+            bytes.put_slice(headers);
         }
         bytes.freeze()
     }
diff --git a/sdk/src/models/messaging/messages_batch.rs b/sdk/src/models/messaging/messages_batch.rs
index 64b3f27f..e8c044a4 100644
--- a/sdk/src/models/messaging/messages_batch.rs
+++ b/sdk/src/models/messaging/messages_batch.rs
@@ -1,4 +1,4 @@
-use super::{IggyIndexes, IggyMessageHeaderView, IggyMessageView, IggyMessageViewIterator};
+use super::{IggyIndexes, IggyMessageView, IggyMessageViewIterator};
 use crate::{
     error::IggyError,
     models::messaging::INDEX_SIZE,
@@ -76,7 +76,9 @@ impl IggyMessagesBatch {
         let indexes_count = self.indexes.len() / INDEX_SIZE;
         debug_assert!(
             self.count as usize == indexes_count || self.count as usize == indexes_count - 1,
-            "Mismatch between message count and indexes count"
+            "Mismatch between message count and indexes count ({}) != {}",
+            self.count,
+            indexes_count
         );
 
         self.count
@@ -97,13 +99,17 @@ impl IggyMessagesBatch {
         &self.messages
     }
 
+    pub fn indexes_slice(&self) -> &[u8] {
+        &self.indexes
+    }
+
     /// Decompose the batch into its components
     pub fn decompose(self) -> (IggyIndexes, Bytes, u32) {
         (self.indexes, self.messages, self.count)
     }
 
     /// Get index of first message
-    pub fn first_index(&self) -> u64 {
+    pub fn first_offset(&self) -> u64 {
         self.iter()
             .next()
             .map(|msg| msg.header().offset())
@@ -134,11 +140,19 @@ impl IggyMessagesBatch {
             .unwrap_or(0)
     }
 
+    /// Helper method to read a message position relative to the batch's base position at the given index
+    fn base_position_at(&self, position_index: u32) -> u32 {
+        if let Some(index) = self.indexes.get(position_index) {
+            index.position() - self.indexes.base_position()
+        } else {
+            0
+        }
+    }
+
     /// Helper method to read a position (u32) from the byte array at the given index
-    fn read_position_at(&self, position_index: u32) -> u32 {
-        let idx = position_index * INDEX_SIZE as u32;
-        println!("idx={idx}");
-        if let Some(index) = self.indexes.get(idx) {
+    fn position_at(&self, position_index: u32) -> u32 {
+        if let Some(index) = self.indexes.get(position_index) {
             index.position()
         } else {
             0
@@ -152,46 +166,55 @@ impl IggyMessagesBatch {
             return None;
         }
 
-        let indexes_count = self.indexes.len() / INDEX_SIZE;
-        let mut pos = 0;
-
-        for i in 0..indexes_count {
-            let msg_start = if i == 0 {
-                0
-            } else {
-                self.read_position_at(i as u32 - 1) as usize
-            };
+        // TODO(hubcio): this can be optimized via binary search
+        let first_message_index = self
+            .iter()
+            .position(|msg| msg.header().offset() >= start_offset);
 
-            let msg_offset = IggyMessageHeaderView::new(&self.messages[msg_start..]).offset();
+        first_message_index?;
 
-            if msg_offset >= start_offset {
-                pos = i;
-                break;
-            }
+        let first_message_index = first_message_index.unwrap();
+        let last_message_index =
+            std::cmp::min(first_message_index + count as usize, self.count() as usize);
 
-            if i == indexes_count - 1 {
-                return None;
-            }
-        }
+        let sub_indexes = self.indexes.slice_by_offset(
+            first_message_index as u32,
+            (last_message_index - first_message_index) as u32,
+        )?;
 
-        let end_idx = std::cmp::min(pos + count as usize, indexes_count);
-
-        let first_byte = if pos == 0 {
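+        // Index positions are absolute within the segment, so subtract the
+        // batch's base position to turn them into offsets in this buffer.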
+        let first_message_position = if first_message_index == 0 {
             0
         } else {
-            self.read_position_at(pos as u32 - 1) as usize
+            self.position_at(first_message_index as u32 - 1) as usize
+                - self.indexes.base_position() as usize
         };
 
-        let last_byte = self.read_position_at(end_idx as u32 - 1) as usize;
-        let sub_buffer = self.messages.slice(first_byte..last_byte);
+        let last_message_position = if last_message_index >= self.count() as usize {
+            self.messages.len()
+        } else {
+            self.position_at(last_message_index as u32 - 1) as usize
+                - self.indexes.base_position() as usize
+        };
+
+        debug_assert!(
+            first_message_position <= self.messages.len(),
+            "First message position {} exceeds buffer length {}",
+            first_message_position,
+            self.messages.len()
+        );
+        debug_assert!(
+            last_message_position <= self.messages.len(),
+            "Last message position {} exceeds buffer length {}",
+            last_message_position,
+            self.messages.len()
+        );
 
-        let sub_indexes = self
-            .indexes
-            .slice_by_offset(pos as u32, end_idx as u32)
-            .unwrap();
+        let sub_buffer = self
+            .messages
+            .slice(first_message_position..last_message_position);
 
         Some(IggyMessagesBatch {
-            count: (end_idx - pos) as u32,
+            count: (last_message_index - first_message_index) as u32,
             indexes: sub_indexes,
             messages: sub_buffer,
         })
@@ -206,45 +229,55 @@ impl IggyMessagesBatch {
             return None;
         }
 
-        let indexes_count = self.indexes.len() / 4;
-        let mut pos = None;
+        // TODO(hubcio): this can be optimized via binary search
+        let first_message_index = self
+            .iter()
+            .position(|msg| msg.header().timestamp() >= timestamp);
 
-        for i in 0..indexes_count {
-            let msg_start = if i == 0 {
-                0
-            } else {
-                self.read_position_at(i as u32 - 1) as usize
-            };
-
-            let msg_ts = IggyMessageHeaderView::new(&self.messages[msg_start..]).timestamp();
-
-            if msg_ts >= timestamp {
-                pos = Some(i);
-                break;
-            }
-        }
+        first_message_index?;
 
-        let pos = pos?;
+        let first_message_index = first_message_index.unwrap();
+        let last_message_index =
+            std::cmp::min(first_message_index + count as usize, self.count() as usize);
 
-        let end_idx = std::cmp::min(pos + count as usize, indexes_count);
+        let sub_indexes = self.indexes.slice_by_offset(
+            first_message_index as u32,
+            (last_message_index - first_message_index) as u32,
+        )?;
 
-        let first_byte = if pos == 0 {
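+        // Same as in slice_by_offset: convert absolute index positions into
+        // offsets within this buffer by subtracting the base position.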
+        let first_message_position = if first_message_index == 0 {
             0
         } else {
-            self.read_position_at(pos as u32 - 1) as usize
+            self.position_at(first_message_index as u32 - 1) as usize
+                - self.indexes.base_position() as usize
         };
 
-        let last_byte = self.read_position_at(end_idx as u32 - 1) as usize;
+        let last_message_position = if last_message_index >= self.count() as usize {
+            self.messages.len()
+        } else {
+            self.position_at(last_message_index as u32 - 1) as usize
+                - self.indexes.base_position() as usize
+        };
 
-        let sub_buffer = self.messages.slice(first_byte..last_byte);
+        debug_assert!(
+            first_message_position <= self.messages.len(),
+            "First message position {} exceeds buffer length {}",
+            first_message_position,
+            self.messages.len()
+        );
+        debug_assert!(
+            last_message_position <= self.messages.len(),
+            "Last message position {} exceeds buffer length {}",
+            last_message_position,
+            self.messages.len()
+        );
 
-        let sub_indexes = self
-            .indexes
-            .slice_by_offset(pos as u32, end_idx as u32)
-            .unwrap();
+        let sub_buffer = self
+            .messages
+            .slice(first_message_position..last_message_position);
 
         Some(IggyMessagesBatch {
-            count: (end_idx - pos) as u32,
+            count: (last_message_index - first_message_index) as u32,
             indexes: sub_indexes,
             messages: sub_buffer,
         })
@@ -260,13 +293,13 @@ impl IggyMessagesBatch {
         let start_position = if index == 0 {
             0
         } else {
-            self.read_position_at(index as u32 - 1) as usize
+            self.position_at(index as u32 - 1) as usize
         };
 
         let end_position = if index == self.count as usize - 1 {
             self.messages.len()
         } else {
-            self.read_position_at(index as u32) as usize
+            self.position_at(index as u32) as usize
         };
 
         Some(IggyMessageView::new(
@@ -294,13 +327,13 @@ impl Index<usize> for IggyMessagesBatch {
         let start_position = if index == 0 {
             0
         } else {
-            self.read_position_at(index as u32 - 1) as usize
+            self.position_at(index as u32 - 1) as usize
         };
 
         let end_position = if index == self.count as usize - 1 {
             self.messages.len()
         } else {
-            self.read_position_at(index as u32) as usize
+            self.position_at(index as u32) as usize
         };
 
         &self.messages[start_position..end_position]
diff --git a/server/src/streaming/partitions/messages.rs b/server/src/streaming/partitions/messages.rs
index 48b9370c..4dc3ef4c 100644
--- a/server/src/streaming/partitions/messages.rs
+++ b/server/src/streaming/partitions/messages.rs
@@ -27,14 +27,12 @@ impl Partition {
 
         let query_ts = timestamp.as_micros();
 
-        // Filter segments that may contain messages with timestamp >= query_ts
         let filtered_segments: Vec<&Segment> = self
             .segments
             .iter()
             .filter(|segment| segment.end_timestamp() >= query_ts)
             .collect();
 
-        // Use a dedicated method to retrieve messages across segments by timestamp
         Self::get_messages_from_segments_by_timestamp(filtered_segments, query_ts, count).await
     }
 
@@ -50,23 +48,22 @@ impl Partition {
             self.current_offset
         );
 
-        if self.segments.is_empty() || start_offset > self.current_offset {
+        if self.segments.is_empty() || start_offset > self.current_offset || count == 0 {
             return Ok(IggyMessagesBatchSet::empty());
         }
 
-        let end_offset = self.get_end_offset(start_offset, count);
-
-        // TODO: Most likely don't need to find the specific range of segments, just find the first segment containing the first offset
-        // and during reads roll to the next one, when the first is exhausted.
-        let segments = self.filter_segments_by_offsets(start_offset, 
end_offset);
-        let out = match segments.len() {
-            0 => panic!("TODO"),
-            1 => Ok(segments[0]
-                .get_messages_by_offset(start_offset, count)
-                .await?),
-            _ => Ok(Self::get_messages_from_segments(segments, start_offset, 
count).await?),
-        }?;
-        Ok(out)
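+        // Find the last segment that starts at or before the requested offset;
+        // the read then rolls forward through later segments as needed.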
+        let start_segment_idx = match self
+            .segments
+            .iter()
+            .rposition(|segment| segment.start_offset() <= start_offset)
+        {
+            Some(idx) => idx,
+            None => return Ok(IggyMessagesBatchSet::empty()),
+        };
+
+        let relevant_segments: Vec<&Segment> = 
self.segments[start_segment_idx..].iter().collect();
+
+        Self::get_messages_from_segments(relevant_segments, start_offset, count).await
     }
 
     // Retrieves the first messages (up to a specified count).
@@ -128,30 +125,6 @@ impl Partition {
         self.get_messages_by_offset(offset, count).await
     }
 
-    fn get_end_offset(&self, offset: u64, count: u32) -> u64 {
-        let mut end_offset = offset + (count - 1) as u64;
-        let segment = self.segments.last().unwrap();
-        let max_offset = segment.end_offset();
-        if end_offset > max_offset {
-            end_offset = max_offset;
-        }
-
-        end_offset
-    }
-
-    fn filter_segments_by_offsets(&self, start_offset: u64, end_offset: u64) -> Vec<&Segment> {
-        let slice_start = self
-            .segments
-            .iter()
-            .rposition(|segment| segment.start_offset() <= start_offset)
-            .unwrap_or(0);
-
-        self.segments[slice_start..]
-            .iter()
-            .filter(|segment| segment.start_offset() <= end_offset)
-            .collect()
-    }
-
     /// Retrieves messages from multiple segments.
     async fn get_messages_from_segments(
         segments: Vec<&Segment>,
@@ -161,21 +134,22 @@ impl Partition {
         let mut remaining_count = count;
         let mut current_offset = offset;
         let mut batches = IggyMessagesBatchSet::empty();
+
         for segment in segments {
             if remaining_count == 0 {
                 break;
             }
 
             let messages = segment
-                .get_messages_by_offset(current_offset, remaining_count)
-                .await
-                .with_error_context(|error| {
-                    format!(
-                        "{COMPONENT} (error: {error}) - failed to get messages from segment, segment: {}, \
-                         offset: {}, count: {}",
-                        segment, current_offset, remaining_count
-                    )
-                })?;
+            .get_messages_by_offset(current_offset, remaining_count)
+            .await
+            .with_error_context(|error| {
+                format!(
+                    "{COMPONENT} (error: {error}) - failed to get messages from segment, segment: {}, \
+                     offset: {}, count: {}",
+                    segment, current_offset, remaining_count
+                )
+            })?;
 
             let messages_count = messages.count();
             if messages_count == 0 {
@@ -184,37 +158,16 @@ impl Partition {
 
             remaining_count = remaining_count.saturating_sub(messages_count);
 
-            if messages_count > 0 {
+            if let Some(last_offset) = messages.last_offset() {
+                current_offset = last_offset + 1;
+            } else if messages_count > 0 {
                 current_offset += messages_count as u64;
             }
+
             batches.add_batch_set(messages);
         }
 
         Ok(batches)
-        // let mut results = Vec::new();
-        // let mut remaining_count = count;
-        // for segment in segments {
-        //     if remaining_count == 0 {
-        //         break;
-        //     }
-        //     let slices = segment
-        //         .get_messages_by_offset(offset, remaining_count)
-        //         .await
-        //         .with_error_context(|error| {
-        //             format!(
-        //                 "{COMPONENT} (error: {error}) - failed to get messages from segment, segment: {}, \
-        //                  offset: {}, count: {}",
-        //                 segment, offset, remaining_count
-        //             )
-        //         })?;
-        //     let messages_count = slices
-        //         .iter()
-        //         .map(|slice| slice.header.last_offset_delta)
-        //         .sum();
-        //     remaining_count = remaining_count.saturating_sub(messages_count);
-        //     results.extend(slices);
-        // }
-        // Ok(results)
     }
 
     /// Retrieves messages from multiple segments by timestamp.
@@ -242,7 +195,6 @@ impl Partition {
                     )
                 })?;
 
-            // Update remaining count for the next segment
             let messages_count = messages.count();
             remaining_count = remaining_count.saturating_sub(messages_count);
 
@@ -263,19 +215,18 @@ impl Partition {
             messages.size(),
             self.partition_id
         );
-        {
-            let last_segment = self.segments.last_mut().ok_or(IggyError::SegmentNotFound)?;
-            if last_segment.is_closed() {
-                let start_offset = last_segment.end_offset() + 1;
-                trace!(
+
+        let last_segment = self.segments.last_mut().ok_or(IggyError::SegmentNotFound)?;
+        if last_segment.is_closed() {
+            let start_offset = last_segment.end_offset() + 1;
+            trace!(
                    "Current segment is closed, creating new segment with start offset: {} for partition with ID: {}...",
                     start_offset, self.partition_id
                 );
-                self.add_persisted_segment(start_offset).await.with_error_context(|error| format!(
+            self.add_persisted_segment(start_offset).await.with_error_context(|error| format!(
                    "{COMPONENT} (error: {error}) - failed to add persisted segment, partition: {}, start offset: {}",
                     self, start_offset,
                 ))?
-            }
         }
 
         let current_offset = if !self.should_increment_offset {
@@ -317,8 +268,6 @@ impl Partition {
         }
         */
 
-        println!("Current offset before appending batch: {}", current_offset);
-
         let messages_count = {
            let last_segment = self.segments.last_mut().ok_or(IggyError::SegmentNotFound)?;
             last_segment
@@ -338,8 +287,6 @@ impl Partition {
             current_offset + messages_count as u64 - 1
         };
 
-        println!("Last offset after appending batch: {}", last_offset);
-
         if self.should_increment_offset {
             self.current_offset = last_offset;
         } else {
@@ -348,20 +295,19 @@ impl Partition {
         }
 
         self.unsaved_messages_count += messages_count;
+
+        let last_segment = self.segments.last_mut().ok_or(IggyError::SegmentNotFound)?;
+        if self.unsaved_messages_count >= self.config.partition.messages_required_to_save
+            || last_segment.is_full().await
         {
-            let last_segment = self.segments.last_mut().ok_or(IggyError::SegmentNotFound)?;
-            if self.unsaved_messages_count >= self.config.partition.messages_required_to_save
-                || last_segment.is_full().await
-            {
-                trace!(
+            trace!(
                    "Segment with start offset: {} for partition with ID: {} will be persisted on disk...",
                     last_segment.start_offset(),
                     self.partition_id
                 );
 
-                last_segment.persist_messages(confirmation).await.unwrap();
-                self.unsaved_messages_count = 0;
-            }
+            last_segment.persist_messages(confirmation).await.unwrap();
+            self.unsaved_messages_count = 0;
         }
 
         Ok(())
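
The rewritten read path above drops the precomputed end offset: it locates the first candidate segment with a single backward scan, then rolls forward across segments until the requested count is satisfied. A minimal sketch of that lookup, using a hypothetical `Seg` stand-in rather than the server's actual `Segment` type:

    // Sketch of the rposition-based lookup used above; `Seg` is a
    // hypothetical stand-in, not the server's Segment type.
    struct Seg {
        start_offset: u64,
    }

    // Index of the last segment whose start_offset <= the requested offset,
    // assuming segments are sorted by start_offset as they are on a partition.
    fn find_start_segment(segments: &[Seg], start_offset: u64) -> Option<usize> {
        segments.iter().rposition(|s| s.start_offset <= start_offset)
    }

    fn main() {
        let segments = [Seg { start_offset: 0 }, Seg { start_offset: 100 }];
        assert_eq!(find_start_segment(&segments, 50), Some(0));
        assert_eq!(find_start_segment(&segments, 100), Some(1));
        // The read then walks forward from that index until `count` is satisfied.
    }
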
diff --git a/server/src/streaming/segments/indexes/index_reader.rs b/server/src/streaming/segments/indexes/index_reader.rs
index 955fd4bb..98b4fd52 100644
--- a/server/src/streaming/segments/indexes/index_reader.rs
+++ b/server/src/streaming/segments/indexes/index_reader.rs
@@ -302,11 +302,7 @@ impl IndexReader {
             }
         }
 
-        if low > 0 {
-            Ok(Some(low - 1))
-        } else {
-            Ok(Some(0))
-        }
+        Ok(Some(low))
     }
 
     /// Returns the size of the index file in bytes.
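
The index_reader change above returns `Ok(Some(low))` directly instead of stepping back to `low - 1`. Assuming the loop follows the usual partition-point convention, `low` converges to the first entry satisfying the search predicate, so no adjustment is needed; a hedged sketch over a plain sorted slice (not the actual on-disk index format):

    // Lower-bound binary search: `low` converges to the first index whose
    // value satisfies the predicate, so it can be returned without adjustment.
    fn first_at_or_after(values: &[u64], target: u64) -> Option<usize> {
        let (mut low, mut high) = (0usize, values.len());
        while low < high {
            let mid = low + (high - low) / 2;
            if values[mid] < target {
                low = mid + 1;
            } else {
                high = mid;
            }
        }
        (low < values.len()).then_some(low)
    }

    fn main() {
        let timestamps = [10, 20, 30, 40];
        assert_eq!(first_at_or_after(&timestamps, 25), Some(2));
        assert_eq!(first_at_or_after(&timestamps, 41), None);
    }
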
diff --git a/server/src/streaming/segments/indexes/index_writer.rs b/server/src/streaming/segments/indexes/index_writer.rs
index d3094a9f..a4e0eb43 100644
--- a/server/src/streaming/segments/indexes/index_writer.rs
+++ b/server/src/streaming/segments/indexes/index_writer.rs
@@ -87,7 +87,7 @@ impl IndexWriter {
         self.index_size_bytes
             .fetch_add(indexes.len() as u64, Ordering::Release);
 
-        trace!("Saved {count} indexes to file: {}", self.file_path);
+        trace!("XXXX Saved {count} indexes to file: {}", self.file_path);
 
         Ok(())
     }
diff --git a/server/src/streaming/segments/indexes/indexes_mut.rs b/server/src/streaming/segments/indexes/indexes_mut.rs
index ff0f3af1..535676d7 100644
--- a/server/src/streaming/segments/indexes/indexes_mut.rs
+++ b/server/src/streaming/segments/indexes/indexes_mut.rs
@@ -8,7 +8,7 @@ use std::ops::{Deref, Index as StdIndex};
 #[derive(Default, Clone)]
 pub struct IggyIndexesMut {
     buffer: BytesMut,
-    unsaved_count: u32,
+    saved_count: u32,
 }
 
 impl IggyIndexesMut {
@@ -16,7 +16,7 @@ impl IggyIndexesMut {
     pub fn empty() -> Self {
         Self {
             buffer: BytesMut::new(),
-            unsaved_count: 0,
+            saved_count: 0,
         }
     }
 
@@ -24,7 +24,7 @@ impl IggyIndexesMut {
     pub fn from_bytes(indexes: BytesMut) -> Self {
         Self {
             buffer: indexes,
-            unsaved_count: 0,
+            saved_count: 0,
         }
     }
 
@@ -32,18 +32,13 @@ impl IggyIndexesMut {
     pub fn with_capacity(capacity: usize) -> Self {
         Self {
             buffer: BytesMut::with_capacity(capacity * INDEX_SIZE),
-            unsaved_count: 0,
+            saved_count: 0,
         }
     }
 
     /// Makes the indexes immutable
-    pub fn make_immutable(self) -> IggyIndexes {
-        IggyIndexes::new(self.buffer.freeze(), self.unsaved_count)
-    }
-
-    /// Sets the number of unsaved messages
-    pub fn set_unsaved_messages_count(&mut self, count: u32) {
-        self.unsaved_count = count;
+    pub fn make_immutable(self, base_position: u32) -> IggyIndexes {
+        IggyIndexes::new(self.buffer.freeze(), base_position)
     }
 
     /// Inserts a new index at the end of buffer
@@ -53,10 +48,9 @@ impl IggyIndexesMut {
         self.buffer.put_u64_le(timestamp);
     }
 
-    /// Appends another IggyIndexesMut instance to this one. Other indexes buffer is consumed.
-    pub fn concatenate(&mut self, other: IggyIndexesMut) {
-        self.buffer.put_slice(&other.buffer);
-        self.unsaved_count += other.unsaved_count;
+    /// Appends another slice of indexes to this one.
+    pub fn concatenate(&mut self, other: &[u8]) {
+        self.buffer.put_slice(other);
     }
 
     /// Gets the number of indexes in the container
@@ -168,27 +162,26 @@ impl IggyIndexesMut {
         result
     }
 
-    /// Gets a slice of the buffer containing the last N unsaved indexes.
-    pub fn get_unsaved_indexes(&self) -> &[u8] {
-        if self.count() == 0 || self.unsaved_count == 0 {
-            tracing::error!("No unsaved indexes");
-            return &[];
-        }
-
-        let start_idx = (self.count() - self.unsaved_count) as usize * 16;
+    /// Clears the container, removing all indexes
+    pub fn clear(&mut self) {
+        self.saved_count = 0;
+        self.buffer.clear();
+    }
 
-        &self.buffer[start_idx..]
+    /// Gets the number of unsaved indexes
+    pub fn unsaved_count(&self) -> u32 {
+        self.count().saturating_sub(self.saved_count)
     }
 
-    /// Mark all unsaved indexes as saved
-    pub fn mark_saved(&mut self) {
-        self.unsaved_count = 0;
+    /// Gets the unsaved part of the index buffer
+    pub fn unsaved_slice(&self) -> &[u8] {
+        let start_pos = self.saved_count as usize * INDEX_SIZE;
+        &self.buffer[start_pos..]
     }
 
-    /// Clears the container, removing all indexes
-    pub fn clear(&mut self) {
-        self.buffer.clear();
-        self.unsaved_count = 0;
+    /// Mark all indexes as saved to disk
+    pub fn mark_saved(&mut self) {
+        self.saved_count = self.count();
     }
 
     /// Slices the container to return a view of a specific range of indexes
@@ -281,11 +274,7 @@ impl IggyIndexesMut {
             }
         }
 
-        if low > 0 {
-            Some(low - 1)
-        } else {
-            Some(0)
-        }
+        Some(low)
     }
 }
 
@@ -317,7 +306,6 @@ impl fmt::Debug for IggyIndexesMut {
 
         writeln!(f, "IggyIndexesMut {{")?;
         writeln!(f, "    count: {},", count)?;
-        writeln!(f, "    unsaved_count: {},", self.unsaved_count)?;
         writeln!(f, "    indexes: [")?;
 
         for i in 0..count {
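
The indexes_mut rework above replaces the `unsaved_count` counter with a `saved_count` watermark: entries below it are on disk, entries above it are pending, and `unsaved_count()` becomes derived state. A minimal model of that invariant, with a plain `Vec<u8>` standing in for `BytesMut` and the 16-byte `INDEX_SIZE` used elsewhere in this diff:

    // Model of the saved-count watermark: entries before it are persisted,
    // entries after it are pending. INDEX_SIZE matches the 16-byte entries
    // used elsewhere in this diff.
    const INDEX_SIZE: usize = 16;

    struct Indexes {
        buffer: Vec<u8>,
        saved_count: u32,
    }

    impl Indexes {
        fn count(&self) -> u32 {
            (self.buffer.len() / INDEX_SIZE) as u32
        }
        fn unsaved_count(&self) -> u32 {
            self.count().saturating_sub(self.saved_count)
        }
        fn unsaved_slice(&self) -> &[u8] {
            &self.buffer[self.saved_count as usize * INDEX_SIZE..]
        }
        fn mark_saved(&mut self) {
            self.saved_count = self.count();
        }
    }

    fn main() {
        let mut indexes = Indexes { buffer: vec![0; 3 * INDEX_SIZE], saved_count: 1 };
        assert_eq!(indexes.unsaved_count(), 2);
        assert_eq!(indexes.unsaved_slice().len(), 2 * INDEX_SIZE);
        indexes.mark_saved();
        assert_eq!(indexes.unsaved_count(), 0);
    }
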
diff --git a/server/src/streaming/segments/messages/messages_writer.rs b/server/src/streaming/segments/messages/messages_writer.rs
index c730fdc4..9d0cdd44 100644
--- a/server/src/streaming/segments/messages/messages_writer.rs
+++ b/server/src/streaming/segments/messages/messages_writer.rs
@@ -84,10 +84,17 @@ impl MessagesWriter {
         confirmation: Confirmation,
     ) -> Result<IggyByteSize, IggyError> {
         let messages_size = batch_set.size();
+        let messages_count = batch_set.count();
+        let containers_count = batch_set.containers_count();
         trace!(
-            "Saving batch of size {messages_size} bytes to messages file: {}",
+            "Saving batch set of size {messages_size} bytes ({containers_count} containers, {messages_count} messages) to messages file: {}",
             self.file_path
         );
+
+        for container in batch_set.iter() {
+            tracing::error!("Container size: {}", container.size());
+        }
+
         match confirmation {
             Confirmation::Wait => {
                 if let Some(ref mut file) = self.file {
@@ -108,7 +115,7 @@ impl MessagesWriter {
                 self.messages_size_bytes
                     .fetch_add(messages_size as u64, Ordering::AcqRel);
                trace!(
-                    "Written batch of size {messages_size} bytes to messages file: {}",
+                    "Written batch of size {messages_size} bytes ({containers_count} containers, {messages_count} messages) to messages file: {}",
                     self.file_path
                 );
                 if self.fsync {
diff --git a/server/src/streaming/segments/messages/mod.rs b/server/src/streaming/segments/messages/mod.rs
index 9480de2a..0642fdbb 100644
--- a/server/src/streaming/segments/messages/mod.rs
+++ b/server/src/streaming/segments/messages/mod.rs
@@ -21,16 +21,21 @@ async fn write_batch(
    let mut slices: Vec<IoSlice> = batches.iter().map(|b| IoSlice::new(b)).collect();
 
     let slices = &mut slices.as_mut_slice();
-    let mut written = 0;
+    let mut total_written = 0;
+
     while !slices.is_empty() {
-        written += file
+        let bytes_written = file
             .write_vectored(slices)
             .await
             .with_error_context(|error| {
                format!("Failed to write messages to file: {file_path}, error: {error}",)
             })
             .map_err(|_| IggyError::CannotWriteToFile)?;
-        IoSlice::advance_slices(slices, written);
+
+        total_written += bytes_written;
+
+        IoSlice::advance_slices(slices, bytes_written);
     }
-    Ok(written)
+
+    Ok(total_written)
 }
diff --git a/server/src/streaming/segments/messages_accumulator.rs b/server/src/streaming/segments/messages_accumulator.rs
index b0d7ae35..1d11a1a7 100644
--- a/server/src/streaming/segments/messages_accumulator.rs
+++ b/server/src/streaming/segments/messages_accumulator.rs
@@ -2,44 +2,54 @@ use super::{
     indexes::IggyIndexesMut,
     types::{IggyMessagesBatchMut, IggyMessagesBatchSet},
 };
-use iggy::utils::timestamp::IggyTimestamp;
+use tracing::trace;
 
-/// A container that accumulates messages before they are written to disk
+/// A container that accumulates messages in memory before they are written to disk.
+///
+/// The accumulator serves as a staging area for messages, allowing them to be
+/// collected and prepared for persistence. It maintains metadata like offsets,
+/// timestamps, and positions to ensure correct ordering and indexing.
 #[derive(Debug, Default)]
 pub struct MessagesAccumulator {
-    /// Base offset of the first message
+    /// Base offset of the first message in the accumulator
     base_offset: u64,
 
-    /// Base timestamp of the first message
-    base_timestamp: u64,
-
-    /// Current offset
+    /// Current (latest) offset in the accumulator
     current_offset: u64,
 
-    /// Current position
+    /// Current byte position for the next message in the segment
     current_position: u32,
 
-    /// Current timestamp
-    current_timestamp: u64,
-
-    /// A buffer containing all accumulated messages
+    /// Collection of all message batches in the accumulator
     batches: IggyMessagesBatchSet,
 
-    /// Number of messages in the accumulator
+    /// Total number of messages in the accumulator
     messages_count: u32,
 }
 
 impl MessagesAccumulator {
-    /// Coalesces a batch of messages into the accumulator
-    /// This method also prepares the messages for persistence by setting
-    /// their offset, timestamp, record size, and checksum fields
-    /// Returns the number of messages in the batch
+    /// Adds a batch of messages to the accumulator and prepares them for persistence.
+    ///
+    /// This method assigns offsets, timestamps, and positions to the messages
+    /// and updates the indexes accordingly. It ensures message continuity by
+    /// managing offsets to prevent gaps or overlaps.
+    ///
+    /// # Arguments
+    ///
+    /// * `start_offset` - The segment's starting offset
+    /// * `current_offset` - The suggested starting offset for this batch
+    /// * `current_position` - The current byte position in the segment
+    /// * `indexes` - The segment's index data to update
+    /// * `batch` - The batch of messages to add
+    ///
+    /// # Returns
+    ///
+    /// The number of messages successfully added to the accumulator
     pub fn coalesce_batch(
         &mut self,
-        start_offset: u64,
-        current_offset: u64,
-        current_position: u32,
-        indexes: &mut IggyIndexesMut,
+        segment_start_offset: u64,
+        segment_current_offset: u64,
+        segment_current_position: u32,
         batch: IggyMessagesBatchMut,
     ) -> u32 {
         let batch_messages_count = batch.count();
@@ -48,68 +58,69 @@ impl MessagesAccumulator {
             return 0;
         }
 
-        println!("Coalescing batch: base_offset={}, current_offset={}, messages_count={}, batch_count={}",
-                 self.base_offset, current_offset, self.messages_count, batch_messages_count);
+        trace!(
+            "Coalescing batch base_offset={}, segment_current_offset={}, messages_count={}, batch_count={}",
+            self.base_offset,
+            segment_current_offset,
+            self.messages_count,
+            batch_messages_count
+        );
 
-        if self.batches.is_empty() {
-            self.base_offset = current_offset;
-            self.base_timestamp = IggyTimestamp::now().as_micros();
-            self.current_offset = current_offset;
-            self.current_timestamp = self.base_timestamp;
-            self.current_position = current_position;
-        } else {
-            // Important: If we already have messages, the current_offset should be the correct
-            // starting point for this batch.
-            // If current_offset is greater than our current highest offset + 1, use it as is.
-            // Otherwise, use our highest offset + 1 to ensure no overlap.
-            let next_expected_offset = self.current_offset + 1;
-            if current_offset > next_expected_offset {
-                self.current_offset = current_offset;
-            } else {
-                self.current_offset = next_expected_offset;
-            }
-        }
+        self.initialize_or_update_offsets(segment_current_offset, segment_current_position);
 
-        let batch = batch.prepare_for_persistence(
-            start_offset,
+        let prepared_batch = batch.prepare_for_persistence(
+            segment_start_offset,
             self.current_offset,
             self.current_position,
-            indexes,
         );
-        let batch_size = batch.size();
 
-        self.batches.add_batch(batch);
+        let batch_size = prepared_batch.size();
+
+        self.batches.add_batch(prepared_batch);
 
         self.messages_count += batch_messages_count;
-        // Update current_offset to be the last offset in this batch
        self.current_offset = self.base_offset + self.messages_count as u64 - 1;
-
-        println!(
-            "After coalescing: base_offset={}, current_offset={}, messages_count={}",
-            self.base_offset, self.current_offset, self.messages_count
-        );
-
-        self.current_timestamp = self
-            .batches
-            .iter()
-            .last()
-            .unwrap()
-            .iter()
-            .last()
-            .unwrap()
-            .header()
-            .timestamp();
         self.current_position += batch_size;
 
         batch_messages_count
     }
 
-    /// Gets messages from the accumulator based on start offset and count
+    /// Initialize accumulator state for the first batch or update offsets for subsequent batches
+    fn initialize_or_update_offsets(&mut self, current_offset: u64, current_position: u32) {
+        if self.batches.is_empty() {
+            self.base_offset = current_offset;
+            self.current_offset = current_offset;
+            self.current_position = current_position;
+        } else {
+            let next_expected_offset = self.current_offset + 1;
+            self.current_offset = current_offset.max(next_expected_offset);
+        }
+    }
+
+    /// Retrieves messages from the accumulator based on start offset and count.
+    ///
+    /// # Arguments
+    ///
+    /// * `start_offset` - The starting offset to retrieve messages from
+    /// * `count` - Maximum number of messages to retrieve
+    ///
+    /// # Returns
+    ///
+    /// A batch set containing the requested messages
    pub fn get_messages_by_offset(&self, start_offset: u64, count: u32) -> IggyMessagesBatchSet {
         self.batches.get_by_offset(start_offset, count)
     }
 
-    /// Gets messages from the accumulator based on start timestamp and count
+    /// Retrieves messages from the accumulator based on start timestamp and count.
+    ///
+    /// # Arguments
+    ///
+    /// * `start_timestamp` - The earliest timestamp to retrieve messages from
+    /// * `count` - Maximum number of messages to retrieve
+    ///
+    /// # Returns
+    ///
+    /// A batch set containing the requested messages
     pub fn get_messages_by_timestamp(
         &self,
         start_timestamp: u64,
@@ -118,38 +129,57 @@ impl MessagesAccumulator {
         self.batches.get_by_timestamp(start_timestamp, count)
     }
 
-    /// Checks if the accumulator is empty
+    /// Checks if the accumulator is empty (has no messages).
     pub fn is_empty(&self) -> bool {
         self.batches.is_empty() || self.messages_count == 0
     }
 
-    /// Returns the number of unsaved messages
+    /// Returns the number of messages in the accumulator that have not been persisted.
     pub fn unsaved_messages_count(&self) -> usize {
         self.messages_count as usize
     }
 
-    /// Returns the maximum offset in the accumulator
+    /// Returns the highest offset in the accumulator.
     pub fn last_offset(&self) -> u64 {
         self.current_offset
     }
 
-    /// Returns the maximum timestamp in the accumulator
+    /// Returns the timestamp of the last message in the accumulator.
     pub fn last_timestamp(&self) -> u64 {
-        self.current_timestamp
+        self.batches.last_timestamp().unwrap_or(0)
     }
 
-    /// Returns the base offset of the accumulator
-    pub fn base_offset(&self) -> u64 {
+    /// Returns the starting offset of the first message in the accumulator.
+    pub fn first_offset(&self) -> u64 {
         self.base_offset
     }
 
-    /// Returns the base timestamp of the accumulator
-    pub fn base_timestamp(&self) -> u64 {
-        self.base_timestamp
+    /// Returns the timestamp of the first message in the accumulator.
+    pub fn first_timestamp(&self) -> u64 {
+        self.batches.first_timestamp().unwrap_or(0)
+    }
+
+    /// Updates the segment's indexes with index information from all batches in this accumulator.
+    ///
+    /// This method transfers all the index information from the accumulator's batches to the
+    /// provided segment indexes object, ensuring that message positions, offsets, and timestamps
+    /// are properly recorded for later retrieval.
+    ///
+    /// # Arguments
+    ///
+    /// * `segment_indexes` - The segment's mutable indexes to update
+    pub fn update_indexes(&self, segment_indexes: &mut IggyIndexesMut) {
+        for batch in self.batches.iter() {
+            segment_indexes.concatenate(batch.indexes_slice());
+        }
+
+        tracing::error!("hubcio after update indexes: {}", segment_indexes.count());
     }
 
-    /// Takes ownership of the accumulator and returns the messages and indexes
-    pub fn materialize(self) -> IggyMessagesBatchSet {
+    /// Consumes the accumulator and returns the contained message batches.
+    ///
+    /// This is typically called when it's time to persist the accumulated messages to disk.
+    pub fn into_batch_set(self) -> IggyMessagesBatchSet {
         self.batches
     }
 }
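
The extracted initialize_or_update_offsets above reduces the old branching to a single `max`: a batch may start at the caller-suggested offset only when that does not overlap messages already held. The continuity rule in isolation:

    // The continuity rule from initialize_or_update_offsets: honor the
    // suggested offset only if it does not overlap what is already buffered.
    fn next_batch_start(current_offset: u64, suggested_offset: u64) -> u64 {
        let next_expected_offset = current_offset + 1;
        suggested_offset.max(next_expected_offset)
    }

    fn main() {
        // Suggestion lags behind the buffer: bump forward to avoid overlap.
        assert_eq!(next_batch_start(9, 5), 10);
        // Suggestion is ahead: honor it, leaving a gap by design.
        assert_eq!(next_batch_start(9, 15), 15);
    }
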
diff --git a/server/src/streaming/segments/reading_messages.rs b/server/src/streaming/segments/reading_messages.rs
index 2248c5be..fdf4deaa 100644
--- a/server/src/streaming/segments/reading_messages.rs
+++ b/server/src/streaming/segments/reading_messages.rs
@@ -15,10 +15,6 @@ impl Segment {
     }
 
     pub fn get_messages_count(&self) -> u32 {
-        if self.get_messages_size() == 0 {
-            return 0;
-        }
-
         (self.end_offset - self.start_offset + 1) as u32
     }
 
@@ -43,9 +39,11 @@ impl Segment {
                 .await;
         }
 
-        let accumulator_first_timestamp = self.accumulator.base_timestamp();
+        let accumulator_first_timestamp = self.accumulator.first_timestamp();
         let accumulator_last_timestamp = self.accumulator.last_timestamp();
 
+        tracing::error!("hubcio accumulator first timestamp: {accumulator_first_timestamp}, last timestamp: {accumulator_last_timestamp}");
+
         // Case 1: Requested timestamp is higher than any available timestamp
         if timestamp > accumulator_last_timestamp {
             return Ok(IggyMessagesBatchSet::empty());
@@ -107,44 +105,89 @@ impl Segment {
             end_offset = self.end_offset;
         }
 
+        trace!(
+            "Getting messages by offset: {}, count: {}, segment start_offset: {}, segment end_offset: {}",
+            offset,
+            count,
+            self.start_offset,
+            self.end_offset
+        );
+
         // Case 0: Accumulator is empty, so all messages have to be on disk
         if self.accumulator.is_empty() {
             return self.load_messages_from_disk_by_offset(offset, count).await;
         }
 
-        let accumulator_first_msg_offset = self.accumulator.base_offset();
+        let accumulator_first_msg_offset = self.accumulator.first_offset();
         let accumulator_last_msg_offset = self.accumulator.last_offset();
 
         // Case 1: All messages are in accumulator buffer
        if offset >= accumulator_first_msg_offset && end_offset <= accumulator_last_msg_offset {
+            tracing::error!(
+                "hubcio segment has {} messages, getting all from accumulator",
+                self.get_messages_count()
+            );
             return Ok(self.accumulator.get_messages_by_offset(offset, count));
         }
 
         // Case 2: All messages are on disk
         if end_offset < accumulator_first_msg_offset {
+            tracing::error!(
+                "hubcio segment has {} messages, getting all from disk",
+                self.get_messages_count()
+            );
             return self.load_messages_from_disk_by_offset(offset, count).await;
         }
 
         // Case 3: Messages span disk and accumulator buffer boundary
+        // Calculate how many messages we need from disk
+        let disk_count = if offset < accumulator_first_msg_offset {
+            ((accumulator_first_msg_offset - offset) as u32).min(count)
+        } else {
+            0
+        };
 
-        // Load messages from disk up to the accumulator buffer boundary
-        let mut messages = self
-                .load_messages_from_disk_by_offset(offset, (accumulator_first_msg_offset - offset) as u32)
-                .await.with_error_context(|error| format!(
-            "{COMPONENT} (error: {error}) - failed to load messages from disk, stream ID: {}, topic ID: {}, partition ID: {}, start offset: {offset}, end offset :{}",
-            self.stream_id, self.topic_id, self.partition_id, accumulator_first_msg_offset - 1
-        ))?;
-
-        // Load remaining messages from accumulator buffer
-        let buffer_start = std::cmp::max(offset, accumulator_first_msg_offset);
-        let buffer_count = (end_offset - buffer_start + 1) as u32;
-        let buffer_messages = self
-            .accumulator
-            .get_messages_by_offset(buffer_start, buffer_count);
+        let mut combined_batch_set = IggyMessagesBatchSet::empty();
 
-        messages.add_batch_set(buffer_messages);
+        // Load messages from disk if needed
+        if disk_count > 0 {
+            let disk_messages = self
+            .load_messages_from_disk_by_offset(offset, disk_count)
+            .await
+            .with_error_context(|error| {
+                format!(
+                    "STREAMING_SEGMENT (error: {error}) - failed to load messages from disk, stream ID: {}, topic ID: {}, partition ID: {}, start offset: {offset}, count: {disk_count}",
+                    self.stream_id, self.topic_id, self.partition_id
+                )
+            })?;
+
+            if !disk_messages.is_empty() {
+                combined_batch_set.add_batch_set(disk_messages);
+            }
+        }
+
+        // Calculate how many more messages we need from the accumulator
+        let remaining_count = count - combined_batch_set.count();
+
+        tracing::error!(
+            "hubcio segment has {} messages, remaining count: {}",
+            self.get_messages_count(),
+            remaining_count
+        );
+
+        if remaining_count > 0 {
+            let accumulator_start_offset = std::cmp::max(offset, accumulator_first_msg_offset);
+
+            let accumulator_messages = self
+                .accumulator
+                .get_messages_by_offset(accumulator_start_offset, remaining_count);
+
+            if !accumulator_messages.is_empty() {
+                combined_batch_set.add_batch_set(accumulator_messages);
+            }
+        }
 
-        Ok(messages)
+        Ok(combined_batch_set)
     }
 
     /// Loads and returns all message IDs from the log file.
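
Case 3 above splits one logical read at the accumulator boundary: up to `count` messages come from disk below the first in-memory offset, and the remainder is topped up from the accumulator. The split arithmetic in isolation, assuming the disk read returns exactly the requested `disk_count`:

    // How many messages come from disk vs. the in-memory accumulator,
    // mirroring Case 3 above; `acc_first` is the first offset held in memory.
    fn split_read(offset: u64, count: u32, acc_first: u64) -> (u32, u32) {
        let disk_count = if offset < acc_first {
            ((acc_first - offset) as u32).min(count)
        } else {
            0
        };
        (disk_count, count - disk_count)
    }

    fn main() {
        // 10 messages from offset 95 with the accumulator starting at 100:
        // 5 are read from disk, 5 from memory.
        assert_eq!(split_read(95, 10, 100), (5, 5));
        // Entirely in memory.
        assert_eq!(split_read(100, 10, 100), (0, 10));
    }
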
diff --git a/server/src/streaming/segments/types/message_view_mut.rs b/server/src/streaming/segments/types/message_view_mut.rs
index 4a82ebd2..ca283dec 100644
--- a/server/src/streaming/segments/types/message_view_mut.rs
+++ b/server/src/streaming/segments/types/message_view_mut.rs
@@ -16,11 +16,6 @@ pub struct IggyMessageViewMut<'a> {
 impl<'a> IggyMessageViewMut<'a> {
     /// Create a new mutable message view from a buffer
     pub fn new(buffer: &'a mut [u8]) -> Self {
-        // let (payload_len, headers_len) = {
-        //     let hdr_slice = &buffer[0..IGGY_MESSAGE_HEADER_SIZE as usize];
-        //     let hdr_view = IggyMessageHeaderView::new(hdr_slice);
-        //     (hdr_view.payload_length(), hdr_view.headers_length())
-        // };
         Self {
             buffer,
             payload_offset: IGGY_MESSAGE_HEADER_SIZE as usize,
@@ -89,7 +84,6 @@ impl LendingIterator for IggyMessageViewMutIterator<'_> {
             return None;
         }
 
-        // Make sure we have enough bytes for at least a header
        if self.position + IGGY_MESSAGE_HEADER_SIZE as usize > self.buffer.len() {
            tracing::error!(
                "Buffer too small for message header at position {}, buffer len: {}",
@@ -103,7 +97,6 @@ impl LendingIterator for IggyMessageViewMutIterator<'_> {
         let buffer_slice = &mut self.buffer[self.position..];
         let view = IggyMessageViewMut::new(buffer_slice);
 
-        // Safety check: Make sure we're advancing
         let message_size = view.size();
         if message_size == 0 {
             tracing::error!(
diff --git a/server/src/streaming/segments/types/messages_batch_mut.rs b/server/src/streaming/segments/types/messages_batch_mut.rs
index de318d33..f8e5acea 100644
--- a/server/src/streaming/segments/types/messages_batch_mut.rs
+++ b/server/src/streaming/segments/types/messages_batch_mut.rs
@@ -3,16 +3,21 @@ use crate::streaming::segments::indexes::IggyIndexesMut;
 use bytes::{BufMut, BytesMut};
 use iggy::models::messaging::{IggyMessagesBatch, INDEX_SIZE};
 use iggy::prelude::*;
+use iggy::utils::timestamp::IggyTimestamp;
 use lending_iterator::prelude::*;
 use std::ops::Deref;
+use tracing::trace;
 
-/// A container for mutable messages
+/// A container for mutable messages that are being prepared for persistence.
+///
+/// `IggyMessagesBatchMut` holds both the raw message data in a `BytesMut` buffer
+/// and the corresponding index data that allows for efficient message lookup.
 #[derive(Debug, Default)]
 pub struct IggyMessagesBatchMut {
-    /// The byte-indexes of messages in the buffer, represented as array of u32's
-    /// Each index points to the END position of a message (start of next message)
+    /// The index data for all messages in the buffer
     indexes: IggyIndexesMut,
-    /// The buffer containing the messages
+
+    /// The buffer containing the serialized message data
     messages: BytesMut,
 }
 
@@ -23,20 +28,35 @@ impl Sizeable for IggyMessagesBatchMut {
 }
 
 impl IggyMessagesBatchMut {
-    /// Create an empty messages container with bytes capacity to prevent realloc()
+    /// Creates an empty messages container with the specified capacity to avoid reallocations.
+    ///
+    /// # Arguments
+    ///
+    /// * `bytes_capacity` - The expected total size of all messages in bytes
     pub fn with_capacity(bytes_capacity: usize) -> Self {
+        let index_capacity = bytes_capacity / INDEX_SIZE + 1; // Add 1 to avoid rounding down to 0
         Self {
-            indexes: IggyIndexesMut::with_capacity(bytes_capacity / INDEX_SIZE),
+            indexes: IggyIndexesMut::with_capacity(index_capacity),
             messages: BytesMut::with_capacity(bytes_capacity),
         }
     }
 
-    /// Create a new messages container from a existing buffer of bytes
+    /// Creates a new messages container from existing index and message buffers.
+    ///
+    /// # Arguments
+    ///
+    /// * `indexes` - Preprocessed index data
+    /// * `messages` - Serialized message data
    pub fn from_indexes_and_messages(indexes: IggyIndexesMut, messages: BytesMut) -> Self {
         Self { indexes, messages }
     }
 
-    /// Create a new messages container from a slice of messages
+    /// Creates a new messages container from a slice of IggyMessage objects.
+    ///
+    /// # Arguments
+    ///
+    /// * `messages` - Slice of message objects to store
+    /// * `messages_size` - Total size of all messages in bytes
    pub fn from_messages(messages: &[IggyMessage], messages_size: u32) -> Self {
        let mut messages_buffer = BytesMut::with_capacity(messages_size as usize);
         let mut indexes_buffer = IggyIndexesMut::with_capacity(messages.len());
@@ -52,92 +72,109 @@ impl IggyMessagesBatchMut {
         Self::from_indexes_and_messages(indexes_buffer, messages_buffer)
     }
 
-    /// Create a lending iterator over mutable messages
+    /// Creates a lending iterator that yields mutable views of messages.
     pub fn iter_mut(&mut self) -> IggyMessageViewMutIterator {
         IggyMessageViewMutIterator::new(&mut self.messages)
     }
 
-    /// Create a lending iterator over immutable messages
+    /// Creates an iterator that yields immutable views of messages.
     pub fn iter(&self) -> IggyMessageViewIterator {
         IggyMessageViewIterator::new(&self.messages)
     }
 
-    /// Get the number of messages
+    /// Returns the number of messages in the batch.
     pub fn count(&self) -> u32 {
-        debug_assert_eq!(self.indexes.len() % INDEX_SIZE, 0);
-        self.indexes.len() as u32 / INDEX_SIZE as u32
+        let index_count = self.indexes.len() as u32 / INDEX_SIZE as u32;
+        debug_assert_eq!(
+            self.indexes.len() % INDEX_SIZE,
+            0,
+            "Index buffer length must be a multiple of INDEX_SIZE"
+        );
+        index_count
     }
 
-    /// Get the total size of all messages in bytes
+    /// Returns the total size of all messages in bytes.
     pub fn size(&self) -> u32 {
         self.messages.len() as u32
     }
 
-    /// Makes the messages batch immutable
-    pub fn make_immutable(self) -> IggyMessagesBatch {
-        let messages_count = self.count();
-        let indexes = self.indexes.make_immutable();
-        let messages = self.messages.freeze();
-        IggyMessagesBatch::new(indexes, messages, messages_count)
-    }
-
-    /// Prepares all messages in the batch for persistence by setting their offsets, timestamps,
-    /// and other necessary fields. Returns the prepared, immutable messages.
+    /// Prepares all messages in the batch for persistence by setting their offsets,
+    /// timestamps, and other necessary fields.
+    ///
+    /// # Arguments
+    ///
+    /// * `start_offset` - The starting offset of the segment
+    /// * `base_offset` - The base offset for this batch of messages
+    /// * `current_position` - The current position in the segment
+    /// * `segment_indexes` - The segment's index data, which will be updated
+    ///
+    /// # Returns
+    ///
+    /// An immutable `IggyMessagesBatch` ready for persistence
     pub fn prepare_for_persistence(
         self,
         start_offset: u64,
         base_offset: u64,
         current_position: u32,
-        segment_indexes: &mut IggyIndexesMut,
     ) -> IggyMessagesBatch {
         let messages_count = self.count();
+        if messages_count == 0 {
+            return IggyMessagesBatch::empty();
+        }
+
         let timestamp = IggyTimestamp::now().as_micros();
 
-        let mut curr_abs_offset = base_offset;
-        let mut current_position = current_position;
         let (mut indexes, mut messages) = self.decompose();
+
+        let mut curr_abs_offset = base_offset;
+        let mut curr_position = current_position;
+
         let mut iter = IggyMessageViewMutIterator::new(&mut messages);
         let mut curr_rel_offset: u32 = 0;
 
-        // TODO: fix crash when idx cache is disabled
         while let Some(mut message) = iter.next() {
             message.header_mut().set_offset(curr_abs_offset);
             message.header_mut().set_timestamp(timestamp);
             message.update_checksum();
 
-            current_position += message.size() as u32;
+            let message_size = message.size() as u32;
+            curr_position += message_size;
 
-            indexes.set_offset_at(curr_rel_offset, curr_abs_offset as u32);
-            indexes.set_position_at(curr_rel_offset, current_position);
+            let relative_offset = (curr_abs_offset - start_offset) as u32;
+            indexes.set_offset_at(curr_rel_offset, relative_offset);
+            indexes.set_position_at(curr_rel_offset, curr_position);
             indexes.set_timestamp_at(curr_rel_offset, timestamp);
 
-            println!(
-                "Message {} offset: {}, position: {}, timestamp: {}",
-                curr_rel_offset, curr_abs_offset, current_position, timestamp
-            );
-
             curr_abs_offset += 1;
             curr_rel_offset += 1;
         }
-        indexes.set_unsaved_messages_count(messages_count);
 
-        segment_indexes.concatenate(indexes);
+        IggyMessagesBatch::new(
+            indexes.make_immutable(current_position),
+            messages.freeze(),
+            messages_count,
+        )
+    }
 
-        let relative_offset = base_offset - start_offset;
-        let indexes = segment_indexes
-            .slice_by_offset(
-                relative_offset as u32,
-                relative_offset as u32 + messages_count,
-            )
-            .unwrap();
+    /// Returns the first timestamp in the batch
+    pub fn first_timestamp(&self) -> u64 {
+        IggyMessageView::new(&self.messages).header().timestamp()
+    }
 
-        IggyMessagesBatch::new(indexes, messages.freeze(), messages_count)
+    /// Returns the last timestamp in the batch
+    pub fn last_timestamp(&self) -> u64 {
+        let last_message_offset = self.indexes.get(self.count() - 1).unwrap().offset();
+        IggyMessageView::new(&self.messages[last_message_offset as usize..])
+            .header()
+            .timestamp()
     }
 
+    /// Checks if the batch is empty.
     pub fn is_empty(&self) -> bool {
         self.count() == 0
     }
 
+    /// Decomposes the batch into its constituent parts.
     pub fn decompose(self) -> (IggyIndexesMut, BytesMut) {
         (self.indexes, self.messages)
     }
@@ -176,6 +213,11 @@ impl<'a> Iterator for IggyMessageViewIterator<'a> {
 
         let remaining = &self.buffer[self.position..];
         if remaining.len() < IGGY_MESSAGE_HEADER_SIZE as usize {
+            trace!(
+                "Buffer too small for message header at position {}, buffer len: {}",
+                self.position,
+                self.buffer.len()
+            );
             return None;
         }
 
@@ -185,6 +227,12 @@ impl<'a> Iterator for IggyMessageViewIterator<'a> {
             + IGGY_MESSAGE_HEADER_SIZE as usize;
 
         if message_size > remaining.len() {
+            trace!(
+                "Message size {} exceeds remaining buffer size {} at position {}",
+                message_size,
+                remaining.len(),
+                self.position
+            );
             return None;
         }
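
prepare_for_persistence above writes one 16-byte index entry per message: a segment-relative offset, a cumulative byte position, and a timestamp. A hedged sketch of that encoding with the `bytes` crate; the field order is assumed from the set_offset_at/set_position_at/set_timestamp_at calls and is not confirmed by this diff:

    use bytes::{BufMut, BytesMut};

    // One index entry per message: relative offset (u32), byte position (u32),
    // timestamp (u64) = 16 bytes. The field order here is an assumption.
    fn put_index_entry(buf: &mut BytesMut, relative_offset: u32, position: u32, timestamp: u64) {
        buf.put_u32_le(relative_offset);
        buf.put_u32_le(position);
        buf.put_u64_le(timestamp);
    }

    fn main() {
        let mut buf = BytesMut::new();
        // Message at absolute offset 105 in a segment starting at offset 100,
        // ending at byte position 4096:
        put_index_entry(&mut buf, 105 - 100, 4096, 1_742_757_391_000_000);
        assert_eq!(buf.len(), 16);
    }
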
 
diff --git a/server/src/streaming/segments/types/messages_batch_set.rs b/server/src/streaming/segments/types/messages_batch_set.rs
index 1593da92..34b9840c 100644
--- a/server/src/streaming/segments/types/messages_batch_set.rs
+++ b/server/src/streaming/segments/types/messages_batch_set.rs
@@ -76,6 +76,16 @@ impl IggyMessagesBatchSet {
         self.batches.is_empty() || self.count == 0
     }
 
+    /// Get first timestamp of first batch
+    pub fn first_timestamp(&self) -> Option<u64> {
+        self.batches.first().map(|batch| batch.first_timestamp())
+    }
+
+    /// Get first offset of first batch
+    pub fn first_offset(&self) -> Option<u64> {
+        self.batches.first().map(|batch| batch.first_offset())
+    }
+
     /// Get offset of last message of last batch
     pub fn last_offset(&self) -> Option<u64> {
         self.batches.last().map(|batch| batch.last_offset())
@@ -109,24 +119,43 @@ impl IggyMessagesBatchSet {
         if self.is_empty() || count == 0 {
             return Self::empty();
         }
+        tracing::trace!(
+            "Getting {} messages from batch set, start offset {}, end offset calculated {}, end offset real {}, messages count {}...",
+            count,
+            start_offset,
+            start_offset + count as u64 - 1,
+            self.last_offset().unwrap_or(0),
+            self.count()
+        );
 
         let mut result = Self::with_capacity(self.containers_count());
         let mut remaining_count = count;
 
         for container in self.iter() {
+            tracing::error!(
+                "BATCH_SET container has {} messages, first offset {}, last offset {}",
+                container.count(),
+                container.first_offset(),
+                container.last_offset()
+            );
             if remaining_count == 0 {
                 break;
             }
 
-            let first_offset = container.first_index();
+            let first_offset = container.first_offset();
             if first_offset + container.count() as u64 <= start_offset {
                 continue;
             }
 
            if let Some(sliced) = container.slice_by_offset(start_offset, remaining_count) {
                 if sliced.count() > 0 {
+                    tracing::error!(
+                        "BATCH_SET will get {} messages from container",
+                        sliced.count()
+                    );
                     remaining_count -= sliced.count();
                     result.add_batch(sliced);
+                    tracing::error!("BATCH_SET remaining count {}", remaining_count);
                 }
             }
         }
@@ -151,6 +180,17 @@ impl IggyMessagesBatchSet {
                 break;
             }
 
+            let first_timestamp = container.first_timestamp();
+            if first_timestamp < timestamp {
+                tracing::error!(
+                    "BATCH_SET container has {} messages, first timestamp {}, requested timestamp {}",
+                    container.count(),
+                    first_timestamp,
+                    timestamp
+                );
+                continue;
+            }
+
            if let Some(sliced) = container.slice_by_timestamp(timestamp, remaining_count) {
                 if sliced.count() > 0 {
                     remaining_count -= sliced.count();
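
The offset-based skip in get_by_offset above treats each container as covering the half-open range starting at `first_offset` and spanning `count` messages. The skip predicate in isolation:

    // A container holding `count` messages starting at `first_offset` can be
    // skipped when its range ends at or before the requested start offset.
    fn overlaps_request(first_offset: u64, count: u32, start_offset: u64) -> bool {
        first_offset + count as u64 > start_offset
    }

    fn main() {
        // Offsets 0..=9: a read starting at 10 skips this container.
        assert!(!overlaps_request(0, 10, 10));
        assert!(overlaps_request(0, 10, 9));
    }
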
diff --git a/server/src/streaming/segments/writing_messages.rs b/server/src/streaming/segments/writing_messages.rs
index 3004681c..4d41b2ee 100644
--- a/server/src/streaming/segments/writing_messages.rs
+++ b/server/src/streaming/segments/writing_messages.rs
@@ -25,12 +25,11 @@ impl Segment {
             self.start_offset,
             current_offset,
             self.last_index_position,
-            &mut self.indexes,
             messages,
         );
 
         if self.end_offset == 0 {
-            self.start_timestamp = messages_accumulator.base_timestamp();
+            self.start_timestamp = messages_accumulator.first_timestamp();
         }
         self.end_timestamp = messages_accumulator.last_timestamp();
         self.end_offset = messages_accumulator.last_offset();
@@ -62,51 +61,14 @@ impl Segment {
 
         let accumulator = std::mem::take(&mut self.accumulator);
 
-        let batches = accumulator.materialize();
+        accumulator.update_indexes(&mut self.indexes);
+
+        let batches = accumulator.into_batch_set();
         let confirmation = match confirmation {
             Some(val) => val,
             None => self.config.segment.server_confirmation,
         };
 
-        let saved_bytes = self.save_message_batches(batches, confirmation).await?;
-
-        self.save_indexes().await?;
-
-        self.check_and_handle_segment_full().await?;
-
-        let saved_messages_count = unsaved_messages_count;
-
-        trace!(
-            "Saved {} messages on disk in segment with start offset: {} for partition with ID: {}, total bytes written: {}.",
-            saved_messages_count,
-            self.start_offset,
-            self.partition_id,
-            saved_bytes
-        );
-
-        Ok(saved_messages_count)
-    }
-
-    fn update_counters(&mut self, messages_size: u64, messages_count: u64) {
-        self.size_of_parent_stream
-            .fetch_add(messages_size, Ordering::AcqRel);
-        self.size_of_parent_topic
-            .fetch_add(messages_size, Ordering::AcqRel);
-        self.size_of_parent_partition
-            .fetch_add(messages_size, Ordering::AcqRel);
-        self.messages_count_of_parent_stream
-            .fetch_add(messages_count, Ordering::SeqCst);
-        self.messages_count_of_parent_topic
-            .fetch_add(messages_count, Ordering::SeqCst);
-        self.messages_count_of_parent_partition
-            .fetch_add(messages_count, Ordering::SeqCst);
-    }
-
-    async fn save_message_batches(
-        &mut self,
-        batches: IggyMessagesBatchSet,
-        confirmation: Confirmation,
-    ) -> Result<IggyByteSize, IggyError> {
         let batch_size = batches.size();
         let batch_count = batches.count();
 
@@ -124,12 +86,7 @@ impl Segment {
 
         self.last_index_position += saved_bytes.as_bytes_u64() as u32;
 
-        Ok(saved_bytes)
-    }
-
-    async fn save_indexes(&mut self) -> Result<(), IggyError> {
-        let unsaved_indexes_slice = self.indexes.get_unsaved_indexes();
-
+        let unsaved_indexes_slice = self.indexes.unsaved_slice();
         self.index_writer
             .as_mut()
             .expect("Index writer not initialized")
@@ -145,10 +102,38 @@ impl Segment {
         self.indexes.mark_saved();
 
         if !self.config.segment.cache_indexes {
+            tracing::error!("Clearing indexes cache");
             self.indexes.clear();
         }
 
-        Ok(())
+        self.check_and_handle_segment_full().await?;
+
+        let saved_messages_count = unsaved_messages_count;
+
+        trace!(
+            "Saved {} messages on disk in segment with start offset: {} for partition with ID: {}, total bytes written: {}.",
+            saved_messages_count,
+            self.start_offset,
+            self.partition_id,
+            saved_bytes
+        );
+
+        Ok(saved_messages_count)
+    }
+
+    fn update_counters(&mut self, messages_size: u64, messages_count: u64) {
+        self.size_of_parent_stream
+            .fetch_add(messages_size, Ordering::AcqRel);
+        self.size_of_parent_topic
+            .fetch_add(messages_size, Ordering::AcqRel);
+        self.size_of_parent_partition
+            .fetch_add(messages_size, Ordering::AcqRel);
+        self.messages_count_of_parent_stream
+            .fetch_add(messages_count, Ordering::SeqCst);
+        self.messages_count_of_parent_topic
+            .fetch_add(messages_count, Ordering::SeqCst);
+        self.messages_count_of_parent_partition
+            .fetch_add(messages_count, Ordering::SeqCst);
     }
 
    async fn check_and_handle_segment_full(&mut self) -> Result<(), IggyError> {
diff --git a/server/src/streaming/systems/messages.rs b/server/src/streaming/systems/messages.rs
index 4ebc0d90..0b452dc7 100644
--- a/server/src/streaming/systems/messages.rs
+++ b/server/src/streaming/systems/messages.rs
@@ -168,7 +168,7 @@ impl System {
                     }
                 }
             }
-            let indexes = indexes.make_immutable();
+            let indexes = indexes.make_immutable(0);
             let decrypted_messages = decrypted_messages.freeze();
            let decrypted_batch = IggyMessagesBatch::new(indexes, decrypted_messages, count);
             decrypted_batches.push(decrypted_batch);
