This is an automated email from the ASF dual-hosted git repository.
hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 75e30ab42 fix(server): prevent panic when segment rotates during async persistence (#2588)
75e30ab42 is described below
commit 75e30ab42b9b79f1e66f109e15531c9ba4d31e79
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Tue Jan 20 11:49:51 2026 +0100
fix(server): prevent panic when segment rotates during async persistence (#2588)
Concurrent sends could race with segment rotation: Task A
starts persisting segment N while Task B rotates to N+1,
clearing N's indexes. Task A then panics on
active_indexes().unwrap().
Capture writers, indexes, and segment_idx atomically at commit
time so persistence completes correctly even if rotation occurs
mid-flight.
This fixes #2572.
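
For context, the capture-at-commit pattern in miniature. This is a minimal
standalone sketch with stand-in types (Log, Snapshot), not the server's
actual API:

    use std::sync::{Arc, Mutex};

    // Stand-in for the partition log: rotation clears the indexes and
    // swaps in the next segment's writer.
    struct Log {
        indexes: Option<Vec<u64>>,
        writer: Arc<String>,
    }

    // Everything persistence needs, captured under one lock acquisition.
    struct Snapshot {
        writer: Arc<String>,
        indexes: Vec<u64>,
    }

    fn commit(log: &Mutex<Log>) -> Snapshot {
        let guard = log.lock().unwrap();
        // Clone writer and indexes while still holding the lock; a later
        // rotation can no longer invalidate what this task already holds.
        Snapshot {
            writer: guard.writer.clone(),
            indexes: guard.indexes.clone().unwrap_or_default(),
        }
    }

    fn rotate(log: &Mutex<Log>) {
        let mut guard = log.lock().unwrap();
        guard.indexes = None; // the state the old code would later unwrap()
        guard.writer = Arc::new("segment-N+1".to_string());
    }

    fn main() {
        let log = Mutex::new(Log {
            indexes: Some(vec![1, 2, 3]),
            writer: Arc::new("segment-N".to_string()),
        });
        let snapshot = commit(&log); // Task A captures inputs at commit time
        rotate(&log);                // Task B rotates mid-flight
        // Task A persists from its snapshot instead of re-reading shared
        // state, so the rotation cannot panic it.
        assert_eq!(snapshot.indexes, vec![1, 2, 3]);
    }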
---
Cargo.lock | 2 +-
DEPENDENCIES.md | 2 +-
core/integration/tests/server/scenarios/mod.rs | 1 +
.../scenarios/segment_rotation_race_scenario.rs | 196 +++++++++++
core/integration/tests/server/specific.rs | 84 ++++-
core/server/Cargo.toml | 2 +-
.../handlers/messages/send_messages_handler.rs | 2 +-
core/server/src/shard/system/messages.rs | 11 +-
core/server/src/slab/streams.rs | 388 +++++++++------------
core/server/src/streaming/partitions/helpers.rs | 152 ++++----
.../streaming/segments/messages/messages_writer.rs | 44 +--
core/server/src/streaming/segments/messages/mod.rs | 52 +--
core/server/src/streaming/segments/mod.rs | 2 +
13 files changed, 536 insertions(+), 402 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index dd5bc7d89..f88aaf4a6 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -8270,7 +8270,7 @@ dependencies = [
[[package]]
name = "server"
-version = "0.6.1-edge.4"
+version = "0.6.1-edge.5"
dependencies = [
"ahash 0.8.12",
"anyhow",
diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md
index 3ea9bbf8b..814a96a36 100644
--- a/DEPENDENCIES.md
+++ b/DEPENDENCIES.md
@@ -726,7 +726,7 @@ serde_with_macros: 3.16.1, "Apache-2.0 OR MIT",
serde_yaml_ng: 0.10.0, "MIT",
serial_test: 3.3.1, "MIT",
serial_test_derive: 3.3.1, "MIT",
-server: 0.6.1-edge.4, "Apache-2.0",
+server: 0.6.1-edge.5, "Apache-2.0",
sha1: 0.10.6, "Apache-2.0 OR MIT",
sha2: 0.10.9, "Apache-2.0 OR MIT",
sha3: 0.10.8, "Apache-2.0 OR MIT",
diff --git a/core/integration/tests/server/scenarios/mod.rs b/core/integration/tests/server/scenarios/mod.rs
index 1f71098fb..a61636931 100644
--- a/core/integration/tests/server/scenarios/mod.rs
+++ b/core/integration/tests/server/scenarios/mod.rs
@@ -32,6 +32,7 @@ pub mod message_headers_scenario;
pub mod message_size_scenario;
pub mod permissions_scenario;
pub mod read_during_persistence_scenario;
+pub mod segment_rotation_race_scenario;
pub mod single_message_per_batch_scenario;
pub mod stale_client_consumer_group_scenario;
pub mod stream_size_validation_scenario;
diff --git a/core/integration/tests/server/scenarios/segment_rotation_race_scenario.rs b/core/integration/tests/server/scenarios/segment_rotation_race_scenario.rs
new file mode 100644
index 000000000..9856777b2
--- /dev/null
+++ b/core/integration/tests/server/scenarios/segment_rotation_race_scenario.rs
@@ -0,0 +1,196 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//! This scenario reproduces the bug that occurred when concurrent message sends race with segment rotation:
+//! 1. Task A commits journal, ensures indexes for segment N, starts async save
+//! 2. Task B's send triggers segment rotation (handle_full_segment)
+//! 3. Task B clears segment N's indexes or creates segment N+1 with None indexes
+//! 4. Task A calls active_indexes().unwrap() - panics because indexes are None
+//!
+//! This test uses:
+//! - Very small segment size (512B) to trigger frequent rotations
+//! - 8 concurrent producers (2 per protocol: TCP, HTTP, QUIC, WebSocket)
+//! - All producers write to the same partition for maximum lock contention
+//! - Short message_saver interval to add more concurrent persist operations
+
+use iggy::prelude::*;
+use integration::test_server::{ClientFactory, login_root};
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
+use std::time::Duration;
+use tokio::task::JoinSet;
+
+const STREAM_NAME: &str = "race-test-stream";
+const TOPIC_NAME: &str = "race-test-topic";
+const PRODUCERS_PER_PROTOCOL: usize = 2;
+const PARTITION_ID: u32 = 0;
+const TEST_DURATION_SECS: u64 = 10;
+const MESSAGES_PER_BATCH: usize = 5;
+
+/// Runs the segment rotation race condition test with multiple protocols.
+/// Each client factory represents a different protocol (TCP, HTTP, QUIC, WebSocket).
+/// Two producers are spawned per protocol, all writing to the same partition.
+pub async fn run(client_factories: &[&dyn ClientFactory]) {
+ assert!(
+ !client_factories.is_empty(),
+ "At least one client factory required"
+ );
+
+ let admin_client = create_client(client_factories[0]).await;
+ login_root(&admin_client).await;
+
+ let total_producers = client_factories.len() * PRODUCERS_PER_PROTOCOL;
+ init_system(&admin_client, total_producers).await;
+
+ let stop_flag = Arc::new(AtomicBool::new(false));
+ let total_messages = Arc::new(AtomicU64::new(0));
+ let mut join_set = JoinSet::new();
+
+ let mut global_producer_id = 0usize;
+ for factory in client_factories {
+ let protocol = factory.transport();
+ for local_id in 0..PRODUCERS_PER_PROTOCOL {
+ let client = create_client(*factory).await;
+ login_root(&client).await;
+
+ let stop = stop_flag.clone();
+ let counter = total_messages.clone();
+ let producer_name = format!("{:?}-{}", protocol, local_id);
+ let producer_id = global_producer_id;
+
+ join_set.spawn(async move {
+ run_producer(
+ client,
+ producer_id,
+ &producer_name,
+ PARTITION_ID,
+ stop,
+ counter,
+ )
+ .await;
+ });
+
+ global_producer_id += 1;
+ }
+ }
+
+ tokio::time::sleep(Duration::from_secs(TEST_DURATION_SECS)).await;
+ stop_flag.store(true, Ordering::SeqCst);
+
+ while let Some(result) = join_set.join_next().await {
+ if let Err(e) = result
+ && e.is_panic()
+ {
+ let panic_info = e.into_panic();
+            let panic_msg = if let Some(s) = panic_info.downcast_ref::<&str>() {
+ s.to_string()
+ } else if let Some(s) = panic_info.downcast_ref::<String>() {
+ s.clone()
+ } else {
+ "Unknown panic".to_string()
+ };
+ panic!("Producer task panicked: {}", panic_msg);
+ }
+ }
+
+ let sent = total_messages.load(Ordering::SeqCst);
+ println!("Test completed successfully. Total messages sent: {}", sent);
+
+ cleanup(&admin_client).await;
+}
+
+async fn create_client(client_factory: &dyn ClientFactory) -> IggyClient {
+ let client = client_factory.create_client().await;
+ IggyClient::create(client, None, None)
+}
+
+async fn init_system(client: &IggyClient, total_producers: usize) {
+ client.create_stream(STREAM_NAME).await.unwrap();
+
+ client
+ .create_topic(
+ &Identifier::named(STREAM_NAME).unwrap(),
+ TOPIC_NAME,
+ 1,
+ CompressionAlgorithm::None,
+ None,
+ IggyExpiry::NeverExpire,
+ MaxTopicSize::ServerDefault,
+ )
+ .await
+ .unwrap();
+
+ println!(
+        "Created stream and topic with 1 partition, {} producers will contend for it",
+ total_producers
+ );
+}
+
+async fn run_producer(
+ client: IggyClient,
+ producer_id: usize,
+ producer_name: &str,
+ partition_id: u32,
+ stop: Arc<AtomicBool>,
+ counter: Arc<AtomicU64>,
+) {
+ let mut batch_num = 0u64;
+
+ while !stop.load(Ordering::SeqCst) {
+ let mut messages = Vec::with_capacity(MESSAGES_PER_BATCH);
+
+ for i in 0..MESSAGES_PER_BATCH {
+ let payload = format!("p{}:b{}:m{}", producer_id, batch_num, i);
+ let message = IggyMessage::builder()
+ .payload(payload.into_bytes().into())
+ .build()
+ .unwrap();
+ messages.push(message);
+ }
+
+ match client
+ .send_messages(
+ &Identifier::named(STREAM_NAME).unwrap(),
+ &Identifier::named(TOPIC_NAME).unwrap(),
+ &Partitioning::partition_id(partition_id),
+ &mut messages,
+ )
+ .await
+ {
+ Ok(_) => {
+ counter.fetch_add(MESSAGES_PER_BATCH as u64, Ordering::SeqCst);
+ batch_num += 1;
+ }
+ Err(e) => {
+ eprintln!("Producer {} send error: {}", producer_name, e);
+ }
+ }
+ }
+
+ println!(
+ "Producer {} (partition {}) stopped after {} batches",
+ producer_name, partition_id, batch_num
+ );
+}
+
+async fn cleanup(client: &IggyClient) {
+ client
+ .delete_stream(&Identifier::named(STREAM_NAME).unwrap())
+ .await
+ .unwrap();
+}
diff --git a/core/integration/tests/server/specific.rs b/core/integration/tests/server/specific.rs
index 7df3d75d9..9a59ce468 100644
--- a/core/integration/tests/server/specific.rs
+++ b/core/integration/tests/server/specific.rs
@@ -17,14 +17,17 @@
*/
use crate::server::scenarios::{
-    delete_segments_scenario, message_size_scenario, single_message_per_batch_scenario,
-    tcp_tls_scenario, websocket_tls_scenario,
+    delete_segments_scenario, message_size_scenario, segment_rotation_race_scenario,
+    single_message_per_batch_scenario, tcp_tls_scenario, websocket_tls_scenario,
};
use iggy::prelude::*;
use integration::{
+ http_client::HttpClientFactory,
+ quic_client::QuicClientFactory,
tcp_client::TcpClientFactory,
test_server::{IpAddrKind, TestServer},
test_tls_utils::generate_test_certificates,
+ websocket_client::WebSocketClientFactory,
};
use serial_test::parallel;
use std::collections::HashMap;
@@ -212,3 +215,80 @@ async fn should_handle_single_message_per_batch_with_delayed_persistence() {
single_message_per_batch_scenario::run(&client_factory, 5).await;
}
+
+/// This test configures the server to trigger frequent segment rotations and runs
+/// multiple concurrent producers across all protocols (TCP, HTTP, QUIC, WebSocket)
+/// to maximize the chance of hitting the race condition between persist_messages_to_disk
+/// and handle_full_segment.
+///
+/// Server configuration:
+/// - Very small segment size (512B) to trigger frequent rotations
+/// - Short message_saver interval (1s) to add concurrent persist operations
+/// - Small messages_required_to_save (32) to trigger more frequent saves
+/// - cache_indexes = none to trigger clear_active_indexes path
+///
+/// Test configuration:
+/// - 8 producers total (2 per protocol: TCP, HTTP, QUIC, WebSocket)
+/// - All producers write to the same partition for maximum lock contention
+#[tokio::test]
+#[parallel]
+async fn segment_rotation_scenario() {
+ let mut extra_envs = HashMap::new();
+
+ // Very small segment to trigger frequent rotations
+    extra_envs.insert("IGGY_SYSTEM_SEGMENT_SIZE".to_string(), "512B".to_string());
+
+ // Short message saver interval to add concurrent persist operations
+    extra_envs.insert("IGGY_MESSAGE_SAVER_INTERVAL".to_string(), "1s".to_string());
+
+ // Small threshold to trigger more frequent saves
+ extra_envs.insert(
+ "IGGY_SYSTEM_PARTITION_MESSAGES_REQUIRED_TO_SAVE".to_string(),
+ "32".to_string(),
+ );
+
+    // cache_indexes = none triggers clear_active_indexes in handle_full_segment
+ extra_envs.insert(
+ "IGGY_SYSTEM_SEGMENT_CACHE_INDEXES".to_string(),
+ "none".to_string(),
+ );
+
+ // Disable socket migration to keep all connections on same shard
+    extra_envs.insert("IGGY_TCP_SOCKET_MIGRATION".to_string(), "false".to_string());
+
+ // Enable TCP nodelay for faster message throughput
+ extra_envs.insert(
+ "IGGY_TCP_SOCKET_OVERRIDE_DEFAULTS".to_string(),
+ "true".to_string(),
+ );
+    extra_envs.insert("IGGY_TCP_SOCKET_NODELAY".to_string(), "true".to_string());
+
+    let mut test_server = TestServer::new(Some(extra_envs), true, None, IpAddrKind::V4);
+ test_server.start();
+
+ let tcp_factory = TcpClientFactory {
+ server_addr: test_server.get_raw_tcp_addr().unwrap(),
+ ..Default::default()
+ };
+
+ let http_factory = HttpClientFactory {
+ server_addr: test_server.get_http_api_addr().unwrap(),
+ };
+
+ let quic_factory = QuicClientFactory {
+ server_addr: test_server.get_quic_udp_addr().unwrap(),
+ };
+
+ let websocket_factory = WebSocketClientFactory {
+ server_addr: test_server.get_websocket_addr().unwrap(),
+ };
+
+ let factories: Vec<&dyn integration::test_server::ClientFactory> = vec![
+ &tcp_factory,
+ &http_factory,
+ &quic_factory,
+ &websocket_factory,
+ ];
+
+ segment_rotation_race_scenario::run(&factories).await;
+}
diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml
index e80c3446d..743f14f18 100644
--- a/core/server/Cargo.toml
+++ b/core/server/Cargo.toml
@@ -17,7 +17,7 @@
[package]
name = "server"
-version = "0.6.1-edge.4"
+version = "0.6.1-edge.5"
edition = "2024"
license = "Apache-2.0"
diff --git a/core/server/src/binary/handlers/messages/send_messages_handler.rs b/core/server/src/binary/handlers/messages/send_messages_handler.rs
index 2e2748776..2181a9502 100644
--- a/core/server/src/binary/handlers/messages/send_messages_handler.rs
+++ b/core/server/src/binary/handlers/messages/send_messages_handler.rs
@@ -187,7 +187,7 @@ impl ServerCommandHandler for SendMessages {
                .send_request_to_shard_or_recoil(Some(&namespace), socket_transfer_msg)
                .await
            {
-                error!("tranfer socket to another shard failed, drop connection. {e:?}");
+                error!("transfer socket to another shard failed, drop connection. {e:?}");
return Ok(HandlerResult::Finished);
}
diff --git a/core/server/src/shard/system/messages.rs b/core/server/src/shard/system/messages.rs
index 7f4f299e7..9ec66d9c9 100644
--- a/core/server/src/shard/system/messages.rs
+++ b/core/server/src/shard/system/messages.rs
@@ -315,28 +315,27 @@ impl IggyShard {
partition_id: usize,
fsync: bool,
) -> Result<(), IggyError> {
- let frozen_batches = self.streams.with_partition_by_id_mut(
+ let committed = self.streams.with_partition_by_id_mut(
stream_id,
topic_id,
partition_id,
- partitions::helpers::commit_journal_with_in_flight(),
+ partitions::helpers::commit_journal(),
);
- if frozen_batches.is_empty() {
+ if committed.frozen.is_empty() {
return Ok(());
}
self.streams
- .persist_frozen_messages_to_disk(
+ .persist_messages_to_disk(
stream_id,
topic_id,
partition_id,
- &frozen_batches,
+ committed,
&self.config.system,
)
.await?;
- // Ensure all data is flushed to disk before returning
if fsync {
self.streams
.fsync_all_messages(stream_id, topic_id, partition_id)
diff --git a/core/server/src/slab/streams.rs b/core/server/src/slab/streams.rs
index 37e4192c4..803ab05fc 100644
--- a/core/server/src/slab/streams.rs
+++ b/core/server/src/slab/streams.rs
@@ -57,7 +57,7 @@ use crate::{
};
use ahash::AHashMap;
use err_trail::ErrContext;
-use iggy_common::{Identifier, IggyError, IggyMessagesBatch, IggyTimestamp, PollingKind};
+use iggy_common::{Identifier, IggyError, IggyTimestamp, PollingKind};
use slab::Slab;
use std::{
cell::RefCell,
@@ -216,23 +216,59 @@ impl MainOps for Streams {
let topic_id = ns.topic_id();
let partition_id = ns.partition_id();
- let current_offset = self.with_partition_by_id(
- stream_id,
- topic_id,
- partition_id,
- streaming_partitions::helpers::calculate_current_offset(),
- );
+ // Acquire the lock on the current active segment's writer.
+    // We must verify the segment hasn't rotated between reading state and acquiring the lock.
+    // The writer must be stored outside the lock acquisition to keep it alive.
+ let mut messages_writer;
+ let mut current_offset;
+ let mut current_position;
+ let mut segment_start_offset;
+ let mut message_deduplicator;
+
+ let _write_guard = loop {
+ (
+ messages_writer,
+ current_offset,
+ current_position,
+ segment_start_offset,
+ message_deduplicator,
+ ) = self.with_partition_by_id(
+ stream_id,
+ topic_id,
+ partition_id,
+ |(root, _, deduplicator, offset, _, _, log)| {
+ let current_offset = if !root.should_increment_offset() {
+ 0
+ } else {
+ offset.load(std::sync::atomic::Ordering::Relaxed) + 1
+ };
+ let segment = log.active_segment();
+ let writer = log
+ .active_storage()
+ .messages_writer
+ .clone()
+                    .expect("Messages writer must exist for active segment");
+ (
+ writer,
+ current_offset,
+ segment.current_position,
+ segment.start_offset,
+ deduplicator.clone(),
+ )
+ },
+ );
-        let current_position =
-            self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| {
-                log.active_segment().current_position
-            });
-        let (segment_start_offset, message_deduplicator) = self.with_partition_by_id(
-            stream_id,
-            topic_id,
-            partition_id,
-            streaming_partitions::helpers::get_segment_start_offset_and_deduplicator(),
-        );
+ let write_guard = messages_writer.lock.lock().await;
+
+ let current_segment_start =
+            self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| {
+ log.active_segment().start_offset
+ });
+
+ if current_segment_start == segment_start_offset {
+ break write_guard;
+ }
+ };
input
.prepare_for_persistence(
@@ -250,6 +286,16 @@ impl MainOps for Streams {
            streaming_partitions::helpers::append_to_journal(current_offset, input),
)?;
+ let is_full = self.with_partition_by_id(
+ stream_id,
+ topic_id,
+ partition_id,
+ streaming_partitions::helpers::is_segment_full(),
+ );
+
+        // Release write lock before persistence I/O (persist_messages acquires it again)
+ drop(_write_guard);
+
        let unsaved_messages_count_exceeded =
            journal_messages_count >= config.partition.messages_required_to_save;
let unsaved_messages_size_exceeded = journal_size
@@ -258,13 +304,6 @@ impl MainOps for Streams {
.size_of_messages_required_to_save
.as_bytes_u64() as u32;
- let is_full = self.with_partition_by_id(
- stream_id,
- topic_id,
- partition_id,
- streaming_partitions::helpers::is_segment_full(),
- );
-
// Try committing the journal
        if is_full || unsaved_messages_count_exceeded || unsaved_messages_size_exceeded {
let reason = self.with_partition_by_id(
@@ -1187,23 +1226,94 @@ impl Streams {
        let numeric_topic_id =
            self.with_topic_by_id(stream_id, topic_id, topics::helpers::get_topic_id());
- if config.segment.cache_indexes == CacheIndexesConfig::OpenSegment
- || config.segment.cache_indexes == CacheIndexesConfig::None
- {
-            self.with_partition_by_id_mut(stream_id, topic_id, partition_id, |(.., log)| {
-                log.clear_active_indexes();
+        let clear_indexes = config.segment.cache_indexes == CacheIndexesConfig::OpenSegment
+            || config.segment.cache_indexes == CacheIndexesConfig::None;
+
+ let segment_info =
+            self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| {
+ if log.active_segment().sealed {
+ return None;
+ }
+ let segment = log.active_segment();
+ Some((
+ segment.end_offset,
+ segment.start_offset,
+ segment.size,
+ log.active_storage().messages_writer.clone(),
+ ))
});
- }
-        self.with_partition_by_id_mut(stream_id, topic_id, partition_id, |(.., log)| {
- log.active_segment_mut().sealed = true;
- });
- let (log_writer, index_writer) =
+    let Some((end_offset, start_offset, size, writer_for_lock)) = segment_info else {
+ return Ok(());
+ };
+
+ let Some(writer_for_lock) = writer_for_lock else {
+ return Ok(());
+ };
+
+    // CRITICAL: Create the new segment storage FIRST, before any modifications.
+    // This ensures there's always a valid active segment with writers available,
+ // preventing race conditions where commit_journal finds None writers.
+ let messages_size = 0;
+ let indexes_size = 0;
+ let new_segment = Segment::new(
+ end_offset + 1,
+ config.segment.size,
+ config.segment.message_expiry,
+ );
+
+ let new_storage = create_segment_storage(
+ config,
+ numeric_stream_id,
+ numeric_topic_id,
+ partition_id,
+ messages_size,
+ indexes_size,
+ end_offset + 1,
+ )
+ .await?;
+
+ let _write_guard = writer_for_lock.lock.lock().await;
+
+ // Atomically seal old segment, shutdown storage, and add new segment
+ let writers =
        self.with_partition_by_id_mut(stream_id, topic_id, partition_id, |(.., log)| {
+ if log.active_segment().sealed {
+ return None;
+ }
+ if log.active_segment().start_offset != start_offset {
+ return None;
+ }
+ if !log.active_segment().is_full() {
+ return None;
+ }
+
+ if clear_indexes {
+ log.clear_active_indexes();
+ }
+ log.active_segment_mut().sealed = true;
let (msg, index) = log.active_storage_mut().shutdown();
- (msg.unwrap(), index.unwrap())
+ log.add_persisted_segment(new_segment, new_storage);
+
+ Some((msg, index))
});
+ drop(_write_guard);
+
+ let Some((Some(log_writer), Some(index_writer))) = writers else {
+ return Ok(());
+ };
+
+ tracing::info!(
+        "Closed segment for stream: {}, topic: {} with start offset: {}, end offset: {}, size: {} for partition with ID: {}.",
+ stream_id,
+ topic_id,
+ start_offset,
+ end_offset,
+ size,
+ partition_id
+ );
+
registry
.oneshot("fsync:segment-close-log")
.critical(true)
@@ -1236,47 +1346,6 @@ impl Streams {
})
.spawn();
- let (start_offset, size, end_offset) =
-        self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| {
- (
- log.active_segment().start_offset,
- log.active_segment().size,
- log.active_segment().end_offset,
- )
- });
-
- tracing::info!(
-        "Closed segment for stream: {}, topic: {} with start offset: {}, end offset: {}, size: {} for partition with ID: {}.",
- stream_id,
- topic_id,
- start_offset,
- end_offset,
- size,
- partition_id
- );
-
- let messages_size = 0;
- let indexes_size = 0;
- let segment = Segment::new(
- end_offset + 1,
- config.segment.size,
- config.segment.message_expiry,
- );
-
- let storage = create_segment_storage(
- config,
- numeric_stream_id,
- numeric_topic_id,
- partition_id,
- messages_size,
- indexes_size,
- end_offset + 1,
- )
- .await?;
-        self.with_partition_by_id_mut(stream_id, topic_id, partition_id, |(.., log)| {
- log.add_persisted_segment(segment, storage);
- });
-
Ok(())
}
@@ -1295,7 +1364,7 @@ impl Streams {
return Ok(0);
}
- let batches = self.with_partition_by_id_mut(
+ let committed = self.with_partition_by_id_mut(
stream_id,
topic_id,
partition_id,
@@ -1311,7 +1380,7 @@ impl Streams {
);
let batch_count = self
-            .persist_messages_to_disk(stream_id, topic_id, partition_id, batches, config)
+            .persist_messages_to_disk(stream_id, topic_id, partition_id, committed, config)
.await?;
Ok(batch_count)
@@ -1322,16 +1391,24 @@ impl Streams {
stream_id: &Identifier,
topic_id: &Identifier,
partition_id: usize,
- mut batches: IggyMessagesBatchSet,
+ committed: streaming_partitions::helpers::CommittedBatch,
config: &SystemConfig,
) -> Result<u32, IggyError> {
- let batch_count = batches.count();
- let batch_size = batches.size();
-
+ let streaming_partitions::helpers::CommittedBatch {
+ segment_idx,
+ messages_writer,
+ index_writer,
+ unsaved_indexes,
+ frozen,
+ } = committed;
+
+ let batch_count = frozen.len() as u32;
if batch_count == 0 {
return Ok(0);
}
+ let batch_size: u64 = frozen.iter().map(|b| b.size() as u64).sum();
+
let has_segments =
            self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| {
log.has_segments()
@@ -1341,27 +1418,6 @@ impl Streams {
return Ok(0);
}
- // Store frozen batches in in-flight buffer for reads during async I/O
-        let frozen: Vec<IggyMessagesBatch> = batches.iter_mut().map(|b| b.freeze()).collect();
-        self.with_partition_by_id_mut(stream_id, topic_id, partition_id, |(.., log)| {
- log.set_in_flight(frozen.clone());
- });
-
- let (messages_writer, index_writer) =
-            self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| {
- (
- log.active_storage()
- .messages_writer
- .as_ref()
- .expect("Messages writer not initialized")
- .clone(),
- log.active_storage()
- .index_writer
- .as_ref()
- .expect("Index writer not initialized")
- .clone(),
- )
- });
let guard = messages_writer.lock.lock().await;
let saved = messages_writer
@@ -1375,119 +1431,19 @@ impl Streams {
)
})?;
- // Extract unsaved indexes before async operation
- let unsaved_indexes_slice =
-            self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| {
- log.active_indexes().unwrap().unsaved_slice()
- });
-
- let indexes_len = unsaved_indexes_slice.len();
- index_writer
- .as_ref()
- .save_indexes(unsaved_indexes_slice)
- .await
- .error(|e: &IggyError| {
-                format!("Failed to save index of {indexes_len} indexes to stream ID: {stream_id}, topic ID: {topic_id} {partition_id}. {e}",)
- })?;
-
- tracing::trace!(
-        "Persisted {} messages on disk for stream ID: {}, topic ID: {}, for partition with ID: {}, total bytes written: {}.",
- batch_count,
- stream_id,
- topic_id,
- partition_id,
- saved
- );
-
- self.with_partition_by_id_mut(
- stream_id,
- topic_id,
- partition_id,
-            streaming_partitions::helpers::update_index_and_increment_stats(saved, config),
- );
-
-        self.with_partition_by_id_mut(stream_id, topic_id, partition_id, |(.., log)| {
- log.clear_in_flight();
- });
-
- drop(guard);
- Ok(batch_count)
- }
-
- /// Persist frozen (immutable) batches to disk.
- /// Assumes in-flight buffer is already set by caller.
- /// Clears in-flight buffer after successful persist.
- pub async fn persist_frozen_messages_to_disk(
- &self,
- stream_id: &Identifier,
- topic_id: &Identifier,
- partition_id: usize,
- frozen_batches: &[IggyMessagesBatch],
- config: &SystemConfig,
- ) -> Result<u32, IggyError> {
- let (batch_count, batch_size): (u32, u32) = frozen_batches
- .iter()
- .fold((0u32, 0u32), |(count_acc, size_acc), b| {
- (count_acc + b.count(), size_acc + b.size())
- });
-
- if batch_count == 0 {
- return Ok(0);
- }
-
- let has_segments =
-            self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| {
- log.has_segments()
- });
-
- if !has_segments {
- return Ok(0);
+ if !unsaved_indexes.is_empty() {
+ let indexes_len = unsaved_indexes.len();
+ index_writer
+ .as_ref()
+ .save_indexes(unsaved_indexes)
+ .await
+ .error(|e: &IggyError| {
+                    format!("Failed to save index of {indexes_len} indexes to stream ID: {stream_id}, topic ID: {topic_id} {partition_id}. {e}",)
+ })?;
}
- let (messages_writer, index_writer) =
-            self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| {
- (
- log.active_storage()
- .messages_writer
- .as_ref()
- .expect("Messages writer not initialized")
- .clone(),
- log.active_storage()
- .index_writer
- .as_ref()
- .expect("Index writer not initialized")
- .clone(),
- )
- });
- let guard = messages_writer.lock.lock().await;
-
- let saved = messages_writer
- .as_ref()
- .save_frozen_batches(frozen_batches)
- .await
- .error(|e: &IggyError| {
- format!(
- "Failed to save batch of {batch_count} messages \
- ({batch_size} bytes) to stream ID: {stream_id}, topic ID:
{topic_id}, partition ID: {partition_id}. {e}",
- )
- })?;
-
- let unsaved_indexes_slice =
-            self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| {
- log.active_indexes().unwrap().unsaved_slice()
- });
-
- let indexes_len = unsaved_indexes_slice.len();
- index_writer
- .as_ref()
- .save_indexes(unsaved_indexes_slice)
- .await
- .error(|e: &IggyError| {
-                format!("Failed to save index of {indexes_len} indexes to stream ID: {stream_id}, topic ID: {topic_id} {partition_id}. {e}",)
- })?;
-
tracing::trace!(
-        "Persisted {} frozen messages on disk for stream ID: {}, topic ID: {}, for partition with ID: {}, total bytes written: {}.",
+        "Persisted {} messages on disk for stream ID: {}, topic ID: {}, for partition with ID: {}, total bytes written: {}.",
batch_count,
stream_id,
topic_id,
@@ -1499,7 +1455,11 @@ impl Streams {
stream_id,
topic_id,
partition_id,
-            streaming_partitions::helpers::update_index_and_increment_stats(saved, config),
+ streaming_partitions::helpers::update_index_and_increment_stats(
+ segment_idx,
+ saved,
+ config,
+ ),
);
        self.with_partition_by_id_mut(stream_id, topic_id, partition_id, |(.., log)| {
diff --git a/core/server/src/streaming/partitions/helpers.rs b/core/server/src/streaming/partitions/helpers.rs
index c9c8e029c..745c95edd 100644
--- a/core/server/src/streaming/partitions/helpers.rs
+++ b/core/server/src/streaming/partitions/helpers.rs
@@ -32,11 +32,15 @@ use crate::{
storage,
},
polling_consumer::ConsumerGroupId,
-        segments::{IggyIndexesMut, IggyMessagesBatchMut, IggyMessagesBatchSet, storage::Storage},
+        segments::{
+            IggyIndexesMut, IggyMessagesBatchMut, IggyMessagesBatchSet, IndexWriter,
+            MessagesWriter, storage::Storage,
+        },
},
};
use err_trail::ErrContext;
-use iggy_common::{ConsumerOffsetInfo, Identifier, IggyByteSize, IggyError};
+use iggy_common::{ConsumerOffsetInfo, IggyByteSize, IggyError, IggyMessagesBatch, PooledBuffer};
+use std::rc::Rc;
use std::{
ops::AsyncFnOnce,
sync::{Arc, atomic::Ordering},
@@ -430,35 +434,63 @@ pub fn append_to_journal(
}
}
-/// Commit journal and set in-flight buffer with frozen batches.
-/// Returns frozen batches for persisting to disk.
-pub fn commit_journal_with_in_flight()
--> impl FnOnce(ComponentsById<PartitionRefMut>) -> Vec<iggy_common::IggyMessagesBatch> {
- |(.., log)| {
- let mut batches = log.journal_mut().commit();
- if batches.is_empty() {
- return Vec::new();
- }
- log.ensure_indexes();
- batches.append_indexes_to(log.active_indexes_mut().unwrap());
- let frozen: Vec<_> = batches.iter_mut().map(|b| b.freeze()).collect();
- log.set_in_flight(frozen.clone());
- frozen
- }
-}
-
pub fn clear_in_flight() -> impl FnOnce(ComponentsById<PartitionRefMut>) {
|(.., log)| {
log.clear_in_flight();
}
}
-pub fn commit_journal() -> impl FnOnce(ComponentsById<PartitionRefMut>) -> IggyMessagesBatchSet {
+/// Result of committing journal batches for a specific segment.
+/// Captures writers, indexes, and frozen batches at commit time to prevent race conditions.
+/// The in-flight buffer is set atomically during commit to ensure polls can always find data.
+pub struct CommittedBatch {
+ pub segment_idx: usize,
+ pub messages_writer: Rc<MessagesWriter>,
+ pub index_writer: Rc<IndexWriter>,
+ /// Indexes captured at commit time, before any rotation can clear them.
+ pub unsaved_indexes: PooledBuffer,
+ /// Frozen (immutable) batches ready for disk persistence.
+    /// These are also stored in the in-flight buffer for reads during async I/O.
+ pub frozen: Vec<IggyMessagesBatch>,
+}
+
+/// Commits the journal and returns the batches along with the segment's writers and indexes.
+/// By capturing writers, indexes, and frozen batches at commit time (within the same lock
+/// acquisition), we ensure that even if segment rotation happens after this call, we still
+/// have valid data to complete the persistence operation.
+///
+/// CRITICAL: This function also sets the in-flight buffer atomically. This prevents a race
+/// condition where a poll could arrive after journal commit but before in-flight is set,
+/// finding both journal and in-flight empty while data is not yet on disk.
+pub fn commit_journal() -> impl FnOnce(ComponentsById<PartitionRefMut>) -> CommittedBatch {
|(.., log)| {
- let batches = log.journal_mut().commit();
+ let segment_idx = log.segments().len().saturating_sub(1);
+ let mut batches = log.journal_mut().commit();
log.ensure_indexes();
batches.append_indexes_to(log.active_indexes_mut().unwrap());
- batches
+
+ let storage = log.active_storage();
+ let messages_writer = storage
+ .messages_writer
+ .as_ref()
+ .expect("Messages writer must exist at commit time")
+ .clone();
+ let index_writer = storage
+ .index_writer
+ .as_ref()
+ .expect("Index writer must exist at commit time")
+ .clone();
+ let unsaved_indexes = log.active_indexes().unwrap().unsaved_slice();
+        let frozen: Vec<IggyMessagesBatch> = batches.iter_mut().map(|b| b.freeze()).collect();
+ log.set_in_flight(frozen.clone());
+
+ CommittedBatch {
+ segment_idx,
+ messages_writer,
+ index_writer,
+ unsaved_indexes,
+ frozen,
+ }
}
}
@@ -494,76 +526,22 @@ pub fn persist_reason(
}
}
-pub fn persist_batch(
- stream_id: &Identifier,
- topic_id: &Identifier,
- partition_id: usize,
- batches: IggyMessagesBatchSet,
- reason: String,
-) -> impl AsyncFnOnce(ComponentsById<PartitionRef>) -> Result<(IggyByteSize, u32), IggyError> {
- async move |(.., log)| {
- tracing::trace!(
-            "Persisting messages on disk for stream ID: {}, topic ID: {}, partition ID: {} because {}...",
- stream_id,
- topic_id,
- partition_id,
- reason
- );
-
- let batch_count = batches.count();
- let batch_size = batches.size();
-
- let storage = log.active_storage();
- let saved = storage
- .messages_writer
- .as_ref()
- .expect("Messages writer not initialized")
- .save_batch_set(batches)
- .await
- .error(|e: &IggyError| {
- let segment = log.active_segment();
- format!(
- "Failed to save batch of {batch_count} messages \
- ({batch_size} bytes) to {segment}. {e}",
- )
- })?;
-
-        let unsaved_indexes_slice = log.active_indexes().unwrap().unsaved_slice();
- let len = unsaved_indexes_slice.len();
- storage
- .index_writer
- .as_ref()
- .expect("Index writer not initialized")
- .save_indexes(unsaved_indexes_slice)
- .await
- .error(|e: &IggyError| {
- let segment = log.active_segment();
-                format!("Failed to save index of {len} indexes to {segment}. {e}",)
- })?;
-
- tracing::trace!(
-            "Persisted {} messages on disk for stream ID: {}, topic ID: {}, for partition with ID: {}, total bytes written: {}.",
- batch_count,
- stream_id,
- topic_id,
- partition_id,
- saved
- );
-
- Ok((saved, batch_count))
- }
-}
-
+/// Updates segment size and marks indexes as saved for a specific segment.
+/// Uses segment_idx to ensure we update the correct segment even after rotation.
pub fn update_index_and_increment_stats(
+ segment_idx: usize,
saved: IggyByteSize,
config: &SystemConfig,
) -> impl FnOnce(ComponentsById<PartitionRefMut>) {
move |(.., log)| {
- let segment = log.active_segment_mut();
-        segment.size = IggyByteSize::from(segment.size.as_bytes_u64() + saved.as_bytes_u64());
- log.active_indexes_mut().unwrap().mark_saved();
- if config.segment.cache_indexes == CacheIndexesConfig::None {
- log.active_indexes_mut().unwrap().clear();
+ if let Some(segment) = log.segments_mut().get_mut(segment_idx) {
+            segment.size = IggyByteSize::from(segment.size.as_bytes_u64() + saved.as_bytes_u64());
+ }
+ if let Some(Some(indexes)) = log.indexes_mut().get_mut(segment_idx) {
+ indexes.mark_saved();
+ if config.segment.cache_indexes == CacheIndexesConfig::None {
+ indexes.clear();
+ }
}
}
}
diff --git a/core/server/src/streaming/segments/messages/messages_writer.rs b/core/server/src/streaming/segments/messages/messages_writer.rs
index 85d3bd215..3ef14c406 100644
--- a/core/server/src/streaming/segments/messages/messages_writer.rs
+++ b/core/server/src/streaming/segments/messages/messages_writer.rs
@@ -16,10 +16,7 @@
* under the License.
*/
-use crate::streaming::segments::{
- IggyMessagesBatchSet,
- messages::{write_batch, write_batch_frozen},
-};
+use crate::streaming::segments::messages::write_batch_frozen;
use compio::fs::{File, OpenOptions};
use err_trail::ErrContext;
use iggy_common::{IggyByteSize, IggyError, IggyMessagesBatch};
@@ -96,46 +93,7 @@ impl MessagesWriter {
})
}
- /// Append a batch of messages to the messages file.
- pub async fn save_batch_set(
- &self,
- batch_set: IggyMessagesBatchSet,
- ) -> Result<IggyByteSize, IggyError> {
- let messages_size = batch_set.size();
- let messages_count = batch_set.count();
- let containers_count = batch_set.containers_count();
- trace!(
-            "Saving batch set of size {messages_size} bytes ({containers_count} containers, {messages_count} messages) to messages file: {}",
- self.file_path
- );
- let position = self.messages_size_bytes.load(Ordering::Relaxed);
- let file = &self.file;
- write_batch(file, position, batch_set)
- .await
- .error(|e: &IggyError| {
- format!(
- "Failed to write batch to messages file: {}. {e}",
- self.file_path
- )
- })?;
-
- if self.fsync {
- let _ = self.fsync().await;
- }
-
- self.messages_size_bytes
- .fetch_add(messages_size as u64, Ordering::Release);
- trace!(
-            "Written batch set of size {messages_size} bytes ({containers_count} containers, {messages_count} messages) to disk messages file: {}",
- self.file_path
- );
-
- Ok(IggyByteSize::from(messages_size as u64))
- }
-
/// Append frozen (immutable) batches to the messages file.
- ///
-    /// Unlike `save_batch_set`, this method does not take ownership of the batches.
    /// The caller retains the batches (for use in in-flight buffer) while disk I/O proceeds.
pub async fn save_frozen_batches(
&self,
diff --git a/core/server/src/streaming/segments/messages/mod.rs b/core/server/src/streaming/segments/messages/mod.rs
index 8f5ba9c81..7289f8334 100644
--- a/core/server/src/streaming/segments/messages/mod.rs
+++ b/core/server/src/streaming/segments/messages/mod.rs
@@ -19,10 +19,10 @@
mod messages_reader;
mod messages_writer;
-use super::IggyMessagesBatchSet;
use bytes::Bytes;
use compio::{fs::File, io::AsyncWriteAtExt};
-use iggy_common::{IggyError, IggyMessagesBatch, PooledBuffer};
+use iggy_common::{IggyError, IggyMessagesBatch};
+use tracing::error;
pub use messages_reader::MessagesReader;
pub use messages_writer::MessagesWriter;
@@ -32,30 +32,7 @@ pub use messages_writer::MessagesWriter;
/// cross-platform compatibility and leave room for any internal overhead.
const MAX_IOV_COUNT: usize = 1024;
-/// Vectored write a batches of messages to file
-async fn write_batch(
- file: &File,
- position: u64,
- mut batches: IggyMessagesBatchSet,
-) -> Result<usize, IggyError> {
- let (total_written, buffers) =
- batches
- .iter_mut()
- .fold((0usize, Vec::new()), |(size, mut bufs), batch| {
- let batch_size = batch.size() as usize;
- bufs.push(batch.take_messages());
- (size + batch_size, bufs)
- });
-
- write_vectored_chunked_pooled(file, position, buffers).await?;
- Ok(total_written)
-}
-
/// Vectored write frozen (immutable) batches to file.
-///
-/// This function writes `IggyMessagesBatch` (immutable, Arc-backed) directly
-/// without transferring ownership. The caller retains the batches for reads
-/// during the async I/O operation.
pub async fn write_batch_frozen(
file: &File,
position: u64,
@@ -73,26 +50,6 @@ pub async fn write_batch_frozen(
Ok(total_written)
}
-/// Writes PooledBuffer buffers to file using vectored I/O, chunking to respect IOV_MAX limits.
-async fn write_vectored_chunked_pooled(
- file: &File,
- mut position: u64,
- buffers: Vec<PooledBuffer>,
-) -> Result<(), IggyError> {
- let mut iter = buffers.into_iter().peekable();
-
- while iter.peek().is_some() {
-        let chunk: Vec<PooledBuffer> = iter.by_ref().take(MAX_IOV_COUNT).collect();
- let chunk_size: usize = chunk.iter().map(|b| b.len()).sum();
-
-        let (result, _) = (&*file).write_vectored_all_at(chunk, position).await.into();
- result.map_err(|_| IggyError::CannotWriteToFile)?;
-
- position += chunk_size as u64;
- }
- Ok(())
-}
-
/// Writes Bytes buffers to file using vectored I/O, chunking to respect IOV_MAX limits.
async fn write_vectored_chunked_bytes(
file: &File,
@@ -107,7 +64,10 @@ async fn write_vectored_chunked_bytes(
.write_vectored_all_at(chunk_vec, position)
.await
.into();
- result.map_err(|_| IggyError::CannotWriteToFile)?;
+ result.map_err(|e| {
+ error!("Failed to write frozen batch to messages file: {e}");
+ IggyError::CannotWriteToFile
+ })?;
position += chunk_size as u64;
}
diff --git a/core/server/src/streaming/segments/mod.rs b/core/server/src/streaming/segments/mod.rs
index 7ed09c2d4..878af0eb1 100644
--- a/core/server/src/streaming/segments/mod.rs
+++ b/core/server/src/streaming/segments/mod.rs
@@ -24,6 +24,8 @@ mod types;
pub mod storage;
pub use indexes::IggyIndexesMut;
+pub use indexes::IndexWriter;
+pub use messages::MessagesWriter;
pub use segment::Segment;
pub use types::IggyMessageHeaderViewMut;
pub use types::IggyMessageViewMut;