This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch fix-segment-close-crash
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 900fb93ffa27f4819b1a7e8c62b7cdda704a0cc6
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Fri Jan 16 12:39:35 2026 +0100

    fix segment rotation crash
---
 Cargo.lock                                         |   2 +-
 DEPENDENCIES.md                                    |   2 +-
 core/integration/tests/server/scenarios/mod.rs     |   1 +
 .../scenarios/segment_rotation_race_scenario.rs    | 196 ++++++++++
 core/integration/tests/server/specific.rs          |  84 +++-
 core/server/Cargo.toml                             |   2 +-
 .../handlers/messages/send_messages_handler.rs     |   3 +-
 core/server/src/shard/system/messages.rs           |  11 +-
 core/server/src/slab/streams.rs                    | 424 ++++++++++-----------
 core/server/src/streaming/partitions/helpers.rs    | 162 ++++----
 .../streaming/segments/messages/messages_writer.rs |  44 +--
 core/server/src/streaming/segments/messages/mod.rs |  52 +--
 core/server/src/streaming/segments/mod.rs          |   2 +
 13 files changed, 584 insertions(+), 401 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index dd5bc7d89..f88aaf4a6 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -8270,7 +8270,7 @@ dependencies = [
 
 [[package]]
 name = "server"
-version = "0.6.1-edge.4"
+version = "0.6.1-edge.5"
 dependencies = [
  "ahash 0.8.12",
  "anyhow",
diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md
index 3ea9bbf8b..814a96a36 100644
--- a/DEPENDENCIES.md
+++ b/DEPENDENCIES.md
@@ -726,7 +726,7 @@ serde_with_macros: 3.16.1, "Apache-2.0 OR MIT",
 serde_yaml_ng: 0.10.0, "MIT",
 serial_test: 3.3.1, "MIT",
 serial_test_derive: 3.3.1, "MIT",
-server: 0.6.1-edge.4, "Apache-2.0",
+server: 0.6.1-edge.5, "Apache-2.0",
 sha1: 0.10.6, "Apache-2.0 OR MIT",
 sha2: 0.10.9, "Apache-2.0 OR MIT",
 sha3: 0.10.8, "Apache-2.0 OR MIT",
diff --git a/core/integration/tests/server/scenarios/mod.rs b/core/integration/tests/server/scenarios/mod.rs
index 1f71098fb..a61636931 100644
--- a/core/integration/tests/server/scenarios/mod.rs
+++ b/core/integration/tests/server/scenarios/mod.rs
@@ -32,6 +32,7 @@ pub mod message_headers_scenario;
 pub mod message_size_scenario;
 pub mod permissions_scenario;
 pub mod read_during_persistence_scenario;
+pub mod segment_rotation_race_scenario;
 pub mod single_message_per_batch_scenario;
 pub mod stale_client_consumer_group_scenario;
 pub mod stream_size_validation_scenario;
diff --git a/core/integration/tests/server/scenarios/segment_rotation_race_scenario.rs b/core/integration/tests/server/scenarios/segment_rotation_race_scenario.rs
new file mode 100644
index 000000000..9856777b2
--- /dev/null
+++ b/core/integration/tests/server/scenarios/segment_rotation_race_scenario.rs
@@ -0,0 +1,196 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//! This scenario reproduces the bug that occurs when concurrent message sends race with segment rotation:
+//! 1. Task A commits journal, ensures indexes for segment N, starts async save
+//! 2. Task B's send triggers segment rotation (handle_full_segment)
+//! 3. Task B clears segment N's indexes or creates segment N+1 with None indexes
+//! 4. Task A calls active_indexes().unwrap() - panics because indexes are None
+//!
+//! This test uses:
+//! - Very small segment size (512B) to trigger frequent rotations
+//! - 8 concurrent producers (2 per protocol: TCP, HTTP, QUIC, WebSocket)
+//! - All producers write to the same partition for maximum lock contention
+//! - Short message_saver interval to add more concurrent persist operations
+
+use iggy::prelude::*;
+use integration::test_server::{ClientFactory, login_root};
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
+use std::time::Duration;
+use tokio::task::JoinSet;
+
+const STREAM_NAME: &str = "race-test-stream";
+const TOPIC_NAME: &str = "race-test-topic";
+const PRODUCERS_PER_PROTOCOL: usize = 2;
+const PARTITION_ID: u32 = 0;
+const TEST_DURATION_SECS: u64 = 10;
+const MESSAGES_PER_BATCH: usize = 5;
+
+/// Runs the segment rotation race condition test with multiple protocols.
+/// Each client factory represents a different protocol (TCP, HTTP, QUIC, WebSocket).
+/// 2 producers are spawned per protocol, all writing to the same partition.
+pub async fn run(client_factories: &[&dyn ClientFactory]) {
+    assert!(
+        !client_factories.is_empty(),
+        "At least one client factory required"
+    );
+
+    let admin_client = create_client(client_factories[0]).await;
+    login_root(&admin_client).await;
+
+    let total_producers = client_factories.len() * PRODUCERS_PER_PROTOCOL;
+    init_system(&admin_client, total_producers).await;
+
+    let stop_flag = Arc::new(AtomicBool::new(false));
+    let total_messages = Arc::new(AtomicU64::new(0));
+    let mut join_set = JoinSet::new();
+
+    let mut global_producer_id = 0usize;
+    for factory in client_factories {
+        let protocol = factory.transport();
+        for local_id in 0..PRODUCERS_PER_PROTOCOL {
+            let client = create_client(*factory).await;
+            login_root(&client).await;
+
+            let stop = stop_flag.clone();
+            let counter = total_messages.clone();
+            let producer_name = format!("{:?}-{}", protocol, local_id);
+            let producer_id = global_producer_id;
+
+            join_set.spawn(async move {
+                run_producer(
+                    client,
+                    producer_id,
+                    &producer_name,
+                    PARTITION_ID,
+                    stop,
+                    counter,
+                )
+                .await;
+            });
+
+            global_producer_id += 1;
+        }
+    }
+
+    tokio::time::sleep(Duration::from_secs(TEST_DURATION_SECS)).await;
+    stop_flag.store(true, Ordering::SeqCst);
+
+    while let Some(result) = join_set.join_next().await {
+        if let Err(e) = result
+            && e.is_panic()
+        {
+            let panic_info = e.into_panic();
+            let panic_msg = if let Some(s) = panic_info.downcast_ref::<&str>() {
+                s.to_string()
+            } else if let Some(s) = panic_info.downcast_ref::<String>() {
+                s.clone()
+            } else {
+                "Unknown panic".to_string()
+            };
+            panic!("Producer task panicked: {}", panic_msg);
+        }
+    }
+
+    let sent = total_messages.load(Ordering::SeqCst);
+    println!("Test completed successfully. Total messages sent: {}", sent);
+
+    cleanup(&admin_client).await;
+}
+
+async fn create_client(client_factory: &dyn ClientFactory) -> IggyClient {
+    let client = client_factory.create_client().await;
+    IggyClient::create(client, None, None)
+}
+
+async fn init_system(client: &IggyClient, total_producers: usize) {
+    client.create_stream(STREAM_NAME).await.unwrap();
+
+    client
+        .create_topic(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            TOPIC_NAME,
+            1,
+            CompressionAlgorithm::None,
+            None,
+            IggyExpiry::NeverExpire,
+            MaxTopicSize::ServerDefault,
+        )
+        .await
+        .unwrap();
+
+    println!(
+        "Created stream and topic with 1 partition, {} producers will contend 
for it",
+        total_producers
+    );
+}
+
+async fn run_producer(
+    client: IggyClient,
+    producer_id: usize,
+    producer_name: &str,
+    partition_id: u32,
+    stop: Arc<AtomicBool>,
+    counter: Arc<AtomicU64>,
+) {
+    let mut batch_num = 0u64;
+
+    while !stop.load(Ordering::SeqCst) {
+        let mut messages = Vec::with_capacity(MESSAGES_PER_BATCH);
+
+        for i in 0..MESSAGES_PER_BATCH {
+            let payload = format!("p{}:b{}:m{}", producer_id, batch_num, i);
+            let message = IggyMessage::builder()
+                .payload(payload.into_bytes().into())
+                .build()
+                .unwrap();
+            messages.push(message);
+        }
+
+        match client
+            .send_messages(
+                &Identifier::named(STREAM_NAME).unwrap(),
+                &Identifier::named(TOPIC_NAME).unwrap(),
+                &Partitioning::partition_id(partition_id),
+                &mut messages,
+            )
+            .await
+        {
+            Ok(_) => {
+                counter.fetch_add(MESSAGES_PER_BATCH as u64, Ordering::SeqCst);
+                batch_num += 1;
+            }
+            Err(e) => {
+                eprintln!("Producer {} send error: {}", producer_name, e);
+            }
+        }
+    }
+
+    println!(
+        "Producer {} (partition {}) stopped after {} batches",
+        producer_name, partition_id, batch_num
+    );
+}
+
+async fn cleanup(client: &IggyClient) {
+    client
+        .delete_stream(&Identifier::named(STREAM_NAME).unwrap())
+        .await
+        .unwrap();
+}
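
The server-side fix for the race described in this scenario's doc comment (see the core/server/src/slab/streams.rs changes further down) snapshots the active segment's writer together with its start offset, awaits the writer lock, and then re-checks that the segment has not rotated before appending, retrying with fresh state otherwise. A rough, self-contained sketch of that lock-and-reverify loop, using tokio primitives and hypothetical simplified types in place of the real iggy structs:

    use std::sync::Arc;
    use tokio::sync::{Mutex, OwnedMutexGuard};

    // Hypothetical, simplified stand-ins for the real writer/segment/partition types.
    struct Writer;

    struct Segment {
        start_offset: u64,
        writer_lock: Arc<Mutex<Writer>>,
    }

    struct Partition {
        active: Segment,
    }

    impl Partition {
        fn active_start_offset(&self) -> u64 {
            self.active.start_offset
        }
        fn active_writer_lock(&self) -> Arc<Mutex<Writer>> {
            self.active.writer_lock.clone()
        }
    }

    /// Lock the active segment's writer, retrying if the segment rotates while
    /// this task is waiting for the lock.
    async fn lock_active_writer(partition: &Partition) -> (u64, OwnedMutexGuard<Writer>) {
        loop {
            // 1. Snapshot the segment identity together with its writer handle.
            let start_offset = partition.active_start_offset();
            let writer_lock = partition.active_writer_lock();

            // 2. Await the lock; another task may be persisting or rotating meanwhile.
            let guard = writer_lock.lock_owned().await;

            // 3. Re-check: if the active segment is unchanged, it is safe to append.
            if partition.active_start_offset() == start_offset {
                return (start_offset, guard);
            }
            // Segment rotated while waiting: drop the guard and retry against
            // the new active segment's state.
            drop(guard);
        }
    }

Because the guard is released before the next iteration, a concurrent rotation only forces one more pass instead of a write into a sealed segment.
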
diff --git a/core/integration/tests/server/specific.rs b/core/integration/tests/server/specific.rs
index 7df3d75d9..9a59ce468 100644
--- a/core/integration/tests/server/specific.rs
+++ b/core/integration/tests/server/specific.rs
@@ -17,14 +17,17 @@
  */
 
 use crate::server::scenarios::{
-    delete_segments_scenario, message_size_scenario, single_message_per_batch_scenario,
-    tcp_tls_scenario, websocket_tls_scenario,
+    delete_segments_scenario, message_size_scenario, segment_rotation_race_scenario,
+    single_message_per_batch_scenario, tcp_tls_scenario, websocket_tls_scenario,
 };
 use iggy::prelude::*;
 use integration::{
+    http_client::HttpClientFactory,
+    quic_client::QuicClientFactory,
     tcp_client::TcpClientFactory,
     test_server::{IpAddrKind, TestServer},
     test_tls_utils::generate_test_certificates,
+    websocket_client::WebSocketClientFactory,
 };
 use serial_test::parallel;
 use std::collections::HashMap;
@@ -212,3 +215,80 @@ async fn should_handle_single_message_per_batch_with_delayed_persistence() {
 
     single_message_per_batch_scenario::run(&client_factory, 5).await;
 }
+
+/// This test configures the server to trigger frequent segment rotations and runs
+/// multiple concurrent producers across all protocols (TCP, HTTP, QUIC, WebSocket)
+/// to maximize the chance of hitting the race condition between persist_messages_to_disk
+/// and handle_full_segment.
+///
+/// Server configuration:
+/// - Very small segment size (512B) to trigger frequent rotations
+/// - Short message_saver interval (1s) to add concurrent persist operations
+/// - Small messages_required_to_save (32) to trigger more frequent saves
+/// - cache_indexes = none to trigger clear_active_indexes path
+///
+/// Test configuration:
+/// - 8 producers total (2 per protocol: TCP, HTTP, QUIC, WebSocket)
+/// - All producers write to the same partition for maximum lock contention
+#[tokio::test]
+#[parallel]
+async fn segment_rotation_scenario() {
+    let mut extra_envs = HashMap::new();
+
+    // Very small segment to trigger frequent rotations
+    extra_envs.insert("IGGY_SYSTEM_SEGMENT_SIZE".to_string(), 
"512B".to_string());
+
+    // Short message saver interval to add concurrent persist operations
+    extra_envs.insert("IGGY_MESSAGE_SAVER_INTERVAL".to_string(), 
"1s".to_string());
+
+    // Small threshold to trigger more frequent saves
+    extra_envs.insert(
+        "IGGY_SYSTEM_PARTITION_MESSAGES_REQUIRED_TO_SAVE".to_string(),
+        "32".to_string(),
+    );
+
+    // cache_indexes = none triggers clear_active_indexes in handle_full_segment
+    extra_envs.insert(
+        "IGGY_SYSTEM_SEGMENT_CACHE_INDEXES".to_string(),
+        "none".to_string(),
+    );
+
+    // Disable socket migration to keep all connections on same shard
+    extra_envs.insert("IGGY_TCP_SOCKET_MIGRATION".to_string(), 
"false".to_string());
+
+    // Enable TCP nodelay for faster message throughput
+    extra_envs.insert(
+        "IGGY_TCP_SOCKET_OVERRIDE_DEFAULTS".to_string(),
+        "true".to_string(),
+    );
+    extra_envs.insert("IGGY_TCP_SOCKET_NODELAY".to_string(), 
"true".to_string());
+
+    let mut test_server = TestServer::new(Some(extra_envs), true, None, IpAddrKind::V4);
+    test_server.start();
+
+    let tcp_factory = TcpClientFactory {
+        server_addr: test_server.get_raw_tcp_addr().unwrap(),
+        ..Default::default()
+    };
+
+    let http_factory = HttpClientFactory {
+        server_addr: test_server.get_http_api_addr().unwrap(),
+    };
+
+    let quic_factory = QuicClientFactory {
+        server_addr: test_server.get_quic_udp_addr().unwrap(),
+    };
+
+    let websocket_factory = WebSocketClientFactory {
+        server_addr: test_server.get_websocket_addr().unwrap(),
+    };
+
+    let factories: Vec<&dyn integration::test_server::ClientFactory> = vec![
+        &tcp_factory,
+        &http_factory,
+        &quic_factory,
+        &websocket_factory,
+    ];
+
+    segment_rotation_race_scenario::run(&factories).await;
+}
diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml
index e80c3446d..743f14f18 100644
--- a/core/server/Cargo.toml
+++ b/core/server/Cargo.toml
@@ -17,7 +17,7 @@
 
 [package]
 name = "server"
-version = "0.6.1-edge.4"
+version = "0.6.1-edge.5"
 edition = "2024"
 license = "Apache-2.0"
 
diff --git a/core/server/src/binary/handlers/messages/send_messages_handler.rs b/core/server/src/binary/handlers/messages/send_messages_handler.rs
index 2e2748776..dcaa6925f 100644
--- a/core/server/src/binary/handlers/messages/send_messages_handler.rs
+++ b/core/server/src/binary/handlers/messages/send_messages_handler.rs
@@ -187,7 +187,8 @@ impl ServerCommandHandler for SendMessages {
                     .send_request_to_shard_or_recoil(Some(&namespace), socket_transfer_msg)
                     .await
                 {
-                    error!("tranfer socket to another shard failed, drop 
connection. {e:?}");
+                    // TODO: should we crash?
+                    error!("transfer socket to another shard failed, drop 
connection. {e:?}");
                     return Ok(HandlerResult::Finished);
                 }
 
diff --git a/core/server/src/shard/system/messages.rs b/core/server/src/shard/system/messages.rs
index 7f4f299e7..9ec66d9c9 100644
--- a/core/server/src/shard/system/messages.rs
+++ b/core/server/src/shard/system/messages.rs
@@ -315,28 +315,27 @@ impl IggyShard {
         partition_id: usize,
         fsync: bool,
     ) -> Result<(), IggyError> {
-        let frozen_batches = self.streams.with_partition_by_id_mut(
+        let committed = self.streams.with_partition_by_id_mut(
             stream_id,
             topic_id,
             partition_id,
-            partitions::helpers::commit_journal_with_in_flight(),
+            partitions::helpers::commit_journal(),
         );
 
-        if frozen_batches.is_empty() {
+        if committed.frozen.is_empty() {
             return Ok(());
         }
 
         self.streams
-            .persist_frozen_messages_to_disk(
+            .persist_messages_to_disk(
                 stream_id,
                 topic_id,
                 partition_id,
-                &frozen_batches,
+                committed,
                 &self.config.system,
             )
             .await?;
 
-        // Ensure all data is flushed to disk before returning
         if fsync {
             self.streams
                 .fsync_all_messages(stream_id, topic_id, partition_id)
diff --git a/core/server/src/slab/streams.rs b/core/server/src/slab/streams.rs
index 37e4192c4..719728218 100644
--- a/core/server/src/slab/streams.rs
+++ b/core/server/src/slab/streams.rs
@@ -57,7 +57,7 @@ use crate::{
 };
 use ahash::AHashMap;
 use err_trail::ErrContext;
-use iggy_common::{Identifier, IggyError, IggyMessagesBatch, IggyTimestamp, PollingKind};
+use iggy_common::{Identifier, IggyError, IggyTimestamp, PollingKind};
 use slab::Slab;
 use std::{
     cell::RefCell,
@@ -216,23 +216,64 @@ impl MainOps for Streams {
         let topic_id = ns.topic_id();
         let partition_id = ns.partition_id();
 
-        let current_offset = self.with_partition_by_id(
-            stream_id,
-            topic_id,
-            partition_id,
-            streaming_partitions::helpers::calculate_current_offset(),
-        );
+        // Acquire the lock on the current active segment's writer.
+        // We must verify the segment hasn't rotated between reading state and acquiring the lock.
+        // The writer must be stored outside the lock acquisition to keep it alive.
+        let mut messages_writer;
+        let mut current_offset;
+        let mut current_position;
+        let mut segment_start_offset;
+        let mut message_deduplicator;
+
+        let _write_guard = loop {
+            // Get messages_writer and all state atomically
+            (
+                messages_writer,
+                current_offset,
+                current_position,
+                segment_start_offset,
+                message_deduplicator,
+            ) = self.with_partition_by_id(
+                stream_id,
+                topic_id,
+                partition_id,
+                |(root, _, deduplicator, offset, _, _, log)| {
+                    let current_offset = if !root.should_increment_offset() {
+                        0
+                    } else {
+                        offset.load(std::sync::atomic::Ordering::Relaxed) + 1
+                    };
+                    let segment = log.active_segment();
+                    let writer = log
+                        .active_storage()
+                        .messages_writer
+                        .clone()
+                        .expect("Messages writer must exist for active 
segment");
+                    (
+                        writer,
+                        current_offset,
+                        segment.current_position,
+                        segment.start_offset,
+                        deduplicator.clone(),
+                    )
+                },
+            );
 
-        let current_position =
-            self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| {
-                log.active_segment().current_position
-            });
-        let (segment_start_offset, message_deduplicator) = self.with_partition_by_id(
-            stream_id,
-            topic_id,
-            partition_id,
-            streaming_partitions::helpers::get_segment_start_offset_and_deduplicator(),
-        );
+            // Acquire the lock
+            let write_guard = messages_writer.lock.lock().await;
+
+            // Verify the segment hasn't changed while we were waiting
+            let current_segment_start =
+                self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| {
+                    log.active_segment().start_offset
+                });
+
+            if current_segment_start == segment_start_offset {
+                // Segment is still the same, we can proceed
+                break write_guard;
+            }
+            // Segment rotated, release lock (guard dropped) and retry with fresh state
+        };
 
         input
             .prepare_for_persistence(
@@ -250,6 +291,16 @@ impl MainOps for Streams {
             streaming_partitions::helpers::append_to_journal(current_offset, input),
         )?;
 
+        let is_full = self.with_partition_by_id(
+            stream_id,
+            topic_id,
+            partition_id,
+            streaming_partitions::helpers::is_segment_full(),
+        );
+
+        // Release write lock before persistence I/O (persist_messages acquires it again)
+        drop(_write_guard);
+
         let unsaved_messages_count_exceeded =
+            journal_messages_count >= config.partition.messages_required_to_save;
         let unsaved_messages_size_exceeded = journal_size
@@ -258,13 +309,6 @@ impl MainOps for Streams {
                 .size_of_messages_required_to_save
                 .as_bytes_u64() as u32;
 
-        let is_full = self.with_partition_by_id(
-            stream_id,
-            topic_id,
-            partition_id,
-            streaming_partitions::helpers::is_segment_full(),
-        );
-
         // Try committing the journal
         if is_full || unsaved_messages_count_exceeded || unsaved_messages_size_exceeded {
             let reason = self.with_partition_by_id(
@@ -1187,23 +1231,118 @@ impl Streams {
         let numeric_topic_id =
             self.with_topic_by_id(stream_id, topic_id, topics::helpers::get_topic_id());
 
-        if config.segment.cache_indexes == CacheIndexesConfig::OpenSegment
-            || config.segment.cache_indexes == CacheIndexesConfig::None
-        {
-            self.with_partition_by_id_mut(stream_id, topic_id, partition_id, |(.., log)| {
-                log.clear_active_indexes();
+        let clear_indexes = config.segment.cache_indexes == CacheIndexesConfig::OpenSegment
+            || config.segment.cache_indexes == CacheIndexesConfig::None;
+
+        // First, check if segment is already sealed and get info needed for new segment creation
+        let segment_info =
+            self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| {
+                // If segment is already sealed, another task is handling the closure
+                if log.active_segment().sealed {
+                    return None;
+                }
+                let segment = log.active_segment();
+                Some((
+                    segment.end_offset,
+                    segment.start_offset,
+                    segment.size,
+                    log.active_storage().messages_writer.clone(),
+                ))
             });
-        }
 
-        self.with_partition_by_id_mut(stream_id, topic_id, partition_id, |(.., log)| {
-            log.active_segment_mut().sealed = true;
-        });
-        let (log_writer, index_writer) =
+        // If None (sealed), another task is handling closure
+        let Some((end_offset, start_offset, size, writer_for_lock)) = segment_info else {
+            return Ok(());
+        };
+
+        // If writer is None, segment was already shutdown by another task
+        let Some(writer_for_lock) = writer_for_lock else {
+            return Ok(());
+        };
+
+        // CRITICAL: Create the new segment storage FIRST, before any modifications.
+        // This ensures there's always a valid active segment with writers available,
+        // preventing race conditions where commit_journal finds None writers.
+        let messages_size = 0;
+        let indexes_size = 0;
+        let new_segment = Segment::new(
+            end_offset + 1,
+            config.segment.size,
+            config.segment.message_expiry,
+        );
+
+        let new_storage = create_segment_storage(
+            config,
+            numeric_stream_id,
+            numeric_topic_id,
+            partition_id,
+            messages_size,
+            indexes_size,
+            end_offset + 1,
+        )
+        .await?;
+
+        // Now acquire the write lock to ensure all pending writes complete
+        let _write_guard = writer_for_lock.lock.lock().await;
+
+        // Atomically: seal old segment, shutdown storage, add new segment
+        // This ensures the new segment is available immediately when the old 
one is shutdown.
+        let writers =
             self.with_partition_by_id_mut(stream_id, topic_id, partition_id, 
|(.., log)| {
+                // Double-check sealed status (another task might have 
completed while we waited)
+                if log.active_segment().sealed {
+                    return None;
+                }
+
+                // Verify we're still operating on the same segment we read from.
+                // If another rotation happened, the active segment changed and our
+                // new_segment would have the wrong start_offset.
+                if log.active_segment().start_offset != start_offset {
+                    return None;
+                }
+
+                // Verify the segment is actually full. The is_full check in append_messages
+                // happens before releasing the lock, but by the time we get here, another
+                // task might have rotated the segment. We must re-verify.
+                if !log.active_segment().is_full() {
+                    return None;
+                }
+
+                // Clear indexes if configured
+                if clear_indexes {
+                    log.clear_active_indexes();
+                }
+
+                // Seal the old segment
+                log.active_segment_mut().sealed = true;
+
+                // Extract writers from old segment
                 let (msg, index) = log.active_storage_mut().shutdown();
-                (msg.unwrap(), index.unwrap())
+
+                // Add the new segment - this makes it the new "active" segment immediately
+                log.add_persisted_segment(new_segment, new_storage);
+
+                Some((msg, index))
             });
 
+        // Drop the write guard before spawning fsync tasks
+        drop(_write_guard);
+
+        // If None, another task already handled segment closure
+        let Some((Some(log_writer), Some(index_writer))) = writers else {
+            return Ok(());
+        };
+
+        tracing::info!(
+            "Closed segment for stream: {}, topic: {} with start offset: {}, 
end offset: {}, size: {} for partition with ID: {}.",
+            stream_id,
+            topic_id,
+            start_offset,
+            end_offset,
+            size,
+            partition_id
+        );
+
         registry
             .oneshot("fsync:segment-close-log")
             .critical(true)
@@ -1236,47 +1375,6 @@ impl Streams {
             })
             .spawn();
 
-        let (start_offset, size, end_offset) =
-            self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| {
-                (
-                    log.active_segment().start_offset,
-                    log.active_segment().size,
-                    log.active_segment().end_offset,
-                )
-            });
-
-        tracing::info!(
-            "Closed segment for stream: {}, topic: {} with start offset: {}, 
end offset: {}, size: {} for partition with ID: {}.",
-            stream_id,
-            topic_id,
-            start_offset,
-            end_offset,
-            size,
-            partition_id
-        );
-
-        let messages_size = 0;
-        let indexes_size = 0;
-        let segment = Segment::new(
-            end_offset + 1,
-            config.segment.size,
-            config.segment.message_expiry,
-        );
-
-        let storage = create_segment_storage(
-            config,
-            numeric_stream_id,
-            numeric_topic_id,
-            partition_id,
-            messages_size,
-            indexes_size,
-            end_offset + 1,
-        )
-        .await?;
-        self.with_partition_by_id_mut(stream_id, topic_id, partition_id, |(.., log)| {
-            log.add_persisted_segment(segment, storage);
-        });
-
         Ok(())
     }
 
@@ -1295,7 +1393,9 @@ impl Streams {
             return Ok(0);
         }
 
-        let batches = self.with_partition_by_id_mut(
+        // commit_journal now returns CommittedBatch which includes writers captured at commit time.
+        // This prevents race conditions where segment rotation could invalidate writers.
+        let committed = self.with_partition_by_id_mut(
             stream_id,
             topic_id,
             partition_id,
@@ -1311,7 +1411,7 @@ impl Streams {
         );
 
         let batch_count = self
-            .persist_messages_to_disk(stream_id, topic_id, partition_id, batches, config)
+            .persist_messages_to_disk(stream_id, topic_id, partition_id, committed, config)
             .await?;
 
         Ok(batch_count)
@@ -1322,16 +1422,24 @@ impl Streams {
         stream_id: &Identifier,
         topic_id: &Identifier,
         partition_id: usize,
-        mut batches: IggyMessagesBatchSet,
+        committed: streaming_partitions::helpers::CommittedBatch,
         config: &SystemConfig,
     ) -> Result<u32, IggyError> {
-        let batch_count = batches.count();
-        let batch_size = batches.size();
-
+        let streaming_partitions::helpers::CommittedBatch {
+            segment_idx,
+            messages_writer,
+            index_writer,
+            unsaved_indexes,
+            frozen,
+        } = committed;
+
+        let batch_count = frozen.len() as u32;
         if batch_count == 0 {
             return Ok(0);
         }
 
+        let batch_size: u64 = frozen.iter().map(|b| b.size() as u64).sum();
+
         let has_segments =
             self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| {
                 log.has_segments()
@@ -1341,27 +1449,11 @@ impl Streams {
             return Ok(0);
         }
 
-        // Store frozen batches in in-flight buffer for reads during async I/O
-        let frozen: Vec<IggyMessagesBatch> = batches.iter_mut().map(|b| b.freeze()).collect();
-        self.with_partition_by_id_mut(stream_id, topic_id, partition_id, |(.., log)| {
-            log.set_in_flight(frozen.clone());
-        });
+        // In-flight buffer was already set atomically in commit_journal() to prevent
+        // race conditions where polls could miss data between journal commit and disk write.
 
-        let (messages_writer, index_writer) =
-            self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| {
-                (
-                    log.active_storage()
-                        .messages_writer
-                        .as_ref()
-                        .expect("Messages writer not initialized")
-                        .clone(),
-                    log.active_storage()
-                        .index_writer
-                        .as_ref()
-                        .expect("Index writer not initialized")
-                        .clone(),
-                )
-            });
+        // Writers were captured at commit time, so they're guaranteed to be valid
+        // even if segment rotation happened after the commit.
         let guard = messages_writer.lock.lock().await;
 
         let saved = messages_writer
@@ -1375,119 +1467,21 @@ impl Streams {
                 )
             })?;
 
-        // Extract unsaved indexes before async operation
-        let unsaved_indexes_slice =
-            self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| {
-                log.active_indexes().unwrap().unsaved_slice()
-            });
-
-        let indexes_len = unsaved_indexes_slice.len();
-        index_writer
-            .as_ref()
-            .save_indexes(unsaved_indexes_slice)
-            .await
-            .error(|e: &IggyError| {
-                format!("Failed to save index of {indexes_len} indexes to 
stream ID: {stream_id}, topic ID: {topic_id} {partition_id}. {e}",)
-            })?;
-
-        tracing::trace!(
-            "Persisted {} messages on disk for stream ID: {}, topic ID: {}, 
for partition with ID: {}, total bytes written: {}.",
-            batch_count,
-            stream_id,
-            topic_id,
-            partition_id,
-            saved
-        );
-
-        self.with_partition_by_id_mut(
-            stream_id,
-            topic_id,
-            partition_id,
-            streaming_partitions::helpers::update_index_and_increment_stats(saved, config),
-        );
-
-        self.with_partition_by_id_mut(stream_id, topic_id, partition_id, |(.., log)| {
-            log.clear_in_flight();
-        });
-
-        drop(guard);
-        Ok(batch_count)
-    }
-
-    /// Persist frozen (immutable) batches to disk.
-    /// Assumes in-flight buffer is already set by caller.
-    /// Clears in-flight buffer after successful persist.
-    pub async fn persist_frozen_messages_to_disk(
-        &self,
-        stream_id: &Identifier,
-        topic_id: &Identifier,
-        partition_id: usize,
-        frozen_batches: &[IggyMessagesBatch],
-        config: &SystemConfig,
-    ) -> Result<u32, IggyError> {
-        let (batch_count, batch_size): (u32, u32) = frozen_batches
-            .iter()
-            .fold((0u32, 0u32), |(count_acc, size_acc), b| {
-                (count_acc + b.count(), size_acc + b.size())
-            });
-
-        if batch_count == 0 {
-            return Ok(0);
-        }
-
-        let has_segments =
-            self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., 
log)| {
-                log.has_segments()
-            });
-
-        if !has_segments {
-            return Ok(0);
+        // Indexes were captured at commit time, so they're guaranteed to be valid
+        // even if segment rotation cleared the index buffer after the commit.
+        if !unsaved_indexes.is_empty() {
+            let indexes_len = unsaved_indexes.len();
+            index_writer
+                .as_ref()
+                .save_indexes(unsaved_indexes)
+                .await
+                .error(|e: &IggyError| {
+                    format!("Failed to save index of {indexes_len} indexes to 
stream ID: {stream_id}, topic ID: {topic_id} {partition_id}. {e}",)
+                })?;
         }
 
-        let (messages_writer, index_writer) =
-            self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| {
-                (
-                    log.active_storage()
-                        .messages_writer
-                        .as_ref()
-                        .expect("Messages writer not initialized")
-                        .clone(),
-                    log.active_storage()
-                        .index_writer
-                        .as_ref()
-                        .expect("Index writer not initialized")
-                        .clone(),
-                )
-            });
-        let guard = messages_writer.lock.lock().await;
-
-        let saved = messages_writer
-            .as_ref()
-            .save_frozen_batches(frozen_batches)
-            .await
-            .error(|e: &IggyError| {
-                format!(
-                    "Failed to save batch of {batch_count} messages \
-                    ({batch_size} bytes) to stream ID: {stream_id}, topic ID: {topic_id}, partition ID: {partition_id}. {e}",
-                )
-            })?;
-
-        let unsaved_indexes_slice =
-            self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| {
-                log.active_indexes().unwrap().unsaved_slice()
-            });
-
-        let indexes_len = unsaved_indexes_slice.len();
-        index_writer
-            .as_ref()
-            .save_indexes(unsaved_indexes_slice)
-            .await
-            .error(|e: &IggyError| {
-                format!("Failed to save index of {indexes_len} indexes to 
stream ID: {stream_id}, topic ID: {topic_id} {partition_id}. {e}",)
-            })?;
-
         tracing::trace!(
-            "Persisted {} frozen messages on disk for stream ID: {}, topic ID: 
{}, for partition with ID: {}, total bytes written: {}.",
+            "Persisted {} messages on disk for stream ID: {}, topic ID: {}, 
for partition with ID: {}, total bytes written: {}.",
             batch_count,
             stream_id,
             topic_id,
@@ -1499,7 +1493,11 @@ impl Streams {
             stream_id,
             topic_id,
             partition_id,
-            streaming_partitions::helpers::update_index_and_increment_stats(saved, config),
+            streaming_partitions::helpers::update_index_and_increment_stats(
+                segment_idx,
+                saved,
+                config,
+            ),
         );
 
         self.with_partition_by_id_mut(stream_id, topic_id, partition_id, |(.., log)| {
diff --git a/core/server/src/streaming/partitions/helpers.rs b/core/server/src/streaming/partitions/helpers.rs
index c9c8e029c..6010703c9 100644
--- a/core/server/src/streaming/partitions/helpers.rs
+++ b/core/server/src/streaming/partitions/helpers.rs
@@ -32,11 +32,15 @@ use crate::{
             storage,
         },
         polling_consumer::ConsumerGroupId,
-        segments::{IggyIndexesMut, IggyMessagesBatchMut, IggyMessagesBatchSet, storage::Storage},
+        segments::{
+            IggyIndexesMut, IggyMessagesBatchMut, IggyMessagesBatchSet, IndexWriter,
+            MessagesWriter, storage::Storage,
+        },
     },
 };
 use err_trail::ErrContext;
-use iggy_common::{ConsumerOffsetInfo, Identifier, IggyByteSize, IggyError};
+use iggy_common::{ConsumerOffsetInfo, IggyByteSize, IggyError, IggyMessagesBatch, PooledBuffer};
+use std::rc::Rc;
 use std::{
     ops::AsyncFnOnce,
     sync::{Arc, atomic::Ordering},
@@ -430,35 +434,69 @@ pub fn append_to_journal(
     }
 }
 
-/// Commit journal and set in-flight buffer with frozen batches.
-/// Returns frozen batches for persisting to disk.
-pub fn commit_journal_with_in_flight()
--> impl FnOnce(ComponentsById<PartitionRefMut>) -> Vec<iggy_common::IggyMessagesBatch> {
-    |(.., log)| {
-        let mut batches = log.journal_mut().commit();
-        if batches.is_empty() {
-            return Vec::new();
-        }
-        log.ensure_indexes();
-        batches.append_indexes_to(log.active_indexes_mut().unwrap());
-        let frozen: Vec<_> = batches.iter_mut().map(|b| b.freeze()).collect();
-        log.set_in_flight(frozen.clone());
-        frozen
-    }
-}
-
 pub fn clear_in_flight() -> impl FnOnce(ComponentsById<PartitionRefMut>) {
     |(.., log)| {
         log.clear_in_flight();
     }
 }
 
-pub fn commit_journal() -> impl FnOnce(ComponentsById<PartitionRefMut>) -> IggyMessagesBatchSet {
+/// Result of committing journal batches for a specific segment.
+/// Captures writers, indexes, and frozen batches at commit time to prevent race conditions.
+/// The in-flight buffer is set atomically during commit to ensure polls can always find data.
+pub struct CommittedBatch {
+    pub segment_idx: usize,
+    pub messages_writer: Rc<MessagesWriter>,
+    pub index_writer: Rc<IndexWriter>,
+    /// Indexes captured at commit time, before any rotation can clear them.
+    pub unsaved_indexes: PooledBuffer,
+    /// Frozen (immutable) batches ready for disk persistence.
+    /// These are also stored in the in-flight buffer for reads during async I/O.
+    pub frozen: Vec<IggyMessagesBatch>,
+}
+
+/// Commits the journal and returns the batches along with the segment's writers and indexes.
+/// By capturing writers, indexes, and frozen batches at commit time (within the same lock
+/// acquisition), we ensure that even if segment rotation happens after this call, we still
+/// have valid data to complete the persistence operation.
+///
+/// CRITICAL: This function also sets the in-flight buffer atomically. This prevents a race
+/// condition where a poll could arrive after journal commit but before in-flight is set,
+/// finding both journal and in-flight empty while data is not yet on disk.
+pub fn commit_journal() -> impl FnOnce(ComponentsById<PartitionRefMut>) -> CommittedBatch {
     |(.., log)| {
-        let batches = log.journal_mut().commit();
+        let segment_idx = log.segments().len().saturating_sub(1);
+        let mut batches = log.journal_mut().commit();
         log.ensure_indexes();
         batches.append_indexes_to(log.active_indexes_mut().unwrap());
-        batches
+
+        // Capture writers NOW before any rotation can happen
+        let storage = log.active_storage();
+        let messages_writer = storage
+            .messages_writer
+            .as_ref()
+            .expect("Messages writer must exist at commit time")
+            .clone();
+        let index_writer = storage
+            .index_writer
+            .as_ref()
+            .expect("Index writer must exist at commit time")
+            .clone();
+
+        // Capture indexes NOW before any rotation can clear them
+        let unsaved_indexes = log.active_indexes().unwrap().unsaved_slice();
+
+        // Freeze batches and set in-flight ATOMICALLY within this lock acquisition.
+        // This ensures polls can always find data either in journal or in-flight.
+        let frozen: Vec<IggyMessagesBatch> = batches.iter_mut().map(|b| b.freeze()).collect();
+        log.set_in_flight(frozen.clone());
+
+        CommittedBatch {
+            segment_idx,
+            messages_writer,
+            index_writer,
+            unsaved_indexes,
+            frozen,
+        }
     }
 }
 
@@ -494,76 +532,26 @@ pub fn persist_reason(
     }
 }
 
-pub fn persist_batch(
-    stream_id: &Identifier,
-    topic_id: &Identifier,
-    partition_id: usize,
-    batches: IggyMessagesBatchSet,
-    reason: String,
-) -> impl AsyncFnOnce(ComponentsById<PartitionRef>) -> Result<(IggyByteSize, u32), IggyError> {
-    async move |(.., log)| {
-        tracing::trace!(
-            "Persisting messages on disk for stream ID: {}, topic ID: {}, 
partition ID: {} because {}...",
-            stream_id,
-            topic_id,
-            partition_id,
-            reason
-        );
-
-        let batch_count = batches.count();
-        let batch_size = batches.size();
-
-        let storage = log.active_storage();
-        let saved = storage
-            .messages_writer
-            .as_ref()
-            .expect("Messages writer not initialized")
-            .save_batch_set(batches)
-            .await
-            .error(|e: &IggyError| {
-                let segment = log.active_segment();
-                format!(
-                    "Failed to save batch of {batch_count} messages \
-                                    ({batch_size} bytes) to {segment}. {e}",
-                )
-            })?;
-
-        let unsaved_indexes_slice = log.active_indexes().unwrap().unsaved_slice();
-        let len = unsaved_indexes_slice.len();
-        storage
-            .index_writer
-            .as_ref()
-            .expect("Index writer not initialized")
-            .save_indexes(unsaved_indexes_slice)
-            .await
-            .error(|e: &IggyError| {
-                let segment = log.active_segment();
-                format!("Failed to save index of {len} indexes to {segment}. 
{e}",)
-            })?;
-
-        tracing::trace!(
-            "Persisted {} messages on disk for stream ID: {}, topic ID: {}, 
for partition with ID: {}, total bytes written: {}.",
-            batch_count,
-            stream_id,
-            topic_id,
-            partition_id,
-            saved
-        );
-
-        Ok((saved, batch_count))
-    }
-}
-
+/// Updates segment size and marks indexes as saved for a specific segment.
+/// Uses segment_idx to ensure we update the correct segment even after rotation.
+/// Gracefully handles the case where indexes were cleared during segment rotation.
 pub fn update_index_and_increment_stats(
+    segment_idx: usize,
     saved: IggyByteSize,
     config: &SystemConfig,
 ) -> impl FnOnce(ComponentsById<PartitionRefMut>) {
     move |(.., log)| {
-        let segment = log.active_segment_mut();
-        segment.size = IggyByteSize::from(segment.size.as_bytes_u64() + saved.as_bytes_u64());
-        log.active_indexes_mut().unwrap().mark_saved();
-        if config.segment.cache_indexes == CacheIndexesConfig::None {
-            log.active_indexes_mut().unwrap().clear();
+        // Update the specific segment we wrote to, not "active" which may have changed
+        if let Some(segment) = log.segments_mut().get_mut(segment_idx) {
+            segment.size = IggyByteSize::from(segment.size.as_bytes_u64() + saved.as_bytes_u64());
+        }
+
+        // Handle indexes - may be None if segment was rotated and indexes cleared
+        if let Some(Some(indexes)) = log.indexes_mut().get_mut(segment_idx) {
+            indexes.mark_saved();
+            if config.segment.cache_indexes == CacheIndexesConfig::None {
+                indexes.clear();
+            }
         }
     }
 }
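
The core idea in the helpers.rs changes above is to capture everything the asynchronous persist step will later need (segment index, writer handles, unsaved indexes, frozen batches) inside the same exclusive access as the journal commit, so a later rotation cannot invalidate it. A rough sketch of that capture-at-commit shape, with hypothetical simplified types standing in for the real PartitionRefMut, PooledBuffer, and batch machinery:

    use std::rc::Rc;

    // Hypothetical, simplified stand-ins for the real writer and batch types.
    struct MessagesWriter;
    struct IndexWriter;
    struct SegmentMeta;
    #[derive(Clone)]
    struct FrozenBatch;

    /// Everything the async persist step needs, captured while the partition
    /// state is still held exclusively (mirrors CommittedBatch in helpers.rs).
    struct Committed {
        segment_idx: usize,
        messages_writer: Rc<MessagesWriter>,
        index_writer: Rc<IndexWriter>,
        unsaved_indexes: Vec<u8>,
        frozen: Vec<FrozenBatch>,
    }

    struct PartitionState {
        segments: Vec<SegmentMeta>,
        messages_writer: Rc<MessagesWriter>,
        index_writer: Rc<IndexWriter>,
        unsaved_indexes: Vec<u8>,
        journal: Vec<FrozenBatch>,
        in_flight: Vec<FrozenBatch>,
    }

    /// Commit the journal and capture writers/indexes in one exclusive step.
    /// A later rotation can clear or swap the live state without invalidating
    /// what the pending persist operation is working with.
    fn commit_journal(state: &mut PartitionState) -> Committed {
        let segment_idx = state.segments.len().saturating_sub(1);
        let frozen: Vec<FrozenBatch> = state.journal.drain(..).collect();

        // Expose the frozen batches to readers while disk I/O is pending.
        state.in_flight = frozen.clone();

        Committed {
            segment_idx,
            messages_writer: state.messages_writer.clone(),
            index_writer: state.index_writer.clone(),
            unsaved_indexes: state.unsaved_indexes.clone(),
            frozen,
        }
    }

The persist path then writes the frozen batches and the captured indexes through these handles and clears the in-flight buffer afterwards, which matches the ordering persist_messages_to_disk follows in streams.rs.
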
diff --git a/core/server/src/streaming/segments/messages/messages_writer.rs b/core/server/src/streaming/segments/messages/messages_writer.rs
index 85d3bd215..3ef14c406 100644
--- a/core/server/src/streaming/segments/messages/messages_writer.rs
+++ b/core/server/src/streaming/segments/messages/messages_writer.rs
@@ -16,10 +16,7 @@
  * under the License.
  */
 
-use crate::streaming::segments::{
-    IggyMessagesBatchSet,
-    messages::{write_batch, write_batch_frozen},
-};
+use crate::streaming::segments::messages::write_batch_frozen;
 use compio::fs::{File, OpenOptions};
 use err_trail::ErrContext;
 use iggy_common::{IggyByteSize, IggyError, IggyMessagesBatch};
@@ -96,46 +93,7 @@ impl MessagesWriter {
         })
     }
 
-    /// Append a batch of messages to the messages file.
-    pub async fn save_batch_set(
-        &self,
-        batch_set: IggyMessagesBatchSet,
-    ) -> Result<IggyByteSize, IggyError> {
-        let messages_size = batch_set.size();
-        let messages_count = batch_set.count();
-        let containers_count = batch_set.containers_count();
-        trace!(
-            "Saving batch set of size {messages_size} bytes 
({containers_count} containers, {messages_count} messages) to messages file: 
{}",
-            self.file_path
-        );
-        let position = self.messages_size_bytes.load(Ordering::Relaxed);
-        let file = &self.file;
-        write_batch(file, position, batch_set)
-            .await
-            .error(|e: &IggyError| {
-                format!(
-                    "Failed to write batch to messages file: {}. {e}",
-                    self.file_path
-                )
-            })?;
-
-        if self.fsync {
-            let _ = self.fsync().await;
-        }
-
-        self.messages_size_bytes
-            .fetch_add(messages_size as u64, Ordering::Release);
-        trace!(
-            "Written batch set of size {messages_size} bytes 
({containers_count} containers, {messages_count} messages) to disk messages 
file: {}",
-            self.file_path
-        );
-
-        Ok(IggyByteSize::from(messages_size as u64))
-    }
-
     /// Append frozen (immutable) batches to the messages file.
-    ///
-    /// Unlike `save_batch_set`, this method does not take ownership of the batches.
     /// The caller retains the batches (for use in in-flight buffer) while disk I/O proceeds.
     pub async fn save_frozen_batches(
         &self,
diff --git a/core/server/src/streaming/segments/messages/mod.rs b/core/server/src/streaming/segments/messages/mod.rs
index 8f5ba9c81..7289f8334 100644
--- a/core/server/src/streaming/segments/messages/mod.rs
+++ b/core/server/src/streaming/segments/messages/mod.rs
@@ -19,10 +19,10 @@
 mod messages_reader;
 mod messages_writer;
 
-use super::IggyMessagesBatchSet;
 use bytes::Bytes;
 use compio::{fs::File, io::AsyncWriteAtExt};
-use iggy_common::{IggyError, IggyMessagesBatch, PooledBuffer};
+use iggy_common::{IggyError, IggyMessagesBatch};
+use tracing::error;
 
 pub use messages_reader::MessagesReader;
 pub use messages_writer::MessagesWriter;
@@ -32,30 +32,7 @@ pub use messages_writer::MessagesWriter;
 /// cross-platform compatibility and leave room for any internal overhead.
 const MAX_IOV_COUNT: usize = 1024;
 
-/// Vectored write a batches of messages to file
-async fn write_batch(
-    file: &File,
-    position: u64,
-    mut batches: IggyMessagesBatchSet,
-) -> Result<usize, IggyError> {
-    let (total_written, buffers) =
-        batches
-            .iter_mut()
-            .fold((0usize, Vec::new()), |(size, mut bufs), batch| {
-                let batch_size = batch.size() as usize;
-                bufs.push(batch.take_messages());
-                (size + batch_size, bufs)
-            });
-
-    write_vectored_chunked_pooled(file, position, buffers).await?;
-    Ok(total_written)
-}
-
 /// Vectored write frozen (immutable) batches to file.
-///
-/// This function writes `IggyMessagesBatch` (immutable, Arc-backed) directly
-/// without transferring ownership. The caller retains the batches for reads
-/// during the async I/O operation.
 pub async fn write_batch_frozen(
     file: &File,
     position: u64,
@@ -73,26 +50,6 @@ pub async fn write_batch_frozen(
     Ok(total_written)
 }
 
-/// Writes PooledBuffer buffers to file using vectored I/O, chunking to respect IOV_MAX limits.
-async fn write_vectored_chunked_pooled(
-    file: &File,
-    mut position: u64,
-    buffers: Vec<PooledBuffer>,
-) -> Result<(), IggyError> {
-    let mut iter = buffers.into_iter().peekable();
-
-    while iter.peek().is_some() {
-        let chunk: Vec<PooledBuffer> = iter.by_ref().take(MAX_IOV_COUNT).collect();
-        let chunk_size: usize = chunk.iter().map(|b| b.len()).sum();
-
-        let (result, _) = (&*file).write_vectored_all_at(chunk, position).await.into();
-        result.map_err(|_| IggyError::CannotWriteToFile)?;
-
-        position += chunk_size as u64;
-    }
-    Ok(())
-}
-
 /// Writes Bytes buffers to file using vectored I/O, chunking to respect IOV_MAX limits.
 async fn write_vectored_chunked_bytes(
     file: &File,
@@ -107,7 +64,10 @@ async fn write_vectored_chunked_bytes(
             .write_vectored_all_at(chunk_vec, position)
             .await
             .into();
-        result.map_err(|_| IggyError::CannotWriteToFile)?;
+        result.map_err(|e| {
+            error!("Failed to write frozen batch to messages file: {e}");
+            IggyError::CannotWriteToFile
+        })?;
 
         position += chunk_size as u64;
     }
diff --git a/core/server/src/streaming/segments/mod.rs b/core/server/src/streaming/segments/mod.rs
index 7ed09c2d4..878af0eb1 100644
--- a/core/server/src/streaming/segments/mod.rs
+++ b/core/server/src/streaming/segments/mod.rs
@@ -24,6 +24,8 @@ mod types;
 pub mod storage;
 
 pub use indexes::IggyIndexesMut;
+pub use indexes::IndexWriter;
+pub use messages::MessagesWriter;
 pub use segment::Segment;
 pub use types::IggyMessageHeaderViewMut;
 pub use types::IggyMessageViewMut;
