This is an automated email from the ASF dual-hosted git repository.
hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new f5f9bb51c fix(server): prevent message unavailability during background persistence (#2578)
f5f9bb51c is described below
commit f5f9bb51c9edcfe27bbe3216d13b8cd515a5a704
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Sun Jan 18 10:17:22 2026 +0100
fix(server): prevent message unavailability during background persistence (#2578)
Messages were unavailable during the window in which the background
message_saver had committed the journal (emptying it) but the
asynchronous disk write had not yet completed. A poll arriving in that
window found the data neither in the journal nor on disk.

The fix freezes the journal batches and places them in the in-flight
buffer before the async persist starts. Readers now merge in-flight
data with segment data, so messages remain available for the whole
duration of the disk write.
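
For context, a minimal, self-contained sketch of the pattern follows;
Batch, Partition, and the method bodies below are simplified stand-ins
for illustration, not iggy's actual API.

#[derive(Clone, Debug)]
struct Batch(Vec<String>);

#[derive(Default)]
struct Partition {
    journal: Vec<Batch>,   // recent appends, not yet persisted
    in_flight: Vec<Batch>, // frozen batches currently being written to disk
    segments: Vec<Batch>,  // data already persisted
}

impl Partition {
    // Previously, commit() drained the journal and nothing referenced the
    // batches until the async write finished. The fix stashes the frozen
    // batches in `in_flight` first, so readers never lose sight of them.
    fn commit_journal_with_in_flight(&mut self) -> Vec<Batch> {
        let frozen: Vec<Batch> = self.journal.drain(..).collect();
        self.in_flight = frozen.clone();
        frozen
    }

    // Readers merge all three sources; during a background flush the
    // journal is empty but `in_flight` still holds the committed batches.
    fn read_all(&self) -> Vec<Batch> {
        self.segments
            .iter()
            .chain(self.in_flight.iter())
            .chain(self.journal.iter())
            .cloned()
            .collect()
    }
}

fn main() {
    let mut p = Partition::default();
    p.journal.push(Batch(vec!["m0".into(), "m1".into()]));

    // The background saver freezes the journal; the async disk write is
    // still pending, yet a poll at this instant still finds the batch.
    let _frozen = p.commit_journal_with_in_flight();
    assert_eq!(p.read_all().len(), 1);
}

The invariant is that the frozen batches become reachable via the
in-flight buffer in the same synchronous step that empties the journal,
so no poll can observe the gap.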
---
.../scenarios/read_during_persistence_scenario.rs | 77 ++++++++---------
core/server/src/shard/system/messages.rs | 12 ++-
core/server/src/slab/streams.rs | 96 ++++++++++++++++++++++
core/server/src/streaming/partitions/helpers.rs | 23 ++++++
core/server/src/streaming/partitions/in_flight.rs | 27 +++++-
5 files changed, 192 insertions(+), 43 deletions(-)
diff --git a/core/integration/tests/server/scenarios/read_during_persistence_scenario.rs b/core/integration/tests/server/scenarios/read_during_persistence_scenario.rs
index 0ff9c8bd3..5153337c2 100644
--- a/core/integration/tests/server/scenarios/read_during_persistence_scenario.rs
+++ b/core/integration/tests/server/scenarios/read_during_persistence_scenario.rs
@@ -28,15 +28,21 @@ use std::time::{Duration, Instant};
const STREAM_NAME: &str = "eventual-consistency-stream";
const TOPIC_NAME: &str = "eventual-consistency-topic";
-const TEST_DURATION_SECS: u64 = 10;
-/// Test with two separate clients - one for sending, one for polling.
+/// Test that specifically targets the background message_saver race condition.
///
-/// This should expose the race condition where messages are in-flight during
-/// disk write and unavailable for polling.
+/// Setup:
+/// - HIGH inline persistence thresholds (messages stay in journal during send)
+/// - SHORT message_saver interval (background flushes happen frequently)
+///
+/// The race occurs when:
+/// 1. Messages are in journal
+/// 2. Background saver calls commit() - journal becomes EMPTY
+/// 3. Background saver starts async disk write
+/// 4. Poll arrives - sees empty journal, data not yet on disk
#[tokio::test]
#[parallel]
-async fn should_read_messages_immediately_after_send_with_aggressive_persistence() {
+async fn should_read_messages_during_background_saver_flush() {
let env_vars = HashMap::from([
(
SYSTEM_PATH_ENV_VAR.to_owned(),
@@ -44,16 +50,18 @@ async fn should_read_messages_immediately_after_send_with_aggressive_persistence
),
(
"IGGY_SYSTEM_PARTITION_MESSAGES_REQUIRED_TO_SAVE".to_owned(),
- "32".to_owned(),
+ "100000".to_owned(),
),
(
"IGGY_SYSTEM_PARTITION_SIZE_OF_MESSAGES_REQUIRED_TO_SAVE".to_owned(),
- "512B".to_owned(),
+ "1GB".to_owned(),
),
(
"IGGY_SYSTEM_PARTITION_ENFORCE_FSYNC".to_owned(),
"false".to_owned(),
),
+ ("IGGY_MESSAGE_SAVER_INTERVAL".to_owned(), "100ms".to_owned()),
+ ("IGGY_MESSAGE_SAVER_ENABLED".to_owned(), "true".to_owned()),
]);
let mut test_server = TestServer::new(Some(env_vars), true, None, IpAddrKind::V4);
@@ -96,21 +104,23 @@ async fn should_read_messages_immediately_after_send_with_aggressive_persistence
let topic_id = Identifier::named(TOPIC_NAME).unwrap();
let consumer_kind = Consumer::default();
- let test_duration = Duration::from_secs(TEST_DURATION_SECS);
- let messages_per_batch = 32u32;
- let payload = "X".repeat(1024);
+ let test_duration = Duration::from_secs(10);
+ let messages_per_batch = 10u32;
+ let payload = "X".repeat(512);
let start = Instant::now();
let mut batches_sent = 0u64;
- let mut messages_sent = 0u64;
+ let mut next_offset = 0u64;
println!(
- "Starting test: 1KB messages, {} msgs/batch, duration: {}s",
- messages_per_batch, TEST_DURATION_SECS
+ "Starting background saver race test: {} msgs/batch, duration: {}s",
+ messages_per_batch,
+ test_duration.as_secs()
);
+ println!("Inline persistence DISABLED (high thresholds), background saver every 100ms");
while start.elapsed() < test_duration {
- let base_offset = batches_sent * messages_per_batch as u64;
+ let base_offset = next_offset;
let mut messages: Vec<IggyMessage> = (0..messages_per_batch)
.map(|i| {
@@ -125,68 +135,61 @@ async fn should_read_messages_immediately_after_send_with_aggressive_persistence
})
.collect();
- println!("Sending batch {}", batches_sent);
- let send_result = producer
+ producer
.send_messages(
&stream_id,
&topic_id,
&Partitioning::partition_id(0),
&mut messages,
)
- .await;
- match &send_result {
- Ok(_) => println!("Batch {} sent successfully", batches_sent),
- Err(e) => println!("Batch {} send error: {:?}", batches_sent, e),
- }
- send_result.unwrap();
+ .await
+ .unwrap();
batches_sent += 1;
- messages_sent += messages_per_batch as u64;
- println!("Calling poll_messages after batch {}", batches_sent);
let poll_result = consumer
.poll_messages(
&stream_id,
&topic_id,
Some(0),
&consumer_kind,
- &PollingStrategy::offset(0),
- messages_sent as u32,
+ &PollingStrategy::offset(next_offset),
+ messages_per_batch,
false,
)
.await;
let polled_count = match &poll_result {
- Ok(polled) => polled.messages.len() as u64,
+ Ok(polled) => polled.messages.len() as u32,
Err(e) => {
println!("Poll error: {:?}", e);
0
}
};
- if polled_count < messages_sent {
- let missing = messages_sent - polled_count;
+ if polled_count < messages_per_batch {
+ let missing = messages_per_batch - polled_count;
let elapsed_ms = start.elapsed().as_millis();
panic!(
- "RACE CONDITION DETECTED after {:.2}s/{}s ({} batches, {} messages), expected {} messages, got {}. Missing: {}",
+ "RACE CONDITION DETECTED after {:.2}s ({} batches), polled from offset {}, expected {}, got {}. Missing: {}",
elapsed_ms as f64 / 1000.0,
- TEST_DURATION_SECS,
batches_sent,
- messages_sent,
- messages_sent,
+ next_offset,
+ messages_per_batch,
polled_count,
missing
);
}
- if batches_sent.is_multiple_of(1000) {
+ next_offset += messages_per_batch as u64;
+
+ if batches_sent.is_multiple_of(500) {
println!(
- "Progress: {} batches, {} messages, elapsed: {:.2}s/{}s",
+ "Progress: {} batches, {} messages, elapsed: {:.2}s",
batches_sent,
- messages_sent,
+ next_offset,
start.elapsed().as_secs_f64(),
- TEST_DURATION_SECS
);
}
}
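
The rewritten loop above polls each batch from its own base offset
rather than re-polling everything from offset 0. A condensed, runnable
sketch of that pattern follows; Client is a hypothetical in-memory
stand-in for the real producer/consumer pair, not the iggy SDK API.

struct Client {
    stored: Vec<u64>, // offsets the simulated server has accepted
}

impl Client {
    fn send_batch(&mut self, base_offset: u64, count: u32) {
        self.stored.extend((0..count as u64).map(|i| base_offset + i));
    }

    // How many messages exist at or after `offset`, capped at `count`.
    fn poll(&self, offset: u64, count: u32) -> u32 {
        self.stored
            .iter()
            .filter(|&&o| o >= offset)
            .take(count as usize)
            .count() as u32
    }
}

fn main() {
    let mut client = Client { stored: Vec::new() };
    let messages_per_batch = 10u32;
    let mut next_offset = 0u64;

    for batch in 0..5u64 {
        client.send_batch(next_offset, messages_per_batch);
        // Poll exactly the batch just sent, from its own base offset,
        // instead of re-reading the whole stream each iteration.
        let polled = client.poll(next_offset, messages_per_batch);
        assert_eq!(
            polled, messages_per_batch,
            "race at batch {batch}: expected {messages_per_batch}, got {polled}"
        );
        next_offset += messages_per_batch as u64;
    }
}

Polling per batch keeps each poll's cost proportional to the batch size
rather than to everything sent so far, which lets the test iterate many
times within its 10-second budget.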
diff --git a/core/server/src/shard/system/messages.rs b/core/server/src/shard/system/messages.rs
index 02e8d9d0e..7f4f299e7 100644
--- a/core/server/src/shard/system/messages.rs
+++ b/core/server/src/shard/system/messages.rs
@@ -315,19 +315,23 @@ impl IggyShard {
partition_id: usize,
fsync: bool,
) -> Result<(), IggyError> {
- let batches = self.streams.with_partition_by_id_mut(
+ let frozen_batches = self.streams.with_partition_by_id_mut(
stream_id,
topic_id,
partition_id,
- partitions::helpers::commit_journal(),
+ partitions::helpers::commit_journal_with_in_flight(),
);
+ if frozen_batches.is_empty() {
+ return Ok(());
+ }
+
self.streams
- .persist_messages_to_disk(
+ .persist_frozen_messages_to_disk(
stream_id,
topic_id,
partition_id,
- batches,
+ &frozen_batches,
&self.config.system,
)
.await?;
diff --git a/core/server/src/slab/streams.rs b/core/server/src/slab/streams.rs
index f1c342de2..37e4192c4 100644
--- a/core/server/src/slab/streams.rs
+++ b/core/server/src/slab/streams.rs
@@ -1414,6 +1414,102 @@ impl Streams {
Ok(batch_count)
}
+ /// Persist frozen (immutable) batches to disk.
+ /// Assumes in-flight buffer is already set by caller.
+ /// Clears in-flight buffer after successful persist.
+ pub async fn persist_frozen_messages_to_disk(
+ &self,
+ stream_id: &Identifier,
+ topic_id: &Identifier,
+ partition_id: usize,
+ frozen_batches: &[IggyMessagesBatch],
+ config: &SystemConfig,
+ ) -> Result<u32, IggyError> {
+ let (batch_count, batch_size): (u32, u32) = frozen_batches
+ .iter()
+ .fold((0u32, 0u32), |(count_acc, size_acc), b| {
+ (count_acc + b.count(), size_acc + b.size())
+ });
+
+ if batch_count == 0 {
+ return Ok(0);
+ }
+
+ let has_segments =
+ self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| {
+ log.has_segments()
+ });
+
+ if !has_segments {
+ return Ok(0);
+ }
+
+ let (messages_writer, index_writer) =
+ self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| {
+ (
+ log.active_storage()
+ .messages_writer
+ .as_ref()
+ .expect("Messages writer not initialized")
+ .clone(),
+ log.active_storage()
+ .index_writer
+ .as_ref()
+ .expect("Index writer not initialized")
+ .clone(),
+ )
+ });
+ let guard = messages_writer.lock.lock().await;
+
+ let saved = messages_writer
+ .as_ref()
+ .save_frozen_batches(frozen_batches)
+ .await
+ .error(|e: &IggyError| {
+ format!(
+ "Failed to save batch of {batch_count} messages \
+ ({batch_size} bytes) to stream ID: {stream_id}, topic ID: {topic_id}, partition ID: {partition_id}. {e}",
+ )
+ })?;
+
+ let unsaved_indexes_slice =
+ self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| {
+ log.active_indexes().unwrap().unsaved_slice()
+ });
+
+ let indexes_len = unsaved_indexes_slice.len();
+ index_writer
+ .as_ref()
+ .save_indexes(unsaved_indexes_slice)
+ .await
+ .error(|e: &IggyError| {
+ format!("Failed to save index of {indexes_len} indexes to stream ID: {stream_id}, topic ID: {topic_id}, partition ID: {partition_id}. {e}",)
+ })?;
+
+ tracing::trace!(
+ "Persisted {} frozen messages on disk for stream ID: {}, topic ID: {}, for partition with ID: {}, total bytes written: {}.",
+ batch_count,
+ stream_id,
+ topic_id,
+ partition_id,
+ saved
+ );
+
+ self.with_partition_by_id_mut(
+ stream_id,
+ topic_id,
+ partition_id,
+ streaming_partitions::helpers::update_index_and_increment_stats(saved, config),
+ );
+
+ self.with_partition_by_id_mut(stream_id, topic_id, partition_id, |(.., log)| {
+ log.clear_in_flight();
+ });
+
+ drop(guard);
+ Ok(batch_count)
+ }
+
pub async fn fsync_all_messages(
&self,
stream_id: &Identifier,
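
A condensed sketch of the ordering persist_frozen_messages_to_disk
relies on, again with simplified stand-in types rather than the
server's actual Streams API: the in-flight copy is cleared only after
the write succeeds, so a failed persist leaves the data readable.

use std::sync::Mutex;

struct Partition {
    in_flight: Mutex<Vec<Vec<u8>>>, // frozen batches awaiting disk
    segments: Mutex<Vec<Vec<u8>>>,  // "on disk" data (simplified)
}

fn persist_frozen(p: &Partition, frozen: Vec<Vec<u8>>) -> std::io::Result<u32> {
    // 1. Write the frozen batches; an early-returning failure here
    //    leaves `in_flight` intact, so readers still see the data.
    let count = frozen.len() as u32;
    p.segments.lock().unwrap().extend(frozen);

    // 2. Only after the write (and index save) succeed is the in-flight
    //    copy dropped; readers find the same messages in segments instead.
    p.in_flight.lock().unwrap().clear();
    Ok(count)
}

fn main() -> std::io::Result<()> {
    let p = Partition {
        in_flight: Mutex::new(vec![b"batch-0".to_vec()]),
        segments: Mutex::new(Vec::new()),
    };
    let frozen = p.in_flight.lock().unwrap().clone();
    persist_frozen(&p, frozen)?;
    assert!(p.in_flight.lock().unwrap().is_empty());
    Ok(())
}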
diff --git a/core/server/src/streaming/partitions/helpers.rs b/core/server/src/streaming/partitions/helpers.rs
index 531e77e54..c9c8e029c 100644
--- a/core/server/src/streaming/partitions/helpers.rs
+++ b/core/server/src/streaming/partitions/helpers.rs
@@ -430,6 +430,29 @@ pub fn append_to_journal(
}
}
+/// Commit journal and set in-flight buffer with frozen batches.
+/// Returns frozen batches for persisting to disk.
+pub fn commit_journal_with_in_flight()
+-> impl FnOnce(ComponentsById<PartitionRefMut>) -> Vec<iggy_common::IggyMessagesBatch> {
+ |(.., log)| {
+ let mut batches = log.journal_mut().commit();
+ if batches.is_empty() {
+ return Vec::new();
+ }
+ log.ensure_indexes();
+ batches.append_indexes_to(log.active_indexes_mut().unwrap());
+ let frozen: Vec<_> = batches.iter_mut().map(|b| b.freeze()).collect();
+ log.set_in_flight(frozen.clone());
+ frozen
+ }
+}
+
+pub fn clear_in_flight() -> impl FnOnce(ComponentsById<PartitionRefMut>) {
+ |(.., log)| {
+ log.clear_in_flight();
+ }
+}
+
pub fn commit_journal() -> impl FnOnce(ComponentsById<PartitionRefMut>) -> IggyMessagesBatchSet {
|(.., log)| {
let batches = log.journal_mut().commit();
diff --git a/core/server/src/streaming/partitions/in_flight.rs b/core/server/src/streaming/partitions/in_flight.rs
index af90d3e7d..946642958 100644
--- a/core/server/src/streaming/partitions/in_flight.rs
+++ b/core/server/src/streaming/partitions/in_flight.rs
@@ -58,8 +58,10 @@ impl IggyMessagesBatchSetInFlight {
self.last_offset = 0;
}
+ /// Get batches that overlap with the requested offset range.
+ /// Returns only the batches that contain messages within [start_offset, start_offset + count).
pub fn get_by_offset(&self, start_offset: u64, count: u32) -> &[IggyMessagesBatch] {
- if self.is_empty() || start_offset > self.last_offset {
+ if self.is_empty() || start_offset > self.last_offset || count == 0 {
return &[];
}
@@ -68,7 +70,28 @@ impl IggyMessagesBatchSetInFlight {
return &[];
}
- &self.batches
+ let start_idx = self
+ .batches
+ .iter()
+ .position(|b| {
+ b.last_offset()
+ .map(|last| last >= start_offset)
+ .unwrap_or(false)
+ })
+ .unwrap_or(0);
+
+ let end_idx = self
+ .batches
+ .iter()
+ .rposition(|b| {
+ b.first_offset()
+ .map(|first| first <= end_offset)
+ .unwrap_or(false)
+ })
+ .map(|i| i + 1)
+ .unwrap_or(self.batches.len());
+
+ &self.batches[start_idx..end_idx]
}
pub fn batches(&self) -> &[IggyMessagesBatch] {
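
To close, a standalone illustration of the overlap-slicing logic added
to get_by_offset() above; Batch here is a simplified stand-in carrying
only an offset range, not the real IggyMessagesBatch.

#[derive(Debug)]
struct Batch {
    first: u64,
    last: u64,
}

// Return only the batches overlapping [start_offset, start_offset + count).
fn get_by_offset(batches: &[Batch], start_offset: u64, count: u32) -> &[Batch] {
    // The real method checks `start_offset > self.last_offset` up front;
    // mirrored here so trailing batches are never selected spuriously.
    let last_offset = batches.last().map(|b| b.last).unwrap_or(0);
    if batches.is_empty() || count == 0 || start_offset > last_offset {
        return &[];
    }
    let end_offset = start_offset + count as u64 - 1;

    // First batch whose last offset reaches the requested start...
    let start_idx = batches
        .iter()
        .position(|b| b.last >= start_offset)
        .unwrap_or(0);

    // ...through the last batch whose first offset falls inside the range.
    let end_idx = batches
        .iter()
        .rposition(|b| b.first <= end_offset)
        .map(|i| i + 1)
        .unwrap_or(batches.len());

    &batches[start_idx..end_idx]
}

fn main() {
    let batches = [
        Batch { first: 0, last: 9 },
        Batch { first: 10, last: 19 },
        Batch { first: 20, last: 29 },
    ];
    // Offsets 12..=21 overlap only the second and third batches.
    let hit = get_by_offset(&batches, 12, 10);
    assert_eq!(hit.len(), 2);
    assert_eq!(hit[0].first, 10);
}

Slicing to the overlapping range means a poll is handed only the
batches it can actually serve from, instead of every in-flight batch.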