This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch fix-persistence-read
in repository https://gitbox.apache.org/repos/asf/iggy.git
commit c9a2d3ab81bd51127ce3cf794b4506acb08010c9
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Fri Jan 16 18:09:52 2026 +0100

    fix(server): prevent message unavailability during background persistence

    Messages were unavailable when background message_saver committed the
    journal (emptying it) and started async disk I/O before completion.
    Polling during this window found neither journal nor disk data.

    The fix freezes journal batches and sets them in the in-flight buffer
    before async persist. Readers now merge in-flight data with segments,
    maintaining availability throughout the disk write operation.
---
 .../scenarios/read_during_persistence_scenario.rs | 77 ++++++++---------
 core/server/src/shard/system/messages.rs          | 12 ++-
 core/server/src/slab/streams.rs                   | 96 ++++++++++++++++++++++
 core/server/src/streaming/partitions/helpers.rs   | 23 ++++++
 core/server/src/streaming/partitions/in_flight.rs | 27 +++++-
 5 files changed, 192 insertions(+), 43 deletions(-)

diff --git a/core/integration/tests/server/scenarios/read_during_persistence_scenario.rs b/core/integration/tests/server/scenarios/read_during_persistence_scenario.rs
index 0ff9c8bd3..5153337c2 100644
--- a/core/integration/tests/server/scenarios/read_during_persistence_scenario.rs
+++ b/core/integration/tests/server/scenarios/read_during_persistence_scenario.rs
@@ -28,15 +28,21 @@ use std::time::{Duration, Instant};
 
 const STREAM_NAME: &str = "eventual-consistency-stream";
 const TOPIC_NAME: &str = "eventual-consistency-topic";
-const TEST_DURATION_SECS: u64 = 10;
 
-/// Test with two separate clients - one for sending, one for polling.
+/// Test that specifically targets the background message_saver race condition.
 ///
-/// This should expose the race condition where messages are in-flight during
-/// disk write and unavailable for polling.
+/// Setup:
+/// - HIGH inline persistence thresholds (messages stay in journal during send)
+/// - SHORT message_saver interval (background flushes happen frequently)
+///
+/// The race occurs when:
+/// 1. Messages are in journal
+/// 2. Background saver calls commit() - journal becomes EMPTY
+/// 3. Background saver starts async disk write
+/// 4. Poll arrives - sees empty journal, data not yet on disk
 #[tokio::test]
 #[parallel]
-async fn should_read_messages_immediately_after_send_with_aggressive_persistence() {
+async fn should_read_messages_during_background_saver_flush() {
     let env_vars = HashMap::from([
         (
             SYSTEM_PATH_ENV_VAR.to_owned(),
@@ -44,16 +50,18 @@ async fn should_read_messages_immediately_after_send_with_aggressive_persistence
         ),
         (
             "IGGY_SYSTEM_PARTITION_MESSAGES_REQUIRED_TO_SAVE".to_owned(),
-            "32".to_owned(),
+            "100000".to_owned(),
         ),
         (
             "IGGY_SYSTEM_PARTITION_SIZE_OF_MESSAGES_REQUIRED_TO_SAVE".to_owned(),
-            "512B".to_owned(),
+            "1GB".to_owned(),
         ),
         (
             "IGGY_SYSTEM_PARTITION_ENFORCE_FSYNC".to_owned(),
             "false".to_owned(),
         ),
+        ("IGGY_MESSAGE_SAVER_INTERVAL".to_owned(), "100ms".to_owned()),
+        ("IGGY_MESSAGE_SAVER_ENABLED".to_owned(), "true".to_owned()),
     ]);
 
     let mut test_server = TestServer::new(Some(env_vars), true, None, IpAddrKind::V4);
@@ -96,21 +104,23 @@ async fn should_read_messages_immediately_after_send_with_aggressive_persistence
     let topic_id = Identifier::named(TOPIC_NAME).unwrap();
     let consumer_kind = Consumer::default();
 
-    let test_duration = Duration::from_secs(TEST_DURATION_SECS);
-    let messages_per_batch = 32u32;
-    let payload = "X".repeat(1024);
+    let test_duration = Duration::from_secs(10);
+    let messages_per_batch = 10u32;
+    let payload = "X".repeat(512);
 
     let start = Instant::now();
     let mut batches_sent = 0u64;
-    let mut messages_sent = 0u64;
+    let mut next_offset = 0u64;
 
     println!(
-        "Starting test: 1KB messages, {} msgs/batch, duration: {}s",
-        messages_per_batch, TEST_DURATION_SECS
+        "Starting background saver race test: {} msgs/batch, duration: {}s",
+        messages_per_batch,
+        test_duration.as_secs()
     );
+    println!("Inline persistence DISABLED (high thresholds), background saver every 100ms");
 
     while start.elapsed() < test_duration {
-        let base_offset = batches_sent * messages_per_batch as u64;
+        let base_offset = next_offset;
 
         let mut messages: Vec<IggyMessage> = (0..messages_per_batch)
             .map(|i| {
@@ -125,68 +135,61 @@ async fn should_read_messages_immediately_after_send_with_aggressive_persistence
             })
             .collect();
 
-        println!("Sending batch {}", batches_sent);
-        let send_result = producer
+        producer
             .send_messages(
                 &stream_id,
                 &topic_id,
                 &Partitioning::partition_id(0),
                 &mut messages,
             )
-            .await;
-        match &send_result {
-            Ok(_) => println!("Batch {} sent successfully", batches_sent),
-            Err(e) => println!("Batch {} send error: {:?}", batches_sent, e),
-        }
-        send_result.unwrap();
+            .await
+            .unwrap();
         batches_sent += 1;
-        messages_sent += messages_per_batch as u64;
 
-        println!("Calling poll_messages after batch {}", batches_sent);
         let poll_result = consumer
             .poll_messages(
                 &stream_id,
                 &topic_id,
                 Some(0),
                 &consumer_kind,
-                &PollingStrategy::offset(0),
-                messages_sent as u32,
+                &PollingStrategy::offset(next_offset),
+                messages_per_batch,
                 false,
             )
             .await;
 
         let polled_count = match &poll_result {
-            Ok(polled) => polled.messages.len() as u64,
+            Ok(polled) => polled.messages.len() as u32,
             Err(e) => {
                 println!("Poll error: {:?}", e);
                 0
             }
         };
 
-        if polled_count < messages_sent {
-            let missing = messages_sent - polled_count;
+        if polled_count < messages_per_batch {
+            let missing = messages_per_batch - polled_count;
             let elapsed_ms = start.elapsed().as_millis();
             panic!(
-                "RACE CONDITION DETECTED after {:.2}s/{}s ({} batches, {} messages), expected {} messages, got {}. Missing: {}",
+                "RACE CONDITION DETECTED after {:.2}s ({} batches), polled from offset {}, expected {}, got {}. Missing: {}",
                 elapsed_ms as f64 / 1000.0,
-                TEST_DURATION_SECS,
                 batches_sent,
-                messages_sent,
-                messages_sent,
+                next_offset,
+                messages_per_batch,
                 polled_count,
                 missing
            );
         }
 
-        if batches_sent.is_multiple_of(1000) {
+        next_offset += messages_per_batch as u64;
+
+        if batches_sent.is_multiple_of(500) {
             println!(
-                "Progress: {} batches, {} messages, elapsed: {:.2}s/{}s",
+                "Progress: {} batches, {} messages, elapsed: {:.2}s",
                 batches_sent,
-                messages_sent,
+                next_offset,
                 start.elapsed().as_secs_f64(),
-                TEST_DURATION_SECS
             );
         }
     }
diff --git a/core/server/src/shard/system/messages.rs b/core/server/src/shard/system/messages.rs
index 02e8d9d0e..7f4f299e7 100644
--- a/core/server/src/shard/system/messages.rs
+++ b/core/server/src/shard/system/messages.rs
@@ -315,19 +315,23 @@ impl IggyShard {
         partition_id: usize,
         fsync: bool,
     ) -> Result<(), IggyError> {
-        let batches = self.streams.with_partition_by_id_mut(
+        let frozen_batches = self.streams.with_partition_by_id_mut(
             stream_id,
             topic_id,
             partition_id,
-            partitions::helpers::commit_journal(),
+            partitions::helpers::commit_journal_with_in_flight(),
         );
 
+        if frozen_batches.is_empty() {
+            return Ok(());
+        }
+
         self.streams
-            .persist_messages_to_disk(
+            .persist_frozen_messages_to_disk(
                 stream_id,
                 topic_id,
                 partition_id,
-                batches,
+                &frozen_batches,
                 &self.config.system,
             )
             .await?;
diff --git a/core/server/src/slab/streams.rs b/core/server/src/slab/streams.rs
index f1c342de2..37e4192c4 100644
--- a/core/server/src/slab/streams.rs
+++ b/core/server/src/slab/streams.rs
@@ -1414,6 +1414,102 @@ impl Streams {
         Ok(batch_count)
     }
 
+    /// Persist frozen (immutable) batches to disk.
+    /// Assumes in-flight buffer is already set by caller.
+    /// Clears in-flight buffer after successful persist.
+    pub async fn persist_frozen_messages_to_disk(
+        &self,
+        stream_id: &Identifier,
+        topic_id: &Identifier,
+        partition_id: usize,
+        frozen_batches: &[IggyMessagesBatch],
+        config: &SystemConfig,
+    ) -> Result<u32, IggyError> {
+        let (batch_count, batch_size): (u32, u32) = frozen_batches
+            .iter()
+            .fold((0u32, 0u32), |(count_acc, size_acc), b| {
+                (count_acc + b.count(), size_acc + b.size())
+            });
+
+        if batch_count == 0 {
+            return Ok(0);
+        }
+
+        let has_segments =
+            self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| {
+                log.has_segments()
+            });
+
+        if !has_segments {
+            return Ok(0);
+        }
+
+        let (messages_writer, index_writer) =
+            self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| {
+                (
+                    log.active_storage()
+                        .messages_writer
+                        .as_ref()
+                        .expect("Messages writer not initialized")
+                        .clone(),
+                    log.active_storage()
+                        .index_writer
+                        .as_ref()
+                        .expect("Index writer not initialized")
+                        .clone(),
+                )
+            });
+        let guard = messages_writer.lock.lock().await;
+
+        let saved = messages_writer
+            .as_ref()
+            .save_frozen_batches(frozen_batches)
+            .await
+            .error(|e: &IggyError| {
+                format!(
+                    "Failed to save batch of {batch_count} messages \
+                    ({batch_size} bytes) to stream ID: {stream_id}, topic ID: {topic_id}, partition ID: {partition_id}. {e}",
+                )
+            })?;
+
+        let unsaved_indexes_slice =
+            self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| {
+                log.active_indexes().unwrap().unsaved_slice()
+            });
+
+        let indexes_len = unsaved_indexes_slice.len();
+        index_writer
+            .as_ref()
+            .save_indexes(unsaved_indexes_slice)
+            .await
+            .error(|e: &IggyError| {
+                format!("Failed to save index of {indexes_len} indexes to stream ID: {stream_id}, topic ID: {topic_id} {partition_id}. {e}",)
+            })?;
+
+        tracing::trace!(
+            "Persisted {} frozen messages on disk for stream ID: {}, topic ID: {}, for partition with ID: {}, total bytes written: {}.",
+            batch_count,
+            stream_id,
+            topic_id,
+            partition_id,
+            saved
+        );
+
+        self.with_partition_by_id_mut(
+            stream_id,
+            topic_id,
+            partition_id,
+            streaming_partitions::helpers::update_index_and_increment_stats(saved, config),
+        );
+
+        self.with_partition_by_id_mut(stream_id, topic_id, partition_id, |(.., log)| {
+            log.clear_in_flight();
+        });
+
+        drop(guard);
+        Ok(batch_count)
+    }
+
     pub async fn fsync_all_messages(
         &self,
         stream_id: &Identifier,
diff --git a/core/server/src/streaming/partitions/helpers.rs b/core/server/src/streaming/partitions/helpers.rs
index 531e77e54..c9c8e029c 100644
--- a/core/server/src/streaming/partitions/helpers.rs
+++ b/core/server/src/streaming/partitions/helpers.rs
@@ -430,6 +430,29 @@ pub fn append_to_journal(
     }
 }
 
+/// Commit journal and set in-flight buffer with frozen batches.
+/// Returns frozen batches for persisting to disk.
+pub fn commit_journal_with_in_flight()
+-> impl FnOnce(ComponentsById<PartitionRefMut>) -> Vec<iggy_common::IggyMessagesBatch> {
+    |(.., log)| {
+        let mut batches = log.journal_mut().commit();
+        if batches.is_empty() {
+            return Vec::new();
+        }
+        log.ensure_indexes();
+        batches.append_indexes_to(log.active_indexes_mut().unwrap());
+        let frozen: Vec<_> = batches.iter_mut().map(|b| b.freeze()).collect();
+        log.set_in_flight(frozen.clone());
+        frozen
+    }
+}
+
+pub fn clear_in_flight() -> impl FnOnce(ComponentsById<PartitionRefMut>) {
+    |(.., log)| {
+        log.clear_in_flight();
+    }
+}
+
 pub fn commit_journal() -> impl FnOnce(ComponentsById<PartitionRefMut>) -> IggyMessagesBatchSet {
     |(.., log)| {
         let batches = log.journal_mut().commit();
diff --git a/core/server/src/streaming/partitions/in_flight.rs b/core/server/src/streaming/partitions/in_flight.rs
index af90d3e7d..946642958 100644
--- a/core/server/src/streaming/partitions/in_flight.rs
+++ b/core/server/src/streaming/partitions/in_flight.rs
@@ -58,8 +58,10 @@ impl IggyMessagesBatchSetInFlight {
         self.last_offset = 0;
     }
 
+    /// Get batches that overlap with the requested offset range.
+    /// Returns only the batches that contain messages within [start_offset, start_offset + count).
     pub fn get_by_offset(&self, start_offset: u64, count: u32) -> &[IggyMessagesBatch] {
-        if self.is_empty() || start_offset > self.last_offset {
+        if self.is_empty() || start_offset > self.last_offset || count == 0 {
             return &[];
         }
 
@@ -68,7 +70,28 @@ impl IggyMessagesBatchSetInFlight {
             return &[];
         }
 
-        &self.batches
+        let start_idx = self
+            .batches
+            .iter()
+            .position(|b| {
+                b.last_offset()
+                    .map(|last| last >= start_offset)
+                    .unwrap_or(false)
+            })
+            .unwrap_or(0);
+
+        let end_idx = self
+            .batches
+            .iter()
+            .rposition(|b| {
+                b.first_offset()
+                    .map(|first| first <= end_offset)
+                    .unwrap_or(false)
+            })
+            .map(|i| i + 1)
+            .unwrap_or(self.batches.len());
+
+        &self.batches[start_idx..end_idx]
     }
 
     pub fn batches(&self) -> &[IggyMessagesBatch] {
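The reader-side half of the fix ("Readers now merge in-flight data with segments") is not shown in the hunks above. The sketch below only illustrates that idea; it is not code from this commit, and MessagesBatch and merge_for_poll are hypothetical stand-ins, assuming batches expose their first and last offsets as they do in in_flight.rs.

    // Illustrative sketch only, not code from this commit.
    #[derive(Clone)]
    struct MessagesBatch {
        first_offset: u64,
        last_offset: u64,
    }

    /// Merge batches already persisted in segments with the frozen in-flight
    /// batches, so a poll issued while the background saver is writing to
    /// disk still sees every message.
    fn merge_for_poll(
        segment_batches: &[MessagesBatch],   // batches read back from disk
        in_flight_batches: &[MessagesBatch], // frozen copies awaiting persist
        start_offset: u64,
        count: u32,
    ) -> Vec<MessagesBatch> {
        let end_offset = start_offset + count as u64;
        let overlaps =
            |b: &MessagesBatch| b.last_offset >= start_offset && b.first_offset < end_offset;

        // Whatever is already on disk comes first.
        let mut result: Vec<MessagesBatch> = segment_batches
            .iter()
            .filter(|b| overlaps(b))
            .cloned()
            .collect();

        // Append in-flight batches beyond what the segments cover. During the
        // race window the segments return nothing and the in-flight buffer
        // supplies everything.
        let last_on_disk = result.last().map(|b| b.last_offset);
        result.extend(
            in_flight_batches
                .iter()
                .filter(|b| overlaps(b))
                .filter(|b| last_on_disk.map_or(true, |d| b.last_offset > d))
                .cloned(),
        );
        result
    }

In the commit itself the same guarantee comes from ordering: commit_journal_with_in_flight() sets the frozen batches before the async write starts, and persist_frozen_messages_to_disk() clears the in-flight buffer only after the write succeeds, so at no point does neither source hold the data.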
