This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new a53a4ed3d fix(server): maintain message availability during async disk I/O (#2575)
a53a4ed3d is described below

commit a53a4ed3d0f344327e1d3a85a86bbff3aca783f9
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Fri Jan 16 12:54:45 2026 +0100

    fix(server): maintain message availability during async disk I/O (#2575)
    
    Messages were temporarily unreadable after journal commit but
    before disk write completed. Freeze batches to Arc-backed
    Bytes so the same data can be held in an in-flight buffer for
    reads while compio writes.
---
 core/common/src/alloc/buffer.rs                    |  23 ++-
 core/common/src/types/message/messages_batch.rs    |  10 ++
 core/common/src/types/message/polled_messages.rs   |   8 +-
 core/integration/tests/server/scenarios/mod.rs     |   1 +
 .../scenarios/read_during_persistence_scenario.rs  | 195 +++++++++++++++++++++
 .../handlers/messages/poll_messages_handler.rs     |   6 +-
 core/server/src/slab/streams.rs                    |  72 ++++++--
 core/server/src/streaming/partitions/helpers.rs    |  13 +-
 core/server/src/streaming/partitions/in_flight.rs  |  77 ++++++++
 core/server/src/streaming/partitions/log.rs        |  23 ++-
 core/server/src/streaming/partitions/mod.rs        |   1 +
 .../src/streaming/segments/indexes/indexes_mut.rs  |  13 +-
 .../streaming/segments/messages/messages_writer.rs |  38 +++-
 core/server/src/streaming/segments/messages/mod.rs |  23 ++-
 .../streaming/segments/types/messages_batch_mut.rs |  16 +-
 .../streaming/segments/types/messages_batch_set.rs |  29 ++-
 16 files changed, 508 insertions(+), 40 deletions(-)
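
The core idea described in the commit message can be sketched in a few lines of plain Rust using only the `bytes` crate: freezing a mutable buffer yields an Arc-backed `Bytes`, so one cheap clone can be published in an in-flight slot for readers while another clone is handed to the (here simulated) disk write. The `InFlight` type and the synchronous "write" below are illustrative stand-ins, not the server's actual `PooledBuffer` or `IggyMessagesBatchSetInFlight` API.

use bytes::{Bytes, BytesMut};

// Illustrative stand-in for the partition's in-flight buffer.
#[derive(Default)]
struct InFlight {
    batches: Vec<Bytes>, // frozen copies that stay readable during the write
}

impl InFlight {
    fn set(&mut self, batches: Vec<Bytes>) {
        self.batches = batches;
    }
    fn clear(&mut self) {
        self.batches.clear();
    }
    fn read_all(&self) -> Vec<Bytes> {
        self.batches.to_vec() // clones only bump the Arc refcount
    }
}

fn main() {
    // 1. Messages accumulate in a mutable buffer (stand-in for PooledBuffer).
    let mut buf = BytesMut::with_capacity(64);
    buf.extend_from_slice(b"batch-0: hello");

    // 2. Freeze: the allocation becomes shared, immutable and Arc-backed.
    let frozen: Bytes = buf.freeze();

    // 3. Publish a clone for readers before the write starts.
    let mut in_flight = InFlight::default();
    in_flight.set(vec![frozen.clone()]);

    // 4. "Write" the same bytes (a plain copy standing in for the compio write);
    //    polls arriving in the meantime are served from the in-flight clone.
    let written: Vec<u8> = frozen.to_vec();
    assert_eq!(&in_flight.read_all()[0][..], &written[..]);

    // 5. Once the data is durable on disk, the in-flight copy is dropped.
    in_flight.clear();
}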

diff --git a/core/common/src/alloc/buffer.rs b/core/common/src/alloc/buffer.rs
index 2d5a65c75..601fe6a86 100644
--- a/core/common/src/alloc/buffer.rs
+++ b/core/common/src/alloc/buffer.rs
@@ -17,7 +17,7 @@
  */
 
 use super::memory_pool::{BytesMutExt, memory_pool};
-use bytes::{Buf, BufMut, BytesMut};
+use bytes::{Buf, BufMut, Bytes, BytesMut};
 use compio::buf::{IoBuf, IoBufMut, SetBufInit};
 use std::ops::{Deref, DerefMut};
 
@@ -197,6 +197,27 @@ impl PooledBuffer {
         let mut this = std::mem::ManuallyDrop::new(self);
         std::mem::take(&mut this.inner)
     }
+
+    /// Freezes the buffer, converting it to an immutable `Bytes`.
+    ///
+    /// After calling this method, the PooledBuffer becomes empty and will not
+    /// return memory to the pool on drop (the frozen Bytes owns the allocation).
+    /// The returned `Bytes` is Arc-backed, allowing cheap clones.
+    pub fn freeze(&mut self) -> Bytes {
+        // Decrement pool counter since memory is transferred to Bytes
+        // and won't be returned to the pool.
+        if self.from_pool
+            && let Some(bucket_idx) = self.original_bucket_idx
+        {
+            memory_pool().dec_bucket_in_use(bucket_idx);
+        }
+
+        let inner = std::mem::take(&mut self.inner);
+        self.from_pool = false;
+        self.original_capacity = 0;
+        self.original_bucket_idx = None;
+        inner.freeze()
+    }
 }
 
 impl Deref for PooledBuffer {
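
A toy illustration of why `freeze` has to drop the pool bookkeeping: once the allocation is handed to an Arc-backed `Bytes`, it will never return to the pool, so the in-use counter must be decremented at freeze time rather than on drop. `POOL_IN_USE` and `ToyPooledBuffer` are made-up names, not the real `PooledBuffer`/`memory_pool` API.

use bytes::{Bytes, BytesMut};
use std::sync::atomic::{AtomicUsize, Ordering};

static POOL_IN_USE: AtomicUsize = AtomicUsize::new(0);

struct ToyPooledBuffer {
    inner: BytesMut,
    from_pool: bool,
}

impl ToyPooledBuffer {
    fn from_pool(capacity: usize) -> Self {
        POOL_IN_USE.fetch_add(1, Ordering::Relaxed);
        Self { inner: BytesMut::with_capacity(capacity), from_pool: true }
    }

    // Hands the allocation over to an immutable, Arc-backed Bytes. The pool
    // counter is decremented here because the memory will never come back.
    fn freeze(&mut self) -> Bytes {
        if self.from_pool {
            POOL_IN_USE.fetch_sub(1, Ordering::Relaxed);
            self.from_pool = false;
        }
        std::mem::take(&mut self.inner).freeze()
    }
}

impl Drop for ToyPooledBuffer {
    fn drop(&mut self) {
        // Only buffers still owned by the pool are counted on drop.
        if self.from_pool {
            POOL_IN_USE.fetch_sub(1, Ordering::Relaxed);
        }
    }
}

fn main() {
    let mut buf = ToyPooledBuffer::from_pool(16);
    buf.inner.extend_from_slice(b"payload");
    let frozen = buf.freeze();
    drop(buf); // no double-decrement: ownership already moved to `frozen`
    assert_eq!(POOL_IN_USE.load(Ordering::Relaxed), 0);
    assert_eq!(&frozen[..], b"payload");
}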
diff --git a/core/common/src/types/message/messages_batch.rs b/core/common/src/types/message/messages_batch.rs
index 3d03159e5..6bc704866 100644
--- a/core/common/src/types/message/messages_batch.rs
+++ b/core/common/src/types/message/messages_batch.rs
@@ -75,11 +75,21 @@ impl IggyMessagesBatch {
         &self.messages
     }
 
+    /// Get the messages as a cloned `Bytes` (cheap Arc increment).
+    pub fn messages_bytes(&self) -> Bytes {
+        self.messages.clone()
+    }
+
     /// Get the indexes slice
     pub fn indexes_slice(&self) -> &[u8] {
         &self.indexes
     }
 
+    /// Get a reference to the indexes
+    pub fn indexes(&self) -> &IggyIndexes {
+        &self.indexes
+    }
+
     /// Take the indexes from the batch
     pub fn take_indexes(&mut self) -> IggyIndexes {
         std::mem::take(&mut self.indexes)
diff --git a/core/common/src/types/message/polled_messages.rs b/core/common/src/types/message/polled_messages.rs
index 9e091437b..8204ffde2 100644
--- a/core/common/src/types/message/polled_messages.rs
+++ b/core/common/src/types/message/polled_messages.rs
@@ -111,12 +111,16 @@ fn messages_from_bytes_and_count(buffer: Bytes, count: u32) -> Result<Vec<IggyMe
         let payload = buffer.slice(position..payload_end);
         position = payload_end;
 
+        let user_headers_end = position + header.user_headers_length as usize;
+        if user_headers_end > buf_len {
+            break;
+        }
         let user_headers = if header.user_headers_length > 0 {
-            Some(buffer.slice(position..position + header.user_headers_length as usize))
+            Some(buffer.slice(position..user_headers_end))
         } else {
             None
         };
-        position += header.user_headers_length as usize;
+        position = user_headers_end;
 
         messages.push(IggyMessage {
             header,
diff --git a/core/integration/tests/server/scenarios/mod.rs b/core/integration/tests/server/scenarios/mod.rs
index 1676cefa3..2c2fbab3a 100644
--- a/core/integration/tests/server/scenarios/mod.rs
+++ b/core/integration/tests/server/scenarios/mod.rs
@@ -30,6 +30,7 @@ pub mod delete_segments_scenario;
 pub mod encryption_scenario;
 pub mod message_headers_scenario;
 pub mod message_size_scenario;
+pub mod read_during_persistence_scenario;
 pub mod stale_client_consumer_group_scenario;
 pub mod stream_size_validation_scenario;
 pub mod system_scenario;
diff --git a/core/integration/tests/server/scenarios/read_during_persistence_scenario.rs b/core/integration/tests/server/scenarios/read_during_persistence_scenario.rs
new file mode 100644
index 000000000..0ff9c8bd3
--- /dev/null
+++ b/core/integration/tests/server/scenarios/read_during_persistence_scenario.rs
@@ -0,0 +1,195 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use bytes::Bytes;
+use iggy::prelude::*;
+use integration::{
+    tcp_client::TcpClientFactory,
+    test_server::{ClientFactory, IpAddrKind, SYSTEM_PATH_ENV_VAR, TestServer, login_root},
+};
+use serial_test::parallel;
+use std::collections::HashMap;
+use std::time::{Duration, Instant};
+
+const STREAM_NAME: &str = "eventual-consistency-stream";
+const TOPIC_NAME: &str = "eventual-consistency-topic";
+const TEST_DURATION_SECS: u64 = 10;
+
+/// Test with two separate clients - one for sending, one for polling.
+///
+/// This should expose the race condition where messages are in-flight during
+/// disk write and unavailable for polling.
+#[tokio::test]
+#[parallel]
+async fn should_read_messages_immediately_after_send_with_aggressive_persistence() {
+    let env_vars = HashMap::from([
+        (
+            SYSTEM_PATH_ENV_VAR.to_owned(),
+            TestServer::get_random_path(),
+        ),
+        (
+            "IGGY_SYSTEM_PARTITION_MESSAGES_REQUIRED_TO_SAVE".to_owned(),
+            "32".to_owned(),
+        ),
+        (
+            "IGGY_SYSTEM_PARTITION_SIZE_OF_MESSAGES_REQUIRED_TO_SAVE".to_owned(),
+            "512B".to_owned(),
+        ),
+        (
+            "IGGY_SYSTEM_PARTITION_ENFORCE_FSYNC".to_owned(),
+            "false".to_owned(),
+        ),
+    ]);
+
+    let mut test_server = TestServer::new(Some(env_vars), true, None, IpAddrKind::V4);
+    test_server.start();
+    let server_addr = test_server.get_raw_tcp_addr().unwrap();
+
+    let producer_client = TcpClientFactory {
+        server_addr: server_addr.clone(),
+        ..Default::default()
+    }
+    .create_client()
+    .await;
+    let producer = IggyClient::create(producer_client, None, None);
+    login_root(&producer).await;
+
+    producer.create_stream(STREAM_NAME).await.unwrap();
+    producer
+        .create_topic(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            TOPIC_NAME,
+            1,
+            CompressionAlgorithm::default(),
+            None,
+            IggyExpiry::NeverExpire,
+            MaxTopicSize::ServerDefault,
+        )
+        .await
+        .unwrap();
+
+    let consumer_client = TcpClientFactory {
+        server_addr,
+        ..Default::default()
+    }
+    .create_client()
+    .await;
+    let consumer = IggyClient::create(consumer_client, None, None);
+    login_root(&consumer).await;
+
+    let stream_id = Identifier::named(STREAM_NAME).unwrap();
+    let topic_id = Identifier::named(TOPIC_NAME).unwrap();
+    let consumer_kind = Consumer::default();
+
+    let test_duration = Duration::from_secs(TEST_DURATION_SECS);
+    let messages_per_batch = 32u32;
+    let payload = "X".repeat(1024);
+
+    let start = Instant::now();
+    let mut batches_sent = 0u64;
+    let mut messages_sent = 0u64;
+
+    println!(
+        "Starting test: 1KB messages, {} msgs/batch, duration: {}s",
+        messages_per_batch, TEST_DURATION_SECS
+    );
+
+    while start.elapsed() < test_duration {
+        let base_offset = batches_sent * messages_per_batch as u64;
+
+        let mut messages: Vec<IggyMessage> = (0..messages_per_batch)
+            .map(|i| {
+                IggyMessage::builder()
+                    .id((base_offset + i as u64 + 1) as u128)
+                    .payload(Bytes::from(format!(
+                        "{} - batch {} msg {}",
+                        payload, batches_sent, i
+                    )))
+                    .build()
+                    .expect("Failed to create message")
+            })
+            .collect();
+
+        println!("Sending batch {}", batches_sent);
+        let send_result = producer
+            .send_messages(
+                &stream_id,
+                &topic_id,
+                &Partitioning::partition_id(0),
+                &mut messages,
+            )
+            .await;
+        match &send_result {
+            Ok(_) => println!("Batch {} sent successfully", batches_sent),
+            Err(e) => println!("Batch {} send error: {:?}", batches_sent, e),
+        }
+        send_result.unwrap();
+
+        batches_sent += 1;
+        messages_sent += messages_per_batch as u64;
+
+        println!("Calling poll_messages after batch {}", batches_sent);
+        let poll_result = consumer
+            .poll_messages(
+                &stream_id,
+                &topic_id,
+                Some(0),
+                &consumer_kind,
+                &PollingStrategy::offset(0),
+                messages_sent as u32,
+                false,
+            )
+            .await;
+
+        let polled_count = match &poll_result {
+            Ok(polled) => polled.messages.len() as u64,
+            Err(e) => {
+                println!("Poll error: {:?}", e);
+                0
+            }
+        };
+
+        if polled_count < messages_sent {
+            let missing = messages_sent - polled_count;
+            let elapsed_ms = start.elapsed().as_millis();
+
+            panic!(
+                "RACE CONDITION DETECTED after {:.2}s/{}s ({} batches, {} 
messages), expected {} messages, got {}. Missing: {}",
+                elapsed_ms as f64 / 1000.0,
+                TEST_DURATION_SECS,
+                batches_sent,
+                messages_sent,
+                messages_sent,
+                polled_count,
+                missing
+            );
+        }
+
+        if batches_sent.is_multiple_of(1000) {
+            println!(
+                "Progress: {} batches, {} messages, elapsed: {:.2}s/{}s",
+                batches_sent,
+                messages_sent,
+                start.elapsed().as_secs_f64(),
+                TEST_DURATION_SECS
+            );
+        }
+    }
+
+    producer.delete_stream(&stream_id).await.unwrap();
+}
diff --git a/core/server/src/binary/handlers/messages/poll_messages_handler.rs b/core/server/src/binary/handlers/messages/poll_messages_handler.rs
index ea8ccbb8c..354b554b0 100644
--- a/core/server/src/binary/handlers/messages/poll_messages_handler.rs
+++ b/core/server/src/binary/handlers/messages/poll_messages_handler.rs
@@ -92,7 +92,7 @@ impl ServerCommandHandler for PollMessages {
         let response_length = 4 + 8 + 4 + batch.size();
         let response_length_bytes = response_length.to_le_bytes();
 
-        let mut bufs = Vec::with_capacity(batch.containers_count() + 5);
+        let mut bufs = Vec::with_capacity(batch.containers_count() + 3);
         let mut partition_id_buf = PooledBuffer::with_capacity(4);
         let mut current_offset_buf = PooledBuffer::with_capacity(8);
         let mut count_buf = PooledBuffer::with_capacity(4);
@@ -104,7 +104,9 @@ impl ServerCommandHandler for PollMessages {
         bufs.push(current_offset_buf);
         bufs.push(count_buf);
 
-        batch.iter_mut().for_each(|m| bufs.push(m.take_messages()));
+        batch.iter_mut().for_each(|m| {
+            bufs.push(m.take_messages());
+        });
         trace!(
             "Sending {} messages to client ({} bytes) to client",
             batch.count(),
diff --git a/core/server/src/slab/streams.rs b/core/server/src/slab/streams.rs
index 66e5c7cbb..f1c342de2 100644
--- a/core/server/src/slab/streams.rs
+++ b/core/server/src/slab/streams.rs
@@ -57,7 +57,7 @@ use crate::{
 };
 use ahash::AHashMap;
 use err_trail::ErrContext;
-use iggy_common::{Identifier, IggyError, IggyTimestamp, PollingKind};
+use iggy_common::{Identifier, IggyError, IggyMessagesBatch, IggyTimestamp, PollingKind};
 use slab::Slab;
 use std::{
     cell::RefCell,
@@ -737,23 +737,48 @@ impl Streams {
         count: u32,
         segment_start_offset: u64,
     ) -> Result<IggyMessagesBatchSet, IggyError> {
-        let (is_journal_empty, journal_first_offset, journal_last_offset) = self
-            .with_partition_by_id(
-                stream_id,
-                topic_id,
-                partition_id,
-                |(_, _, _, _, _, _, log)| {
-                    let journal = log.journal();
-                    (
-                        journal.is_empty(),
-                        journal.inner().base_offset,
-                        journal.inner().current_offset,
-                    )
-                },
-            );
+        let (
+            is_journal_empty,
+            journal_first_offset,
+            journal_last_offset,
+            in_flight_empty,
+            in_flight_first,
+            in_flight_last,
+        ) = self.with_partition_by_id(
+            stream_id,
+            topic_id,
+            partition_id,
+            |(_, _, _, _, _, _, log)| {
+                let journal = log.journal();
+                let in_flight = log.in_flight();
+                (
+                    journal.is_empty(),
+                    journal.inner().base_offset,
+                    journal.inner().current_offset,
+                    in_flight.is_empty(),
+                    in_flight.first_offset(),
+                    in_flight.last_offset(),
+                )
+            },
+        );
 
-        // Case 0: Accumulator is empty, so all messages have to be on disk
+        // Case 0: Journal is empty, check in-flight buffer or disk
         if is_journal_empty {
+            if !in_flight_empty && offset >= in_flight_first && offset <= in_flight_last {
+                let mut result = IggyMessagesBatchSet::empty();
+                let in_flight_batches = self.with_partition_by_id(
+                    stream_id,
+                    topic_id,
+                    partition_id,
+                    |(_, _, _, _, _, _, log)| log.in_flight().get_by_offset(offset, count).to_vec(),
+                );
+                if !in_flight_batches.is_empty() {
+                    result.add_immutable_batches(&in_flight_batches);
+                    let final_result = result.get_by_offset(offset, count);
+                    return Ok(final_result);
+                }
+            }
+
             return self
                 .load_messages_from_disk_by_offset(
                     stream_id,
@@ -1297,7 +1322,7 @@ impl Streams {
         stream_id: &Identifier,
         topic_id: &Identifier,
         partition_id: usize,
-        batches: IggyMessagesBatchSet,
+        mut batches: IggyMessagesBatchSet,
         config: &SystemConfig,
     ) -> Result<u32, IggyError> {
         let batch_count = batches.count();
@@ -1316,7 +1341,12 @@ impl Streams {
             return Ok(0);
         }
 
-        // Extract storage before async operations
+        // Store frozen batches in in-flight buffer for reads during async I/O
+        let frozen: Vec<IggyMessagesBatch> = batches.iter_mut().map(|b| b.freeze()).collect();
+        self.with_partition_by_id_mut(stream_id, topic_id, partition_id, |(.., log)| {
+            log.set_in_flight(frozen.clone());
+        });
+
         let (messages_writer, index_writer) =
             self.with_partition_by_id(stream_id, topic_id, partition_id, |(.., log)| {
                 (
@@ -1336,7 +1366,7 @@ impl Streams {
 
         let saved = messages_writer
             .as_ref()
-            .save_batch_set(batches)
+            .save_frozen_batches(&frozen)
             .await
             .error(|e: &IggyError| {
                 format!(
@@ -1376,6 +1406,10 @@ impl Streams {
             streaming_partitions::helpers::update_index_and_increment_stats(saved, config),
         );
 
+        self.with_partition_by_id_mut(stream_id, topic_id, partition_id, |(.., log)| {
+            log.clear_in_flight();
+        });
+
         drop(guard);
         Ok(batch_count)
     }
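
A condensed, hypothetical sketch of the read-path decision this hunk adds: when the journal is empty, a requested offset may now be served from the in-flight buffer before falling back to a disk read. The types and helper below are illustrative only and do not mirror the real `Streams` internals.

// Sketch of the journal -> in-flight -> disk lookup order (assumed simplification).
#[derive(Debug, PartialEq)]
enum ReadSource {
    Journal,
    InFlight,
    Disk,
}

struct OffsetRange {
    first: u64,
    last: u64,
}

fn pick_source(
    offset: u64,
    journal: Option<&OffsetRange>,   // None == journal empty
    in_flight: Option<&OffsetRange>, // None == nothing being persisted
) -> ReadSource {
    if let Some(j) = journal {
        if offset >= j.first && offset <= j.last {
            return ReadSource::Journal;
        }
    }
    if let Some(f) = in_flight {
        // Before this fix there was no such branch, so offsets that had just
        // left the journal but were not yet on disk could not be served.
        if offset >= f.first && offset <= f.last {
            return ReadSource::InFlight;
        }
    }
    ReadSource::Disk
}

fn main() {
    let in_flight = OffsetRange { first: 96, last: 127 };
    assert_eq!(pick_source(100, None, Some(&in_flight)), ReadSource::InFlight);
    assert_eq!(pick_source(10, None, Some(&in_flight)), ReadSource::Disk);
}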
diff --git a/core/server/src/streaming/partitions/helpers.rs b/core/server/src/streaming/partitions/helpers.rs
index 6d14f8971..531e77e54 100644
--- a/core/server/src/streaming/partitions/helpers.rs
+++ b/core/server/src/streaming/partitions/helpers.rs
@@ -15,13 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use err_trail::ErrContext;
-use iggy_common::{ConsumerOffsetInfo, Identifier, IggyByteSize, IggyError};
-use std::{
-    ops::AsyncFnOnce,
-    sync::{Arc, atomic::Ordering},
-};
-
 use crate::{
     configs::{cache_indexes::CacheIndexesConfig, system::SystemConfig},
     slab::{
@@ -42,6 +35,12 @@ use crate::{
         segments::{IggyIndexesMut, IggyMessagesBatchMut, IggyMessagesBatchSet, storage::Storage},
     },
 };
+use err_trail::ErrContext;
+use iggy_common::{ConsumerOffsetInfo, Identifier, IggyByteSize, IggyError};
+use std::{
+    ops::AsyncFnOnce,
+    sync::{Arc, atomic::Ordering},
+};
 
 pub fn get_partition_ids() -> impl FnOnce(&Partitions) -> Vec<usize> {
     |partitions| {
diff --git a/core/server/src/streaming/partitions/in_flight.rs b/core/server/src/streaming/partitions/in_flight.rs
new file mode 100644
index 000000000..af90d3e7d
--- /dev/null
+++ b/core/server/src/streaming/partitions/in_flight.rs
@@ -0,0 +1,77 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use iggy_common::IggyMessagesBatch;
+
+/// Holds batches that are being written to disk.
+///
+/// During async I/O, messages are transferred from the journal to disk.
+/// This buffer holds frozen (immutable, Arc-backed) copies so consumers
+/// can still read them during the write operation.
+#[derive(Debug, Default)]
+pub struct IggyMessagesBatchSetInFlight {
+    batches: Vec<IggyMessagesBatch>,
+    first_offset: u64,
+    last_offset: u64,
+}
+
+impl IggyMessagesBatchSetInFlight {
+    pub fn is_empty(&self) -> bool {
+        self.batches.is_empty()
+    }
+
+    pub fn first_offset(&self) -> u64 {
+        self.first_offset
+    }
+
+    pub fn last_offset(&self) -> u64 {
+        self.last_offset
+    }
+
+    pub fn set(&mut self, batches: Vec<IggyMessagesBatch>) {
+        if batches.is_empty() {
+            self.clear();
+            return;
+        }
+        self.first_offset = batches.first().and_then(|b| b.first_offset()).unwrap_or(0);
+        self.last_offset = batches.last().and_then(|b| b.last_offset()).unwrap_or(0);
+        self.batches = batches;
+    }
+
+    pub fn clear(&mut self) {
+        self.batches.clear();
+        self.first_offset = 0;
+        self.last_offset = 0;
+    }
+
+    pub fn get_by_offset(&self, start_offset: u64, count: u32) -> &[IggyMessagesBatch] {
+        if self.is_empty() || start_offset > self.last_offset {
+            return &[];
+        }
+
+        let end_offset = start_offset + count as u64 - 1;
+        if end_offset < self.first_offset {
+            return &[];
+        }
+
+        &self.batches
+    }
+
+    pub fn batches(&self) -> &[IggyMessagesBatch] {
+        &self.batches
+    }
+}
diff --git a/core/server/src/streaming/partitions/log.rs b/core/server/src/streaming/partitions/log.rs
index e94adbc80..ae7c1efaa 100644
--- a/core/server/src/streaming/partitions/log.rs
+++ b/core/server/src/streaming/partitions/log.rs
@@ -16,16 +16,17 @@
 // under the License.
 
 use crate::streaming::{
-    partitions::journal::Journal,
+    partitions::{in_flight::IggyMessagesBatchSetInFlight, journal::Journal},
     segments::{IggyIndexesMut, Segment, storage::Storage},
 };
-use iggy_common::INDEX_SIZE;
+use iggy_common::{INDEX_SIZE, IggyMessagesBatch};
 use ringbuffer::AllocRingBuffer;
 use std::fmt::Debug;
 
 const SEGMENTS_CAPACITY: usize = 1024;
 const ACCESS_MAP_CAPACITY: usize = 8;
 const SIZE_16MB: usize = 16 * 1024 * 1024;
+
 #[derive(Debug)]
 pub struct SegmentedLog<J>
 where
@@ -39,6 +40,7 @@ where
     segments: Vec<Segment>,
     indexes: Vec<Option<IggyIndexesMut>>,
     storage: Vec<Storage>,
+    in_flight: IggyMessagesBatchSetInFlight,
 }
 
 impl<J> Default for SegmentedLog<J>
@@ -53,6 +55,7 @@ where
             segments: Vec::with_capacity(SEGMENTS_CAPACITY),
             storage: Vec::with_capacity(SEGMENTS_CAPACITY),
             indexes: Vec::with_capacity(SEGMENTS_CAPACITY),
+            in_flight: IggyMessagesBatchSetInFlight::default(),
         }
     }
 }
@@ -157,6 +160,22 @@ where
             *segment_indexes = Some(indexes);
         }
     }
+
+    pub fn in_flight(&self) -> &IggyMessagesBatchSetInFlight {
+        &self.in_flight
+    }
+
+    pub fn in_flight_mut(&mut self) -> &mut IggyMessagesBatchSetInFlight {
+        &mut self.in_flight
+    }
+
+    pub fn set_in_flight(&mut self, batches: Vec<IggyMessagesBatch>) {
+        self.in_flight.set(batches);
+    }
+
+    pub fn clear_in_flight(&mut self) {
+        self.in_flight.clear();
+    }
 }
 
 impl<J> SegmentedLog<J>
diff --git a/core/server/src/streaming/partitions/mod.rs b/core/server/src/streaming/partitions/mod.rs
index 422744233..c5a2947c1 100644
--- a/core/server/src/streaming/partitions/mod.rs
+++ b/core/server/src/streaming/partitions/mod.rs
@@ -18,6 +18,7 @@
 
 pub mod consumer_offset;
 pub mod helpers;
+pub mod in_flight;
 pub mod journal;
 pub mod log;
 pub mod partition;
diff --git a/core/server/src/streaming/segments/indexes/indexes_mut.rs b/core/server/src/streaming/segments/indexes/indexes_mut.rs
index 40c67bd3d..197bed43b 100644
--- a/core/server/src/streaming/segments/indexes/indexes_mut.rs
+++ b/core/server/src/streaming/segments/indexes/indexes_mut.rs
@@ -17,7 +17,7 @@
  */
 
 use iggy_common::PooledBuffer;
-use iggy_common::{INDEX_SIZE, IggyIndexView};
+use iggy_common::{INDEX_SIZE, IggyIndexView, IggyIndexes};
 use std::fmt;
 use std::ops::{Deref, Index as StdIndex};
 
@@ -56,6 +56,17 @@ impl IggyIndexesMut {
         (base_position, buffer)
     }
 
+    /// Freezes the indexes buffer, converting to an immutable `IggyIndexes`.
+    ///
+    /// The returned `IggyIndexes` uses Arc-backed `Bytes`, allowing cheap clones.
+    pub fn freeze(&mut self) -> IggyIndexes {
+        let base_position = self.base_position;
+        let buffer = self.buffer.freeze();
+        self.saved_count = 0;
+        self.base_position = 0;
+        IggyIndexes::new(buffer, base_position)
+    }
+
     /// Gets the size of all indexes messages
     pub fn messages_size(&self) -> u32 {
         self.last_position() - self.base_position
diff --git a/core/server/src/streaming/segments/messages/messages_writer.rs b/core/server/src/streaming/segments/messages/messages_writer.rs
index 53a15e057..85d3bd215 100644
--- a/core/server/src/streaming/segments/messages/messages_writer.rs
+++ b/core/server/src/streaming/segments/messages/messages_writer.rs
@@ -16,10 +16,13 @@
  * under the License.
  */
 
-use crate::streaming::segments::{IggyMessagesBatchSet, messages::write_batch};
+use crate::streaming::segments::{
+    IggyMessagesBatchSet,
+    messages::{write_batch, write_batch_frozen},
+};
 use compio::fs::{File, OpenOptions};
 use err_trail::ErrContext;
-use iggy_common::{IggyByteSize, IggyError};
+use iggy_common::{IggyByteSize, IggyError, IggyMessagesBatch};
 use std::{
     rc::Rc,
     sync::atomic::{AtomicU64, Ordering},
@@ -130,6 +133,37 @@ impl MessagesWriter {
         Ok(IggyByteSize::from(messages_size as u64))
     }
 
+    /// Append frozen (immutable) batches to the messages file.
+    ///
+    /// Unlike `save_batch_set`, this method does not take ownership of the batches.
+    /// The caller retains the batches (for use in in-flight buffer) while disk I/O proceeds.
+    pub async fn save_frozen_batches(
+        &self,
+        batches: &[IggyMessagesBatch],
+    ) -> Result<IggyByteSize, IggyError> {
+        let messages_size: u64 = batches.iter().map(|b| b.size() as u64).sum();
+
+        let position = self.messages_size_bytes.load(Ordering::Relaxed);
+        let file = &self.file;
+        write_batch_frozen(file, position, batches)
+            .await
+            .error(|e: &IggyError| {
+                format!(
+                    "Failed to write frozen batch to messages file: {}. {e}",
+                    self.file_path
+                )
+            })?;
+
+        if self.fsync {
+            let _ = self.fsync().await;
+        }
+
+        self.messages_size_bytes
+            .fetch_add(messages_size, Ordering::Release);
+
+        Ok(IggyByteSize::from(messages_size))
+    }
+
     pub fn path(&self) -> String {
         self.file_path.clone()
     }
diff --git a/core/server/src/streaming/segments/messages/mod.rs b/core/server/src/streaming/segments/messages/mod.rs
index ae3953f70..c085ed882 100644
--- a/core/server/src/streaming/segments/messages/mod.rs
+++ b/core/server/src/streaming/segments/messages/mod.rs
@@ -20,8 +20,9 @@ mod messages_reader;
 mod messages_writer;
 
 use super::IggyMessagesBatchSet;
+use bytes::Bytes;
 use compio::{fs::File, io::AsyncWriteAtExt};
-use iggy_common::IggyError;
+use iggy_common::{IggyError, IggyMessagesBatch};
 
 pub use messages_reader::MessagesReader;
 pub use messages_writer::MessagesWriter;
@@ -44,3 +45,23 @@ async fn write_batch(
     result.map_err(|_| IggyError::CannotWriteToFile)?;
     Ok(total_written)
 }
+
+/// Vectored write frozen (immutable) batches to file.
+///
+/// This function writes `IggyMessagesBatch` (immutable, Arc-backed) directly
+/// without transferring ownership. The caller retains the batches for reads
+/// during the async I/O operation.
+pub async fn write_batch_frozen(
+    file: &File,
+    position: u64,
+    batches: &[IggyMessagesBatch],
+) -> Result<usize, IggyError> {
+    let total_written: usize = batches.iter().map(|b| b.size() as usize).sum();
+    let buffers: Vec<Bytes> = batches.iter().map(|b| b.messages_bytes()).collect();
+    let (result, _) = (&*file)
+        .write_vectored_all_at(buffers, position)
+        .await
+        .into();
+    result.map_err(|_| IggyError::CannotWriteToFile)?;
+    Ok(total_written)
+}
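
The essential property of `write_batch_frozen` is that the batches are only borrowed: the write path clones the Arc-backed `Bytes`, so the caller keeps the same data available in the in-flight buffer while the write runs. A std-only sketch of that borrow-not-consume shape is below (the real code uses compio's vectored `write_vectored_all_at`; `write_frozen` here is a hypothetical, synchronous stand-in).

use bytes::Bytes;
use std::io::Write;

fn write_frozen(mut sink: impl Write, batches: &[Bytes]) -> std::io::Result<u64> {
    let mut written = 0u64;
    for batch in batches {
        // `batch` is only borrowed; the caller keeps the frozen Bytes, so an
        // in-flight buffer could keep serving reads while this runs.
        sink.write_all(batch)?;
        written += batch.len() as u64;
    }
    Ok(written)
}

fn main() -> std::io::Result<()> {
    let batches = vec![Bytes::from_static(b"batch-0"), Bytes::from_static(b"batch-1")];
    let mut out = Vec::new();
    let written = write_frozen(&mut out, &batches)?;
    // The caller still owns `batches` after the "write" completes.
    assert_eq!(written, out.len() as u64);
    assert_eq!(out, b"batch-0batch-1");
    Ok(())
}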
diff --git a/core/server/src/streaming/segments/types/messages_batch_mut.rs b/core/server/src/streaming/segments/types/messages_batch_mut.rs
index 8a24fc622..061a7459a 100644
--- a/core/server/src/streaming/segments/types/messages_batch_mut.rs
+++ b/core/server/src/streaming/segments/types/messages_batch_mut.rs
@@ -24,8 +24,8 @@ use bytes::{BufMut, BytesMut};
 use iggy_common::PooledBuffer;
 use iggy_common::{
     BytesSerializable, IGGY_MESSAGE_HEADER_SIZE, INDEX_SIZE, IggyByteSize, IggyError,
-    IggyIndexView, IggyMessage, IggyMessageView, IggyMessageViewIterator, IggyTimestamp,
-    MAX_PAYLOAD_SIZE, MAX_USER_HEADERS_SIZE, Sizeable, Validatable,
+    IggyIndexView, IggyMessage, IggyMessageView, IggyMessageViewIterator, IggyMessagesBatch,
+    IggyTimestamp, MAX_PAYLOAD_SIZE, MAX_USER_HEADERS_SIZE, Sizeable, Validatable,
 };
 use lending_iterator::prelude::*;
 use std::ops::{Deref, Index};
@@ -265,6 +265,18 @@ impl IggyMessagesBatchMut {
         (indexes, messages)
     }
 
+    /// Freezes the batch, converting to an immutable `IggyMessagesBatch`.
+    ///
+    /// The returned batch uses Arc-backed `Bytes`, allowing cheap clones.
+    /// After calling this, the mutable batch becomes empty.
+    pub fn freeze(&mut self) -> IggyMessagesBatch {
+        let count = self.count;
+        let indexes = self.indexes.freeze();
+        let messages = self.messages.freeze();
+        self.count = 0;
+        IggyMessagesBatch::new(indexes, messages, count)
+    }
+
     pub fn take_messages(&mut self) -> PooledBuffer {
         std::mem::take(&mut self.messages)
     }
diff --git a/core/server/src/streaming/segments/types/messages_batch_set.rs b/core/server/src/streaming/segments/types/messages_batch_set.rs
index 42c8747c6..0856bd217 100644
--- a/core/server/src/streaming/segments/types/messages_batch_set.rs
+++ b/core/server/src/streaming/segments/types/messages_batch_set.rs
@@ -19,7 +19,10 @@
 use crate::binary::handlers::messages::poll_messages_handler::IggyPollMetadata;
 use crate::streaming::segments::IggyIndexesMut;
 use bytes::Bytes;
-use iggy_common::{IggyByteSize, IggyMessage, IggyMessageView, PolledMessages, Sizeable};
+use iggy_common::{
+    IggyByteSize, IggyMessage, IggyMessageView, IggyMessagesBatch, PolledMessages, PooledBuffer,
+    Sizeable,
+};
 use std::ops::Index;
 use tracing::trace;
 
@@ -79,6 +82,30 @@ impl IggyMessagesBatchSet {
         self.batches.extend(other_batches);
     }
 
+    /// Add immutable batches by copying them into mutable form.
+    ///
+    /// This is used when reading from the in-flight buffer (which holds
+    /// frozen/immutable batches) and needs to convert them for the read path.
+    pub fn add_immutable_batches(&mut self, batches: &[IggyMessagesBatch]) {
+        for batch in batches {
+            let mutable_batch = Self::immutable_to_mutable(batch);
+            self.add_batch(mutable_batch);
+        }
+    }
+
+    /// Convert an immutable IggyMessagesBatch to a mutable IggyMessagesBatchMut.
+    ///
+    /// This requires copying the Bytes into a PooledBuffer.
+    fn immutable_to_mutable(batch: &IggyMessagesBatch) -> IggyMessagesBatchMut {
+        let count = batch.count();
+        let base_position = batch.indexes().base_position();
+        let indexes_buffer = PooledBuffer::from(batch.indexes_slice());
+        let indexes = IggyIndexesMut::from_bytes(indexes_buffer, base_position);
+        let messages = PooledBuffer::from(batch.buffer());
+
+        IggyMessagesBatchMut::from_indexes_and_messages(count, indexes, messages)
+    }
+
     /// Extract indexes from all batches in the set
     pub fn append_indexes_to(&self, target: &mut IggyIndexesMut) {
         for batch in self.iter() {
