This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch fix-segment-close-crash
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit f1af57b2f28f401e5adf473acdebe1aa6bac409e
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Fri Jan 16 12:39:35 2026 +0100

    fix segment rotation crash
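
    The crash (issue #2572) was a race between persist_messages_to_disk and
    handle_full_segment: a task that had committed the journal could reach for
    the "active" segment's indexes and writers after another task had rotated
    the segment, and panic on an unwrap() of None.

    The fix captures everything the persist step needs at commit time, while
    the partition is still held, so a later rotation cannot invalidate it.
    Condensed from core/server/src/streaming/partitions/helpers.rs (field
    comments added here for illustration only):

        pub struct CommittedBatch {
            pub batches: IggyMessagesBatchSet,       // batches taken from the journal
            pub segment_idx: usize,                  // segment they belong to, by index
            pub messages_writer: Rc<MessagesWriter>, // writer captured at commit time
            pub index_writer: Rc<IndexWriter>,       // index writer captured at commit time
        }

    The persist path then writes through the captured writers and addresses
    indexes by segment_idx, tolerating the case where rotation has already
    cleared them. handle_full_segment now creates the new segment's storage
    before sealing the old one and re-checks the sealed flag under the writer
    lock, so only one task performs the closure.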
---
 core/integration/src/test_server.rs                | 109 ++++++++++-
 core/integration/tests/server/mod.rs               |  85 +++------
 core/integration/tests/server/scenarios/mod.rs     |   1 +
 .../scenarios/segment_rotation_race_scenario.rs    | 178 ++++++++++++++++++
 core/integration/tests/server/specific.rs          |  59 +++++-
 .../handlers/messages/send_messages_handler.rs     |   3 +-
 core/server/src/shard/system/messages.rs           |   4 +-
 core/server/src/slab/streams.rs                    | 208 +++++++++++++--------
 core/server/src/streaming/partitions/helpers.rs    |  63 ++++++-
 core/server/src/streaming/segments/mod.rs          |   2 +
 10 files changed, 555 insertions(+), 157 deletions(-)

diff --git a/core/integration/src/test_server.rs b/core/integration/src/test_server.rs
index fd718178e..ac406452d 100644
--- a/core/integration/src/test_server.rs
+++ b/core/integration/src/test_server.rs
@@ -33,7 +33,9 @@ use std::io::Write;
 use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr};
 use std::path::{Path, PathBuf};
 use std::process::{Child, Command, Stdio};
-use std::thread::{available_parallelism, panicking, sleep};
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::thread::{self, JoinHandle, available_parallelism, panicking, sleep};
 use std::time::Duration;
 use uuid::Uuid;
 
@@ -75,7 +77,6 @@ enum ServerProtocolAddr {
     WebSocket(SocketAddr),
 }
 
-#[derive(Debug)]
 pub struct TestServer {
     local_data_path: String,
     envs: HashMap<String, String>,
@@ -85,6 +86,18 @@ pub struct TestServer {
     stderr_file_path: Option<PathBuf>,
     cleanup: bool,
     server_executable_path: Option<String>,
+    watchdog_handle: Option<JoinHandle<()>>,
+    watchdog_stop: Arc<AtomicBool>,
+}
+
+impl std::fmt::Debug for TestServer {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("TestServer")
+            .field("local_data_path", &self.local_data_path)
+            .field("server_addrs", &self.server_addrs)
+            .field("cleanup", &self.cleanup)
+            .finish_non_exhaustive()
+    }
 }
 
 impl TestServer {
@@ -194,6 +207,8 @@ impl TestServer {
             stderr_file_path: None,
             cleanup,
             server_executable_path,
+            watchdog_handle: None,
+            watchdog_stop: Arc::new(AtomicBool::new(false)),
         }
     }
 
@@ -234,11 +249,101 @@ impl TestServer {
         }
 
         let child = command.spawn().unwrap();
+        let pid = child.id();
         self.child_handle = Some(child);
         self.wait_until_server_has_bound();
+
+        let watchdog_stop = self.watchdog_stop.clone();
+        let stdout_path = self.stdout_file_path.clone();
+        let stderr_path = self.stderr_file_path.clone();
+        let watchdog_handle = thread::Builder::new()
+            .name("test-server-watchdog".to_string())
+            .spawn(move || {
+                Self::watchdog_loop(pid, watchdog_stop, stdout_path, stderr_path);
+            })
+            .expect("Failed to spawn watchdog thread");
+        self.watchdog_handle = Some(watchdog_handle);
+    }
+
+    /// Watchdog loop that monitors the server process.
+    /// Panics if the server exits while the watchdog is still running (i.e., not gracefully stopped).
+    fn watchdog_loop(
+        pid: u32,
+        stop_signal: Arc<AtomicBool>,
+        stdout_path: Option<PathBuf>,
+        stderr_path: Option<PathBuf>,
+    ) {
+        const CHECK_INTERVAL: Duration = Duration::from_millis(100);
+
+        loop {
+            if stop_signal.load(Ordering::SeqCst) {
+                return;
+            }
+
+            // Check if process is alive AND not a zombie via /proc/{pid}/stat
+            // A zombie process still exists in the process table (kill returns 0)
+            // but has state 'Z' in /proc/pid/stat
+            let process_alive = Self::is_process_alive(pid);
+
+            if !process_alive {
+                // Server process has died unexpectedly!
+                // We print to stderr and abort because panic in a spawned thread
+                // doesn't propagate to the test thread.
+                let stdout_content = stdout_path
+                    .as_ref()
+                    .and_then(|p| fs::read_to_string(p).ok())
+                    .unwrap_or_else(|| "[No stdout log]".to_string());
+
+                let stderr_content = stderr_path
+                    .as_ref()
+                    .and_then(|p| fs::read_to_string(p).ok())
+                    .unwrap_or_else(|| "[No stderr log]".to_string());
+
+                eprintln!(
+                    "\n\n=== SERVER CRASHED ===\n\
+                     The iggy-server process (PID {}) has died unexpectedly!\n\
+                     This usually indicates a bug in the server.\n\n\
+                     === STDOUT ===\n{}\n\n\
+                     === STDERR ===\n{}\n",
+                    pid, stdout_content, stderr_content
+                );
+                std::process::abort();
+            }
+
+            thread::sleep(CHECK_INTERVAL);
+        }
+    }
+
+    /// Check if a process is alive (exists and is not a zombie).
+    fn is_process_alive(pid: u32) -> bool {
+        // Read /proc/{pid}/stat to get process state
+        let stat_path = format!("/proc/{}/stat", pid);
+        match fs::read_to_string(&stat_path) {
+            Ok(content) => {
+                // Format: pid (comm) state ...
+                // State is the third field after the command name in parentheses
+                // States: R=running, S=sleeping, D=disk sleep, Z=zombie, T=stopped, etc.
+                if let Some(state_start) = content.rfind(')') {
+                    let after_comm = &content[state_start + 1..];
+                    let state = after_comm.trim().chars().next();
+                    // Process is dead if state is 'Z' (zombie) or 'X' (dead)
+                    !matches!(state, Some('Z') | Some('X'))
+                } else {
+                    false // Can't parse, assume dead
+                }
+            }
+            Err(_) => false, // Can't read /proc/{pid}/stat, process doesn't exist
+        }
     }
 
     pub fn stop(&mut self) {
+        // Signal the watchdog to stop FIRST, before we terminate the server.
+        // This prevents the watchdog from panicking when we gracefully stop.
+        self.watchdog_stop.store(true, Ordering::SeqCst);
+        if let Some(watchdog) = self.watchdog_handle.take() {
+            let _ = watchdog.join();
+        }
+
         #[allow(unused_mut)]
         if let Some(mut child_handle) = self.child_handle.take() {
             #[cfg(unix)]
diff --git a/core/integration/tests/server/mod.rs b/core/integration/tests/server/mod.rs
index 0b8fd502b..60a7031d7 100644
--- a/core/integration/tests/server/mod.rs
+++ b/core/integration/tests/server/mod.rs
@@ -22,7 +22,6 @@ mod general;
 mod scenarios;
 mod specific;
 
-use futures::FutureExt;
 use iggy_common::TransportProtocol;
 use integration::{
     http_client::HttpClientFactory,
@@ -40,8 +39,6 @@ use scenarios::{
     stream_size_validation_scenario, system_scenario, user_scenario,
 };
 use std::pin::Pin;
-use std::sync::{Arc, Mutex};
-use std::time::Duration;
 use std::{collections::HashMap, future::Future};
 
 type ScenarioFn = fn(&dyn ClientFactory) -> Pin<Box<dyn Future<Output = ()> + '_>>;
@@ -116,67 +113,31 @@ async fn run_scenario(transport: TransportProtocol, scenario: ScenarioFn) {
         "IGGY_QUIC_KEEP_ALIVE_INTERVAL".to_string(),
         "15s".to_string(),
     );
-    let test_server = TestServer::new(Some(extra_envs), true, None, IpAddrKind::V4);
-    let test_server = Arc::new(Mutex::new(test_server));
-
-    test_server.lock().unwrap().start();
-
-    let client_factory: Box<dyn ClientFactory> = {
-        let server = test_server.lock().unwrap();
-        match transport {
-            TransportProtocol::Tcp => {
-                let server_addr = server.get_raw_tcp_addr().unwrap();
-                Box::new(TcpClientFactory {
-                    server_addr,
-                    ..Default::default()
-                })
-            }
-            TransportProtocol::Quic => {
-                let server_addr = server.get_quic_udp_addr().unwrap();
-                Box::new(QuicClientFactory { server_addr })
-            }
-            TransportProtocol::Http => {
-                let server_addr = server.get_http_api_addr().unwrap();
-                Box::new(HttpClientFactory { server_addr })
-            }
-            TransportProtocol::WebSocket => {
-                let server_addr = server.get_websocket_addr().unwrap();
-                Box::new(WebSocketClientFactory { server_addr })
-            }
+    let mut test_server = TestServer::new(Some(extra_envs), true, None, IpAddrKind::V4);
+    test_server.start();
+
+    // TestServer's watchdog thread will panic the test if the server crashes unexpectedly
+    let client_factory: Box<dyn ClientFactory> = match transport {
+        TransportProtocol::Tcp => {
+            let server_addr = test_server.get_raw_tcp_addr().unwrap();
+            Box::new(TcpClientFactory {
+                server_addr,
+                ..Default::default()
+            })
         }
-    };
-
-    let monitor_server = test_server.clone();
-    let crash_monitor = async move {
-        loop {
-            tokio::time::sleep(Duration::from_millis(100)).await;
-            let mut server = monitor_server.lock().unwrap();
-            if !server.is_running() {
-                let (stdout, stderr) = server.collect_logs();
-                return (stdout, stderr);
-            }
+        TransportProtocol::Quic => {
+            let server_addr = test_server.get_quic_udp_addr().unwrap();
+            Box::new(QuicClientFactory { server_addr })
         }
-    };
-
-    tokio::select! {
-        biased;
-
-        (stdout, stderr) = crash_monitor => {
-            panic!(
-                "Server crashed during test!\n\n\
-                 === STDOUT ===\n{}\n\n\
-                 === STDERR ===\n{}",
-                stdout, stderr
-            );
+        TransportProtocol::Http => {
+            let server_addr = test_server.get_http_api_addr().unwrap();
+            Box::new(HttpClientFactory { server_addr })
         }
-
-        result = std::panic::AssertUnwindSafe(scenario(&*client_factory)).catch_unwind() => {
-            test_server.lock().unwrap().assert_running();
-
-            // Re-raise any panic from the scenario
-            if let Err(panic_payload) = result {
-                std::panic::resume_unwind(panic_payload);
-            }
+        TransportProtocol::WebSocket => {
+            let server_addr = test_server.get_websocket_addr().unwrap();
+            Box::new(WebSocketClientFactory { server_addr })
         }
-    }
+    };
+
+    scenario(&*client_factory).await;
 }
diff --git a/core/integration/tests/server/scenarios/mod.rs b/core/integration/tests/server/scenarios/mod.rs
index 2c2fbab3a..e1a9cb398 100644
--- a/core/integration/tests/server/scenarios/mod.rs
+++ b/core/integration/tests/server/scenarios/mod.rs
@@ -31,6 +31,7 @@ pub mod encryption_scenario;
 pub mod message_headers_scenario;
 pub mod message_size_scenario;
 pub mod read_during_persistence_scenario;
+pub mod segment_rotation_race_scenario;
 pub mod stale_client_consumer_group_scenario;
 pub mod stream_size_validation_scenario;
 pub mod system_scenario;
diff --git a/core/integration/tests/server/scenarios/segment_rotation_race_scenario.rs b/core/integration/tests/server/scenarios/segment_rotation_race_scenario.rs
new file mode 100644
index 000000000..a321942d2
--- /dev/null
+++ b/core/integration/tests/server/scenarios/segment_rotation_race_scenario.rs
@@ -0,0 +1,178 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//! Test scenario to reproduce issue #2572: panic on segment rotation race condition.
+//!
+//! The bug occurs when concurrent message sends race with segment rotation:
+//! 1. Task A commits journal, ensures indexes for segment N, starts async save
+//! 2. Task B's send triggers segment rotation (handle_full_segment)
+//! 3. Task B clears segment N's indexes or creates segment N+1 with None indexes
+//! 4. Task A calls active_indexes().unwrap() - panics because indexes are None
+//!
+//! This test uses:
+//! - Very small segment size (512B) to trigger frequent rotations
+//! - 4 concurrent producers sending small messages to same partition
+//! - Short message_saver interval to add more concurrent persist operations
+
+use iggy::prelude::*;
+use integration::test_server::{ClientFactory, login_root};
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
+use std::time::Duration;
+use tokio::task::JoinSet;
+
+const STREAM_NAME: &str = "race-test-stream";
+const TOPIC_NAME: &str = "race-test-topic";
+const PRODUCER_COUNT: usize = 4;
+// All producers write to the SAME partition to create lock contention
+const PARTITION_ID: u32 = 0;
+const TEST_DURATION_SECS: u64 = 20;
+// Small batch size to increase frequency of send operations
+const MESSAGES_PER_BATCH: usize = 5;
+
+pub async fn run(client_factory: &dyn ClientFactory) {
+    let admin_client = create_client(client_factory).await;
+    login_root(&admin_client).await;
+
+    init_system(&admin_client).await;
+
+    let stop_flag = Arc::new(AtomicBool::new(false));
+    let total_messages = Arc::new(AtomicU64::new(0));
+    let mut join_set = JoinSet::new();
+
+    for producer_id in 0..PRODUCER_COUNT {
+        let client = create_client(client_factory).await;
+        login_root(&client).await;
+
+        let stop = stop_flag.clone();
+        let counter = total_messages.clone();
+
+        // All producers write to PARTITION_ID (same partition) to create lock contention
+        join_set.spawn(async move {
+            run_producer(client, producer_id, PARTITION_ID, stop, counter).await;
+        });
+    }
+
+    tokio::time::sleep(Duration::from_secs(TEST_DURATION_SECS)).await;
+    stop_flag.store(true, Ordering::SeqCst);
+
+    while let Some(result) = join_set.join_next().await {
+        if let Err(e) = result
+            && e.is_panic()
+        {
+            let panic_info = e.into_panic();
+            let panic_msg = if let Some(s) = panic_info.downcast_ref::<&str>() {
+                s.to_string()
+            } else if let Some(s) = panic_info.downcast_ref::<String>() {
+                s.clone()
+            } else {
+                "Unknown panic".to_string()
+            };
+            panic!(
+                "Producer task panicked (likely issue #2572 reproduced): {}",
+                panic_msg
+            );
+        }
+    }
+
+    let sent = total_messages.load(Ordering::SeqCst);
+    println!("Test completed successfully. Total messages sent: {}", sent);
+
+    cleanup(&admin_client).await;
+}
+
+async fn create_client(client_factory: &dyn ClientFactory) -> IggyClient {
+    let client = client_factory.create_client().await;
+    IggyClient::create(client, None, None)
+}
+
+async fn init_system(client: &IggyClient) {
+    client.create_stream(STREAM_NAME).await.unwrap();
+
+    // Create topic with single partition - all producers will contend for this partition
+    client
+        .create_topic(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            TOPIC_NAME,
+            1, // Single partition to force lock contention
+            CompressionAlgorithm::None,
+            None,
+            IggyExpiry::NeverExpire,
+            MaxTopicSize::ServerDefault,
+        )
+        .await
+        .unwrap();
+
+    println!(
+        "Created stream and topic with 1 partition, {} producers will contend for it",
+        PRODUCER_COUNT
+    );
+}
+
+async fn run_producer(
+    client: IggyClient,
+    producer_id: usize,
+    partition_id: u32,
+    stop: Arc<AtomicBool>,
+    counter: Arc<AtomicU64>,
+) {
+    let mut batch_num = 0u64;
+
+    while !stop.load(Ordering::SeqCst) {
+        let mut messages = Vec::with_capacity(MESSAGES_PER_BATCH);
+
+        for i in 0..MESSAGES_PER_BATCH {
+            let payload = format!("p{}:b{}:m{}", producer_id, batch_num, i);
+            let message = IggyMessage::builder()
+                .payload(payload.into_bytes().into())
+                .build()
+                .unwrap();
+            messages.push(message);
+        }
+
+        match client
+            .send_messages(
+                &Identifier::named(STREAM_NAME).unwrap(),
+                &Identifier::named(TOPIC_NAME).unwrap(),
+                &Partitioning::partition_id(partition_id),
+                &mut messages,
+            )
+            .await
+        {
+            Ok(_) => {
+                counter.fetch_add(MESSAGES_PER_BATCH as u64, Ordering::SeqCst);
+                batch_num += 1;
+            }
+            Err(e) => {
+                eprintln!("Producer {} send error: {}", producer_id, e);
+            }
+        }
+    }
+
+    println!(
+        "Producer {} (partition {}) stopped after {} batches",
+        producer_id, partition_id, batch_num
+    );
+}
+
+async fn cleanup(client: &IggyClient) {
+    client
+        .delete_stream(&Identifier::named(STREAM_NAME).unwrap())
+        .await
+        .unwrap();
+}
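
The four steps in the module doc above describe the interleaving this scenario
tries to provoke; read as a timeline (illustrative sketch only, not code from
the patch):

    // Task A (persist path)                // Task B (send path)
    // commit journal, ensure indexes
    // for segment N, start async save
    //                                      // send fills segment N; rotation
    //                                      // (handle_full_segment) seals N,
    //                                      // clears its indexes, adds N+1
    //                                      // with indexes = None
    // active_indexes().unwrap() -> panic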
diff --git a/core/integration/tests/server/specific.rs b/core/integration/tests/server/specific.rs
index 205d6b5df..28a955dfb 100644
--- a/core/integration/tests/server/specific.rs
+++ b/core/integration/tests/server/specific.rs
@@ -17,7 +17,8 @@
  */
 
 use crate::server::scenarios::{
-    delete_segments_scenario, message_size_scenario, tcp_tls_scenario, websocket_tls_scenario,
+    delete_segments_scenario, message_size_scenario, segment_rotation_race_scenario,
+    tcp_tls_scenario, websocket_tls_scenario,
 };
 use iggy::prelude::*;
 use integration::{
@@ -190,3 +191,59 @@ async fn message_size_scenario() {
 
     message_size_scenario::run(&client_factory).await;
 }
+
+/// Test to reproduce issue #2572: panic on segment rotation race condition.
+///
+/// This test configures the server to trigger frequent segment rotations and runs
+/// multiple concurrent producers to maximize the chance of hitting the race condition
+/// between persist_messages_to_disk and handle_full_segment.
+///
+/// Server configuration:
+/// - Very small segment size (512B) to trigger frequent rotations
+/// - Short message_saver interval (1s) to add concurrent persist operations
+/// - Small messages_required_to_save (32) to trigger more frequent saves
+/// - cache_indexes = none to trigger clear_active_indexes path
+#[tokio::test]
+#[parallel]
+async fn segment_rotation_race_condition_issue_2572() {
+    let mut extra_envs = HashMap::new();
+
+    // Very small segment to trigger frequent rotations
+    extra_envs.insert("IGGY_SYSTEM_SEGMENT_SIZE".to_string(), "512B".to_string());
+
+    // Short message saver interval to add concurrent persist operations
+    extra_envs.insert("IGGY_MESSAGE_SAVER_INTERVAL".to_string(), "1s".to_string());
+
+    // Small threshold to trigger more frequent saves
+    extra_envs.insert(
+        "IGGY_SYSTEM_PARTITION_MESSAGES_REQUIRED_TO_SAVE".to_string(),
+        "32".to_string(),
+    );
+
+    // cache_indexes = none triggers clear_active_indexes in handle_full_segment
+    extra_envs.insert(
+        "IGGY_SYSTEM_SEGMENT_CACHE_INDEXES".to_string(),
+        "none".to_string(),
+    );
+
+    // Disable socket migration to keep all connections on same shard
+    extra_envs.insert("IGGY_TCP_SOCKET_MIGRATION".to_string(), "false".to_string());
+
+    // Enable TCP nodelay for faster message throughput
+    extra_envs.insert(
+        "IGGY_TCP_SOCKET_OVERRIDE_DEFAULTS".to_string(),
+        "true".to_string(),
+    );
+    extra_envs.insert("IGGY_TCP_SOCKET_NODELAY".to_string(), "true".to_string());
+
+    let mut test_server = TestServer::new(Some(extra_envs), true, None, IpAddrKind::V4);
+    test_server.start();
+
+    let server_addr = test_server.get_raw_tcp_addr().unwrap();
+    let client_factory = TcpClientFactory {
+        server_addr,
+        ..Default::default()
+    };
+
+    segment_rotation_race_scenario::run(&client_factory).await;
+}
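
The test above is a regular #[tokio::test], so it can be run on its own with
cargo's test-name filter, e.g. cargo test segment_rotation_race_condition_issue_2572
(the exact invocation depends on how the workspace is set up; the filter name is
taken from the function above).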
diff --git a/core/server/src/binary/handlers/messages/send_messages_handler.rs b/core/server/src/binary/handlers/messages/send_messages_handler.rs
index 6aa6786ac..2723e44d3 100644
--- a/core/server/src/binary/handlers/messages/send_messages_handler.rs
+++ b/core/server/src/binary/handlers/messages/send_messages_handler.rs
@@ -191,7 +191,8 @@ impl ServerCommandHandler for SendMessages {
                     .send_request_to_shard_or_recoil(Some(&namespace), socket_transfer_msg)
                     .await
                 {
-                    error!("tranfer socket to another shard failed, drop connection. {e:?}");
+                    // TODO: should we crash?
+                    error!("transfer socket to another shard failed, drop connection. {e:?}");
                     return Ok(HandlerResult::Finished);
                 }
 
diff --git a/core/server/src/shard/system/messages.rs b/core/server/src/shard/system/messages.rs
index 02e8d9d0e..05453111a 100644
--- a/core/server/src/shard/system/messages.rs
+++ b/core/server/src/shard/system/messages.rs
@@ -315,7 +315,7 @@ impl IggyShard {
         partition_id: usize,
         fsync: bool,
     ) -> Result<(), IggyError> {
-        let batches = self.streams.with_partition_by_id_mut(
+        let committed = self.streams.with_partition_by_id_mut(
             stream_id,
             topic_id,
             partition_id,
@@ -327,7 +327,7 @@ impl IggyShard {
                 stream_id,
                 topic_id,
                 partition_id,
-                batches,
+                committed,
                 &self.config.system,
             )
             .await?;
diff --git a/core/server/src/slab/streams.rs b/core/server/src/slab/streams.rs
index f1c342de2..494a5ff08 100644
--- a/core/server/src/slab/streams.rs
+++ b/core/server/src/slab/streams.rs
@@ -1187,23 +1187,104 @@ impl Streams {
         let numeric_topic_id =
             self.with_topic_by_id(stream_id, topic_id, topics::helpers::get_topic_id());
 
-        if config.segment.cache_indexes == CacheIndexesConfig::OpenSegment
-            || config.segment.cache_indexes == CacheIndexesConfig::None
-        {
-            self.with_partition_by_id_mut(stream_id, topic_id, partition_id, |(.., log)| {
-                log.clear_active_indexes();
+        let clear_indexes = config.segment.cache_indexes == CacheIndexesConfig::OpenSegment
+            || config.segment.cache_indexes == CacheIndexesConfig::None;
+
+        // First, check if segment is already sealed and get info needed for new segment creation
+        let segment_info =
+            self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| {
+                // If segment is already sealed, another task is handling the closure
+                if log.active_segment().sealed {
+                    return None;
+                }
+                let segment = log.active_segment();
+                Some((
+                    segment.end_offset,
+                    segment.start_offset,
+                    segment.size,
+                    log.active_storage().messages_writer.clone(),
+                ))
             });
-        }
 
-        self.with_partition_by_id_mut(stream_id, topic_id, partition_id, |(.., log)| {
-            log.active_segment_mut().sealed = true;
-        });
-        let (log_writer, index_writer) =
+        // If None (sealed), another task is handling closure
+        let Some((end_offset, start_offset, size, writer_for_lock)) = segment_info else {
+            return Ok(());
+        };
+
+        // If writer is None, segment was already shutdown by another task
+        let Some(writer_for_lock) = writer_for_lock else {
+            return Ok(());
+        };
+
+        // CRITICAL: Create the new segment storage FIRST, before any modifications.
+        // This ensures there's always a valid active segment with writers available,
+        // preventing race conditions where commit_journal finds None writers.
+        let messages_size = 0;
+        let indexes_size = 0;
+        let new_segment = Segment::new(
+            end_offset + 1,
+            config.segment.size,
+            config.segment.message_expiry,
+        );
+
+        let new_storage = create_segment_storage(
+            config,
+            numeric_stream_id,
+            numeric_topic_id,
+            partition_id,
+            messages_size,
+            indexes_size,
+            end_offset + 1,
+        )
+        .await?;
+
+        // Now acquire the write lock to ensure all pending writes complete
+        let _write_guard = writer_for_lock.lock.lock().await;
+
+        // Atomically: seal old segment, shutdown storage, add new segment
+        // This ensures the new segment is available immediately when the old one is shutdown.
+        let writers =
             self.with_partition_by_id_mut(stream_id, topic_id, partition_id, |(.., log)| {
+                // Double-check sealed status (another task might have completed while we waited)
+                if log.active_segment().sealed {
+                    return None;
+                }
+
+                // Clear indexes if configured
+                if clear_indexes {
+                    log.clear_active_indexes();
+                }
+
+                // Seal the old segment
+                log.active_segment_mut().sealed = true;
+
+                // Extract writers from old segment
                 let (msg, index) = log.active_storage_mut().shutdown();
-                (msg.unwrap(), index.unwrap())
+
+                // Add the new segment - this makes it the new "active" segment immediately
+                log.add_persisted_segment(new_segment, new_storage);
+
+                Some((msg, index))
             });
 
+        // Drop the write guard before spawning fsync tasks
+        drop(_write_guard);
+
+        // If None, another task already handled segment closure
+        let Some((Some(log_writer), Some(index_writer))) = writers else {
+            return Ok(());
+        };
+
+        tracing::info!(
+            "Closed segment for stream: {}, topic: {} with start offset: {}, end offset: {}, size: {} for partition with ID: {}.",
+            stream_id,
+            topic_id,
+            start_offset,
+            end_offset,
+            size,
+            partition_id
+        );
+
         registry
             .oneshot("fsync:segment-close-log")
             .critical(true)
@@ -1236,47 +1317,6 @@ impl Streams {
             })
             .spawn();
 
-        let (start_offset, size, end_offset) =
-            self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| {
-                (
-                    log.active_segment().start_offset,
-                    log.active_segment().size,
-                    log.active_segment().end_offset,
-                )
-            });
-
-        tracing::info!(
-            "Closed segment for stream: {}, topic: {} with start offset: {}, end offset: {}, size: {} for partition with ID: {}.",
-            stream_id,
-            topic_id,
-            start_offset,
-            end_offset,
-            size,
-            partition_id
-        );
-
-        let messages_size = 0;
-        let indexes_size = 0;
-        let segment = Segment::new(
-            end_offset + 1,
-            config.segment.size,
-            config.segment.message_expiry,
-        );
-
-        let storage = create_segment_storage(
-            config,
-            numeric_stream_id,
-            numeric_topic_id,
-            partition_id,
-            messages_size,
-            indexes_size,
-            end_offset + 1,
-        )
-        .await?;
-        self.with_partition_by_id_mut(stream_id, topic_id, partition_id, |(.., log)| {
-            log.add_persisted_segment(segment, storage);
-        });
-
         Ok(())
     }
 
@@ -1295,7 +1335,9 @@ impl Streams {
             return Ok(0);
         }
 
-        let batches = self.with_partition_by_id_mut(
+        // commit_journal now returns CommittedBatch which includes writers captured at commit time.
+        // This prevents race conditions where segment rotation could invalidate writers.
+        let committed = self.with_partition_by_id_mut(
             stream_id,
             topic_id,
             partition_id,
@@ -1311,7 +1353,7 @@ impl Streams {
         );
 
         let batch_count = self
-            .persist_messages_to_disk(stream_id, topic_id, partition_id, batches, config)
+            .persist_messages_to_disk(stream_id, topic_id, partition_id, committed, config)
             .await?;
 
         Ok(batch_count)
@@ -1322,9 +1364,16 @@ impl Streams {
         stream_id: &Identifier,
         topic_id: &Identifier,
         partition_id: usize,
-        mut batches: IggyMessagesBatchSet,
+        committed: streaming_partitions::helpers::CommittedBatch,
         config: &SystemConfig,
     ) -> Result<u32, IggyError> {
+        let streaming_partitions::helpers::CommittedBatch {
+            mut batches,
+            segment_idx,
+            messages_writer,
+            index_writer,
+        } = committed;
+
         let batch_count = batches.count();
         let batch_size = batches.size();
 
@@ -1347,21 +1396,8 @@ impl Streams {
             log.set_in_flight(frozen.clone());
         });
 
-        let (messages_writer, index_writer) =
-            self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| {
-                (
-                    log.active_storage()
-                        .messages_writer
-                        .as_ref()
-                        .expect("Messages writer not initialized")
-                        .clone(),
-                    log.active_storage()
-                        .index_writer
-                        .as_ref()
-                        .expect("Index writer not initialized")
-                        .clone(),
-                )
-            });
+        // Writers were captured at commit time, so they're guaranteed to be valid
+        // even if segment rotation happened after the commit.
         let guard = messages_writer.lock.lock().await;
 
         let saved = messages_writer
@@ -1375,20 +1411,26 @@ impl Streams {
                 )
             })?;
 
-        // Extract unsaved indexes before async operation
+        // Extract unsaved indexes for the specific segment (handles None gracefully if cleared during rotation)
         let unsaved_indexes_slice =
             self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| {
-                log.active_indexes().unwrap().unsaved_slice()
+                log.indexes()
+                    .get(segment_idx)
+                    .and_then(|opt| opt.as_ref())
+                    .map(|indexes| indexes.unsaved_slice())
             });
 
-        let indexes_len = unsaved_indexes_slice.len();
-        index_writer
-            .as_ref()
-            .save_indexes(unsaved_indexes_slice)
-            .await
-            .error(|e: &IggyError| {
-                format!("Failed to save index of {indexes_len} indexes to stream ID: {stream_id}, topic ID: {topic_id} {partition_id}. {e}",)
-            })?;
+        // Only save indexes if they exist (may be None if segment was rotated and cleared)
+        if let Some(unsaved_indexes_slice) = unsaved_indexes_slice {
+            let indexes_len = unsaved_indexes_slice.len();
+            index_writer
+                .as_ref()
+                .save_indexes(unsaved_indexes_slice)
+                .await
+                .error(|e: &IggyError| {
+                    format!("Failed to save index of {indexes_len} indexes to stream ID: {stream_id}, topic ID: {topic_id} {partition_id}. {e}",)
+                })?;
+        }
 
         tracing::trace!(
             "Persisted {} messages on disk for stream ID: {}, topic ID: {}, 
for partition with ID: {}, total bytes written: {}.",
@@ -1403,7 +1445,11 @@ impl Streams {
             stream_id,
             topic_id,
             partition_id,
-            streaming_partitions::helpers::update_index_and_increment_stats(saved, config),
+            streaming_partitions::helpers::update_index_and_increment_stats(
+                segment_idx,
+                saved,
+                config,
+            ),
         );
 
         self.with_partition_by_id_mut(stream_id, topic_id, partition_id, |(.., 
log)| {
diff --git a/core/server/src/streaming/partitions/helpers.rs b/core/server/src/streaming/partitions/helpers.rs
index 531e77e54..e62a9e420 100644
--- a/core/server/src/streaming/partitions/helpers.rs
+++ b/core/server/src/streaming/partitions/helpers.rs
@@ -32,11 +32,15 @@ use crate::{
             storage,
         },
         polling_consumer::ConsumerGroupId,
-        segments::{IggyIndexesMut, IggyMessagesBatchMut, IggyMessagesBatchSet, storage::Storage},
+        segments::{
+            IggyIndexesMut, IggyMessagesBatchMut, IggyMessagesBatchSet, IndexWriter,
+            MessagesWriter, storage::Storage,
+        },
     },
 };
 use err_trail::ErrContext;
 use iggy_common::{ConsumerOffsetInfo, Identifier, IggyByteSize, IggyError};
+use std::rc::Rc;
 use std::{
     ops::AsyncFnOnce,
     sync::{Arc, atomic::Ordering},
@@ -430,12 +434,45 @@ pub fn append_to_journal(
     }
 }
 
-pub fn commit_journal() -> impl FnOnce(ComponentsById<PartitionRefMut>) -> IggyMessagesBatchSet {
+/// Result of committing journal batches for a specific segment.
+/// Captures writers at commit time to prevent race conditions with segment rotation.
+pub struct CommittedBatch {
+    pub batches: IggyMessagesBatchSet,
+    pub segment_idx: usize,
+    pub messages_writer: Rc<MessagesWriter>,
+    pub index_writer: Rc<IndexWriter>,
+}
+
+/// Commits the journal and returns the batches along with the segment's writers.
+/// By capturing writers at commit time (within the same lock acquisition), we ensure
+/// that even if segment rotation happens after this call, we still have valid writers
+/// to complete the persistence operation.
+pub fn commit_journal() -> impl FnOnce(ComponentsById<PartitionRefMut>) -> CommittedBatch {
     |(.., log)| {
+        let segment_idx = log.segments().len().saturating_sub(1);
         let batches = log.journal_mut().commit();
         log.ensure_indexes();
         batches.append_indexes_to(log.active_indexes_mut().unwrap());
-        batches
+
+        // Capture writers NOW before any rotation can happen
+        let storage = log.active_storage();
+        let messages_writer = storage
+            .messages_writer
+            .as_ref()
+            .expect("Messages writer must exist at commit time")
+            .clone();
+        let index_writer = storage
+            .index_writer
+            .as_ref()
+            .expect("Index writer must exist at commit time")
+            .clone();
+
+        CommittedBatch {
+            batches,
+            segment_idx,
+            messages_writer,
+            index_writer,
+        }
     }
 }
 
@@ -531,16 +568,26 @@ pub fn persist_batch(
     }
 }
 
+/// Updates segment size and marks indexes as saved for a specific segment.
+/// Uses segment_idx to ensure we update the correct segment even after rotation.
+/// Gracefully handles the case where indexes were cleared during segment rotation.
 pub fn update_index_and_increment_stats(
+    segment_idx: usize,
     saved: IggyByteSize,
     config: &SystemConfig,
 ) -> impl FnOnce(ComponentsById<PartitionRefMut>) {
     move |(.., log)| {
-        let segment = log.active_segment_mut();
-        segment.size = IggyByteSize::from(segment.size.as_bytes_u64() + saved.as_bytes_u64());
-        log.active_indexes_mut().unwrap().mark_saved();
-        if config.segment.cache_indexes == CacheIndexesConfig::None {
-            log.active_indexes_mut().unwrap().clear();
+        // Update the specific segment we wrote to, not "active" which may have changed
+        if let Some(segment) = log.segments_mut().get_mut(segment_idx) {
+            segment.size = IggyByteSize::from(segment.size.as_bytes_u64() + saved.as_bytes_u64());
+        }
+
+        // Handle indexes - may be None if segment was rotated and indexes cleared
+        if let Some(Some(indexes)) = log.indexes_mut().get_mut(segment_idx) {
+            indexes.mark_saved();
+            if config.segment.cache_indexes == CacheIndexesConfig::None {
+                indexes.clear();
+            }
         }
     }
 }
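
The pattern used in the helpers above is "address the segment by the index
captured at commit time, and treat a missing entry as already handled". A
standalone illustration of the same lookup (simplified types, not the server's
actual API):

    fn unsaved_index_count(indexes: &[Option<Vec<u32>>], segment_idx: usize) -> Option<usize> {
        // A None slot means this segment's indexes were cleared by rotation;
        // the caller simply skips the index save in that case.
        indexes.get(segment_idx).and_then(|slot| slot.as_ref()).map(|idx| idx.len())
    }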
diff --git a/core/server/src/streaming/segments/mod.rs b/core/server/src/streaming/segments/mod.rs
index 7ed09c2d4..878af0eb1 100644
--- a/core/server/src/streaming/segments/mod.rs
+++ b/core/server/src/streaming/segments/mod.rs
@@ -24,6 +24,8 @@ mod types;
 pub mod storage;
 
 pub use indexes::IggyIndexesMut;
+pub use indexes::IndexWriter;
+pub use messages::MessagesWriter;
 pub use segment::Segment;
 pub use types::IggyMessageHeaderViewMut;
 pub use types::IggyMessageViewMut;

