This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch zero-copy-no-batching
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 5a7cde2e3162f89978afc6f0892ab71bfc76151e
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Sun Mar 23 20:41:21 2025 +0100

    fixes in sdk
---
 sdk/src/cli/message/poll_messages.rs               | 105 ++++-----
 sdk/src/clients/consumer.rs                        | 235 ++++++++++----------
 sdk/src/consumer_ext/consumer_message_ext.rs       |   3 +-
 sdk/src/consumer_ext/mod.rs                        |   7 +-
 sdk/src/messages/send_messages.rs                  | 244 ++++++++-------------
 sdk/src/models/messaging/indexes.rs                |  88 +-------
 sdk/src/models/messaging/message.rs                |  37 +++-
 server/src/streaming/partitions/messages.rs        |  69 +++---
 server/src/streaming/partitions/storage.rs         |   5 +-
 .../streaming/segments/messages/messages_writer.rs |   5 -
 .../src/streaming/segments/messages_accumulator.rs |   2 -
 server/src/streaming/segments/reading_messages.rs  |  16 --
 server/src/streaming/segments/segment.rs           |  13 +-
 .../streaming/segments/types/message_view_mut.rs   |   6 +-
 .../streaming/segments/types/messages_batch_set.rs |  17 --
 server/src/streaming/segments/writing_messages.rs  |   3 +-
 server/src/streaming/topics/messages.rs            |   7 +-
 server/src/tcp/connection_handler.rs               |   6 -
 18 files changed, 348 insertions(+), 520 deletions(-)

diff --git a/sdk/src/cli/message/poll_messages.rs 
b/sdk/src/cli/message/poll_messages.rs
index 67196408..b153f4e8 100644
--- a/sdk/src/cli/message/poll_messages.rs
+++ b/sdk/src/cli/message/poll_messages.rs
@@ -5,8 +5,7 @@ use crate::consumer::Consumer;
 use crate::identifier::Identifier;
 use crate::messages::{PollMessages, PollingStrategy};
 use crate::models::messaging::HeaderKind;
-use crate::prelude::{HeaderKey, IggyMessage};
-use crate::utils::sizeable::Sizeable;
+use crate::prelude::{HeaderKey, HeaderValue, IggyMessage};
 use crate::utils::timestamp::IggyTimestamp;
 use crate::utils::{byte_size::IggyByteSize, duration::IggyDuration};
 use anyhow::Context;
@@ -63,22 +62,27 @@ impl PollMessagesCmd {
 
     fn create_message_header_keys(
         &self,
-        polled_messages: &Vec<IggyMessage>,
+        polled_messages: &[IggyMessage],
     ) -> HashSet<(HeaderKey, HeaderKind)> {
-        todo!();
-        // if !self.show_headers {
-        //     return HashSet::new();
-        // }
-        // polled_messages
-        //     .iter()
-        //     .flat_map(|m| match m.user_headers.as_ref() {
-        //         Some(h) => HashMap::from_bytes(h.clone())
-        //             .iter()
-        //             .map(|(k, v)| (k.clone(), v.kind))
-        //             .collect::<Vec<_>>(),
-        //         None => vec![],
-        //     })
-        //     .collect::<HashSet<_>>()
+        if !self.show_headers {
+            return HashSet::new();
+        }
+        polled_messages
+            .iter()
+            .flat_map(|m| match m.user_headers.as_ref() {
+                Some(h) => match HashMap::<HeaderKey, 
HeaderValue>::from_bytes(h.clone()) {
+                    Ok(headers) => headers
+                        .iter()
+                        .map(|(k, v)| (k.clone(), v.kind))
+                        .collect::<Vec<_>>(),
+                    Err(e) => {
+                        tracing::error!("Failed to parse user headers, error: 
{e}");
+                        vec![]
+                    }
+                },
+                None => vec![],
+            })
+            .collect::<HashSet<_>>()
     }
 
     fn create_table_header(header_key_set: &HashSet<(HeaderKey, HeaderKind)>) 
-> Row {
@@ -101,42 +105,41 @@ impl PollMessagesCmd {
     }
 
     fn create_table_content(
-        polled_messages: &Vec<IggyMessage>,
+        polled_messages: &[IggyMessage],
         message_header_keys: &HashSet<(HeaderKey, HeaderKind)>,
     ) -> Vec<Row> {
-        todo!();
-        // polled_messages
-        //     .iter()
-        //     .map(|message| {
-        //         let mut row = vec![
-        //             format!("{}", message.header.offset),
-        //             IggyTimestamp::from(message.header.timestamp)
-        //                 .to_local_string("%Y-%m-%d %H:%M:%S%.6f"),
-        //             format!("{}", message.header.id),
-        //             format!("{}", message.payload.len()),
-        //             String::from_utf8_lossy(&message.payload).to_string(),
-        //         ];
-
-        //         let values = message_header_keys
-        //             .iter()
-        //             .map(|(key, kind)| {
-        //                 let user_headers = 
HashMap::from_bytes(message.user_headers);
-        //                 message
-        //                     .user_headers
-        //                     .as_ref()
-        //                     .map(|h| {
-        //                         h.get(key)
-        //                             .filter(|v| v.kind == *kind)
-        //                             .map(|v| v.value_only_to_string())
-        //                             .unwrap_or_default()
-        //                     })
-        //                     .unwrap_or_default()
-        //             })
-        //             .collect::<Vec<_>>();
-        //         row.extend(values);
-        //         Row::from(row)
-        //     })
-        //     .collect::<_>()
+        polled_messages
+            .iter()
+            .map(|message| {
+                let mut row = vec![
+                    format!("{}", message.header.offset),
+                    IggyTimestamp::from(message.header.timestamp)
+                        .to_local_string("%Y-%m-%d %H:%M:%S%.6f"),
+                    format!("{}", message.header.id),
+                    format!("{}", message.payload.len()),
+                    String::from_utf8_lossy(&message.payload).to_string(),
+                ];
+
+                let values = message_header_keys
+                    .iter()
+                    .map(|(key, kind)| {
+                        message
+                            .user_headers_map()
+                            .expect("Failed to parse user headers")
+                            .as_ref()
+                            .map(|h| {
+                                h.get(key)
+                                    .filter(|v| v.kind == *kind)
+                                    .map(|v| v.value_only_to_string())
+                                    .unwrap_or_default()
+                            })
+                            .unwrap_or_default()
+                    })
+                    .collect::<Vec<_>>();
+                row.extend(values);
+                Row::from(row)
+            })
+            .collect::<_>()
     }
 }
 
diff --git a/sdk/src/clients/consumer.rs b/sdk/src/clients/consumer.rs
index 2e50e86e..9a621030 100644
--- a/sdk/src/clients/consumer.rs
+++ b/sdk/src/clients/consumer.rs
@@ -5,8 +5,7 @@ use crate::error::IggyError;
 use crate::identifier::{IdKind, Identifier};
 use crate::locking::{IggySharedMut, IggySharedMutFn};
 use crate::messages::PollingStrategy;
-use crate::prelude::{IggyMessage, PolledMessages};
-use crate::utils::byte_size::IggyByteSize;
+use crate::prelude::{IggyMessage, PolledMessages, PollingKind};
 use crate::utils::crypto::EncryptorKind;
 use crate::utils::duration::IggyDuration;
 use crate::utils::timestamp::IggyTimestamp;
@@ -25,10 +24,8 @@ use tokio::time;
 use tokio::time::sleep;
 use tracing::{error, info, trace, warn};
 
-//const EMPTY_MESSAGES: Vec<PolledMessage> = Vec::new();
-
 const ORDERING: std::sync::atomic::Ordering = 
std::sync::atomic::Ordering::SeqCst;
-type PollMessagesFuture = Pin<Box<dyn Future<Output = Result<Vec<IggyMessage>, 
IggyError>>>>;
+type PollMessagesFuture = Pin<Box<dyn Future<Output = Result<PolledMessages, 
IggyError>>>>;
 
 /// The auto-commit configuration for storing the offset on the server.
 #[derive(Debug, PartialEq, Copy, Clone)]
@@ -102,7 +99,7 @@ pub struct IggyConsumer {
     last_consumed_offsets: Arc<DashMap<u32, AtomicU64>>,
     current_offsets: Arc<DashMap<u32, AtomicU64>>,
     poll_future: Option<PollMessagesFuture>,
-    buffered_messages: VecDeque<Vec<IggyMessage>>,
+    buffered_messages: VecDeque<IggyMessage>,
     encryptor: Option<Arc<EncryptorKind>>,
     store_offset_sender: flume::Sender<(u32, u64)>,
     store_offset_after_each_message: bool,
@@ -611,10 +608,7 @@ impl IggyConsumer {
                     auto_commit_after_polling,
                 )
                 .await;
-            polled_messages
 
-            //TODO: Fix me
-            /*
             if let Ok(mut polled_messages) = polled_messages {
                 if polled_messages.messages.is_empty() {
                     return Ok(polled_messages);
@@ -635,13 +629,9 @@ impl IggyConsumer {
                 if !allow_replay && has_consumed_offset {
                     polled_messages
                         .messages
-                        .retain(|message| message.offset > consumed_offset);
+                        .retain(|message| message.header.offset > 
consumed_offset);
                     if polled_messages.messages.is_empty() {
-                        return Ok(PolledMessages {
-                            messages: EMPTY_MESSAGES,
-                            current_offset: polled_messages.current_offset,
-                            partition_id,
-                        });
+                        return Ok(PolledMessages::empty());
                     }
                 }
 
@@ -693,9 +683,10 @@ impl IggyConsumer {
                     }
 
                     return Ok(PolledMessages {
-                        messages: EMPTY_MESSAGES,
+                        messages: vec![],
                         current_offset: polled_messages.current_offset,
                         partition_id,
+                        count: 0,
                     });
                 }
 
@@ -712,7 +703,6 @@ impl IggyConsumer {
                 sleep(retry_interval.get_duration()).await;
             }
             Err(error)
-            */
         }
     }
 
@@ -791,16 +781,16 @@ impl IggyConsumer {
     }
 }
 
-pub struct ReceivedBatch {
-    pub messages: Vec<IggyMessage>,
+pub struct ReceivedMessage {
+    pub message: IggyMessage,
     pub current_offset: u64,
     pub partition_id: u32,
 }
 
-impl ReceivedBatch {
-    pub fn new(messages: Vec<IggyMessage>, current_offset: u64, partition_id: 
u32) -> Self {
+impl ReceivedMessage {
+    pub fn new(message: IggyMessage, current_offset: u64, partition_id: u32) 
-> Self {
         Self {
-            messages,
+            message,
             current_offset,
             partition_id,
         }
@@ -808,134 +798,135 @@ impl ReceivedBatch {
 }
 
 impl Stream for IggyConsumer {
-    type Item = Result<ReceivedBatch, IggyError>;
+    type Item = Result<ReceivedMessage, IggyError>;
 
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> 
Poll<Option<Self::Item>> {
         let partition_id = self.current_partition_id.load(ORDERING);
         if let Some(message) = self.buffered_messages.pop_front() {
-            //TODO: Fix me
-            /*
+            {
+                if let Some(last_consumed_offset_entry) =
+                    self.last_consumed_offsets.get(&partition_id)
                 {
-                    if let Some(last_consumed_offset_entry) =
-                        self.last_consumed_offsets.get(&partition_id)
-                    {
-                        last_consumed_offset_entry.store(message.offset, 
ORDERING);
-                    } else {
-                        self.last_consumed_offsets
-                            .insert(partition_id, 
AtomicU64::new(message.offset));
-                    }
-
-                    if (self.store_after_every_nth_message > 0
-                        && message.offset % self.store_after_every_nth_message 
== 0)
-                        || self.store_offset_after_each_message
-                    {
-                        self.send_store_offset(partition_id, message.offset);
-                    }
+                    last_consumed_offset_entry.store(message.header.offset, 
ORDERING);
+                } else {
+                    self.last_consumed_offsets
+                        .insert(partition_id, 
AtomicU64::new(message.header.offset));
                 }
 
-                if self.buffered_messages.is_empty() {
-                    if self.polling_strategy.kind == PollingKind::Offset {
-                        self.polling_strategy = 
PollingStrategy::offset(message.offset + 1);
-                    }
-
-                    if self.store_offset_after_all_messages {
-                        self.send_store_offset(partition_id, message.offset);
-                    }
+                if (self.store_after_every_nth_message > 0
+                    && message.header.offset % 
self.store_after_every_nth_message == 0)
+                    || self.store_offset_after_each_message
+                {
+                    self.send_store_offset(partition_id, 
message.header.offset);
                 }
+            }
 
-                let current_offset;
-                if let Some(current_offset_entry) = 
self.current_offsets.get(&partition_id) {
-                    current_offset = current_offset_entry.load(ORDERING);
-                } else {
-                    current_offset = 0;
+            if self.buffered_messages.is_empty() {
+                if self.polling_strategy.kind == PollingKind::Offset {
+                    self.polling_strategy = 
PollingStrategy::offset(message.header.offset + 1);
                 }
 
-                return Poll::Ready(Some(Ok(ReceivedMessage::new(
-                    message,
-                    current_offset,
-                    partition_id,
-                ))));
+                if self.store_offset_after_all_messages {
+                    self.send_store_offset(partition_id, 
message.header.offset);
+                }
             }
 
-            if self.poll_future.is_none() {
-                let future = self.create_poll_messages_future();
-                self.poll_future = Some(Box::pin(future));
+            let current_offset;
+            if let Some(current_offset_entry) = 
self.current_offsets.get(&partition_id) {
+                current_offset = current_offset_entry.load(ORDERING);
+            } else {
+                current_offset = 0;
             }
 
-            while let Some(future) = self.poll_future.as_mut() {
-                match future.poll_unpin(cx) {
-                    Poll::Ready(Ok(mut polled_messages)) => {
-                        let partition_id = polled_messages.partition_id;
-                        self.current_partition_id.store(partition_id, 
ORDERING);
-                        if polled_messages.messages.is_empty() {
-                            self.poll_future = 
Some(Box::pin(self.create_poll_messages_future()));
-                        } else {
-                            if let Some(ref encryptor) = self.encryptor {
-                                for message in &mut polled_messages.messages {
-                                    let payload = 
encryptor.decrypt(&message.payload);
-                                    if payload.is_err() {
-                                        self.poll_future = None;
-                                        error!("Failed to decrypt the message 
payload at offset: {}, partition ID: {}", message.offset, partition_id);
-                                        let error = payload.unwrap_err();
-                                        return Poll::Ready(Some(Err(error)));
-                                    }
-
-                                    let payload = payload.unwrap();
-                                    message.payload = Bytes::from(payload);
-                                    message.length = 
IggyByteSize::from(message.payload.len() as u64);
+            return Poll::Ready(Some(Ok(ReceivedMessage::new(
+                message,
+                current_offset,
+                partition_id,
+            ))));
+        }
+
+        if self.poll_future.is_none() {
+            let future = self.create_poll_messages_future();
+            self.poll_future = Some(Box::pin(future));
+        }
+
+        while let Some(future) = self.poll_future.as_mut() {
+            match future.poll_unpin(cx) {
+                Poll::Ready(Ok(mut polled_messages)) => {
+                    let partition_id = polled_messages.partition_id;
+                    self.current_partition_id.store(partition_id, ORDERING);
+                    if polled_messages.messages.is_empty() {
+                        self.poll_future = 
Some(Box::pin(self.create_poll_messages_future()));
+                    } else {
+                        if let Some(ref encryptor) = self.encryptor {
+                            for message in &mut polled_messages.messages {
+                                let payload = 
encryptor.decrypt(&message.payload);
+                                if payload.is_err() {
+                                    self.poll_future = None;
+                                    error!("Failed to decrypt the message 
payload at offset: {}, partition ID: {}", message.header.offset, partition_id);
+                                    let error = payload.unwrap_err();
+                                    return Poll::Ready(Some(Err(error)));
                                 }
-                            }
 
-                            if let Some(current_offset_entry) = 
self.current_offsets.get(&partition_id)
-                            {
-                                
current_offset_entry.store(polled_messages.current_offset, ORDERING);
-                            } else {
-                                self.current_offsets.insert(
-                                    partition_id,
-                                    
AtomicU64::new(polled_messages.current_offset),
-                                );
+                                let payload = payload.unwrap();
+                                message.payload = Bytes::from(payload);
+                                message.header.payload_length = 
message.payload.len() as u32;
                             }
+                        }
 
-                            let message = polled_messages.messages.remove(0);
-                            
self.buffered_messages.extend(polled_messages.messages);
+                        if let Some(current_offset_entry) = 
self.current_offsets.get(&partition_id)
+                        {
+                            
current_offset_entry.store(polled_messages.current_offset, ORDERING);
+                        } else {
+                            self.current_offsets.insert(
+                                partition_id,
+                                AtomicU64::new(polled_messages.current_offset),
+                            );
+                        }
 
-                            if self.polling_strategy.kind == 
PollingKind::Offset {
-                                self.polling_strategy = 
PollingStrategy::offset(message.offset + 1);
-                            }
+                        let message = polled_messages.messages.remove(0);
+                        
self.buffered_messages.extend(polled_messages.messages);
 
-                            if let Some(last_consumed_offset_entry) =
-                                self.last_consumed_offsets.get(&partition_id)
-                            {
-                                
last_consumed_offset_entry.store(message.offset, ORDERING);
-                            } else {
-                                self.last_consumed_offsets
-                                    .insert(partition_id, 
AtomicU64::new(message.offset));
-                            }
+                        if self.polling_strategy.kind == PollingKind::Offset {
+                            self.polling_strategy =
+                                PollingStrategy::offset(message.header.offset 
+ 1);
+                        }
 
-                            if (self.store_after_every_nth_message > 0
-                                && message.offset % 
self.store_after_every_nth_message == 0)
-                                || self.store_offset_after_each_message
-                                || (self.store_offset_after_all_messages
-                                    && self.buffered_messages.is_empty())
-                            {
-                                
self.send_store_offset(polled_messages.partition_id, message.offset);
-                            }
+                        if let Some(last_consumed_offset_entry) =
+                            self.last_consumed_offsets.get(&partition_id)
+                        {
+                            
last_consumed_offset_entry.store(message.header.offset, ORDERING);
+                        } else {
+                            self.last_consumed_offsets
+                                .insert(partition_id, 
AtomicU64::new(message.header.offset));
+                        }
 
-                            self.poll_future = None;
-                            return Poll::Ready(Some(Ok(ReceivedMessage::new(
-                                message,
-                                polled_messages.current_offset,
+                        if (self.store_after_every_nth_message > 0
+                            && message.header.offset % 
self.store_after_every_nth_message == 0)
+                            || self.store_offset_after_each_message
+                            || (self.store_offset_after_all_messages
+                                && self.buffered_messages.is_empty())
+                        {
+                            self.send_store_offset(
                                 polled_messages.partition_id,
-                            ))));
+                                message.header.offset,
+                            );
                         }
-                    }
-                    Poll::Ready(Err(err)) => {
+
                         self.poll_future = None;
-                        return Poll::Ready(Some(Err(err)));
+                        return Poll::Ready(Some(Ok(ReceivedMessage::new(
+                            message,
+                            polled_messages.current_offset,
+                            polled_messages.partition_id,
+                        ))));
                     }
-                    Poll::Pending => return Poll::Pending,
                 }
-            */
+                Poll::Ready(Err(err)) => {
+                    self.poll_future = None;
+                    return Poll::Ready(Some(Err(err)));
+                }
+                Poll::Pending => return Poll::Pending,
+            }
         }
 
         Poll::Pending
diff --git a/sdk/src/consumer_ext/consumer_message_ext.rs 
b/sdk/src/consumer_ext/consumer_message_ext.rs
index 6c093fa3..cab9e540 100644
--- a/sdk/src/consumer_ext/consumer_message_ext.rs
+++ b/sdk/src/consumer_ext/consumer_message_ext.rs
@@ -70,8 +70,7 @@ impl IggyConsumerMessageExt for IggyConsumer {
                         Some(Ok(received_message)) => {
                             let partition_id = received_message.partition_id;
                             let current_offset = 
received_message.current_offset;
-                            //TODO(hubcio): fix me, this used to be 
message.offset
-                            let message_offset = 
received_message.messages[0].header.offset;
+                            let message_offset = 
received_message.message.header.offset;
                             if let Err(err) = 
message_consumer.consume(received_message).await {
                                 error!("Error while handling message at 
offset: {message_offset}/{current_offset}, partition: {partition_id} for 
consumer: {name} on topic: {topic} and stream: {stream} due to error: {err}",
                                     name = self.name(), topic = self.topic(), 
stream = self.stream());
diff --git a/sdk/src/consumer_ext/mod.rs b/sdk/src/consumer_ext/mod.rs
index 52869755..ca6a9548 100644
--- a/sdk/src/consumer_ext/mod.rs
+++ b/sdk/src/consumer_ext/mod.rs
@@ -1,8 +1,7 @@
 mod consumer_message_ext;
 mod consumer_message_trait;
 
-use crate::clients::consumer::ReceivedBatch;
-use crate::error::IggyError;
+use crate::{clients::consumer::ReceivedMessage, error::IggyError};
 pub use consumer_message_trait::IggyConsumerMessageExt;
 
 /// Trait for message consumer
@@ -18,7 +17,7 @@ pub trait LocalMessageConsumer {
     /// # Errors
     ///
     /// * `IggyError` - If the message consumer fails to consume the message
-    async fn consume(&self, message: ReceivedBatch) -> Result<(), IggyError>;
+    async fn consume(&self, message: ReceivedMessage) -> Result<(), IggyError>;
 }
 
 // Default implementation for `&T`
@@ -33,7 +32,7 @@ impl<T: MessageConsumer + Send + Sync> MessageConsumer for &T 
{
     /// # Errors
     ///
     /// * `IggyError` - If the message consumer fails to consume the message
-    async fn consume(&self, message: ReceivedBatch) -> Result<(), IggyError> {
+    async fn consume(&self, message: ReceivedMessage) -> Result<(), IggyError> 
{
         (**self).consume(message).await
     }
 }
diff --git a/sdk/src/messages/send_messages.rs 
b/sdk/src/messages/send_messages.rs
index 2ab0ad0e..6c4b92da 100644
--- a/sdk/src/messages/send_messages.rs
+++ b/sdk/src/messages/send_messages.rs
@@ -169,75 +169,10 @@ impl Validatable<IggyError> for SendMessages {
 impl BytesSerializable for SendMessages {
     fn to_bytes(&self) -> Bytes {
         panic!("should not be used")
-
-        // let stream_id_bytes = self.stream_id.to_bytes();
-        // let topic_id_bytes = self.topic_id.to_bytes();
-        // let partitioning_bytes = self.partitioning.to_bytes();
-
-        // let metadata_len = stream_id_bytes.len()
-        //     + topic_id_bytes.len()
-        //     + partitioning_bytes.len()
-        //     + std::mem::size_of::<u32>();
-
-        // let total_len = metadata_len + self.messages.len();
-
-        // let mut bytes = BytesMut::with_capacity(total_len);
-
-        // bytes.put_slice(&stream_id_bytes);
-        // bytes.put_slice(&topic_id_bytes);
-        // bytes.put_slice(&partitioning_bytes);
-        // bytes.put_u32_le(self.messages_count);
-        // bytes.put_slice(&self.messages);
-
-        // bytes.freeze()
     }
 
     fn from_bytes(bytes: Bytes) -> Result<SendMessages, IggyError> {
         panic!("should not be used")
-        // if bytes.is_empty() {
-        //     return Err(IggyError::InvalidCommand);
-        // }
-
-        // let mut position = 0;
-        // let stream_id = Identifier::from_bytes(bytes.clone())?;
-        // position += stream_id.get_size_bytes().as_bytes_usize();
-
-        // if bytes.len() <= position {
-        //     return Err(IggyError::InvalidCommand);
-        // }
-
-        // let topic_id = Identifier::from_bytes(bytes.slice(position..))?;
-        // position += topic_id.get_size_bytes().as_bytes_usize();
-
-        // if bytes.len() <= position {
-        //     return Err(IggyError::InvalidCommand);
-        // }
-
-        // let partitioning = 
Partitioning::from_bytes(bytes.slice(position..))?;
-        // position += partitioning.get_size_bytes().as_bytes_usize();
-
-        // if bytes.len() < position + 4 {
-        //     return Err(IggyError::InvalidCommand);
-        // }
-
-        // let messages_count = u32::from_le_bytes(
-        //     bytes
-        //         .slice(position..position + 4)
-        //         .as_ref()
-        //         .try_into()
-        //         .unwrap(),
-        // );
-        // position += 4;
-
-        // let messages = bytes.slice(position..);
-
-        // Ok(SendMessages {
-        //     stream_id,
-        //     topic_id,
-        //     partitioning,
-        //     messages_count,
-        //     messages,
-        // })
     }
 }
 
@@ -255,97 +190,98 @@ impl Display for SendMessages {
 
 #[cfg(test)]
 mod tests {
+    use std::str::FromStr;
+
     use super::*;
 
-    //     //TODO: Fix me, fix those tests.
-    //     #[test]
-    //     fn should_be_serialized_as_bytes() {
-    //         let message_1 = Message::from_str("hello 1").unwrap();
-    //         let message_2 = Message::new(Some(2), "hello 2".into(), None);
-    //         let message_3 = Message::new(Some(3), "hello 3".into(), None);
-    //         let messages = vec![message_1, message_2, message_3];
-    //         let command = SendMessages {
-    //             stream_id: Identifier::numeric(1).unwrap(),
-    //             topic_id: Identifier::numeric(2).unwrap(),
-    //             partitioning: Partitioning::partition_id(4),
-    //             messages,
-    //         };
-
-    //         let bytes = command.to_bytes();
-
-    //         let mut position = 0;
-    //         let stream_id = Identifier::from_bytes(bytes.clone()).unwrap();
-    //         position += stream_id.get_size_bytes().as_bytes_usize();
-    //         let topic_id = 
Identifier::from_bytes(bytes.slice(position..)).unwrap();
-    //         position += topic_id.get_size_bytes().as_bytes_usize();
-    //         let key = 
Partitioning::from_bytes(bytes.slice(position..)).unwrap();
-    //         position += key.get_size_bytes().as_bytes_usize();
-    //         let messages = bytes.slice(position..);
-    //         let command_messages = command
-    //             .messages
-    //             .iter()
-    //             .fold(BytesMut::new(), |mut bytes_mut, message| {
-    //                 bytes_mut.put(message.to_bytes());
-    //                 bytes_mut
-    //             })
-    //             .freeze();
-
-    //         assert!(!bytes.is_empty());
-    //         assert_eq!(stream_id, command.stream_id);
-    //         assert_eq!(topic_id, command.topic_id);
-    //         assert_eq!(key, command.partitioning);
-    //         assert_eq!(messages, command_messages);
-    //     }
-
-    //     #[test]
-    //     fn should_be_deserialized_from_bytes() {
-    //         let stream_id = Identifier::numeric(1).unwrap();
-    //         let topic_id = Identifier::numeric(2).unwrap();
-    //         let key = Partitioning::partition_id(4);
-
-    //         let message_1 = Message::from_str("hello 1").unwrap();
-    //         let message_2 = Message::new(Some(2), "hello 2".into(), None);
-    //         let message_3 = Message::new(Some(3), "hello 3".into(), None);
-    //         let messages = [
-    //             message_1.to_bytes(),
-    //             message_2.to_bytes(),
-    //             message_3.to_bytes(),
-    //         ]
-    //         .concat();
-
-    //         let key_bytes = key.to_bytes();
-    //         let stream_id_bytes = stream_id.to_bytes();
-    //         let topic_id_bytes = topic_id.to_bytes();
-    //         let current_position = stream_id_bytes.len() + 
topic_id_bytes.len() + key_bytes.len();
-    //         let mut bytes = BytesMut::with_capacity(current_position);
-    //         bytes.put_slice(&stream_id_bytes);
-    //         bytes.put_slice(&topic_id_bytes);
-    //         bytes.put_slice(&key_bytes);
-    //         bytes.put_slice(&messages);
-    //         let bytes = bytes.freeze();
-    //         let command = SendMessages::from_bytes(bytes.clone());
-    //         assert!(command.is_ok());
-
-    //         let messages_payloads = bytes.slice(current_position..);
-    //         let mut position = 0;
-    //         let mut messages = Vec::new();
-    //         while position < messages_payloads.len() {
-    //             let message = 
Message::from_bytes(messages_payloads.slice(position..)).unwrap();
-    //             position += message.get_size_bytes().as_bytes_usize();
-    //             messages.push(message);
-    //         }
-
-    //         let command = command.unwrap();
-    //         assert_eq!(command.stream_id, stream_id);
-    //         assert_eq!(command.topic_id, topic_id);
-    //         assert_eq!(command.partitioning, key);
-    //         for (index, message) in command.messages.iter().enumerate() {
-    //             let command_message = &command.messages[index];
-    //             assert_eq!(command_message.id, message.id);
-    //             assert_eq!(command_message.length, message.length);
-    //             assert_eq!(command_message.payload, message.payload);
-    //         }
-    //     }
+    #[test]
+    fn should_be_serialized_as_bytes() {
+        let message_1 = IggyMessage::from_str("hello 1").unwrap();
+        let message_2 = IggyMessage::new(Some(2), "hello 2".into(), None);
+        let message_3 = IggyMessage::new(Some(3), "hello 3".into(), None);
+        let messages = vec![message_1, message_2, message_3];
+        let command = SendMessages {
+            stream_id: Identifier::numeric(1).unwrap(),
+            topic_id: Identifier::numeric(2).unwrap(),
+            partitioning: Partitioning::partition_id(4),
+            messages,
+        };
+
+        let bytes = command.to_bytes();
+
+        let mut position = 0;
+        let stream_id = Identifier::from_bytes(bytes.clone()).unwrap();
+        position += stream_id.get_size_bytes().as_bytes_usize();
+        let topic_id = 
Identifier::from_bytes(bytes.slice(position..)).unwrap();
+        position += topic_id.get_size_bytes().as_bytes_usize();
+        let key = Partitioning::from_bytes(bytes.slice(position..)).unwrap();
+        position += key.get_size_bytes().as_bytes_usize();
+        let messages = bytes.slice(position..);
+        let command_messages = command
+            .messages
+            .iter()
+            .fold(BytesMut::new(), |mut bytes_mut, message| {
+                bytes_mut.put(message.to_bytes());
+                bytes_mut
+            })
+            .freeze();
+
+        assert!(!bytes.is_empty());
+        assert_eq!(stream_id, command.stream_id);
+        assert_eq!(topic_id, command.topic_id);
+        assert_eq!(key, command.partitioning);
+        assert_eq!(messages, command_messages);
+    }
+
+    #[test]
+    fn should_be_deserialized_from_bytes() {
+        let stream_id = Identifier::numeric(1).unwrap();
+        let topic_id = Identifier::numeric(2).unwrap();
+        let key = Partitioning::partition_id(4);
+
+        let message_1 = IggyMessage::from_str("hello 1").unwrap();
+        let message_2 = IggyMessage::new(Some(2), "hello 2".into(), None);
+        let message_3 = IggyMessage::new(Some(3), "hello 3".into(), None);
+        let messages = [
+            message_1.to_bytes(),
+            message_2.to_bytes(),
+            message_3.to_bytes(),
+        ]
+        .concat();
+
+        let key_bytes = key.to_bytes();
+        let stream_id_bytes = stream_id.to_bytes();
+        let topic_id_bytes = topic_id.to_bytes();
+        let current_position = stream_id_bytes.len() + topic_id_bytes.len() + 
key_bytes.len();
+        let mut bytes = BytesMut::with_capacity(current_position);
+        bytes.put_slice(&stream_id_bytes);
+        bytes.put_slice(&topic_id_bytes);
+        bytes.put_slice(&key_bytes);
+        bytes.put_slice(&messages);
+        let bytes = bytes.freeze();
+        let command = SendMessages::from_bytes(bytes.clone());
+        assert!(command.is_ok());
+
+        let messages_payloads = bytes.slice(current_position..);
+        let mut position = 0;
+        let mut messages = Vec::new();
+        while position < messages_payloads.len() {
+            let message = 
IggyMessage::from_bytes(messages_payloads.slice(position..)).unwrap();
+            position += message.get_size_bytes().as_bytes_usize();
+            messages.push(message);
+        }
+
+        let command = command.unwrap();
+        assert_eq!(command.stream_id, stream_id);
+        assert_eq!(command.topic_id, topic_id);
+        assert_eq!(command.partitioning, key);
+        for (index, message) in command.messages.iter().enumerate() {
+            let command_message = &command.messages[index];
+            assert_eq!(command_message.id, message.id);
+            assert_eq!(command_message.length, message.length);
+            assert_eq!(command_message.payload, message.payload);
+        }
+    }
 
     #[test]
     fn key_of_type_balanced_should_have_empty_value() {
diff --git a/sdk/src/models/messaging/indexes.rs 
b/sdk/src/models/messaging/indexes.rs
index 420c1b78..56c63055 100644
--- a/sdk/src/models/messaging/indexes.rs
+++ b/sdk/src/models/messaging/indexes.rs
@@ -1,12 +1,11 @@
 use super::{index_view::IggyIndexView, INDEX_SIZE};
 use bytes::Bytes;
 use serde::{Deserialize, Serialize};
-use std::fmt;
 use std::ops::{Deref, Index as StdIndex};
 
 /// A container for binary-encoded index data.
 /// Optimized for efficient storage and I/O operations.
-#[derive(Default, Clone, Serialize, Deserialize, PartialEq)]
+#[derive(Default, Debug, Clone, Serialize, Deserialize, PartialEq)]
 pub struct IggyIndexes {
     base_position: u32,
     buffer: Bytes,
@@ -89,26 +88,6 @@ impl IggyIndexes {
         Some(IggyIndexes::new(slice, base_position))
     }
 
-    /// Gets a first index
-    pub fn first(&self) -> Option<IggyIndexView> {
-        if self.count() == 0 {
-            return None;
-        }
-
-        Some(IggyIndexView::new(&self.buffer[0..INDEX_SIZE]))
-    }
-
-    /// Gets a last index
-    pub fn last(&self) -> Option<IggyIndexView> {
-        if self.count() == 0 {
-            return None;
-        }
-
-        Some(IggyIndexView::new(
-            &self.buffer[(self.count() - 1) as usize * INDEX_SIZE..],
-        ))
-    }
-
     /// Finds an index by timestamp using binary search
     /// If an exact match isn't found, returns the index with the nearest 
timestamp
     /// that is greater than or equal to the requested timestamp
@@ -162,43 +141,12 @@ impl IggyIndexes {
         self.base_position
     }
 
-    /// Helper method to get the first index offset
-    pub fn first_offset(&self) -> u32 {
-        let offset = self.get(0).map(|idx| idx.offset()).unwrap_or(0);
-        offset
-    }
-
-    /// Helper method to get the first index position
-    pub fn first_position(&self) -> u32 {
-        let position = self.get(0).map(|idx| idx.position()).unwrap_or(0);
-        position
-    }
-
-    /// Helper method to get the first timestamp
-    pub fn first_timestamp(&self) -> u64 {
-        self.get(0).map(|idx| idx.timestamp()).unwrap_or(0)
-    }
-
-    /// Helper method to get the last index offset
-    pub fn last_offset(&self) -> u32 {
-        self.get(self.count() - 1)
-            .map(|idx| idx.offset())
-            .unwrap_or(0)
-    }
-
     /// Helper method to get the last index position
     pub fn last_position(&self) -> u32 {
         self.get(self.count() - 1)
             .map(|idx| idx.position())
             .unwrap_or(0)
     }
-
-    /// Helper method to get the last timestamp
-    pub fn last_timestamp(&self) -> u64 {
-        self.get(self.count() - 1)
-            .map(|idx| idx.timestamp())
-            .unwrap_or(0)
-    }
 }
 
 impl StdIndex<usize> for IggyIndexes {
@@ -218,37 +166,3 @@ impl Deref for IggyIndexes {
         &self.buffer
     }
 }
-
-impl fmt::Debug for IggyIndexes {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        let count = self.count();
-
-        if count == 0 {
-            return write!(
-                f,
-                "IggyIndexes {{ count: 0, base_position: {}, indexes: [] }}",
-                self.base_position
-            );
-        }
-
-        writeln!(f, "IggyIndexes {{")?;
-        writeln!(f, "    count: {},", count)?;
-        writeln!(f, "    base_position: {},", self.base_position)?;
-        writeln!(f, "    indexes: [")?;
-
-        for i in 0..count {
-            if let Some(index) = self.get(i) {
-                writeln!(
-                    f,
-                    "        {{ offset: {}, position: {}, timestamp: {} }},",
-                    index.offset(),
-                    index.position(),
-                    index.timestamp()
-                )?;
-            }
-        }
-
-        writeln!(f, "    ]")?;
-        write!(f, "}}")
-    }
-}
diff --git a/sdk/src/models/messaging/message.rs 
b/sdk/src/models/messaging/message.rs
index c18d9c6e..ce12c14e 100644
--- a/sdk/src/models/messaging/message.rs
+++ b/sdk/src/models/messaging/message.rs
@@ -37,11 +37,46 @@ impl IggyMessage {
         Self::builder().with_id(id).with_payload(payload).build()
     }
 
+    /// Create a message with ID, payload and user headers
+    pub fn with_id_and_headers(
+        id: u128,
+        payload: Bytes,
+        headers: HashMap<HeaderKey, HeaderValue>,
+    ) -> Self {
+        Self::builder()
+            .with_id(id)
+            .with_payload(payload)
+            .with_user_headers_map(headers)
+            .build()
+    }
+
     /// Start a builder for more complex configuration
     pub fn builder() -> IggyMessageBuilder {
         IggyMessageBuilder::new()
     }
 
+    /// Return instantiated user headers map
+    pub fn user_headers_map(&self) -> Result<Option<HashMap<HeaderKey, 
HeaderValue>>, IggyError> {
+        if let Some(headers) = &self.user_headers {
+            let headers_bytes = Bytes::copy_from_slice(headers);
+
+            match HashMap::<HeaderKey, HeaderValue>::from_bytes(headers_bytes) 
{
+                Ok(h) => Ok(Some(h)),
+                Err(e) => {
+                    tracing::error!(
+                        "Error parsing headers: {}, header_length={}",
+                        e,
+                        self.header.user_headers_length
+                    );
+
+                    Ok(None)
+                }
+            }
+        } else {
+            Ok(None)
+        }
+    }
+
     /// Convert Bytes to messages
     pub(crate) fn from_raw_bytes(buffer: Bytes, count: u32) -> 
Result<Vec<IggyMessage>, IggyError> {
         let mut messages = Vec::with_capacity(count as usize);
@@ -209,7 +244,7 @@ impl IggyMessageBuilder {
         self
     }
 
-    pub fn with_user_header_kv(mut self, key: HeaderKey, value: HeaderValue) 
-> Self {
+    pub fn with_user_key_value_header(mut self, key: HeaderKey, value: 
HeaderValue) -> Self {
         let headers = self.headers.get_or_insert_with(HashMap::new);
         headers.insert(key, value);
         self
diff --git a/server/src/streaming/partitions/messages.rs 
b/server/src/streaming/partitions/messages.rs
index 4dc3ef4c..f1c89d04 100644
--- a/server/src/streaming/partitions/messages.rs
+++ b/server/src/streaming/partitions/messages.rs
@@ -339,67 +339,54 @@ impl Partition {
 
 #[cfg(test)]
 mod tests {
-    // TODO: Fix me
-    /*
-    use iggy::utils::byte_size::IggyByteSize;
-    use iggy::utils::expiry::IggyExpiry;
-    use iggy::utils::sizeable::Sizeable;
-    use std::sync::atomic::{AtomicU32, AtomicU64};
-    use tempfile::TempDir;
-
     use super::*;
     use crate::configs::system::{MessageDeduplicationConfig, SystemConfig};
-    use crate::streaming::partitions::create_messages;
     use crate::streaming::persistence::persister::{FileWithSyncPersister, 
PersisterKind};
     use crate::streaming::storage::SystemStorage;
+    use bytes::Bytes;
+    use std::sync::atomic::{AtomicU32, AtomicU64};
+    use std::sync::Arc;
+    use tempfile::TempDir;
 
     #[tokio::test]
     async fn 
given_disabled_message_deduplication_all_messages_should_be_appended() {
         let (mut partition, _tempdir) = create_partition(false).await;
         let messages = create_messages();
         let messages_count = messages.len() as u32;
-        let appendable_batch_info = AppendableBatchInfo {
-            batch_size: messages
-                .iter()
-                .map(|m| m.get_size_bytes())
-                .sum::<IggyByteSize>(),
-            partition_id: partition.partition_id,
-        };
-        partition
-            .append_messages(appendable_batch_info, messages, None)
-            .await
-            .unwrap();
+        let messages_size = messages
+            .iter()
+            .map(|m| m.get_size_bytes().as_bytes_u32())
+            .sum();
+        let batch = IggyMessagesBatchMut::from_messages(&messages, 
messages_size);
+
+        partition.append_messages(batch, None).await.unwrap();
 
         let loaded_messages = partition
             .get_messages_by_offset(0, messages_count)
             .await
             .unwrap();
-        assert_eq!(loaded_messages.len(), messages_count as usize);
+        assert_eq!(loaded_messages.count(), messages_count);
     }
 
     #[tokio::test]
     async fn 
given_enabled_message_deduplication_only_messages_with_unique_id_should_be_appended()
 {
         let (mut partition, _tempdir) = create_partition(true).await;
         let messages = create_messages();
+        let messages_size = messages
+            .iter()
+            .map(|m| m.get_size_bytes().as_bytes_u32())
+            .sum();
+        let batch = IggyMessagesBatchMut::from_messages(&messages, 
messages_size);
         let messages_count = messages.len() as u32;
         let unique_messages_count = 3;
-        let appendable_batch_info = AppendableBatchInfo {
-            batch_size: messages
-                .iter()
-                .map(|m| m.get_size_bytes())
-                .sum::<IggyByteSize>(),
-            partition_id: partition.partition_id,
-        };
-        partition
-            .append_messages(appendable_batch_info, messages, None)
-            .await
-            .unwrap();
+
+        partition.append_messages(batch, None).await.unwrap();
 
         let loaded_messages = partition
             .get_messages_by_offset(0, messages_count)
             .await
             .unwrap();
-        assert_eq!(loaded_messages.len(), unique_messages_count);
+        assert_eq!(loaded_messages.count(), unique_messages_count);
     }
 
     async fn create_partition(deduplication_enabled: bool) -> (Partition, 
TempDir) {
@@ -441,5 +428,19 @@ mod tests {
             temp_dir,
         )
     }
-    */
+
+    fn create_messages() -> Vec<IggyMessage> {
+        vec![
+            create_message(1, "message 1"),
+            create_message(2, "message 2"),
+            create_message(3, "message 3"),
+            create_message(2, "message 3.2"),
+            create_message(1, "message 1.2"),
+            create_message(3, "message 3.3"),
+        ]
+    }
+
+    fn create_message(id: u128, payload: &str) -> IggyMessage {
+        IggyMessage::with_id(id, Bytes::from(payload.to_string()))
+    }
 }
diff --git a/server/src/streaming/partitions/storage.rs 
b/server/src/streaming/partitions/storage.rs
index b74fe6b7..80c89b43 100644
--- a/server/src/streaming/partitions/storage.rs
+++ b/server/src/streaming/partitions/storage.rs
@@ -99,9 +99,7 @@ impl PartitionStorage for FilePartitionStorage {
             let index_path_exists = 
tokio::fs::try_exists(&index_path).await.unwrap();
             let time_index_path_exists = 
tokio::fs::try_exists(&time_index_path).await.unwrap();
 
-            // Rebuild indexes in 2 cases:
-            // 1. Index cache is enabled and index at path does not exists.
-            // 2. Index cache is enabled and time index at path exists.
+            // Rebuild indexes if index cache is enabled and index at path 
does not exist.
             if index_cache_enabled && (!index_path_exists || 
time_index_path_exists) {
                 warn!(
                     "Index at path {} does not exist, rebuilding it based on 
{}...",
@@ -127,7 +125,6 @@ impl PartitionStorage for FilePartitionStorage {
                 );
             }
 
-            // Remove legacy time index if it exists.
             if time_index_path_exists {
                 tokio::fs::remove_file(&time_index_path).await.unwrap();
             }
diff --git a/server/src/streaming/segments/messages/messages_writer.rs 
b/server/src/streaming/segments/messages/messages_writer.rs
index 9d0cdd44..f95dd9ce 100644
--- a/server/src/streaming/segments/messages/messages_writer.rs
+++ b/server/src/streaming/segments/messages/messages_writer.rs
@@ -90,11 +90,6 @@ impl MessagesWriter {
             "Saving batch set of size {messages_size} bytes 
({containers_count} containers, {messages_count} messages) to messages file: 
{}",
             self.file_path
         );
-
-        for container in batch_set.iter() {
-            tracing::error!("Container size: {}", container.size());
-        }
-
         match confirmation {
             Confirmation::Wait => {
                 if let Some(ref mut file) = self.file {
diff --git a/server/src/streaming/segments/messages_accumulator.rs 
b/server/src/streaming/segments/messages_accumulator.rs
index 1d11a1a7..33e91503 100644
--- a/server/src/streaming/segments/messages_accumulator.rs
+++ b/server/src/streaming/segments/messages_accumulator.rs
@@ -172,8 +172,6 @@ impl MessagesAccumulator {
         for batch in self.batches.iter() {
             segment_indexes.concatenate(batch.indexes_slice());
         }
-
-        tracing::error!("hubcio after update indexes: {}", 
segment_indexes.count());
     }
 
     /// Consumes the accumulator and returns the contained message batches.
diff --git a/server/src/streaming/segments/reading_messages.rs 
b/server/src/streaming/segments/reading_messages.rs
index fdf4deaa..8cdc94c5 100644
--- a/server/src/streaming/segments/reading_messages.rs
+++ b/server/src/streaming/segments/reading_messages.rs
@@ -42,8 +42,6 @@ impl Segment {
         let accumulator_first_timestamp = self.accumulator.first_timestamp();
         let accumulator_last_timestamp = self.accumulator.last_timestamp();
 
-        tracing::error!("hubcio accumulator first timestamp: 
{accumulator_first_timestamp}, last timestamp: {accumulator_last_timestamp}");
-
         // Case 1: Requested timestamp is higher than any available timestamp
         if timestamp > accumulator_last_timestamp {
             return Ok(IggyMessagesBatchSet::empty());
@@ -123,19 +121,11 @@ impl Segment {
 
         // Case 1: All messages are in accumulator buffer
         if offset >= accumulator_first_msg_offset && end_offset <= 
accumulator_last_msg_offset {
-            tracing::error!(
-                "hubcio segment has {} messages, getting all from accumulator",
-                self.get_messages_count()
-            );
             return Ok(self.accumulator.get_messages_by_offset(offset, count));
         }
 
         // Case 2: All messages are on disk
         if end_offset < accumulator_first_msg_offset {
-            tracing::error!(
-                "hubcio segment has {} messages, getting all from disk",
-                self.get_messages_count()
-            );
             return self.load_messages_from_disk_by_offset(offset, count).await;
         }
 
@@ -169,12 +159,6 @@ impl Segment {
         // Calculate how many more messages we need from the accumulator
         let remaining_count = count - combined_batch_set.count();
 
-        tracing::error!(
-            "hubcio segment has {} messages, remaining count: {}",
-            self.get_messages_count(),
-            remaining_count
-        );
-
         if remaining_count > 0 {
             let accumulator_start_offset = std::cmp::max(offset, 
accumulator_first_msg_offset);
 
diff --git a/server/src/streaming/segments/segment.rs 
b/server/src/streaming/segments/segment.rs
index de1b85c7..246b5fe1 100644
--- a/server/src/streaming/segments/segment.rs
+++ b/server/src/streaming/segments/segment.rs
@@ -211,13 +211,16 @@ impl Segment {
     }
 
     pub async fn initialize_reading(&mut self) -> Result<(), IggyError> {
-        let log_reader =
+        if !self.config.segment.cache_indexes {
+            let index_reader =
+                IndexReader::new(&self.index_path, 
self.indexes_size.clone()).await?;
+            self.index_reader = Some(index_reader);
+        }
+
+        let messages_reader =
             MessagesReader::new(&self.messages_path, 
self.messages_size.clone()).await?;
-        // TODO(hubcio): there is no need to store open fd for reader if we 
have index cache enabled
-        let index_reader = IndexReader::new(&self.index_path, 
self.indexes_size.clone()).await?;
+        self.messages_reader = Some(messages_reader);
 
-        self.messages_reader = Some(log_reader);
-        self.index_reader = Some(index_reader);
         Ok(())
     }
 
diff --git a/server/src/streaming/segments/types/message_view_mut.rs 
b/server/src/streaming/segments/types/message_view_mut.rs
index ca283dec..b3521cf6 100644
--- a/server/src/streaming/segments/types/message_view_mut.rs
+++ b/server/src/streaming/segments/types/message_view_mut.rs
@@ -49,9 +49,9 @@ impl<'a> IggyMessageViewMut<'a> {
 
     /// Convenience to update the checksum field in the header
     pub fn update_checksum(&mut self) {
-        let start = 8; // Skip checksum field for checksum calculation
-        let size = self.size() - 8;
-        let data = &self.buffer[start..start + size];
+        let checksum_field_size = size_of::<u64>(); // Skip checksum field for 
checksum calculation
+        let size = self.size() - checksum_field_size;
+        let data = &self.buffer[checksum_field_size..checksum_field_size + 
size];
 
         let checksum = gxhash64(data, 0);
 
diff --git a/server/src/streaming/segments/types/messages_batch_set.rs 
b/server/src/streaming/segments/types/messages_batch_set.rs
index 34b9840c..af52db56 100644
--- a/server/src/streaming/segments/types/messages_batch_set.rs
+++ b/server/src/streaming/segments/types/messages_batch_set.rs
@@ -132,12 +132,6 @@ impl IggyMessagesBatchSet {
         let mut remaining_count = count;
 
         for container in self.iter() {
-            tracing::error!(
-                "BATCH_SET container has {} messages, first offset {}, last 
offset {}",
-                container.count(),
-                container.first_offset(),
-                container.last_offset()
-            );
             if remaining_count == 0 {
                 break;
             }
@@ -149,13 +143,8 @@ impl IggyMessagesBatchSet {
 
             if let Some(sliced) = container.slice_by_offset(start_offset, 
remaining_count) {
                 if sliced.count() > 0 {
-                    tracing::error!(
-                        "BATCH_SET will get {} messages from container",
-                        sliced.count()
-                    );
                     remaining_count -= sliced.count();
                     result.add_batch(sliced);
-                    tracing::error!("BATCH_SET remaining count {}", 
remaining_count);
                 }
             }
         }
@@ -182,12 +171,6 @@ impl IggyMessagesBatchSet {
 
             let first_timestamp = container.first_timestamp();
             if first_timestamp < timestamp {
-                tracing::error!(
-                    "BATCH_SET container has {} messages, first timestamp {}, 
requested timestamp {}",
-                    container.count(),
-                    first_timestamp,
-                    timestamp
-                );
                 continue;
             }
 
diff --git a/server/src/streaming/segments/writing_messages.rs 
b/server/src/streaming/segments/writing_messages.rs
index 4d41b2ee..d33dabd3 100644
--- a/server/src/streaming/segments/writing_messages.rs
+++ b/server/src/streaming/segments/writing_messages.rs
@@ -1,4 +1,4 @@
-use super::{IggyMessagesBatchMut, IggyMessagesBatchSet};
+use super::IggyMessagesBatchMut;
 use crate::streaming::segments::segment::Segment;
 use error_set::ErrContext;
 use iggy::confirmation::Confirmation;
@@ -102,7 +102,6 @@ impl Segment {
         self.indexes.mark_saved();
 
         if !self.config.segment.cache_indexes {
-            tracing::error!("Clearing indexes cache");
             self.indexes.clear();
         }
 
diff --git a/server/src/streaming/topics/messages.rs 
b/server/src/streaming/topics/messages.rs
index 3048bad2..15b3f932 100644
--- a/server/src/streaming/topics/messages.rs
+++ b/server/src/streaming/topics/messages.rs
@@ -9,15 +9,12 @@ use error_set::ErrContext;
 use iggy::error::IggyError;
 use iggy::locking::IggySharedMutFn;
 use iggy::messages::{PartitioningKind, PollingKind};
-use iggy::prelude::{Partitioning, PolledMessages};
-use iggy::utils::byte_size::IggyByteSize;
+use iggy::prelude::Partitioning;
 use iggy::utils::expiry::IggyExpiry;
-use iggy::utils::sizeable::Sizeable;
 use iggy::utils::timestamp::IggyTimestamp;
 use iggy::{confirmation::Confirmation, prelude::PollingStrategy};
 use std::sync::atomic::Ordering;
-use std::sync::Arc;
-use tracing::{info, trace, warn};
+use tracing::trace;
 
 impl Topic {
     pub fn get_messages_count(&self) -> u64 {
diff --git a/server/src/tcp/connection_handler.rs 
b/server/src/tcp/connection_handler.rs
index 1dd74e10..2d99b9be 100644
--- a/server/src/tcp/connection_handler.rs
+++ b/server/src/tcp/connection_handler.rs
@@ -4,13 +4,7 @@ use crate::server_error::ConnectionError;
 use crate::streaming::session::Session;
 use crate::streaming::systems::system::SharedSystem;
 use crate::tcp::connection_handler::command::ServerCommand;
-use crate::tcp::COMPONENT;
-use bytes::{BufMut, BytesMut};
-use error_set::ErrContext;
-use iggy::bytes_serializable::BytesSerializable;
-use iggy::command::SEND_MESSAGES_CODE;
 use iggy::error::IggyError;
-use iggy::identifier::Identifier;
 use std::io::ErrorKind;
 use std::sync::Arc;
 use tracing::{debug, error, info};

Reply via email to