This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch zero-copy-no-batching-rebase-3
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit b8bdbc98ca4f134470ab1571014b1083f00c96c0
Author: Hubert Gruszecki <h.grusze...@gmail.com>
AuthorDate: Sat Apr 12 17:38:04 2025 +0200

    implement BytesMutPool
---
 Cargo.lock                                         |  24 ++
 sdk/src/http/messages.rs                           |   1 +
 sdk/src/identifier.rs                              |   5 +
 sdk/src/messages/partitioning.rs                   |   5 +
 sdk/src/messages/send_messages.rs                  |  38 ++-
 sdk/src/models/messaging/messages_batch.rs         | 163 +---------
 server/Cargo.toml                                  |   2 +
 server/src/binary/command.rs                       |   2 -
 .../handlers/messages/send_messages_handler.rs     |  72 +++--
 server/src/channels/commands/print_sysinfo.rs      |   7 +-
 server/src/http/jwt/storage.rs                     |   7 +-
 server/src/http/messages.rs                        |   6 +-
 server/src/main.rs                                 |   3 +
 .../src/streaming/segments/indexes/index_reader.rs |  24 +-
 .../src/streaming/segments/indexes/indexes_mut.rs  |  52 +++-
 .../streaming/segments/messages/messages_reader.rs |  39 +--
 .../src/streaming/segments/messages_accumulator.rs |  11 +-
 server/src/streaming/segments/reading_messages.rs  |  11 +-
 server/src/streaming/segments/segment.rs           |   2 +-
 .../streaming/segments/types/messages_batch_mut.rs | 336 +++++++++++++++++----
 .../streaming/segments/types/messages_batch_set.rs |  84 +++---
 server/src/streaming/segments/writing_messages.rs  |  12 +-
 server/src/streaming/systems/messages.rs           |  14 +-
 server/src/streaming/systems/storage.rs            |  10 +-
 server/src/streaming/utils/bytes_mut_pool.rs       | 313 +++++++++++++++++++
 server/src/streaming/utils/mod.rs                  |   1 +
 26 files changed, 869 insertions(+), 375 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index fa08280f..432e33c7 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1231,6 +1231,19 @@ dependencies = [
  "cfg-if",
 ]
 
+[[package]]
+name = "crossbeam"
+version = "0.8.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1137cd7e7fc0fb5d3c5a8678be38ec56e819125d8d7907411fe24ccb943faca8"
+dependencies = [
+ "crossbeam-channel",
+ "crossbeam-deque",
+ "crossbeam-epoch",
+ "crossbeam-queue",
+ "crossbeam-utils",
+]
+
 [[package]]
 name = "crossbeam-channel"
 version = "0.5.14"
@@ -1259,6 +1272,15 @@ dependencies = [
  "crossbeam-utils",
 ]
 
+[[package]]
+name = "crossbeam-queue"
+version = "0.3.12"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0f58bbc28f91df819d0aa2a2c00cd19754769c2fad90579b3592b1c9ba7a3115"
+dependencies = [
+ "crossbeam-utils",
+]
+
 [[package]]
 name = "crossbeam-utils"
 version = "0.8.21"
@@ -4870,6 +4892,7 @@ dependencies = [
  "chrono",
  "clap",
  "console-subscriber",
+ "crossbeam",
  "dashmap",
  "derive_more",
  "dotenvy",
@@ -4887,6 +4910,7 @@ dependencies = [
  "mockall",
  "moka",
  "nix",
+ "once_cell",
  "openssl",
  "opentelemetry",
  "opentelemetry-appender-tracing",
diff --git a/sdk/src/http/messages.rs b/sdk/src/http/messages.rs
index 22fd3ede..4de7bec7 100644
--- a/sdk/src/http/messages.rs
+++ b/sdk/src/http/messages.rs
@@ -71,6 +71,7 @@ impl MessageClient for HttpClient {
         self.post(
             &get_path(&stream_id.as_cow_str(), &topic_id.as_cow_str()),
             &SendMessages {
+                metadata_length: 0, // this field is used only for TCP/QUIC
                 stream_id: stream_id.clone(),
                 topic_id: topic_id.clone(),
                 partitioning: partitioning.clone(),
diff --git a/sdk/src/identifier.rs b/sdk/src/identifier.rs
index e66d0059..cd65e318 100644
--- a/sdk/src/identifier.rs
+++ b/sdk/src/identifier.rs
@@ -200,6 +200,11 @@ impl Identifier {
         identifier.validate()?;
         Ok(identifier)
     }
+
+    /// Maximum size of the Identifier struct
+    pub const fn maximum_byte_size() -> usize {
+        2 + 255
+    }
 }
 
 impl Sizeable for Identifier {
diff --git a/sdk/src/messages/partitioning.rs b/sdk/src/messages/partitioning.rs
index 8e454b38..f41b8814 100644
--- a/sdk/src/messages/partitioning.rs
+++ b/sdk/src/messages/partitioning.rs
@@ -155,6 +155,11 @@ impl Partitioning {
             value,
         })
     }
+
+    /// Maximum size of the Partitioning struct
+    pub const fn maximum_byte_size() -> usize {
+        2 + 255
+    }
 }
 
 impl Hash for Partitioning {
diff --git a/sdk/src/messages/send_messages.rs b/sdk/src/messages/send_messages.rs
index 22a219b9..e2952c6e 100644
--- a/sdk/src/messages/send_messages.rs
+++ b/sdk/src/messages/send_messages.rs
@@ -43,6 +43,8 @@ use super::{Partitioning, PartitioningKind};
 /// - `batch` - collection of messages to be sent.
 #[derive(Debug, PartialEq)]
 pub struct SendMessages {
+    /// Length of stream_id, topic_id, partitioning and messages_count (4 
bytes)
+    pub metadata_length: u32,
     /// Unique stream ID (numeric or name).
     pub stream_id: Identifier,
     /// Unique topic ID (numeric or name).
@@ -60,25 +62,33 @@ impl SendMessages {
         partitioning: &Partitioning,
         messages: &[IggyMessage],
     ) -> Bytes {
-        let stream_id_size = stream_id.get_buffer_size();
-        let topic_id_size = topic_id.get_buffer_size();
-        let partitioning_size = partitioning.get_buffer_size();
+        let stream_id_field_size = stream_id.get_buffer_size();
+        let topic_id_field_size = topic_id.get_buffer_size();
+        let partitioning_field_size = partitioning.get_buffer_size();
+        let metadata_length_field_size = std::mem::size_of::<u32>();
         let messages_count = messages.len();
-        let messages_count_size = std::mem::size_of::<u32>();
+        let messages_count_field_size = std::mem::size_of::<u32>();
+        let metadata_length = stream_id_field_size
+            + topic_id_field_size
+            + partitioning_field_size
+            + messages_count_field_size;
         let indexes_size = messages_count * INDEX_SIZE;
-
-        let total_size = stream_id_size
-            + topic_id_size
-            + partitioning_size
-            + messages_count_size
+        let messages_size = messages
+            .iter()
+            .map(|m| m.get_size_bytes().as_bytes_usize())
+            .sum::<usize>();
+
+        let total_size = metadata_length_field_size
+            + stream_id_field_size
+            + topic_id_field_size
+            + partitioning_field_size
+            + messages_count_field_size
             + indexes_size
-            + messages
-                .iter()
-                .map(|m| m.get_size_bytes().as_bytes_usize())
-                .sum::<usize>();
+            + messages_size;
 
         let mut bytes = BytesMut::with_capacity(total_size);
 
+        bytes.put_u32_le(metadata_length as u32);
         stream_id.write_to_buffer(&mut bytes);
         topic_id.write_to_buffer(&mut bytes);
         partitioning.write_to_buffer(&mut bytes);
@@ -121,6 +131,7 @@ fn write_value_at<const N: usize>(slice: &mut [u8], value: 
[u8; N], position: us
 impl Default for SendMessages {
     fn default() -> Self {
         SendMessages {
+            metadata_length: 0,
             stream_id: Identifier::default(),
             topic_id: Identifier::default(),
             partitioning: Partitioning::default(),
@@ -327,6 +338,7 @@ impl<'de> Deserialize<'de> for SendMessages {
                 let batch = IggyMessagesBatch::from(&messages);
 
                 Ok(SendMessages {
+                    metadata_length: 0, // this field is used only for TCP/QUIC
                     stream_id: Identifier::default(),
                     topic_id: Identifier::default(),
                     partitioning,
diff --git a/sdk/src/models/messaging/messages_batch.rs b/sdk/src/models/messaging/messages_batch.rs
index 31851176..911ff723 100644
--- a/sdk/src/models/messaging/messages_batch.rs
+++ b/sdk/src/models/messaging/messages_batch.rs
@@ -156,116 +156,6 @@ impl IggyMessagesBatch {
         }
     }
 
-    /// Returns a contiguous slice (as a new `IggyMessagesBatch`) of up to 
`count` messages
-    /// whose message headers have an offset greater than or equal to the 
provided `start_offset`.
-    pub fn slice_by_offset(&self, start_offset: u64, count: u32) -> 
Option<Self> {
-        if self.is_empty() || count == 0 {
-            return None;
-        }
-
-        let first_offset = self.first_offset()?;
-
-        if start_offset < first_offset {
-            return self.slice_by_index(0, count);
-        }
-
-        let last_offset = self.last_offset()?;
-        if start_offset > last_offset {
-            return None;
-        }
-
-        let offset_diff = start_offset - first_offset;
-        let first_message_index = offset_diff as usize;
-
-        if first_message_index >= self.count() as usize {
-            return None;
-        }
-
-        self.slice_by_index(first_message_index as u32, count)
-    }
-
-    /// Helper method to slice the batch starting from a specific index
-    fn slice_by_index(&self, start_index: u32, count: u32) -> Option<Self> {
-        if start_index >= self.count() {
-            return None;
-        }
-
-        let last_message_index =
-            std::cmp::min((start_index + count) as usize, self.count() as 
usize);
-
-        let sub_indexes = self.indexes.slice_by_offset(
-            start_index,
-            (last_message_index - start_index as usize) as u32,
-        )?;
-
-        let first_message_position = self.message_start_position(start_index 
as usize);
-        let last_message_position = 
self.message_end_position(last_message_index - 1);
-
-        let sub_buffer = self
-            .messages
-            .slice(first_message_position..last_message_position);
-
-        Some(IggyMessagesBatch {
-            count: (last_message_index - start_index as usize) as u32,
-            indexes: sub_indexes,
-            messages: sub_buffer,
-        })
-    }
-
-    /// Returns a contiguous slice (as a new `IggyMessagesBatch`) of up to 
`count` messages
-    /// whose message headers have a timestamp greater than or equal to the 
provided `timestamp`.
-    ///
-    /// If no messages meet the criteria, returns `None`.
-    pub fn slice_by_timestamp(&self, timestamp: u64, count: u32) -> 
Option<Self> {
-        if self.is_empty() || count == 0 {
-            return None;
-        }
-
-        // Use binary search to find the first message with timestamp >= the 
target
-        let first_message_index = self.binary_search_timestamp(timestamp)?;
-
-        self.slice_by_index(first_message_index, count)
-    }
-
-    /// Find the position of the index with timestamp closest to (but not 
exceeding) the target
-    fn binary_search_timestamp(&self, target_timestamp: u64) -> Option<u32> {
-        if self.count() == 0 {
-            return None;
-        }
-
-        let last_timestamp = self.get(self.count() as usize - 
1)?.header().timestamp();
-        if target_timestamp > last_timestamp {
-            return Some(self.count() - 1);
-        }
-
-        let first_timestamp = self.get(0)?.header().timestamp();
-        if target_timestamp <= first_timestamp {
-            return Some(0);
-        }
-
-        let mut low = 0;
-        let mut high = self.count() - 1;
-
-        while low <= high {
-            let mid = low + (high - low) / 2;
-            let mid_index = self.get(mid as usize)?;
-            let mid_timestamp = mid_index.header().timestamp();
-
-            match mid_timestamp.cmp(&target_timestamp) {
-                std::cmp::Ordering::Equal => return Some(mid),
-                std::cmp::Ordering::Less => low = mid + 1,
-                std::cmp::Ordering::Greater => {
-                    if mid == 0 {
-                        break;
-                    }
-                    high = mid - 1;
-                }
-            }
-        }
-
-        Some(low)
-    }
-
     /// Get the message at the specified index.
     /// Returns None if the index is out of bounds.
     pub fn get(&self, index: usize) -> Option<IggyMessageView> {
@@ -294,58 +184,6 @@ impl IggyMessagesBatch {
 
         result
     }
-
-    /// Validates that all messages in batch have correct checksums.
-    pub fn validate_checksums(&self) -> Result<(), IggyError> {
-        for message in self.iter() {
-            let calculated_checksum = message.calculate_checksum();
-            let actual_checksum = message.header().checksum();
-            let offset = message.header().offset();
-            if calculated_checksum != actual_checksum {
-                return Err(IggyError::InvalidMessageChecksum(
-                    actual_checksum,
-                    calculated_checksum,
-                    offset,
-                ));
-            }
-        }
-        Ok(())
-    }
-
-    /// Validates that all messages have correct checksums and offsets.
-    /// This function should be called after messages have been read from disk.
-    ///
-    /// # Arguments
-    ///
-    /// * `absolute_start_offset` - The absolute offset of the first message 
in the batch.
-    ///
-    /// # Returns
-    ///
-    /// * `Ok(())` - If all messages have correct checksums and offsets.
-    /// * `Err(IggyError)` - If any message has an invalid checksum or offset.
-    pub fn validate_checksums_and_offsets(
-        &self,
-        absolute_start_offset: u64,
-    ) -> Result<(), IggyError> {
-        let mut current_offset = absolute_start_offset;
-        for message in self.iter() {
-            let calculated_checksum = message.calculate_checksum();
-            let actual_checksum = message.header().checksum();
-            let offset = message.header().offset();
-            if offset != current_offset {
-                return Err(IggyError::InvalidOffset(offset));
-            }
-            if calculated_checksum != actual_checksum {
-                return Err(IggyError::InvalidMessageChecksum(
-                    actual_checksum,
-                    calculated_checksum,
-                    offset,
-                ));
-            }
-            current_offset += 1;
-        }
-        Ok(())
-    }
 }
 
 impl IntoIterator for IggyMessagesBatch {
@@ -375,6 +213,7 @@ impl Index<usize> for IggyMessagesBatch {
         &self.messages[start..end]
     }
 }
+
 impl BytesSerializable for IggyMessagesBatch {
     fn to_bytes(&self) -> Bytes {
         panic!("should not be used");
diff --git a/server/Cargo.toml b/server/Cargo.toml
index d4483c5e..4b1ec236 100644
--- a/server/Cargo.toml
+++ b/server/Cargo.toml
@@ -56,6 +56,7 @@ bytes = "1.10.1"
 chrono = "0.4.40"
 clap = { version = "4.5.34", features = ["derive"] }
 console-subscriber = { version = "0.4.1", optional = true }
+crossbeam = "0.8.4"
 dashmap = "6.1.0"
 derive_more = "2.0.1"
 dotenvy = { version = "0.15.7" }
@@ -72,6 +73,7 @@ lending-iterator = "0.1.7"
 mimalloc = { version = "0.1", optional = true }
 moka = { version = "0.12.10", features = ["future"] }
 nix = { version = "0.29", features = ["fs"] }
+once_cell = "1.21.3"
 openssl = { version = "0.10.71", features = ["vendored"] }
 opentelemetry = { version = "0.29.0", features = ["trace", "logs"] }
 opentelemetry-appender-tracing = { version = "0.29.1", features = ["log"] }
diff --git a/server/src/binary/command.rs b/server/src/binary/command.rs
index 77e9c8a5..f138467f 100644
--- a/server/src/binary/command.rs
+++ b/server/src/binary/command.rs
@@ -69,8 +69,6 @@ use iggy::users::update_user::UpdateUser;
 use strum::EnumString;
 use tracing::error;
 
-// todo add missing commands
-
 define_server_command_enum! {
     Ping(Ping), PING_CODE, PING, false;
     GetStats(GetStats), GET_STATS_CODE, GET_STATS, false;
diff --git a/server/src/binary/handlers/messages/send_messages_handler.rs b/server/src/binary/handlers/messages/send_messages_handler.rs
index 1c1bc2b9..a043e736 100644
--- a/server/src/binary/handlers/messages/send_messages_handler.rs
+++ b/server/src/binary/handlers/messages/send_messages_handler.rs
@@ -21,8 +21,8 @@ use crate::binary::sender::SenderKind;
 use crate::streaming::segments::{IggyIndexesMut, IggyMessagesBatchMut};
 use crate::streaming::session::Session;
 use crate::streaming::systems::system::SharedSystem;
+use crate::streaming::utils::bytes_mut_pool::{BytesMutExt, BYTES_MUT_POOL};
 use anyhow::Result;
-use bytes::{Buf, BytesMut};
 use iggy::error::IggyError;
 use iggy::identifier::Identifier;
 use iggy::models::messaging::INDEX_SIZE;
@@ -49,40 +49,56 @@ impl ServerCommandHandler for SendMessages {
         session: &Session,
         system: &SharedSystem,
     ) -> Result<(), IggyError> {
-        let length = length - 4;
-        let mut buffer = BytesMut::with_capacity(length as usize);
-        unsafe {
-            buffer.set_len(length as usize);
-        }
-        sender.read(&mut buffer).await?;
-
-        let mut element_size;
-
-        let stream_id = Identifier::from_raw_bytes(buffer.chunk())?;
-        element_size = stream_id.get_size_bytes().as_bytes_usize();
-        buffer.advance(element_size);
-        self.stream_id = stream_id;
+        let total_payload_size = length as usize - std::mem::size_of::<u32>();
+        let metadata_len_field_size = std::mem::size_of::<u32>();
 
-        let topic_id = Identifier::from_raw_bytes(buffer.chunk())?;
-        element_size = topic_id.get_size_bytes().as_bytes_usize();
-        buffer.advance(element_size);
-        self.topic_id = topic_id;
+        let mut metadata_length_buffer = [0u8; 4];
+        sender.read(&mut metadata_length_buffer).await?;
+        let metadata_size = u32::from_le_bytes(metadata_length_buffer);
 
-        let partitioning = Partitioning::from_raw_bytes(buffer.chunk())?;
-        element_size = partitioning.get_size_bytes().as_bytes_usize();
-        buffer.advance(element_size);
-        self.partitioning = partitioning;
+        let mut metadata_buffer = BYTES_MUT_POOL.get_buffer(metadata_size as 
usize);
+        unsafe { metadata_buffer.set_len(metadata_size as usize) };
+        sender.read(&mut metadata_buffer).await?;
 
-        let messages_count = buffer.get_u32_le() as usize;
+        let mut element_size = 0;
 
-        let mut indexes_and_messages = buffer.split();
+        let stream_id = Identifier::from_raw_bytes(&metadata_buffer)?;
+        element_size += stream_id.get_size_bytes().as_bytes_usize();
+        self.stream_id = stream_id;
 
-        let messages = indexes_and_messages.split_off(messages_count * 
INDEX_SIZE);
-        let indexes = indexes_and_messages;
+        let topic_id = 
Identifier::from_raw_bytes(&metadata_buffer[element_size..])?;
+        element_size += topic_id.get_size_bytes().as_bytes_usize();
+        self.topic_id = topic_id;
 
-        let indexes = IggyIndexesMut::from_bytes(indexes);
+        let partitioning = 
Partitioning::from_raw_bytes(&metadata_buffer[element_size..])?;
+        element_size += partitioning.get_size_bytes().as_bytes_usize();
+        self.partitioning = partitioning;
 
-        let batch = IggyMessagesBatchMut::from_indexes_and_messages(indexes, 
messages);
+        let messages_count = u32::from_le_bytes(
+            metadata_buffer[element_size..element_size + 4]
+                .try_into()
+                .unwrap(),
+        );
+        let indexes_size = messages_count as usize * INDEX_SIZE;
+
+        metadata_buffer.return_to_pool();
+
+        let mut indexes_buffer = BYTES_MUT_POOL.get_buffer(indexes_size);
+        unsafe { indexes_buffer.set_len(indexes_size) };
+        sender.read(&mut indexes_buffer).await?;
+
+        let messages_size =
+            total_payload_size - metadata_size as usize - indexes_size - 
metadata_len_field_size;
+        let mut messages_buffer = BYTES_MUT_POOL.get_buffer(messages_size);
+        unsafe { messages_buffer.set_len(messages_size) };
+        sender.read(&mut messages_buffer).await?;
+
+        let indexes = IggyIndexesMut::from_bytes(indexes_buffer, 0);
+        let batch = IggyMessagesBatchMut::from_indexes_and_messages(
+            messages_count,
+            indexes,
+            messages_buffer,
+        );
 
         batch.validate()?;
 
diff --git a/server/src/channels/commands/print_sysinfo.rs b/server/src/channels/commands/print_sysinfo.rs
index f40bb4e0..c32c7229 100644
--- a/server/src/channels/commands/print_sysinfo.rs
+++ b/server/src/channels/commands/print_sysinfo.rs
@@ -17,8 +17,9 @@
  */
 
 use crate::{
-    channels::server_command::BackgroundServerCommand, 
configs::server::ServerConfig,
-    streaming::systems::system::SharedSystem,
+    channels::server_command::BackgroundServerCommand,
+    configs::server::ServerConfig,
+    streaming::{systems::system::SharedSystem, 
utils::bytes_mut_pool::BYTES_MUT_POOL},
 };
 use flume::{Receiver, Sender};
 use human_repr::HumanCount;
@@ -89,6 +90,8 @@ impl BackgroundServerCommand<SysInfoPrintCommand> for 
SysInfoPrintExecutor {
               stats.read_bytes,
               stats.written_bytes,
               stats.run_time);
+
+        BYTES_MUT_POOL.log_stats();
     }
 
     fn start_command_sender(
diff --git a/server/src/http/jwt/storage.rs b/server/src/http/jwt/storage.rs
index 05389a9c..30884c09 100644
--- a/server/src/http/jwt/storage.rs
+++ b/server/src/http/jwt/storage.rs
@@ -17,13 +17,14 @@
  */
 
 use crate::http::jwt::COMPONENT;
+use crate::streaming::utils::bytes_mut_pool::{BytesMutExt, BYTES_MUT_POOL};
 use crate::streaming::utils::file;
 use crate::{
     http::jwt::json_web_token::RevokedAccessToken, 
streaming::persistence::persister::PersisterKind,
 };
 use ahash::AHashMap;
 use anyhow::Context;
-use bytes::{BufMut, BytesMut};
+use bytes::BufMut;
 use error_set::ErrContext;
 use iggy::error::IggyError;
 use std::sync::Arc;
@@ -69,7 +70,7 @@ impl TokenStorage {
             })
             .map_err(|_| IggyError::CannotReadFileMetadata)?
             .len() as usize;
-        let mut buffer = BytesMut::with_capacity(file_size);
+        let mut buffer = BYTES_MUT_POOL.get_buffer(file_size);
         buffer.put_bytes(0, file_size);
         file.read_exact(&mut buffer)
             .await
@@ -87,6 +88,8 @@ impl TokenStorage {
                 .map_err(|_| IggyError::CannotDeserializeResource)?
                 .0;
 
+        buffer.return_to_pool();
+
         let tokens = tokens
             .into_iter()
             .map(|(id, expiry)| RevokedAccessToken { id, expiry })
diff --git a/server/src/http/messages.rs b/server/src/http/messages.rs
index 17dc5ad0..1decefdb 100644
--- a/server/src/http/messages.rs
+++ b/server/src/http/messages.rs
@@ -143,6 +143,8 @@ fn make_mutable(batch: IggyMessagesBatch) -> 
IggyMessagesBatchMut {
     let (_, indexes, messages) = batch.decompose();
     let (_, indexes_buffer) = indexes.decompose();
     let indexes_buffer_mut = BytesMut::from(indexes_buffer);
-    let indexes_mut = IggyIndexesMut::from_bytes(indexes_buffer_mut);
-    IggyMessagesBatchMut::from_indexes_and_messages(indexes_mut, 
messages.into())
+    let indexes_mut = IggyIndexesMut::from_bytes(indexes_buffer_mut, 0);
+    let count = indexes_mut.count();
+    let messages_mut = messages.into();
+    IggyMessagesBatchMut::from_indexes_and_messages(count, indexes_mut, 
messages_mut)
 }
diff --git a/server/src/main.rs b/server/src/main.rs
index 7eac061e..e9a81962 100644
--- a/server/src/main.rs
+++ b/server/src/main.rs
@@ -38,6 +38,7 @@ use server::log::tokio_console::Logging;
 use server::quic::quic_server;
 use server::server_error::ServerError;
 use server::streaming::systems::system::{SharedSystem, System};
+use server::streaming::utils::bytes_mut_pool::BytesMutPool;
 use server::tcp::tcp_server;
 use tokio::time::Instant;
 use tracing::{info, instrument};
@@ -94,6 +95,8 @@ async fn main() -> Result<(), ServerError> {
     #[cfg(not(feature = "disable-mimalloc"))]
     info!("Using mimalloc allocator");
 
+    BytesMutPool::init_pool();
+
     let system = SharedSystem::new(System::new(
         config.system.clone(),
         config.data_maintenance.clone(),
diff --git a/server/src/streaming/segments/indexes/index_reader.rs b/server/src/streaming/segments/indexes/index_reader.rs
index 55fc0ac5..cf0d0438 100644
--- a/server/src/streaming/segments/indexes/index_reader.rs
+++ b/server/src/streaming/segments/indexes/index_reader.rs
@@ -17,9 +17,10 @@
  */
 
 use super::IggyIndexesMut;
+use crate::streaming::utils::bytes_mut_pool::{BytesMutExt, BYTES_MUT_POOL};
 use bytes::BytesMut;
 use error_set::ErrContext;
-use iggy::models::messaging::{IggyIndex, IggyIndexes, INDEX_SIZE};
+use iggy::models::messaging::{IggyIndex, INDEX_SIZE};
 use iggy::{error::IggyError, models::messaging::IggyIndexView};
 use std::{
     fs::File as StdFile,
@@ -102,7 +103,7 @@ impl IndexReader {
             }
         };
         let index_count = file_size / INDEX_SIZE as u32;
-        let indexes = IggyIndexesMut::from_bytes(buf);
+        let indexes = IggyIndexesMut::from_bytes(buf, index_count);
         if indexes.count() != index_count {
             error!(
                 "Loaded {} indexes from disk, expected {}, file {} is probably 
corrupted!",
@@ -123,7 +124,7 @@ impl IndexReader {
         &self,
         relative_start_offset: u32,
         count: u32,
-    ) -> Result<Option<IggyIndexes>, IggyError> {
+    ) -> Result<Option<IggyIndexesMut>, IggyError> {
         let file_size = self.file_size();
         let total_indexes = file_size / INDEX_SIZE as u32;
 
@@ -198,8 +199,8 @@ impl IndexReader {
             base_position
         );
 
-        Ok(Some(IggyIndexes::new(
-            indexes_bytes.freeze(),
+        Ok(Some(IggyIndexesMut::from_bytes(
+            indexes_bytes,
             base_position,
         )))
     }
@@ -212,7 +213,7 @@ impl IndexReader {
         &self,
         timestamp: u64,
         count: u32,
-    ) -> Result<Option<IggyIndexes>, IggyError> {
+    ) -> Result<Option<IggyIndexesMut>, IggyError> {
         let file_size = self.file_size();
         let total_indexes = file_size / INDEX_SIZE as u32;
 
@@ -281,8 +282,8 @@ impl IndexReader {
             base_position
         );
 
-        Ok(Some(IggyIndexes::new(
-            indexes_bytes.freeze(),
+        Ok(Some(IggyIndexesMut::from_bytes(
+            indexes_bytes,
             base_position,
         )))
     }
@@ -354,7 +355,7 @@ impl IndexReader {
     async fn read_at(&self, offset: u32, len: u32) -> Result<BytesMut, 
std::io::Error> {
         let file = self.file.clone();
         spawn_blocking(move || {
-            let mut buf = BytesMut::with_capacity(len as usize);
+            let mut buf = BYTES_MUT_POOL.get_buffer(len as usize);
             unsafe { buf.set_len(len as usize) };
             file.read_exact_at(&mut buf, offset as u64)?;
             Ok(buf)
@@ -395,6 +396,9 @@ impl IndexReader {
             }
         };
 
-        Ok(Some(IggyIndexView::new(&buf).to_index()))
+        let index = IggyIndexView::new(&buf).to_index();
+        buf.return_to_pool();
+
+        Ok(Some(index))
     }
 }
diff --git a/server/src/streaming/segments/indexes/indexes_mut.rs b/server/src/streaming/segments/indexes/indexes_mut.rs
index 0c16cb0f..da89a334 100644
--- a/server/src/streaming/segments/indexes/indexes_mut.rs
+++ b/server/src/streaming/segments/indexes/indexes_mut.rs
@@ -16,11 +16,13 @@
  * under the License.
  */
 
-use bytes::{BufMut, Bytes, BytesMut};
-use iggy::models::messaging::{IggyIndexView, IggyIndexes, INDEX_SIZE};
+use bytes::{BufMut, BytesMut};
+use iggy::models::messaging::{IggyIndexView, INDEX_SIZE};
 use std::fmt;
 use std::ops::{Deref, Index as StdIndex};
 
+use crate::streaming::utils::bytes_mut_pool::{BytesMutExt, BYTES_MUT_POOL};
+
 /// A container for binary-encoded index data.
 /// Optimized for efficient storage and I/O operations.
 #[derive(Default, Clone)]
@@ -41,17 +43,24 @@ impl IggyIndexesMut {
     }
 
     /// Creates indexes from bytes
-    pub fn from_bytes(indexes: BytesMut) -> Self {
+    pub fn from_bytes(indexes: BytesMut, base_position: u32) -> Self {
         Self {
             buffer: indexes,
             saved_count: 0,
-            base_position: 0,
+            base_position,
         }
     }
 
+    /// Decompose the container into its components
+    pub fn decompose(mut self) -> (u32, BytesMut) {
+        let base_position = self.base_position;
+        let buffer = std::mem::replace(&mut self.buffer, BytesMut::new());
+        (base_position, buffer)
+    }
+
     /// Gets the size of all indexes messages
     pub fn messages_size(&self) -> u32 {
-        self.last_position()
+        self.last_position() - self.base_position
     }
 
     /// Gets the base position of the indexes
@@ -74,15 +83,15 @@ impl IggyIndexesMut {
     /// Creates a new container with the specified capacity
     pub fn with_capacity(capacity: usize, base_position: u32) -> Self {
         Self {
-            buffer: BytesMut::with_capacity(capacity * INDEX_SIZE),
+            buffer: BYTES_MUT_POOL.get_buffer(capacity * INDEX_SIZE),
             saved_count: 0,
             base_position,
         }
     }
 
-    /// Makes the indexes immutable
-    pub fn make_immutable(self) -> IggyIndexes {
-        IggyIndexes::new(self.buffer.freeze(), self.base_position)
+    /// Gets the capacity of the buffer
+    pub fn capacity(&self) -> usize {
+        self.buffer.capacity()
     }
 
     /// Inserts a new index at the end of buffer
@@ -93,8 +102,8 @@ impl IggyIndexesMut {
     }
 
     /// Appends another slice of indexes to this one.
-    pub fn concatenate(&mut self, other: Bytes) {
-        self.buffer.put(other);
+    pub fn append_slice(&mut self, other: &[u8]) {
+        self.buffer.put_slice(other);
     }
 
     /// Gets the number of indexes in the container
@@ -225,7 +234,11 @@ impl IggyIndexesMut {
     }
 
     /// Slices the container to return a view of a specific range of indexes
-    pub fn slice_by_offset(&self, relative_start_offset: u32, count: u32) -> 
Option<IggyIndexes> {
+    pub fn slice_by_offset(
+        &self,
+        relative_start_offset: u32,
+        count: u32,
+    ) -> Option<IggyIndexesMut> {
         let available_count = 
self.count().saturating_sub(relative_start_offset);
         let actual_count = std::cmp::min(count, available_count);
 
@@ -240,15 +253,15 @@ impl IggyIndexesMut {
         let slice = BytesMut::from(&self.buffer[start_byte..end_byte]);
 
         if relative_start_offset == 0 {
-            Some(IggyIndexes::new(slice.freeze(), self.base_position))
+            Some(IggyIndexesMut::from_bytes(slice, self.base_position))
         } else {
             let position_offset = self.get(relative_start_offset - 
1).unwrap().position();
-            Some(IggyIndexes::new(slice.freeze(), position_offset))
+            Some(IggyIndexesMut::from_bytes(slice, position_offset))
         }
     }
 
     /// Loads indexes from cache based on timestamp
-    pub fn slice_by_timestamp(&self, timestamp: u64, count: u32) -> 
Option<IggyIndexes> {
+    pub fn slice_by_timestamp(&self, timestamp: u64, count: u32) -> 
Option<IggyIndexesMut> {
         if self.count() == 0 {
             return None;
         }
@@ -274,7 +287,7 @@ impl IggyIndexesMut {
             0
         };
 
-        Some(IggyIndexes::new(slice.freeze(), base_position))
+        Some(IggyIndexesMut::from_bytes(slice, base_position))
     }
 
     /// Find the position of the index with timestamp closest to (but not 
exceeding) the target
@@ -317,6 +330,13 @@ impl IggyIndexesMut {
     }
 }
 
+impl Drop for IggyIndexesMut {
+    fn drop(&mut self) {
+        let indexes = std::mem::replace(&mut self.buffer, BytesMut::new());
+        indexes.return_to_pool();
+    }
+}
+
 impl StdIndex<usize> for IggyIndexesMut {
     type Output = [u8];
 
diff --git a/server/src/streaming/segments/messages/messages_reader.rs b/server/src/streaming/segments/messages/messages_reader.rs
index fbff0fb0..c1d0f99b 100644
--- a/server/src/streaming/segments/messages/messages_reader.rs
+++ b/server/src/streaming/segments/messages/messages_reader.rs
@@ -16,12 +16,11 @@
  * under the License.
  */
 
-use bytes::{Bytes, BytesMut};
+use crate::streaming::segments::{IggyIndexesMut, IggyMessagesBatchMut};
+use crate::streaming::utils::bytes_mut_pool::BYTES_MUT_POOL;
+use bytes::BytesMut;
 use error_set::ErrContext;
-use iggy::{
-    error::IggyError,
-    models::messaging::{IggyIndexes, IggyMessagesBatch},
-};
+use iggy::error::IggyError;
 use std::{fs::File as StdFile, os::unix::prelude::FileExt};
 use std::{
     io::ErrorKind,
@@ -90,12 +89,11 @@ impl MessagesReader {
     /// Loads and returns all message IDs from the messages file.
     pub async fn load_all_message_ids_from_disk(
         &self,
-        indexes: IggyIndexes,
+        indexes: IggyIndexesMut,
         messages_count: u32,
     ) -> Result<Vec<u128>, IggyError> {
         let file_size = self.file_size();
         if file_size == 0 {
-            trace!("Messages file {} is empty.", self.file_path);
             return Ok(vec![]);
         }
 
@@ -113,7 +111,11 @@ impl MessagesReader {
             }
         };
 
-        let messages = IggyMessagesBatch::new(indexes, messages_bytes, 
messages_count);
+        let messages = IggyMessagesBatchMut::from_indexes_and_messages(
+            messages_count,
+            indexes,
+            messages_bytes,
+        );
         let mut ids = Vec::with_capacity(messages_count as usize);
 
         for message in messages.iter() {
@@ -126,12 +128,11 @@ impl MessagesReader {
     /// Loads and returns a batch of messages from the messages file.
     pub async fn load_messages_from_disk(
         &self,
-        indexes: IggyIndexes,
-    ) -> Result<IggyMessagesBatch, IggyError> {
+        indexes: IggyIndexesMut,
+    ) -> Result<IggyMessagesBatchMut, IggyError> {
         let file_size = self.file_size();
         if file_size == 0 {
-            trace!("Messages file {} is empty.", self.file_path);
-            return Ok(IggyMessagesBatch::empty());
+            return Ok(IggyMessagesBatchMut::empty());
         }
 
         let start_pos = indexes.base_position();
@@ -139,13 +140,13 @@ impl MessagesReader {
         let messages_count = indexes.count();
 
         if start_pos + count_bytes > file_size {
-            return Ok(IggyMessagesBatch::empty());
+            return Ok(IggyMessagesBatchMut::empty());
         }
 
         let messages_bytes = match self.read_at(start_pos as u64, 
count_bytes).await {
             Ok(buf) => buf,
             Err(error) if error.kind() == ErrorKind::UnexpectedEof => {
-                return Ok(IggyMessagesBatch::empty());
+                return Ok(IggyMessagesBatchMut::empty());
             }
             Err(error) => {
                 error!(
@@ -156,10 +157,10 @@ impl MessagesReader {
             }
         };
 
-        Ok(IggyMessagesBatch::new(
+        Ok(IggyMessagesBatchMut::from_indexes_and_messages(
+            messages_count,
             indexes,
             messages_bytes,
-            messages_count,
         ))
     }
 
@@ -169,13 +170,13 @@ impl MessagesReader {
     }
 
     /// Reads `len` bytes from the messages file at the specified `offset`.
-    async fn read_at(&self, offset: u64, len: u32) -> Result<Bytes, 
std::io::Error> {
+    async fn read_at(&self, offset: u64, len: u32) -> Result<BytesMut, 
std::io::Error> {
         let file = self.file.clone();
         spawn_blocking(move || {
-            let mut buf = BytesMut::with_capacity(len as usize);
+            let mut buf = BYTES_MUT_POOL.get_buffer(len as usize);
             unsafe { buf.set_len(len as usize) };
             file.read_exact_at(&mut buf, offset)?;
-            Ok(buf.freeze())
+            Ok(buf)
         })
         .await?
     }
diff --git a/server/src/streaming/segments/messages_accumulator.rs 
b/server/src/streaming/segments/messages_accumulator.rs
index a8a07663..fe259c27 100644
--- a/server/src/streaming/segments/messages_accumulator.rs
+++ b/server/src/streaming/segments/messages_accumulator.rs
@@ -66,11 +66,10 @@ impl MessagesAccumulator {
         segment_start_offset: u64,
         segment_current_offset: u64,
         segment_current_position: u32,
-        batch: IggyMessagesBatchMut,
+        mut batch: IggyMessagesBatchMut,
         deduplicator: Option<&MessageDeduplicator>,
     ) {
         let batch_messages_count = batch.count();
-
         if batch_messages_count == 0 {
             return;
         }
@@ -85,7 +84,7 @@ impl MessagesAccumulator {
 
         self.initialize_or_update_offsets(segment_current_offset, 
segment_current_position);
 
-        let prepared_batch = batch
+        batch
             .prepare_for_persistence(
                 segment_start_offset,
                 self.current_offset,
@@ -94,9 +93,9 @@ impl MessagesAccumulator {
             )
             .await;
 
-        let batch_size = prepared_batch.size();
+        let batch_size = batch.size();
 
-        self.batches.add_batch(prepared_batch);
+        self.batches.add_batch(batch);
 
         self.messages_count += batch_messages_count;
         self.current_offset = self.base_offset + self.messages_count as u64 - 
1;
@@ -151,7 +150,7 @@ impl MessagesAccumulator {
 
     /// Checks if the accumulator is empty (has no messages).
     pub fn is_empty(&self) -> bool {
-        self.batches.is_empty() || self.messages_count == 0
+        self.messages_count == 0
     }
 
     /// Returns the number of messages in the accumulator that have not been 
persisted.
diff --git a/server/src/streaming/segments/reading_messages.rs 
b/server/src/streaming/segments/reading_messages.rs
index 34fbc44d..7ac79bf1 100644
--- a/server/src/streaming/segments/reading_messages.rs
+++ b/server/src/streaming/segments/reading_messages.rs
@@ -16,10 +16,9 @@
  * under the License.
  */
 
-use super::IggyMessagesBatchSet;
+use super::{IggyIndexesMut, IggyMessagesBatchMut, IggyMessagesBatchSet};
 use crate::streaming::segments::segment::Segment;
 use error_set::ErrContext;
-use iggy::models::messaging::{IggyIndexes, IggyMessagesBatch};
 use iggy::prelude::*;
 use std::sync::atomic::Ordering;
 use tracing::trace;
@@ -276,7 +275,7 @@ impl Segment {
         &self,
         relative_start_offset: u32,
         count: u32,
-    ) -> Result<Option<IggyIndexes>, IggyError> {
+    ) -> Result<Option<IggyIndexesMut>, IggyError> {
         let indexes = if !self.indexes.is_empty() {
             self.indexes.slice_by_offset(relative_start_offset, count)
         } else {
@@ -293,7 +292,7 @@ impl Segment {
         &self,
         timestamp: u64,
         count: u32,
-    ) -> Result<Option<IggyIndexes>, IggyError> {
+    ) -> Result<Option<IggyIndexesMut>, IggyError> {
         let indexes = if !self.indexes.is_empty() {
             self.indexes.slice_by_timestamp(timestamp, count)
         } else {
@@ -359,7 +358,7 @@ impl Segment {
         &self,
         timestamp: u64,
         count: u32,
-    ) -> Result<IggyMessagesBatch, IggyError> {
+    ) -> Result<IggyMessagesBatchMut, IggyError> {
         tracing::trace!(
             "Loading {count} messages from disk, timestamp: {timestamp}, 
current_timestamp: {}...",
             self.end_timestamp
@@ -368,7 +367,7 @@ impl Segment {
         let indexes_to_read = self.load_indexes_by_timestamp(timestamp, 
count).await?;
 
         if indexes_to_read.is_none() {
-            return Ok(IggyMessagesBatch::empty());
+            return Ok(IggyMessagesBatchMut::empty());
         }
 
         let indexes_to_read = indexes_to_read.unwrap();
diff --git a/server/src/streaming/segments/segment.rs 
b/server/src/streaming/segments/segment.rs
index 026834ae..fcb5bb90 100644
--- a/server/src/streaming/segments/segment.rs
+++ b/server/src/streaming/segments/segment.rs
@@ -100,7 +100,7 @@ impl Segment {
             last_index_position: 0,
             max_size_bytes: config.segment.size,
             message_expiry,
-            indexes: IggyIndexesMut::with_capacity(1_000_000, 0),
+            indexes: IggyIndexesMut::with_capacity(1024 * 1024, 0),
             accumulator: MessagesAccumulator::default(),
             is_closed: false,
             messages_writer: None,
diff --git a/server/src/streaming/segments/types/messages_batch_mut.rs 
b/server/src/streaming/segments/types/messages_batch_mut.rs
index 60af072b..87c87c88 100644
--- a/server/src/streaming/segments/types/messages_batch_mut.rs
+++ b/server/src/streaming/segments/types/messages_batch_mut.rs
@@ -19,14 +19,15 @@
 use super::message_view_mut::IggyMessageViewMutIterator;
 use crate::streaming::deduplication::message_deduplicator::MessageDeduplicator;
 use crate::streaming::segments::indexes::IggyIndexesMut;
+use crate::streaming::utils::bytes_mut_pool::{BytesMutExt, BYTES_MUT_POOL};
 use crate::streaming::utils::random_id;
 use bytes::{BufMut, BytesMut};
 use iggy::messages::MAX_PAYLOAD_SIZE;
-use iggy::models::messaging::{IggyIndexView, IggyMessagesBatch, INDEX_SIZE};
+use iggy::models::messaging::{IggyIndexView, INDEX_SIZE};
 use iggy::prelude::*;
 use iggy::utils::timestamp::IggyTimestamp;
 use lending_iterator::prelude::*;
-use std::ops::Deref;
+use std::ops::{Deref, Index};
 use tracing::{error, warn};
 
 /// A container for mutable messages that are being prepared for persistence.
@@ -35,6 +36,9 @@ use tracing::{error, warn};
 /// and the corresponding index data that allows for efficient message lookup.
 #[derive(Debug, Default)]
 pub struct IggyMessagesBatchMut {
+    /// The number of messages in the batch
+    count: u32,
+
     /// The index data for all messages in the buffer
     indexes: IggyIndexesMut,
 
@@ -49,14 +53,32 @@ impl Sizeable for IggyMessagesBatchMut {
 }
 
 impl IggyMessagesBatchMut {
+    /// Creates a new empty messages container
+    pub fn empty() -> Self {
+        Self {
+            count: 0,
+            indexes: IggyIndexesMut::empty(),
+            messages: BytesMut::new(),
+        }
+    }
+
     /// Creates a new messages container from existing index and message 
buffers.
     ///
     /// # Arguments
     ///
     /// * `indexes` - Preprocessed index data
     /// * `messages` - Serialized message data
-    pub fn from_indexes_and_messages(indexes: IggyIndexesMut, messages: 
BytesMut) -> Self {
-        Self { indexes, messages }
+    /// * `count` - Number of messages in the batch
+    pub fn from_indexes_and_messages(
+        count: u32,
+        indexes: IggyIndexesMut,
+        messages: BytesMut,
+    ) -> Self {
+        Self {
+            count,
+            indexes,
+            messages,
+        }
     }
 
     /// Creates a new messages container from a slice of IggyMessage objects.
@@ -70,7 +92,7 @@ impl IggyMessagesBatchMut {
     /// * `messages` - Slice of message objects to store
     /// * `messages_size` - Total size of all messages in bytes
     pub fn from_messages(messages: &[IggyMessage], messages_size: u32) -> Self 
{
-        let mut messages_buffer = BytesMut::with_capacity(messages_size as 
usize);
+        let mut messages_buffer = BYTES_MUT_POOL.get_buffer(messages_size as 
usize);
         let mut indexes_buffer = IggyIndexesMut::with_capacity(messages.len(), 
0);
         let mut position = 0;
 
@@ -81,7 +103,7 @@ impl IggyMessagesBatchMut {
             indexes_buffer.insert(0, position, 0);
         }
 
-        Self::from_indexes_and_messages(indexes_buffer, messages_buffer)
+        Self::from_indexes_and_messages(messages.len() as u32, indexes_buffer, 
messages_buffer)
     }
 
     /// Creates a lending iterator that yields mutable views of messages.
@@ -117,15 +139,15 @@ impl IggyMessagesBatchMut {
     ///
-    /// An immutable `IggyMessagesBatch` ready for persistence
+    /// Nothing; the batch's messages and indexes are updated in place.
     pub async fn prepare_for_persistence(
-        self,
+        &mut self,
         start_offset: u64,
         base_offset: u64,
         current_position: u32,
         deduplicator: Option<&MessageDeduplicator>,
-    ) -> IggyMessagesBatch {
+    ) {
         let messages_count = self.count();
         if messages_count == 0 {
-            return IggyMessagesBatch::empty();
+            return;
         }
 
         let mut curr_abs_offset = base_offset;
@@ -138,9 +160,10 @@ impl IggyMessagesBatchMut {
         let mut invalid_messages_indexes =
             deduplicator.map(|_| Vec::with_capacity(messages_count as usize));
 
-        let (mut indexes, mut messages) = self.decompose();
-        indexes.set_base_position(current_position);
-        let mut iter = IggyMessageViewMutIterator::new(&mut messages);
+        // Indexes and messages are now mutated in place; decomposing the batch is no longer needed.
+        self.indexes.set_base_position(current_position);
+        let mut iter: IggyMessageViewMutIterator<'_> =
+            IggyMessageViewMutIterator::new(&mut self.messages);
         let timestamp = IggyTimestamp::now().as_micros();
 
         while let Some(mut message) = iter.next() {
@@ -169,9 +192,9 @@ impl IggyMessagesBatchMut {
             curr_position += message_size;
 
             let relative_offset = (curr_abs_offset - start_offset) as u32;
-            indexes.set_offset_at(curr_rel_offset, relative_offset);
-            indexes.set_position_at(curr_rel_offset, curr_position);
-            indexes.set_timestamp_at(curr_rel_offset, timestamp);
+            self.indexes.set_offset_at(curr_rel_offset, relative_offset);
+            self.indexes.set_position_at(curr_rel_offset, curr_position);
+            self.indexes.set_timestamp_at(curr_rel_offset, timestamp);
 
             curr_abs_offset += 1;
             curr_rel_offset += 1;
@@ -179,48 +202,53 @@ impl IggyMessagesBatchMut {
 
         if let Some(invalid_messages_indexes) = invalid_messages_indexes {
             if invalid_messages_indexes.is_empty() {
-                return IggyMessagesBatch::new(
-                    indexes.make_immutable(),
-                    messages.freeze(),
-                    messages_count,
-                );
+                return;
             }
-            let batch = 
IggyMessagesBatchMut::from_indexes_and_messages(indexes, messages)
-                .remove_messages(&invalid_messages_indexes, current_position);
-
-            let messages_count = batch.count();
-
-            let (indexes, messages) = batch.decompose();
-
-            return IggyMessagesBatch::new(
-                indexes.make_immutable(),
-                messages.freeze(),
-                messages_count,
-            );
+            self.remove_messages(&invalid_messages_indexes, current_position);
         }
+    }
 
-        IggyMessagesBatch::new(indexes.make_immutable(), messages.freeze(), 
messages_count)
+    /// Returns the first offset in the batch
+    pub fn first_offset(&self) -> Option<u64> {
+        if self.is_empty() {
+            return None;
+        }
+        Some(IggyMessageView::new(&self.messages).header().offset())
     }
 
     /// Returns the first timestamp in the batch
-    pub fn first_timestamp(&self) -> u64 {
-        IggyMessageView::new(&self.messages).header().timestamp()
+    pub fn first_timestamp(&self) -> Option<u64> {
+        if self.is_empty() {
+            return None;
+        }
+        Some(IggyMessageView::new(&self.messages).header().timestamp())
     }
 
     /// Returns the last timestamp in the batch
-    pub fn last_timestamp(&self) -> u64 {
+    pub fn last_timestamp(&self) -> Option<u64> {
         if self.is_empty() {
-            return 0;
+            return None;
         }
 
         let last_index = self.count() as usize - 1;
-        self.get_message_boundaries(last_index)
-            .map(|(start, _)| {
-                IggyMessageView::new(&self.messages[start..])
-                    .header()
-                    .timestamp()
-            })
-            .unwrap_or(0)
+        self.get_message_boundaries(last_index).map(|(start, _)| {
+            IggyMessageView::new(&self.messages[start..])
+                .header()
+                .timestamp()
+        })
+    }
+
+    /// Returns the last offset in the batch
+    pub fn last_offset(&self) -> Option<u64> {
+        if self.is_empty() {
+            return None;
+        }
+        let last_index = self.count() as usize - 1;
+        self.get_message_boundaries(last_index).map(|(start, _)| {
+            IggyMessageView::new(&self.messages[start..])
+                .header()
+                .offset()
+        })
     }
 
     /// Checks if the batch is empty.
@@ -229,8 +257,21 @@ impl IggyMessagesBatchMut {
     }
 
     /// Decomposes the batch into its constituent parts.
-    pub fn decompose(self) -> (IggyIndexesMut, BytesMut) {
-        (self.indexes, self.messages)
+    pub fn decompose(mut self) -> (IggyIndexesMut, BytesMut) {
+        let indexes = std::mem::replace(&mut self.indexes, 
IggyIndexesMut::empty());
+        let messages = std::mem::replace(&mut self.messages, BytesMut::new());
+
+        (indexes, messages)
+    }
+
+    /// Take the indexes from the batch
+    pub fn take_indexes(&mut self) -> IggyIndexesMut {
+        std::mem::take(&mut self.indexes)
+    }
+
+    /// Borrows the indexes from the batch
+    pub fn indexes(&self) -> &IggyIndexesMut {
+        &self.indexes
     }
 
     /// Get message position from the indexes at the given index
@@ -266,6 +307,152 @@ impl IggyMessagesBatchMut {
         }
     }
 
+    /// Returns a contiguous slice (as a new `IggyMessagesBatchMut`) of up to `count` messages
+    /// whose message headers have an offset greater than or equal to the 
provided `start_offset`.
+    pub fn slice_by_offset(&self, start_offset: u64, count: u32) -> 
Option<Self> {
+        if self.is_empty() || count == 0 {
+            return None;
+        }
+
+        let first_offset = self.first_offset()?;
+
+        if start_offset < first_offset {
+            return self.slice_by_index(0, count);
+        }
+
+        let last_offset = self.last_offset()?;
+        if start_offset > last_offset {
+            return None;
+        }
+
+        let offset_diff = start_offset - first_offset;
+        let first_message_index = offset_diff as usize;
+
+        if first_message_index >= self.count() as usize {
+            return None;
+        }
+
+        self.slice_by_index(first_message_index as u32, count)
+    }
+
+    /// Helper method to slice the batch starting from a specific index
+    fn slice_by_index(&self, start_index: u32, count: u32) -> Option<Self> {
+        if start_index >= self.count() {
+            return None;
+        }
+
+        let last_message_index =
+            std::cmp::min((start_index + count) as usize, self.count() as 
usize);
+
+        let sub_indexes = self.indexes.slice_by_offset(
+            start_index,
+            (last_message_index - start_index as usize) as u32,
+        )?;
+
+        let first_message_position = self.message_start_position(start_index 
as usize)?;
+        let last_message_position = 
self.message_end_position(last_message_index - 1)?;
+
+        // TODO(hubcio): messages from accumulator unfortunately are 
deep-copied
+        let mut sub_buffer =
+            BYTES_MUT_POOL.get_buffer(last_message_position - 
first_message_position);
+        
sub_buffer.put_slice(&self.messages[first_message_position..last_message_position]);
+
+        Some(IggyMessagesBatchMut {
+            count: sub_indexes.count(),
+            indexes: sub_indexes,
+            messages: sub_buffer,
+        })
+    }
+
+    /// Returns a contiguous slice (as a new `IggyMessagesBatchMut`) of up to `count` messages
+    /// whose message headers have a timestamp greater than or equal to the 
provided `timestamp`.
+    ///
+    /// If no messages meet the criteria, returns `None`.
+    pub fn slice_by_timestamp(&self, timestamp: u64, count: u32) -> 
Option<Self> {
+        if self.is_empty() || count == 0 {
+            return None;
+        }
+
+        // Use binary search to find the first message with timestamp >= the 
target
+        let first_message_index = self.binary_search_timestamp(timestamp)?;
+
+        self.slice_by_index(first_message_index, count)
+    }
+
+    /// Find the position of the index with timestamp closest to (but not 
exceeding) the target
+    fn binary_search_timestamp(&self, target_timestamp: u64) -> Option<u32> {
+        if self.count() == 0 {
+            return None;
+        }
+
+        let last_timestamp = self.get(self.count() as usize - 
1)?.header().timestamp();
+        if target_timestamp > last_timestamp {
+            return Some(self.count() - 1);
+        }
+
+        let first_timestamp = self.get(0)?.header().timestamp();
+        if target_timestamp <= first_timestamp {
+            return Some(0);
+        }
+
+        let mut low = 0;
+        let mut high = self.count() - 1;
+
+        while low <= high {
+            let mid = low + (high - low) / 2;
+            let mid_index = self.get(mid as usize)?;
+            let mid_timestamp = mid_index.header().timestamp();
+
+            match mid_timestamp.cmp(&target_timestamp) {
+                std::cmp::Ordering::Equal => return Some(mid),
+                std::cmp::Ordering::Less => low = mid + 1,
+                std::cmp::Ordering::Greater => {
+                    if mid == 0 {
+                        break;
+                    }
+                    high = mid - 1;
+                }
+            }
+        }
+
+        Some(low)
+    }
+
+    /// Validates that all messages have correct checksums and offsets.
+    /// This function should be called after messages have been read from disk.
+    ///
+    /// # Arguments
+    ///
+    /// * `absolute_start_offset` - The absolute offset of the first message 
in the batch.
+    ///
+    /// # Returns
+    ///
+    /// * `Ok(())` - If all messages have correct checksums and offsets.
+    /// * `Err(IggyError)` - If any message has an invalid checksum or offset.
+    pub fn validate_checksums_and_offsets(
+        &self,
+        absolute_start_offset: u64,
+    ) -> Result<(), IggyError> {
+        let mut current_offset = absolute_start_offset;
+        for message in self.iter() {
+            let calculated_checksum = message.calculate_checksum();
+            let actual_checksum = message.header().checksum();
+            let offset = message.header().offset();
+            if offset != current_offset {
+                return Err(IggyError::InvalidOffset(offset));
+            }
+            if calculated_checksum != actual_checksum {
+                return Err(IggyError::InvalidMessageChecksum(
+                    actual_checksum,
+                    calculated_checksum,
+                    offset,
+                ));
+            }
+            current_offset += 1;
+        }
+        Ok(())
+    }
+
     /// Gets the byte range for a message at the given index
     fn get_message_boundaries(&self, index: usize) -> Option<(usize, usize)> {
         let start = self.message_start_position(index)?;
@@ -325,7 +512,7 @@ impl IggyMessagesBatchMut {
     /// # Returns
     ///
-    /// A new `IggyMessagesBatchMut` with the specified messages removed
+    /// Nothing; the specified messages are removed from the batch in place.
-    pub fn remove_messages(self, indexes_to_remove: &[u32], current_position: 
u32) -> Self {
+    pub fn remove_messages(&mut self, indexes_to_remove: &[u32], 
current_position: u32) {
         /*
             A temporary list of message boundaries is first collected for each 
index
             that should be removed. Chunks of data that are not removed are 
appended
@@ -336,14 +523,13 @@ impl IggyMessagesBatchMut {
             This allows for avoiding copying unnecessary data and ensures that 
indexes
             match the newly constructed buffer.
         */
-
         if indexes_to_remove.is_empty() || self.is_empty() {
-            return self;
+            return;
         }
 
         let msg_count = self.count() as usize;
         if indexes_to_remove.len() >= msg_count {
-            return IggyMessagesBatchMut::default();
+            // All messages are being removed: clear the batch in place.
+            // (The pre-refactor code returned an empty batch by value here;
+            // a bare `return;` would leave every message intact.)
+            self.count = 0;
+            self.indexes = IggyIndexesMut::empty();
+            self.messages.clear();
+            return;
         }
 
         let current_size = self.size();
@@ -369,11 +555,11 @@ impl IggyMessagesBatchMut {
         let new_size = current_size - size_to_remove;
         let new_message_count = msg_count as u32 - indexes_to_remove.len() as 
u32;
 
-        let mut new_buffer = BytesMut::with_capacity(new_size as usize);
+        let mut new_buffer = BYTES_MUT_POOL.get_buffer(new_size as usize);
         let mut new_indexes =
             IggyIndexesMut::with_capacity(new_message_count as usize, 
current_position);
 
-        let mut source = self.messages;
+        let mut source = std::mem::take(&mut self.messages);
         let mut last_pos = 0_usize;
         let mut new_pos = current_position;
 
@@ -413,8 +599,23 @@ impl IggyMessagesBatchMut {
                 chunk_len,
             );
         }
+
+        // Store the rebuilt data back into the batch; without these
+        // assignments the rebuilt buffers are dropped on return and the
+        // removal is silently lost (the old code returned them via
+        // `from_indexes_and_messages`).
+        source.return_to_pool();
+        self.count = new_message_count;
+        self.indexes = new_indexes;
+        self.messages = new_buffer;
+    }
 
-        IggyMessagesBatchMut::from_indexes_and_messages(new_indexes, 
new_buffer)
+    /// Validates that all messages in batch have correct checksums.
+    pub fn validate_checksums(&self) -> Result<(), IggyError> {
+        for message in self.iter() {
+            let calculated_checksum = message.calculate_checksum();
+            let actual_checksum = message.header().checksum();
+            let offset = message.header().offset();
+            if calculated_checksum != actual_checksum {
+                return Err(IggyError::InvalidMessageChecksum(
+                    actual_checksum,
+                    calculated_checksum,
+                    offset,
+                ));
+            }
+        }
+        Ok(())
     }
 
     /// Validates the structure of the indexes (sizes, counts, etc.)
@@ -590,6 +791,35 @@ impl Validatable<IggyError> for IggyMessagesBatchMut {
     }
 }
 
+impl Index<usize> for IggyMessagesBatchMut {
+    type Output = [u8];
+
+    fn index(&self, index: usize) -> &Self::Output {
+        if index >= self.count as usize {
+            panic!(
+                "Index out of bounds: the len is {} but the index is {}",
+                self.count, index
+            );
+        }
+
+        let (start, end) = self
+            .get_message_boundaries(index)
+            .expect("Invalid message boundaries");
+
+        &self.messages[start..end]
+    }
+}
+
+impl Drop for IggyMessagesBatchMut {
+    fn drop(&mut self) {
+        let indexes = std::mem::replace(&mut self.indexes, 
IggyIndexesMut::empty());
+        let messages = std::mem::replace(&mut self.messages, BytesMut::new());
+
+        messages.return_to_pool();
+        drop(indexes);
+    }
+}
+
 impl Deref for IggyMessagesBatchMut {
     type Target = BytesMut;
 
diff --git a/server/src/streaming/segments/types/messages_batch_set.rs 
b/server/src/streaming/segments/types/messages_batch_set.rs
index 6f32b383..32ecac29 100644
--- a/server/src/streaming/segments/types/messages_batch_set.rs
+++ b/server/src/streaming/segments/types/messages_batch_set.rs
@@ -18,18 +18,20 @@
 
 use crate::binary::handlers::messages::poll_messages_handler::IggyPollMetadata;
 use crate::streaming::segments::IggyIndexesMut;
+use crate::streaming::utils::bytes_mut_pool::BytesMutExt;
 use bytes::Bytes;
 use iggy::models::messaging::IggyMessageView;
-use iggy::models::messaging::IggyMessagesBatch;
 use iggy::prelude::*;
 use std::ops::Index;
 use tracing::trace;
 
+use super::IggyMessagesBatchMut;
+
 /// A container for multiple IggyMessagesBatch objects
 #[derive(Debug, Default)]
 pub struct IggyMessagesBatchSet {
     /// The collection of message containers
-    batches: Vec<IggyMessagesBatch>,
+    batches: Vec<IggyMessagesBatchMut>,
     /// Total number of messages across all containers
     count: u32,
     /// Total size in bytes across all containers
@@ -56,7 +58,7 @@ impl IggyMessagesBatchSet {
     }
 
     /// Create a batch set from an existing vector of IggyMessages
-    pub fn from_vec(messages: Vec<IggyMessagesBatch>) -> Self {
+    pub fn from_vec(messages: Vec<IggyMessagesBatchMut>) -> Self {
         let mut batch = Self::with_capacity(messages.len());
         for msg in messages {
             batch.add_batch(msg);
@@ -65,25 +67,25 @@ impl IggyMessagesBatchSet {
     }
 
     /// Add a message container to the batch
-    pub fn add_batch(&mut self, messages: IggyMessagesBatch) {
+    pub fn add_batch(&mut self, messages: IggyMessagesBatchMut) {
         self.count += messages.count();
         self.size += messages.size();
         self.batches.push(messages);
     }
 
     /// Add another batch of messages to the batch
-    pub fn add_batch_set(&mut self, other: IggyMessagesBatchSet) {
+    pub fn add_batch_set(&mut self, mut other: IggyMessagesBatchSet) {
         self.count += other.count();
         self.size += other.size();
-        self.batches.extend(other.batches);
+        let other_batches = std::mem::take(&mut other.batches);
+        self.batches.extend(other_batches);
     }
 
     /// Extract indexes from all batches in the set
-    pub fn extract_indexes_to(&mut self, target: &mut IggyIndexesMut) {
-        for batch in self.iter_mut() {
-            let indexes = batch.take_indexes();
-            let (_, indexes_buffer) = indexes.decompose();
-            target.concatenate(indexes_buffer);
+    pub fn append_indexes_to(&self, target: &mut IggyIndexesMut) {
+        for batch in self.iter() {
+            let indexes = batch.indexes();
+            target.append_slice(indexes);
         }
     }
 
@@ -109,16 +111,25 @@ impl IggyMessagesBatchSet {
 
     /// Get timestamp of first message in first batch
     pub fn first_timestamp(&self) -> Option<u64> {
+        if self.is_empty() {
+            return None;
+        }
         self.batches.first().map(|batch| batch.first_timestamp())?
     }
 
     /// Get offset of first message in first batch
     pub fn first_offset(&self) -> Option<u64> {
+        if self.is_empty() {
+            return None;
+        }
         self.batches.first().map(|batch| batch.first_offset())?
     }
 
     /// Get timestamp of last message in last batch
     pub fn last_timestamp(&self) -> Option<u64> {
+        if self.is_empty() {
+            return None;
+        }
         self.batches.last().map(|batch| batch.last_timestamp())?
     }
 
@@ -128,22 +139,22 @@ impl IggyMessagesBatchSet {
     }
 
     /// Get a reference to the underlying vector of message containers
-    pub fn inner(&self) -> &Vec<IggyMessagesBatch> {
+    pub fn inner(&self) -> &Vec<IggyMessagesBatchMut> {
         &self.batches
     }
 
     /// Consume the batch, returning the underlying vector of message 
containers
-    pub fn into_inner(self) -> Vec<IggyMessagesBatch> {
-        self.batches
+    pub fn into_inner(mut self) -> Vec<IggyMessagesBatchMut> {
+        std::mem::take(&mut self.batches)
     }
 
     /// Iterate over all message containers in the batch
-    pub fn iter(&self) -> impl Iterator<Item = &IggyMessagesBatch> {
+    pub fn iter(&self) -> impl Iterator<Item = &IggyMessagesBatchMut> {
         self.batches.iter()
     }
 
     /// Iterate over all mutable message containers in the batch
-    pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut IggyMessagesBatch> 
{
+    pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut 
IggyMessagesBatchMut> {
         self.batches.iter_mut()
     }
 
@@ -334,31 +345,44 @@ impl Sizeable for IggyMessagesBatchSet {
     }
 }
 
-impl From<Vec<IggyMessagesBatch>> for IggyMessagesBatchSet {
-    fn from(messages: Vec<IggyMessagesBatch>) -> Self {
+impl From<Vec<IggyMessagesBatchMut>> for IggyMessagesBatchSet {
+    fn from(messages: Vec<IggyMessagesBatchMut>) -> Self {
         Self::from_vec(messages)
     }
 }
 
-impl From<IggyMessagesBatch> for IggyMessagesBatchSet {
-    fn from(messages: IggyMessagesBatch) -> Self {
+impl From<IggyMessagesBatchMut> for IggyMessagesBatchSet {
+    fn from(messages: IggyMessagesBatchMut) -> Self {
         Self::from_vec(vec![messages])
     }
 }
 
+impl Drop for IggyMessagesBatchSet {
+    fn drop(&mut self) {
+        for batch in self.batches.iter_mut() {
+            let batch = std::mem::take(batch);
+            let (indexes, messages) = batch.decompose();
+            messages.return_to_pool();
+            drop(indexes);
+        }
+    }
+}
+
 /// Iterator that consumes an IggyMessagesBatchSet and yields batches
 pub struct IggyMessagesBatchSetIntoIter {
-    batches: Vec<IggyMessagesBatch>,
+    batches: Vec<IggyMessagesBatchMut>,
     position: usize,
 }
 
 impl Iterator for IggyMessagesBatchSetIntoIter {
-    type Item = IggyMessagesBatch;
+    type Item = IggyMessagesBatchMut;
 
     fn next(&mut self) -> Option<Self::Item> {
         if self.position < self.batches.len() {
-            let batch =
-                std::mem::replace(&mut self.batches[self.position], 
IggyMessagesBatch::empty());
+            let batch = std::mem::replace(
+                &mut self.batches[self.position],
+                IggyMessagesBatchMut::empty(),
+            );
             self.position += 1;
             Some(batch)
         } else {
@@ -371,15 +395,3 @@ impl Iterator for IggyMessagesBatchSetIntoIter {
         (remaining, Some(remaining))
     }
 }
-
-impl IntoIterator for IggyMessagesBatchSet {
-    type Item = IggyMessagesBatch;
-    type IntoIter = IggyMessagesBatchSetIntoIter;
-
-    fn into_iter(self) -> Self::IntoIter {
-        IggyMessagesBatchSetIntoIter {
-            batches: self.batches,
-            position: 0,
-        }
-    }
-}
diff --git a/server/src/streaming/segments/writing_messages.rs 
b/server/src/streaming/segments/writing_messages.rs
index 7e80574b..3a40e3e1 100644
--- a/server/src/streaming/segments/writing_messages.rs
+++ b/server/src/streaming/segments/writing_messages.rs
@@ -84,17 +84,17 @@ impl Segment {
 
         let accumulator = std::mem::take(&mut self.accumulator);
 
-        let mut batches = accumulator.into_batch_set();
+        let batches = accumulator.into_batch_set();
         let confirmation = match confirmation {
             Some(val) => val,
             None => self.config.segment.server_confirmation,
         };
 
-        batches.extract_indexes_to(&mut self.indexes);
-
         let batch_size = batches.size();
         let batch_count = batches.count();
 
+        batches.append_indexes_to(&mut self.indexes);
+
         let saved_bytes = self
             .messages_writer
             .as_mut()
@@ -130,17 +130,15 @@ impl Segment {
 
         self.check_and_handle_segment_full().await?;
 
-        let saved_messages_count = unsaved_messages_count;
-
         trace!(
             "Saved {} messages on disk in segment with start offset: {} for 
partition with ID: {}, total bytes written: {}.",
-            saved_messages_count,
+            unsaved_messages_count,
             self.start_offset,
             self.partition_id,
             saved_bytes
         );
 
-        Ok(saved_messages_count)
+        Ok(unsaved_messages_count)
     }
 
     fn update_counters(&mut self, messages_size: u64, messages_count: u64) {
diff --git a/server/src/streaming/systems/messages.rs 
b/server/src/streaming/systems/messages.rs
index 2b758079..1716dd26 100644
--- a/server/src/streaming/systems/messages.rs
+++ b/server/src/streaming/systems/messages.rs
@@ -21,11 +21,10 @@ use crate::streaming::segments::{IggyIndexesMut, 
IggyMessagesBatchMut, IggyMessa
 use crate::streaming::session::Session;
 use crate::streaming::systems::system::System;
 use crate::streaming::systems::COMPONENT;
-use bytes::BytesMut;
+use crate::streaming::utils::bytes_mut_pool::BYTES_MUT_POOL;
 use error_set::ErrContext;
 use iggy::confirmation::Confirmation;
 use iggy::consumer::Consumer;
-use iggy::models::messaging::IggyMessagesBatch;
 use iggy::prelude::*;
 use iggy::utils::crypto::EncryptorKind;
 use iggy::{error::IggyError, identifier::Identifier};
@@ -164,7 +163,7 @@ impl System {
             let count = batch.count();
 
             let mut indexes = IggyIndexesMut::with_capacity(batch.count() as 
usize, 0);
-            let mut decrypted_messages = BytesMut::with_capacity(batch.size() 
as usize);
+            let mut decrypted_messages = 
BYTES_MUT_POOL.get_buffer(batch.size() as usize);
             let mut position = 0;
 
             for message in batch.iter() {
@@ -185,9 +184,8 @@ impl System {
                     }
                 }
             }
-            let indexes = indexes.make_immutable();
-            let decrypted_messages = decrypted_messages.freeze();
-            let decrypted_batch = IggyMessagesBatch::new(indexes, 
decrypted_messages, count);
+            let decrypted_batch =
+                IggyMessagesBatchMut::from_indexes_and_messages(count, 
indexes, decrypted_messages);
             decrypted_batches.push(decrypted_batch);
         }
 
@@ -199,7 +197,8 @@ impl System {
         batch: IggyMessagesBatchMut,
         encryptor: &EncryptorKind,
     ) -> Result<IggyMessagesBatchMut, IggyError> {
-        let mut encrypted_messages = BytesMut::with_capacity(batch.size() as 
usize * 2);
+        let mut encrypted_messages = BYTES_MUT_POOL.get_buffer(batch.size() as 
usize * 2);
+        let count = batch.count();
         let mut indexes = IggyIndexesMut::with_capacity(batch.count() as 
usize, 0);
         let mut position = 0;
 
@@ -230,6 +229,7 @@ impl System {
         }
 
         Ok(IggyMessagesBatchMut::from_indexes_and_messages(
+            count,
             indexes,
             encrypted_messages,
         ))
diff --git a/server/src/streaming/systems/storage.rs 
b/server/src/streaming/systems/storage.rs
index 49649b5f..ff1cc3f2 100644
--- a/server/src/streaming/systems/storage.rs
+++ b/server/src/streaming/systems/storage.rs
@@ -16,13 +16,16 @@
  * under the License.
  */
 
-use crate::streaming::persistence::persister::PersisterKind;
 use crate::streaming::storage::SystemInfoStorage;
 use crate::streaming::systems::info::SystemInfo;
 use crate::streaming::systems::COMPONENT;
+use crate::streaming::utils::bytes_mut_pool::BytesMutExt;
 use crate::streaming::utils::file;
+use crate::streaming::{
+    persistence::persister::PersisterKind, 
utils::bytes_mut_pool::BYTES_MUT_POOL,
+};
 use anyhow::Context;
-use bytes::{BufMut, BytesMut};
+use bytes::BufMut;
 use error_set::ErrContext;
 use iggy::error::IggyError;
 use std::sync::Arc;
@@ -60,7 +63,7 @@ impl SystemInfoStorage for FileSystemInfoStorage {
             })
             .map_err(|_| IggyError::CannotReadFileMetadata)?
             .len() as usize;
-        let mut buffer = BytesMut::with_capacity(file_size);
+        let mut buffer = BYTES_MUT_POOL.get_buffer(file_size);
         buffer.put_bytes(0, file_size);
         file.read_exact(&mut buffer)
             .await
@@ -75,6 +78,7 @@ impl SystemInfoStorage for FileSystemInfoStorage {
             bincode::serde::decode_from_slice(&buffer, 
bincode::config::standard())
                 .with_context(|| "Failed to deserialize system info")
                 .map_err(|_| IggyError::CannotDeserializeResource)?;
+        buffer.return_to_pool();
         Ok(system_info)
     }
 
diff --git a/server/src/streaming/utils/bytes_mut_pool.rs 
b/server/src/streaming/utils/bytes_mut_pool.rs
new file mode 100644
index 00000000..82a8f302
--- /dev/null
+++ b/server/src/streaming/utils/bytes_mut_pool.rs
@@ -0,0 +1,313 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use bytes::BytesMut;
+use crossbeam::queue::ArrayQueue;
+use iggy::prelude::IggyByteSize;
+use once_cell::sync::Lazy;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::Arc;
+use tracing::{debug, info, trace};
+
+pub static BYTES_MUT_POOL: Lazy<BytesMutPool> = 
Lazy::new(BytesMutPool::default);
+
+/// A pool for reusing BytesMut buffers
+#[derive(Clone)]
+pub struct BytesMutPool {
+    // Buffers
+    small_buffers: Arc<ArrayQueue<BytesMut>>,
+    medium_buffers: Arc<ArrayQueue<BytesMut>>,
+    large_buffers: Arc<ArrayQueue<BytesMut>>,
+    extra_large_buffers: Arc<ArrayQueue<BytesMut>>,
+    max_buffers: Arc<ArrayQueue<BytesMut>>,
+
+    // Stats
+    small_created: Arc<AtomicUsize>,
+    medium_created: Arc<AtomicUsize>,
+    large_created: Arc<AtomicUsize>,
+    extra_large_created: Arc<AtomicUsize>,
+    max_created: Arc<AtomicUsize>,
+    small_returned: Arc<AtomicUsize>,
+    medium_returned: Arc<AtomicUsize>,
+    large_returned: Arc<AtomicUsize>,
+    extra_large_returned: Arc<AtomicUsize>,
+    max_returned: Arc<AtomicUsize>,
+}
+
+impl BytesMutPool {
+    // TODO(hubcio): make BytesMutPool bucket sizes configurable via 
server.toml
+    const SMALL_BUFFER_SIZE: usize = 4 * 1024;
+    const MEDIUM_BUFFER_SIZE: usize = 32 * 1024;
+    const LARGE_BUFFER_SIZE: usize = 512 * 1024;
+    const EXTRA_LARGE_BUFFER_SIZE: usize = 2 * 1024 * 1024;
+    const MAX_BUFFER_SIZE: usize = 16 * 1024 * 1024;
+
+    const SMALL_POOL_SIZE: usize = 4096;
+    const MEDIUM_POOL_SIZE: usize = 1024;
+    const LARGE_POOL_SIZE: usize = 512;
+    const EXTRA_LARGE_POOL_SIZE: usize = 128;
+    const MAX_POOL_SIZE: usize = 64;
+
+    /// Initialize the bytes pool
+    pub fn init_pool() {
+        Lazy::force(&BYTES_MUT_POOL);
+    }
+
+    /// Get a buffer with at least the specified capacity
+    pub fn get_buffer(&self, capacity: usize) -> BytesMut {
+        if capacity <= Self::SMALL_BUFFER_SIZE {
+            if let Some(mut buffer) = self.small_buffers.pop() {
+                buffer.clear();
+                trace!("Reused small buffer with capacity: {}", 
buffer.capacity());
+                return buffer;
+            }
+            self.small_created.fetch_add(1, Ordering::Relaxed);
+            let buffer = BytesMut::with_capacity(Self::SMALL_BUFFER_SIZE);
+            trace!(
+                "Created new small buffer with capacity: {}",
+                buffer.capacity()
+            );
+            buffer
+        } else if capacity <= Self::MEDIUM_BUFFER_SIZE {
+            if let Some(mut buffer) = self.medium_buffers.pop() {
+                buffer.clear();
+                trace!("Reused medium buffer with capacity: {}", 
buffer.capacity());
+                return buffer;
+            }
+            self.medium_created.fetch_add(1, Ordering::Relaxed);
+            let buffer = BytesMut::with_capacity(Self::MEDIUM_BUFFER_SIZE);
+            trace!(
+                "Created new medium buffer with capacity: {}",
+                buffer.capacity()
+            );
+            buffer
+        } else if capacity <= Self::LARGE_BUFFER_SIZE {
+            if let Some(mut buffer) = self.large_buffers.pop() {
+                buffer.clear();
+                trace!("Reused large buffer with capacity: {}", 
buffer.capacity());
+                return buffer;
+            }
+            self.large_created.fetch_add(1, Ordering::Relaxed);
+            let buffer = BytesMut::with_capacity(Self::LARGE_BUFFER_SIZE);
+            trace!(
+                "Created new large buffer with capacity: {}",
+                buffer.capacity()
+            );
+            buffer
+        } else if capacity <= Self::EXTRA_LARGE_BUFFER_SIZE {
+            if let Some(mut buffer) = self.extra_large_buffers.pop() {
+                buffer.clear();
+                trace!(
+                    "Reused extra large buffer with capacity: {}",
+                    buffer.capacity()
+                );
+                return buffer;
+            }
+            self.extra_large_created.fetch_add(1, Ordering::Relaxed);
+            let buffer = 
BytesMut::with_capacity(Self::EXTRA_LARGE_BUFFER_SIZE);
+            trace!(
+                "Created new extra large buffer with capacity: {}",
+                buffer.capacity()
+            );
+            buffer
+        } else if capacity <= Self::MAX_BUFFER_SIZE {
+            if let Some(mut buffer) = self.max_buffers.pop() {
+                buffer.clear();
+                trace!("Reused max buffer with capacity: {}", 
buffer.capacity());
+                return buffer;
+            }
+            self.max_created.fetch_add(1, Ordering::Relaxed);
+            let buffer = BytesMut::with_capacity(Self::MAX_BUFFER_SIZE);
+            trace!(
+                "Created new max buffer with capacity: {}",
+                buffer.capacity()
+            );
+            buffer
+        } else {
+            // For very large buffers that exceed our max size, just allocate 
directly
+            debug!("Created oversized buffer with capacity: {} B", capacity);
+            BytesMut::with_capacity(capacity)
+        }
+    }
+
+    /// Return a buffer to the pool
+    fn return_buffer(&self, buffer: BytesMut) {
+        let capacity = buffer.capacity();
+        if capacity == Self::SMALL_BUFFER_SIZE {
+            if self.small_buffers.push(buffer).is_err() {
+                trace!("Small buffer pool full, dropping buffer");
+            } else {
+                self.small_returned.fetch_add(1, Ordering::Relaxed);
+                trace!("Returned small buffer to pool");
+            }
+        } else if capacity == Self::MEDIUM_BUFFER_SIZE {
+            if self.medium_buffers.push(buffer).is_err() {
+                trace!("Medium buffer pool full, dropping buffer");
+            } else {
+                self.medium_returned.fetch_add(1, Ordering::Relaxed);
+                trace!("Returned medium buffer to pool");
+            }
+        } else if capacity == Self::LARGE_BUFFER_SIZE {
+            if self.large_buffers.push(buffer).is_err() {
+                trace!("Large buffer pool full, dropping buffer");
+            } else {
+                self.large_returned.fetch_add(1, Ordering::Relaxed);
+                trace!("Returned large buffer to pool");
+            }
+        } else if capacity == Self::EXTRA_LARGE_BUFFER_SIZE {
+            if self.extra_large_buffers.push(buffer).is_err() {
+                trace!("Extra large buffer pool full, dropping buffer");
+            } else {
+                self.extra_large_returned.fetch_add(1, Ordering::Relaxed);
+                trace!("Returned extra large buffer to pool");
+            }
+        } else if capacity == Self::MAX_BUFFER_SIZE {
+            if self.max_buffers.push(buffer).is_err() {
+                trace!("Max buffer pool full, dropping buffer");
+            } else {
+                self.max_returned.fetch_add(1, Ordering::Relaxed);
+                trace!("Returned max buffer to pool");
+            }
+        } else if capacity != 0 {
+            trace!(
+                "Returned buffer to pool with unknown capacity: {}",
+                capacity
+            );
+        }
+    }
+
+    /// Log stats about buffer allocation and reuse
+    pub fn log_stats(&self) {
+        let sm_created = self.small_created.load(Ordering::Relaxed);
+        let md_created = self.medium_created.load(Ordering::Relaxed);
+        let lg_created = self.large_created.load(Ordering::Relaxed);
+        let xl_created = self.extra_large_created.load(Ordering::Relaxed);
+        let mx_created = self.max_created.load(Ordering::Relaxed);
+
+        let sm_returned = self.small_returned.load(Ordering::Relaxed);
+        let md_returned = self.medium_returned.load(Ordering::Relaxed);
+        let lg_returned = self.large_returned.load(Ordering::Relaxed);
+        let xl_returned = self.extra_large_returned.load(Ordering::Relaxed);
+        let mx_returned = self.max_returned.load(Ordering::Relaxed);
+
+        let sm_pool = self.small_buffers.len();
+        let md_pool = self.medium_buffers.len();
+        let lg_pool = self.large_buffers.len();
+        let xl_pool = self.extra_large_buffers.len();
+        let mx_pool = self.max_buffers.len();
+
+        let sm_size = IggyByteSize::from((sm_pool * Self::SMALL_BUFFER_SIZE) 
as u64);
+        let md_size = IggyByteSize::from((md_pool * Self::MEDIUM_BUFFER_SIZE) 
as u64);
+        let lg_size = IggyByteSize::from((lg_pool * Self::LARGE_BUFFER_SIZE) 
as u64);
+        let xl_size = IggyByteSize::from((xl_pool * 
Self::EXTRA_LARGE_BUFFER_SIZE) as u64);
+        let mx_size = IggyByteSize::from((mx_pool * Self::MAX_BUFFER_SIZE) as 
u64);
+
+        let total_size = IggyByteSize::from(
+            (sm_pool * Self::SMALL_BUFFER_SIZE
+                + md_pool * Self::MEDIUM_BUFFER_SIZE
+                + lg_pool * Self::LARGE_BUFFER_SIZE
+                + xl_pool * Self::EXTRA_LARGE_BUFFER_SIZE
+                + mx_pool * Self::MAX_BUFFER_SIZE) as u64,
+        );
+
+        let sm_reuse = if sm_created > 0 {
+            sm_returned as f64 / sm_created as f64 * 100.0
+        } else {
+            0.0
+        };
+        let md_reuse = if md_created > 0 {
+            md_returned as f64 / md_created as f64 * 100.0
+        } else {
+            0.0
+        };
+        let lg_reuse = if lg_created > 0 {
+            lg_returned as f64 / lg_created as f64 * 100.0
+        } else {
+            0.0
+        };
+        let xl_reuse = if xl_created > 0 {
+            xl_returned as f64 / xl_created as f64 * 100.0
+        } else {
+            0.0
+        };
+        let mx_reuse = if mx_created > 0 {
+            mx_returned as f64 / mx_created as f64 * 100.0
+        } else {
+            0.0
+        };
+
+        let sm_util = sm_pool as f64 / Self::SMALL_POOL_SIZE as f64 * 100.0;
+        let md_util = md_pool as f64 / Self::MEDIUM_POOL_SIZE as f64 * 100.0;
+        let lg_util = lg_pool as f64 / Self::LARGE_POOL_SIZE as f64 * 100.0;
+        let xl_util = xl_pool as f64 / Self::EXTRA_LARGE_POOL_SIZE as f64 * 
100.0;
+        let mx_util = mx_pool as f64 / Self::MAX_POOL_SIZE as f64 * 100.0;
+
+        info!("BytesPool: Small[{}/{}|{:.1}%|{:.1}%|{}] 
Medium[{}/{}|{:.1}%|{:.1}%|{}] Large[{}/{}|{:.1}%|{:.1}%|{}]",
+        sm_returned, sm_created, sm_reuse, sm_util, sm_size,
+        md_returned, md_created, md_reuse, md_util, md_size,
+        lg_returned, lg_created, lg_reuse, lg_util, lg_size);
+
+        info!(
+            "BytesPool: XLarge[{}/{}|{:.1}%|{:.1}%|{}] 
Max[{}/{}|{:.1}%|{:.1}%|{}] Total: {}",
+            xl_returned,
+            xl_created,
+            xl_reuse,
+            xl_util,
+            xl_size,
+            mx_returned,
+            mx_created,
+            mx_reuse,
+            mx_util,
+            mx_size,
+            total_size
+        );
+    }
+}
+
+impl Default for BytesMutPool {
+    fn default() -> Self {
+        Self {
+            small_buffers: Arc::new(ArrayQueue::new(Self::SMALL_POOL_SIZE)),
+            medium_buffers: Arc::new(ArrayQueue::new(Self::MEDIUM_POOL_SIZE)),
+            large_buffers: Arc::new(ArrayQueue::new(Self::LARGE_POOL_SIZE)),
+            extra_large_buffers: 
Arc::new(ArrayQueue::new(Self::EXTRA_LARGE_POOL_SIZE)),
+            max_buffers: Arc::new(ArrayQueue::new(Self::MAX_POOL_SIZE)),
+            small_created: Arc::new(AtomicUsize::new(0)),
+            medium_created: Arc::new(AtomicUsize::new(0)),
+            large_created: Arc::new(AtomicUsize::new(0)),
+            extra_large_created: Arc::new(AtomicUsize::new(0)),
+            max_created: Arc::new(AtomicUsize::new(0)),
+            small_returned: Arc::new(AtomicUsize::new(0)),
+            medium_returned: Arc::new(AtomicUsize::new(0)),
+            large_returned: Arc::new(AtomicUsize::new(0)),
+            extra_large_returned: Arc::new(AtomicUsize::new(0)),
+            max_returned: Arc::new(AtomicUsize::new(0)),
+        }
+    }
+}
+
+/// Extension trait for more ergonomic buffer return
+pub trait BytesMutExt {
+    fn return_to_pool(self);
+}
+
+impl BytesMutExt for BytesMut {
+    fn return_to_pool(self) {
+        BYTES_MUT_POOL.return_buffer(self);
+    }
+}
diff --git a/server/src/streaming/utils/mod.rs 
b/server/src/streaming/utils/mod.rs
index 33481414..25fc7d43 100644
--- a/server/src/streaming/utils/mod.rs
+++ b/server/src/streaming/utils/mod.rs
@@ -16,6 +16,7 @@
  * under the License.
  */
 
+pub mod bytes_mut_pool;
 pub mod crypto;
 pub mod file;
 pub mod hash;

Reply via email to