This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch skipping-offset
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit aa017e89adf9a790bd36b74be690cf25137c191c
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Tue Mar 17 09:24:19 2026 +0100

    fix(server): prevent consumer offset skip during concurrent produce+consume
    
    During journal.commit() -> async persist, new appends make the
    journal non-empty while the in-flight buffer holds old committed
    data. The polling router only checked in-flight when the journal
    was empty (Case 0), so committed-but-unpersisted messages were
    invisible to consumers in Cases 1-3.
    
    With auto-commit, Case 3 returned only journal messages (higher
    offsets), skipping the in-flight range permanently - the "jump
    to end" symptom reported by users with ~50M records.
    
    Replace the case-based routing with a three-phase priority
    lookup (disk -> in-flight -> journal) that checks all tiers
    regardless of journal state. Also fix the (count-1).max(1)
    off-by-one that inflated end_offset when count=1, and add
    in-flight awareness to load_segment_messages.
    
    Fixes #2715
---
 core/server/src/streaming/partitions/mod.rs       |   2 +
 core/server/src/streaming/partitions/ops.rs       | 146 +++++++-----
 core/server/src/streaming/partitions/ops_tests.rs | 276 ++++++++++++++++++++++
 3 files changed, 364 insertions(+), 60 deletions(-)

diff --git a/core/server/src/streaming/partitions/mod.rs b/core/server/src/streaming/partitions/mod.rs
index f8a3593ed..ee15942f9 100644
--- a/core/server/src/streaming/partitions/mod.rs
+++ b/core/server/src/streaming/partitions/mod.rs
@@ -26,6 +26,8 @@ pub mod local_partition;
 pub mod local_partitions;
 pub mod log;
 pub mod ops;
+#[cfg(test)]
+mod ops_tests;
 pub mod segments;
 pub mod storage;
 
diff --git a/core/server/src/streaming/partitions/ops.rs b/core/server/src/streaming/partitions/ops.rs
index aa481b7b6..057455916 100644
--- a/core/server/src/streaming/partitions/ops.rs
+++ b/core/server/src/streaming/partitions/ops.rs
@@ -158,73 +158,78 @@ pub async fn get_messages_by_offset(
         )
     };
 
-    let end_offset = start_offset + (count - 1).max(1) as u64;
+    let end_offset = start_offset + (count - 1) as u64;
+
+    // Three-tier priority lookup: disk -> in-flight -> journal.
+    // Tiers are contiguous and non-overlapping by construction:
+    //   disk:      [0 .. in_flight_first - 1]
+    //   in-flight: [in_flight_first .. in_flight_last]  (committed, persisting)
+    //   journal:   [journal_first .. journal_last]       (post-commit appends)
+    //
+    // The previous code only checked in-flight when journal was empty (Case 0),
+    // missing the state where both in-flight and journal hold valid data for
+    // different offset ranges ("State C" - see issue #2715).
+
+    let mut combined = IggyMessagesBatchSet::empty();
+    let mut remaining = count;
+    let mut current = start_offset;
+
+    // Phase 1: Disk - offsets below both in-flight and journal.
+    let disk_upper_bound = if !in_flight_empty {
+        in_flight_first
+    } else if !is_journal_empty {
+        journal_first_offset
+    } else {
+        u64::MAX
+    };
 
-    // Case 0: Journal is empty - check in_flight buffer or disk
-    if is_journal_empty {
-        if !in_flight_empty && start_offset >= in_flight_first && start_offset <= in_flight_last {
-            let in_flight_batches = {
-                let store = local_partitions.borrow();
-                let partition = store
-                    .get(namespace)
-                    .expect("local_partitions: partition must exist for poll");
-                partition
-                    .log
-                    .in_flight()
-                    .get_by_offset(start_offset, count)
-                    .to_vec()
-            };
-            if !in_flight_batches.is_empty() {
-                let mut result = IggyMessagesBatchSet::empty();
-                result.add_immutable_batches(&in_flight_batches);
-                return Ok(result.get_by_offset(start_offset, count));
-            }
+    if current < disk_upper_bound && remaining > 0 {
+        let disk_end = end_offset.min(disk_upper_bound.saturating_sub(1));
+        let disk_count = ((disk_end - current + 1) as u32).min(remaining);
+        let disk_messages =
+            load_messages_from_disk(local_partitions, namespace, current, disk_count).await?;
+        let loaded = disk_messages.count();
+        if loaded > 0 {
+            current += loaded as u64;
+            remaining = remaining.saturating_sub(loaded);
+            combined.add_batch_set(disk_messages);
         }
-        return load_messages_from_disk(local_partitions, namespace, start_offset, count).await;
     }
 
-    // Case 1: All messages are in journal
-    if start_offset >= journal_first_offset && end_offset <= journal_last_offset {
-        let batches = {
+    // Phase 2: In-flight - committed data being persisted to disk.
+    if remaining > 0 && !in_flight_empty && current >= in_flight_first && current <= in_flight_last
+    {
+        let in_flight_count = ((in_flight_last - current + 1) as u32).min(remaining);
+        let in_flight_batches = {
             let store = local_partitions.borrow();
             let partition = store
                 .get(namespace)
                 .expect("local_partitions: partition must exist for poll");
             partition
                 .log
-                .journal()
-                .get(|batches| batches.get_by_offset(start_offset, count))
+                .in_flight()
+                .get_by_offset(current, in_flight_count)
+                .to_vec()
         };
-        return Ok(batches);
-    }
-
-    // Case 2: All messages on disk (end_offset < journal_first_offset)
-    if end_offset < journal_first_offset {
-        return load_messages_from_disk(local_partitions, namespace, start_offset, count).await;
-    }
-
-    // Case 3: Messages span disk and journal boundary
-    let disk_count = if start_offset < journal_first_offset {
-        ((journal_first_offset - start_offset) as u32).min(count)
-    } else {
-        0
-    };
-
-    let mut combined_batch_set = IggyMessagesBatchSet::empty();
-
-    // Load messages from disk if needed
-    if disk_count > 0 {
-        let disk_messages =
-            load_messages_from_disk(local_partitions, namespace, start_offset, disk_count).await?;
-        if !disk_messages.is_empty() {
-            combined_batch_set.add_batch_set(disk_messages);
+        if !in_flight_batches.is_empty() {
+            let mut result = IggyMessagesBatchSet::empty();
+            result.add_immutable_batches(&in_flight_batches);
+            let sliced = result.get_by_offset(current, in_flight_count);
+            let loaded = sliced.count();
+            if loaded > 0 {
+                current += loaded as u64;
+                remaining = remaining.saturating_sub(loaded);
+                combined.add_batch_set(sliced);
+            }
         }
     }
 
-    // Get remaining messages from journal
-    let remaining_count = count - combined_batch_set.count();
-    if remaining_count > 0 {
-        let journal_start_offset = std::cmp::max(start_offset, journal_first_offset);
+    // Phase 3: Journal - newest appends (post-commit).
+    if remaining > 0
+        && !is_journal_empty
+        && current >= journal_first_offset
+        && current <= journal_last_offset
+    {
         let journal_messages = {
             let store = local_partitions.borrow();
             let partition = store
@@ -233,14 +238,20 @@ pub async fn get_messages_by_offset(
             partition
                 .log
                 .journal()
-                .get(|batches| batches.get_by_offset(journal_start_offset, remaining_count))
+                .get(|batches| batches.get_by_offset(current, remaining))
         };
         if !journal_messages.is_empty() {
-            combined_batch_set.add_batch_set(journal_messages);
+            combined.add_batch_set(journal_messages);
         }
     }
 
-    Ok(combined_batch_set)
+    // Fallback: if no in-memory tier matched at all, try disk for the full range.
+    // Handles the common case where journal and in-flight are both empty.
+    if combined.is_empty() && remaining == count {
+        return load_messages_from_disk(local_partitions, namespace, start_offset, count).await;
+    }
+
+    Ok(combined)
 }
 
 /// Poll messages by timestamp.
@@ -388,7 +399,7 @@ pub async fn load_messages_from_disk(
             current_offset
         };
 
-        let mut end_offset = offset + (remaining_count - 1).max(1) as u64;
+        let mut end_offset = offset + (remaining_count - 1) as u64;
         if end_offset > segment_end_offset {
             end_offset = segment_end_offset;
         }
@@ -429,13 +440,28 @@ async fn load_segment_messages(
 ) -> Result<IggyMessagesBatchSet, IggyError> {
     let relative_start_offset = (start_offset - segment_start_offset) as u32;
 
-    // Check journal first for this segment's data
-    let journal_data = {
+    // Check in-memory tiers (in-flight, journal) before going to disk.
+    let in_memory_data = {
         let store = local_partitions.borrow();
         let partition = store
             .get(namespace)
             .expect("local_partitions: partition must exist");
 
+        // Check in-flight buffer first (committed data being persisted).
+        let in_flight = partition.log.in_flight();
+        if !in_flight.is_empty()
+            && start_offset >= in_flight.first_offset()
+            && start_offset <= in_flight.last_offset()
+        {
+            let batches = in_flight.get_by_offset(start_offset, count).to_vec();
+            if !batches.is_empty() {
+                let mut result = IggyMessagesBatchSet::empty();
+                result.add_immutable_batches(&batches);
+                return Ok(result.get_by_offset(start_offset, count));
+            }
+        }
+
+        // Check journal (post-commit appends).
         let journal = partition.log.journal();
         let is_journal_empty = journal.is_empty();
         let journal_inner = journal.inner();
@@ -452,7 +478,7 @@ async fn load_segment_messages(
         }
     };
 
-    if let Some(batches) = journal_data {
+    if let Some(batches) = in_memory_data {
         return Ok(batches);
     }
 
diff --git a/core/server/src/streaming/partitions/ops_tests.rs b/core/server/src/streaming/partitions/ops_tests.rs
new file mode 100644
index 000000000..99e952d1f
--- /dev/null
+++ b/core/server/src/streaming/partitions/ops_tests.rs
@@ -0,0 +1,276 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Tests for the in-flight buffer visibility gap fix (issue #2715).
+//!
+//! These tests set up "State C" directly in memory:
+//!   - in-flight holds offsets [0..N-1]  (committed journal, not yet on disk)
+//!   - journal holds offsets  [N..N+M-1] (new appends after commit)
+//!   - no actual disk data
+//!
+//! Before the fix, Cases 1-3 in get_messages_by_offset never checked
+//! in-flight, causing the consumer to miss committed data and either
+//! get empty results or skip directly to journal offsets.
+
+#[cfg(test)]
+mod tests {
+    use crate::streaming::partitions::consumer_group_offsets::ConsumerGroupOffsets;
+    use crate::streaming::partitions::consumer_offsets::ConsumerOffsets;
+    use crate::streaming::partitions::journal::{Inner, Journal};
+    use crate::streaming::partitions::local_partition::LocalPartition;
+    use crate::streaming::partitions::local_partitions::LocalPartitions;
+    use crate::streaming::partitions::ops;
+    use crate::streaming::polling_consumer::PollingConsumer;
+    use crate::streaming::stats::{PartitionStats, StreamStats, TopicStats};
+    use iggy_common::sharding::IggyNamespace;
+    use iggy_common::{
+        IggyByteSize, IggyMessage, MemoryPool, MemoryPoolConfigOther, PollingStrategy, Sizeable,
+    };
+    use std::cell::RefCell;
+    use std::sync::Arc;
+    use std::sync::atomic::AtomicU64;
+
+    fn init_memory_pool() {
+        static INIT: std::sync::Once = std::sync::Once::new();
+        INIT.call_once(|| {
+            let config = MemoryPoolConfigOther {
+                enabled: false,
+                size: IggyByteSize::from(64 * 1024 * 1024u64),
+                bucket_capacity: 256,
+            };
+            MemoryPool::init_pool(&config);
+        });
+    }
+
+    fn create_test_partition(current_offset: u64) -> LocalPartition {
+        let stream_stats = Arc::new(StreamStats::default());
+        let topic_stats = Arc::new(TopicStats::new(stream_stats));
+        let partition_stats = Arc::new(PartitionStats::new(topic_stats));
+
+        LocalPartition::new(
+            partition_stats,
+            Arc::new(AtomicU64::new(current_offset)),
+            Arc::new(ConsumerOffsets::with_capacity(10)),
+            Arc::new(ConsumerGroupOffsets::with_capacity(10)),
+            None,
+            iggy_common::IggyTimestamp::now(),
+            1,
+            true,
+        )
+    }
+
+    fn create_batch(count: u32) -> iggy_common::IggyMessagesBatchMut {
+        let messages: Vec<IggyMessage> = (0..count)
+            .map(|_| {
+                IggyMessage::builder()
+                    .payload(bytes::Bytes::from("test-payload"))
+                    .build()
+                    .unwrap()
+            })
+            .collect();
+
+        let messages_size: u32 = messages
+            .iter()
+            .map(|m| m.get_size_bytes().as_bytes_u32())
+            .sum();
+        iggy_common::IggyMessagesBatchMut::from_messages(&messages, messages_size)
+    }
+
+    /// Sets up "State C": in-flight holds committed data, journal holds new
+    /// appends that arrived after commit but before persist completes.
+    ///
+    /// Layout:
+    ///   segment metadata:  [0..journal_end]   (no actual disk data)
+    ///   in-flight:         [0..in_flight_count-1]
+    ///   journal:           [in_flight_count..in_flight_count+journal_count-1]
+    ///   partition.offset:  in_flight_count + journal_count - 1
+    async fn setup_state_c(
+        in_flight_count: u32,
+        journal_count: u32,
+    ) -> (RefCell<LocalPartitions>, IggyNamespace) {
+        init_memory_pool();
+        let ns = IggyNamespace::new(1, 1, 0);
+
+        let in_flight_end = in_flight_count as u64 - 1;
+        let journal_base = in_flight_end + 1;
+        let journal_end = journal_base + journal_count as u64 - 1;
+
+        let mut partition = create_test_partition(journal_end);
+
+        let segment = iggy_common::Segment::new(0, IggyByteSize::from(1_073_741_824u64));
+        let storage = iggy_common::SegmentStorage::default();
+        partition.log.add_persisted_segment(segment, storage);
+
+        let seg = &mut partition.log.segments_mut()[0];
+        seg.end_offset = journal_end;
+        seg.start_timestamp = 1;
+        seg.end_timestamp = 2;
+
+        let mut in_flight_batch = create_batch(in_flight_count);
+        in_flight_batch.prepare_for_persistence(0, 0, 0, None).await;
+        let in_flight_size = in_flight_batch.size();
+        partition.log.set_in_flight(vec![in_flight_batch.freeze()]);
+
+        let journal_inner = Inner {
+            base_offset: journal_base,
+            current_offset: 0,
+            first_timestamp: 0,
+            end_timestamp: 0,
+            messages_count: 0,
+            size: IggyByteSize::default(),
+        };
+        partition.log.journal_mut().init(journal_inner);
+
+        let mut journal_batch = create_batch(journal_count);
+        journal_batch
+            .prepare_for_persistence(0, journal_base, in_flight_size as u32, None)
+            .await;
+        partition.log.journal_mut().append(journal_batch).unwrap();
+
+        let mut store = LocalPartitions::new();
+        store.insert(ns, partition);
+        (RefCell::new(store), ns)
+    }
+
+    // -----------------------------------------------------------------------
+    // Issue #2715: In-flight buffer must be reachable when journal is non-empty
+    // -----------------------------------------------------------------------
+
+    #[compio::test]
+    async fn in_flight_reachable_when_journal_non_empty() {
+        let (store, ns) = setup_state_c(10, 5).await;
+        let batches = ops::get_messages_by_offset(&store, &ns, 0, 5)
+            .await
+            .unwrap();
+        assert_eq!(batches.count(), 5);
+        assert_eq!(batches.first_offset(), Some(0));
+    }
+
+    #[compio::test]
+    async fn spanning_in_flight_and_journal_returns_all_in_order() {
+        let (store, ns) = setup_state_c(10, 5).await;
+        let batches = ops::get_messages_by_offset(&store, &ns, 0, 15)
+            .await
+            .unwrap();
+        assert_eq!(batches.count(), 15);
+        assert_eq!(batches.first_offset(), Some(0));
+    }
+
+    #[compio::test]
+    async fn polling_next_starts_from_in_flight_not_journal() {
+        let (store, ns) = setup_state_c(10, 5).await;
+        let consumer = PollingConsumer::Consumer(1, 0);
+        let args =
+            crate::shard::system::messages::PollingArgs::new(PollingStrategy::next(), 15, false);
+        let (metadata, batches) = ops::poll_messages(&store, &ns, consumer, args)
+            .await
+            .unwrap();
+        assert_eq!(batches.first_offset(), Some(0));
+        assert!(metadata.current_offset >= 14);
+    }
+
+    #[compio::test]
+    async fn single_message_at_in_flight_journal_boundary() {
+        let (store, ns) = setup_state_c(10, 5).await;
+        let batches = ops::get_messages_by_offset(&store, &ns, 9, 1)
+            .await
+            .unwrap();
+        assert_eq!(batches.count(), 1);
+        assert_eq!(batches.first_offset(), Some(9));
+    }
+
+    // -----------------------------------------------------------------------
+    // Existing correct behavior must still work
+    // -----------------------------------------------------------------------
+
+    #[compio::test]
+    async fn in_flight_reachable_when_journal_empty() {
+        init_memory_pool();
+        let ns = IggyNamespace::new(1, 1, 0);
+        let mut partition = create_test_partition(9);
+
+        let segment = iggy_common::Segment::new(0, IggyByteSize::from(1_073_741_824u64));
+        partition
+            .log
+            .add_persisted_segment(segment, iggy_common::SegmentStorage::default());
+        let seg = &mut partition.log.segments_mut()[0];
+        seg.end_offset = 9;
+        seg.start_timestamp = 1;
+        seg.end_timestamp = 2;
+
+        let mut batch = create_batch(10);
+        batch.prepare_for_persistence(0, 0, 0, None).await;
+        partition.log.set_in_flight(vec![batch.freeze()]);
+
+        let mut store = LocalPartitions::new();
+        store.insert(ns, partition);
+        let store = RefCell::new(store);
+
+        let batches = ops::get_messages_by_offset(&store, &ns, 0, 10)
+            .await
+            .unwrap();
+        assert_eq!(batches.count(), 10);
+    }
+
+    #[compio::test]
+    async fn journal_reachable_when_in_flight_empty() {
+        init_memory_pool();
+        let ns = IggyNamespace::new(1, 1, 0);
+        let mut partition = create_test_partition(9);
+
+        let segment = iggy_common::Segment::new(0, IggyByteSize::from(1_073_741_824u64));
+        partition
+            .log
+            .add_persisted_segment(segment, iggy_common::SegmentStorage::default());
+        let seg = &mut partition.log.segments_mut()[0];
+        seg.end_offset = 9;
+        seg.start_timestamp = 1;
+        seg.end_timestamp = 2;
+
+        partition.log.journal_mut().init(Inner {
+            base_offset: 0,
+            current_offset: 0,
+            first_timestamp: 0,
+            end_timestamp: 0,
+            messages_count: 0,
+            size: IggyByteSize::default(),
+        });
+
+        let mut batch = create_batch(10);
+        batch.prepare_for_persistence(0, 0, 0, None).await;
+        partition.log.journal_mut().append(batch).unwrap();
+
+        let mut store = LocalPartitions::new();
+        store.insert(ns, partition);
+        let store = RefCell::new(store);
+
+        let batches = ops::get_messages_by_offset(&store, &ns, 0, 10)
+            .await
+            .unwrap();
+        assert_eq!(batches.count(), 10);
+    }
+
+    #[compio::test]
+    async fn journal_single_message_at_specific_offset() {
+        let (store, ns) = setup_state_c(10, 5).await;
+        let batches = ops::get_messages_by_offset(&store, &ns, 12, 1)
+            .await
+            .unwrap();
+        assert_eq!(batches.count(), 1);
+        assert_eq!(batches.first_offset(), Some(12));
+    }
+}

Reply via email to