This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch storage-infrastructure
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit a79e8d4e65be98e71d0acf4d82a9f2a8726617df
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Wed Jan 14 11:01:20 2026 +0100

    feat(server): add shard-local partition storage infrastructure
---
 core/server/src/shard/mod.rs                    |   1 +
 core/server/src/shard/shard_local_partitions.rs | 284 ++++++++++
 core/server/src/streaming/mod.rs                |   1 +
 core/server/src/streaming/partition_ops.rs      | 671 ++++++++++++++++++++++++
 4 files changed, 957 insertions(+)

diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index 8939e87b0..941bb392d 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -18,6 +18,7 @@ * or more contributor license agreements.  See the NOTICE file
 
 pub mod builder;
 pub mod namespace;
+pub mod shard_local_partitions;
 pub mod system;
 pub mod task_registry;
 pub mod tasks;
diff --git a/core/server/src/shard/shard_local_partitions.rs 
b/core/server/src/shard/shard_local_partitions.rs
new file mode 100644
index 000000000..6bac44d06
--- /dev/null
+++ b/core/server/src/shard/shard_local_partitions.rs
@@ -0,0 +1,284 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Per-shard partition data storage.
+//!
+//! Each shard runs on a single-threaded compio runtime, so no synchronization
+//! is needed. Plain HashMap provides maximum performance.
+
+use crate::streaming::{
+    deduplication::message_deduplicator::MessageDeduplicator,
+    partitions::{
+        journal::MemoryMessageJournal,
+        log::SegmentedLog,
+        partition::{ConsumerGroupOffsets, ConsumerOffsets},
+    },
+    stats::PartitionStats,
+};
+use iggy_common::IggyTimestamp;
+use iggy_common::sharding::IggyNamespace;
+use std::{
+    collections::HashMap,
+    sync::{Arc, atomic::AtomicU64},
+};
+
+#[derive(Debug)]
+pub struct PartitionData {
+    pub log: SegmentedLog<MemoryMessageJournal>,
+    pub offset: Arc<AtomicU64>,
+    pub consumer_offsets: Arc<ConsumerOffsets>,
+    pub consumer_group_offsets: Arc<ConsumerGroupOffsets>,
+    pub message_deduplicator: Option<Arc<MessageDeduplicator>>,
+    pub stats: Arc<PartitionStats>,
+    pub created_at: IggyTimestamp,
+    pub revision_id: u64,
+    pub should_increment_offset: bool,
+}
+
+impl PartitionData {
+    #[allow(clippy::too_many_arguments)]
+    pub fn new(
+        stats: Arc<PartitionStats>,
+        offset: Arc<AtomicU64>,
+        consumer_offsets: Arc<ConsumerOffsets>,
+        consumer_group_offsets: Arc<ConsumerGroupOffsets>,
+        message_deduplicator: Option<Arc<MessageDeduplicator>>,
+        created_at: IggyTimestamp,
+        revision_id: u64,
+        should_increment_offset: bool,
+    ) -> Self {
+        Self {
+            log: SegmentedLog::default(),
+            offset,
+            consumer_offsets,
+            consumer_group_offsets,
+            message_deduplicator,
+            stats,
+            created_at,
+            revision_id,
+            should_increment_offset,
+        }
+    }
+
+    #[allow(clippy::too_many_arguments)]
+    pub fn with_log(
+        log: SegmentedLog<MemoryMessageJournal>,
+        stats: Arc<PartitionStats>,
+        offset: Arc<AtomicU64>,
+        consumer_offsets: Arc<ConsumerOffsets>,
+        consumer_group_offsets: Arc<ConsumerGroupOffsets>,
+        message_deduplicator: Option<Arc<MessageDeduplicator>>,
+        created_at: IggyTimestamp,
+        revision_id: u64,
+        should_increment_offset: bool,
+    ) -> Self {
+        Self {
+            log,
+            offset,
+            consumer_offsets,
+            consumer_group_offsets,
+            message_deduplicator,
+            stats,
+            created_at,
+            revision_id,
+            should_increment_offset,
+        }
+    }
+}
+
+/// Per-shard partition data storage.
+/// Single-threaded (compio runtime) - NO synchronization needed!
+#[derive(Debug, Default)]
+pub struct ShardLocalPartitions {
+    partitions: HashMap<IggyNamespace, PartitionData>,
+}
+
+impl ShardLocalPartitions {
+    pub fn new() -> Self {
+        Self {
+            partitions: HashMap::new(),
+        }
+    }
+
+    pub fn with_capacity(capacity: usize) -> Self {
+        Self {
+            partitions: HashMap::with_capacity(capacity),
+        }
+    }
+
+    #[inline]
+    pub fn get(&self, ns: &IggyNamespace) -> Option<&PartitionData> {
+        self.partitions.get(ns)
+    }
+
+    #[inline]
+    pub fn get_mut(&mut self, ns: &IggyNamespace) -> Option<&mut 
PartitionData> {
+        self.partitions.get_mut(ns)
+    }
+
+    #[inline]
+    pub fn insert(&mut self, ns: IggyNamespace, data: PartitionData) {
+        self.partitions.insert(ns, data);
+    }
+
+    #[inline]
+    pub fn remove(&mut self, ns: &IggyNamespace) -> Option<PartitionData> {
+        self.partitions.remove(ns)
+    }
+
+    #[inline]
+    pub fn contains(&self, ns: &IggyNamespace) -> bool {
+        self.partitions.contains_key(ns)
+    }
+
+    #[inline]
+    pub fn len(&self) -> usize {
+        self.partitions.len()
+    }
+
+    #[inline]
+    pub fn is_empty(&self) -> bool {
+        self.partitions.is_empty()
+    }
+
+    /// Iterate over all namespaces owned by this shard.
+    pub fn namespaces(&self) -> impl Iterator<Item = &IggyNamespace> {
+        self.partitions.keys()
+    }
+
+    /// Iterate over all partition data.
+    pub fn iter(&self) -> impl Iterator<Item = (&IggyNamespace, 
&PartitionData)> {
+        self.partitions.iter()
+    }
+
+    /// Iterate over all partition data mutably.
+    pub fn iter_mut(&mut self) -> impl Iterator<Item = (&IggyNamespace, &mut 
PartitionData)> {
+        self.partitions.iter_mut()
+    }
+
+    /// Remove multiple partitions at once.
+    pub fn remove_many(&mut self, namespaces: &[IggyNamespace]) -> 
Vec<PartitionData> {
+        namespaces
+            .iter()
+            .filter_map(|ns| self.partitions.remove(ns))
+            .collect()
+    }
+
+    /// Get partition data, initializing if not present.
+    /// Returns None if initialization fails.
+    pub fn get_or_init<F>(&mut self, ns: IggyNamespace, init: F) -> &mut 
PartitionData
+    where
+        F: FnOnce() -> PartitionData,
+    {
+        self.partitions.entry(ns).or_insert_with(init)
+    }
+}
+
#[cfg(test)]
mod tests {
    use super::*;
    use crate::streaming::stats::{StreamStats, TopicStats};

    /// Builds a minimal partition entry backed by fresh stats and offsets.
    fn create_test_partition_data() -> PartitionData {
        let stream_stats = Arc::new(StreamStats::default());
        let topic_stats = Arc::new(TopicStats::new(stream_stats));
        let partition_stats = Arc::new(PartitionStats::new(topic_stats));

        PartitionData::new(
            partition_stats,
            Arc::new(AtomicU64::new(0)),
            Arc::new(ConsumerOffsets::with_capacity(10)),
            Arc::new(ConsumerGroupOffsets::with_capacity(10)),
            None,
            IggyTimestamp::now(),
            1, // revision_id
            true,
        )
    }

    #[test]
    fn test_basic_operations() {
        let mut store = ShardLocalPartitions::new();
        let ns = IggyNamespace::new(1, 1, 0);

        // A fresh store owns nothing.
        assert!(store.is_empty());
        assert!(!store.contains(&ns));

        store.insert(ns, create_test_partition_data());

        assert_eq!(store.len(), 1);
        assert!(store.contains(&ns));
        assert!(store.get(&ns).is_some());
        assert!(store.get_mut(&ns).is_some());

        // Removal hands the data back and empties the store.
        let removed = store.remove(&ns);
        assert!(removed.is_some());
        assert!(!store.contains(&ns));
        assert!(store.is_empty());
    }

    #[test]
    fn test_iteration() {
        let mut store = ShardLocalPartitions::new();
        let all = [
            IggyNamespace::new(1, 1, 0),
            IggyNamespace::new(1, 1, 1),
            IggyNamespace::new(1, 2, 0),
        ];
        for ns in all {
            store.insert(ns, create_test_partition_data());
        }

        // Both iteration views expose every inserted partition.
        assert_eq!(store.namespaces().count(), 3);
        assert_eq!(store.iter().count(), 3);
    }

    #[test]
    fn test_remove_many() {
        let mut store = ShardLocalPartitions::new();
        let ns1 = IggyNamespace::new(1, 1, 0);
        let ns2 = IggyNamespace::new(1, 1, 1);
        let ns3 = IggyNamespace::new(1, 2, 0);

        for ns in [ns1, ns2, ns3] {
            store.insert(ns, create_test_partition_data());
        }

        // Only the requested namespaces are removed; the rest survive.
        let removed = store.remove_many(&[ns1, ns2]);
        assert_eq!(removed.len(), 2);
        assert!(!store.contains(&ns1));
        assert!(!store.contains(&ns2));
        assert!(store.contains(&ns3));
    }

    #[test]
    fn test_get_or_init() {
        let mut store = ShardLocalPartitions::new();
        let ns = IggyNamespace::new(1, 1, 0);

        assert!(!store.contains(&ns));

        // First call initializes the entry.
        let _ = store.get_or_init(ns, create_test_partition_data);
        assert!(store.contains(&ns));

        // Second call must reuse the existing entry, never reinitialize.
        let data = store.get_or_init(ns, || panic!("Should not be called"));
        assert!(data.should_increment_offset);
    }
}
diff --git a/core/server/src/streaming/mod.rs b/core/server/src/streaming/mod.rs
index a6141bd8b..e213be0bc 100644
--- a/core/server/src/streaming/mod.rs
+++ b/core/server/src/streaming/mod.rs
@@ -19,6 +19,7 @@
 pub mod clients;
 pub mod deduplication;
 pub mod diagnostics;
+pub mod partition_ops;
 pub mod partitions;
 pub mod persistence;
 pub mod polling_consumer;
diff --git a/core/server/src/streaming/partition_ops.rs 
b/core/server/src/streaming/partition_ops.rs
new file mode 100644
index 000000000..6a878b53e
--- /dev/null
+++ b/core/server/src/streaming/partition_ops.rs
@@ -0,0 +1,671 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//! Shared partition operations that can be used by both production code and 
tests.
+//!
+//! This module provides the core logic for polling and loading messages from 
partitions,
+//! avoiding code duplication between `IggyShard` and test harnesses.
+
+use crate::binary::handlers::messages::poll_messages_handler::IggyPollMetadata;
+use crate::shard::shard_local_partitions::ShardLocalPartitions;
+use crate::shard::system::messages::PollingArgs;
+use crate::streaming::partitions::journal::Journal;
+use crate::streaming::polling_consumer::PollingConsumer;
+use crate::streaming::segments::IggyMessagesBatchSet;
+use iggy_common::sharding::IggyNamespace;
+use iggy_common::{IggyError, PollingKind};
+use std::cell::RefCell;
+use std::sync::atomic::Ordering;
+
+/// Poll messages from a partition store.
+///
+/// This is the core polling logic shared between production code and tests.
+pub async fn poll_messages(
+    partition_store: &RefCell<ShardLocalPartitions>,
+    namespace: &IggyNamespace,
+    consumer: PollingConsumer,
+    args: PollingArgs,
+) -> Result<(IggyPollMetadata, IggyMessagesBatchSet), IggyError> {
+    let partition_id = namespace.partition_id();
+    let count = args.count;
+    let strategy = args.strategy;
+    let value = strategy.value;
+
+    // Handle timestamp polling separately - it has different logic
+    if strategy.kind == PollingKind::Timestamp {
+        return poll_messages_by_timestamp(partition_store, namespace, value, 
count).await;
+    }
+
+    // Phase 1: Extract metadata and determine start offset
+    let (metadata, start_offset) = {
+        let store = partition_store.borrow();
+        let partition_data = store
+            .get(namespace)
+            .expect("partition_store: partition must exist for poll");
+
+        let current_offset = partition_data.offset.load(Ordering::Relaxed);
+        let metadata = IggyPollMetadata::new(partition_id as u32, 
current_offset);
+
+        let start_offset = match strategy.kind {
+            PollingKind::Offset => {
+                let offset = value;
+                if offset > current_offset {
+                    return Ok((metadata, IggyMessagesBatchSet::empty()));
+                }
+                offset
+            }
+            PollingKind::First => partition_data
+                .log
+                .segments()
+                .first()
+                .map(|segment| segment.start_offset)
+                .unwrap_or(0),
+            PollingKind::Last => {
+                let mut requested_count = count as u64;
+                if requested_count > current_offset + 1 {
+                    requested_count = current_offset + 1;
+                }
+                1 + current_offset - requested_count
+            }
+            PollingKind::Next => {
+                let stored_offset = match consumer {
+                    PollingConsumer::Consumer(id, _) => partition_data
+                        .consumer_offsets
+                        .pin()
+                        .get(&id)
+                        .map(|item| item.offset.load(Ordering::Relaxed)),
+                    PollingConsumer::ConsumerGroup(cg_id, _) => partition_data
+                        .consumer_group_offsets
+                        .pin()
+                        .get(&cg_id)
+                        .map(|item| item.offset.load(Ordering::Relaxed)),
+                };
+                match stored_offset {
+                    Some(offset) => offset + 1,
+                    None => partition_data
+                        .log
+                        .segments()
+                        .first()
+                        .map(|segment| segment.start_offset)
+                        .unwrap_or(0),
+                }
+            }
+            PollingKind::Timestamp => unreachable!("Timestamp handled above"),
+        };
+
+        if start_offset > current_offset || count == 0 {
+            return Ok((metadata, IggyMessagesBatchSet::empty()));
+        }
+
+        (metadata, start_offset)
+    };
+
+    // Phase 2: Get messages using hybrid disk+journal logic
+    let batches = get_messages_by_offset(partition_store, namespace, 
start_offset, count).await?;
+    Ok((metadata, batches))
+}
+
+/// Get messages by offset, handling the hybrid disk+journal case.
+pub async fn get_messages_by_offset(
+    partition_store: &RefCell<ShardLocalPartitions>,
+    namespace: &IggyNamespace,
+    start_offset: u64,
+    count: u32,
+) -> Result<IggyMessagesBatchSet, IggyError> {
+    if count == 0 {
+        return Ok(IggyMessagesBatchSet::empty());
+    }
+
+    // Get journal metadata for routing
+    let (is_journal_empty, journal_first_offset, journal_last_offset) = {
+        let store = partition_store.borrow();
+        let partition_data = store
+            .get(namespace)
+            .expect("partition_store: partition must exist for poll");
+
+        let journal = partition_data.log.journal();
+        let journal_inner = journal.inner();
+        (
+            journal.is_empty(),
+            journal_inner.base_offset,
+            journal_inner.current_offset,
+        )
+    };
+
+    let end_offset = start_offset + (count - 1).max(1) as u64;
+
+    // Case 0: Journal is empty, all messages on disk
+    if is_journal_empty {
+        return load_messages_from_disk(partition_store, namespace, 
start_offset, count).await;
+    }
+
+    // Case 1: All messages are in journal
+    if start_offset >= journal_first_offset && end_offset <= 
journal_last_offset {
+        let batches = {
+            let store = partition_store.borrow();
+            let partition_data = store
+                .get(namespace)
+                .expect("partition_store: partition must exist for poll");
+            partition_data
+                .log
+                .journal()
+                .get(|batches| batches.get_by_offset(start_offset, count))
+        };
+        return Ok(batches);
+    }
+
+    // Case 2: All messages on disk (end_offset < journal_first_offset)
+    if end_offset < journal_first_offset {
+        return load_messages_from_disk(partition_store, namespace, 
start_offset, count).await;
+    }
+
+    // Case 3: Messages span disk and journal boundary
+    let disk_count = if start_offset < journal_first_offset {
+        ((journal_first_offset - start_offset) as u32).min(count)
+    } else {
+        0
+    };
+
+    let mut combined_batch_set = IggyMessagesBatchSet::empty();
+
+    // Load messages from disk if needed
+    if disk_count > 0 {
+        let disk_messages =
+            load_messages_from_disk(partition_store, namespace, start_offset, 
disk_count).await?;
+        if !disk_messages.is_empty() {
+            combined_batch_set.add_batch_set(disk_messages);
+        }
+    }
+
+    // Get remaining messages from journal
+    let remaining_count = count - combined_batch_set.count();
+    if remaining_count > 0 {
+        let journal_start_offset = std::cmp::max(start_offset, 
journal_first_offset);
+        let journal_messages = {
+            let store = partition_store.borrow();
+            let partition_data = store
+                .get(namespace)
+                .expect("partition_store: partition must exist for poll");
+            partition_data
+                .log
+                .journal()
+                .get(|batches| batches.get_by_offset(journal_start_offset, 
remaining_count))
+        };
+        if !journal_messages.is_empty() {
+            combined_batch_set.add_batch_set(journal_messages);
+        }
+    }
+
+    Ok(combined_batch_set)
+}
+
+/// Poll messages by timestamp.
+async fn poll_messages_by_timestamp(
+    partition_store: &RefCell<ShardLocalPartitions>,
+    namespace: &IggyNamespace,
+    timestamp: u64,
+    count: u32,
+) -> Result<(IggyPollMetadata, IggyMessagesBatchSet), IggyError> {
+    let partition_id = namespace.partition_id();
+
+    // Get metadata and journal info
+    let (metadata, is_journal_empty, journal_first_timestamp, 
journal_last_timestamp) = {
+        let store = partition_store.borrow();
+        let partition_data = store
+            .get(namespace)
+            .expect("partition_store: partition must exist for poll");
+
+        let current_offset = partition_data.offset.load(Ordering::Relaxed);
+        let metadata = IggyPollMetadata::new(partition_id as u32, 
current_offset);
+
+        let journal = partition_data.log.journal();
+        let journal_inner = journal.inner();
+        (
+            metadata,
+            journal.is_empty(),
+            journal_inner.first_timestamp,
+            journal_inner.end_timestamp,
+        )
+    };
+
+    if count == 0 {
+        return Ok((metadata, IggyMessagesBatchSet::empty()));
+    }
+
+    // Case 0: Journal is empty, all messages on disk
+    if is_journal_empty {
+        let batches =
+            load_messages_from_disk_by_timestamp(partition_store, namespace, 
timestamp, count)
+                .await?;
+        return Ok((metadata, batches));
+    }
+
+    // Case 1: Timestamp is after journal's last timestamp - no messages
+    if timestamp > journal_last_timestamp {
+        return Ok((metadata, IggyMessagesBatchSet::empty()));
+    }
+
+    // Case 2: Timestamp is within journal range - get from journal
+    if timestamp >= journal_first_timestamp {
+        let batches = {
+            let store = partition_store.borrow();
+            let partition_data = store
+                .get(namespace)
+                .expect("partition_store: partition must exist for poll");
+            partition_data
+                .log
+                .journal()
+                .get(|batches| batches.get_by_timestamp(timestamp, count))
+        };
+        return Ok((metadata, batches));
+    }
+
+    // Case 3: Timestamp is before journal - need disk + possibly journal
+    let disk_messages =
+        load_messages_from_disk_by_timestamp(partition_store, namespace, 
timestamp, count).await?;
+
+    if disk_messages.count() >= count {
+        return Ok((metadata, disk_messages));
+    }
+
+    // Case 4: Messages span disk and journal
+    let remaining_count = count - disk_messages.count();
+    let journal_messages = {
+        let store = partition_store.borrow();
+        let partition_data = store
+            .get(namespace)
+            .expect("partition_store: partition must exist for poll");
+        partition_data
+            .log
+            .journal()
+            .get(|batches| batches.get_by_timestamp(timestamp, 
remaining_count))
+    };
+
+    let mut combined_batch_set = disk_messages;
+    if !journal_messages.is_empty() {
+        combined_batch_set.add_batch_set(journal_messages);
+    }
+    Ok((metadata, combined_batch_set))
+}
+
+/// Load messages from disk by offset.
+pub async fn load_messages_from_disk(
+    partition_store: &RefCell<ShardLocalPartitions>,
+    namespace: &IggyNamespace,
+    start_offset: u64,
+    count: u32,
+) -> Result<IggyMessagesBatchSet, IggyError> {
+    if count == 0 {
+        return Ok(IggyMessagesBatchSet::empty());
+    }
+
+    // Get segment range containing the requested offset
+    let segment_range = {
+        let store = partition_store.borrow();
+        let partition_data = store
+            .get(namespace)
+            .expect("partition_store: partition must exist");
+
+        let segments = partition_data.log.segments();
+        if segments.is_empty() {
+            return Ok(IggyMessagesBatchSet::empty());
+        }
+
+        let start = segments
+            .iter()
+            .rposition(|segment| segment.start_offset <= start_offset)
+            .unwrap_or(0);
+        let end = segments.len();
+        start..end
+    };
+
+    let mut remaining_count = count;
+    let mut batches = IggyMessagesBatchSet::empty();
+    let mut current_offset = start_offset;
+
+    for idx in segment_range {
+        if remaining_count == 0 {
+            break;
+        }
+
+        let (segment_start_offset, segment_end_offset) = {
+            let store = partition_store.borrow();
+            let partition_data = store
+                .get(namespace)
+                .expect("partition_store: partition must exist");
+
+            let segment = &partition_data.log.segments()[idx];
+            (segment.start_offset, segment.end_offset)
+        };
+
+        let offset = if current_offset < segment_start_offset {
+            segment_start_offset
+        } else {
+            current_offset
+        };
+
+        let mut end_offset = offset + (remaining_count - 1).max(1) as u64;
+        if end_offset > segment_end_offset {
+            end_offset = segment_end_offset;
+        }
+
+        let messages = load_segment_messages(
+            partition_store,
+            namespace,
+            idx,
+            offset,
+            end_offset,
+            remaining_count,
+            segment_start_offset,
+        )
+        .await?;
+
+        let loaded_count = messages.count();
+        if loaded_count > 0 {
+            batches.add_batch_set(messages);
+            remaining_count = remaining_count.saturating_sub(loaded_count);
+            current_offset = end_offset + 1;
+        } else {
+            break;
+        }
+    }
+
+    Ok(batches)
+}
+
+/// Load messages from a specific segment.
+async fn load_segment_messages(
+    partition_store: &RefCell<ShardLocalPartitions>,
+    namespace: &IggyNamespace,
+    idx: usize,
+    start_offset: u64,
+    end_offset: u64,
+    count: u32,
+    segment_start_offset: u64,
+) -> Result<IggyMessagesBatchSet, IggyError> {
+    let relative_start_offset = (start_offset - segment_start_offset) as u32;
+
+    // Check journal first for this segment's data
+    let journal_data = {
+        let store = partition_store.borrow();
+        let partition_data = store
+            .get(namespace)
+            .expect("partition_store: partition must exist");
+
+        let journal = partition_data.log.journal();
+        let is_journal_empty = journal.is_empty();
+        let journal_inner = journal.inner();
+        let journal_first_offset = journal_inner.base_offset;
+        let journal_last_offset = journal_inner.current_offset;
+
+        if !is_journal_empty
+            && start_offset >= journal_first_offset
+            && end_offset <= journal_last_offset
+        {
+            Some(journal.get(|batches| batches.get_by_offset(start_offset, 
count)))
+        } else {
+            None
+        }
+    };
+
+    if let Some(batches) = journal_data {
+        return Ok(batches);
+    }
+
+    // Load from disk
+    let (index_reader, messages_reader, indexes) = {
+        let store = partition_store.borrow();
+        let partition_data = store
+            .get(namespace)
+            .expect("partition_store: partition must exist");
+
+        let storages = partition_data.log.storages();
+        if idx >= storages.len() {
+            return Ok(IggyMessagesBatchSet::empty());
+        }
+
+        let index_reader = storages[idx]
+            .index_reader
+            .as_ref()
+            .expect("Index reader not initialized")
+            .clone();
+        let messages_reader = storages[idx]
+            .messages_reader
+            .as_ref()
+            .expect("Messages reader not initialized")
+            .clone();
+        let indexes_vec = partition_data.log.indexes();
+        let indexes = indexes_vec
+            .get(idx)
+            .and_then(|opt| opt.as_ref())
+            .map(|indexes| {
+                indexes
+                    .slice_by_offset(relative_start_offset, count)
+                    .unwrap_or_default()
+            });
+        (index_reader, messages_reader, indexes)
+    };
+
+    let indexes_to_read = if let Some(indexes) = indexes {
+        if !indexes.is_empty() {
+            Some(indexes)
+        } else {
+            index_reader
+                .as_ref()
+                .load_from_disk_by_offset(relative_start_offset, count)
+                .await?
+        }
+    } else {
+        index_reader
+            .as_ref()
+            .load_from_disk_by_offset(relative_start_offset, count)
+            .await?
+    };
+
+    if indexes_to_read.is_none() {
+        return Ok(IggyMessagesBatchSet::empty());
+    }
+
+    let indexes_to_read = indexes_to_read.unwrap();
+    let batch = messages_reader
+        .as_ref()
+        .load_messages_from_disk(indexes_to_read)
+        .await?;
+
+    batch.validate_checksums_and_offsets(start_offset)?;
+
+    Ok(IggyMessagesBatchSet::from(batch))
+}
+
+/// Load messages from disk by timestamp.
+async fn load_messages_from_disk_by_timestamp(
+    partition_store: &RefCell<ShardLocalPartitions>,
+    namespace: &IggyNamespace,
+    timestamp: u64,
+    count: u32,
+) -> Result<IggyMessagesBatchSet, IggyError> {
+    if count == 0 {
+        return Ok(IggyMessagesBatchSet::empty());
+    }
+
+    // Find segment range that might contain messages >= timestamp
+    let segment_range = {
+        let store = partition_store.borrow();
+        let partition_data = store
+            .get(namespace)
+            .expect("partition_store: partition must exist");
+
+        let segments = partition_data.log.segments();
+        if segments.is_empty() {
+            return Ok(IggyMessagesBatchSet::empty());
+        }
+
+        let start = segments
+            .iter()
+            .position(|segment| segment.end_timestamp >= timestamp)
+            .unwrap_or(segments.len());
+
+        if start >= segments.len() {
+            return Ok(IggyMessagesBatchSet::empty());
+        }
+
+        start..segments.len()
+    };
+
+    let mut remaining_count = count;
+    let mut batches = IggyMessagesBatchSet::empty();
+
+    for idx in segment_range {
+        if remaining_count == 0 {
+            break;
+        }
+
+        let segment_end_timestamp = {
+            let store = partition_store.borrow();
+            let partition_data = store
+                .get(namespace)
+                .expect("partition_store: partition must exist");
+            partition_data.log.segments()[idx].end_timestamp
+        };
+
+        if segment_end_timestamp < timestamp {
+            continue;
+        }
+
+        let messages = load_segment_messages_by_timestamp(
+            partition_store,
+            namespace,
+            idx,
+            timestamp,
+            remaining_count,
+        )
+        .await?;
+
+        let messages_count = messages.count();
+        if messages_count == 0 {
+            continue;
+        }
+
+        remaining_count = remaining_count.saturating_sub(messages_count);
+        batches.add_batch_set(messages);
+    }
+
+    Ok(batches)
+}
+
+/// Load messages from a specific segment by timestamp.
+async fn load_segment_messages_by_timestamp(
+    partition_store: &RefCell<ShardLocalPartitions>,
+    namespace: &IggyNamespace,
+    idx: usize,
+    timestamp: u64,
+    count: u32,
+) -> Result<IggyMessagesBatchSet, IggyError> {
+    if count == 0 {
+        return Ok(IggyMessagesBatchSet::empty());
+    }
+
+    // Check journal first
+    let journal_data = {
+        let store = partition_store.borrow();
+        let partition_data = store
+            .get(namespace)
+            .expect("partition_store: partition must exist");
+
+        let journal = partition_data.log.journal();
+        let is_journal_empty = journal.is_empty();
+        let journal_inner = journal.inner();
+        let journal_first_timestamp = journal_inner.first_timestamp;
+        let journal_last_timestamp = journal_inner.end_timestamp;
+
+        if !is_journal_empty
+            && timestamp >= journal_first_timestamp
+            && timestamp <= journal_last_timestamp
+        {
+            Some(journal.get(|batches| batches.get_by_timestamp(timestamp, 
count)))
+        } else {
+            None
+        }
+    };
+
+    if let Some(batches) = journal_data {
+        return Ok(batches);
+    }
+
+    // Load from disk
+    let (index_reader, messages_reader, indexes) = {
+        let store = partition_store.borrow();
+        let partition_data = store
+            .get(namespace)
+            .expect("partition_store: partition must exist");
+
+        let storages = partition_data.log.storages();
+        if idx >= storages.len() {
+            return Ok(IggyMessagesBatchSet::empty());
+        }
+
+        let index_reader = storages[idx]
+            .index_reader
+            .as_ref()
+            .expect("Index reader not initialized")
+            .clone();
+        let messages_reader = storages[idx]
+            .messages_reader
+            .as_ref()
+            .expect("Messages reader not initialized")
+            .clone();
+        let indexes_vec = partition_data.log.indexes();
+        let indexes = indexes_vec
+            .get(idx)
+            .and_then(|opt| opt.as_ref())
+            .map(|indexes| {
+                indexes
+                    .slice_by_timestamp(timestamp, count)
+                    .unwrap_or_default()
+            });
+        (index_reader, messages_reader, indexes)
+    };
+
+    let indexes_to_read = if let Some(indexes) = indexes {
+        if !indexes.is_empty() {
+            Some(indexes)
+        } else {
+            index_reader
+                .as_ref()
+                .load_from_disk_by_timestamp(timestamp, count)
+                .await?
+        }
+    } else {
+        index_reader
+            .as_ref()
+            .load_from_disk_by_timestamp(timestamp, count)
+            .await?
+    };
+
+    if indexes_to_read.is_none() {
+        return Ok(IggyMessagesBatchSet::empty());
+    }
+
+    let indexes_to_read = indexes_to_read.unwrap();
+    let batch = messages_reader
+        .as_ref()
+        .load_messages_from_disk(indexes_to_read)
+        .await?;
+
+    Ok(IggyMessagesBatchSet::from(batch))
+}

Reply via email to