This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch fix-segment-close-crash
in repository https://gitbox.apache.org/repos/asf/iggy.git
commit f1af57b2f28f401e5adf473acdebe1aa6bac409e
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Fri Jan 16 12:39:35 2026 +0100

    fix segment rotation crash
---
 core/integration/src/test_server.rs                | 109 ++++++++++-
 core/integration/tests/server/mod.rs               |  85 +++------
 core/integration/tests/server/scenarios/mod.rs     |   1 +
 .../scenarios/segment_rotation_race_scenario.rs    | 178 ++++++++++++++++++
 core/integration/tests/server/specific.rs          |  59 +++++-
 .../handlers/messages/send_messages_handler.rs     |   3 +-
 core/server/src/shard/system/messages.rs           |   4 +-
 core/server/src/slab/streams.rs                    | 208 +++++++++++++--------
 core/server/src/streaming/partitions/helpers.rs    |  63 ++++++-
 core/server/src/streaming/segments/mod.rs          |   2 +
 10 files changed, 555 insertions(+), 157 deletions(-)

diff --git a/core/integration/src/test_server.rs b/core/integration/src/test_server.rs
index fd718178e..ac406452d 100644
--- a/core/integration/src/test_server.rs
+++ b/core/integration/src/test_server.rs
@@ -33,7 +33,9 @@ use std::io::Write;
 use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr};
 use std::path::{Path, PathBuf};
 use std::process::{Child, Command, Stdio};
-use std::thread::{available_parallelism, panicking, sleep};
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::thread::{self, JoinHandle, available_parallelism, panicking, sleep};
 use std::time::Duration;
 
 use uuid::Uuid;
@@ -75,7 +77,6 @@ enum ServerProtocolAddr {
     WebSocket(SocketAddr),
 }
 
-#[derive(Debug)]
 pub struct TestServer {
     local_data_path: String,
     envs: HashMap<String, String>,
@@ -85,6 +86,18 @@ pub struct TestServer {
     stderr_file_path: Option<PathBuf>,
     cleanup: bool,
     server_executable_path: Option<String>,
+    watchdog_handle: Option<JoinHandle<()>>,
+    watchdog_stop: Arc<AtomicBool>,
+}
+
+impl std::fmt::Debug for TestServer {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("TestServer")
+            .field("local_data_path", &self.local_data_path)
+            .field("server_addrs", &self.server_addrs)
+            .field("cleanup", &self.cleanup)
+            .finish_non_exhaustive()
+    }
 }
 
 impl TestServer {
@@ -194,6 +207,8 @@ impl TestServer {
             stderr_file_path: None,
             cleanup,
             server_executable_path,
+            watchdog_handle: None,
+            watchdog_stop: Arc::new(AtomicBool::new(false)),
         }
     }
 
@@ -234,11 +249,101 @@ impl TestServer {
         }
 
         let child = command.spawn().unwrap();
+        let pid = child.id();
         self.child_handle = Some(child);
         self.wait_until_server_has_bound();
+
+        let watchdog_stop = self.watchdog_stop.clone();
+        let stdout_path = self.stdout_file_path.clone();
+        let stderr_path = self.stderr_file_path.clone();
+        let watchdog_handle = thread::Builder::new()
+            .name("test-server-watchdog".to_string())
+            .spawn(move || {
+                Self::watchdog_loop(pid, watchdog_stop, stdout_path, stderr_path);
+            })
+            .expect("Failed to spawn watchdog thread");
+        self.watchdog_handle = Some(watchdog_handle);
+    }
+
+    /// Watchdog loop that monitors the server process.
+    /// Aborts the test process if the server exits while the watchdog is still running (i.e., not gracefully stopped).
+    fn watchdog_loop(
+        pid: u32,
+        stop_signal: Arc<AtomicBool>,
+        stdout_path: Option<PathBuf>,
+        stderr_path: Option<PathBuf>,
+    ) {
+        const CHECK_INTERVAL: Duration = Duration::from_millis(100);
+
+        loop {
+            if stop_signal.load(Ordering::SeqCst) {
+                return;
+            }
+
+            // Check if process is alive AND not a zombie via /proc/{pid}/stat
+            // A zombie process still exists in the process table (kill returns 0)
+            // but has state 'Z' in /proc/{pid}/stat
+            let process_alive = Self::is_process_alive(pid);
+
+            if !process_alive {
+                // Server process has died unexpectedly!
+                // We print to stderr and abort because a panic in a spawned thread
+                // doesn't propagate to the test thread.
+                let stdout_content = stdout_path
+                    .as_ref()
+                    .and_then(|p| fs::read_to_string(p).ok())
+                    .unwrap_or_else(|| "[No stdout log]".to_string());
+
+                let stderr_content = stderr_path
+                    .as_ref()
+                    .and_then(|p| fs::read_to_string(p).ok())
+                    .unwrap_or_else(|| "[No stderr log]".to_string());
+
+                eprintln!(
+                    "\n\n=== SERVER CRASHED ===\n\
+                    The iggy-server process (PID {}) has died unexpectedly!\n\
+                    This usually indicates a bug in the server.\n\n\
+                    === STDOUT ===\n{}\n\n\
+                    === STDERR ===\n{}\n",
+                    pid, stdout_content, stderr_content
+                );
+                std::process::abort();
+            }
+
+            thread::sleep(CHECK_INTERVAL);
+        }
+    }
+
+    /// Check if a process is alive (exists and is not a zombie).
+    fn is_process_alive(pid: u32) -> bool {
+        // Read /proc/{pid}/stat to get process state
+        let stat_path = format!("/proc/{}/stat", pid);
+        match fs::read_to_string(&stat_path) {
+            Ok(content) => {
+                // Format: pid (comm) state ...
+                // State is the third field overall, i.e. the first field after
+                // the command name in parentheses.
+                // States: R=running, S=sleeping, D=disk sleep, Z=zombie, T=stopped, etc.
+                if let Some(state_start) = content.rfind(')') {
+                    let after_comm = &content[state_start + 1..];
+                    let state = after_comm.trim().chars().next();
+                    // Process is dead if state is 'Z' (zombie) or 'X' (dead)
+                    !matches!(state, Some('Z') | Some('X'))
+                } else {
+                    false // Can't parse, assume dead
+                }
+            }
+            Err(_) => false, // Can't read /proc/{pid}/stat, process doesn't exist
+        }
     }
 
     pub fn stop(&mut self) {
+        // Signal the watchdog to stop FIRST, before we terminate the server.
+        // This prevents the watchdog from aborting the test when we stop gracefully.
+        self.watchdog_stop.store(true, Ordering::SeqCst);
+        if let Some(watchdog) = self.watchdog_handle.take() {
+            let _ = watchdog.join();
+        }
+
         #[allow(unused_mut)]
         if let Some(mut child_handle) = self.child_handle.take() {
             #[cfg(unix)]
diff --git a/core/integration/tests/server/mod.rs b/core/integration/tests/server/mod.rs
index 0b8fd502b..60a7031d7 100644
--- a/core/integration/tests/server/mod.rs
+++ b/core/integration/tests/server/mod.rs
@@ -22,7 +22,6 @@ mod general;
 mod scenarios;
 mod specific;
 
-use futures::FutureExt;
 use iggy_common::TransportProtocol;
 use integration::{
     http_client::HttpClientFactory,
@@ -40,8 +39,6 @@ use scenarios::{
     stream_size_validation_scenario, system_scenario, user_scenario,
 };
 use std::pin::Pin;
-use std::sync::{Arc, Mutex};
-use std::time::Duration;
 use std::{collections::HashMap, future::Future};
 
 type ScenarioFn = fn(&dyn ClientFactory) -> Pin<Box<dyn Future<Output = ()> + '_>>;
@@ -116,67 +113,31 @@ async fn run_scenario(transport: TransportProtocol, scenario: ScenarioFn) {
         "IGGY_QUIC_KEEP_ALIVE_INTERVAL".to_string(),
         "15s".to_string(),
     );
-    let test_server = TestServer::new(Some(extra_envs), true, None, IpAddrKind::V4);
-    let test_server = Arc::new(Mutex::new(test_server));
-
-    test_server.lock().unwrap().start();
-
-    let client_factory: Box<dyn ClientFactory> = {
-        let server = test_server.lock().unwrap();
-        match transport {
-            TransportProtocol::Tcp => {
-                let server_addr = server.get_raw_tcp_addr().unwrap();
-                Box::new(TcpClientFactory {
-                    server_addr,
-                    ..Default::default()
-                })
-            }
-            TransportProtocol::Quic => {
-                let server_addr = server.get_quic_udp_addr().unwrap();
-                Box::new(QuicClientFactory { server_addr })
-            }
-            TransportProtocol::Http => {
-                let server_addr = server.get_http_api_addr().unwrap();
-                Box::new(HttpClientFactory { server_addr })
-            }
-            TransportProtocol::WebSocket => {
-                let server_addr = server.get_websocket_addr().unwrap();
-                Box::new(WebSocketClientFactory { server_addr })
-            }
+    let mut test_server = TestServer::new(Some(extra_envs), true, None, IpAddrKind::V4);
+    test_server.start();
+
+    // TestServer's watchdog thread will abort the test if the server crashes unexpectedly
+    let client_factory: Box<dyn ClientFactory> = match transport {
+        TransportProtocol::Tcp => {
+            let server_addr = test_server.get_raw_tcp_addr().unwrap();
+            Box::new(TcpClientFactory {
+                server_addr,
+                ..Default::default()
+            })
         }
-    };
-
-    let monitor_server = test_server.clone();
-    let crash_monitor = async move {
-        loop {
-            tokio::time::sleep(Duration::from_millis(100)).await;
-            let mut server = monitor_server.lock().unwrap();
-            if !server.is_running() {
-                let (stdout, stderr) = server.collect_logs();
-                return (stdout, stderr);
-            }
+        TransportProtocol::Quic => {
+            let server_addr = test_server.get_quic_udp_addr().unwrap();
+            Box::new(QuicClientFactory { server_addr })
         }
-    };
-
-    tokio::select! {
-        biased;
-
-        (stdout, stderr) = crash_monitor => {
-            panic!(
-                "Server crashed during test!\n\n\
-                === STDOUT ===\n{}\n\n\
-                === STDERR ===\n{}",
-                stdout, stderr
-            );
+        TransportProtocol::Http => {
+            let server_addr = test_server.get_http_api_addr().unwrap();
+            Box::new(HttpClientFactory { server_addr })
         }
-
-        result = std::panic::AssertUnwindSafe(scenario(&*client_factory)).catch_unwind() => {
-            test_server.lock().unwrap().assert_running();
-
-            // Re-raise any panic from the scenario
-            if let Err(panic_payload) = result {
-                std::panic::resume_unwind(panic_payload);
-            }
+        TransportProtocol::WebSocket => {
+            let server_addr = test_server.get_websocket_addr().unwrap();
+            Box::new(WebSocketClientFactory { server_addr })
         }
-    }
+    };
+
+    scenario(&*client_factory).await;
 }
diff --git a/core/integration/tests/server/scenarios/mod.rs b/core/integration/tests/server/scenarios/mod.rs
index 2c2fbab3a..e1a9cb398 100644
--- a/core/integration/tests/server/scenarios/mod.rs
+++ b/core/integration/tests/server/scenarios/mod.rs
@@ -31,6 +31,7 @@ pub mod encryption_scenario;
 pub mod message_headers_scenario;
 pub mod message_size_scenario;
 pub mod read_during_persistence_scenario;
+pub mod segment_rotation_race_scenario;
 pub mod stale_client_consumer_group_scenario;
 pub mod stream_size_validation_scenario;
 pub mod system_scenario;
diff --git a/core/integration/tests/server/scenarios/segment_rotation_race_scenario.rs b/core/integration/tests/server/scenarios/segment_rotation_race_scenario.rs
new file mode 100644
index 000000000..a321942d2
--- /dev/null
+++ b/core/integration/tests/server/scenarios/segment_rotation_race_scenario.rs
@@ -0,0 +1,178 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//! Test scenario to reproduce issue #2572: panic on segment rotation race condition.
+//!
+//! The bug occurs when concurrent message sends race with segment rotation:
+//! 1. Task A commits journal, ensures indexes for segment N, starts async save
+//! 2. Task B's send triggers segment rotation (handle_full_segment)
+//! 3. Task B clears segment N's indexes or creates segment N+1 with None indexes
+//! 4. Task A calls active_indexes().unwrap() - panics because indexes are None
+//!
+//! This test uses:
+//! - Very small segment size (512B) to trigger frequent rotations
+//! - 4 concurrent producers sending small messages to same partition
+//! - Short message_saver interval to add more concurrent persist operations
+
+use iggy::prelude::*;
+use integration::test_server::{ClientFactory, login_root};
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
+use std::time::Duration;
+use tokio::task::JoinSet;
+
+const STREAM_NAME: &str = "race-test-stream";
+const TOPIC_NAME: &str = "race-test-topic";
+const PRODUCER_COUNT: usize = 4;
+// All producers write to the SAME partition to create lock contention
+const PARTITION_ID: u32 = 0;
+const TEST_DURATION_SECS: u64 = 20;
+// Small batch size to increase frequency of send operations
+const MESSAGES_PER_BATCH: usize = 5;
+
+pub async fn run(client_factory: &dyn ClientFactory) {
+    let admin_client = create_client(client_factory).await;
+    login_root(&admin_client).await;
+
+    init_system(&admin_client).await;
+
+    let stop_flag = Arc::new(AtomicBool::new(false));
+    let total_messages = Arc::new(AtomicU64::new(0));
+    let mut join_set = JoinSet::new();
+
+    for producer_id in 0..PRODUCER_COUNT {
+        let client = create_client(client_factory).await;
+        login_root(&client).await;
+
+        let stop = stop_flag.clone();
+        let counter = total_messages.clone();
+
+        // All producers write to PARTITION_ID (same partition) to create lock contention
+        join_set.spawn(async move {
+            run_producer(client, producer_id, PARTITION_ID, stop, counter).await;
+        });
+    }
+
+    tokio::time::sleep(Duration::from_secs(TEST_DURATION_SECS)).await;
+    stop_flag.store(true, Ordering::SeqCst);
+
+    while let Some(result) = join_set.join_next().await {
+        if let Err(e) = result
+            && e.is_panic()
+        {
+            let panic_info = e.into_panic();
+            let panic_msg = if let Some(s) = panic_info.downcast_ref::<&str>() {
+                s.to_string()
+            } else if let Some(s) = panic_info.downcast_ref::<String>() {
+                s.clone()
+            } else {
+                "Unknown panic".to_string()
+            };
+            panic!(
+                "Producer task panicked (likely issue #2572 reproduced): {}",
+                panic_msg
+            );
+        }
+    }
+
+    let sent = total_messages.load(Ordering::SeqCst);
+    println!("Test completed successfully. Total messages sent: {}", sent);
+
+    cleanup(&admin_client).await;
+}
+
+async fn create_client(client_factory: &dyn ClientFactory) -> IggyClient {
+    let client = client_factory.create_client().await;
+    IggyClient::create(client, None, None)
+}
+
+async fn init_system(client: &IggyClient) {
+    client.create_stream(STREAM_NAME).await.unwrap();
+
+    // Create topic with single partition - all producers will contend for this partition
+    client
+        .create_topic(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            TOPIC_NAME,
+            1, // Single partition to force lock contention
+            CompressionAlgorithm::None,
+            None,
+            IggyExpiry::NeverExpire,
+            MaxTopicSize::ServerDefault,
+        )
+        .await
+        .unwrap();
+
+    println!(
+        "Created stream and topic with 1 partition, {} producers will contend for it",
+        PRODUCER_COUNT
+    );
+}
+
+async fn run_producer(
+    client: IggyClient,
+    producer_id: usize,
+    partition_id: u32,
+    stop: Arc<AtomicBool>,
+    counter: Arc<AtomicU64>,
+) {
+    let mut batch_num = 0u64;
+
+    while !stop.load(Ordering::SeqCst) {
+        let mut messages = Vec::with_capacity(MESSAGES_PER_BATCH);
+
+        for i in 0..MESSAGES_PER_BATCH {
+            let payload = format!("p{}:b{}:m{}", producer_id, batch_num, i);
+            let message = IggyMessage::builder()
+                .payload(payload.into_bytes().into())
+                .build()
+                .unwrap();
+            messages.push(message);
+        }
+
+        match client
+            .send_messages(
+                &Identifier::named(STREAM_NAME).unwrap(),
+                &Identifier::named(TOPIC_NAME).unwrap(),
+                &Partitioning::partition_id(partition_id),
+                &mut messages,
+            )
+            .await
+        {
+            Ok(_) => {
+                counter.fetch_add(MESSAGES_PER_BATCH as u64, Ordering::SeqCst);
+                batch_num += 1;
+            }
+            Err(e) => {
+                eprintln!("Producer {} send error: {}", producer_id, e);
+            }
+        }
+    }
+
+    println!(
+        "Producer {} (partition {}) stopped after {} batches",
+        producer_id, partition_id, batch_num
+    );
+}
+
+async fn cleanup(client: &IggyClient) {
+    client
+        .delete_stream(&Identifier::named(STREAM_NAME).unwrap())
+        .await
+        .unwrap();
+}
diff --git a/core/integration/tests/server/specific.rs b/core/integration/tests/server/specific.rs
index 205d6b5df..28a955dfb 100644
--- a/core/integration/tests/server/specific.rs
+++ b/core/integration/tests/server/specific.rs
@@ -17,7 +17,8 @@
  */
 
 use crate::server::scenarios::{
-    delete_segments_scenario, message_size_scenario, tcp_tls_scenario, websocket_tls_scenario,
+    delete_segments_scenario, message_size_scenario, segment_rotation_race_scenario,
+    tcp_tls_scenario, websocket_tls_scenario,
 };
 use iggy::prelude::*;
 use integration::{
@@ -190,3 +191,59 @@ async fn message_size_scenario() {
 
     message_size_scenario::run(&client_factory).await;
 }
+
+/// Test to reproduce issue #2572: panic on segment rotation race condition.
+///
+/// This test configures the server to trigger frequent segment rotations and runs
+/// multiple concurrent producers to maximize the chance of hitting the race condition
+/// between persist_messages_to_disk and handle_full_segment.
+///
+/// Server configuration:
+/// - Very small segment size (512B) to trigger frequent rotations
+/// - Short message_saver interval (1s) to add concurrent persist operations
+/// - Small messages_required_to_save (32) to trigger more frequent saves
+/// - cache_indexes = none to trigger clear_active_indexes path
+#[tokio::test]
+#[parallel]
+async fn segment_rotation_race_condition_issue_2572() {
+    let mut extra_envs = HashMap::new();
+
+    // Very small segment to trigger frequent rotations
+    extra_envs.insert("IGGY_SYSTEM_SEGMENT_SIZE".to_string(), "512B".to_string());
+
+    // Short message saver interval to add concurrent persist operations
+    extra_envs.insert("IGGY_MESSAGE_SAVER_INTERVAL".to_string(), "1s".to_string());
+
+    // Small threshold to trigger more frequent saves
+    extra_envs.insert(
+        "IGGY_SYSTEM_PARTITION_MESSAGES_REQUIRED_TO_SAVE".to_string(),
+        "32".to_string(),
+    );
+
+    // cache_indexes = none triggers clear_active_indexes in handle_full_segment
+    extra_envs.insert(
+        "IGGY_SYSTEM_SEGMENT_CACHE_INDEXES".to_string(),
+        "none".to_string(),
+    );
+
+    // Disable socket migration to keep all connections on same shard
+    extra_envs.insert("IGGY_TCP_SOCKET_MIGRATION".to_string(), "false".to_string());
+
+    // Enable TCP nodelay for faster message throughput
+    extra_envs.insert(
+        "IGGY_TCP_SOCKET_OVERRIDE_DEFAULTS".to_string(),
+        "true".to_string(),
+    );
+    extra_envs.insert("IGGY_TCP_SOCKET_NODELAY".to_string(), "true".to_string());
+
+    let mut test_server = TestServer::new(Some(extra_envs), true, None, IpAddrKind::V4);
+    test_server.start();
+
+    let server_addr = test_server.get_raw_tcp_addr().unwrap();
+    let client_factory = TcpClientFactory {
+        server_addr,
+        ..Default::default()
+    };
+
+    segment_rotation_race_scenario::run(&client_factory).await;
+}
diff --git a/core/server/src/binary/handlers/messages/send_messages_handler.rs b/core/server/src/binary/handlers/messages/send_messages_handler.rs
index 6aa6786ac..2723e44d3 100644
--- a/core/server/src/binary/handlers/messages/send_messages_handler.rs
+++ b/core/server/src/binary/handlers/messages/send_messages_handler.rs
@@ -191,7 +191,8 @@ impl ServerCommandHandler for SendMessages {
             .send_request_to_shard_or_recoil(Some(&namespace), socket_transfer_msg)
             .await
         {
-            error!("tranfer socket to another shard failed, drop connection. {e:?}");
+            // TODO: should we crash?
+            error!("transfer socket to another shard failed, drop connection. {e:?}");
             return Ok(HandlerResult::Finished);
         }
diff --git a/core/server/src/shard/system/messages.rs b/core/server/src/shard/system/messages.rs
index 02e8d9d0e..05453111a 100644
--- a/core/server/src/shard/system/messages.rs
+++ b/core/server/src/shard/system/messages.rs
@@ -315,7 +315,7 @@ impl IggyShard {
         partition_id: usize,
         fsync: bool,
     ) -> Result<(), IggyError> {
-        let batches = self.streams.with_partition_by_id_mut(
+        let committed = self.streams.with_partition_by_id_mut(
             stream_id,
             topic_id,
             partition_id,
@@ -327,7 +327,7 @@ impl IggyShard {
                 stream_id,
                 topic_id,
                 partition_id,
-                batches,
+                committed,
                 &self.config.system,
             )
             .await?;
diff --git a/core/server/src/slab/streams.rs b/core/server/src/slab/streams.rs
index f1c342de2..494a5ff08 100644
--- a/core/server/src/slab/streams.rs
+++ b/core/server/src/slab/streams.rs
@@ -1187,23 +1187,104 @@ impl Streams {
         let numeric_topic_id =
             self.with_topic_by_id(stream_id, topic_id, topics::helpers::get_topic_id());
 
-        if config.segment.cache_indexes == CacheIndexesConfig::OpenSegment
-            || config.segment.cache_indexes == CacheIndexesConfig::None
-        {
-            self.with_partition_by_id_mut(stream_id, topic_id, partition_id, |(.., log)| {
-                log.clear_active_indexes();
+        let clear_indexes = config.segment.cache_indexes == CacheIndexesConfig::OpenSegment
+            || config.segment.cache_indexes == CacheIndexesConfig::None;
+
+        // First, check if segment is already sealed and get info needed for new segment creation
+        let segment_info =
+            self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| {
+                // If segment is already sealed, another task is handling the closure
+                if log.active_segment().sealed {
+                    return None;
+                }
+                let segment = log.active_segment();
+                Some((
+                    segment.end_offset,
+                    segment.start_offset,
+                    segment.size,
+                    log.active_storage().messages_writer.clone(),
+                ))
             });
-        }
 
-        self.with_partition_by_id_mut(stream_id, topic_id, partition_id, |(.., log)| {
-            log.active_segment_mut().sealed = true;
-        });
-        let (log_writer, index_writer) =
+        // If None (sealed), another task is handling closure
+        let Some((end_offset, start_offset, size, writer_for_lock)) = segment_info else {
+            return Ok(());
+        };
+
+        // If writer is None, segment was already shut down by another task
+        let Some(writer_for_lock) = writer_for_lock else {
+            return Ok(());
+        };
+
+        // CRITICAL: Create the new segment storage FIRST, before any modifications.
+        // This ensures there's always a valid active segment with writers available,
+        // preventing race conditions where commit_journal finds None writers.
+        let messages_size = 0;
+        let indexes_size = 0;
+        let new_segment = Segment::new(
+            end_offset + 1,
+            config.segment.size,
+            config.segment.message_expiry,
+        );
+
+        let new_storage = create_segment_storage(
+            config,
+            numeric_stream_id,
+            numeric_topic_id,
+            partition_id,
+            messages_size,
+            indexes_size,
+            end_offset + 1,
+        )
+        .await?;
+
+        // Now acquire the write lock to ensure all pending writes complete
+        let _write_guard = writer_for_lock.lock.lock().await;
+
+        // Atomically: seal old segment, shut down storage, add new segment.
+        // This ensures the new segment is available immediately when the old one is shut down.
+        let writers = self.with_partition_by_id_mut(stream_id, topic_id, partition_id, |(.., log)| {
+            // Double-check sealed status (another task might have completed while we waited)
+            if log.active_segment().sealed {
+                return None;
+            }
+
+            // Clear indexes if configured
+            if clear_indexes {
+                log.clear_active_indexes();
+            }
+
+            // Seal the old segment
+            log.active_segment_mut().sealed = true;
+
+            // Extract writers from old segment
             let (msg, index) = log.active_storage_mut().shutdown();
-            (msg.unwrap(), index.unwrap())
+
+            // Add the new segment - this makes it the new "active" segment immediately
+            log.add_persisted_segment(new_segment, new_storage);
+
+            Some((msg, index))
         });
+
+        // Drop the write guard before spawning fsync tasks
+        drop(_write_guard);
+
+        // If None, another task already handled segment closure
+        let Some((Some(log_writer), Some(index_writer))) = writers else {
+            return Ok(());
+        };
+
+        tracing::info!(
+            "Closed segment for stream: {}, topic: {} with start offset: {}, end offset: {}, size: {} for partition with ID: {}.",
+            stream_id,
+            topic_id,
+            start_offset,
+            end_offset,
+            size,
+            partition_id
+        );
+
         registry
             .oneshot("fsync:segment-close-log")
             .critical(true)
@@ -1236,47 +1317,6 @@
             })
             .spawn();
 
-        let (start_offset, size, end_offset) =
-            self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| {
-                (
-                    log.active_segment().start_offset,
-                    log.active_segment().size,
-                    log.active_segment().end_offset,
-                )
-            });
-
-        tracing::info!(
-            "Closed segment for stream: {}, topic: {} with start offset: {}, end offset: {}, size: {} for partition with ID: {}.",
-            stream_id,
-            topic_id,
-            start_offset,
-            end_offset,
-            size,
-            partition_id
-        );
-
-        let messages_size = 0;
-        let indexes_size = 0;
-        let segment = Segment::new(
-            end_offset + 1,
-            config.segment.size,
-            config.segment.message_expiry,
-        );
-
-        let storage = create_segment_storage(
-            config,
-            numeric_stream_id,
-            numeric_topic_id,
-            partition_id,
-            messages_size,
-            indexes_size,
-            end_offset + 1,
-        )
-        .await?;
-        self.with_partition_by_id_mut(stream_id, topic_id, partition_id, |(.., log)| {
-            log.add_persisted_segment(segment, storage);
-        });
-
         Ok(())
     }
@@ -1295,7 +1335,9 @@
             return Ok(0);
         }
 
-        let batches = self.with_partition_by_id_mut(
+        // commit_journal now returns CommittedBatch which includes writers captured at commit time.
+        // This prevents race conditions where segment rotation could invalidate writers.
+        let committed = self.with_partition_by_id_mut(
             stream_id,
             topic_id,
             partition_id,
@@ -1311,7 +1353,7 @@
         );
 
         let batch_count = self
-            .persist_messages_to_disk(stream_id, topic_id, partition_id, batches, config)
+            .persist_messages_to_disk(stream_id, topic_id, partition_id, committed, config)
             .await?;
 
         Ok(batch_count)
@@ -1322,9 +1364,16 @@
         stream_id: &Identifier,
        topic_id: &Identifier,
         partition_id: usize,
-        mut batches: IggyMessagesBatchSet,
+        committed: streaming_partitions::helpers::CommittedBatch,
         config: &SystemConfig,
     ) -> Result<u32, IggyError> {
+        let streaming_partitions::helpers::CommittedBatch {
+            mut batches,
+            segment_idx,
+            messages_writer,
+            index_writer,
+        } = committed;
+
         let batch_count = batches.count();
         let batch_size = batches.size();
@@ -1347,21 +1396,8 @@
             log.set_in_flight(frozen.clone());
         });
 
-        let (messages_writer, index_writer) =
-            self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| {
-                (
-                    log.active_storage()
-                        .messages_writer
-                        .as_ref()
-                        .expect("Messages writer not initialized")
-                        .clone(),
-                    log.active_storage()
-                        .index_writer
-                        .as_ref()
-                        .expect("Index writer not initialized")
-                        .clone(),
-                )
-            });
+        // Writers were captured at commit time, so they're guaranteed to be valid
+        // even if segment rotation happened after the commit.
 
         let guard = messages_writer.lock.lock().await;
         let saved = messages_writer
@@ -1375,20 +1411,26 @@
             )
         })?;
 
-        // Extract unsaved indexes before async operation
+        // Extract unsaved indexes for the specific segment (handles None gracefully if cleared during rotation)
         let unsaved_indexes_slice =
             self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| {
-                log.active_indexes().unwrap().unsaved_slice()
+                log.indexes()
+                    .get(segment_idx)
+                    .and_then(|opt| opt.as_ref())
+                    .map(|indexes| indexes.unsaved_slice())
             });
 
-        let indexes_len = unsaved_indexes_slice.len();
-        index_writer
-            .as_ref()
-            .save_indexes(unsaved_indexes_slice)
-            .await
-            .error(|e: &IggyError| {
-                format!("Failed to save index of {indexes_len} indexes to stream ID: {stream_id}, topic ID: {topic_id} {partition_id}. {e}",)
-            })?;
+        // Only save indexes if they exist (may be None if segment was rotated and cleared)
+        if let Some(unsaved_indexes_slice) = unsaved_indexes_slice {
+            let indexes_len = unsaved_indexes_slice.len();
+            index_writer
+                .as_ref()
+                .save_indexes(unsaved_indexes_slice)
+                .await
+                .error(|e: &IggyError| {
+                    format!("Failed to save index of {indexes_len} indexes to stream ID: {stream_id}, topic ID: {topic_id} {partition_id}. {e}",)
+                })?;
+        }
 
         tracing::trace!(
             "Persisted {} messages on disk for stream ID: {}, topic ID: {}, for partition with ID: {}, total bytes written: {}.",
@@ -1403,7 +1445,11 @@
             stream_id,
             topic_id,
             partition_id,
-            streaming_partitions::helpers::update_index_and_increment_stats(saved, config),
+            streaming_partitions::helpers::update_index_and_increment_stats(
+                segment_idx,
+                saved,
+                config,
+            ),
         );
 
         self.with_partition_by_id_mut(stream_id, topic_id, partition_id, |(.., log)| {
diff --git a/core/server/src/streaming/partitions/helpers.rs b/core/server/src/streaming/partitions/helpers.rs
index 531e77e54..e62a9e420 100644
--- a/core/server/src/streaming/partitions/helpers.rs
+++ b/core/server/src/streaming/partitions/helpers.rs
@@ -32,11 +32,15 @@ use crate::{
             storage,
         },
         polling_consumer::ConsumerGroupId,
-        segments::{IggyIndexesMut, IggyMessagesBatchMut, IggyMessagesBatchSet, storage::Storage},
+        segments::{
+            IggyIndexesMut, IggyMessagesBatchMut, IggyMessagesBatchSet, IndexWriter,
+            MessagesWriter, storage::Storage,
+        },
     },
 };
 use err_trail::ErrContext;
 use iggy_common::{ConsumerOffsetInfo, Identifier, IggyByteSize, IggyError};
+use std::rc::Rc;
 use std::{
     ops::AsyncFnOnce,
     sync::{Arc, atomic::Ordering},
@@ -430,12 +434,45 @@ pub fn append_to_journal(
     }
 }
 
-pub fn commit_journal() -> impl FnOnce(ComponentsById<PartitionRefMut>) -> IggyMessagesBatchSet {
+/// Result of committing journal batches for a specific segment.
+/// Captures writers at commit time to prevent race conditions with segment rotation.
+pub struct CommittedBatch {
+    pub batches: IggyMessagesBatchSet,
+    pub segment_idx: usize,
+    pub messages_writer: Rc<MessagesWriter>,
+    pub index_writer: Rc<IndexWriter>,
+}
+
+/// Commits the journal and returns the batches along with the segment's writers.
+/// By capturing writers at commit time (within the same lock acquisition), we ensure
+/// that even if segment rotation happens after this call, we still have valid writers
+/// to complete the persistence operation.
+pub fn commit_journal() -> impl FnOnce(ComponentsById<PartitionRefMut>) -> CommittedBatch {
     |(.., log)| {
+        let segment_idx = log.segments().len().saturating_sub(1);
         let batches = log.journal_mut().commit();
         log.ensure_indexes();
         batches.append_indexes_to(log.active_indexes_mut().unwrap());
-        batches
+
+        // Capture writers NOW before any rotation can happen
+        let storage = log.active_storage();
+        let messages_writer = storage
+            .messages_writer
+            .as_ref()
+            .expect("Messages writer must exist at commit time")
+            .clone();
+        let index_writer = storage
+            .index_writer
+            .as_ref()
+            .expect("Index writer must exist at commit time")
+            .clone();
+
+        CommittedBatch {
+            batches,
+            segment_idx,
+            messages_writer,
+            index_writer,
+        }
     }
 }
 
@@ -531,16 +568,26 @@ pub fn persist_batch(
     }
 }
 
+/// Updates segment size and marks indexes as saved for a specific segment.
+/// Uses segment_idx to ensure we update the correct segment even after rotation.
+/// Gracefully handles the case where indexes were cleared during segment rotation.
 pub fn update_index_and_increment_stats(
+    segment_idx: usize,
     saved: IggyByteSize,
     config: &SystemConfig,
 ) -> impl FnOnce(ComponentsById<PartitionRefMut>) {
     move |(.., log)| {
-        let segment = log.active_segment_mut();
-        segment.size = IggyByteSize::from(segment.size.as_bytes_u64() + saved.as_bytes_u64());
-        log.active_indexes_mut().unwrap().mark_saved();
-        if config.segment.cache_indexes == CacheIndexesConfig::None {
-            log.active_indexes_mut().unwrap().clear();
+        // Update the specific segment we wrote to, not "active" which may have changed
+        if let Some(segment) = log.segments_mut().get_mut(segment_idx) {
+            segment.size = IggyByteSize::from(segment.size.as_bytes_u64() + saved.as_bytes_u64());
+        }
+
+        // Handle indexes - may be None if segment was rotated and indexes cleared
+        if let Some(Some(indexes)) = log.indexes_mut().get_mut(segment_idx) {
+            indexes.mark_saved();
+            if config.segment.cache_indexes == CacheIndexesConfig::None {
+                indexes.clear();
+            }
         }
     }
 }
diff --git a/core/server/src/streaming/segments/mod.rs b/core/server/src/streaming/segments/mod.rs
index 7ed09c2d4..878af0eb1 100644
--- a/core/server/src/streaming/segments/mod.rs
+++ b/core/server/src/streaming/segments/mod.rs
@@ -24,6 +24,8 @@ mod types;
 pub mod storage;
 
 pub use indexes::IggyIndexesMut;
+pub use indexes::IndexWriter;
+pub use messages::MessagesWriter;
 pub use segment::Segment;
 pub use types::IggyMessageHeaderViewMut;
 pub use types::IggyMessageViewMut;
