This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch fix-persistence-read
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit c9a2d3ab81bd51127ce3cf794b4506acb08010c9
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Fri Jan 16 18:09:52 2026 +0100

    fix(server): prevent message unavailability during background persistence
    
    Messages were unavailable in the window where the background message_saver
    had committed the journal (emptying it) but the async disk write had not
    yet completed. Polls arriving during this window found the data neither in
    the journal nor on disk.
    
    The fix freezes journal batches and sets them in the in-flight buffer
    before async persist. Readers now merge in-flight data with segments,
    maintaining availability throughout the disk write operation.
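    
    A minimal, self-contained sketch of that flow is shown below. It uses
    hypothetical stand-in types (FrozenBatch, PartitionLog) rather than the
    actual iggy API; the real entry points in this patch are
    commit_journal_with_in_flight() and persist_frozen_messages_to_disk().
    
        #[derive(Clone, Debug, Default)]
        struct FrozenBatch {
            first_offset: u64,
            payload: Vec<u8>,
        }
    
        #[derive(Default)]
        struct PartitionLog {
            journal: Vec<FrozenBatch>,   // mutable batches awaiting persistence
            in_flight: Vec<FrozenBatch>, // frozen copies kept visible during the disk write
            segments: Vec<FrozenBatch>,  // stand-in for data already on disk
        }
    
        impl PartitionLog {
            // Commit empties the journal, but the frozen batches are published
            // to the in-flight buffer before any async I/O starts.
            fn commit_journal_with_in_flight(&mut self) -> Vec<FrozenBatch> {
                let frozen: Vec<FrozenBatch> = self.journal.drain(..).collect();
                self.in_flight = frozen.clone();
                frozen
            }
    
            // Once the write has succeeded, the on-disk copy is authoritative
            // and the in-flight buffer can be dropped.
            fn clear_in_flight(&mut self) {
                self.in_flight.clear();
            }
    
            // Readers merge segment data with whatever is still in flight, so
            // a poll arriving mid-write never observes a gap.
            fn read_all(&self) -> Vec<&FrozenBatch> {
                self.segments.iter().chain(self.in_flight.iter()).collect()
            }
        }
    
        fn main() {
            let mut log = PartitionLog::default();
            log.journal.push(FrozenBatch { first_offset: 0, payload: vec![0u8; 64] });
    
            let frozen = log.commit_journal_with_in_flight();
            assert_eq!(log.read_all().len(), 1); // readable while the write is pending
    
            log.segments.extend(frozen);         // stand-in for the async persist
            log.clear_in_flight();
            assert_eq!(log.read_all().len(), 1); // now served from the segments
        }
    
    The important ordering is that the in-flight buffer is populated before the
    write begins and cleared only after it succeeds, so a reader always finds
    the data in at least one place.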
---
 .../scenarios/read_during_persistence_scenario.rs  | 77 ++++++++---------
 core/server/src/shard/system/messages.rs           | 12 ++-
 core/server/src/slab/streams.rs                    | 96 ++++++++++++++++++++++
 core/server/src/streaming/partitions/helpers.rs    | 23 ++++++
 core/server/src/streaming/partitions/in_flight.rs  | 27 +++++-
 5 files changed, 192 insertions(+), 43 deletions(-)

diff --git a/core/integration/tests/server/scenarios/read_during_persistence_scenario.rs b/core/integration/tests/server/scenarios/read_during_persistence_scenario.rs
index 0ff9c8bd3..5153337c2 100644
--- a/core/integration/tests/server/scenarios/read_during_persistence_scenario.rs
+++ b/core/integration/tests/server/scenarios/read_during_persistence_scenario.rs
@@ -28,15 +28,21 @@ use std::time::{Duration, Instant};
 
 const STREAM_NAME: &str = "eventual-consistency-stream";
 const TOPIC_NAME: &str = "eventual-consistency-topic";
-const TEST_DURATION_SECS: u64 = 10;
 
-/// Test with two separate clients - one for sending, one for polling.
+/// Test that specifically targets the background message_saver race condition.
 ///
-/// This should expose the race condition where messages are in-flight during
-/// disk write and unavailable for polling.
+/// Setup:
+/// - HIGH inline persistence thresholds (messages stay in journal during send)
+/// - SHORT message_saver interval (background flushes happen frequently)
+///
+/// The race occurs when:
+/// 1. Messages are in journal
+/// 2. Background saver calls commit() - journal becomes EMPTY
+/// 3. Background saver starts async disk write
+/// 4. Poll arrives - sees empty journal, data not yet on disk
 #[tokio::test]
 #[parallel]
-async fn should_read_messages_immediately_after_send_with_aggressive_persistence() {
+async fn should_read_messages_during_background_saver_flush() {
     let env_vars = HashMap::from([
         (
             SYSTEM_PATH_ENV_VAR.to_owned(),
@@ -44,16 +50,18 @@ async fn should_read_messages_immediately_after_send_with_aggressive_persistence
         ),
         (
             "IGGY_SYSTEM_PARTITION_MESSAGES_REQUIRED_TO_SAVE".to_owned(),
-            "32".to_owned(),
+            "100000".to_owned(),
         ),
         (
             "IGGY_SYSTEM_PARTITION_SIZE_OF_MESSAGES_REQUIRED_TO_SAVE".to_owned(),
-            "512B".to_owned(),
+            "1GB".to_owned(),
         ),
         (
             "IGGY_SYSTEM_PARTITION_ENFORCE_FSYNC".to_owned(),
             "false".to_owned(),
         ),
+        ("IGGY_MESSAGE_SAVER_INTERVAL".to_owned(), "100ms".to_owned()),
+        ("IGGY_MESSAGE_SAVER_ENABLED".to_owned(), "true".to_owned()),
     ]);
 
     let mut test_server = TestServer::new(Some(env_vars), true, None, IpAddrKind::V4);
@@ -96,21 +104,23 @@ async fn should_read_messages_immediately_after_send_with_aggressive_persistence
     let topic_id = Identifier::named(TOPIC_NAME).unwrap();
     let consumer_kind = Consumer::default();
 
-    let test_duration = Duration::from_secs(TEST_DURATION_SECS);
-    let messages_per_batch = 32u32;
-    let payload = "X".repeat(1024);
+    let test_duration = Duration::from_secs(10);
+    let messages_per_batch = 10u32;
+    let payload = "X".repeat(512);
 
     let start = Instant::now();
     let mut batches_sent = 0u64;
-    let mut messages_sent = 0u64;
+    let mut next_offset = 0u64;
 
     println!(
-        "Starting test: 1KB messages, {} msgs/batch, duration: {}s",
-        messages_per_batch, TEST_DURATION_SECS
+        "Starting background saver race test: {} msgs/batch, duration: {}s",
+        messages_per_batch,
+        test_duration.as_secs()
     );
+    println!("Inline persistence DISABLED (high thresholds), background saver every 100ms");
 
     while start.elapsed() < test_duration {
-        let base_offset = batches_sent * messages_per_batch as u64;
+        let base_offset = next_offset;
 
         let mut messages: Vec<IggyMessage> = (0..messages_per_batch)
             .map(|i| {
@@ -125,68 +135,61 @@ async fn should_read_messages_immediately_after_send_with_aggressive_persistence
             })
             .collect();
 
-        println!("Sending batch {}", batches_sent);
-        let send_result = producer
+        producer
             .send_messages(
                 &stream_id,
                 &topic_id,
                 &Partitioning::partition_id(0),
                 &mut messages,
             )
-            .await;
-        match &send_result {
-            Ok(_) => println!("Batch {} sent successfully", batches_sent),
-            Err(e) => println!("Batch {} send error: {:?}", batches_sent, e),
-        }
-        send_result.unwrap();
+            .await
+            .unwrap();
 
         batches_sent += 1;
-        messages_sent += messages_per_batch as u64;
 
-        println!("Calling poll_messages after batch {}", batches_sent);
         let poll_result = consumer
             .poll_messages(
                 &stream_id,
                 &topic_id,
                 Some(0),
                 &consumer_kind,
-                &PollingStrategy::offset(0),
-                messages_sent as u32,
+                &PollingStrategy::offset(next_offset),
+                messages_per_batch,
                 false,
             )
             .await;
 
         let polled_count = match &poll_result {
-            Ok(polled) => polled.messages.len() as u64,
+            Ok(polled) => polled.messages.len() as u32,
             Err(e) => {
                 println!("Poll error: {:?}", e);
                 0
             }
         };
 
-        if polled_count < messages_sent {
-            let missing = messages_sent - polled_count;
+        if polled_count < messages_per_batch {
+            let missing = messages_per_batch - polled_count;
             let elapsed_ms = start.elapsed().as_millis();
 
             panic!(
-                "RACE CONDITION DETECTED after {:.2}s/{}s ({} batches, {} messages), expected {} messages, got {}. Missing: {}",
+                "RACE CONDITION DETECTED after {:.2}s ({} batches), polled from offset {}, expected {}, got {}. Missing: {}",
                 elapsed_ms as f64 / 1000.0,
-                TEST_DURATION_SECS,
                 batches_sent,
-                messages_sent,
-                messages_sent,
+                next_offset,
+                messages_per_batch,
                 polled_count,
                 missing
             );
         }
 
-        if batches_sent.is_multiple_of(1000) {
+        next_offset += messages_per_batch as u64;
+
+        if batches_sent.is_multiple_of(500) {
             println!(
-                "Progress: {} batches, {} messages, elapsed: {:.2}s/{}s",
+                "Progress: {} batches, {} messages, elapsed: {:.2}s",
                 batches_sent,
-                messages_sent,
+                next_offset,
                 start.elapsed().as_secs_f64(),
-                TEST_DURATION_SECS
             );
         }
     }
diff --git a/core/server/src/shard/system/messages.rs b/core/server/src/shard/system/messages.rs
index 02e8d9d0e..7f4f299e7 100644
--- a/core/server/src/shard/system/messages.rs
+++ b/core/server/src/shard/system/messages.rs
@@ -315,19 +315,23 @@ impl IggyShard {
         partition_id: usize,
         fsync: bool,
     ) -> Result<(), IggyError> {
-        let batches = self.streams.with_partition_by_id_mut(
+        let frozen_batches = self.streams.with_partition_by_id_mut(
             stream_id,
             topic_id,
             partition_id,
-            partitions::helpers::commit_journal(),
+            partitions::helpers::commit_journal_with_in_flight(),
         );
 
+        if frozen_batches.is_empty() {
+            return Ok(());
+        }
+
         self.streams
-            .persist_messages_to_disk(
+            .persist_frozen_messages_to_disk(
                 stream_id,
                 topic_id,
                 partition_id,
-                batches,
+                &frozen_batches,
                 &self.config.system,
             )
             .await?;
diff --git a/core/server/src/slab/streams.rs b/core/server/src/slab/streams.rs
index f1c342de2..37e4192c4 100644
--- a/core/server/src/slab/streams.rs
+++ b/core/server/src/slab/streams.rs
@@ -1414,6 +1414,102 @@ impl Streams {
         Ok(batch_count)
     }
 
+    /// Persist frozen (immutable) batches to disk.
+    /// Assumes in-flight buffer is already set by caller.
+    /// Clears in-flight buffer after successful persist.
+    pub async fn persist_frozen_messages_to_disk(
+        &self,
+        stream_id: &Identifier,
+        topic_id: &Identifier,
+        partition_id: usize,
+        frozen_batches: &[IggyMessagesBatch],
+        config: &SystemConfig,
+    ) -> Result<u32, IggyError> {
+        let (batch_count, batch_size): (u32, u32) = frozen_batches
+            .iter()
+            .fold((0u32, 0u32), |(count_acc, size_acc), b| {
+                (count_acc + b.count(), size_acc + b.size())
+            });
+
+        if batch_count == 0 {
+            return Ok(0);
+        }
+
+        let has_segments =
+            self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| {
+                log.has_segments()
+            });
+
+        if !has_segments {
+            return Ok(0);
+        }
+
+        let (messages_writer, index_writer) =
+            self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| {
+                (
+                    log.active_storage()
+                        .messages_writer
+                        .as_ref()
+                        .expect("Messages writer not initialized")
+                        .clone(),
+                    log.active_storage()
+                        .index_writer
+                        .as_ref()
+                        .expect("Index writer not initialized")
+                        .clone(),
+                )
+            });
+        let guard = messages_writer.lock.lock().await;
+
+        let saved = messages_writer
+            .as_ref()
+            .save_frozen_batches(frozen_batches)
+            .await
+            .error(|e: &IggyError| {
+                format!(
+                    "Failed to save batch of {batch_count} messages \
+                    ({batch_size} bytes) to stream ID: {stream_id}, topic ID: {topic_id}, partition ID: {partition_id}. {e}",
+                )
+            })?;
+
+        let unsaved_indexes_slice =
+            self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| {
+                log.active_indexes().unwrap().unsaved_slice()
+            });
+
+        let indexes_len = unsaved_indexes_slice.len();
+        index_writer
+            .as_ref()
+            .save_indexes(unsaved_indexes_slice)
+            .await
+            .error(|e: &IggyError| {
+                format!("Failed to save index of {indexes_len} indexes to stream ID: {stream_id}, topic ID: {topic_id} {partition_id}. {e}",)
+            })?;
+
+        tracing::trace!(
+            "Persisted {} frozen messages on disk for stream ID: {}, topic ID: {}, for partition with ID: {}, total bytes written: {}.",
+            batch_count,
+            stream_id,
+            topic_id,
+            partition_id,
+            saved
+        );
+
+        self.with_partition_by_id_mut(
+            stream_id,
+            topic_id,
+            partition_id,
+            streaming_partitions::helpers::update_index_and_increment_stats(saved, config),
+        );
+
+        self.with_partition_by_id_mut(stream_id, topic_id, partition_id, |(.., log)| {
+            log.clear_in_flight();
+        });
+
+        drop(guard);
+        Ok(batch_count)
+    }
+
     pub async fn fsync_all_messages(
         &self,
         stream_id: &Identifier,
diff --git a/core/server/src/streaming/partitions/helpers.rs b/core/server/src/streaming/partitions/helpers.rs
index 531e77e54..c9c8e029c 100644
--- a/core/server/src/streaming/partitions/helpers.rs
+++ b/core/server/src/streaming/partitions/helpers.rs
@@ -430,6 +430,29 @@ pub fn append_to_journal(
     }
 }
 
+/// Commit journal and set in-flight buffer with frozen batches.
+/// Returns frozen batches for persisting to disk.
+pub fn commit_journal_with_in_flight()
+-> impl FnOnce(ComponentsById<PartitionRefMut>) -> Vec<iggy_common::IggyMessagesBatch> {
+    |(.., log)| {
+        let mut batches = log.journal_mut().commit();
+        if batches.is_empty() {
+            return Vec::new();
+        }
+        log.ensure_indexes();
+        batches.append_indexes_to(log.active_indexes_mut().unwrap());
+        let frozen: Vec<_> = batches.iter_mut().map(|b| b.freeze()).collect();
+        log.set_in_flight(frozen.clone());
+        frozen
+    }
+}
+
+pub fn clear_in_flight() -> impl FnOnce(ComponentsById<PartitionRefMut>) {
+    |(.., log)| {
+        log.clear_in_flight();
+    }
+}
+
 pub fn commit_journal() -> impl FnOnce(ComponentsById<PartitionRefMut>) -> IggyMessagesBatchSet {
     |(.., log)| {
         let batches = log.journal_mut().commit();
diff --git a/core/server/src/streaming/partitions/in_flight.rs b/core/server/src/streaming/partitions/in_flight.rs
index af90d3e7d..946642958 100644
--- a/core/server/src/streaming/partitions/in_flight.rs
+++ b/core/server/src/streaming/partitions/in_flight.rs
@@ -58,8 +58,10 @@ impl IggyMessagesBatchSetInFlight {
         self.last_offset = 0;
     }
 
+    /// Get batches that overlap with the requested offset range.
+    /// Returns only the batches that contain messages within [start_offset, start_offset + count).
     pub fn get_by_offset(&self, start_offset: u64, count: u32) -> &[IggyMessagesBatch] {
-        if self.is_empty() || start_offset > self.last_offset {
+        if self.is_empty() || start_offset > self.last_offset || count == 0 {
             return &[];
         }
 
@@ -68,7 +70,28 @@ impl IggyMessagesBatchSetInFlight {
             return &[];
         }
 
-        &self.batches
+        let start_idx = self
+            .batches
+            .iter()
+            .position(|b| {
+                b.last_offset()
+                    .map(|last| last >= start_offset)
+                    .unwrap_or(false)
+            })
+            .unwrap_or(0);
+
+        let end_idx = self
+            .batches
+            .iter()
+            .rposition(|b| {
+                b.first_offset()
+                    .map(|first| first <= end_offset)
+                    .unwrap_or(false)
+            })
+            .map(|i| i + 1)
+            .unwrap_or(self.batches.len());
+
+        &self.batches[start_idx..end_idx]
     }
 
     pub fn batches(&self) -> &[IggyMessagesBatch] {
