This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new 79caf940e perf: make IggyMessagesBatch::last_offset O(1) (#2840)
79caf940e is described below

commit 79caf940e63bc6815dcaf32a281956d801dff09a
Author: Cistus Creticus <[email protected]>
AuthorDate: Thu Mar 5 02:47:12 2026 +0800

    perf: make IggyMessagesBatch::last_offset O(1) (#2840)
    
    Avoid the unnecessary O(n) scan in IggyMessagesBatch::last_offset() by
    locating the last message through the batch indexes, improving throughput.
---
 Cargo.toml                                         | 39 ++--------
 .../common/src/types/message/message_boundaries.rs | 83 ++++++++++++++++++++
 core/common/src/types/message/message_view.rs      | 88 +++++++++++++++++++++-
 core/common/src/types/message/messages_batch.rs    | 53 ++++---------
 .../common/src/types/message/messages_batch_mut.rs | 42 +++++------
 core/common/src/types/message/mod.rs               |  1 +
 6 files changed, 211 insertions(+), 95 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 3be3da952..31cb08ab6 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -74,14 +74,7 @@ async-broadcast = "0.7.2"
 async-channel = "2.5.0"
 async-dropper = { version = "0.3.1", features = ["tokio", "simple"] }
 async-trait = "0.1.89"
-async_zip = { version = "0.0.18", features = [
-    "tokio",
-    "lzma",
-    "bzip2",
-    "xz",
-    "deflate",
-    "zstd",
-] }
+async_zip = { version = "0.0.18", features = ["tokio", "lzma", "bzip2", "xz", 
"deflate", "zstd"] }
 axum = { version = "0.8.8", features = ["macros"] }
 axum-server = { version = "0.8.0", features = ["tls-rustls"] }
 base64 = "0.22.1"
@@ -93,11 +86,7 @@ bench-runner = { path = "core/bench/runner" }
 bit-set = "0.8.0"
 blake3 = "1.8.3"
 bon = "3.8.2"
-byte-unit = { version = "5.2.0", default-features = false, features = [
-    "serde",
-    "byte",
-    "std",
-] }
+byte-unit = { version = "5.2.0", default-features = false, features = 
["serde", "byte", "std"] }
 bytemuck = { version = "1.25" }
 bytes = "1.11.1"
 charming = "0.6.0"
@@ -233,10 +222,7 @@ rand_xoshiro = "0.8.0"
 rayon = "1.11.0"
 rcgen = "0.14.7"
 regex = "1.12.3"
-reqwest = { version = "0.12.28", default-features = false, features = [
-    "json",
-    "rustls-tls",
-] }
+reqwest = { version = "0.12.28", default-features = false, features = ["json", 
"rustls-tls"] }
 reqwest-middleware = { version = "0.4.2", features = ["json"] }
 reqwest-retry = "0.8.0"
 reqwest-tracing = "0.5.8"
@@ -246,10 +232,7 @@ rmcp = "0.15.0"
 rmp-serde = "1.3.1"
 rolling-file = "0.2.0"
 rust-embed = "8.11.0"
-rust-s3 = { version = "0.37.1", default-features = false, features = [
-    "tokio-rustls-tls",
-    "tags",
-] }
+rust-s3 = { version = "0.37.1", default-features = false, features = 
["tokio-rustls-tls", "tags"] }
 rustls = { version = "0.23.36", features = ["ring"] }
 rustls-pemfile = "2.2.0"
 send_wrapper = "0.6.0"
@@ -283,11 +266,7 @@ tokio-rustls = "0.26.4"
 tokio-tungstenite = { version = "0.28", features = ["rustls-tls-webpki-roots"] 
}
 tokio-util = { version = "0.7.18", features = ["compat"] }
 toml = "1.0.0"
-tower-http = { version = "0.6.8", features = [
-    "add-extension",
-    "cors",
-    "trace",
-] }
+tower-http = { version = "0.6.8", features = ["add-extension", "cors", 
"trace"] }
 tracing = "0.1.44"
 tracing-appender = "0.2.4"
 tracing-opentelemetry = "0.32.1"
@@ -300,13 +279,7 @@ trait-variant = "0.1.2"
 tungstenite = "0.28.0"
 twox-hash = { version = "2.1.2", features = ["xxhash32"] }
 ulid = "1.2.1"
-uuid = { version = "1.20.0", features = [
-    "v4",
-    "v7",
-    "fast-rng",
-    "serde",
-    "zerocopy",
-] }
+uuid = { version = "1.20.0", features = ["v4", "v7", "fast-rng", "serde", 
"zerocopy"] }
 vergen-git2 = { version = "9.1.0", features = ["build", "cargo", "rustc", 
"si"] }
 walkdir = "2.5.0"
 wasm-bindgen = "0.2"
diff --git a/core/common/src/types/message/message_boundaries.rs 
b/core/common/src/types/message/message_boundaries.rs
new file mode 100644
index 000000000..f1928e6c6
--- /dev/null
+++ b/core/common/src/types/message/message_boundaries.rs
@@ -0,0 +1,83 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#[derive(Clone, Copy)]
+pub(crate) struct IggyMessageBoundaries<'a> {
+    indexes: &'a [u8],
+    messages_len: usize,
+    base_position: u32,
+    count: usize,
+}
+
+impl<'a> IggyMessageBoundaries<'a> {
+    pub(crate) fn new(
+        indexes: &'a [u8],
+        messages_len: usize,
+        base_position: u32,
+        count: u32,
+    ) -> Option<Self> {
+        let count = count as usize;
+        let required_bytes = count.checked_mul(super::INDEX_SIZE)?;
+        if indexes.len() < required_bytes {
+            return None;
+        }
+
+        Some(Self {
+            indexes,
+            messages_len,
+            base_position,
+            count,
+        })
+    }
+
+    pub(crate) fn count(&self) -> usize {
+        self.count
+    }
+
+    pub(crate) fn boundaries(&self, index: usize) -> Option<(usize, usize)> {
+        if index >= self.count {
+            return None;
+        }
+
+        let start = if index == 0 {
+            0
+        } else {
+            self.relative_position(index - 1)?
+        };
+
+        let end = if index + 1 == self.count {
+            self.messages_len
+        } else {
+            self.relative_position(index)?
+        };
+
+        if start > self.messages_len || end > self.messages_len || start > end 
{
+            return None;
+        }
+
+        Some((start, end))
+    }
+
+    fn relative_position(&self, index: usize) -> Option<usize> {
+        let start = index.checked_mul(super::INDEX_SIZE)?.checked_add(4)?;
+        let end = start.checked_add(4)?;
+        let position_bytes = self.indexes.get(start..end)?;
+        let absolute_position = 
u32::from_le_bytes(position_bytes.try_into().ok()?);
+        Some(absolute_position.checked_sub(self.base_position)? as usize)
+    }
+}
diff --git a/core/common/src/types/message/message_view.rs 
b/core/common/src/types/message/message_view.rs
index dd3fb6b1a..d6d42e0b4 100644
--- a/core/common/src/types/message/message_view.rs
+++ b/core/common/src/types/message/message_view.rs
@@ -17,6 +17,7 @@
  */
 
 use super::HeaderValue;
+use super::message_boundaries::IggyMessageBoundaries;
 use super::message_header::*;
 use crate::BytesSerializable;
 use crate::IggyByteSize;
@@ -25,7 +26,8 @@ use crate::error::IggyError;
 use crate::utils::checksum;
 use crate::{HeaderKey, IggyMessageHeaderView};
 use bytes::{Bytes, BytesMut};
-use std::{collections::HashMap, iter::Iterator};
+use std::collections::HashMap;
+use std::num::NonZeroUsize;
 
 /// A immutable view of a message.
 #[derive(Debug)]
@@ -160,6 +162,7 @@ impl Sizeable for IggyMessageView<'_> {
 pub struct IggyMessageViewIterator<'a> {
     buffer: &'a [u8],
     position: usize,
+    indexed_last: Option<(usize, NonZeroUsize)>,
 }
 
 impl<'a> IggyMessageViewIterator<'a> {
@@ -167,8 +170,27 @@ impl<'a> IggyMessageViewIterator<'a> {
         Self {
             buffer,
             position: 0,
+            indexed_last: None,
         }
     }
+
+    pub(crate) fn new_with_boundaries(
+        messages: &'a [u8],
+        indexes: &'a [u8],
+        base_position: u32,
+        count: u32,
+    ) -> Self {
+        let mut iter = Self::new(messages);
+        if let Some(boundaries) =
+            IggyMessageBoundaries::new(indexes, messages.len(), base_position, 
count)
+            && boundaries.count() > 0
+        {
+            iter.indexed_last = boundaries
+                .boundaries(boundaries.count() - 1)
+                .and_then(|(start, end)| Some((start, 
NonZeroUsize::new(end)?)));
+        }
+        iter
+    }
 }
 
 impl<'a> Iterator for IggyMessageViewIterator<'a> {
@@ -184,4 +206,68 @@ impl<'a> Iterator for IggyMessageViewIterator<'a> {
         self.position += view.size();
         Some(view)
     }
+
+    fn last(self) -> Option<Self::Item> {
+        if self.position == 0
+            && let Some((start, end)) = self.indexed_last
+            && let Ok(view) = 
IggyMessageView::new(&self.buffer[start..end.get()])
+        {
+            return Some(view);
+        }
+
+        let mut last = None;
+        for item in self {
+            last = Some(item);
+        }
+        last
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::IggyMessage;
+    use bytes::Bytes;
+
+    fn build_batch() -> crate::IggyMessagesBatch {
+        let messages = vec![
+            IggyMessage::builder()
+                .payload(Bytes::from_static(b"one"))
+                .build()
+                .unwrap(),
+            IggyMessage::builder()
+                .payload(Bytes::from_static(b"two"))
+                .build()
+                .unwrap(),
+            IggyMessage::builder()
+                .payload(Bytes::from_static(b"three"))
+                .build()
+                .unwrap(),
+        ];
+        crate::IggyMessagesBatch::from(messages)
+    }
+
+    #[test]
+    fn should_return_tail_for_indexed_last_after_next() {
+        let batch = build_batch();
+        let mut iter = IggyMessageViewIterator::new_with_boundaries(
+            batch.buffer(),
+            batch.indexes_slice(),
+            batch.indexes().base_position(),
+            batch.count(),
+        );
+
+        let first = iter.next().unwrap();
+        assert_eq!(first.payload(), b"one");
+
+        let last = iter.last().unwrap();
+        assert_eq!(last.payload(), b"three");
+    }
+
+    #[test]
+    fn should_return_last_message_for_raw_last() {
+        let batch = build_batch();
+        let last = 
IggyMessageViewIterator::new(batch.buffer()).last().unwrap();
+        assert_eq!(last.payload(), b"three");
+    }
 }
diff --git a/core/common/src/types/message/messages_batch.rs 
b/core/common/src/types/message/messages_batch.rs
index aa6910b01..d9b7b171f 100644
--- a/core/common/src/types/message/messages_batch.rs
+++ b/core/common/src/types/message/messages_batch.rs
@@ -16,6 +16,7 @@
  * under the License.
  */
 
+use super::message_boundaries::IggyMessageBoundaries;
 use crate::{
     BytesSerializable, INDEX_SIZE, IggyByteSize, IggyIndexes, IggyMessage, 
IggyMessageView,
     IggyMessageViewIterator, MAX_PAYLOAD_SIZE, Sizeable, Validatable, 
error::IggyError,
@@ -52,7 +53,12 @@ impl IggyMessagesBatch {
 
     /// Create iterator over messages
     pub fn iter(&self) -> IggyMessageViewIterator<'_> {
-        IggyMessageViewIterator::new(&self.messages)
+        IggyMessageViewIterator::new_with_boundaries(
+            &self.messages,
+            &self.indexes,
+            self.indexes.base_position(),
+            self.count,
+        )
     }
 
     /// Get the number of messages
@@ -120,47 +126,18 @@ impl IggyMessagesBatch {
         self.iter().last().map(|msg| msg.header().timestamp())
     }
 
-    /// Calculates the start position of a message at the given index in the 
buffer
-    fn message_start_position(&self, index: usize) -> usize {
-        if index == 0 {
-            0
-        } else {
-            self.position_at(index as u32 - 1) as usize - 
self.indexes.base_position() as usize
-        }
-    }
-
-    /// Calculates the end position of a message at the given index in the 
buffer
-    fn message_end_position(&self, index: usize) -> usize {
-        if index >= self.count as usize - 1 {
-            self.messages.len()
-        } else {
-            self.position_at(index as u32) as usize - 
self.indexes.base_position() as usize
-        }
+    fn boundaries(&self) -> Option<IggyMessageBoundaries<'_>> {
+        IggyMessageBoundaries::new(
+            &self.indexes,
+            self.messages.len(),
+            self.indexes.base_position(),
+            self.count,
+        )
     }
 
     /// Gets the byte range for a message at the given index
     fn get_message_boundaries(&self, index: usize) -> Option<(usize, usize)> {
-        if index >= self.count as usize {
-            return None;
-        }
-
-        let start = self.message_start_position(index);
-        let end = self.message_end_position(index);
-
-        if start > self.messages.len() || end > self.messages.len() || start > 
end {
-            return None;
-        }
-
-        Some((start, end))
-    }
-
-    /// Helper method to read a position (u32) from the byte array at the 
given index
-    fn position_at(&self, position_index: u32) -> u32 {
-        if let Some(index) = self.indexes.get(position_index) {
-            index.position()
-        } else {
-            0
-        }
+        self.boundaries()?.boundaries(index)
     }
 
     /// Get the message at the specified index.
diff --git a/core/common/src/types/message/messages_batch_mut.rs 
b/core/common/src/types/message/messages_batch_mut.rs
index 55e33bf27..231ba00a2 100644
--- a/core/common/src/types/message/messages_batch_mut.rs
+++ b/core/common/src/types/message/messages_batch_mut.rs
@@ -17,6 +17,7 @@
  */
 
 use super::indexes_mut::IggyIndexesMut;
+use super::message_boundaries::IggyMessageBoundaries;
 use super::message_view_mut::IggyMessageViewMutIterator;
 use crate::{
     BytesSerializable, IGGY_MESSAGE_HEADER_SIZE, INDEX_SIZE, IggyByteSize, 
IggyError,
@@ -100,7 +101,12 @@ impl IggyMessagesBatchMut {
 
     /// Creates an iterator that yields immutable views of messages.
     pub fn iter(&self) -> IggyMessageViewIterator<'_> {
-        IggyMessageViewIterator::new(&self.messages)
+        IggyMessageViewIterator::new_with_boundaries(
+            &self.messages,
+            &self.indexes,
+            self.indexes.base_position(),
+            self.count(),
+        )
     }
 
     /// Returns the number of messages in the batch.
@@ -289,32 +295,23 @@ impl IggyMessagesBatchMut {
         self.indexes.get(index).map(|index| index.position())
     }
 
+    fn boundaries(&self) -> Option<IggyMessageBoundaries<'_>> {
+        IggyMessageBoundaries::new(
+            &self.indexes,
+            self.messages.len(),
+            self.indexes.base_position(),
+            self.count(),
+        )
+    }
+
     /// Calculates the start position of a message at the given index in the 
buffer
     fn message_start_position(&self, index: usize) -> Option<usize> {
-        if index >= self.count() as usize {
-            return None;
-        }
-
-        if index == 0 {
-            Some(0)
-        } else {
-            self.position_at(index as u32 - 1)
-                .map(|pos| (pos - self.indexes.base_position()) as usize)
-        }
+        self.get_message_boundaries(index).map(|(start, _)| start)
     }
 
     /// Calculates the end position of a message at the given index in the 
buffer
     fn message_end_position(&self, index: usize) -> Option<usize> {
-        if index >= self.count() as usize {
-            return None;
-        }
-
-        if index == self.count() as usize - 1 {
-            Some(self.messages.len())
-        } else {
-            self.position_at(index as u32)
-                .map(|pos| (pos - self.indexes.base_position()) as usize)
-        }
+        self.get_message_boundaries(index).map(|(_, end)| end)
     }
 
     /// Returns a contiguous slice (as a new `IggyMessagesBatch`) of up to 
`count` messages
@@ -464,8 +461,7 @@ impl IggyMessagesBatchMut {
 
     /// Gets the byte range for a message at the given index
     fn get_message_boundaries(&self, index: usize) -> Option<(usize, usize)> {
-        let start = self.message_start_position(index)?;
-        let end = self.message_end_position(index)?;
+        let (start, end) = self.boundaries()?.boundaries(index)?;
 
         if start > self.messages.len()
             || end > self.messages.len()
diff --git a/core/common/src/types/message/mod.rs 
b/core/common/src/types/message/mod.rs
index d3f85fb63..a20d69fc4 100644
--- a/core/common/src/types/message/mod.rs
+++ b/core/common/src/types/message/mod.rs
@@ -22,6 +22,7 @@ mod index;
 mod index_view;
 mod indexes;
 mod indexes_mut;
+mod message_boundaries;
 mod message_header;
 mod message_header_view;
 mod message_header_view_mut;

Reply via email to