This is an automated email from the ASF dual-hosted git repository.
gkoszyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 79caf940e perf: make IggyMessagesBatch::last_offset O(1) (#2840)
79caf940e is described below
commit 79caf940e63bc6815dcaf32a281956d801dff09a
Author: Cistus Creticus <[email protected]>
AuthorDate: Thu Mar 5 02:47:12 2026 +0800
perf: make IggyMessagesBatch::last_offset O(1) (#2840)
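Avoid the unnecessary O(n) lookup in IggyMessagesBatch::last_offset() to
improve throughput. The batch iterators now override Iterator::last() to
locate the final message through the index boundaries instead of walking
every message, falling back to the linear walk when no usable boundaries
are available or iteration has already started.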
---
Cargo.toml | 39 ++--------
.../common/src/types/message/message_boundaries.rs | 83 ++++++++++++++++++++
core/common/src/types/message/message_view.rs | 88 +++++++++++++++++++++-
core/common/src/types/message/messages_batch.rs | 53 ++++---------
.../common/src/types/message/messages_batch_mut.rs | 42 +++++------
core/common/src/types/message/mod.rs | 1 +
6 files changed, 211 insertions(+), 95 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index 3be3da952..31cb08ab6 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -74,14 +74,7 @@ async-broadcast = "0.7.2"
async-channel = "2.5.0"
async-dropper = { version = "0.3.1", features = ["tokio", "simple"] }
async-trait = "0.1.89"
-async_zip = { version = "0.0.18", features = [
- "tokio",
- "lzma",
- "bzip2",
- "xz",
- "deflate",
- "zstd",
-] }
+async_zip = { version = "0.0.18", features = ["tokio", "lzma", "bzip2", "xz",
"deflate", "zstd"] }
axum = { version = "0.8.8", features = ["macros"] }
axum-server = { version = "0.8.0", features = ["tls-rustls"] }
base64 = "0.22.1"
@@ -93,11 +86,7 @@ bench-runner = { path = "core/bench/runner" }
bit-set = "0.8.0"
blake3 = "1.8.3"
bon = "3.8.2"
-byte-unit = { version = "5.2.0", default-features = false, features = [
- "serde",
- "byte",
- "std",
-] }
+byte-unit = { version = "5.2.0", default-features = false, features =
["serde", "byte", "std"] }
bytemuck = { version = "1.25" }
bytes = "1.11.1"
charming = "0.6.0"
@@ -233,10 +222,7 @@ rand_xoshiro = "0.8.0"
rayon = "1.11.0"
rcgen = "0.14.7"
regex = "1.12.3"
-reqwest = { version = "0.12.28", default-features = false, features = [
- "json",
- "rustls-tls",
-] }
+reqwest = { version = "0.12.28", default-features = false, features = ["json",
"rustls-tls"] }
reqwest-middleware = { version = "0.4.2", features = ["json"] }
reqwest-retry = "0.8.0"
reqwest-tracing = "0.5.8"
@@ -246,10 +232,7 @@ rmcp = "0.15.0"
rmp-serde = "1.3.1"
rolling-file = "0.2.0"
rust-embed = "8.11.0"
-rust-s3 = { version = "0.37.1", default-features = false, features = [
- "tokio-rustls-tls",
- "tags",
-] }
+rust-s3 = { version = "0.37.1", default-features = false, features =
["tokio-rustls-tls", "tags"] }
rustls = { version = "0.23.36", features = ["ring"] }
rustls-pemfile = "2.2.0"
send_wrapper = "0.6.0"
@@ -283,11 +266,7 @@ tokio-rustls = "0.26.4"
tokio-tungstenite = { version = "0.28", features = ["rustls-tls-webpki-roots"]
}
tokio-util = { version = "0.7.18", features = ["compat"] }
toml = "1.0.0"
-tower-http = { version = "0.6.8", features = [
- "add-extension",
- "cors",
- "trace",
-] }
+tower-http = { version = "0.6.8", features = ["add-extension", "cors",
"trace"] }
tracing = "0.1.44"
tracing-appender = "0.2.4"
tracing-opentelemetry = "0.32.1"
@@ -300,13 +279,7 @@ trait-variant = "0.1.2"
tungstenite = "0.28.0"
twox-hash = { version = "2.1.2", features = ["xxhash32"] }
ulid = "1.2.1"
-uuid = { version = "1.20.0", features = [
- "v4",
- "v7",
- "fast-rng",
- "serde",
- "zerocopy",
-] }
+uuid = { version = "1.20.0", features = ["v4", "v7", "fast-rng", "serde",
"zerocopy"] }
vergen-git2 = { version = "9.1.0", features = ["build", "cargo", "rustc",
"si"] }
walkdir = "2.5.0"
wasm-bindgen = "0.2"
diff --git a/core/common/src/types/message/message_boundaries.rs
b/core/common/src/types/message/message_boundaries.rs
new file mode 100644
index 000000000..f1928e6c6
--- /dev/null
+++ b/core/common/src/types/message/message_boundaries.rs
@@ -0,0 +1,83 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#[derive(Clone, Copy)]
+pub(crate) struct IggyMessageBoundaries<'a> {
+ indexes: &'a [u8],
+ messages_len: usize,
+ base_position: u32,
+ count: usize,
+}
+
+impl<'a> IggyMessageBoundaries<'a> {
+ pub(crate) fn new(
+ indexes: &'a [u8],
+ messages_len: usize,
+ base_position: u32,
+ count: u32,
+ ) -> Option<Self> {
+ let count = count as usize;
+ let required_bytes = count.checked_mul(super::INDEX_SIZE)?;
+ if indexes.len() < required_bytes {
+ return None;
+ }
+
+ Some(Self {
+ indexes,
+ messages_len,
+ base_position,
+ count,
+ })
+ }
+
+ pub(crate) fn count(&self) -> usize {
+ self.count
+ }
+
+ pub(crate) fn boundaries(&self, index: usize) -> Option<(usize, usize)> {
+ if index >= self.count {
+ return None;
+ }
+
+ let start = if index == 0 {
+ 0
+ } else {
+ self.relative_position(index - 1)?
+ };
+
+ let end = if index + 1 == self.count {
+ self.messages_len
+ } else {
+ self.relative_position(index)?
+ };
+
+ if start > self.messages_len || end > self.messages_len || start > end
{
+ return None;
+ }
+
+ Some((start, end))
+ }
+
+ fn relative_position(&self, index: usize) -> Option<usize> {
+ let start = index.checked_mul(super::INDEX_SIZE)?.checked_add(4)?;
+ let end = start.checked_add(4)?;
+ let position_bytes = self.indexes.get(start..end)?;
+ let absolute_position =
u32::from_le_bytes(position_bytes.try_into().ok()?);
+ Some(absolute_position.checked_sub(self.base_position)? as usize)
+ }
+}
diff --git a/core/common/src/types/message/message_view.rs
b/core/common/src/types/message/message_view.rs
index dd3fb6b1a..d6d42e0b4 100644
--- a/core/common/src/types/message/message_view.rs
+++ b/core/common/src/types/message/message_view.rs
@@ -17,6 +17,7 @@
*/
use super::HeaderValue;
+use super::message_boundaries::IggyMessageBoundaries;
use super::message_header::*;
use crate::BytesSerializable;
use crate::IggyByteSize;
@@ -25,7 +26,8 @@ use crate::error::IggyError;
use crate::utils::checksum;
use crate::{HeaderKey, IggyMessageHeaderView};
use bytes::{Bytes, BytesMut};
-use std::{collections::HashMap, iter::Iterator};
+use std::collections::HashMap;
+use std::num::NonZeroUsize;
/// A immutable view of a message.
#[derive(Debug)]
@@ -160,6 +162,7 @@ impl Sizeable for IggyMessageView<'_> {
pub struct IggyMessageViewIterator<'a> {
buffer: &'a [u8],
position: usize,
+ indexed_last: Option<(usize, NonZeroUsize)>,
}
impl<'a> IggyMessageViewIterator<'a> {
@@ -167,8 +170,27 @@ impl<'a> IggyMessageViewIterator<'a> {
Self {
buffer,
position: 0,
+ indexed_last: None,
}
}
+
+ pub(crate) fn new_with_boundaries(
+ messages: &'a [u8],
+ indexes: &'a [u8],
+ base_position: u32,
+ count: u32,
+ ) -> Self {
+ let mut iter = Self::new(messages);
+ if let Some(boundaries) =
+ IggyMessageBoundaries::new(indexes, messages.len(), base_position,
count)
+ && boundaries.count() > 0
+ {
+ iter.indexed_last = boundaries
+ .boundaries(boundaries.count() - 1)
+ .and_then(|(start, end)| Some((start,
NonZeroUsize::new(end)?)));
+ }
+ iter
+ }
}
impl<'a> Iterator for IggyMessageViewIterator<'a> {
@@ -184,4 +206,68 @@ impl<'a> Iterator for IggyMessageViewIterator<'a> {
self.position += view.size();
Some(view)
}
+
+ fn last(self) -> Option<Self::Item> {
+ if self.position == 0
+ && let Some((start, end)) = self.indexed_last
+ && let Ok(view) =
IggyMessageView::new(&self.buffer[start..end.get()])
+ {
+ return Some(view);
+ }
+
+ let mut last = None;
+ for item in self {
+ last = Some(item);
+ }
+ last
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::IggyMessage;
+ use bytes::Bytes;
+
+ fn build_batch() -> crate::IggyMessagesBatch {
+ let messages = vec![
+ IggyMessage::builder()
+ .payload(Bytes::from_static(b"one"))
+ .build()
+ .unwrap(),
+ IggyMessage::builder()
+ .payload(Bytes::from_static(b"two"))
+ .build()
+ .unwrap(),
+ IggyMessage::builder()
+ .payload(Bytes::from_static(b"three"))
+ .build()
+ .unwrap(),
+ ];
+ crate::IggyMessagesBatch::from(messages)
+ }
+
+ #[test]
+ fn should_return_tail_for_indexed_last_after_next() {
+ let batch = build_batch();
+ let mut iter = IggyMessageViewIterator::new_with_boundaries(
+ batch.buffer(),
+ batch.indexes_slice(),
+ batch.indexes().base_position(),
+ batch.count(),
+ );
+
+ let first = iter.next().unwrap();
+ assert_eq!(first.payload(), b"one");
+
+ let last = iter.last().unwrap();
+ assert_eq!(last.payload(), b"three");
+ }
+
+ #[test]
+ fn should_return_last_message_for_raw_last() {
+ let batch = build_batch();
+ let last =
IggyMessageViewIterator::new(batch.buffer()).last().unwrap();
+ assert_eq!(last.payload(), b"three");
+ }
}
diff --git a/core/common/src/types/message/messages_batch.rs
b/core/common/src/types/message/messages_batch.rs
index aa6910b01..d9b7b171f 100644
--- a/core/common/src/types/message/messages_batch.rs
+++ b/core/common/src/types/message/messages_batch.rs
@@ -16,6 +16,7 @@
* under the License.
*/
+use super::message_boundaries::IggyMessageBoundaries;
use crate::{
BytesSerializable, INDEX_SIZE, IggyByteSize, IggyIndexes, IggyMessage,
IggyMessageView,
IggyMessageViewIterator, MAX_PAYLOAD_SIZE, Sizeable, Validatable,
error::IggyError,
@@ -52,7 +53,12 @@ impl IggyMessagesBatch {
/// Create iterator over messages
pub fn iter(&self) -> IggyMessageViewIterator<'_> {
- IggyMessageViewIterator::new(&self.messages)
+ IggyMessageViewIterator::new_with_boundaries(
+ &self.messages,
+ &self.indexes,
+ self.indexes.base_position(),
+ self.count,
+ )
}
/// Get the number of messages
@@ -120,47 +126,18 @@ impl IggyMessagesBatch {
self.iter().last().map(|msg| msg.header().timestamp())
}
- /// Calculates the start position of a message at the given index in the
buffer
- fn message_start_position(&self, index: usize) -> usize {
- if index == 0 {
- 0
- } else {
- self.position_at(index as u32 - 1) as usize -
self.indexes.base_position() as usize
- }
- }
-
- /// Calculates the end position of a message at the given index in the
buffer
- fn message_end_position(&self, index: usize) -> usize {
- if index >= self.count as usize - 1 {
- self.messages.len()
- } else {
- self.position_at(index as u32) as usize -
self.indexes.base_position() as usize
- }
+ fn boundaries(&self) -> Option<IggyMessageBoundaries<'_>> {
+ IggyMessageBoundaries::new(
+ &self.indexes,
+ self.messages.len(),
+ self.indexes.base_position(),
+ self.count,
+ )
}
/// Gets the byte range for a message at the given index
fn get_message_boundaries(&self, index: usize) -> Option<(usize, usize)> {
- if index >= self.count as usize {
- return None;
- }
-
- let start = self.message_start_position(index);
- let end = self.message_end_position(index);
-
- if start > self.messages.len() || end > self.messages.len() || start >
end {
- return None;
- }
-
- Some((start, end))
- }
-
- /// Helper method to read a position (u32) from the byte array at the
given index
- fn position_at(&self, position_index: u32) -> u32 {
- if let Some(index) = self.indexes.get(position_index) {
- index.position()
- } else {
- 0
- }
+ self.boundaries()?.boundaries(index)
}
/// Get the message at the specified index.
diff --git a/core/common/src/types/message/messages_batch_mut.rs
b/core/common/src/types/message/messages_batch_mut.rs
index 55e33bf27..231ba00a2 100644
--- a/core/common/src/types/message/messages_batch_mut.rs
+++ b/core/common/src/types/message/messages_batch_mut.rs
@@ -17,6 +17,7 @@
*/
use super::indexes_mut::IggyIndexesMut;
+use super::message_boundaries::IggyMessageBoundaries;
use super::message_view_mut::IggyMessageViewMutIterator;
use crate::{
BytesSerializable, IGGY_MESSAGE_HEADER_SIZE, INDEX_SIZE, IggyByteSize,
IggyError,
@@ -100,7 +101,12 @@ impl IggyMessagesBatchMut {
/// Creates an iterator that yields immutable views of messages.
pub fn iter(&self) -> IggyMessageViewIterator<'_> {
- IggyMessageViewIterator::new(&self.messages)
+ IggyMessageViewIterator::new_with_boundaries(
+ &self.messages,
+ &self.indexes,
+ self.indexes.base_position(),
+ self.count(),
+ )
}
/// Returns the number of messages in the batch.
@@ -289,32 +295,23 @@ impl IggyMessagesBatchMut {
self.indexes.get(index).map(|index| index.position())
}
+ fn boundaries(&self) -> Option<IggyMessageBoundaries<'_>> {
+ IggyMessageBoundaries::new(
+ &self.indexes,
+ self.messages.len(),
+ self.indexes.base_position(),
+ self.count(),
+ )
+ }
+
/// Calculates the start position of a message at the given index in the
buffer
fn message_start_position(&self, index: usize) -> Option<usize> {
- if index >= self.count() as usize {
- return None;
- }
-
- if index == 0 {
- Some(0)
- } else {
- self.position_at(index as u32 - 1)
- .map(|pos| (pos - self.indexes.base_position()) as usize)
- }
+ self.get_message_boundaries(index).map(|(start, _)| start)
}
/// Calculates the end position of a message at the given index in the
buffer
fn message_end_position(&self, index: usize) -> Option<usize> {
- if index >= self.count() as usize {
- return None;
- }
-
- if index == self.count() as usize - 1 {
- Some(self.messages.len())
- } else {
- self.position_at(index as u32)
- .map(|pos| (pos - self.indexes.base_position()) as usize)
- }
+ self.get_message_boundaries(index).map(|(_, end)| end)
}
/// Returns a contiguous slice (as a new `IggyMessagesBatch`) of up to
`count` messages
@@ -464,8 +461,7 @@ impl IggyMessagesBatchMut {
/// Gets the byte range for a message at the given index
fn get_message_boundaries(&self, index: usize) -> Option<(usize, usize)> {
- let start = self.message_start_position(index)?;
- let end = self.message_end_position(index)?;
+ let (start, end) = self.boundaries()?.boundaries(index)?;
if start > self.messages.len()
|| end > self.messages.len()
diff --git a/core/common/src/types/message/mod.rs
b/core/common/src/types/message/mod.rs
index d3f85fb63..a20d69fc4 100644
--- a/core/common/src/types/message/mod.rs
+++ b/core/common/src/types/message/mod.rs
@@ -22,6 +22,7 @@ mod index;
mod index_view;
mod indexes;
mod indexes_mut;
+mod message_boundaries;
mod message_header;
mod message_header_view;
mod message_header_view_mut;