numinnex commented on code in PR #1679:
URL: https://github.com/apache/iggy/pull/1679#discussion_r2037793543


##########
sdk/src/models/messaging/messages_batch.rs:
##########
@@ -0,0 +1,587 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use super::{IggyIndexes, IggyMessage, IggyMessageView, 
IggyMessageViewIterator};
+use crate::{
+    error::IggyError,
+    messages::MAX_PAYLOAD_SIZE,
+    models::messaging::INDEX_SIZE,
+    prelude::{BytesSerializable, IggyByteSize, Sizeable, Validatable},
+};
+use bytes::{BufMut, Bytes, BytesMut};
+use std::ops::{Deref, Index};
+
+/// An immutable messages container that holds a buffer of messages
+#[derive(Clone, Debug, PartialEq)]
+pub struct IggyMessagesBatch {
+    /// The number of messages in the batch
+    count: u32,
+    /// The byte-indexes of messages in the buffer, represented as array of 
u32's. Offsets are relative.
+    /// Each index consists of offset, position (byte offset in the buffer) 
and timestamp.
+    indexes: IggyIndexes,
+    /// The buffer containing the messages
+    messages: Bytes,
+}
+
+impl IggyMessagesBatch {
+    /// Create a batch from indexes buffer and messages buffer
+    pub fn new(indexes: IggyIndexes, messages: Bytes, count: u32) -> Self {
+        Self {
+            count,
+            indexes,
+            messages,
+        }
+    }
+
+    /// Creates a empty messages batch
+    pub fn empty() -> Self {
+        Self::new(IggyIndexes::empty(), BytesMut::new().freeze(), 0)
+    }
+
+    /// Create iterator over messages
+    pub fn iter(&self) -> IggyMessageViewIterator {
+        IggyMessageViewIterator::new(&self.messages)
+    }
+
+    /// Get the number of messages
+    pub fn count(&self) -> u32 {
+        self.count
+    }
+
+    /// Check if the batch is empty
+    pub fn is_empty(&self) -> bool {
+        self.count() == 0
+    }
+
+    /// Get the total size of all messages in bytes
+    pub fn size(&self) -> u32 {
+        self.messages.len() as u32
+    }
+
+    /// Get access to the underlying buffer
+    pub fn buffer(&self) -> &[u8] {
+        &self.messages
+    }
+
+    /// Get the indexes slice
+    pub fn indexes_slice(&self) -> &[u8] {
+        &self.indexes
+    }
+
+    /// Take the indexes from the batch
+    pub fn take_indexes(&mut self) -> IggyIndexes {
+        std::mem::take(&mut self.indexes)
+    }
+
+    /// Decompose the batch into its components
+    pub fn decompose(self) -> (u32, IggyIndexes, Bytes) {
+        (self.count, self.indexes, self.messages)
+    }
+
+    /// Get index of first message
+    pub fn first_offset(&self) -> u64 {
+        self.iter()
+            .next()
+            .map(|msg| msg.header().offset())
+            .unwrap_or(0)
+    }
+
+    /// Get timestamp of first message
+    pub fn first_timestamp(&self) -> u64 {
+        self.iter()
+            .next()
+            .map(|msg| msg.header().timestamp())
+            .unwrap_or(0)
+    }
+
+    /// Get offset of last message
+    pub fn last_offset(&self) -> u64 {
+        self.iter()
+            .last()
+            .map(|msg| msg.header().offset())
+            .unwrap_or(0)
+    }
+
+    /// Get timestamp of last message
+    pub fn last_timestamp(&self) -> u64 {
+        self.iter()
+            .last()
+            .map(|msg| msg.header().timestamp())
+            .unwrap_or(0)
+    }
+
+    /// Calculates the start position of a message at the given index in the 
buffer
+    fn message_start_position(&self, index: usize) -> usize {
+        if index == 0 {
+            0
+        } else {
+            self.position_at(index as u32 - 1) as usize - 
self.indexes.base_position() as usize
+        }
+    }
+
+    /// Calculates the end position of a message at the given index in the 
buffer
+    fn message_end_position(&self, index: usize) -> usize {
+        if index >= self.count as usize - 1 {
+            self.messages.len()
+        } else {
+            self.position_at(index as u32) as usize - 
self.indexes.base_position() as usize
+        }
+    }
+
+    /// Gets the byte range for a message at the given index
+    fn get_message_boundaries(&self, index: usize) -> Option<(usize, usize)> {
+        if index >= self.count as usize {
+            return None;
+        }
+
+        let start = self.message_start_position(index);
+        let end = self.message_end_position(index);
+
+        if start > self.messages.len() || end > self.messages.len() || start > 
end {
+            return None;
+        }
+
+        Some((start, end))
+    }
+
+    /// Helper method to read a position (u32) from the byte array at the 
given index
+    fn position_at(&self, position_index: u32) -> u32 {
+        if let Some(index) = self.indexes.get(position_index) {
+            index.position()
+        } else {
+            0
+        }
+    }
+
+    /// Returns a contiguous slice (as a new `IggyMessagesBatch`) of up to 
`count` messages
+    /// whose message headers have an offset greater than or equal to the 
provided `start_offset`.
+    pub fn slice_by_offset(&self, start_offset: u64, count: u32) -> 
Option<Self> {
+        if self.is_empty() || count == 0 {
+            return None;
+        }
+
+        let first_offset = self.first_offset();
+
+        if start_offset < first_offset {
+            return self.slice_by_index(0, count);
+        }
+
+        let last_offset = self.last_offset();
+        if start_offset > last_offset {
+            return None;
+        }
+
+        let offset_diff = start_offset - first_offset;
+        let first_message_index = offset_diff as usize;
+
+        if first_message_index >= self.count() as usize {
+            return None;
+        }
+
+        self.slice_by_index(first_message_index as u32, count)
+    }
+
+    /// Helper method to slice the batch starting from a specific index
+    fn slice_by_index(&self, start_index: u32, count: u32) -> Option<Self> {
+        if start_index >= self.count() {
+            return None;
+        }
+
+        let last_message_index =
+            std::cmp::min((start_index + count) as usize, self.count() as 
usize);
+
+        let sub_indexes = self.indexes.slice_by_offset(
+            start_index,
+            (last_message_index - start_index as usize) as u32,
+        )?;
+
+        let first_message_position = self.message_start_position(start_index 
as usize);
+        let last_message_position = 
self.message_end_position(last_message_index - 1);
+
+        let sub_buffer = self
+            .messages
+            .slice(first_message_position..last_message_position);
+
+        Some(IggyMessagesBatch {
+            count: (last_message_index - start_index as usize) as u32,
+            indexes: sub_indexes,
+            messages: sub_buffer,
+        })
+    }
+
+    /// Returns a contiguous slice (as a new `IggyMessagesBatch`) of up to 
`count` messages
+    /// whose message headers have a timestamp greater than or equal to the 
provided `timestamp`.
+    ///
+    /// If no messages meet the criteria, returns `None`.
+    pub fn slice_by_timestamp(&self, timestamp: u64, count: u32) -> 
Option<Self> {
+        if self.is_empty() || count == 0 {
+            return None;
+        }
+
+        // Use binary search to find the first message with timestamp >= the 
target
+        let first_message_index = self.binary_search_timestamp(timestamp)?;
+
+        self.slice_by_index(first_message_index, count)
+    }
+
+    /// Find the position of the index with timestamp closest to (but not 
exceeding) the target
+    fn binary_search_timestamp(&self, target_timestamp: u64) -> Option<u32> {

Review Comment:
   Maybe we can get away with a plain linear search here? Check out this 
[talk](https://www.youtube.com/watch?v=sX2nF1fW7kI) for some really great 
insight into the pitfalls of binary search.



##########
server/src/streaming/segments/messages_accumulator.rs:
##########
@@ -0,0 +1,199 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::streaming::deduplication::message_deduplicator::MessageDeduplicator;
+
+use super::types::{IggyMessagesBatchMut, IggyMessagesBatchSet};
+use tracing::trace;
+
+/// A container that accumulates messages in memory before they are written to 
disk.
+///
+/// The accumulator serves as a staging area for messages, allowing them to be
+/// collected and prepared for persistence. It maintains metadata like offsets,
+/// timestamps, and positions to ensure correct ordering and indexing.
+#[derive(Debug, Default)]
+pub struct MessagesAccumulator {
+    /// Base offset of the first message in the accumulator
+    base_offset: u64,
+
+    /// Current (latest) offset in the accumulator
+    current_offset: u64,
+
+    /// Current (latest) byte position for the next message in the segment, 
also size of all messages in the accumulator
+    current_position: u32,
+
+    /// Collection of all message batches in the accumulator
+    batches: IggyMessagesBatchSet,
+
+    /// Total number of messages in the accumulator
+    messages_count: u32,
+}
+
+impl MessagesAccumulator {
+    /// Adds a batch of messages to the accumulator and prepares them for 
persistence.
+    ///
+    /// This method assigns offsets, timestamps, and positions to the messages
+    /// and updates the indexes accordingly. It ensures message continuity by

Review Comment:
   I don't think the sentence `It ensures message continuity by managing 
offsets to prevent gaps or overlaps` makes sense here. Instead, this doc comment 
should describe how we update those indexes.
   



##########
sdk/src/models/messaging/messages_batch.rs:
##########
@@ -0,0 +1,587 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use super::{IggyIndexes, IggyMessage, IggyMessageView, 
IggyMessageViewIterator};
+use crate::{
+    error::IggyError,
+    messages::MAX_PAYLOAD_SIZE,
+    models::messaging::INDEX_SIZE,
+    prelude::{BytesSerializable, IggyByteSize, Sizeable, Validatable},
+};
+use bytes::{BufMut, Bytes, BytesMut};
+use std::ops::{Deref, Index};
+
+/// An immutable messages container that holds a buffer of messages
+#[derive(Clone, Debug, PartialEq)]
+pub struct IggyMessagesBatch {
+    /// The number of messages in the batch
+    count: u32,
+    /// The byte-indexes of messages in the buffer, represented as array of 
u32's. Offsets are relative.
+    /// Each index consists of offset, position (byte offset in the buffer) 
and timestamp.
+    indexes: IggyIndexes,
+    /// The buffer containing the messages
+    messages: Bytes,
+}
+
+impl IggyMessagesBatch {
+    /// Create a batch from indexes buffer and messages buffer
+    pub fn new(indexes: IggyIndexes, messages: Bytes, count: u32) -> Self {
+        Self {
+            count,
+            indexes,
+            messages,
+        }
+    }
+
+    /// Creates a empty messages batch
+    pub fn empty() -> Self {
+        Self::new(IggyIndexes::empty(), BytesMut::new().freeze(), 0)
+    }
+
+    /// Create iterator over messages
+    pub fn iter(&self) -> IggyMessageViewIterator {
+        IggyMessageViewIterator::new(&self.messages)
+    }
+
+    /// Get the number of messages
+    pub fn count(&self) -> u32 {
+        self.count
+    }
+
+    /// Check if the batch is empty
+    pub fn is_empty(&self) -> bool {
+        self.count() == 0
+    }
+
+    /// Get the total size of all messages in bytes
+    pub fn size(&self) -> u32 {
+        self.messages.len() as u32
+    }
+
+    /// Get access to the underlying buffer
+    pub fn buffer(&self) -> &[u8] {
+        &self.messages
+    }
+
+    /// Get the indexes slice
+    pub fn indexes_slice(&self) -> &[u8] {
+        &self.indexes
+    }
+
+    /// Take the indexes from the batch
+    pub fn take_indexes(&mut self) -> IggyIndexes {
+        std::mem::take(&mut self.indexes)
+    }
+
+    /// Decompose the batch into its components
+    pub fn decompose(self) -> (u32, IggyIndexes, Bytes) {
+        (self.count, self.indexes, self.messages)
+    }
+
+    /// Get index of first message
+    pub fn first_offset(&self) -> u64 {

Review Comment:
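   I think all the first/last getter functions should either return a 
`Result`/`Option` or panic, rather than silently falling back to `0` via 
`unwrap_or(0)`: for an empty batch, `0` is indistinguishable from a real 
offset or timestamp of `0`.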



##########
server/src/streaming/segments/indexes/indexes_mut.rs:
##########
@@ -0,0 +1,365 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use bytes::{BufMut, Bytes, BytesMut};
+use iggy::models::messaging::{IggyIndexView, IggyIndexes, INDEX_SIZE};
+use std::fmt;
+use std::ops::{Deref, Index as StdIndex};
+
+/// A container for binary-encoded index data.
+/// Optimized for efficient storage and I/O operations.
+#[derive(Default, Clone)]
+pub struct IggyIndexesMut {
+    buffer: BytesMut,
+    saved_count: u32,
+    base_position: u32,
+}
+
+impl IggyIndexesMut {
+    /// Creates a new empty container
+    pub fn empty() -> Self {
+        Self {
+            buffer: BytesMut::new(),
+            saved_count: 0,
+            base_position: 0,
+        }
+    }
+
+    /// Creates indexes from bytes
+    pub fn from_bytes(indexes: BytesMut) -> Self {
+        Self {
+            buffer: indexes,
+            saved_count: 0,
+            base_position: 0,
+        }
+    }
+
+    /// Gets the size of all indexes messages
+    pub fn messages_size(&self) -> u32 {
+        self.last_position()
+    }
+
+    /// Gets the base position of the indexes
+    pub fn base_position(&self) -> u32 {
+        self.base_position
+    }
+
+    /// Sets the base position of the indexes
+    pub fn set_base_position(&mut self, base_position: u32) {
+        self.base_position = base_position;
+    }
+
+    /// Helper method to get the last index position
+    pub fn last_position(&self) -> u32 {
+        self.get(self.count() - 1)
+            .map(|idx| idx.position())
+            .unwrap_or(0)
+    }
+
+    /// Creates a new container with the specified capacity
+    pub fn with_capacity(capacity: usize, base_position: u32) -> Self {
+        Self {
+            buffer: BytesMut::with_capacity(capacity * INDEX_SIZE),
+            saved_count: 0,
+            base_position,
+        }
+    }
+
+    /// Makes the indexes immutable
+    pub fn make_immutable(self) -> IggyIndexes {
+        IggyIndexes::new(self.buffer.freeze(), self.base_position)
+    }
+
+    /// Inserts a new index at the end of buffer
+    pub fn insert(&mut self, offset: u32, position: u32, timestamp: u64) {
+        self.buffer.put_u32_le(offset);
+        self.buffer.put_u32_le(position);
+        self.buffer.put_u64_le(timestamp);
+    }
+
+    /// Appends another slice of indexes to this one.
+    pub fn concatenate(&mut self, other: Bytes) {
+        self.buffer.put(other);
+    }
+
+    /// Gets the number of indexes in the container
+    pub fn count(&self) -> u32 {
+        self.buffer.len() as u32 / INDEX_SIZE as u32
+    }
+
+    /// Checks if the container is empty
+    pub fn is_empty(&self) -> bool {
+        self.count() == 0
+    }
+
+    /// Gets the size of the buffer in bytes
+    pub fn size(&self) -> u32 {
+        self.buffer.len() as u32
+    }
+
+    /// Gets a view of the Index at the specified index
+    pub fn get(&self, index: u32) -> Option<IggyIndexView> {
+        if index >= self.count() {
+            return None;
+        }
+
+        let start = index as usize * INDEX_SIZE;
+        let end = start + INDEX_SIZE;
+
+        if end <= self.buffer.len() {
+            Some(IggyIndexView::new(&self.buffer[start..end]))
+        } else {
+            None
+        }
+    }
+
+    // Set the offset at the given index position
+    pub fn set_offset_at(&mut self, index: u32, offset: u32) {
+        let pos = index as usize * INDEX_SIZE;
+        self.buffer[pos..pos + 4].copy_from_slice(&offset.to_le_bytes());

Review Comment:
   Slice indexing and `copy_from_slice` already perform bounds checking (they panic on an out-of-range range or a length mismatch), so we don't need to do that check ourselves.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to