This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch fix-segment-close-crash
in repository https://gitbox.apache.org/repos/asf/iggy.git
commit a958d06e888ddb45f67429be9353d2b3533ab65d
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Fri Jan 16 12:39:35 2026 +0100

    fix segment rotation crash
---
 core/integration/tests/server/scenarios/mod.rs     |   1 +
 .../scenarios/segment_rotation_race_scenario.rs    | 178 ++++++++++++++++++
 core/integration/tests/server/specific.rs          |  59 +++++-
 .../handlers/messages/send_messages_handler.rs     |   3 +-
 core/server/src/shard/system/messages.rs           |   4 +-
 core/server/src/slab/streams.rs                    | 209 ++++++++++++---------
 core/server/src/streaming/partitions/helpers.rs    |  71 ++++++-
 core/server/src/streaming/segments/mod.rs          |   2 +
 8 files changed, 429 insertions(+), 98 deletions(-)

diff --git a/core/integration/tests/server/scenarios/mod.rs b/core/integration/tests/server/scenarios/mod.rs
index 2c2fbab3a..e1a9cb398 100644
--- a/core/integration/tests/server/scenarios/mod.rs
+++ b/core/integration/tests/server/scenarios/mod.rs
@@ -31,6 +31,7 @@ pub mod encryption_scenario;
 pub mod message_headers_scenario;
 pub mod message_size_scenario;
 pub mod read_during_persistence_scenario;
+pub mod segment_rotation_race_scenario;
 pub mod stale_client_consumer_group_scenario;
 pub mod stream_size_validation_scenario;
 pub mod system_scenario;
diff --git a/core/integration/tests/server/scenarios/segment_rotation_race_scenario.rs b/core/integration/tests/server/scenarios/segment_rotation_race_scenario.rs
new file mode 100644
index 000000000..a321942d2
--- /dev/null
+++ b/core/integration/tests/server/scenarios/segment_rotation_race_scenario.rs
@@ -0,0 +1,178 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//! Test scenario to reproduce issue #2572: panic on segment rotation race condition.
+//!
+//! The bug occurs when concurrent message sends race with segment rotation:
+//! 1. Task A commits journal, ensures indexes for segment N, starts async save
+//! 2. Task B's send triggers segment rotation (handle_full_segment)
+//! 3. Task B clears segment N's indexes or creates segment N+1 with None indexes
+//! 4. Task A calls active_indexes().unwrap() - panics because indexes are None
+//!
+//! This test uses:
+//! - Very small segment size (512B) to trigger frequent rotations
+//! - 4 concurrent producers sending small messages to same partition
+//! - Short message_saver interval to add more concurrent persist operations
+
+use iggy::prelude::*;
+use integration::test_server::{ClientFactory, login_root};
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
+use std::time::Duration;
+use tokio::task::JoinSet;
+
+const STREAM_NAME: &str = "race-test-stream";
+const TOPIC_NAME: &str = "race-test-topic";
+const PRODUCER_COUNT: usize = 4;
+// All producers write to the SAME partition to create lock contention
+const PARTITION_ID: u32 = 0;
+const TEST_DURATION_SECS: u64 = 20;
+// Small batch size to increase frequency of send operations
+const MESSAGES_PER_BATCH: usize = 5;
+
+pub async fn run(client_factory: &dyn ClientFactory) {
+    let admin_client = create_client(client_factory).await;
+    login_root(&admin_client).await;
+
+    init_system(&admin_client).await;
+
+    let stop_flag = Arc::new(AtomicBool::new(false));
+    let total_messages = Arc::new(AtomicU64::new(0));
+    let mut join_set = JoinSet::new();
+
+    for producer_id in 0..PRODUCER_COUNT {
+        let client = create_client(client_factory).await;
+        login_root(&client).await;
+
+        let stop = stop_flag.clone();
+        let counter = total_messages.clone();
+
+        // All producers write to PARTITION_ID (same partition) to create lock contention
+        join_set.spawn(async move {
+            run_producer(client, producer_id, PARTITION_ID, stop, counter).await;
+        });
+    }
+
+    tokio::time::sleep(Duration::from_secs(TEST_DURATION_SECS)).await;
+    stop_flag.store(true, Ordering::SeqCst);
+
+    while let Some(result) = join_set.join_next().await {
+        if let Err(e) = result
+            && e.is_panic()
+        {
+            let panic_info = e.into_panic();
+            let panic_msg = if let Some(s) = panic_info.downcast_ref::<&str>() {
+                s.to_string()
+            } else if let Some(s) = panic_info.downcast_ref::<String>() {
+                s.clone()
+            } else {
+                "Unknown panic".to_string()
+            };
+            panic!(
+                "Producer task panicked (likely issue #2572 reproduced): {}",
+                panic_msg
+            );
+        }
+    }
+
+    let sent = total_messages.load(Ordering::SeqCst);
+    println!("Test completed successfully. Total messages sent: {}", sent);
+
+    cleanup(&admin_client).await;
+}
+
+async fn create_client(client_factory: &dyn ClientFactory) -> IggyClient {
+    let client = client_factory.create_client().await;
+    IggyClient::create(client, None, None)
+}
+
+async fn init_system(client: &IggyClient) {
+    client.create_stream(STREAM_NAME).await.unwrap();
+
+    // Create topic with single partition - all producers will contend for this partition
+    client
+        .create_topic(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            TOPIC_NAME,
+            1, // Single partition to force lock contention
+            CompressionAlgorithm::None,
+            None,
+            IggyExpiry::NeverExpire,
+            MaxTopicSize::ServerDefault,
+        )
+        .await
+        .unwrap();
+
+    println!(
+        "Created stream and topic with 1 partition, {} producers will contend for it",
+        PRODUCER_COUNT
+    );
+}
+
+async fn run_producer(
+    client: IggyClient,
+    producer_id: usize,
+    partition_id: u32,
+    stop: Arc<AtomicBool>,
+    counter: Arc<AtomicU64>,
+) {
+    let mut batch_num = 0u64;
+
+    while !stop.load(Ordering::SeqCst) {
+        let mut messages = Vec::with_capacity(MESSAGES_PER_BATCH);
+
+        for i in 0..MESSAGES_PER_BATCH {
+            let payload = format!("p{}:b{}:m{}", producer_id, batch_num, i);
+            let message = IggyMessage::builder()
+                .payload(payload.into_bytes().into())
+                .build()
+                .unwrap();
+            messages.push(message);
+        }
+
+        match client
+            .send_messages(
+                &Identifier::named(STREAM_NAME).unwrap(),
+                &Identifier::named(TOPIC_NAME).unwrap(),
+                &Partitioning::partition_id(partition_id),
+                &mut messages,
+            )
+            .await
+        {
+            Ok(_) => {
+                counter.fetch_add(MESSAGES_PER_BATCH as u64, Ordering::SeqCst);
+                batch_num += 1;
+            }
+            Err(e) => {
+                eprintln!("Producer {} send error: {}", producer_id, e);
+            }
+        }
+    }
+
+    println!(
+        "Producer {} (partition {}) stopped after {} batches",
+        producer_id, partition_id, batch_num
+    );
+}
+
+async fn cleanup(client: &IggyClient) {
+    client
+        .delete_stream(&Identifier::named(STREAM_NAME).unwrap())
+        .await
+        .unwrap();
+}
diff --git a/core/integration/tests/server/specific.rs b/core/integration/tests/server/specific.rs
index 205d6b5df..28a955dfb 100644
--- a/core/integration/tests/server/specific.rs
+++ b/core/integration/tests/server/specific.rs
@@ -17,7 +17,8 @@
  */

 use crate::server::scenarios::{
-    delete_segments_scenario, message_size_scenario, tcp_tls_scenario, websocket_tls_scenario,
+    delete_segments_scenario, message_size_scenario, segment_rotation_race_scenario,
+    tcp_tls_scenario, websocket_tls_scenario,
 };
 use iggy::prelude::*;
 use integration::{
@@ -190,3 +191,59 @@ async fn message_size_scenario() {

     message_size_scenario::run(&client_factory).await;
 }
+
+/// Test to reproduce issue #2572: panic on segment rotation race condition.
+///
+/// This test configures the server to trigger frequent segment rotations and runs
+/// multiple concurrent producers to maximize the chance of hitting the race condition
+/// between persist_messages_to_disk and handle_full_segment.
+///
+/// Server configuration:
+/// - Very small segment size (512B) to trigger frequent rotations
+/// - Short message_saver interval (1s) to add concurrent persist operations
+/// - Small messages_required_to_save (32) to trigger more frequent saves
+/// - cache_indexes = none to trigger clear_active_indexes path
+#[tokio::test]
+#[parallel]
+async fn segment_rotation_race_condition_issue_2572() {
+    let mut extra_envs = HashMap::new();
+
+    // Very small segment to trigger frequent rotations
+    extra_envs.insert("IGGY_SYSTEM_SEGMENT_SIZE".to_string(), "512B".to_string());
+
+    // Short message saver interval to add concurrent persist operations
+    extra_envs.insert("IGGY_MESSAGE_SAVER_INTERVAL".to_string(), "1s".to_string());
+
+    // Small threshold to trigger more frequent saves
+    extra_envs.insert(
+        "IGGY_SYSTEM_PARTITION_MESSAGES_REQUIRED_TO_SAVE".to_string(),
+        "32".to_string(),
+    );
+
+    // cache_indexes = none triggers clear_active_indexes in handle_full_segment
+    extra_envs.insert(
+        "IGGY_SYSTEM_SEGMENT_CACHE_INDEXES".to_string(),
+        "none".to_string(),
+    );
+
+    // Disable socket migration to keep all connections on same shard
+    extra_envs.insert("IGGY_TCP_SOCKET_MIGRATION".to_string(), "false".to_string());
+
+    // Enable TCP nodelay for faster message throughput
+    extra_envs.insert(
+        "IGGY_TCP_SOCKET_OVERRIDE_DEFAULTS".to_string(),
+        "true".to_string(),
+    );
+    extra_envs.insert("IGGY_TCP_SOCKET_NODELAY".to_string(), "true".to_string());
+
+    let mut test_server = TestServer::new(Some(extra_envs), true, None, IpAddrKind::V4);
+    test_server.start();
+
+    let server_addr = test_server.get_raw_tcp_addr().unwrap();
+    let client_factory = TcpClientFactory {
+        server_addr,
+        ..Default::default()
+    };
+
+    segment_rotation_race_scenario::run(&client_factory).await;
+}
diff --git a/core/server/src/binary/handlers/messages/send_messages_handler.rs b/core/server/src/binary/handlers/messages/send_messages_handler.rs
index 6aa6786ac..2723e44d3 100644
--- a/core/server/src/binary/handlers/messages/send_messages_handler.rs
+++ b/core/server/src/binary/handlers/messages/send_messages_handler.rs
@@ -191,7 +191,8 @@ impl ServerCommandHandler for SendMessages {
             .send_request_to_shard_or_recoil(Some(&namespace), socket_transfer_msg)
             .await
         {
-            error!("tranfer socket to another shard failed, drop connection. {e:?}");
+            // TODO: should we crash?
+            error!("transfer socket to another shard failed, drop connection. {e:?}");
             return Ok(HandlerResult::Finished);
         }

diff --git a/core/server/src/shard/system/messages.rs b/core/server/src/shard/system/messages.rs
index 02e8d9d0e..05453111a 100644
--- a/core/server/src/shard/system/messages.rs
+++ b/core/server/src/shard/system/messages.rs
@@ -315,7 +315,7 @@ impl IggyShard {
         partition_id: usize,
         fsync: bool,
     ) -> Result<(), IggyError> {
-        let batches = self.streams.with_partition_by_id_mut(
+        let committed = self.streams.with_partition_by_id_mut(
             stream_id,
             topic_id,
             partition_id,
@@ -327,7 +327,7 @@ impl IggyShard {
                 stream_id,
                 topic_id,
                 partition_id,
-                batches,
+                committed,
                 &self.config.system,
             )
             .await?;
diff --git a/core/server/src/slab/streams.rs b/core/server/src/slab/streams.rs
index f1c342de2..abe23168d 100644
--- a/core/server/src/slab/streams.rs
+++ b/core/server/src/slab/streams.rs
@@ -1187,23 +1187,104 @@ impl Streams {
         let numeric_topic_id =
             self.with_topic_by_id(stream_id, topic_id, topics::helpers::get_topic_id());

-        if config.segment.cache_indexes == CacheIndexesConfig::OpenSegment
-            || config.segment.cache_indexes == CacheIndexesConfig::None
-        {
-            self.with_partition_by_id_mut(stream_id, topic_id, partition_id, |(.., log)| {
-                log.clear_active_indexes();
+        let clear_indexes = config.segment.cache_indexes == CacheIndexesConfig::OpenSegment
+            || config.segment.cache_indexes == CacheIndexesConfig::None;
+
+        // First, check if segment is already sealed and get info needed for new segment creation
+        let segment_info =
+            self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| {
+                // If segment is already sealed, another task is handling the closure
+                if log.active_segment().sealed {
+                    return None;
+                }
+                let segment = log.active_segment();
+                Some((
+                    segment.end_offset,
+                    segment.start_offset,
+                    segment.size,
+                    log.active_storage().messages_writer.clone(),
+                ))
             });
-        }
-        self.with_partition_by_id_mut(stream_id, topic_id, partition_id, |(.., log)| {
-            log.active_segment_mut().sealed = true;
-        });
-        let (log_writer, index_writer) =
+
+        // If None (sealed), another task is handling closure
+        let Some((end_offset, start_offset, size, writer_for_lock)) = segment_info else {
+            return Ok(());
+        };
+
+        // If writer is None, segment was already shutdown by another task
+        let Some(writer_for_lock) = writer_for_lock else {
+            return Ok(());
+        };
+
+        // CRITICAL: Create the new segment storage FIRST, before any modifications.
+        // This ensures there's always a valid active segment with writers available,
+        // preventing race conditions where commit_journal finds None writers.
+        let messages_size = 0;
+        let indexes_size = 0;
+        let new_segment = Segment::new(
+            end_offset + 1,
+            config.segment.size,
+            config.segment.message_expiry,
+        );
+
+        let new_storage = create_segment_storage(
+            config,
+            numeric_stream_id,
+            numeric_topic_id,
+            partition_id,
+            messages_size,
+            indexes_size,
+            end_offset + 1,
+        )
+        .await?;
+
+        // Now acquire the write lock to ensure all pending writes complete
+        let _write_guard = writer_for_lock.lock.lock().await;
+
+        // Atomically: seal old segment, shutdown storage, add new segment
+        // This ensures the new segment is available immediately when the old one is shutdown.
+        let writers =
             self.with_partition_by_id_mut(stream_id, topic_id, partition_id, |(.., log)| {
+                // Double-check sealed status (another task might have completed while we waited)
+                if log.active_segment().sealed {
+                    return None;
+                }
+
+                // Clear indexes if configured
+                if clear_indexes {
+                    log.clear_active_indexes();
+                }
+
+                // Seal the old segment
+                log.active_segment_mut().sealed = true;
+
+                // Extract writers from old segment
                 let (msg, index) = log.active_storage_mut().shutdown();
-                (msg.unwrap(), index.unwrap())
+
+                // Add the new segment - this makes it the new "active" segment immediately
+                log.add_persisted_segment(new_segment, new_storage);
+
+                Some((msg, index))
             });

+        // Drop the write guard before spawning fsync tasks
+        drop(_write_guard);
+
+        // If None, another task already handled segment closure
+        let Some((Some(log_writer), Some(index_writer))) = writers else {
+            return Ok(());
+        };
+
+        tracing::info!(
+            "Closed segment for stream: {}, topic: {} with start offset: {}, end offset: {}, size: {} for partition with ID: {}.",
+            stream_id,
+            topic_id,
+            start_offset,
+            end_offset,
+            size,
+            partition_id
+        );
+
         registry
             .oneshot("fsync:segment-close-log")
             .critical(true)
@@ -1236,47 +1317,6 @@
             })
             .spawn();

-        let (start_offset, size, end_offset) =
-            self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| {
-                (
-                    log.active_segment().start_offset,
-                    log.active_segment().size,
-                    log.active_segment().end_offset,
-                )
-            });
-
-        tracing::info!(
-            "Closed segment for stream: {}, topic: {} with start offset: {}, end offset: {}, size: {} for partition with ID: {}.",
-            stream_id,
-            topic_id,
-            start_offset,
-            end_offset,
-            size,
-            partition_id
-        );
-
-        let messages_size = 0;
-        let indexes_size = 0;
-        let segment = Segment::new(
-            end_offset + 1,
-            config.segment.size,
-            config.segment.message_expiry,
-        );
-
-        let storage = create_segment_storage(
-            config,
-            numeric_stream_id,
-            numeric_topic_id,
-            partition_id,
-            messages_size,
-            indexes_size,
-            end_offset + 1,
-        )
-        .await?;
-        self.with_partition_by_id_mut(stream_id, topic_id, partition_id, |(.., log)| {
-            log.add_persisted_segment(segment, storage);
-        });
-
         Ok(())
     }

@@ -1295,7 +1335,9 @@
             return Ok(0);
         }

-        let batches = self.with_partition_by_id_mut(
+        // commit_journal now returns CommittedBatch which includes writers captured at commit time.
+        // This prevents race conditions where segment rotation could invalidate writers.
+        let committed = self.with_partition_by_id_mut(
             stream_id,
             topic_id,
             partition_id,
@@ -1311,7 +1353,7 @@
         );

         let batch_count = self
-            .persist_messages_to_disk(stream_id, topic_id, partition_id, batches, config)
+            .persist_messages_to_disk(stream_id, topic_id, partition_id, committed, config)
             .await?;

         Ok(batch_count)
@@ -1322,9 +1364,17 @@
         stream_id: &Identifier,
         topic_id: &Identifier,
         partition_id: usize,
-        mut batches: IggyMessagesBatchSet,
+        committed: streaming_partitions::helpers::CommittedBatch,
         config: &SystemConfig,
     ) -> Result<u32, IggyError> {
+        let streaming_partitions::helpers::CommittedBatch {
+            mut batches,
+            segment_idx,
+            messages_writer,
+            index_writer,
+            unsaved_indexes,
+        } = committed;
+
         let batch_count = batches.count();
         let batch_size = batches.size();

@@ -1347,21 +1397,8 @@
             log.set_in_flight(frozen.clone());
         });

-        let (messages_writer, index_writer) =
-            self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| {
-                (
-                    log.active_storage()
-                        .messages_writer
-                        .as_ref()
-                        .expect("Messages writer not initialized")
-                        .clone(),
-                    log.active_storage()
-                        .index_writer
-                        .as_ref()
-                        .expect("Index writer not initialized")
-                        .clone(),
-                )
-            });
+        // Writers were captured at commit time, so they're guaranteed to be valid
+        // even if segment rotation happened after the commit.
         let guard = messages_writer.lock.lock().await;
         let saved = messages_writer
@@ -1375,20 +1412,18 @@
                 )
             })?;

-        // Extract unsaved indexes before async operation
-        let unsaved_indexes_slice =
-            self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| {
-                log.active_indexes().unwrap().unsaved_slice()
-            });
-
-        let indexes_len = unsaved_indexes_slice.len();
-        index_writer
-            .as_ref()
-            .save_indexes(unsaved_indexes_slice)
-            .await
-            .error(|e: &IggyError| {
-                format!("Failed to save index of {indexes_len} indexes to stream ID: {stream_id}, topic ID: {topic_id} {partition_id}. {e}",)
-            })?;
+        // Indexes were captured at commit time, so they're guaranteed to be valid
+        // even if segment rotation cleared the index buffer after the commit.
+        if !unsaved_indexes.is_empty() {
+            let indexes_len = unsaved_indexes.len();
+            index_writer
+                .as_ref()
+                .save_indexes(unsaved_indexes)
+                .await
+                .error(|e: &IggyError| {
+                    format!("Failed to save index of {indexes_len} indexes to stream ID: {stream_id}, topic ID: {topic_id} {partition_id}. {e}",)
+                })?;
+        }

         tracing::trace!(
             "Persisted {} messages on disk for stream ID: {}, topic ID: {}, for partition with ID: {}, total bytes written: {}.",
@@ -1403,7 +1438,11 @@
             stream_id,
             topic_id,
             partition_id,
-            streaming_partitions::helpers::update_index_and_increment_stats(saved, config),
+            streaming_partitions::helpers::update_index_and_increment_stats(
+                segment_idx,
+                saved,
+                config,
+            ),
         );

         self.with_partition_by_id_mut(stream_id, topic_id, partition_id, |(.., log)| {
diff --git a/core/server/src/streaming/partitions/helpers.rs b/core/server/src/streaming/partitions/helpers.rs
index 531e77e54..3d98b2e17 100644
--- a/core/server/src/streaming/partitions/helpers.rs
+++ b/core/server/src/streaming/partitions/helpers.rs
@@ -32,11 +32,15 @@ use crate::{
             storage,
         },
         polling_consumer::ConsumerGroupId,
-        segments::{IggyIndexesMut, IggyMessagesBatchMut, IggyMessagesBatchSet, storage::Storage},
+        segments::{
+            IggyIndexesMut, IggyMessagesBatchMut, IggyMessagesBatchSet, IndexWriter,
+            MessagesWriter, storage::Storage,
+        },
     },
 };
 use err_trail::ErrContext;
-use iggy_common::{ConsumerOffsetInfo, Identifier, IggyByteSize, IggyError};
+use iggy_common::{ConsumerOffsetInfo, Identifier, IggyByteSize, IggyError, PooledBuffer};
+use std::rc::Rc;
 use std::{
     ops::AsyncFnOnce,
     sync::{Arc, atomic::Ordering},
@@ -430,12 +434,51 @@ pub fn append_to_journal(
     }
 }

-pub fn commit_journal() -> impl FnOnce(ComponentsById<PartitionRefMut>) -> IggyMessagesBatchSet {
+/// Result of committing journal batches for a specific segment.
+/// Captures writers and indexes at commit time to prevent race conditions with segment rotation.
+pub struct CommittedBatch {
+    pub batches: IggyMessagesBatchSet,
+    pub segment_idx: usize,
+    pub messages_writer: Rc<MessagesWriter>,
+    pub index_writer: Rc<IndexWriter>,
+    /// Indexes captured at commit time, before any rotation can clear them.
+    pub unsaved_indexes: PooledBuffer,
+}
+
+/// Commits the journal and returns the batches along with the segment's writers and indexes.
+/// By capturing writers and indexes at commit time (within the same lock acquisition), we ensure
+/// that even if segment rotation happens after this call, we still have valid data
+/// to complete the persistence operation.
+pub fn commit_journal() -> impl FnOnce(ComponentsById<PartitionRefMut>) -> CommittedBatch {
     |(.., log)| {
+        let segment_idx = log.segments().len().saturating_sub(1);
         let batches = log.journal_mut().commit();
         log.ensure_indexes();
         batches.append_indexes_to(log.active_indexes_mut().unwrap());
-        batches
+
+        // Capture writers NOW before any rotation can happen
+        let storage = log.active_storage();
+        let messages_writer = storage
+            .messages_writer
+            .as_ref()
+            .expect("Messages writer must exist at commit time")
+            .clone();
+        let index_writer = storage
+            .index_writer
+            .as_ref()
+            .expect("Index writer must exist at commit time")
+            .clone();
+
+        // Capture indexes NOW before any rotation can clear them
+        let unsaved_indexes = log.active_indexes().unwrap().unsaved_slice();
+
+        CommittedBatch {
+            batches,
+            segment_idx,
+            messages_writer,
+            index_writer,
+            unsaved_indexes,
+        }
     }
 }

@@ -531,16 +574,26 @@ pub fn persist_batch(
     }
 }

+/// Updates segment size and marks indexes as saved for a specific segment.
+/// Uses segment_idx to ensure we update the correct segment even after rotation.
+/// Gracefully handles the case where indexes were cleared during segment rotation.
 pub fn update_index_and_increment_stats(
+    segment_idx: usize,
     saved: IggyByteSize,
     config: &SystemConfig,
 ) -> impl FnOnce(ComponentsById<PartitionRefMut>) {
     move |(.., log)| {
-        let segment = log.active_segment_mut();
-        segment.size = IggyByteSize::from(segment.size.as_bytes_u64() + saved.as_bytes_u64());
-        log.active_indexes_mut().unwrap().mark_saved();
-        if config.segment.cache_indexes == CacheIndexesConfig::None {
-            log.active_indexes_mut().unwrap().clear();
+        // Update the specific segment we wrote to, not "active" which may have changed
+        if let Some(segment) = log.segments_mut().get_mut(segment_idx) {
+            segment.size = IggyByteSize::from(segment.size.as_bytes_u64() + saved.as_bytes_u64());
+        }
+
+        // Handle indexes - may be None if segment was rotated and indexes cleared
+        if let Some(Some(indexes)) = log.indexes_mut().get_mut(segment_idx) {
+            indexes.mark_saved();
+            if config.segment.cache_indexes == CacheIndexesConfig::None {
+                indexes.clear();
+            }
         }
     }
 }
diff --git a/core/server/src/streaming/segments/mod.rs b/core/server/src/streaming/segments/mod.rs
index 7ed09c2d4..878af0eb1 100644
--- a/core/server/src/streaming/segments/mod.rs
+++ b/core/server/src/streaming/segments/mod.rs
@@ -24,6 +24,8 @@ mod types;
 pub mod storage;

 pub use indexes::IggyIndexesMut;
+pub use indexes::IndexWriter;
+pub use messages::MessagesWriter;
 pub use segment::Segment;
 pub use types::IggyMessageHeaderViewMut;
 pub use types::IggyMessageViewMut;
