hubcio commented on code in PR #1679: URL: https://github.com/apache/iggy/pull/1679#discussion_r2039742503
########## sdk/src/models/messaging/messages_batch.rs: ########## @@ -0,0 +1,587 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use super::{IggyIndexes, IggyMessage, IggyMessageView, IggyMessageViewIterator}; +use crate::{ + error::IggyError, + messages::MAX_PAYLOAD_SIZE, + models::messaging::INDEX_SIZE, + prelude::{BytesSerializable, IggyByteSize, Sizeable, Validatable}, +}; +use bytes::{BufMut, Bytes, BytesMut}; +use std::ops::{Deref, Index}; + +/// An immutable messages container that holds a buffer of messages +#[derive(Clone, Debug, PartialEq)] +pub struct IggyMessagesBatch { + /// The number of messages in the batch + count: u32, + /// The byte-indexes of messages in the buffer, represented as array of u32's. Offsets are relative. + /// Each index consists of offset, position (byte offset in the buffer) and timestamp. 
+ indexes: IggyIndexes, + /// The buffer containing the messages + messages: Bytes, +} + +impl IggyMessagesBatch { + /// Create a batch from indexes buffer and messages buffer + pub fn new(indexes: IggyIndexes, messages: Bytes, count: u32) -> Self { + Self { + count, + indexes, + messages, + } + } + + /// Creates an empty messages batch + pub fn empty() -> Self { + Self::new(IggyIndexes::empty(), BytesMut::new().freeze(), 0) + } + + /// Create iterator over messages + pub fn iter(&self) -> IggyMessageViewIterator { + IggyMessageViewIterator::new(&self.messages) + } + + /// Get the number of messages + pub fn count(&self) -> u32 { + self.count + } + + /// Check if the batch is empty + pub fn is_empty(&self) -> bool { + self.count() == 0 + } + + /// Get the total size of all messages in bytes + pub fn size(&self) -> u32 { + self.messages.len() as u32 + } + + /// Get access to the underlying buffer + pub fn buffer(&self) -> &[u8] { + &self.messages + } + + /// Get the indexes slice + pub fn indexes_slice(&self) -> &[u8] { + &self.indexes + } + + /// Take the indexes from the batch + pub fn take_indexes(&mut self) -> IggyIndexes { + std::mem::take(&mut self.indexes) + } + + /// Decompose the batch into its components + pub fn decompose(self) -> (u32, IggyIndexes, Bytes) { + (self.count, self.indexes, self.messages) + } + + /// Get offset of first message + pub fn first_offset(&self) -> u64 { + self.iter() + .next() + .map(|msg| msg.header().offset()) + .unwrap_or(0) + } + + /// Get timestamp of first message + pub fn first_timestamp(&self) -> u64 { + self.iter() + .next() + .map(|msg| msg.header().timestamp()) + .unwrap_or(0) + } + + /// Get offset of last message + pub fn last_offset(&self) -> u64 { + self.iter() + .last() + .map(|msg| msg.header().offset()) + .unwrap_or(0) + } + + /// Get timestamp of last message + pub fn last_timestamp(&self) -> u64 { + self.iter() + .last() + .map(|msg| msg.header().timestamp()) + .unwrap_or(0) + } + + /// Calculates the start 
position of a message at the given index in the buffer + fn message_start_position(&self, index: usize) -> usize { + if index == 0 { + 0 + } else { + self.position_at(index as u32 - 1) as usize - self.indexes.base_position() as usize + } + } + + /// Calculates the end position of a message at the given index in the buffer + fn message_end_position(&self, index: usize) -> usize { + if index >= self.count as usize - 1 { + self.messages.len() + } else { + self.position_at(index as u32) as usize - self.indexes.base_position() as usize + } + } + + /// Gets the byte range for a message at the given index + fn get_message_boundaries(&self, index: usize) -> Option<(usize, usize)> { + if index >= self.count as usize { + return None; + } + + let start = self.message_start_position(index); + let end = self.message_end_position(index); + + if start > self.messages.len() || end > self.messages.len() || start > end { + return None; + } + + Some((start, end)) + } + + /// Helper method to read a position (u32) from the byte array at the given index + fn position_at(&self, position_index: u32) -> u32 { + if let Some(index) = self.indexes.get(position_index) { + index.position() + } else { + 0 + } + } + + /// Returns a contiguous slice (as a new `IggyMessagesBatch`) of up to `count` messages + /// whose message headers have an offset greater than or equal to the provided `start_offset`. 
+ pub fn slice_by_offset(&self, start_offset: u64, count: u32) -> Option<Self> { + if self.is_empty() || count == 0 { + return None; + } + + let first_offset = self.first_offset(); + + if start_offset < first_offset { + return self.slice_by_index(0, count); + } + + let last_offset = self.last_offset(); + if start_offset > last_offset { + return None; + } + + let offset_diff = start_offset - first_offset; + let first_message_index = offset_diff as usize; + + if first_message_index >= self.count() as usize { + return None; + } + + self.slice_by_index(first_message_index as u32, count) + } + + /// Helper method to slice the batch starting from a specific index + fn slice_by_index(&self, start_index: u32, count: u32) -> Option<Self> { + if start_index >= self.count() { + return None; + } + + let last_message_index = + std::cmp::min((start_index + count) as usize, self.count() as usize); + + let sub_indexes = self.indexes.slice_by_offset( + start_index, + (last_message_index - start_index as usize) as u32, + )?; + + let first_message_position = self.message_start_position(start_index as usize); + let last_message_position = self.message_end_position(last_message_index - 1); + + let sub_buffer = self + .messages + .slice(first_message_position..last_message_position); + + Some(IggyMessagesBatch { + count: (last_message_index - start_index as usize) as u32, + indexes: sub_indexes, + messages: sub_buffer, + }) + } + + /// Returns a contiguous slice (as a new `IggyMessagesBatch`) of up to `count` messages + /// whose message headers have a timestamp greater than or equal to the provided `timestamp`. + /// + /// If no messages meet the criteria, returns `None`. 
+ pub fn slice_by_timestamp(&self, timestamp: u64, count: u32) -> Option<Self> { + if self.is_empty() || count == 0 { + return None; + } + + // Use binary search to find the first message with timestamp >= the target + let first_message_index = self.binary_search_timestamp(timestamp)?; + + self.slice_by_index(first_message_index, count) + } + + /// Find the position of the index with timestamp closest to (but not exceeding) the target + fn binary_search_timestamp(&self, target_timestamp: u64) -> Option<u32> { Review Comment: I will leave it as is, but feel free to create PRs for improvements once this code is merged. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
