This is an automated email from the ASF dual-hosted git repository. hgruszecki pushed a commit to branch skipping-offset in repository https://gitbox.apache.org/repos/asf/iggy.git
commit aa017e89adf9a790bd36b74be690cf25137c191c Author: Hubert Gruszecki <[email protected]> AuthorDate: Tue Mar 17 09:24:19 2026 +0100 fix(server): prevent consumer offset skip during concurrent produce+consume During journal.commit() -> async persist, new appends make the journal non-empty while the in-flight buffer holds old committed data. The polling router only checked in-flight when the journal was empty (Case 0), so committed-but-unpersisted messages were invisible to consumers in Cases 1-3. With auto-commit, Case 3 returned only journal messages (higher offsets), skipping the in-flight range permanently - the "jump to end" symptom reported by users with ~50M records. Replace the case-based routing with a three-phase priority lookup (disk -> in-flight -> journal) that checks all tiers regardless of journal state. Also fix the (count-1).max(1) off-by-one that inflated end_offset when count=1, and add in-flight awareness to load_segment_messages. Fixes #2715 --- core/server/src/streaming/partitions/mod.rs | 2 + core/server/src/streaming/partitions/ops.rs | 146 +++++++----- core/server/src/streaming/partitions/ops_tests.rs | 276 ++++++++++++++++++++++ 3 files changed, 364 insertions(+), 60 deletions(-) diff --git a/core/server/src/streaming/partitions/mod.rs b/core/server/src/streaming/partitions/mod.rs index f8a3593ed..ee15942f9 100644 --- a/core/server/src/streaming/partitions/mod.rs +++ b/core/server/src/streaming/partitions/mod.rs @@ -26,6 +26,8 @@ pub mod local_partition; pub mod local_partitions; pub mod log; pub mod ops; +#[cfg(test)] +mod ops_tests; pub mod segments; pub mod storage; diff --git a/core/server/src/streaming/partitions/ops.rs b/core/server/src/streaming/partitions/ops.rs index aa481b7b6..057455916 100644 --- a/core/server/src/streaming/partitions/ops.rs +++ b/core/server/src/streaming/partitions/ops.rs @@ -158,73 +158,78 @@ pub async fn get_messages_by_offset( ) }; - let end_offset = start_offset + (count - 1).max(1) as u64; + let end_offset = start_offset + (count - 1) as u64; + + // Three-tier priority lookup: disk -> in-flight -> journal. + // Tiers are contiguous and non-overlapping by construction: + // disk: [0 .. in_flight_first - 1] + // in-flight: [in_flight_first .. in_flight_last] (committed, persisting) + // journal: [journal_first .. journal_last] (post-commit appends) + // + // The previous code only checked in-flight when journal was empty (Case 0), + // missing the state where both in-flight and journal hold valid data for + // different offset ranges ("State C" - see issue #2715). + + let mut combined = IggyMessagesBatchSet::empty(); + let mut remaining = count; + let mut current = start_offset; + + // Phase 1: Disk - offsets below both in-flight and journal. + let disk_upper_bound = if !in_flight_empty { + in_flight_first + } else if !is_journal_empty { + journal_first_offset + } else { + u64::MAX + }; - // Case 0: Journal is empty - check in_flight buffer or disk - if is_journal_empty { - if !in_flight_empty && start_offset >= in_flight_first && start_offset <= in_flight_last { - let in_flight_batches = { - let store = local_partitions.borrow(); - let partition = store - .get(namespace) - .expect("local_partitions: partition must exist for poll"); - partition - .log - .in_flight() - .get_by_offset(start_offset, count) - .to_vec() - }; - if !in_flight_batches.is_empty() { - let mut result = IggyMessagesBatchSet::empty(); - result.add_immutable_batches(&in_flight_batches); - return Ok(result.get_by_offset(start_offset, count)); - } + if current < disk_upper_bound && remaining > 0 { + let disk_end = end_offset.min(disk_upper_bound.saturating_sub(1)); + let disk_count = ((disk_end - current + 1) as u32).min(remaining); + let disk_messages = + load_messages_from_disk(local_partitions, namespace, current, disk_count).await?; + let loaded = disk_messages.count(); + if loaded > 0 { + current += loaded as u64; + remaining = remaining.saturating_sub(loaded); + combined.add_batch_set(disk_messages); } - return load_messages_from_disk(local_partitions, namespace, start_offset, count).await; } - // Case 1: All messages are in journal - if start_offset >= journal_first_offset && end_offset <= journal_last_offset { - let batches = { + // Phase 2: In-flight - committed data being persisted to disk. + if remaining > 0 && !in_flight_empty && current >= in_flight_first && current <= in_flight_last + { + let in_flight_count = ((in_flight_last - current + 1) as u32).min(remaining); + let in_flight_batches = { let store = local_partitions.borrow(); let partition = store .get(namespace) .expect("local_partitions: partition must exist for poll"); partition .log - .journal() - .get(|batches| batches.get_by_offset(start_offset, count)) + .in_flight() + .get_by_offset(current, in_flight_count) + .to_vec() }; - return Ok(batches); - } - - // Case 2: All messages on disk (end_offset < journal_first_offset) - if end_offset < journal_first_offset { - return load_messages_from_disk(local_partitions, namespace, start_offset, count).await; - } - - // Case 3: Messages span disk and journal boundary - let disk_count = if start_offset < journal_first_offset { - ((journal_first_offset - start_offset) as u32).min(count) - } else { - 0 - }; - - let mut combined_batch_set = IggyMessagesBatchSet::empty(); - - // Load messages from disk if needed - if disk_count > 0 { - let disk_messages = - load_messages_from_disk(local_partitions, namespace, start_offset, disk_count).await?; - if !disk_messages.is_empty() { - combined_batch_set.add_batch_set(disk_messages); + if !in_flight_batches.is_empty() { + let mut result = IggyMessagesBatchSet::empty(); + result.add_immutable_batches(&in_flight_batches); + let sliced = result.get_by_offset(current, in_flight_count); + let loaded = sliced.count(); + if loaded > 0 { + current += loaded as u64; + remaining = remaining.saturating_sub(loaded); + combined.add_batch_set(sliced); + } } } - // Get remaining messages from journal - let remaining_count = count - combined_batch_set.count(); - if remaining_count > 0 { - let journal_start_offset = std::cmp::max(start_offset, journal_first_offset); + // Phase 3: Journal - newest appends (post-commit). + if remaining > 0 + && !is_journal_empty + && current >= journal_first_offset + && current <= journal_last_offset + { let journal_messages = { let store = local_partitions.borrow(); let partition = store @@ -233,14 +238,20 @@ pub async fn get_messages_by_offset( partition .log .journal() - .get(|batches| batches.get_by_offset(journal_start_offset, remaining_count)) + .get(|batches| batches.get_by_offset(current, remaining)) }; if !journal_messages.is_empty() { - combined_batch_set.add_batch_set(journal_messages); + combined.add_batch_set(journal_messages); } } - Ok(combined_batch_set) + // Fallback: if no in-memory tier matched at all, try disk for the full range. + // Handles the common case where journal and in-flight are both empty. + if combined.is_empty() && remaining == count { + return load_messages_from_disk(local_partitions, namespace, start_offset, count).await; + } + + Ok(combined) } /// Poll messages by timestamp. @@ -388,7 +399,7 @@ pub async fn load_messages_from_disk( current_offset }; - let mut end_offset = offset + (remaining_count - 1).max(1) as u64; + let mut end_offset = offset + (remaining_count - 1) as u64; if end_offset > segment_end_offset { end_offset = segment_end_offset; } @@ -429,13 +440,28 @@ async fn load_segment_messages( ) -> Result<IggyMessagesBatchSet, IggyError> { let relative_start_offset = (start_offset - segment_start_offset) as u32; - // Check journal first for this segment's data - let journal_data = { + // Check in-memory tiers (in-flight, journal) before going to disk. + let in_memory_data = { let store = local_partitions.borrow(); let partition = store .get(namespace) .expect("local_partitions: partition must exist"); + // Check in-flight buffer first (committed data being persisted). + let in_flight = partition.log.in_flight(); + if !in_flight.is_empty() + && start_offset >= in_flight.first_offset() + && start_offset <= in_flight.last_offset() + { + let batches = in_flight.get_by_offset(start_offset, count).to_vec(); + if !batches.is_empty() { + let mut result = IggyMessagesBatchSet::empty(); + result.add_immutable_batches(&batches); + return Ok(result.get_by_offset(start_offset, count)); + } + } + + // Check journal (post-commit appends). let journal = partition.log.journal(); let is_journal_empty = journal.is_empty(); let journal_inner = journal.inner(); @@ -452,7 +478,7 @@ async fn load_segment_messages( } }; - if let Some(batches) = journal_data { + if let Some(batches) = in_memory_data { return Ok(batches); } diff --git a/core/server/src/streaming/partitions/ops_tests.rs b/core/server/src/streaming/partitions/ops_tests.rs new file mode 100644 index 000000000..99e952d1f --- /dev/null +++ b/core/server/src/streaming/partitions/ops_tests.rs @@ -0,0 +1,276 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Tests for the in-flight buffer visibility gap fix (issue #2715). +//! +//! These tests set up "State C" directly in memory: +//! - in-flight holds offsets [0..N-1] (committed journal, not yet on disk) +//! - journal holds offsets [N..N+M-1] (new appends after commit) +//! - no actual disk data +//! +//! Before the fix, Cases 1-3 in get_messages_by_offset never checked +//! in-flight, causing the consumer to miss committed data and either +//! get empty results or skip directly to journal offsets. + +#[cfg(test)] +mod tests { + use crate::streaming::partitions::consumer_group_offsets::ConsumerGroupOffsets; + use crate::streaming::partitions::consumer_offsets::ConsumerOffsets; + use crate::streaming::partitions::journal::{Inner, Journal}; + use crate::streaming::partitions::local_partition::LocalPartition; + use crate::streaming::partitions::local_partitions::LocalPartitions; + use crate::streaming::partitions::ops; + use crate::streaming::polling_consumer::PollingConsumer; + use crate::streaming::stats::{PartitionStats, StreamStats, TopicStats}; + use iggy_common::sharding::IggyNamespace; + use iggy_common::{ + IggyByteSize, IggyMessage, MemoryPool, MemoryPoolConfigOther, PollingStrategy, Sizeable, + }; + use std::cell::RefCell; + use std::sync::Arc; + use std::sync::atomic::AtomicU64; + + fn init_memory_pool() { + static INIT: std::sync::Once = std::sync::Once::new(); + INIT.call_once(|| { + let config = MemoryPoolConfigOther { + enabled: false, + size: IggyByteSize::from(64 * 1024 * 1024u64), + bucket_capacity: 256, + }; + MemoryPool::init_pool(&config); + }); + } + + fn create_test_partition(current_offset: u64) -> LocalPartition { + let stream_stats = Arc::new(StreamStats::default()); + let topic_stats = Arc::new(TopicStats::new(stream_stats)); + let partition_stats = Arc::new(PartitionStats::new(topic_stats)); + + LocalPartition::new( + partition_stats, + Arc::new(AtomicU64::new(current_offset)), + Arc::new(ConsumerOffsets::with_capacity(10)), + Arc::new(ConsumerGroupOffsets::with_capacity(10)), + None, + iggy_common::IggyTimestamp::now(), + 1, + true, + ) + } + + fn create_batch(count: u32) -> iggy_common::IggyMessagesBatchMut { + let messages: Vec<IggyMessage> = (0..count) + .map(|_| { + IggyMessage::builder() + .payload(bytes::Bytes::from("test-payload")) + .build() + .unwrap() + }) + .collect(); + + let messages_size: u32 = messages + .iter() + .map(|m| m.get_size_bytes().as_bytes_u32()) + .sum(); + iggy_common::IggyMessagesBatchMut::from_messages(&messages, messages_size) + } + + /// Sets up "State C": in-flight holds committed data, journal holds new + /// appends that arrived after commit but before persist completes. + /// + /// Layout: + /// segment metadata: [0..journal_end] (no actual disk data) + /// in-flight: [0..in_flight_count-1] + /// journal: [in_flight_count..in_flight_count+journal_count-1] + /// partition.offset: in_flight_count + journal_count - 1 + async fn setup_state_c( + in_flight_count: u32, + journal_count: u32, + ) -> (RefCell<LocalPartitions>, IggyNamespace) { + init_memory_pool(); + let ns = IggyNamespace::new(1, 1, 0); + + let in_flight_end = in_flight_count as u64 - 1; + let journal_base = in_flight_end + 1; + let journal_end = journal_base + journal_count as u64 - 1; + + let mut partition = create_test_partition(journal_end); + + let segment = iggy_common::Segment::new(0, IggyByteSize::from(1_073_741_824u64)); + let storage = iggy_common::SegmentStorage::default(); + partition.log.add_persisted_segment(segment, storage); + + let seg = &mut partition.log.segments_mut()[0]; + seg.end_offset = journal_end; + seg.start_timestamp = 1; + seg.end_timestamp = 2; + + let mut in_flight_batch = create_batch(in_flight_count); + in_flight_batch.prepare_for_persistence(0, 0, 0, None).await; + let in_flight_size = in_flight_batch.size(); + partition.log.set_in_flight(vec![in_flight_batch.freeze()]); + + let journal_inner = Inner { + base_offset: journal_base, + current_offset: 0, + first_timestamp: 0, + end_timestamp: 0, + messages_count: 0, + size: IggyByteSize::default(), + }; + partition.log.journal_mut().init(journal_inner); + + let mut journal_batch = create_batch(journal_count); + journal_batch + .prepare_for_persistence(0, journal_base, in_flight_size as u32, None) + .await; + partition.log.journal_mut().append(journal_batch).unwrap(); + + let mut store = LocalPartitions::new(); + store.insert(ns, partition); + (RefCell::new(store), ns) + } + + // ----------------------------------------------------------------------- + // Issue #2715: In-flight buffer must be reachable when journal is non-empty + // ----------------------------------------------------------------------- + + #[compio::test] + async fn in_flight_reachable_when_journal_non_empty() { + let (store, ns) = setup_state_c(10, 5).await; + let batches = ops::get_messages_by_offset(&store, &ns, 0, 5) + .await + .unwrap(); + assert_eq!(batches.count(), 5); + assert_eq!(batches.first_offset(), Some(0)); + } + + #[compio::test] + async fn spanning_in_flight_and_journal_returns_all_in_order() { + let (store, ns) = setup_state_c(10, 5).await; + let batches = ops::get_messages_by_offset(&store, &ns, 0, 15) + .await + .unwrap(); + assert_eq!(batches.count(), 15); + assert_eq!(batches.first_offset(), Some(0)); + } + + #[compio::test] + async fn polling_next_starts_from_in_flight_not_journal() { + let (store, ns) = setup_state_c(10, 5).await; + let consumer = PollingConsumer::Consumer(1, 0); + let args = + crate::shard::system::messages::PollingArgs::new(PollingStrategy::next(), 15, false); + let (metadata, batches) = ops::poll_messages(&store, &ns, consumer, args) + .await + .unwrap(); + assert_eq!(batches.first_offset(), Some(0)); + assert!(metadata.current_offset >= 14); + } + + #[compio::test] + async fn single_message_at_in_flight_journal_boundary() { + let (store, ns) = setup_state_c(10, 5).await; + let batches = ops::get_messages_by_offset(&store, &ns, 9, 1) + .await + .unwrap(); + assert_eq!(batches.count(), 1); + assert_eq!(batches.first_offset(), Some(9)); + } + + // ----------------------------------------------------------------------- + // Existing correct behavior must still work + // ----------------------------------------------------------------------- + + #[compio::test] + async fn in_flight_reachable_when_journal_empty() { + init_memory_pool(); + let ns = IggyNamespace::new(1, 1, 0); + let mut partition = create_test_partition(9); + + let segment = iggy_common::Segment::new(0, IggyByteSize::from(1_073_741_824u64)); + partition + .log + .add_persisted_segment(segment, iggy_common::SegmentStorage::default()); + let seg = &mut partition.log.segments_mut()[0]; + seg.end_offset = 9; + seg.start_timestamp = 1; + seg.end_timestamp = 2; + + let mut batch = create_batch(10); + batch.prepare_for_persistence(0, 0, 0, None).await; + partition.log.set_in_flight(vec![batch.freeze()]); + + let mut store = LocalPartitions::new(); + store.insert(ns, partition); + let store = RefCell::new(store); + + let batches = ops::get_messages_by_offset(&store, &ns, 0, 10) + .await + .unwrap(); + assert_eq!(batches.count(), 10); + } + + #[compio::test] + async fn journal_reachable_when_in_flight_empty() { + init_memory_pool(); + let ns = IggyNamespace::new(1, 1, 0); + let mut partition = create_test_partition(9); + + let segment = iggy_common::Segment::new(0, IggyByteSize::from(1_073_741_824u64)); + partition + .log + .add_persisted_segment(segment, iggy_common::SegmentStorage::default()); + let seg = &mut partition.log.segments_mut()[0]; + seg.end_offset = 9; + seg.start_timestamp = 1; + seg.end_timestamp = 2; + + partition.log.journal_mut().init(Inner { + base_offset: 0, + current_offset: 0, + first_timestamp: 0, + end_timestamp: 0, + messages_count: 0, + size: IggyByteSize::default(), + }); + + let mut batch = create_batch(10); + batch.prepare_for_persistence(0, 0, 0, None).await; + partition.log.journal_mut().append(batch).unwrap(); + + let mut store = LocalPartitions::new(); + store.insert(ns, partition); + let store = RefCell::new(store); + + let batches = ops::get_messages_by_offset(&store, &ns, 0, 10) + .await + .unwrap(); + assert_eq!(batches.count(), 10); + } + + #[compio::test] + async fn journal_single_message_at_specific_offset() { + let (store, ns) = setup_state_c(10, 5).await; + let batches = ops::get_messages_by_offset(&store, &ns, 12, 1) + .await + .unwrap(); + assert_eq!(batches.count(), 1); + assert_eq!(batches.first_offset(), Some(12)); + } +}
