This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch fix-segment-close-crash
in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 8cf603d6e6c615da8dad68e7275243ced1b9e703 Author: Hubert Gruszecki <[email protected]> AuthorDate: Fri Jan 16 12:39:35 2026 +0100 fix segment rotation crash --- core/integration/tests/server/scenarios/mod.rs | 1 + .../scenarios/segment_rotation_race_scenario.rs | 201 +++++++++++++ core/integration/tests/server/specific.rs | 85 +++++- .../handlers/messages/send_messages_handler.rs | 3 +- core/server/src/slab/streams.rs | 323 ++++++++++++++------- core/server/src/streaming/partitions/helpers.rs | 71 ++++- core/server/src/streaming/segments/mod.rs | 2 + 7 files changed, 564 insertions(+), 122 deletions(-) diff --git a/core/integration/tests/server/scenarios/mod.rs b/core/integration/tests/server/scenarios/mod.rs index c714fb2a7..bc9bbfb3f 100644 --- a/core/integration/tests/server/scenarios/mod.rs +++ b/core/integration/tests/server/scenarios/mod.rs @@ -32,6 +32,7 @@ pub mod message_headers_scenario; pub mod message_size_scenario; pub mod permissions_scenario; pub mod read_during_persistence_scenario; +pub mod segment_rotation_race_scenario; pub mod stale_client_consumer_group_scenario; pub mod stream_size_validation_scenario; pub mod system_scenario; diff --git a/core/integration/tests/server/scenarios/segment_rotation_race_scenario.rs b/core/integration/tests/server/scenarios/segment_rotation_race_scenario.rs new file mode 100644 index 000000000..5c0c895a0 --- /dev/null +++ b/core/integration/tests/server/scenarios/segment_rotation_race_scenario.rs @@ -0,0 +1,201 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +//! Test scenario to reproduce issue #2572: panic on segment rotation race condition. +//! +//! The bug occurs when concurrent message sends race with segment rotation: +//! 1. Task A commits journal, ensures indexes for segment N, starts async save +//! 2. Task B's send triggers segment rotation (handle_full_segment) +//! 3. Task B clears segment N's indexes or creates segment N+1 with None indexes +//! 4. Task A calls active_indexes().unwrap() - panics because indexes are None +//! +//! This test uses: +//! - Very small segment size (512B) to trigger frequent rotations +//! - 8 concurrent producers (2 per protocol: TCP, HTTP, QUIC, WebSocket) +//! - All producers write to the same partition for maximum lock contention +//! 
- Short message_saver interval to add more concurrent persist operations + +use iggy::prelude::*; +use integration::test_server::{ClientFactory, login_root}; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; +use std::time::Duration; +use tokio::task::JoinSet; + +const STREAM_NAME: &str = "race-test-stream"; +const TOPIC_NAME: &str = "race-test-topic"; +const PRODUCERS_PER_PROTOCOL: usize = 2; +const PARTITION_ID: u32 = 0; +const TEST_DURATION_SECS: u64 = 20; +const MESSAGES_PER_BATCH: usize = 5; + +/// Runs the segment rotation race condition test with multiple protocols. +/// Each client factory represents a different protocol (TCP, HTTP, QUIC, WebSocket). +/// 2 producers are spawned per protocol, all writing to the same partition. +pub async fn run(client_factories: &[&dyn ClientFactory]) { + assert!( + !client_factories.is_empty(), + "At least one client factory required" + ); + + let admin_client = create_client(client_factories[0]).await; + login_root(&admin_client).await; + + let total_producers = client_factories.len() * PRODUCERS_PER_PROTOCOL; + init_system(&admin_client, total_producers).await; + + let stop_flag = Arc::new(AtomicBool::new(false)); + let total_messages = Arc::new(AtomicU64::new(0)); + let mut join_set = JoinSet::new(); + + let mut global_producer_id = 0usize; + for factory in client_factories { + let protocol = factory.transport(); + for local_id in 0..PRODUCERS_PER_PROTOCOL { + let client = create_client(*factory).await; + login_root(&client).await; + + let stop = stop_flag.clone(); + let counter = total_messages.clone(); + let producer_name = format!("{:?}-{}", protocol, local_id); + let producer_id = global_producer_id; + + join_set.spawn(async move { + run_producer( + client, + producer_id, + &producer_name, + PARTITION_ID, + stop, + counter, + ) + .await; + }); + + global_producer_id += 1; + } + } + + tokio::time::sleep(Duration::from_secs(TEST_DURATION_SECS)).await; + stop_flag.store(true, Ordering::SeqCst); + + while let Some(result) = join_set.join_next().await { + if let Err(e) = result + && e.is_panic() + { + let panic_info = e.into_panic(); + let panic_msg = if let Some(s) = panic_info.downcast_ref::<&str>() { + s.to_string() + } else if let Some(s) = panic_info.downcast_ref::<String>() { + s.clone() + } else { + "Unknown panic".to_string() + }; + panic!( + "Producer task panicked (likely issue #2572 reproduced): {}", + panic_msg + ); + } + } + + let sent = total_messages.load(Ordering::SeqCst); + println!("Test completed successfully. 
Total messages sent: {}", sent); + + cleanup(&admin_client).await; +} + +async fn create_client(client_factory: &dyn ClientFactory) -> IggyClient { + let client = client_factory.create_client().await; + IggyClient::create(client, None, None) +} + +async fn init_system(client: &IggyClient, total_producers: usize) { + client.create_stream(STREAM_NAME).await.unwrap(); + + client + .create_topic( + &Identifier::named(STREAM_NAME).unwrap(), + TOPIC_NAME, + 1, + CompressionAlgorithm::None, + None, + IggyExpiry::NeverExpire, + MaxTopicSize::ServerDefault, + ) + .await + .unwrap(); + + println!( + "Created stream and topic with 1 partition, {} producers will contend for it", + total_producers + ); +} + +async fn run_producer( + client: IggyClient, + producer_id: usize, + producer_name: &str, + partition_id: u32, + stop: Arc<AtomicBool>, + counter: Arc<AtomicU64>, +) { + let mut batch_num = 0u64; + + while !stop.load(Ordering::SeqCst) { + let mut messages = Vec::with_capacity(MESSAGES_PER_BATCH); + + for i in 0..MESSAGES_PER_BATCH { + let payload = format!("p{}:b{}:m{}", producer_id, batch_num, i); + let message = IggyMessage::builder() + .payload(payload.into_bytes().into()) + .build() + .unwrap(); + messages.push(message); + } + + match client + .send_messages( + &Identifier::named(STREAM_NAME).unwrap(), + &Identifier::named(TOPIC_NAME).unwrap(), + &Partitioning::partition_id(partition_id), + &mut messages, + ) + .await + { + Ok(_) => { + counter.fetch_add(MESSAGES_PER_BATCH as u64, Ordering::SeqCst); + batch_num += 1; + } + Err(e) => { + eprintln!("Producer {} send error: {}", producer_name, e); + } + } + } + + println!( + "Producer {} (partition {}) stopped after {} batches", + producer_name, partition_id, batch_num + ); +} + +async fn cleanup(client: &IggyClient) { + client + .delete_stream(&Identifier::named(STREAM_NAME).unwrap()) + .await + .unwrap(); +} diff --git a/core/integration/tests/server/specific.rs b/core/integration/tests/server/specific.rs index 205d6b5df..923b47b35 100644 --- a/core/integration/tests/server/specific.rs +++ b/core/integration/tests/server/specific.rs @@ -17,13 +17,17 @@ */ use crate::server::scenarios::{ - delete_segments_scenario, message_size_scenario, tcp_tls_scenario, websocket_tls_scenario, + delete_segments_scenario, message_size_scenario, segment_rotation_race_scenario, + tcp_tls_scenario, websocket_tls_scenario, }; use iggy::prelude::*; use integration::{ + http_client::HttpClientFactory, + quic_client::QuicClientFactory, tcp_client::TcpClientFactory, test_server::{IpAddrKind, TestServer}, test_tls_utils::generate_test_certificates, + websocket_client::WebSocketClientFactory, }; use serial_test::parallel; use std::collections::HashMap; @@ -190,3 +194,82 @@ async fn message_size_scenario() { message_size_scenario::run(&client_factory).await; } + +/// Test to reproduce issue #2572: panic on segment rotation race condition. +/// +/// This test configures the server to trigger frequent segment rotations and runs +/// multiple concurrent producers across all protocols (TCP, HTTP, QUIC, WebSocket) +/// to maximize the chance of hitting the race condition between persist_messages_to_disk +/// and handle_full_segment. 
+/// +/// Server configuration: +/// - Very small segment size (512B) to trigger frequent rotations +/// - Short message_saver interval (1s) to add concurrent persist operations +/// - Small messages_required_to_save (32) to trigger more frequent saves +/// - cache_indexes = none to trigger clear_active_indexes path +/// +/// Test configuration: +/// - 8 producers total (2 per protocol: TCP, HTTP, QUIC, WebSocket) +/// - All producers write to the same partition for maximum lock contention +#[tokio::test] +#[parallel] +async fn segment_rotation_race_condition_issue_2572() { + let mut extra_envs = HashMap::new(); + + // Very small segment to trigger frequent rotations + extra_envs.insert("IGGY_SYSTEM_SEGMENT_SIZE".to_string(), "512B".to_string()); + + // Short message saver interval to add concurrent persist operations + extra_envs.insert("IGGY_MESSAGE_SAVER_INTERVAL".to_string(), "1s".to_string()); + + // Small threshold to trigger more frequent saves + extra_envs.insert( + "IGGY_SYSTEM_PARTITION_MESSAGES_REQUIRED_TO_SAVE".to_string(), + "32".to_string(), + ); + + // cache_indexes = none triggers clear_active_indexes in handle_full_segment + extra_envs.insert( + "IGGY_SYSTEM_SEGMENT_CACHE_INDEXES".to_string(), + "none".to_string(), + ); + + // Disable socket migration to keep all connections on same shard + extra_envs.insert("IGGY_TCP_SOCKET_MIGRATION".to_string(), "false".to_string()); + + // Enable TCP nodelay for faster message throughput + extra_envs.insert( + "IGGY_TCP_SOCKET_OVERRIDE_DEFAULTS".to_string(), + "true".to_string(), + ); + extra_envs.insert("IGGY_TCP_SOCKET_NODELAY".to_string(), "true".to_string()); + + let mut test_server = TestServer::new(Some(extra_envs), true, None, IpAddrKind::V4); + test_server.start(); + + let tcp_factory = TcpClientFactory { + server_addr: test_server.get_raw_tcp_addr().unwrap(), + ..Default::default() + }; + + let http_factory = HttpClientFactory { + server_addr: test_server.get_http_api_addr().unwrap(), + }; + + let quic_factory = QuicClientFactory { + server_addr: test_server.get_quic_udp_addr().unwrap(), + }; + + let websocket_factory = WebSocketClientFactory { + server_addr: test_server.get_websocket_addr().unwrap(), + }; + + let factories: Vec<&dyn integration::test_server::ClientFactory> = vec![ + &tcp_factory, + &http_factory, + &quic_factory, + &websocket_factory, + ]; + + segment_rotation_race_scenario::run(&factories).await; +} diff --git a/core/server/src/binary/handlers/messages/send_messages_handler.rs b/core/server/src/binary/handlers/messages/send_messages_handler.rs index 2e2748776..dcaa6925f 100644 --- a/core/server/src/binary/handlers/messages/send_messages_handler.rs +++ b/core/server/src/binary/handlers/messages/send_messages_handler.rs @@ -187,7 +187,8 @@ impl ServerCommandHandler for SendMessages { .send_request_to_shard_or_recoil(Some(&namespace), socket_transfer_msg) .await { - error!("tranfer socket to another shard failed, drop connection. {e:?}"); + // TODO: should we crash? + error!("transfer socket to another shard failed, drop connection. 
{e:?}"); return Ok(HandlerResult::Finished); } diff --git a/core/server/src/slab/streams.rs b/core/server/src/slab/streams.rs index 37e4192c4..a850240e1 100644 --- a/core/server/src/slab/streams.rs +++ b/core/server/src/slab/streams.rs @@ -216,23 +216,64 @@ impl MainOps for Streams { let topic_id = ns.topic_id(); let partition_id = ns.partition_id(); - let current_offset = self.with_partition_by_id( - stream_id, - topic_id, - partition_id, - streaming_partitions::helpers::calculate_current_offset(), - ); + // Acquire the lock on the current active segment's writer. + // We must verify the segment hasn't rotated between reading state and acquiring the lock. + // The writer must be stored outside the lock acquisition to keep it alive. + let mut messages_writer; + let mut current_offset; + let mut current_position; + let mut segment_start_offset; + let mut message_deduplicator; + + let _write_guard = loop { + // Get messages_writer and all state atomically + ( + messages_writer, + current_offset, + current_position, + segment_start_offset, + message_deduplicator, + ) = self.with_partition_by_id( + stream_id, + topic_id, + partition_id, + |(root, _, deduplicator, offset, _, _, log)| { + let current_offset = if !root.should_increment_offset() { + 0 + } else { + offset.load(std::sync::atomic::Ordering::Relaxed) + 1 + }; + let segment = log.active_segment(); + let writer = log + .active_storage() + .messages_writer + .clone() + .expect("Messages writer must exist for active segment"); + ( + writer, + current_offset, + segment.current_position, + segment.start_offset, + deduplicator.clone(), + ) + }, + ); - let current_position = - self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| { - log.active_segment().current_position - }); - let (segment_start_offset, message_deduplicator) = self.with_partition_by_id( - stream_id, - topic_id, - partition_id, - streaming_partitions::helpers::get_segment_start_offset_and_deduplicator(), - ); + // Acquire the lock + let write_guard = messages_writer.lock.lock().await; + + // Verify the segment hasn't changed while we were waiting + let current_segment_start = + self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| { + log.active_segment().start_offset + }); + + if current_segment_start == segment_start_offset { + // Segment is still the same, we can proceed + break write_guard; + } + // Segment rotated, release lock (guard dropped) and retry with fresh state + }; input .prepare_for_persistence( @@ -250,6 +291,16 @@ impl MainOps for Streams { streaming_partitions::helpers::append_to_journal(current_offset, input), )?; + let is_full = self.with_partition_by_id( + stream_id, + topic_id, + partition_id, + streaming_partitions::helpers::is_segment_full(), + ); + + // Release write lock before persistence I/O (persist_messages acquires it again) + drop(_write_guard); + let unsaved_messages_count_exceeded = journal_messages_count >= config.partition.messages_required_to_save; let unsaved_messages_size_exceeded = journal_size @@ -258,13 +309,6 @@ impl MainOps for Streams { .size_of_messages_required_to_save .as_bytes_u64() as u32; - let is_full = self.with_partition_by_id( - stream_id, - topic_id, - partition_id, - streaming_partitions::helpers::is_segment_full(), - ); - // Try committing the journal if is_full || unsaved_messages_count_exceeded || unsaved_messages_size_exceeded { let reason = self.with_partition_by_id( @@ -1187,23 +1231,118 @@ impl Streams { let numeric_topic_id = self.with_topic_by_id(stream_id, topic_id, 
topics::helpers::get_topic_id()); - if config.segment.cache_indexes == CacheIndexesConfig::OpenSegment - || config.segment.cache_indexes == CacheIndexesConfig::None - { - self.with_partition_by_id_mut(stream_id, topic_id, partition_id, |(.., log)| { - log.clear_active_indexes(); + let clear_indexes = config.segment.cache_indexes == CacheIndexesConfig::OpenSegment + || config.segment.cache_indexes == CacheIndexesConfig::None; + + // First, check if segment is already sealed and get info needed for new segment creation + let segment_info = + self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| { + // If segment is already sealed, another task is handling the closure + if log.active_segment().sealed { + return None; + } + let segment = log.active_segment(); + Some(( + segment.end_offset, + segment.start_offset, + segment.size, + log.active_storage().messages_writer.clone(), + )) }); - } - self.with_partition_by_id_mut(stream_id, topic_id, partition_id, |(.., log)| { - log.active_segment_mut().sealed = true; - }); - let (log_writer, index_writer) = + // If None (sealed), another task is handling closure + let Some((end_offset, start_offset, size, writer_for_lock)) = segment_info else { + return Ok(()); + }; + + // If writer is None, segment was already shutdown by another task + let Some(writer_for_lock) = writer_for_lock else { + return Ok(()); + }; + + // CRITICAL: Create the new segment storage FIRST, before any modifications. + // This ensures there's always a valid active segment with writers available, + // preventing race conditions where commit_journal finds None writers. + let messages_size = 0; + let indexes_size = 0; + let new_segment = Segment::new( + end_offset + 1, + config.segment.size, + config.segment.message_expiry, + ); + + let new_storage = create_segment_storage( + config, + numeric_stream_id, + numeric_topic_id, + partition_id, + messages_size, + indexes_size, + end_offset + 1, + ) + .await?; + + // Now acquire the write lock to ensure all pending writes complete + let _write_guard = writer_for_lock.lock.lock().await; + + // Atomically: seal old segment, shutdown storage, add new segment + // This ensures the new segment is available immediately when the old one is shutdown. + let writers = self.with_partition_by_id_mut(stream_id, topic_id, partition_id, |(.., log)| { + // Double-check sealed status (another task might have completed while we waited) + if log.active_segment().sealed { + return None; + } + + // Verify we're still operating on the same segment we read from. + // If another rotation happened, the active segment changed and our + // new_segment would have the wrong start_offset. + if log.active_segment().start_offset != start_offset { + return None; + } + + // Verify the segment is actually full. The is_full check in append_messages + // happens before releasing the lock, but by the time we get here, another + // task might have rotated the segment. We must re-verify. 
+ if !log.active_segment().is_full() { + return None; + } + + // Clear indexes if configured + if clear_indexes { + log.clear_active_indexes(); + } + + // Seal the old segment + log.active_segment_mut().sealed = true; + + // Extract writers from old segment let (msg, index) = log.active_storage_mut().shutdown(); - (msg.unwrap(), index.unwrap()) + + // Add the new segment - this makes it the new "active" segment immediately + log.add_persisted_segment(new_segment, new_storage); + + Some((msg, index)) }); + // Drop the write guard before spawning fsync tasks + drop(_write_guard); + + // If None, another task already handled segment closure + let Some((Some(log_writer), Some(index_writer))) = writers else { + return Ok(()); + }; + + tracing::info!( + "Closed segment for stream: {}, topic: {} with start offset: {}, end offset: {}, size: {} for partition with ID: {}.", + stream_id, + topic_id, + start_offset, + end_offset, + size, + partition_id + ); + registry .oneshot("fsync:segment-close-log") .critical(true) @@ -1236,47 +1375,6 @@ impl Streams { }) .spawn(); - let (start_offset, size, end_offset) = - self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| { - ( - log.active_segment().start_offset, - log.active_segment().size, - log.active_segment().end_offset, - ) - }); - - tracing::info!( - "Closed segment for stream: {}, topic: {} with start offset: {}, end offset: {}, size: {} for partition with ID: {}.", - stream_id, - topic_id, - start_offset, - end_offset, - size, - partition_id - ); - - let messages_size = 0; - let indexes_size = 0; - let segment = Segment::new( - end_offset + 1, - config.segment.size, - config.segment.message_expiry, - ); - - let storage = create_segment_storage( - config, - numeric_stream_id, - numeric_topic_id, - partition_id, - messages_size, - indexes_size, - end_offset + 1, - ) - .await?; - self.with_partition_by_id_mut(stream_id, topic_id, partition_id, |(.., log)| { - log.add_persisted_segment(segment, storage); - }); - Ok(()) } @@ -1295,7 +1393,9 @@ impl Streams { return Ok(0); } - let batches = self.with_partition_by_id_mut( + // commit_journal now returns CommittedBatch which includes writers captured at commit time. + // This prevents race conditions where segment rotation could invalidate writers. 
+ let committed = self.with_partition_by_id_mut( stream_id, topic_id, partition_id, @@ -1311,7 +1411,7 @@ impl Streams { ); let batch_count = self - .persist_messages_to_disk(stream_id, topic_id, partition_id, batches, config) + .persist_messages_to_disk(stream_id, topic_id, partition_id, committed, config) .await?; Ok(batch_count) @@ -1322,9 +1422,17 @@ impl Streams { stream_id: &Identifier, topic_id: &Identifier, partition_id: usize, - mut batches: IggyMessagesBatchSet, + committed: streaming_partitions::helpers::CommittedBatch, config: &SystemConfig, ) -> Result<u32, IggyError> { + let streaming_partitions::helpers::CommittedBatch { + mut batches, + segment_idx, + messages_writer, + index_writer, + unsaved_indexes, + } = committed; + let batch_count = batches.count(); let batch_size = batches.size(); @@ -1347,21 +1455,8 @@ impl Streams { log.set_in_flight(frozen.clone()); }); - let (messages_writer, index_writer) = - self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| { - ( - log.active_storage() - .messages_writer - .as_ref() - .expect("Messages writer not initialized") - .clone(), - log.active_storage() - .index_writer - .as_ref() - .expect("Index writer not initialized") - .clone(), - ) - }); + // Writers were captured at commit time, so they're guaranteed to be valid + // even if segment rotation happened after the commit. let guard = messages_writer.lock.lock().await; let saved = messages_writer @@ -1375,20 +1470,18 @@ impl Streams { ) })?; - // Extract unsaved indexes before async operation - let unsaved_indexes_slice = - self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| { - log.active_indexes().unwrap().unsaved_slice() - }); - - let indexes_len = unsaved_indexes_slice.len(); - index_writer - .as_ref() - .save_indexes(unsaved_indexes_slice) - .await - .error(|e: &IggyError| { - format!("Failed to save index of {indexes_len} indexes to stream ID: {stream_id}, topic ID: {topic_id} {partition_id}. {e}",) - })?; + // Indexes were captured at commit time, so they're guaranteed to be valid + // even if segment rotation cleared the index buffer after the commit. + if !unsaved_indexes.is_empty() { + let indexes_len = unsaved_indexes.len(); + index_writer + .as_ref() + .save_indexes(unsaved_indexes) + .await + .error(|e: &IggyError| { + format!("Failed to save index of {indexes_len} indexes to stream ID: {stream_id}, topic ID: {topic_id} {partition_id}. 
{e}",) + })?; + } tracing::trace!( "Persisted {} messages on disk for stream ID: {}, topic ID: {}, for partition with ID: {}, total bytes written: {}.", @@ -1403,7 +1496,11 @@ impl Streams { stream_id, topic_id, partition_id, - streaming_partitions::helpers::update_index_and_increment_stats(saved, config), + streaming_partitions::helpers::update_index_and_increment_stats( + segment_idx, + saved, + config, + ), ); self.with_partition_by_id_mut(stream_id, topic_id, partition_id, |(.., log)| { @@ -1435,9 +1532,9 @@ impl Streams { return Ok(0); } - let has_segments = + let (has_segments, segment_idx) = self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| { - log.has_segments() + (log.has_segments(), log.segments().len().saturating_sub(1)) }); if !has_segments { @@ -1499,7 +1596,11 @@ impl Streams { stream_id, topic_id, partition_id, - streaming_partitions::helpers::update_index_and_increment_stats(saved, config), + streaming_partitions::helpers::update_index_and_increment_stats( + segment_idx, + saved, + config, + ), ); self.with_partition_by_id_mut(stream_id, topic_id, partition_id, |(.., log)| { diff --git a/core/server/src/streaming/partitions/helpers.rs b/core/server/src/streaming/partitions/helpers.rs index c9c8e029c..5f64ff07f 100644 --- a/core/server/src/streaming/partitions/helpers.rs +++ b/core/server/src/streaming/partitions/helpers.rs @@ -32,11 +32,15 @@ use crate::{ storage, }, polling_consumer::ConsumerGroupId, - segments::{IggyIndexesMut, IggyMessagesBatchMut, IggyMessagesBatchSet, storage::Storage}, + segments::{ + IggyIndexesMut, IggyMessagesBatchMut, IggyMessagesBatchSet, IndexWriter, + MessagesWriter, storage::Storage, + }, }, }; use err_trail::ErrContext; -use iggy_common::{ConsumerOffsetInfo, Identifier, IggyByteSize, IggyError}; +use iggy_common::{ConsumerOffsetInfo, Identifier, IggyByteSize, IggyError, PooledBuffer}; +use std::rc::Rc; use std::{ ops::AsyncFnOnce, sync::{Arc, atomic::Ordering}, @@ -453,12 +457,51 @@ pub fn clear_in_flight() -> impl FnOnce(ComponentsById<PartitionRefMut>) { } } -pub fn commit_journal() -> impl FnOnce(ComponentsById<PartitionRefMut>) -> IggyMessagesBatchSet { +/// Result of committing journal batches for a specific segment. +/// Captures writers and indexes at commit time to prevent race conditions with segment rotation. +pub struct CommittedBatch { + pub batches: IggyMessagesBatchSet, + pub segment_idx: usize, + pub messages_writer: Rc<MessagesWriter>, + pub index_writer: Rc<IndexWriter>, + /// Indexes captured at commit time, before any rotation can clear them. + pub unsaved_indexes: PooledBuffer, +} + +/// Commits the journal and returns the batches along with the segment's writers and indexes. +/// By capturing writers and indexes at commit time (within the same lock acquisition), we ensure +/// that even if segment rotation happens after this call, we still have valid data +/// to complete the persistence operation. 
+pub fn commit_journal() -> impl FnOnce(ComponentsById<PartitionRefMut>) -> CommittedBatch { |(.., log)| { + let segment_idx = log.segments().len().saturating_sub(1); let batches = log.journal_mut().commit(); log.ensure_indexes(); batches.append_indexes_to(log.active_indexes_mut().unwrap()); - batches + + // Capture writers NOW before any rotation can happen + let storage = log.active_storage(); + let messages_writer = storage + .messages_writer + .as_ref() + .expect("Messages writer must exist at commit time") + .clone(); + let index_writer = storage + .index_writer + .as_ref() + .expect("Index writer must exist at commit time") + .clone(); + + // Capture indexes NOW before any rotation can clear them + let unsaved_indexes = log.active_indexes().unwrap().unsaved_slice(); + + CommittedBatch { + batches, + segment_idx, + messages_writer, + index_writer, + unsaved_indexes, + } } } @@ -554,16 +597,26 @@ pub fn persist_batch( } } +/// Updates segment size and marks indexes as saved for a specific segment. +/// Uses segment_idx to ensure we update the correct segment even after rotation. +/// Gracefully handles the case where indexes were cleared during segment rotation. pub fn update_index_and_increment_stats( + segment_idx: usize, saved: IggyByteSize, config: &SystemConfig, ) -> impl FnOnce(ComponentsById<PartitionRefMut>) { move |(.., log)| { - let segment = log.active_segment_mut(); - segment.size = IggyByteSize::from(segment.size.as_bytes_u64() + saved.as_bytes_u64()); - log.active_indexes_mut().unwrap().mark_saved(); - if config.segment.cache_indexes == CacheIndexesConfig::None { - log.active_indexes_mut().unwrap().clear(); + // Update the specific segment we wrote to, not "active" which may have changed + if let Some(segment) = log.segments_mut().get_mut(segment_idx) { + segment.size = IggyByteSize::from(segment.size.as_bytes_u64() + saved.as_bytes_u64()); + } + + // Handle indexes - may be None if segment was rotated and indexes cleared + if let Some(Some(indexes)) = log.indexes_mut().get_mut(segment_idx) { + indexes.mark_saved(); + if config.segment.cache_indexes == CacheIndexesConfig::None { + indexes.clear(); + } } } } diff --git a/core/server/src/streaming/segments/mod.rs b/core/server/src/streaming/segments/mod.rs index 7ed09c2d4..878af0eb1 100644 --- a/core/server/src/streaming/segments/mod.rs +++ b/core/server/src/streaming/segments/mod.rs @@ -24,6 +24,8 @@ mod types; pub mod storage; pub use indexes::IggyIndexesMut; +pub use indexes::IndexWriter; +pub use messages::MessagesWriter; pub use segment::Segment; pub use types::IggyMessageHeaderViewMut; pub use types::IggyMessageViewMut;
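For reference, the core idea behind the commit_journal change in partitions/helpers.rs is that everything the later persist step needs (the writer handles, the owning segment's index, and the unsaved index entries) is captured while the segment is still active, so a rotation racing in afterwards can no longer leave persist_messages_to_disk holding None. Below is a minimal standalone sketch of that pattern; Writer, Segment, Log, and CommittedBatch here are simplified stand-ins, not the server's real types.

```rust
use std::rc::Rc;

struct Writer {
    segment_start: u64, // start offset of the segment file this writer appends to
}

struct Segment {
    start_offset: u64,
    sealed: bool,
    writer: Option<Rc<Writer>>, // rotation takes this away when it shuts the segment down
    unsaved_indexes: Vec<u32>,  // index entries not yet flushed; rotation may clear them
}

struct CommittedBatch {
    segment_idx: usize,
    writer: Rc<Writer>,
    unsaved_indexes: Vec<u32>,
}

struct Log {
    segments: Vec<Segment>,
}

impl Log {
    fn active(&mut self) -> &mut Segment {
        self.segments.last_mut().expect("at least one segment")
    }

    // Capture the writer handle and the unsaved index entries while the segment
    // is still active, so a rotation that runs before persistence cannot leave
    // the persist step with a None writer or a cleared index buffer.
    fn commit_journal(&mut self) -> CommittedBatch {
        let segment_idx = self.segments.len() - 1;
        let segment = self.active();
        debug_assert!(!segment.sealed, "active segment must not be sealed");
        CommittedBatch {
            segment_idx,
            writer: segment
                .writer
                .clone()
                .expect("writer must exist at commit time"),
            unsaved_indexes: segment.unsaved_indexes.clone(),
        }
    }

    // Rotation: seal the old segment, shut down its writer, open a new one.
    fn rotate(&mut self) {
        let next_start = self.active().start_offset + 100;
        let old = self.active();
        old.sealed = true;
        old.writer = None;
        old.unsaved_indexes.clear();
        self.segments.push(Segment {
            start_offset: next_start,
            sealed: false,
            writer: Some(Rc::new(Writer { segment_start: next_start })),
            unsaved_indexes: Vec::new(),
        });
    }
}

fn main() {
    let mut log = Log {
        segments: vec![Segment {
            start_offset: 0,
            sealed: false,
            writer: Some(Rc::new(Writer { segment_start: 0 })),
            unsaved_indexes: vec![1, 2, 3],
        }],
    };

    let committed = log.commit_journal();
    log.rotate(); // a racing rotation lands between commit and persist

    // Persistence still holds a live writer and the captured index entries, and
    // it targets segment `segment_idx` rather than whatever is active now.
    println!(
        "persist {} index entries via writer for segment {} (slot {})",
        committed.unsaved_indexes.len(),
        committed.writer.segment_start,
        committed.segment_idx,
    );
}
```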

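The append path in streams.rs pairs that captured state with a second safeguard: it clones the active segment's writer, waits on that writer's lock, and then re-checks that the active segment's start_offset still matches the snapshot, retrying when a rotation slipped in while it waited. Below is a rough single-threaded sketch of that loop, with std::sync::Mutex standing in for the server's per-shard async locks and PartitionState/SegmentWriter as invented stand-in types.

```rust
use std::sync::{Arc, Mutex};

struct SegmentWriter {
    start_offset: u64,    // start offset of the segment this writer appends to
    file_lock: Mutex<()>, // serializes appends to this segment's file
}

struct PartitionState {
    active_writer: Arc<SegmentWriter>, // writer of the currently active segment
}

/// Runs `write` while holding the active segment's file lock, retrying if the
/// segment rotated between snapshotting the writer and acquiring its lock.
fn append_with_active_writer<F>(partition: &Mutex<PartitionState>, write: F)
where
    F: Fn(&SegmentWriter),
{
    loop {
        // 1. Snapshot the writer and remember which segment it belongs to.
        let (writer, expected_start) = {
            let state = partition.lock().unwrap();
            (
                state.active_writer.clone(),
                state.active_writer.start_offset,
            )
        };

        // 2. Wait for exclusive access to that segment's file.
        let _file_guard = writer.file_lock.lock().unwrap();

        // 3. Re-verify: if a rotation happened while we were waiting, this
        //    writer now points at a sealed segment, so retry with fresh state
        //    instead of writing into it.
        let still_active =
            partition.lock().unwrap().active_writer.start_offset == expected_start;

        if still_active {
            write(&writer);
            return; // _file_guard is released here
        }
        // Rotation won the race: the guard drops and the loop retries.
    }
}

fn main() {
    let partition = Mutex::new(PartitionState {
        active_writer: Arc::new(SegmentWriter {
            start_offset: 0,
            file_lock: Mutex::new(()),
        }),
    });

    append_with_active_writer(&partition, |writer| {
        println!("appending to segment starting at {}", writer.start_offset);
    });
}
```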