This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch zero-copy-reads-in-flight
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 3fb911374e957fbbed5c7976383f9de96a2e04ac
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Fri Jan 16 12:48:57 2026 +0100

    perf(server): zero-copy reads from in-flight buffer during async I/O
---
 core/common/src/types/message/messages_batch.rs    |  59 +++
 core/integration/tests/streaming/get_by_offset.rs  |   3 +-
 .../tests/streaming/get_by_timestamp.rs            |   3 +-
 .../handlers/messages/poll_messages_handler.rs     |  32 +-
 core/server/src/http/http_shard_wrapper.rs         |   4 +-
 core/server/src/shard/system/messages.rs           |  21 +-
 core/server/src/shard/transmission/frame.rs        |   6 +-
 core/server/src/slab/streams.rs                    | 138 +++----
 core/server/src/streaming/partitions/helpers.rs    |  14 +-
 core/server/src/streaming/partitions/journal.rs    |   6 +-
 .../streaming/segments/messages/messages_writer.rs |   4 +-
 core/server/src/streaming/segments/messages/mod.rs |   4 +-
 core/server/src/streaming/segments/mod.rs          |   2 +
 .../streaming/segments/types/messages_batch_set.rs | 407 ++++++++++++++++-----
 core/server/src/streaming/segments/types/mod.rs    |   2 +
 15 files changed, 513 insertions(+), 192 deletions(-)

diff --git a/core/common/src/types/message/messages_batch.rs 
b/core/common/src/types/message/messages_batch.rs
index 6bc704866..703bb8f9d 100644
--- a/core/common/src/types/message/messages_batch.rs
+++ b/core/common/src/types/message/messages_batch.rs
@@ -120,6 +120,65 @@ impl IggyMessagesBatch {
         self.iter().last().map(|msg| msg.header().timestamp())
     }
 
+    /// Slice the batch by message offset. Returns messages starting at 
`start_offset` up to `count`.
+    ///
+    /// Uses `Bytes::slice()` internally for zero-copy slicing.
+    /// Returns `None` if the batch is empty, `count` is 0, or `start_offset` is past the last offset.
+    pub fn slice_by_offset(&self, start_offset: u64, count: u32) -> 
Option<Self> {
+        if self.is_empty() || count == 0 {
+            return None;
+        }
+
+        let first_offset = self.first_offset()?;
+
+        // A start offset before this batch is clamped: slicing begins at the first message
+        if start_offset < first_offset {
+            return self.slice_by_index(0, count);
+        }
+
+        let last_offset = self.last_offset()?;
+        if start_offset > last_offset {
+            return None;
+        }
+
+        // Offsets are contiguous within a batch, so the offset delta is the message index
+        let start_index = (start_offset - first_offset) as u32;
+        self.slice_by_index(start_index, count)
+    }
+
+    /// Slice the batch to return messages starting at `start_index` up to 
`count`.
+    ///
+    /// Uses `Bytes::slice()` internally for zero-copy slicing.
+    /// Returns `None` if `start_index` is out of bounds, `count` is 0, or the range cannot be sliced.
+    pub fn slice_by_index(&self, start_index: u32, count: u32) -> Option<Self> 
{
+        if start_index >= self.count || count == 0 {
+            return None;
+        }
+
+        let available = self.count - start_index;
+        let actual_count = count.min(available); // never more than the batch holds from start_index
+
+        // Slice indexes (zero-copy via Bytes::slice); `None` from the index slice is propagated
+        let sliced_indexes = self.indexes.slice_by_offset(start_index, 
actual_count)?;
+
+        // Byte range in the message buffer: start of the first kept message to end of the last
+        let msg_start = self.message_start_position(start_index as usize);
+        let msg_end = self.message_end_position((start_index + actual_count - 
1) as usize);
+
+        if msg_start > msg_end || msg_end > self.messages.len() {
+            return None; // inconsistent range: do not hand it to `slice` below
+        }
+
+        // Slice messages (zero-copy via Bytes::slice)
+        let sliced_messages = self.messages.slice(msg_start..msg_end);
+
+        Some(Self {
+            count: actual_count,
+            indexes: sliced_indexes,
+            messages: sliced_messages,
+        })
+    }
+
     /// Calculates the start position of a message at the given index in the 
buffer
     fn message_start_position(&self, index: usize) -> usize {
         if index == 0 {
diff --git a/core/integration/tests/streaming/get_by_offset.rs 
b/core/integration/tests/streaming/get_by_offset.rs
index ca5438529..d96460744 100644
--- a/core/integration/tests/streaming/get_by_offset.rs
+++ b/core/integration/tests/streaming/get_by_offset.rs
@@ -307,8 +307,9 @@ async fn test_get_messages_by_offset(
 
         // Test 6: Validate message content and ordering for all messages
         let mut i = 0;
+        let mutable_batches = batches.into_mutable();
 
-        for batch in batches.iter() {
+        for batch in mutable_batches.iter() {
             for msg in batch.iter() {
                 let expected_offset = span_offset + i as u64;
                 assert!(
diff --git a/core/integration/tests/streaming/get_by_timestamp.rs 
b/core/integration/tests/streaming/get_by_timestamp.rs
index 90afc5967..8b7e4bb10 100644
--- a/core/integration/tests/streaming/get_by_timestamp.rs
+++ b/core/integration/tests/streaming/get_by_timestamp.rs
@@ -323,7 +323,8 @@ async fn test_get_messages_by_timestamp(
         let span_timestamp_micros = span_timestamp.as_micros();
 
         // Test 6: Validate message content and ordering
-        for batch in spanning_messages.iter() {
+        let mutable_spanning = spanning_messages.into_mutable();
+        for batch in mutable_spanning.iter() {
             for msg in batch.iter() {
                 let msg_timestamp = msg.header().timestamp();
                 assert!(
diff --git a/core/server/src/binary/handlers/messages/poll_messages_handler.rs 
b/core/server/src/binary/handlers/messages/poll_messages_handler.rs
index 354b554b0..223d7b1b0 100644
--- a/core/server/src/binary/handlers/messages/poll_messages_handler.rs
+++ b/core/server/src/binary/handlers/messages/poll_messages_handler.rs
@@ -22,6 +22,7 @@ use crate::binary::command::{
 use crate::binary::handlers::utils::receive_and_validate;
 use crate::shard::IggyShard;
 use crate::shard::system::messages::PollingArgs;
+use crate::streaming::segments::PolledBatches;
 use crate::streaming::session::Session;
 use anyhow::Result;
 use iggy_common::SenderKind;
@@ -71,7 +72,7 @@ impl ServerCommandHandler for PollMessages {
 
         let user_id = session.get_user_id();
         let client_id = session.client_id;
-        let (metadata, mut batch) = shard
+        let (metadata, batch) = shard
             .poll_messages(
                 client_id,
                 user_id,
@@ -91,26 +92,39 @@ impl ServerCommandHandler for PollMessages {
         // 4 bytes for partition_id + 8 bytes for current_offset + 4 bytes for 
messages_count + size of all batches.
         let response_length = 4 + 8 + 4 + batch.size();
         let response_length_bytes = response_length.to_le_bytes();
+        let msg_count = batch.count();
 
         let mut bufs = Vec::with_capacity(batch.containers_count() + 3);
         let mut partition_id_buf = PooledBuffer::with_capacity(4);
         let mut current_offset_buf = PooledBuffer::with_capacity(8);
-        let mut count_buf = PooledBuffer::with_capacity(4);
+        let mut msg_count_buf = PooledBuffer::with_capacity(4);
         partition_id_buf.put_u32_le(metadata.partition_id);
         current_offset_buf.put_u64_le(metadata.current_offset);
-        count_buf.put_u32_le(batch.count());
+        msg_count_buf.put_u32_le(msg_count);
 
         bufs.push(partition_id_buf);
         bufs.push(current_offset_buf);
-        bufs.push(count_buf);
+        bufs.push(msg_count_buf);
+
+        match batch {
+            PolledBatches::Mutable(mut mutable_set) => {
+                for msg_batch in mutable_set.iter_mut() {
+                    bufs.push(msg_batch.take_messages());
+                }
+            }
+            PolledBatches::Frozen(frozen_set) => {
+                // For frozen batches, wrap Bytes in PooledBuffer for socket 
send.
+                // This copies at the handler level but avoids copies in the 
storage layer
+                // during concurrent reads while async disk I/O is in progress.
+                for msg_batch in frozen_set.iter() {
+                    bufs.push(PooledBuffer::from(msg_batch.buffer()));
+                }
+            }
+        }
 
-        batch.iter_mut().for_each(|m| {
-            bufs.push(m.take_messages());
-        });
         trace!(
             "Sending {} messages to client ({} bytes) to client",
-            batch.count(),
-            response_length
+            msg_count, response_length
         );
 
         sender
diff --git a/core/server/src/http/http_shard_wrapper.rs 
b/core/server/src/http/http_shard_wrapper.rs
index 7bcfdb65b..da5595c8e 100644
--- a/core/server/src/http/http_shard_wrapper.rs
+++ b/core/server/src/http/http_shard_wrapper.rs
@@ -26,7 +26,7 @@ use send_wrapper::SendWrapper;
 use crate::binary::handlers::messages::poll_messages_handler::IggyPollMetadata;
 use crate::shard::system::messages::PollingArgs;
 use crate::state::command::EntryCommand;
-use crate::streaming::segments::{IggyMessagesBatchMut, IggyMessagesBatchSet};
+use crate::streaming::segments::{IggyMessagesBatchMut, PolledBatches};
 use crate::streaming::topics;
 use crate::streaming::users::user::User;
 use crate::{shard::IggyShard, streaming::session::Session};
@@ -278,7 +278,7 @@ impl HttpSafeShard {
         consumer: Consumer,
         maybe_partition_id: Option<u32>,
         args: PollingArgs,
-    ) -> Result<(IggyPollMetadata, IggyMessagesBatchSet), IggyError> {
+    ) -> Result<(IggyPollMetadata, PolledBatches), IggyError> {
         let future = SendWrapper::new(self.shard().poll_messages(
             client_id,
             user_id,
diff --git a/core/server/src/shard/system/messages.rs 
b/core/server/src/shard/system/messages.rs
index 02e8d9d0e..a53077362 100644
--- a/core/server/src/shard/system/messages.rs
+++ b/core/server/src/shard/system/messages.rs
@@ -24,7 +24,9 @@ use crate::shard::transmission::frame::ShardResponse;
 use crate::shard::transmission::message::{
     ShardMessage, ShardRequest, ShardRequestPayload, ShardSendRequestResult,
 };
-use crate::streaming::segments::{IggyIndexesMut, IggyMessagesBatchMut, 
IggyMessagesBatchSet};
+use crate::streaming::segments::{
+    IggyIndexesMut, IggyMessagesBatchMut, IggyMessagesBatchSetMut, 
PolledBatches,
+};
 use crate::streaming::traits::MainOps;
 use crate::streaming::{partitions, streams, topics};
 use err_trail::ErrContext;
@@ -129,7 +131,7 @@ impl IggyShard {
         consumer: Consumer,
         maybe_partition_id: Option<u32>,
         args: PollingArgs,
-    ) -> Result<(IggyPollMetadata, IggyMessagesBatchSet), IggyError> {
+    ) -> Result<(IggyPollMetadata, PolledBatches), IggyError> {
         self.ensure_topic_exists(&stream_id, &topic_id)?;
 
         let numeric_stream_id = self
@@ -159,7 +161,7 @@ impl IggyShard {
             true,
         )?
         else {
-            return Ok((IggyPollMetadata::new(0, 0), 
IggyMessagesBatchSet::empty()));
+            return Ok((IggyPollMetadata::new(0, 0), PolledBatches::empty()));
         };
 
         self.ensure_partition_exists(&stream_id, &topic_id, partition_id)?;
@@ -175,7 +177,7 @@ impl IggyShard {
         {
             return Ok((
                 IggyPollMetadata::new(partition_id as u32, current_offset),
-                IggyMessagesBatchSet::empty(),
+                PolledBatches::empty(),
             ));
         }
 
@@ -234,7 +236,10 @@ impl IggyShard {
         }?;
 
         let batch = if let Some(encryptor) = &self.encryptor {
-            self.decrypt_messages(batch, encryptor).await?
+            // Decryption requires mutable access, so convert to mutable
+            let mutable = batch.into_mutable();
+            let decrypted = self.decrypt_messages(mutable, encryptor).await?;
+            PolledBatches::Mutable(decrypted)
         } else {
             batch
         };
@@ -344,9 +349,9 @@ impl IggyShard {
 
     async fn decrypt_messages(
         &self,
-        batches: IggyMessagesBatchSet,
+        batches: IggyMessagesBatchSetMut,
         encryptor: &EncryptorKind,
-    ) -> Result<IggyMessagesBatchSet, IggyError> {
+    ) -> Result<IggyMessagesBatchSetMut, IggyError> {
         let mut decrypted_batches = 
Vec::with_capacity(batches.containers_count());
         for batch in batches.iter() {
             let count = batch.count();
@@ -384,7 +389,7 @@ impl IggyShard {
             decrypted_batches.push(decrypted_batch);
         }
 
-        Ok(IggyMessagesBatchSet::from_vec(decrypted_batches))
+        Ok(IggyMessagesBatchSetMut::from_vec(decrypted_batches))
     }
 
     pub fn maybe_encrypt_messages(
diff --git a/core/server/src/shard/transmission/frame.rs 
b/core/server/src/shard/transmission/frame.rs
index 7c0fd2578..1ae2d4a51 100644
--- a/core/server/src/shard/transmission/frame.rs
+++ b/core/server/src/shard/transmission/frame.rs
@@ -21,15 +21,13 @@ use iggy_common::{IggyError, Stats};
 use crate::{
     binary::handlers::messages::poll_messages_handler::IggyPollMetadata,
     shard::transmission::message::ShardMessage,
-    streaming::{
-        segments::IggyMessagesBatchSet, streams::stream, topics::topic, 
users::user::User,
-    },
+    streaming::{segments::PolledBatches, streams::stream, topics::topic, 
users::user::User},
 };
 
 #[allow(clippy::large_enum_variant)]
 #[derive(Debug)]
 pub enum ShardResponse {
-    PollMessages((IggyPollMetadata, IggyMessagesBatchSet)),
+    PollMessages((IggyPollMetadata, PolledBatches)),
     SendMessages,
     FlushUnsavedBuffer,
     DeleteSegments,
diff --git a/core/server/src/slab/streams.rs b/core/server/src/slab/streams.rs
index f1c342de2..66d0f7ca0 100644
--- a/core/server/src/slab/streams.rs
+++ b/core/server/src/slab/streams.rs
@@ -41,7 +41,8 @@ use crate::{
         },
         polling_consumer::PollingConsumer,
         segments::{
-            IggyMessagesBatchMut, IggyMessagesBatchSet, Segment, 
storage::create_segment_storage,
+            IggyMessagesBatchMut, IggyMessagesBatchSet, 
IggyMessagesBatchSetMut, PolledBatches,
+            Segment, storage::create_segment_storage,
         },
         streams::{
             self,
@@ -202,7 +203,7 @@ impl MainOps for Streams {
     type PollingArgs = PollingArgs;
     type Consumer = PollingConsumer;
     type In = IggyMessagesBatchMut;
-    type Out = (IggyPollMetadata, IggyMessagesBatchSet);
+    type Out = (IggyPollMetadata, PolledBatches);
     type Error = IggyError;
 
     async fn append_messages(
@@ -317,13 +318,11 @@ impl MainOps for Streams {
                 // We have to remember to keep the invariant from the if that 
is on line 290.
                 // Alternatively a better design would be to get rid of that 
if and move the validations here.
                 if offset > current_offset {
-                    return Ok((metadata, IggyMessagesBatchSet::default()));
+                    return Ok((metadata, PolledBatches::default()));
                 }
 
-                let batches = self
-                    .get_messages_by_offset(stream_id, topic_id, partition_id, 
offset, count)
-                    .await?;
-                Ok(batches)
+                self.get_messages_by_offset(stream_id, topic_id, partition_id, 
offset, count)
+                    .await?
             }
             PollingKind::Timestamp => {
                 let timestamp = IggyTimestamp::from(value);
@@ -343,7 +342,7 @@ impl MainOps for Streams {
                         count,
                     )
                     .await?;
-                Ok(batches)
+                PolledBatches::Mutable(batches)
             }
             PollingKind::First => {
                 let first_offset = self.with_partition_by_id(
@@ -358,10 +357,8 @@ impl MainOps for Streams {
                     },
                 );
 
-                let batches = self
-                    .get_messages_by_offset(stream_id, topic_id, partition_id, 
first_offset, count)
-                    .await?;
-                Ok(batches)
+                self.get_messages_by_offset(stream_id, topic_id, partition_id, 
first_offset, count)
+                    .await?
             }
             PollingKind::Last => {
                 let (start_offset, actual_count) = self.with_partition_by_id(
@@ -379,16 +376,14 @@ impl MainOps for Streams {
                     },
                 );
 
-                let batches = self
-                    .get_messages_by_offset(
-                        stream_id,
-                        topic_id,
-                        partition_id,
-                        start_offset,
-                        actual_count,
-                    )
-                    .await?;
-                Ok(batches)
+                self.get_messages_by_offset(
+                    stream_id,
+                    topic_id,
+                    partition_id,
+                    start_offset,
+                    actual_count,
+                )
+                .await?
             }
             PollingKind::Next => {
                 let consumer_offset = match consumer {
@@ -414,10 +409,8 @@ impl MainOps for Streams {
 
                 match consumer_offset {
                     None => {
-                        let batches = self
-                            .get_messages_by_offset(stream_id, topic_id, 
partition_id, 0, count)
-                            .await?;
-                        Ok(batches)
+                        self.get_messages_by_offset(stream_id, topic_id, 
partition_id, 0, count)
+                            .await?
                     }
                     Some(consumer_offset) => {
                         let offset = consumer_offset + 1;
@@ -440,20 +433,18 @@ impl MainOps for Streams {
                                 );
                             }
                         }
-                        let batches = self
-                            .get_messages_by_offset(
-                                stream_id,
-                                topic_id,
-                                partition_id,
-                                offset,
-                                count,
-                            )
-                            .await?;
-                        Ok(batches)
+                        self.get_messages_by_offset(
+                            stream_id,
+                            topic_id,
+                            partition_id,
+                            offset,
+                            count,
+                        )
+                        .await?
                     }
                 }
             }
-        }?;
+        };
         Ok((metadata, batches))
     }
 }
@@ -649,9 +640,9 @@ impl Streams {
         partition_id: partitions::ContainerId,
         offset: u64,
         count: u32,
-    ) -> Result<IggyMessagesBatchSet, IggyError> {
+    ) -> Result<PolledBatches, IggyError> {
         if count == 0 {
-            return Ok(IggyMessagesBatchSet::default());
+            return Ok(PolledBatches::default());
         }
 
         use crate::streaming::partitions::helpers;
@@ -663,7 +654,7 @@ impl Streams {
         );
 
         let mut remaining_count = count;
-        let mut batches = IggyMessagesBatchSet::empty();
+        let mut batches: Option<IggyMessagesBatchSetMut> = None;
         let mut current_offset = offset;
 
         for idx in range {
@@ -719,10 +710,22 @@ impl Streams {
                 current_offset += messages_count as u64;
             }
 
-            batches.add_batch_set(messages);
+            // If this is the only/first result and we're done, return 
directly (preserves Frozen)
+            if batches.is_none() && remaining_count == 0 {
+                return Ok(messages);
+            }
+
+            // Need to accumulate, convert to mutable if necessary
+            let mutable_set = messages.into_mutable();
+            match &mut batches {
+                Some(existing) => existing.add_batch_set(mutable_set),
+                None => batches = Some(mutable_set),
+            }
         }
 
-        Ok(batches)
+        Ok(PolledBatches::Mutable(
+            batches.unwrap_or_else(IggyMessagesBatchSetMut::empty),
+        ))
     }
 
     #[allow(clippy::too_many_arguments)]
@@ -736,7 +739,7 @@ impl Streams {
         end_offset: u64,
         count: u32,
         segment_start_offset: u64,
-    ) -> Result<IggyMessagesBatchSet, IggyError> {
+    ) -> Result<PolledBatches, IggyError> {
         let (
             is_journal_empty,
             journal_first_offset,
@@ -765,7 +768,6 @@ impl Streams {
         // Case 0: Journal is empty, check in-flight buffer or disk
         if is_journal_empty {
             if !in_flight_empty && offset >= in_flight_first && offset <= 
in_flight_last {
-                let mut result = IggyMessagesBatchSet::empty();
                 let in_flight_batches = self.with_partition_by_id(
                     stream_id,
                     topic_id,
@@ -773,13 +775,13 @@ impl Streams {
                     |(_, _, _, _, _, _, log)| 
log.in_flight().get_by_offset(offset, count).to_vec(),
                 );
                 if !in_flight_batches.is_empty() {
-                    result.add_immutable_batches(&in_flight_batches);
-                    let final_result = result.get_by_offset(offset, count);
-                    return Ok(final_result);
+                    let batches =
+                        
PolledBatches::Frozen(IggyMessagesBatchSet::from_vec(in_flight_batches));
+                    return Ok(batches.filter_by_offset(offset, count));
                 }
             }
 
-            return self
+            let disk_batches = self
                 .load_messages_from_disk_by_offset(
                     stream_id,
                     topic_id,
@@ -789,7 +791,8 @@ impl Streams {
                     count,
                     segment_start_offset,
                 )
-                .await;
+                .await?;
+            return Ok(PolledBatches::Mutable(disk_batches));
         }
 
         // Case 1: All messages are in accumulator buffer
@@ -803,12 +806,12 @@ impl Streams {
                         .get(|batches| batches.get_by_offset(offset, count))
                 },
             );
-            return Ok(batches);
+            return Ok(PolledBatches::Mutable(batches));
         }
 
         // Case 2: All messages are on disk
         if end_offset < journal_first_offset {
-            return self
+            let disk_batches = self
                 .load_messages_from_disk_by_offset(
                     stream_id,
                     topic_id,
@@ -818,7 +821,8 @@ impl Streams {
                     count,
                     segment_start_offset,
                 )
-                .await;
+                .await?;
+            return Ok(PolledBatches::Mutable(disk_batches));
         }
 
         // Case 3: Messages span disk and accumulator buffer boundary
@@ -828,7 +832,7 @@ impl Streams {
         } else {
             0
         };
-        let mut combined_batch_set = IggyMessagesBatchSet::empty();
+        let mut combined_batch_set = IggyMessagesBatchSetMut::empty();
 
         // Load messages from disk if needed
         if disk_count > 0 {
@@ -873,7 +877,7 @@ impl Streams {
             }
         }
 
-        Ok(combined_batch_set)
+        Ok(PolledBatches::Mutable(combined_batch_set))
     }
 
     #[allow(clippy::too_many_arguments)]
@@ -886,7 +890,7 @@ impl Streams {
         start_offset: u64,
         count: u32,
         segment_start_offset: u64,
-    ) -> Result<IggyMessagesBatchSet, IggyError> {
+    ) -> Result<IggyMessagesBatchSetMut, IggyError> {
         let relative_start_offset = (start_offset - segment_start_offset) as 
u32;
 
         let (index_reader, messages_reader, indexes) = 
self.with_partition_by_id(
@@ -930,7 +934,7 @@ impl Streams {
         };
 
         if indexes_to_read.is_none() {
-            return Ok(IggyMessagesBatchSet::empty());
+            return Ok(IggyMessagesBatchSetMut::empty());
         }
 
         let indexes_to_read = indexes_to_read.unwrap();
@@ -946,7 +950,7 @@ impl Streams {
                 format!("Failed to validate messages read from disk! error: 
{e}")
             })?;
 
-        Ok(IggyMessagesBatchSet::from(batch))
+        Ok(IggyMessagesBatchSetMut::from(batch))
     }
 
     pub async fn get_messages_by_timestamp(
@@ -956,7 +960,7 @@ impl Streams {
         partition_id: partitions::ContainerId,
         timestamp: u64,
         count: u32,
-    ) -> Result<IggyMessagesBatchSet, IggyError> {
+    ) -> Result<IggyMessagesBatchSetMut, IggyError> {
         use crate::streaming::partitions::helpers;
         let Ok(range) = self.with_partition_by_id(
             stream_id,
@@ -964,11 +968,11 @@ impl Streams {
             partition_id,
             helpers::get_segment_range_by_timestamp(timestamp),
         ) else {
-            return Ok(IggyMessagesBatchSet::default());
+            return Ok(IggyMessagesBatchSetMut::default());
         };
 
         let mut remaining_count = count;
-        let mut batches = IggyMessagesBatchSet::empty();
+        let mut batches = IggyMessagesBatchSetMut::empty();
 
         for idx in range {
             if remaining_count == 0 {
@@ -1020,9 +1024,9 @@ impl Streams {
         idx: usize,
         timestamp: u64,
         count: u32,
-    ) -> Result<IggyMessagesBatchSet, IggyError> {
+    ) -> Result<IggyMessagesBatchSetMut, IggyError> {
         if count == 0 {
-            return Ok(IggyMessagesBatchSet::default());
+            return Ok(IggyMessagesBatchSetMut::default());
         }
 
         let (is_journal_empty, journal_first_timestamp, 
journal_last_timestamp) = self
@@ -1056,7 +1060,7 @@ impl Streams {
 
         // Case 1: All messages are in accumulator buffer (timestamp is after 
journal ends)
         if timestamp > journal_last_timestamp {
-            return Ok(IggyMessagesBatchSet::empty());
+            return Ok(IggyMessagesBatchSetMut::empty());
         }
 
         // Case 1b: Timestamp is within journal range
@@ -1116,7 +1120,7 @@ impl Streams {
         idx: usize,
         timestamp: u64,
         count: u32,
-    ) -> Result<IggyMessagesBatchSet, IggyError> {
+    ) -> Result<IggyMessagesBatchSetMut, IggyError> {
         let (index_reader, messages_reader, indexes) = 
self.with_partition_by_id(
             stream_id,
             topic_id,
@@ -1158,7 +1162,7 @@ impl Streams {
         };
 
         if indexes_to_read.is_none() {
-            return Ok(IggyMessagesBatchSet::empty());
+            return Ok(IggyMessagesBatchSetMut::empty());
         }
 
         let indexes_to_read = indexes_to_read.unwrap();
@@ -1171,7 +1175,7 @@ impl Streams {
                 format!("Failed to load messages from disk by timestamp: {e}")
             })?;
 
-        Ok(IggyMessagesBatchSet::from(batch))
+        Ok(IggyMessagesBatchSetMut::from(batch))
     }
 
     pub async fn handle_full_segment(
@@ -1322,7 +1326,7 @@ impl Streams {
         stream_id: &Identifier,
         topic_id: &Identifier,
         partition_id: usize,
-        mut batches: IggyMessagesBatchSet,
+        mut batches: IggyMessagesBatchSetMut,
         config: &SystemConfig,
     ) -> Result<u32, IggyError> {
         let batch_count = batches.count();
diff --git a/core/server/src/streaming/partitions/helpers.rs 
b/core/server/src/streaming/partitions/helpers.rs
index 531e77e54..c00465b39 100644
--- a/core/server/src/streaming/partitions/helpers.rs
+++ b/core/server/src/streaming/partitions/helpers.rs
@@ -32,7 +32,9 @@ use crate::{
             storage,
         },
         polling_consumer::ConsumerGroupId,
-        segments::{IggyIndexesMut, IggyMessagesBatchMut, IggyMessagesBatchSet, 
storage::Storage},
+        segments::{
+            IggyIndexesMut, IggyMessagesBatchMut, IggyMessagesBatchSetMut, 
storage::Storage,
+        },
     },
 };
 use err_trail::ErrContext;
@@ -334,7 +336,7 @@ pub async fn load_messages_from_disk_by_timestamp(
     index: &Option<IggyIndexesMut>,
     timestamp: u64,
     count: u32,
-) -> Result<IggyMessagesBatchSet, IggyError> {
+) -> Result<IggyMessagesBatchSetMut, IggyError> {
     let indexes_to_read = if let Some(indexes) = index {
         if !indexes.is_empty() {
             indexes.slice_by_timestamp(timestamp, count)
@@ -356,7 +358,7 @@ pub async fn load_messages_from_disk_by_timestamp(
     };
 
     if indexes_to_read.is_none() {
-        return Ok(IggyMessagesBatchSet::empty());
+        return Ok(IggyMessagesBatchSetMut::empty());
     }
 
     let indexes_to_read = indexes_to_read.unwrap();
@@ -369,7 +371,7 @@ pub async fn load_messages_from_disk_by_timestamp(
         .await
         .error(|e: &IggyError| format!("Failed to load messages from disk by 
timestamp: {e}"))?;
 
-    Ok(IggyMessagesBatchSet::from(batch))
+    Ok(IggyMessagesBatchSetMut::from(batch))
 }
 
 pub fn calculate_current_offset() -> impl FnOnce(ComponentsById<PartitionRef>) 
-> u64 {
@@ -430,7 +432,7 @@ pub fn append_to_journal(
     }
 }
 
-pub fn commit_journal() -> impl FnOnce(ComponentsById<PartitionRefMut>) -> 
IggyMessagesBatchSet {
+pub fn commit_journal() -> impl FnOnce(ComponentsById<PartitionRefMut>) -> 
IggyMessagesBatchSetMut {
     |(.., log)| {
         let batches = log.journal_mut().commit();
         log.ensure_indexes();
@@ -475,7 +477,7 @@ pub fn persist_batch(
     stream_id: &Identifier,
     topic_id: &Identifier,
     partition_id: usize,
-    batches: IggyMessagesBatchSet,
+    batches: IggyMessagesBatchSetMut,
     reason: String,
 ) -> impl AsyncFnOnce(ComponentsById<PartitionRef>) -> Result<(IggyByteSize, 
u32), IggyError> {
     async move |(.., log)| {
diff --git a/core/server/src/streaming/partitions/journal.rs 
b/core/server/src/streaming/partitions/journal.rs
index d16db4f6d..a405ae5a7 100644
--- a/core/server/src/streaming/partitions/journal.rs
+++ b/core/server/src/streaming/partitions/journal.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::streaming::segments::{IggyMessagesBatchMut, IggyMessagesBatchSet};
+use crate::streaming::segments::{IggyMessagesBatchMut, 
IggyMessagesBatchSetMut};
 use iggy_common::{IggyByteSize, IggyError};
 use std::fmt::Debug;
 
@@ -34,7 +34,7 @@ pub struct Inner {
 
 #[derive(Default, Debug)]
 pub struct MemoryMessageJournal {
-    batches: IggyMessagesBatchSet,
+    batches: IggyMessagesBatchSetMut,
     inner: Inner,
 }
 
@@ -48,7 +48,7 @@ impl Clone for MemoryMessageJournal {
 }
 
 impl Journal for MemoryMessageJournal {
-    type Container = IggyMessagesBatchSet;
+    type Container = IggyMessagesBatchSetMut;
     type Entry = IggyMessagesBatchMut;
     type Inner = Inner;
     type AppendResult = Result<(u32, u32), IggyError>;
diff --git a/core/server/src/streaming/segments/messages/messages_writer.rs 
b/core/server/src/streaming/segments/messages/messages_writer.rs
index 85d3bd215..0ef5e4023 100644
--- a/core/server/src/streaming/segments/messages/messages_writer.rs
+++ b/core/server/src/streaming/segments/messages/messages_writer.rs
@@ -17,7 +17,7 @@
  */
 
 use crate::streaming::segments::{
-    IggyMessagesBatchSet,
+    IggyMessagesBatchSetMut,
     messages::{write_batch, write_batch_frozen},
 };
 use compio::fs::{File, OpenOptions};
@@ -99,7 +99,7 @@ impl MessagesWriter {
     /// Append a batch of messages to the messages file.
     pub async fn save_batch_set(
         &self,
-        batch_set: IggyMessagesBatchSet,
+        batch_set: IggyMessagesBatchSetMut,
     ) -> Result<IggyByteSize, IggyError> {
         let messages_size = batch_set.size();
         let messages_count = batch_set.count();
diff --git a/core/server/src/streaming/segments/messages/mod.rs 
b/core/server/src/streaming/segments/messages/mod.rs
index c085ed882..fdb5e1dae 100644
--- a/core/server/src/streaming/segments/messages/mod.rs
+++ b/core/server/src/streaming/segments/messages/mod.rs
@@ -19,7 +19,7 @@
 mod messages_reader;
 mod messages_writer;
 
-use super::IggyMessagesBatchSet;
+use super::IggyMessagesBatchSetMut;
 use bytes::Bytes;
 use compio::{fs::File, io::AsyncWriteAtExt};
 use iggy_common::{IggyError, IggyMessagesBatch};
@@ -31,7 +31,7 @@ pub use messages_writer::MessagesWriter;
 async fn write_batch(
     file: &File,
     position: u64,
-    mut batches: IggyMessagesBatchSet,
+    mut batches: IggyMessagesBatchSetMut,
 ) -> Result<usize, IggyError> {
     let total_written = batches.iter().map(|b| b.size() as usize).sum();
     let batches = batches
diff --git a/core/server/src/streaming/segments/mod.rs b/core/server/src/streaming/segments/mod.rs
index 7ed09c2d4..0ffdf623f 100644
--- a/core/server/src/streaming/segments/mod.rs
+++ b/core/server/src/streaming/segments/mod.rs
@@ -29,5 +29,7 @@ pub use types::IggyMessageHeaderViewMut;
 pub use types::IggyMessageViewMut;
 pub use types::IggyMessagesBatchMut;
 pub use types::IggyMessagesBatchSet;
+pub use types::IggyMessagesBatchSetMut;
+pub use types::PolledBatches;
 
 pub const SEGMENT_MAX_SIZE_BYTES: u64 = 1024 * 1024 * 1024;
diff --git a/core/server/src/streaming/segments/types/messages_batch_set.rs b/core/server/src/streaming/segments/types/messages_batch_set.rs
index 0856bd217..9cecf1d8f 100644
--- a/core/server/src/streaming/segments/types/messages_batch_set.rs
+++ b/core/server/src/streaming/segments/types/messages_batch_set.rs
@@ -28,19 +28,22 @@ use tracing::trace;
 
 use super::IggyMessagesBatchMut;
 
-/// A container for multiple IggyMessagesBatch objects
-#[derive(Debug, Default)]
+// ============================================================================
+// IggyMessagesBatchSet - Immutable/frozen batches (Arc-backed Bytes, zero-copy)
+// ============================================================================
+
+/// A container for immutable message batches (frozen, Arc-backed).
+///
+/// Used for reads from in-flight buffer during async disk I/O.
+/// All slicing operations use `Bytes::slice()` for zero-copy.
+#[derive(Debug, Default, Clone)]
 pub struct IggyMessagesBatchSet {
-    /// The collection of message containers
-    batches: Vec<IggyMessagesBatchMut>,
-    /// Total number of messages across all containers
+    batches: Vec<IggyMessagesBatch>,
     count: u32,
-    /// Total size in bytes across all containers
     size: u32,
 }
 
 impl IggyMessagesBatchSet {
-    /// Create a new empty batch
     pub fn empty() -> Self {
         Self {
             batches: Vec::new(),
@@ -49,7 +52,6 @@ impl IggyMessagesBatchSet {
         }
     }
 
-    /// Create a new empty batch set with a specified initial capacity of message containers
     pub fn with_capacity(capacity: usize) -> Self {
         Self {
             batches: Vec::with_capacity(capacity),
@@ -58,7 +60,196 @@ impl IggyMessagesBatchSet {
         }
     }
 
-    /// Create a batch set from an existing vector of IggyMessages
+    pub fn from_vec(batches: Vec<IggyMessagesBatch>) -> Self {
+        let count = batches.iter().map(|b| b.count()).sum();
+        let size = batches.iter().map(|b| b.size()).sum();
+        Self {
+            batches,
+            count,
+            size,
+        }
+    }
+
+    pub fn add_batch(&mut self, batch: IggyMessagesBatch) {
+        self.count += batch.count();
+        self.size += batch.size();
+        self.batches.push(batch);
+    }
+
+    pub fn count(&self) -> u32 {
+        self.count
+    }
+
+    pub fn size(&self) -> u32 {
+        self.size
+    }
+
+    pub fn containers_count(&self) -> usize {
+        self.batches.len()
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.batches.is_empty() || self.count == 0
+    }
+
+    pub fn first_offset(&self) -> Option<u64> {
+        self.batches.first().and_then(|b| b.first_offset())
+    }
+
+    pub fn last_offset(&self) -> Option<u64> {
+        self.batches.last().and_then(|b| b.last_offset())
+    }
+
+    pub fn first_timestamp(&self) -> Option<u64> {
+        self.batches.first().and_then(|b| b.first_timestamp())
+    }
+
+    pub fn last_timestamp(&self) -> Option<u64> {
+        self.batches.last().and_then(|b| b.last_timestamp())
+    }
+
+    pub fn iter(&self) -> impl Iterator<Item = &IggyMessagesBatch> {
+        self.batches.iter()
+    }
+
+    pub fn into_inner(self) -> Vec<IggyMessagesBatch> {
+        self.batches
+    }
+
+    /// Zero-copy filtering by offset using `Bytes::slice()`.
+    pub fn get_by_offset(&self, offset: u64, count: u32) -> Self {
+        if self.is_empty() || count == 0 {
+            return Self::empty();
+        }
+
+        let end_offset = offset + count as u64 - 1;
+        let mut result = Self::with_capacity(self.batches.len());
+        let mut remaining = count;
+
+        for batch in &self.batches {
+            if remaining == 0 {
+                break;
+            }
+
+            let batch_first = match batch.first_offset() {
+                Some(o) => o,
+                None => continue,
+            };
+            let batch_last = match batch.last_offset() {
+                Some(o) => o,
+                None => continue,
+            };
+
+            // Skip batches entirely before requested range
+            if batch_last < offset {
+                continue;
+            }
+
+            // Stop if batch is entirely after requested range
+            if batch_first > end_offset {
+                break;
+            }
+
+            // Batch overlaps - slice it (zero-copy via Bytes::slice)
+            if let Some(sliced) = batch.slice_by_offset(offset, remaining) {
+                remaining = remaining.saturating_sub(sliced.count());
+                result.add_batch(sliced);
+            }
+        }
+
+        result
+    }
+
+    /// Convert to mutable batch set (copies data).
+    pub fn into_mutable(self) -> IggyMessagesBatchSetMut {
+        let mut result = IggyMessagesBatchSetMut::with_capacity(self.batches.len());
+        for batch in self.batches {
+            let mutable = frozen_to_mutable(&batch);
+            result.add_batch(mutable);
+        }
+        result
+    }
+
+    pub fn into_polled_messages(&self, poll_metadata: IggyPollMetadata) -> PolledMessages {
+        if self.is_empty() {
+            return PolledMessages::empty();
+        }
+
+        let mut messages = Vec::with_capacity(self.count as usize);
+
+        for batch in &self.batches {
+            for message in batch.iter() {
+                let header = message.header().to_header();
+                let payload = Bytes::copy_from_slice(message.payload());
+                let user_headers = message.user_headers().map(Bytes::copy_from_slice);
+                let msg = IggyMessage {
+                    header,
+                    payload,
+                    user_headers,
+                };
+                messages.push(msg);
+            }
+        }
+
+        trace!(
+            "Converted frozen batch of {} messages from partition {} with current offset {}",
+            messages.len(),
+            poll_metadata.partition_id,
+            poll_metadata.current_offset
+        );
+
+        PolledMessages {
+            partition_id: poll_metadata.partition_id,
+            current_offset: poll_metadata.current_offset,
+            count: messages.len() as u32,
+            messages,
+        }
+    }
+}
+
+impl From<Vec<IggyMessagesBatch>> for IggyMessagesBatchSet {
+    fn from(batches: Vec<IggyMessagesBatch>) -> Self {
+        Self::from_vec(batches)
+    }
+}
+
+impl Sizeable for IggyMessagesBatchSet {
+    fn get_size_bytes(&self) -> IggyByteSize {
+        IggyByteSize::from(self.size as u64)
+    }
+}
+
+// ============================================================================
+// IggyMessagesBatchSetMut - Mutable batches (PooledBuffer)
+// ============================================================================
+
+/// A container for mutable message batches (using PooledBuffer).
+///
+/// Used for reads from journal and disk.
+#[derive(Debug, Default)]
+pub struct IggyMessagesBatchSetMut {
+    batches: Vec<IggyMessagesBatchMut>,
+    count: u32,
+    size: u32,
+}
+
+impl IggyMessagesBatchSetMut {
+    pub fn empty() -> Self {
+        Self {
+            batches: Vec::new(),
+            count: 0,
+            size: 0,
+        }
+    }
+
+    pub fn with_capacity(capacity: usize) -> Self {
+        Self {
+            batches: Vec::with_capacity(capacity),
+            count: 0,
+            size: 0,
+        }
+    }
+
     pub fn from_vec(messages: Vec<IggyMessagesBatchMut>) -> Self {
         let mut batch = Self::with_capacity(messages.len());
         for msg in messages {
@@ -67,46 +258,19 @@ impl IggyMessagesBatchSet {
         batch
     }
 
-    /// Add another batch of messages to the batch set
     pub fn add_batch(&mut self, batch: IggyMessagesBatchMut) {
         self.count += batch.count();
         self.size += batch.size();
         self.batches.push(batch);
     }
 
-    /// Add another batch set of messages to the batch set
-    pub fn add_batch_set(&mut self, mut other_batch_set: IggyMessagesBatchSet) {
-        self.count += other_batch_set.count();
-        self.size += other_batch_set.size();
-        let other_batches = std::mem::take(&mut other_batch_set.batches);
+    pub fn add_batch_set(&mut self, mut other: IggyMessagesBatchSetMut) {
+        self.count += other.count();
+        self.size += other.size();
+        let other_batches = std::mem::take(&mut other.batches);
         self.batches.extend(other_batches);
     }
 
-    /// Add immutable batches by copying them into mutable form.
-    ///
-    /// This is used when reading from the in-flight buffer (which holds
-    /// frozen/immutable batches) and needs to convert them for the read path.
-    pub fn add_immutable_batches(&mut self, batches: &[IggyMessagesBatch]) {
-        for batch in batches {
-            let mutable_batch = Self::immutable_to_mutable(batch);
-            self.add_batch(mutable_batch);
-        }
-    }
-
-    /// Convert an immutable IggyMessagesBatch to a mutable IggyMessagesBatchMut.
-    ///
-    /// This requires copying the Bytes into a PooledBuffer.
-    fn immutable_to_mutable(batch: &IggyMessagesBatch) -> IggyMessagesBatchMut {
-        let count = batch.count();
-        let base_position = batch.indexes().base_position();
-        let indexes_buffer = PooledBuffer::from(batch.indexes_slice());
-        let indexes = IggyIndexesMut::from_bytes(indexes_buffer, base_position);
-        let messages = PooledBuffer::from(batch.buffer());
-
-        IggyMessagesBatchMut::from_indexes_and_messages(count, indexes, messages)
-    }
-
-    /// Extract indexes from all batches in the set
     pub fn append_indexes_to(&self, target: &mut IggyIndexesMut) {
         for batch in self.iter() {
             let indexes = batch.indexes();
@@ -114,87 +278,65 @@ impl IggyMessagesBatchSet {
         }
     }
 
-    /// Get the total number of messages in the batch
     pub fn count(&self) -> u32 {
         self.count
     }
 
-    /// Get the total size of all messages in bytes
     pub fn size(&self) -> u32 {
         self.size
     }
 
-    /// Get the number of message containers in the batch
     pub fn containers_count(&self) -> usize {
         self.batches.len()
     }
 
-    /// Check if the batch is empty
     pub fn is_empty(&self) -> bool {
         self.batches.is_empty() || self.count == 0
     }
 
-    /// Get timestamp of first message in first batch
     pub fn first_timestamp(&self) -> Option<u64> {
         if self.is_empty() {
             return None;
         }
-        self.batches.first().map(|batch| batch.first_timestamp())?
+        self.batches
+            .first()
+            .and_then(|batch| batch.first_timestamp())
     }
 
-    /// Get offset of first message in first batch
     pub fn first_offset(&self) -> Option<u64> {
         if self.is_empty() {
             return None;
         }
-        self.batches.first().map(|batch| batch.first_offset())?
+        self.batches.first().and_then(|batch| batch.first_offset())
     }
 
-    /// Get timestamp of last message in last batch
     pub fn last_timestamp(&self) -> Option<u64> {
         if self.is_empty() {
             return None;
         }
-        self.batches.last().map(|batch| batch.last_timestamp())?
+        self.batches.last().and_then(|batch| batch.last_timestamp())
     }
 
-    /// Get offset of last message in last batch
     pub fn last_offset(&self) -> Option<u64> {
-        self.batches.last().map(|batch| batch.last_offset())?
+        self.batches.last().and_then(|batch| batch.last_offset())
     }
 
-    /// Get a reference to the underlying vector of message containers
     pub fn inner(&self) -> &Vec<IggyMessagesBatchMut> {
         &self.batches
     }
 
-    /// Consume the batch, returning the underlying vector of message containers
     pub fn into_inner(mut self) -> Vec<IggyMessagesBatchMut> {
         std::mem::take(&mut self.batches)
     }
 
-    /// Iterate over all message containers in the batch
     pub fn iter(&self) -> impl Iterator<Item = &IggyMessagesBatchMut> {
         self.batches.iter()
     }
 
-    /// Iterate over all mutable message containers in the batch
     pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut IggyMessagesBatchMut> {
         self.batches.iter_mut()
     }
 
-    /// Convert this batch and poll metadata into a vector of fully-formed IggyMessage objects
-    ///
-    /// This method transforms the internal message views into complete IggyMessage objects
-    /// that can be returned to clients. It should only be used by server http implementation.
-    ///
-    /// # Arguments
-    ///
-    /// * `poll_metadata` - Metadata about the partition and current offset
-    ///
-    /// # Returns
-    ///
-    /// A vector of IggyMessage objects with proper metadata
     pub fn into_polled_messages(&self, poll_metadata: IggyPollMetadata) -> PolledMessages {
         if self.is_empty() {
             return PolledMessages::empty();
@@ -231,10 +373,6 @@ impl IggyMessagesBatchSet {
         }
     }
 
-    /// Returns a new IggyMessagesBatch containing only messages with offsets greater than or equal to the specified offset,
-    /// up to the specified count.
-    ///
-    /// If no messages match the criteria, returns an empty batch.
     pub fn get_by_offset(&self, start_offset: u64, count: u32) -> Self {
         if self.is_empty() || count == 0 {
             return Self::empty();
@@ -266,10 +404,6 @@ impl IggyMessagesBatchSet {
         result
     }
 
-    /// Returns a new IggyMessagesBatch containing only messages with timestamps greater than or equal
-    /// to the specified timestamp, up to the specified count.
-    ///
-    /// If no messages match the criteria, returns an empty batch.
     pub fn get_by_timestamp(&self, timestamp: u64, count: u32) -> Self {
         if self.is_empty() || count == 0 {
             return Self::empty();
@@ -299,8 +433,6 @@ impl IggyMessagesBatchSet {
         result
     }
 
-    /// Get the message at the specified index.
-    /// Returns None if the index is out of bounds.
     pub fn get(&self, index: usize) -> Option<IggyMessageView<'_>> {
         if index >= self.count as usize {
             return None;
@@ -323,14 +455,9 @@ impl IggyMessagesBatchSet {
     }
 }
 
-impl Index<usize> for IggyMessagesBatchSet {
+impl Index<usize> for IggyMessagesBatchSetMut {
     type Output = [u8];
 
-    /// Get the message bytes at the specified index across all batches
-    ///
-    /// # Panics
-    ///
-    /// Panics if the index is out of bounds (>= total number of messages)
     fn index(&self, index: usize) -> &Self::Output {
         if index >= self.count as usize {
             panic!(
@@ -356,20 +483,126 @@ impl Index<usize> for IggyMessagesBatchSet {
     }
 }
 
-impl Sizeable for IggyMessagesBatchSet {
+impl Sizeable for IggyMessagesBatchSetMut {
     fn get_size_bytes(&self) -> IggyByteSize {
         IggyByteSize::from(self.size as u64)
     }
 }
 
-impl From<Vec<IggyMessagesBatchMut>> for IggyMessagesBatchSet {
+impl From<Vec<IggyMessagesBatchMut>> for IggyMessagesBatchSetMut {
     fn from(messages: Vec<IggyMessagesBatchMut>) -> Self {
         Self::from_vec(messages)
     }
 }
 
-impl From<IggyMessagesBatchMut> for IggyMessagesBatchSet {
+impl From<IggyMessagesBatchMut> for IggyMessagesBatchSetMut {
     fn from(messages: IggyMessagesBatchMut) -> Self {
         Self::from_vec(vec![messages])
     }
 }
+
+// ============================================================================
+// PolledBatches - Result of polling (either mutable or frozen)
+// ============================================================================
+
+/// Result of polling messages - either mutable or frozen batches.
+///
+/// Using an enum avoids copying when reading from the in-flight buffer,
+/// as the frozen `Bytes` are Arc-backed and use zero-copy slicing.
+#[derive(Debug)]
+pub enum PolledBatches {
+    /// Mutable batches from journal or disk
+    Mutable(IggyMessagesBatchSetMut),
+    /// Frozen batches from in-flight buffer (Arc-backed, zero-copy)
+    Frozen(IggyMessagesBatchSet),
+}
+
+impl PolledBatches {
+    pub fn empty() -> Self {
+        Self::Mutable(IggyMessagesBatchSetMut::empty())
+    }
+
+    pub fn count(&self) -> u32 {
+        match self {
+            Self::Mutable(set) => set.count(),
+            Self::Frozen(set) => set.count(),
+        }
+    }
+
+    pub fn size(&self) -> u32 {
+        match self {
+            Self::Mutable(set) => set.size(),
+            Self::Frozen(set) => set.size(),
+        }
+    }
+
+    pub fn is_empty(&self) -> bool {
+        match self {
+            Self::Mutable(set) => set.is_empty(),
+            Self::Frozen(set) => set.is_empty(),
+        }
+    }
+
+    pub fn containers_count(&self) -> usize {
+        match self {
+            Self::Mutable(set) => set.containers_count(),
+            Self::Frozen(set) => set.containers_count(),
+        }
+    }
+
+    pub fn last_offset(&self) -> Option<u64> {
+        match self {
+            Self::Mutable(set) => set.last_offset(),
+            Self::Frozen(set) => set.last_offset(),
+        }
+    }
+
+    pub fn first_offset(&self) -> Option<u64> {
+        match self {
+            Self::Mutable(set) => set.first_offset(),
+            Self::Frozen(set) => set.first_offset(),
+        }
+    }
+
+    /// Filter batches by offset. Zero-copy for frozen batches.
+    pub fn filter_by_offset(self, offset: u64, count: u32) -> Self {
+        match self {
+            Self::Mutable(set) => Self::Mutable(set.get_by_offset(offset, count)),
+            Self::Frozen(set) => Self::Frozen(set.get_by_offset(offset, count)),
+        }
+    }
+
+    /// Convert to mutable batch set (copies frozen data).
+    pub fn into_mutable(self) -> IggyMessagesBatchSetMut {
+        match self {
+            Self::Mutable(set) => set,
+            Self::Frozen(set) => set.into_mutable(),
+        }
+    }
+
+    pub fn into_polled_messages(&self, poll_metadata: IggyPollMetadata) -> PolledMessages {
+        match self {
+            Self::Mutable(set) => set.into_polled_messages(poll_metadata),
+            Self::Frozen(set) => set.into_polled_messages(poll_metadata),
+        }
+    }
+}
+
+impl Default for PolledBatches {
+    fn default() -> Self {
+        Self::empty()
+    }
+}
+
+// ============================================================================
+// Helper functions
+// ============================================================================
+
+fn frozen_to_mutable(batch: &IggyMessagesBatch) -> IggyMessagesBatchMut {
+    let count = batch.count();
+    let base_position = batch.indexes().base_position();
+    let indexes_buffer = PooledBuffer::from(batch.indexes_slice());
+    let indexes = IggyIndexesMut::from_bytes(indexes_buffer, base_position);
+    let messages = PooledBuffer::from(batch.buffer());
+    IggyMessagesBatchMut::from_indexes_and_messages(count, indexes, messages)
+}
diff --git a/core/server/src/streaming/segments/types/mod.rs b/core/server/src/streaming/segments/types/mod.rs
index 9f5266eed..5d83b37b8 100644
--- a/core/server/src/streaming/segments/types/mod.rs
+++ b/core/server/src/streaming/segments/types/mod.rs
@@ -25,3 +25,5 @@ pub use message_header_view_mut::IggyMessageHeaderViewMut;
 pub use message_view_mut::IggyMessageViewMut;
 pub use messages_batch_mut::IggyMessagesBatchMut;
 pub use messages_batch_set::IggyMessagesBatchSet;
+pub use messages_batch_set::IggyMessagesBatchSetMut;
+pub use messages_batch_set::PolledBatches;

Reply via email to