This is an automated email from the ASF dual-hosted git repository. hgruszecki pushed a commit to branch storage-infrastructure in repository https://gitbox.apache.org/repos/asf/iggy.git
commit a79e8d4e65be98e71d0acf4d82a9f2a8726617df Author: Hubert Gruszecki <[email protected]> AuthorDate: Wed Jan 14 11:01:20 2026 +0100 feat(server): add shard-local partition storage infrastructure --- core/server/src/shard/mod.rs | 1 + core/server/src/shard/shard_local_partitions.rs | 284 ++++++++++ core/server/src/streaming/mod.rs | 1 + core/server/src/streaming/partition_ops.rs | 671 ++++++++++++++++++++++++ 4 files changed, 957 insertions(+) diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs index 8939e87b0..941bb392d 100644 --- a/core/server/src/shard/mod.rs +++ b/core/server/src/shard/mod.rs @@ -18,6 +18,7 @@ inner() * or more contributor license agreements. See the NOTICE file pub mod builder; pub mod namespace; +pub mod shard_local_partitions; pub mod system; pub mod task_registry; pub mod tasks; diff --git a/core/server/src/shard/shard_local_partitions.rs b/core/server/src/shard/shard_local_partitions.rs new file mode 100644 index 000000000..6bac44d06 --- /dev/null +++ b/core/server/src/shard/shard_local_partitions.rs @@ -0,0 +1,284 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Per-shard partition data storage. +//! +//! 
Each shard runs on a single-threaded compio runtime, so no synchronization +//! is needed. Plain HashMap provides maximum performance. + +use crate::streaming::{ + deduplication::message_deduplicator::MessageDeduplicator, + partitions::{ + journal::MemoryMessageJournal, + log::SegmentedLog, + partition::{ConsumerGroupOffsets, ConsumerOffsets}, + }, + stats::PartitionStats, +}; +use iggy_common::IggyTimestamp; +use iggy_common::sharding::IggyNamespace; +use std::{ + collections::HashMap, + sync::{Arc, atomic::AtomicU64}, +}; + +#[derive(Debug)] +pub struct PartitionData { + pub log: SegmentedLog<MemoryMessageJournal>, + pub offset: Arc<AtomicU64>, + pub consumer_offsets: Arc<ConsumerOffsets>, + pub consumer_group_offsets: Arc<ConsumerGroupOffsets>, + pub message_deduplicator: Option<Arc<MessageDeduplicator>>, + pub stats: Arc<PartitionStats>, + pub created_at: IggyTimestamp, + pub revision_id: u64, + pub should_increment_offset: bool, +} + +impl PartitionData { + #[allow(clippy::too_many_arguments)] + pub fn new( + stats: Arc<PartitionStats>, + offset: Arc<AtomicU64>, + consumer_offsets: Arc<ConsumerOffsets>, + consumer_group_offsets: Arc<ConsumerGroupOffsets>, + message_deduplicator: Option<Arc<MessageDeduplicator>>, + created_at: IggyTimestamp, + revision_id: u64, + should_increment_offset: bool, + ) -> Self { + Self { + log: SegmentedLog::default(), + offset, + consumer_offsets, + consumer_group_offsets, + message_deduplicator, + stats, + created_at, + revision_id, + should_increment_offset, + } + } + + #[allow(clippy::too_many_arguments)] + pub fn with_log( + log: SegmentedLog<MemoryMessageJournal>, + stats: Arc<PartitionStats>, + offset: Arc<AtomicU64>, + consumer_offsets: Arc<ConsumerOffsets>, + consumer_group_offsets: Arc<ConsumerGroupOffsets>, + message_deduplicator: Option<Arc<MessageDeduplicator>>, + created_at: IggyTimestamp, + revision_id: u64, + should_increment_offset: bool, + ) -> Self { + Self { + log, + offset, + consumer_offsets, + 
consumer_group_offsets, + message_deduplicator, + stats, + created_at, + revision_id, + should_increment_offset, + } + } +} + +/// Per-shard partition data storage. +/// Single-threaded (compio runtime) - NO synchronization needed! +#[derive(Debug, Default)] +pub struct ShardLocalPartitions { + partitions: HashMap<IggyNamespace, PartitionData>, +} + +impl ShardLocalPartitions { + pub fn new() -> Self { + Self { + partitions: HashMap::new(), + } + } + + pub fn with_capacity(capacity: usize) -> Self { + Self { + partitions: HashMap::with_capacity(capacity), + } + } + + #[inline] + pub fn get(&self, ns: &IggyNamespace) -> Option<&PartitionData> { + self.partitions.get(ns) + } + + #[inline] + pub fn get_mut(&mut self, ns: &IggyNamespace) -> Option<&mut PartitionData> { + self.partitions.get_mut(ns) + } + + #[inline] + pub fn insert(&mut self, ns: IggyNamespace, data: PartitionData) { + self.partitions.insert(ns, data); + } + + #[inline] + pub fn remove(&mut self, ns: &IggyNamespace) -> Option<PartitionData> { + self.partitions.remove(ns) + } + + #[inline] + pub fn contains(&self, ns: &IggyNamespace) -> bool { + self.partitions.contains_key(ns) + } + + #[inline] + pub fn len(&self) -> usize { + self.partitions.len() + } + + #[inline] + pub fn is_empty(&self) -> bool { + self.partitions.is_empty() + } + + /// Iterate over all namespaces owned by this shard. + pub fn namespaces(&self) -> impl Iterator<Item = &IggyNamespace> { + self.partitions.keys() + } + + /// Iterate over all partition data. + pub fn iter(&self) -> impl Iterator<Item = (&IggyNamespace, &PartitionData)> { + self.partitions.iter() + } + + /// Iterate over all partition data mutably. + pub fn iter_mut(&mut self) -> impl Iterator<Item = (&IggyNamespace, &mut PartitionData)> { + self.partitions.iter_mut() + } + + /// Remove multiple partitions at once. 
+ pub fn remove_many(&mut self, namespaces: &[IggyNamespace]) -> Vec<PartitionData> { + namespaces + .iter() + .filter_map(|ns| self.partitions.remove(ns)) + .collect() + } + + /// Get partition data, initializing if not present. + /// Returns a mutable reference to the existing or newly initialized data; the `init` closure runs only when the entry is absent. + pub fn get_or_init<F>(&mut self, ns: IggyNamespace, init: F) -> &mut PartitionData + where + F: FnOnce() -> PartitionData, + { + self.partitions.entry(ns).or_insert_with(init) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::streaming::stats::{StreamStats, TopicStats}; + + fn create_test_partition_data() -> PartitionData { + let stream_stats = Arc::new(StreamStats::default()); + let topic_stats = Arc::new(TopicStats::new(stream_stats)); + let partition_stats = Arc::new(PartitionStats::new(topic_stats)); + + PartitionData::new( + partition_stats, + Arc::new(AtomicU64::new(0)), + Arc::new(ConsumerOffsets::with_capacity(10)), + Arc::new(ConsumerGroupOffsets::with_capacity(10)), + None, + IggyTimestamp::now(), + 1, // revision_id + true, + ) + } + + #[test] + fn test_basic_operations() { + let mut store = ShardLocalPartitions::new(); + let ns = IggyNamespace::new(1, 1, 0); + + assert!(!store.contains(&ns)); + assert!(store.is_empty()); + + store.insert(ns, create_test_partition_data()); + + assert!(store.contains(&ns)); + assert_eq!(store.len(), 1); + assert!(store.get(&ns).is_some()); + assert!(store.get_mut(&ns).is_some()); + + let removed = store.remove(&ns); + assert!(removed.is_some()); + assert!(!store.contains(&ns)); + assert!(store.is_empty()); + } + + #[test] + fn test_iteration() { + let mut store = ShardLocalPartitions::new(); + let ns1 = IggyNamespace::new(1, 1, 0); + let ns2 = IggyNamespace::new(1, 1, 1); + let ns3 = IggyNamespace::new(1, 2, 0); + + store.insert(ns1, create_test_partition_data()); + store.insert(ns2, create_test_partition_data()); + store.insert(ns3, create_test_partition_data()); + + let namespaces: Vec<_> = store.namespaces().collect(); + 
assert_eq!(namespaces.len(), 3); + + let pairs: Vec<_> = store.iter().collect(); + assert_eq!(pairs.len(), 3); + } + + #[test] + fn test_remove_many() { + let mut store = ShardLocalPartitions::new(); + let ns1 = IggyNamespace::new(1, 1, 0); + let ns2 = IggyNamespace::new(1, 1, 1); + let ns3 = IggyNamespace::new(1, 2, 0); + + store.insert(ns1, create_test_partition_data()); + store.insert(ns2, create_test_partition_data()); + store.insert(ns3, create_test_partition_data()); + + let removed = store.remove_many(&[ns1, ns2]); + assert_eq!(removed.len(), 2); + assert!(!store.contains(&ns1)); + assert!(!store.contains(&ns2)); + assert!(store.contains(&ns3)); + } + + #[test] + fn test_get_or_init() { + let mut store = ShardLocalPartitions::new(); + let ns = IggyNamespace::new(1, 1, 0); + + assert!(!store.contains(&ns)); + + let _ = store.get_or_init(ns, create_test_partition_data); + assert!(store.contains(&ns)); + + // Second call should not reinitialize + let data = store.get_or_init(ns, || panic!("Should not be called")); + assert!(data.should_increment_offset); + } +} diff --git a/core/server/src/streaming/mod.rs b/core/server/src/streaming/mod.rs index a6141bd8b..e213be0bc 100644 --- a/core/server/src/streaming/mod.rs +++ b/core/server/src/streaming/mod.rs @@ -19,6 +19,7 @@ pub mod clients; pub mod deduplication; pub mod diagnostics; +pub mod partition_ops; pub mod partitions; pub mod persistence; pub mod polling_consumer; diff --git a/core/server/src/streaming/partition_ops.rs b/core/server/src/streaming/partition_ops.rs new file mode 100644 index 000000000..6a878b53e --- /dev/null +++ b/core/server/src/streaming/partition_ops.rs @@ -0,0 +1,671 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +//! Shared partition operations that can be used by both production code and tests. +//! +//! This module provides the core logic for polling and loading messages from partitions, +//! avoiding code duplication between `IggyShard` and test harnesses. + +use crate::binary::handlers::messages::poll_messages_handler::IggyPollMetadata; +use crate::shard::shard_local_partitions::ShardLocalPartitions; +use crate::shard::system::messages::PollingArgs; +use crate::streaming::partitions::journal::Journal; +use crate::streaming::polling_consumer::PollingConsumer; +use crate::streaming::segments::IggyMessagesBatchSet; +use iggy_common::sharding::IggyNamespace; +use iggy_common::{IggyError, PollingKind}; +use std::cell::RefCell; +use std::sync::atomic::Ordering; + +/// Poll messages from a partition store. +/// +/// This is the core polling logic shared between production code and tests. 
+pub async fn poll_messages( + partition_store: &RefCell<ShardLocalPartitions>, + namespace: &IggyNamespace, + consumer: PollingConsumer, + args: PollingArgs, +) -> Result<(IggyPollMetadata, IggyMessagesBatchSet), IggyError> { + let partition_id = namespace.partition_id(); + let count = args.count; + let strategy = args.strategy; + let value = strategy.value; + + // Handle timestamp polling separately - it has different logic + if strategy.kind == PollingKind::Timestamp { + return poll_messages_by_timestamp(partition_store, namespace, value, count).await; + } + + // Phase 1: Extract metadata and determine start offset + let (metadata, start_offset) = { + let store = partition_store.borrow(); + let partition_data = store + .get(namespace) + .expect("partition_store: partition must exist for poll"); + + let current_offset = partition_data.offset.load(Ordering::Relaxed); + let metadata = IggyPollMetadata::new(partition_id as u32, current_offset); + + let start_offset = match strategy.kind { + PollingKind::Offset => { + let offset = value; + if offset > current_offset { + return Ok((metadata, IggyMessagesBatchSet::empty())); + } + offset + } + PollingKind::First => partition_data + .log + .segments() + .first() + .map(|segment| segment.start_offset) + .unwrap_or(0), + PollingKind::Last => { + let mut requested_count = count as u64; + if requested_count > current_offset + 1 { + requested_count = current_offset + 1; + } + 1 + current_offset - requested_count + } + PollingKind::Next => { + let stored_offset = match consumer { + PollingConsumer::Consumer(id, _) => partition_data + .consumer_offsets + .pin() + .get(&id) + .map(|item| item.offset.load(Ordering::Relaxed)), + PollingConsumer::ConsumerGroup(cg_id, _) => partition_data + .consumer_group_offsets + .pin() + .get(&cg_id) + .map(|item| item.offset.load(Ordering::Relaxed)), + }; + match stored_offset { + Some(offset) => offset + 1, + None => partition_data + .log + .segments() + .first() + .map(|segment| 
segment.start_offset) + .unwrap_or(0), + } + } + PollingKind::Timestamp => unreachable!("Timestamp handled above"), + }; + + if start_offset > current_offset || count == 0 { + return Ok((metadata, IggyMessagesBatchSet::empty())); + } + + (metadata, start_offset) + }; + + // Phase 2: Get messages using hybrid disk+journal logic + let batches = get_messages_by_offset(partition_store, namespace, start_offset, count).await?; + Ok((metadata, batches)) +} + +/// Get messages by offset, handling the hybrid disk+journal case. +pub async fn get_messages_by_offset( + partition_store: &RefCell<ShardLocalPartitions>, + namespace: &IggyNamespace, + start_offset: u64, + count: u32, +) -> Result<IggyMessagesBatchSet, IggyError> { + if count == 0 { + return Ok(IggyMessagesBatchSet::empty()); + } + + // Get journal metadata for routing + let (is_journal_empty, journal_first_offset, journal_last_offset) = { + let store = partition_store.borrow(); + let partition_data = store + .get(namespace) + .expect("partition_store: partition must exist for poll"); + + let journal = partition_data.log.journal(); + let journal_inner = journal.inner(); + ( + journal.is_empty(), + journal_inner.base_offset, + journal_inner.current_offset, + ) + }; + + let end_offset = start_offset + (count - 1).max(1) as u64; + + // Case 0: Journal is empty, all messages on disk + if is_journal_empty { + return load_messages_from_disk(partition_store, namespace, start_offset, count).await; + } + + // Case 1: All messages are in journal + if start_offset >= journal_first_offset && end_offset <= journal_last_offset { + let batches = { + let store = partition_store.borrow(); + let partition_data = store + .get(namespace) + .expect("partition_store: partition must exist for poll"); + partition_data + .log + .journal() + .get(|batches| batches.get_by_offset(start_offset, count)) + }; + return Ok(batches); + } + + // Case 2: All messages on disk (end_offset < journal_first_offset) + if end_offset < journal_first_offset { 
+ return load_messages_from_disk(partition_store, namespace, start_offset, count).await; + } + + // Case 3: Messages span disk and journal boundary + let disk_count = if start_offset < journal_first_offset { + ((journal_first_offset - start_offset) as u32).min(count) + } else { + 0 + }; + + let mut combined_batch_set = IggyMessagesBatchSet::empty(); + + // Load messages from disk if needed + if disk_count > 0 { + let disk_messages = + load_messages_from_disk(partition_store, namespace, start_offset, disk_count).await?; + if !disk_messages.is_empty() { + combined_batch_set.add_batch_set(disk_messages); + } + } + + // Get remaining messages from journal + let remaining_count = count - combined_batch_set.count(); + if remaining_count > 0 { + let journal_start_offset = std::cmp::max(start_offset, journal_first_offset); + let journal_messages = { + let store = partition_store.borrow(); + let partition_data = store + .get(namespace) + .expect("partition_store: partition must exist for poll"); + partition_data + .log + .journal() + .get(|batches| batches.get_by_offset(journal_start_offset, remaining_count)) + }; + if !journal_messages.is_empty() { + combined_batch_set.add_batch_set(journal_messages); + } + } + + Ok(combined_batch_set) +} + +/// Poll messages by timestamp. 
+async fn poll_messages_by_timestamp( + partition_store: &RefCell<ShardLocalPartitions>, + namespace: &IggyNamespace, + timestamp: u64, + count: u32, +) -> Result<(IggyPollMetadata, IggyMessagesBatchSet), IggyError> { + let partition_id = namespace.partition_id(); + + // Get metadata and journal info + let (metadata, is_journal_empty, journal_first_timestamp, journal_last_timestamp) = { + let store = partition_store.borrow(); + let partition_data = store + .get(namespace) + .expect("partition_store: partition must exist for poll"); + + let current_offset = partition_data.offset.load(Ordering::Relaxed); + let metadata = IggyPollMetadata::new(partition_id as u32, current_offset); + + let journal = partition_data.log.journal(); + let journal_inner = journal.inner(); + ( + metadata, + journal.is_empty(), + journal_inner.first_timestamp, + journal_inner.end_timestamp, + ) + }; + + if count == 0 { + return Ok((metadata, IggyMessagesBatchSet::empty())); + } + + // Case 0: Journal is empty, all messages on disk + if is_journal_empty { + let batches = + load_messages_from_disk_by_timestamp(partition_store, namespace, timestamp, count) + .await?; + return Ok((metadata, batches)); + } + + // Case 1: Timestamp is after journal's last timestamp - no messages + if timestamp > journal_last_timestamp { + return Ok((metadata, IggyMessagesBatchSet::empty())); + } + + // Case 2: Timestamp is within journal range - get from journal + if timestamp >= journal_first_timestamp { + let batches = { + let store = partition_store.borrow(); + let partition_data = store + .get(namespace) + .expect("partition_store: partition must exist for poll"); + partition_data + .log + .journal() + .get(|batches| batches.get_by_timestamp(timestamp, count)) + }; + return Ok((metadata, batches)); + } + + // Case 3: Timestamp is before journal - need disk + possibly journal + let disk_messages = + load_messages_from_disk_by_timestamp(partition_store, namespace, timestamp, count).await?; + + if 
disk_messages.count() >= count { + return Ok((metadata, disk_messages)); + } + + // Case 4: Messages span disk and journal + let remaining_count = count - disk_messages.count(); + let journal_messages = { + let store = partition_store.borrow(); + let partition_data = store + .get(namespace) + .expect("partition_store: partition must exist for poll"); + partition_data + .log + .journal() + .get(|batches| batches.get_by_timestamp(timestamp, remaining_count)) + }; + + let mut combined_batch_set = disk_messages; + if !journal_messages.is_empty() { + combined_batch_set.add_batch_set(journal_messages); + } + Ok((metadata, combined_batch_set)) +} + +/// Load messages from disk by offset. +pub async fn load_messages_from_disk( + partition_store: &RefCell<ShardLocalPartitions>, + namespace: &IggyNamespace, + start_offset: u64, + count: u32, +) -> Result<IggyMessagesBatchSet, IggyError> { + if count == 0 { + return Ok(IggyMessagesBatchSet::empty()); + } + + // Get segment range containing the requested offset + let segment_range = { + let store = partition_store.borrow(); + let partition_data = store + .get(namespace) + .expect("partition_store: partition must exist"); + + let segments = partition_data.log.segments(); + if segments.is_empty() { + return Ok(IggyMessagesBatchSet::empty()); + } + + let start = segments + .iter() + .rposition(|segment| segment.start_offset <= start_offset) + .unwrap_or(0); + let end = segments.len(); + start..end + }; + + let mut remaining_count = count; + let mut batches = IggyMessagesBatchSet::empty(); + let mut current_offset = start_offset; + + for idx in segment_range { + if remaining_count == 0 { + break; + } + + let (segment_start_offset, segment_end_offset) = { + let store = partition_store.borrow(); + let partition_data = store + .get(namespace) + .expect("partition_store: partition must exist"); + + let segment = &partition_data.log.segments()[idx]; + (segment.start_offset, segment.end_offset) + }; + + let offset = if current_offset < 
segment_start_offset { + segment_start_offset + } else { + current_offset + }; + + let mut end_offset = offset + (remaining_count - 1).max(1) as u64; + if end_offset > segment_end_offset { + end_offset = segment_end_offset; + } + + let messages = load_segment_messages( + partition_store, + namespace, + idx, + offset, + end_offset, + remaining_count, + segment_start_offset, + ) + .await?; + + let loaded_count = messages.count(); + if loaded_count > 0 { + batches.add_batch_set(messages); + remaining_count = remaining_count.saturating_sub(loaded_count); + current_offset = end_offset + 1; + } else { + break; + } + } + + Ok(batches) +} + +/// Load messages from a specific segment. +async fn load_segment_messages( + partition_store: &RefCell<ShardLocalPartitions>, + namespace: &IggyNamespace, + idx: usize, + start_offset: u64, + end_offset: u64, + count: u32, + segment_start_offset: u64, +) -> Result<IggyMessagesBatchSet, IggyError> { + let relative_start_offset = (start_offset - segment_start_offset) as u32; + + // Check journal first for this segment's data + let journal_data = { + let store = partition_store.borrow(); + let partition_data = store + .get(namespace) + .expect("partition_store: partition must exist"); + + let journal = partition_data.log.journal(); + let is_journal_empty = journal.is_empty(); + let journal_inner = journal.inner(); + let journal_first_offset = journal_inner.base_offset; + let journal_last_offset = journal_inner.current_offset; + + if !is_journal_empty + && start_offset >= journal_first_offset + && end_offset <= journal_last_offset + { + Some(journal.get(|batches| batches.get_by_offset(start_offset, count))) + } else { + None + } + }; + + if let Some(batches) = journal_data { + return Ok(batches); + } + + // Load from disk + let (index_reader, messages_reader, indexes) = { + let store = partition_store.borrow(); + let partition_data = store + .get(namespace) + .expect("partition_store: partition must exist"); + + let storages = 
partition_data.log.storages(); + if idx >= storages.len() { + return Ok(IggyMessagesBatchSet::empty()); + } + + let index_reader = storages[idx] + .index_reader + .as_ref() + .expect("Index reader not initialized") + .clone(); + let messages_reader = storages[idx] + .messages_reader + .as_ref() + .expect("Messages reader not initialized") + .clone(); + let indexes_vec = partition_data.log.indexes(); + let indexes = indexes_vec + .get(idx) + .and_then(|opt| opt.as_ref()) + .map(|indexes| { + indexes + .slice_by_offset(relative_start_offset, count) + .unwrap_or_default() + }); + (index_reader, messages_reader, indexes) + }; + + let indexes_to_read = if let Some(indexes) = indexes { + if !indexes.is_empty() { + Some(indexes) + } else { + index_reader + .as_ref() + .load_from_disk_by_offset(relative_start_offset, count) + .await? + } + } else { + index_reader + .as_ref() + .load_from_disk_by_offset(relative_start_offset, count) + .await? + }; + + if indexes_to_read.is_none() { + return Ok(IggyMessagesBatchSet::empty()); + } + + let indexes_to_read = indexes_to_read.unwrap(); + let batch = messages_reader + .as_ref() + .load_messages_from_disk(indexes_to_read) + .await?; + + batch.validate_checksums_and_offsets(start_offset)?; + + Ok(IggyMessagesBatchSet::from(batch)) +} + +/// Load messages from disk by timestamp. 
+async fn load_messages_from_disk_by_timestamp( + partition_store: &RefCell<ShardLocalPartitions>, + namespace: &IggyNamespace, + timestamp: u64, + count: u32, +) -> Result<IggyMessagesBatchSet, IggyError> { + if count == 0 { + return Ok(IggyMessagesBatchSet::empty()); + } + + // Find segment range that might contain messages >= timestamp + let segment_range = { + let store = partition_store.borrow(); + let partition_data = store + .get(namespace) + .expect("partition_store: partition must exist"); + + let segments = partition_data.log.segments(); + if segments.is_empty() { + return Ok(IggyMessagesBatchSet::empty()); + } + + let start = segments + .iter() + .position(|segment| segment.end_timestamp >= timestamp) + .unwrap_or(segments.len()); + + if start >= segments.len() { + return Ok(IggyMessagesBatchSet::empty()); + } + + start..segments.len() + }; + + let mut remaining_count = count; + let mut batches = IggyMessagesBatchSet::empty(); + + for idx in segment_range { + if remaining_count == 0 { + break; + } + + let segment_end_timestamp = { + let store = partition_store.borrow(); + let partition_data = store + .get(namespace) + .expect("partition_store: partition must exist"); + partition_data.log.segments()[idx].end_timestamp + }; + + if segment_end_timestamp < timestamp { + continue; + } + + let messages = load_segment_messages_by_timestamp( + partition_store, + namespace, + idx, + timestamp, + remaining_count, + ) + .await?; + + let messages_count = messages.count(); + if messages_count == 0 { + continue; + } + + remaining_count = remaining_count.saturating_sub(messages_count); + batches.add_batch_set(messages); + } + + Ok(batches) +} + +/// Load messages from a specific segment by timestamp. 
+async fn load_segment_messages_by_timestamp( + partition_store: &RefCell<ShardLocalPartitions>, + namespace: &IggyNamespace, + idx: usize, + timestamp: u64, + count: u32, +) -> Result<IggyMessagesBatchSet, IggyError> { + if count == 0 { + return Ok(IggyMessagesBatchSet::empty()); + } + + // Check journal first + let journal_data = { + let store = partition_store.borrow(); + let partition_data = store + .get(namespace) + .expect("partition_store: partition must exist"); + + let journal = partition_data.log.journal(); + let is_journal_empty = journal.is_empty(); + let journal_inner = journal.inner(); + let journal_first_timestamp = journal_inner.first_timestamp; + let journal_last_timestamp = journal_inner.end_timestamp; + + if !is_journal_empty + && timestamp >= journal_first_timestamp + && timestamp <= journal_last_timestamp + { + Some(journal.get(|batches| batches.get_by_timestamp(timestamp, count))) + } else { + None + } + }; + + if let Some(batches) = journal_data { + return Ok(batches); + } + + // Load from disk + let (index_reader, messages_reader, indexes) = { + let store = partition_store.borrow(); + let partition_data = store + .get(namespace) + .expect("partition_store: partition must exist"); + + let storages = partition_data.log.storages(); + if idx >= storages.len() { + return Ok(IggyMessagesBatchSet::empty()); + } + + let index_reader = storages[idx] + .index_reader + .as_ref() + .expect("Index reader not initialized") + .clone(); + let messages_reader = storages[idx] + .messages_reader + .as_ref() + .expect("Messages reader not initialized") + .clone(); + let indexes_vec = partition_data.log.indexes(); + let indexes = indexes_vec + .get(idx) + .and_then(|opt| opt.as_ref()) + .map(|indexes| { + indexes + .slice_by_timestamp(timestamp, count) + .unwrap_or_default() + }); + (index_reader, messages_reader, indexes) + }; + + let indexes_to_read = if let Some(indexes) = indexes { + if !indexes.is_empty() { + Some(indexes) + } else { + index_reader + 
.as_ref() + .load_from_disk_by_timestamp(timestamp, count) + .await? + } + } else { + index_reader + .as_ref() + .load_from_disk_by_timestamp(timestamp, count) + .await? + }; + + if indexes_to_read.is_none() { + return Ok(IggyMessagesBatchSet::empty()); + } + + let indexes_to_read = indexes_to_read.unwrap(); + let batch = messages_reader + .as_ref() + .load_messages_from_disk(indexes_to_read) + .await?; + + Ok(IggyMessagesBatchSet::from(batch)) +}
