This is an automated email from the ASF dual-hosted git repository.
hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new a53a4ed3d fix(server): maintain message availability during async disk I/O (#2575)
a53a4ed3d is described below
commit a53a4ed3d0f344327e1d3a85a86bbff3aca783f9
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Fri Jan 16 12:54:45 2026 +0100
fix(server): maintain message availability during async disk I/O (#2575)
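Messages were temporarily unreadable after being committed from the
journal but before the disk write completed. Freeze batches into
Arc-backed `Bytes` so the same data can be held in an in-flight buffer
and served to readers while compio writes it to disk. Also bounds-check
user headers when parsing polled messages.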
---
core/common/src/alloc/buffer.rs | 23 ++-
core/common/src/types/message/messages_batch.rs | 10 ++
core/common/src/types/message/polled_messages.rs | 8 +-
core/integration/tests/server/scenarios/mod.rs | 1 +
.../scenarios/read_during_persistence_scenario.rs | 195 +++++++++++++++++++++
.../handlers/messages/poll_messages_handler.rs | 6 +-
core/server/src/slab/streams.rs | 72 ++++++--
core/server/src/streaming/partitions/helpers.rs | 13 +-
core/server/src/streaming/partitions/in_flight.rs | 77 ++++++++
core/server/src/streaming/partitions/log.rs | 23 ++-
core/server/src/streaming/partitions/mod.rs | 1 +
.../src/streaming/segments/indexes/indexes_mut.rs | 13 +-
.../streaming/segments/messages/messages_writer.rs | 38 +++-
core/server/src/streaming/segments/messages/mod.rs | 23 ++-
.../streaming/segments/types/messages_batch_mut.rs | 16 +-
.../streaming/segments/types/messages_batch_set.rs | 29 ++-
16 files changed, 508 insertions(+), 40 deletions(-)
diff --git a/core/common/src/alloc/buffer.rs b/core/common/src/alloc/buffer.rs
index 2d5a65c75..601fe6a86 100644
--- a/core/common/src/alloc/buffer.rs
+++ b/core/common/src/alloc/buffer.rs
@@ -17,7 +17,7 @@
*/
use super::memory_pool::{BytesMutExt, memory_pool};
-use bytes::{Buf, BufMut, BytesMut};
+use bytes::{Buf, BufMut, Bytes, BytesMut};
use compio::buf::{IoBuf, IoBufMut, SetBufInit};
use std::ops::{Deref, DerefMut};
@@ -197,6 +197,27 @@ impl PooledBuffer {
let mut this = std::mem::ManuallyDrop::new(self);
std::mem::take(&mut this.inner)
}
+
+ /// Freezes the buffer, converting it to an immutable `Bytes`.
+ ///
+ /// After calling this method, the PooledBuffer becomes empty and will not
+ /// return memory to the pool on drop (the frozen Bytes owns the
allocation).
+ /// The returned `Bytes` is Arc-backed, allowing cheap clones.
+ pub fn freeze(&mut self) -> Bytes {
+ // Decrement pool counter since memory is transferred to Bytes
+ // and won't be returned to the pool.
+ if self.from_pool
+ && let Some(bucket_idx) = self.original_bucket_idx
+ {
+ memory_pool().dec_bucket_in_use(bucket_idx);
+ }
+
+ let inner = std::mem::take(&mut self.inner);
+ self.from_pool = false;
+ self.original_capacity = 0;
+ self.original_bucket_idx = None;
+ inner.freeze()
+ }
}
impl Deref for PooledBuffer {
diff --git a/core/common/src/types/message/messages_batch.rs
b/core/common/src/types/message/messages_batch.rs
index 3d03159e5..6bc704866 100644
--- a/core/common/src/types/message/messages_batch.rs
+++ b/core/common/src/types/message/messages_batch.rs
@@ -75,11 +75,21 @@ impl IggyMessagesBatch {
&self.messages
}
+ /// Get the messages as a cloned `Bytes` (cheap Arc increment).
+ pub fn messages_bytes(&self) -> Bytes {
+ self.messages.clone()
+ }
+
/// Get the indexes slice
pub fn indexes_slice(&self) -> &[u8] {
&self.indexes
}
+ /// Get a reference to the indexes
+ pub fn indexes(&self) -> &IggyIndexes {
+ &self.indexes
+ }
+
/// Take the indexes from the batch
pub fn take_indexes(&mut self) -> IggyIndexes {
std::mem::take(&mut self.indexes)
diff --git a/core/common/src/types/message/polled_messages.rs
b/core/common/src/types/message/polled_messages.rs
index 9e091437b..8204ffde2 100644
--- a/core/common/src/types/message/polled_messages.rs
+++ b/core/common/src/types/message/polled_messages.rs
@@ -111,12 +111,16 @@ fn messages_from_bytes_and_count(buffer: Bytes, count:
u32) -> Result<Vec<IggyMe
let payload = buffer.slice(position..payload_end);
position = payload_end;
+ let user_headers_end = position + header.user_headers_length as usize;
+ if user_headers_end > buf_len {
+ break;
+ }
let user_headers = if header.user_headers_length > 0 {
- Some(buffer.slice(position..position + header.user_headers_length
as usize))
+ Some(buffer.slice(position..user_headers_end))
} else {
None
};
- position += header.user_headers_length as usize;
+ position = user_headers_end;
messages.push(IggyMessage {
header,
diff --git a/core/integration/tests/server/scenarios/mod.rs
b/core/integration/tests/server/scenarios/mod.rs
index 1676cefa3..2c2fbab3a 100644
--- a/core/integration/tests/server/scenarios/mod.rs
+++ b/core/integration/tests/server/scenarios/mod.rs
@@ -30,6 +30,7 @@ pub mod delete_segments_scenario;
pub mod encryption_scenario;
pub mod message_headers_scenario;
pub mod message_size_scenario;
+pub mod read_during_persistence_scenario;
pub mod stale_client_consumer_group_scenario;
pub mod stream_size_validation_scenario;
pub mod system_scenario;
diff --git
a/core/integration/tests/server/scenarios/read_during_persistence_scenario.rs
b/core/integration/tests/server/scenarios/read_during_persistence_scenario.rs
new file mode 100644
index 000000000..0ff9c8bd3
--- /dev/null
+++
b/core/integration/tests/server/scenarios/read_during_persistence_scenario.rs
@@ -0,0 +1,185 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use bytes::Bytes;
+use iggy::prelude::*;
+use integration::{
+    tcp_client::TcpClientFactory,
+    test_server::{ClientFactory, IpAddrKind, SYSTEM_PATH_ENV_VAR, TestServer, login_root},
+};
+use serial_test::parallel;
+use std::collections::HashMap;
+use std::time::{Duration, Instant};
+
+const STREAM_NAME: &str = "eventual-consistency-stream";
+const TOPIC_NAME: &str = "eventual-consistency-topic";
+const TEST_DURATION_SECS: u64 = 10;
+const MESSAGES_PER_BATCH: u32 = 32;
+const PROGRESS_EVERY_BATCHES: u64 = 1000;
+
+/// Test with two separate clients - one for sending, one for polling.
+///
+/// The server is configured to save to disk after 32 messages / 512 B, so
+/// nearly every send triggers an async disk write. After each send the
+/// consumer polls from offset 0 and must see every message sent so far; a
+/// shortfall means messages were unreadable while their write was in flight.
+#[tokio::test]
+#[parallel]
+async fn should_read_messages_immediately_after_send_with_aggressive_persistence() {
+    let env_vars = HashMap::from([
+        (
+            SYSTEM_PATH_ENV_VAR.to_owned(),
+            TestServer::get_random_path(),
+        ),
+        (
+            "IGGY_SYSTEM_PARTITION_MESSAGES_REQUIRED_TO_SAVE".to_owned(),
+            "32".to_owned(),
+        ),
+        (
+            "IGGY_SYSTEM_PARTITION_SIZE_OF_MESSAGES_REQUIRED_TO_SAVE".to_owned(),
+            "512B".to_owned(),
+        ),
+        (
+            "IGGY_SYSTEM_PARTITION_ENFORCE_FSYNC".to_owned(),
+            "false".to_owned(),
+        ),
+    ]);
+
+    let mut test_server = TestServer::new(Some(env_vars), true, None, IpAddrKind::V4);
+    test_server.start();
+    let server_addr = test_server.get_raw_tcp_addr().unwrap();
+
+    let producer_client = TcpClientFactory {
+        server_addr: server_addr.clone(),
+        ..Default::default()
+    }
+    .create_client()
+    .await;
+    let producer = IggyClient::create(producer_client, None, None);
+    login_root(&producer).await;
+
+    producer.create_stream(STREAM_NAME).await.unwrap();
+    producer
+        .create_topic(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            TOPIC_NAME,
+            1,
+            CompressionAlgorithm::default(),
+            None,
+            IggyExpiry::NeverExpire,
+            MaxTopicSize::ServerDefault,
+        )
+        .await
+        .unwrap();
+
+    let consumer_client = TcpClientFactory {
+        server_addr,
+        ..Default::default()
+    }
+    .create_client()
+    .await;
+    let consumer = IggyClient::create(consumer_client, None, None);
+    login_root(&consumer).await;
+
+    let stream_id = Identifier::named(STREAM_NAME).unwrap();
+    let topic_id = Identifier::named(TOPIC_NAME).unwrap();
+    let consumer_kind = Consumer::default();
+
+    let test_duration = Duration::from_secs(TEST_DURATION_SECS);
+    let payload = "X".repeat(1024);
+
+    let start = Instant::now();
+    let mut batches_sent = 0u64;
+    let mut messages_sent = 0u64;
+
+    println!(
+        "Starting test: 1KB messages, {} msgs/batch, duration: {}s",
+        MESSAGES_PER_BATCH, TEST_DURATION_SECS
+    );
+
+    while start.elapsed() < test_duration {
+        // Offsets are dense and start at 0, so this batch begins at `messages_sent`.
+        let base_offset = messages_sent;
+        let mut messages: Vec<IggyMessage> = (0..MESSAGES_PER_BATCH)
+            .map(|i| {
+                IggyMessage::builder()
+                    .id((base_offset + i as u64 + 1) as u128)
+                    .payload(Bytes::from(format!(
+                        "{} - batch {} msg {}",
+                        payload, batches_sent, i
+                    )))
+                    .build()
+                    .expect("Failed to create message")
+            })
+            .collect();
+
+        producer
+            .send_messages(
+                &stream_id,
+                &topic_id,
+                &Partitioning::partition_id(0),
+                &mut messages,
+            )
+            .await
+            .unwrap_or_else(|e| panic!("Failed to send batch {batches_sent}: {e:?}"));
+
+        batches_sent += 1;
+        messages_sent += u64::from(MESSAGES_PER_BATCH);
+
+        // A poll error is a different failure than missing messages, so
+        // report it directly instead of counting it as zero polled messages.
+        let polled = consumer
+            .poll_messages(
+                &stream_id,
+                &topic_id,
+                Some(0),
+                &consumer_kind,
+                &PollingStrategy::offset(0),
+                u32::try_from(messages_sent).expect("message count exceeds u32"),
+                false,
+            )
+            .await
+            .unwrap_or_else(|e| panic!("Failed to poll after batch {batches_sent}: {e:?}"));
+        let polled_count = polled.messages.len() as u64;
+
+        if polled_count < messages_sent {
+            panic!(
+                "RACE CONDITION DETECTED after {:.2}s/{}s ({} batches, {} messages), expected {} messages, got {}. Missing: {}",
+                start.elapsed().as_secs_f64(),
+                TEST_DURATION_SECS,
+                batches_sent,
+                messages_sent,
+                messages_sent,
+                polled_count,
+                messages_sent - polled_count
+            );
+        }
+
+        if batches_sent.is_multiple_of(PROGRESS_EVERY_BATCHES) {
+            println!(
+                "Progress: {} batches, {} messages, elapsed: {:.2}s/{}s",
+                batches_sent,
+                messages_sent,
+                start.elapsed().as_secs_f64(),
+                TEST_DURATION_SECS
+            );
+        }
+    }
+
+    producer.delete_stream(&stream_id).await.unwrap();
+}
diff --git a/core/server/src/binary/handlers/messages/poll_messages_handler.rs
b/core/server/src/binary/handlers/messages/poll_messages_handler.rs
index ea8ccbb8c..354b554b0 100644
--- a/core/server/src/binary/handlers/messages/poll_messages_handler.rs
+++ b/core/server/src/binary/handlers/messages/poll_messages_handler.rs
@@ -92,7 +92,7 @@ impl ServerCommandHandler for PollMessages {
let response_length = 4 + 8 + 4 + batch.size();
let response_length_bytes = response_length.to_le_bytes();
- let mut bufs = Vec::with_capacity(batch.containers_count() + 5);
+ let mut bufs = Vec::with_capacity(batch.containers_count() + 3);
let mut partition_id_buf = PooledBuffer::with_capacity(4);
let mut current_offset_buf = PooledBuffer::with_capacity(8);
let mut count_buf = PooledBuffer::with_capacity(4);
@@ -104,7 +104,9 @@ impl ServerCommandHandler for PollMessages {
bufs.push(current_offset_buf);
bufs.push(count_buf);
- batch.iter_mut().for_each(|m| bufs.push(m.take_messages()));
+ batch.iter_mut().for_each(|m| {
+ bufs.push(m.take_messages());
+ });
trace!(
"Sending {} messages to client ({} bytes) to client",
batch.count(),
diff --git a/core/server/src/slab/streams.rs b/core/server/src/slab/streams.rs
index 66e5c7cbb..f1c342de2 100644
--- a/core/server/src/slab/streams.rs
+++ b/core/server/src/slab/streams.rs
@@ -57,7 +57,7 @@ use crate::{
};
use ahash::AHashMap;
use err_trail::ErrContext;
-use iggy_common::{Identifier, IggyError, IggyTimestamp, PollingKind};
+use iggy_common::{Identifier, IggyError, IggyMessagesBatch, IggyTimestamp,
PollingKind};
use slab::Slab;
use std::{
cell::RefCell,
@@ -737,23 +737,48 @@ impl Streams {
count: u32,
segment_start_offset: u64,
) -> Result<IggyMessagesBatchSet, IggyError> {
- let (is_journal_empty, journal_first_offset, journal_last_offset) =
self
- .with_partition_by_id(
- stream_id,
- topic_id,
- partition_id,
- |(_, _, _, _, _, _, log)| {
- let journal = log.journal();
- (
- journal.is_empty(),
- journal.inner().base_offset,
- journal.inner().current_offset,
- )
- },
- );
+ let (
+ is_journal_empty,
+ journal_first_offset,
+ journal_last_offset,
+ in_flight_empty,
+ in_flight_first,
+ in_flight_last,
+ ) = self.with_partition_by_id(
+ stream_id,
+ topic_id,
+ partition_id,
+ |(_, _, _, _, _, _, log)| {
+ let journal = log.journal();
+ let in_flight = log.in_flight();
+ (
+ journal.is_empty(),
+ journal.inner().base_offset,
+ journal.inner().current_offset,
+ in_flight.is_empty(),
+ in_flight.first_offset(),
+ in_flight.last_offset(),
+ )
+ },
+ );
- // Case 0: Accumulator is empty, so all messages have to be on disk
+ // Case 0: Journal is empty, check in-flight buffer or disk
if is_journal_empty {
+ if !in_flight_empty && offset >= in_flight_first && offset <=
in_flight_last {
+ let mut result = IggyMessagesBatchSet::empty();
+ let in_flight_batches = self.with_partition_by_id(
+ stream_id,
+ topic_id,
+ partition_id,
+ |(_, _, _, _, _, _, log)|
log.in_flight().get_by_offset(offset, count).to_vec(),
+ );
+ if !in_flight_batches.is_empty() {
+ result.add_immutable_batches(&in_flight_batches);
+ let final_result = result.get_by_offset(offset, count);
+ return Ok(final_result);
+ }
+ }
+
return self
.load_messages_from_disk_by_offset(
stream_id,
@@ -1297,7 +1322,7 @@ impl Streams {
stream_id: &Identifier,
topic_id: &Identifier,
partition_id: usize,
- batches: IggyMessagesBatchSet,
+ mut batches: IggyMessagesBatchSet,
config: &SystemConfig,
) -> Result<u32, IggyError> {
let batch_count = batches.count();
@@ -1316,7 +1341,12 @@ impl Streams {
return Ok(0);
}
- // Extract storage before async operations
+ // Store frozen batches in in-flight buffer for reads during async I/O
+ let frozen: Vec<IggyMessagesBatch> = batches.iter_mut().map(|b|
b.freeze()).collect();
+ self.with_partition_by_id_mut(stream_id, topic_id, partition_id, |(..,
log)| {
+ log.set_in_flight(frozen.clone());
+ });
+
let (messages_writer, index_writer) =
self.with_partition_by_id(stream_id, topic_id, partition_id, |(..,
log)| {
(
@@ -1336,7 +1366,7 @@ impl Streams {
let saved = messages_writer
.as_ref()
- .save_batch_set(batches)
+ .save_frozen_batches(&frozen)
.await
.error(|e: &IggyError| {
format!(
@@ -1376,6 +1406,10 @@ impl Streams {
streaming_partitions::helpers::update_index_and_increment_stats(saved, config),
);
+ self.with_partition_by_id_mut(stream_id, topic_id, partition_id, |(..,
log)| {
+ log.clear_in_flight();
+ });
+
drop(guard);
Ok(batch_count)
}
diff --git a/core/server/src/streaming/partitions/helpers.rs
b/core/server/src/streaming/partitions/helpers.rs
index 6d14f8971..531e77e54 100644
--- a/core/server/src/streaming/partitions/helpers.rs
+++ b/core/server/src/streaming/partitions/helpers.rs
@@ -15,13 +15,6 @@
// specific language governing permissions and limitations
// under the License.
-use err_trail::ErrContext;
-use iggy_common::{ConsumerOffsetInfo, Identifier, IggyByteSize, IggyError};
-use std::{
- ops::AsyncFnOnce,
- sync::{Arc, atomic::Ordering},
-};
-
use crate::{
configs::{cache_indexes::CacheIndexesConfig, system::SystemConfig},
slab::{
@@ -42,6 +35,12 @@ use crate::{
segments::{IggyIndexesMut, IggyMessagesBatchMut, IggyMessagesBatchSet,
storage::Storage},
},
};
+use err_trail::ErrContext;
+use iggy_common::{ConsumerOffsetInfo, Identifier, IggyByteSize, IggyError};
+use std::{
+ ops::AsyncFnOnce,
+ sync::{Arc, atomic::Ordering},
+};
pub fn get_partition_ids() -> impl FnOnce(&Partitions) -> Vec<usize> {
|partitions| {
diff --git a/core/server/src/streaming/partitions/in_flight.rs
b/core/server/src/streaming/partitions/in_flight.rs
new file mode 100644
index 000000000..af90d3e7d
--- /dev/null
+++ b/core/server/src/streaming/partitions/in_flight.rs
@@ -0,0 +1,113 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use iggy_common::IggyMessagesBatch;
+
+/// Holds batches that are being written to disk.
+///
+/// During async I/O, messages are transferred from the journal to disk.
+/// This buffer holds frozen (immutable, Arc-backed) copies so consumers
+/// can still read them during the write operation.
+///
+/// The cached `first_offset`/`last_offset` are 0 while the buffer is empty,
+/// so check [`Self::is_empty`] before interpreting them.
+#[derive(Debug, Default)]
+pub struct IggyMessagesBatchSetInFlight {
+    batches: Vec<IggyMessagesBatch>,
+    first_offset: u64,
+    last_offset: u64,
+}
+
+impl IggyMessagesBatchSetInFlight {
+    /// Returns `true` when no batches are buffered.
+    pub fn is_empty(&self) -> bool {
+        self.batches.is_empty()
+    }
+
+    /// Offset of the first buffered message (0 when empty).
+    pub fn first_offset(&self) -> u64 {
+        self.first_offset
+    }
+
+    /// Offset of the last buffered message (0 when empty).
+    pub fn last_offset(&self) -> u64 {
+        self.last_offset
+    }
+
+    /// Replaces the buffered batches and recomputes the covered offset range.
+    ///
+    /// `batches` are expected in ascending offset order; an empty vector
+    /// clears the buffer.
+    pub fn set(&mut self, batches: Vec<IggyMessagesBatch>) {
+        if batches.is_empty() {
+            self.clear();
+            return;
+        }
+        self.first_offset = batches.first().and_then(|b| b.first_offset()).unwrap_or(0);
+        self.last_offset = batches.last().and_then(|b| b.last_offset()).unwrap_or(0);
+        self.batches = batches;
+    }
+
+    /// Drops all buffered batches and resets the offset range.
+    pub fn clear(&mut self) {
+        self.batches.clear();
+        self.first_offset = 0;
+        self.last_offset = 0;
+    }
+
+    /// Returns the buffered batches that may hold messages in the window
+    /// `[start_offset, start_offset + count - 1]`.
+    ///
+    /// The result is a contiguous run of whole batches, so it may include
+    /// messages outside the window; callers must still trim per message.
+    /// Returns an empty slice when `count` is 0 or nothing overlaps.
+    pub fn get_by_offset(&self, start_offset: u64, count: u32) -> &[IggyMessagesBatch] {
+        if self.is_empty() || count == 0 || start_offset > self.last_offset {
+            return &[];
+        }
+
+        // `count >= 1` here, so `- 1` cannot underflow; saturate so a window
+        // reaching past `u64::MAX` cannot overflow either.
+        let end_offset = start_offset.saturating_add(u64::from(count) - 1);
+        if end_offset < self.first_offset {
+            return &[];
+        }
+
+        // Skip leading batches that end before the window and trailing ones
+        // that start after it; batches without known offsets are never used
+        let begin = self
+            .batches
+            .iter()
+            .position(|b| b.last_offset().is_none_or(|last| last >= start_offset))
+            .unwrap_or(self.batches.len());
+        let end = self
+            .batches
+            .iter()
+            .rposition(|b| b.first_offset().is_none_or(|first| first <= end_offset))
+            .map_or(begin, |idx| idx + 1);
+
+        if begin >= end {
+            return &[];
+        }
+        &self.batches[begin..end]
+    }
+
+    /// All buffered batches, in the order they were set.
+    pub fn batches(&self) -> &[IggyMessagesBatch] {
+        &self.batches
+    }
+}
diff --git a/core/server/src/streaming/partitions/log.rs
b/core/server/src/streaming/partitions/log.rs
index e94adbc80..ae7c1efaa 100644
--- a/core/server/src/streaming/partitions/log.rs
+++ b/core/server/src/streaming/partitions/log.rs
@@ -16,16 +16,17 @@
// under the License.
use crate::streaming::{
- partitions::journal::Journal,
+ partitions::{in_flight::IggyMessagesBatchSetInFlight, journal::Journal},
segments::{IggyIndexesMut, Segment, storage::Storage},
};
-use iggy_common::INDEX_SIZE;
+use iggy_common::{INDEX_SIZE, IggyMessagesBatch};
use ringbuffer::AllocRingBuffer;
use std::fmt::Debug;
const SEGMENTS_CAPACITY: usize = 1024;
const ACCESS_MAP_CAPACITY: usize = 8;
const SIZE_16MB: usize = 16 * 1024 * 1024;
+
#[derive(Debug)]
pub struct SegmentedLog<J>
where
@@ -39,6 +40,7 @@ where
segments: Vec<Segment>,
indexes: Vec<Option<IggyIndexesMut>>,
storage: Vec<Storage>,
+ in_flight: IggyMessagesBatchSetInFlight,
}
impl<J> Default for SegmentedLog<J>
@@ -53,6 +55,7 @@ where
segments: Vec::with_capacity(SEGMENTS_CAPACITY),
storage: Vec::with_capacity(SEGMENTS_CAPACITY),
indexes: Vec::with_capacity(SEGMENTS_CAPACITY),
+ in_flight: IggyMessagesBatchSetInFlight::default(),
}
}
}
@@ -157,6 +160,22 @@ where
*segment_indexes = Some(indexes);
}
}
+
+ pub fn in_flight(&self) -> &IggyMessagesBatchSetInFlight {
+ &self.in_flight
+ }
+
+ pub fn in_flight_mut(&mut self) -> &mut IggyMessagesBatchSetInFlight {
+ &mut self.in_flight
+ }
+
+ pub fn set_in_flight(&mut self, batches: Vec<IggyMessagesBatch>) {
+ self.in_flight.set(batches);
+ }
+
+ pub fn clear_in_flight(&mut self) {
+ self.in_flight.clear();
+ }
}
impl<J> SegmentedLog<J>
diff --git a/core/server/src/streaming/partitions/mod.rs
b/core/server/src/streaming/partitions/mod.rs
index 422744233..c5a2947c1 100644
--- a/core/server/src/streaming/partitions/mod.rs
+++ b/core/server/src/streaming/partitions/mod.rs
@@ -18,6 +18,7 @@
pub mod consumer_offset;
pub mod helpers;
+pub mod in_flight;
pub mod journal;
pub mod log;
pub mod partition;
diff --git a/core/server/src/streaming/segments/indexes/indexes_mut.rs
b/core/server/src/streaming/segments/indexes/indexes_mut.rs
index 40c67bd3d..197bed43b 100644
--- a/core/server/src/streaming/segments/indexes/indexes_mut.rs
+++ b/core/server/src/streaming/segments/indexes/indexes_mut.rs
@@ -17,7 +17,7 @@
*/
use iggy_common::PooledBuffer;
-use iggy_common::{INDEX_SIZE, IggyIndexView};
+use iggy_common::{INDEX_SIZE, IggyIndexView, IggyIndexes};
use std::fmt;
use std::ops::{Deref, Index as StdIndex};
@@ -56,6 +56,17 @@ impl IggyIndexesMut {
(base_position, buffer)
}
+ /// Freezes the indexes buffer, converting to an immutable `IggyIndexes`.
+ ///
+ /// The returned `IggyIndexes` uses Arc-backed `Bytes`, allowing cheap
clones.
+ pub fn freeze(&mut self) -> IggyIndexes {
+ let base_position = self.base_position;
+ let buffer = self.buffer.freeze();
+ self.saved_count = 0;
+ self.base_position = 0;
+ IggyIndexes::new(buffer, base_position)
+ }
+
/// Gets the size of all indexes messages
pub fn messages_size(&self) -> u32 {
self.last_position() - self.base_position
diff --git a/core/server/src/streaming/segments/messages/messages_writer.rs
b/core/server/src/streaming/segments/messages/messages_writer.rs
index 53a15e057..85d3bd215 100644
--- a/core/server/src/streaming/segments/messages/messages_writer.rs
+++ b/core/server/src/streaming/segments/messages/messages_writer.rs
@@ -16,10 +16,13 @@
* under the License.
*/
-use crate::streaming::segments::{IggyMessagesBatchSet, messages::write_batch};
+use crate::streaming::segments::{
+ IggyMessagesBatchSet,
+ messages::{write_batch, write_batch_frozen},
+};
use compio::fs::{File, OpenOptions};
use err_trail::ErrContext;
-use iggy_common::{IggyByteSize, IggyError};
+use iggy_common::{IggyByteSize, IggyError, IggyMessagesBatch};
use std::{
rc::Rc,
sync::atomic::{AtomicU64, Ordering},
@@ -130,6 +133,37 @@ impl MessagesWriter {
Ok(IggyByteSize::from(messages_size as u64))
}
+ /// Append frozen (immutable) batches to the messages file.
+ ///
+ /// Unlike `save_batch_set`, this method does not take ownership of the
batches.
+ /// The caller retains the batches (for use in in-flight buffer) while
disk I/O proceeds.
+ pub async fn save_frozen_batches(
+ &self,
+ batches: &[IggyMessagesBatch],
+ ) -> Result<IggyByteSize, IggyError> {
+ let messages_size: u64 = batches.iter().map(|b| b.size() as u64).sum();
+
+ let position = self.messages_size_bytes.load(Ordering::Relaxed);
+ let file = &self.file;
+ write_batch_frozen(file, position, batches)
+ .await
+ .error(|e: &IggyError| {
+ format!(
+ "Failed to write frozen batch to messages file: {}. {e}",
+ self.file_path
+ )
+ })?;
+
+ if self.fsync {
+ let _ = self.fsync().await;
+ }
+
+ self.messages_size_bytes
+ .fetch_add(messages_size, Ordering::Release);
+
+ Ok(IggyByteSize::from(messages_size))
+ }
+
pub fn path(&self) -> String {
self.file_path.clone()
}
diff --git a/core/server/src/streaming/segments/messages/mod.rs
b/core/server/src/streaming/segments/messages/mod.rs
index ae3953f70..c085ed882 100644
--- a/core/server/src/streaming/segments/messages/mod.rs
+++ b/core/server/src/streaming/segments/messages/mod.rs
@@ -20,8 +20,9 @@ mod messages_reader;
mod messages_writer;
use super::IggyMessagesBatchSet;
+use bytes::Bytes;
use compio::{fs::File, io::AsyncWriteAtExt};
-use iggy_common::IggyError;
+use iggy_common::{IggyError, IggyMessagesBatch};
pub use messages_reader::MessagesReader;
pub use messages_writer::MessagesWriter;
@@ -44,3 +45,23 @@ async fn write_batch(
result.map_err(|_| IggyError::CannotWriteToFile)?;
Ok(total_written)
}
+
+/// Vectored write frozen (immutable) batches to file.
+///
+/// This function writes `IggyMessagesBatch` (immutable, Arc-backed) directly
+/// without transferring ownership. The caller retains the batches for reads
+/// during the async I/O operation.
+pub async fn write_batch_frozen(
+ file: &File,
+ position: u64,
+ batches: &[IggyMessagesBatch],
+) -> Result<usize, IggyError> {
+ let total_written: usize = batches.iter().map(|b| b.size() as usize).sum();
+ let buffers: Vec<Bytes> = batches.iter().map(|b|
b.messages_bytes()).collect();
+ let (result, _) = (&*file)
+ .write_vectored_all_at(buffers, position)
+ .await
+ .into();
+ result.map_err(|_| IggyError::CannotWriteToFile)?;
+ Ok(total_written)
+}
diff --git a/core/server/src/streaming/segments/types/messages_batch_mut.rs
b/core/server/src/streaming/segments/types/messages_batch_mut.rs
index 8a24fc622..061a7459a 100644
--- a/core/server/src/streaming/segments/types/messages_batch_mut.rs
+++ b/core/server/src/streaming/segments/types/messages_batch_mut.rs
@@ -24,8 +24,8 @@ use bytes::{BufMut, BytesMut};
use iggy_common::PooledBuffer;
use iggy_common::{
BytesSerializable, IGGY_MESSAGE_HEADER_SIZE, INDEX_SIZE, IggyByteSize,
IggyError,
- IggyIndexView, IggyMessage, IggyMessageView, IggyMessageViewIterator,
IggyTimestamp,
- MAX_PAYLOAD_SIZE, MAX_USER_HEADERS_SIZE, Sizeable, Validatable,
+ IggyIndexView, IggyMessage, IggyMessageView, IggyMessageViewIterator,
IggyMessagesBatch,
+ IggyTimestamp, MAX_PAYLOAD_SIZE, MAX_USER_HEADERS_SIZE, Sizeable,
Validatable,
};
use lending_iterator::prelude::*;
use std::ops::{Deref, Index};
@@ -265,6 +265,18 @@ impl IggyMessagesBatchMut {
(indexes, messages)
}
+ /// Freezes the batch, converting to an immutable `IggyMessagesBatch`.
+ ///
+ /// The returned batch uses Arc-backed `Bytes`, allowing cheap clones.
+ /// After calling this, the mutable batch becomes empty.
+ pub fn freeze(&mut self) -> IggyMessagesBatch {
+ let count = self.count;
+ let indexes = self.indexes.freeze();
+ let messages = self.messages.freeze();
+ self.count = 0;
+ IggyMessagesBatch::new(indexes, messages, count)
+ }
+
pub fn take_messages(&mut self) -> PooledBuffer {
std::mem::take(&mut self.messages)
}
diff --git a/core/server/src/streaming/segments/types/messages_batch_set.rs
b/core/server/src/streaming/segments/types/messages_batch_set.rs
index 42c8747c6..0856bd217 100644
--- a/core/server/src/streaming/segments/types/messages_batch_set.rs
+++ b/core/server/src/streaming/segments/types/messages_batch_set.rs
@@ -19,7 +19,10 @@
use crate::binary::handlers::messages::poll_messages_handler::IggyPollMetadata;
use crate::streaming::segments::IggyIndexesMut;
use bytes::Bytes;
-use iggy_common::{IggyByteSize, IggyMessage, IggyMessageView, PolledMessages,
Sizeable};
+use iggy_common::{
+ IggyByteSize, IggyMessage, IggyMessageView, IggyMessagesBatch,
PolledMessages, PooledBuffer,
+ Sizeable,
+};
use std::ops::Index;
use tracing::trace;
@@ -79,6 +82,30 @@ impl IggyMessagesBatchSet {
self.batches.extend(other_batches);
}
+ /// Add immutable batches by copying them into mutable form.
+ ///
+ /// This is used when reading from the in-flight buffer (which holds
+ /// frozen/immutable batches) and needs to convert them for the read path.
+ pub fn add_immutable_batches(&mut self, batches: &[IggyMessagesBatch]) {
+ for batch in batches {
+ let mutable_batch = Self::immutable_to_mutable(batch);
+ self.add_batch(mutable_batch);
+ }
+ }
+
+ /// Convert an immutable IggyMessagesBatch to a mutable
IggyMessagesBatchMut.
+ ///
+ /// This requires copying the Bytes into a PooledBuffer.
+ fn immutable_to_mutable(batch: &IggyMessagesBatch) -> IggyMessagesBatchMut
{
+ let count = batch.count();
+ let base_position = batch.indexes().base_position();
+ let indexes_buffer = PooledBuffer::from(batch.indexes_slice());
+ let indexes = IggyIndexesMut::from_bytes(indexes_buffer,
base_position);
+ let messages = PooledBuffer::from(batch.buffer());
+
+ IggyMessagesBatchMut::from_indexes_and_messages(count, indexes,
messages)
+ }
+
/// Extract indexes from all batches in the set
pub fn append_indexes_to(&self, target: &mut IggyIndexesMut) {
for batch in self.iter() {