This is an automated email from the ASF dual-hosted git repository.
hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new b13456c1e fix(server): chunk vectored writes to avoid exceeding IOV_MAX limit (#2581)
b13456c1e is described below
commit b13456c1e38eb1856ccc9bc4306f205aa7672647
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Mon Jan 19 13:13:05 2026 +0100
fix(server): chunk vectored writes to avoid exceeding IOV_MAX limit (#2581)
When many small messages accumulate in the journal before
persistence (e.g., 1-byte payloads with 1 message per batch),
the subsequent flush attempts to pass thousands of IO vectors
to a single writev() call, exceeding the system's IOV_MAX
limit (typically 1024 on Linux) and causing CannotWriteToFile
errors.
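Illustrative sketch only (not the committed code, which uses compio's
write_vectored_all_at with pooled buffers as shown in the diff below): the
same chunking idea expressed with synchronous std I/O, capping every
vectored write at MAX_IOV_COUNT buffers.

    use std::io::{IoSlice, Write};

    // Conservative cap; Linux typically reports IOV_MAX = 1024.
    const MAX_IOV_COUNT: usize = 1024;

    // Never hand more than MAX_IOV_COUNT slices to a single vectored write,
    // so the kernel's IOV_MAX limit cannot be exceeded.
    fn write_chunked<W: Write>(writer: &mut W, buffers: &[Vec<u8>]) -> std::io::Result<()> {
        for chunk in buffers.chunks(MAX_IOV_COUNT) {
            let slices: Vec<IoSlice<'_>> = chunk.iter().map(|b| IoSlice::new(b)).collect();
            // A real implementation must also retry on partial writes; the
            // server relies on compio's write_vectored_all_at for that.
            writer.write_vectored(&slices)?;
        }
        Ok(())
    }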
---
core/integration/tests/server/scenarios/mod.rs | 1 +
.../scenarios/single_message_per_batch_scenario.rs | 178 +++++++++++++++++++++
core/integration/tests/server/specific.rs | 24 ++-
core/server/src/streaming/segments/messages/mod.rs | 84 +++++++---
4 files changed, 268 insertions(+), 19 deletions(-)
diff --git a/core/integration/tests/server/scenarios/mod.rs b/core/integration/tests/server/scenarios/mod.rs
index c714fb2a7..1f71098fb 100644
--- a/core/integration/tests/server/scenarios/mod.rs
+++ b/core/integration/tests/server/scenarios/mod.rs
@@ -32,6 +32,7 @@ pub mod message_headers_scenario;
pub mod message_size_scenario;
pub mod permissions_scenario;
pub mod read_during_persistence_scenario;
+pub mod single_message_per_batch_scenario;
pub mod stale_client_consumer_group_scenario;
pub mod stream_size_validation_scenario;
pub mod system_scenario;
diff --git a/core/integration/tests/server/scenarios/single_message_per_batch_scenario.rs b/core/integration/tests/server/scenarios/single_message_per_batch_scenario.rs
new file mode 100644
index 000000000..6c34059eb
--- /dev/null
+++ b/core/integration/tests/server/scenarios/single_message_per_batch_scenario.rs
@@ -0,0 +1,178 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use bytes::Bytes;
+use iggy::prelude::*;
+use integration::test_server::{ClientFactory, login_root};
+use std::time::{Duration, Instant};
+
+const STREAM_NAME: &str = "single-msg-batch-stream";
+const TOPIC_NAME: &str = "single-msg-batch-topic";
+const POLL_BATCH_SIZE: u32 = 1000;
+
+/// Test that simulates the bench scenario with single message per batch.
+///
+/// This reproduces: `iggy-bench --message-size 1 --messages-per-batch 1 --message-batches N pp -p 1 tcp`
+///
+/// The scenario exercises the server under conditions where:
+/// - Inline persistence is delayed (high MESSAGES_REQUIRED_TO_SAVE threshold)
+/// - Messages are very small (sequential number payload)
+/// - No batching (1 message per send operation)
+/// - Continuous sending for a fixed duration
+/// - Verifies all messages are received correctly after sending
+///
+/// This pattern can expose issues with:
+/// - Journal management under high message count with small payloads
+/// - Memory allocation patterns for many small messages
+/// - IOV_MAX limits when flushing many small buffers
+pub async fn run(client_factory: &dyn ClientFactory, duration_secs: u64) {
+ let client = client_factory.create_client().await;
+ let producer = IggyClient::create(client, None, None);
+ login_root(&producer).await;
+
+ producer.create_stream(STREAM_NAME).await.unwrap();
+ producer
+ .create_topic(
+ &Identifier::named(STREAM_NAME).unwrap(),
+ TOPIC_NAME,
+ 1,
+ CompressionAlgorithm::default(),
+ None,
+ IggyExpiry::NeverExpire,
+ MaxTopicSize::ServerDefault,
+ )
+ .await
+ .unwrap();
+
+ let stream_id = Identifier::named(STREAM_NAME).unwrap();
+ let topic_id = Identifier::named(TOPIC_NAME).unwrap();
+ let partitioning = Partitioning::partition_id(0);
+
+ let test_duration = Duration::from_secs(duration_secs);
+ let start = Instant::now();
+ let mut messages_sent = 0u64;
+
+ println!(
+        "Starting single message per batch test: sequential payloads, duration: {}s",
+ duration_secs
+ );
+
+ while start.elapsed() < test_duration {
+ let payload = Bytes::from(messages_sent.to_le_bytes().to_vec());
+ let mut messages = vec![
+ IggyMessage::builder()
+ .payload(payload)
+ .build()
+ .expect("Failed to create message"),
+ ];
+
+ producer
+ .send_messages(&stream_id, &topic_id, &partitioning, &mut messages)
+ .await
+ .unwrap();
+
+ messages_sent += 1;
+
+ if messages_sent.is_multiple_of(1000) {
+ println!(
+ "Progress: {} messages sent, elapsed: {:.2}s",
+ messages_sent,
+ start.elapsed().as_secs_f64()
+ );
+ }
+ }
+
+ let send_elapsed = start.elapsed();
+ println!(
+ "Sending completed: {} messages in {:.2}s ({:.0} msgs/sec)",
+ messages_sent,
+ send_elapsed.as_secs_f64(),
+ messages_sent as f64 / send_elapsed.as_secs_f64()
+ );
+
+ println!("Receiving and verifying all {} messages...", messages_sent);
+ let consumer = Consumer::default();
+ let mut next_offset = 0u64;
+ let mut messages_received = 0u64;
+
+ while messages_received < messages_sent {
+ let remaining = messages_sent - messages_received;
+ let batch_size = remaining.min(POLL_BATCH_SIZE as u64) as u32;
+
+ let polled = producer
+ .poll_messages(
+ &stream_id,
+ &topic_id,
+ Some(0),
+ &consumer,
+ &PollingStrategy::offset(next_offset),
+ batch_size,
+ false,
+ )
+ .await
+ .unwrap();
+
+ assert!(
+ !polled.messages.is_empty(),
+            "Expected messages at offset {}, but got none (sent: {}, received so far: {})",
+ next_offset,
+ messages_sent,
+ messages_received
+ );
+
+ for msg in &polled.messages {
+ let payload = &msg.payload;
+ assert_eq!(
+ payload.len(),
+ 8,
+ "Message {} has unexpected payload length: {}",
+ messages_received,
+ payload.len()
+ );
+
+ let expected_value = messages_received;
+            let actual_value = u64::from_le_bytes(payload[..8].try_into().unwrap());
+ assert_eq!(
+ actual_value, expected_value,
+ "Message at offset {} has wrong payload: expected {}, got {}",
+ next_offset, expected_value, actual_value
+ );
+
+ messages_received += 1;
+ next_offset += 1;
+ }
+
+ if messages_received.is_multiple_of(10000) {
+ println!(
+ "Verified: {}/{} messages ({:.1}%)",
+ messages_received,
+ messages_sent,
+ (messages_received as f64 / messages_sent as f64) * 100.0
+ );
+ }
+ }
+
+ let total_elapsed = start.elapsed();
+ println!(
+ "Test completed: {} messages sent and verified in {:.2}s",
+ messages_sent,
+ total_elapsed.as_secs_f64()
+ );
+
+ producer.delete_stream(&stream_id).await.unwrap();
+}
diff --git a/core/integration/tests/server/specific.rs b/core/integration/tests/server/specific.rs
index 205d6b5df..7df3d75d9 100644
--- a/core/integration/tests/server/specific.rs
+++ b/core/integration/tests/server/specific.rs
@@ -17,7 +17,8 @@
*/
use crate::server::scenarios::{
-    delete_segments_scenario, message_size_scenario, tcp_tls_scenario, websocket_tls_scenario,
+    delete_segments_scenario, message_size_scenario, single_message_per_batch_scenario,
+    tcp_tls_scenario, websocket_tls_scenario,
};
use iggy::prelude::*;
use integration::{
@@ -190,3 +191,24 @@ async fn message_size_scenario() {
message_size_scenario::run(&client_factory).await;
}
+
+#[tokio::test]
+#[parallel]
+async fn should_handle_single_message_per_batch_with_delayed_persistence() {
+ let mut extra_envs = HashMap::new();
+ extra_envs.insert(
+ "IGGY_SYSTEM_PARTITION_MESSAGES_REQUIRED_TO_SAVE".to_string(),
+ "10000".to_string(),
+ );
+
+    let mut test_server = TestServer::new(Some(extra_envs), true, None, IpAddrKind::V4);
+ test_server.start();
+
+ let server_addr = test_server.get_raw_tcp_addr().unwrap();
+ let client_factory = TcpClientFactory {
+ server_addr,
+ ..Default::default()
+ };
+
+ single_message_per_batch_scenario::run(&client_factory, 5).await;
+}
diff --git a/core/server/src/streaming/segments/messages/mod.rs b/core/server/src/streaming/segments/messages/mod.rs
index c085ed882..8f5ba9c81 100644
--- a/core/server/src/streaming/segments/messages/mod.rs
+++ b/core/server/src/streaming/segments/messages/mod.rs
@@ -22,27 +22,32 @@ mod messages_writer;
use super::IggyMessagesBatchSet;
use bytes::Bytes;
use compio::{fs::File, io::AsyncWriteAtExt};
-use iggy_common::{IggyError, IggyMessagesBatch};
+use iggy_common::{IggyError, IggyMessagesBatch, PooledBuffer};
pub use messages_reader::MessagesReader;
pub use messages_writer::MessagesWriter;
+/// Maximum number of IO vectors for a single writev() call.
+/// Linux typically has IOV_MAX=1024, but we use a conservative value to ensure
+/// cross-platform compatibility and leave room for any internal overhead.
+const MAX_IOV_COUNT: usize = 1024;
+
/// Vectored write of batches of messages to file
async fn write_batch(
file: &File,
position: u64,
mut batches: IggyMessagesBatchSet,
) -> Result<usize, IggyError> {
- let total_written = batches.iter().map(|b| b.size() as usize).sum();
- let batches = batches
- .iter_mut()
- .map(|b| b.take_messages())
- .collect::<Vec<_>>();
- let (result, _) = (&*file)
- .write_vectored_all_at(batches, position)
- .await
- .into();
- result.map_err(|_| IggyError::CannotWriteToFile)?;
+ let (total_written, buffers) =
+ batches
+ .iter_mut()
+ .fold((0usize, Vec::new()), |(size, mut bufs), batch| {
+ let batch_size = batch.size() as usize;
+ bufs.push(batch.take_messages());
+ (size + batch_size, bufs)
+ });
+
+ write_vectored_chunked_pooled(file, position, buffers).await?;
Ok(total_written)
}
@@ -56,12 +61,55 @@ pub async fn write_batch_frozen(
position: u64,
batches: &[IggyMessagesBatch],
) -> Result<usize, IggyError> {
- let total_written: usize = batches.iter().map(|b| b.size() as usize).sum();
-    let buffers: Vec<Bytes> = batches.iter().map(|b| b.messages_bytes()).collect();
- let (result, _) = (&*file)
- .write_vectored_all_at(buffers, position)
- .await
- .into();
- result.map_err(|_| IggyError::CannotWriteToFile)?;
+ let (total_written, buffers) = batches.iter().fold(
+ (0usize, Vec::with_capacity(batches.len())),
+ |(size, mut bufs), batch| {
+ bufs.push(batch.messages_bytes());
+ (size + batch.size() as usize, bufs)
+ },
+ );
+
+ write_vectored_chunked_bytes(file, position, buffers).await?;
Ok(total_written)
}
+
+/// Writes PooledBuffer buffers to file using vectored I/O, chunking to respect IOV_MAX limits.
+async fn write_vectored_chunked_pooled(
+ file: &File,
+ mut position: u64,
+ buffers: Vec<PooledBuffer>,
+) -> Result<(), IggyError> {
+ let mut iter = buffers.into_iter().peekable();
+
+ while iter.peek().is_some() {
+        let chunk: Vec<PooledBuffer> = iter.by_ref().take(MAX_IOV_COUNT).collect();
+ let chunk_size: usize = chunk.iter().map(|b| b.len()).sum();
+
+        let (result, _) = (&*file).write_vectored_all_at(chunk, position).await.into();
+ result.map_err(|_| IggyError::CannotWriteToFile)?;
+
+ position += chunk_size as u64;
+ }
+ Ok(())
+}
+
+/// Writes Bytes buffers to file using vectored I/O, chunking to respect IOV_MAX limits.
+async fn write_vectored_chunked_bytes(
+ file: &File,
+ mut position: u64,
+ buffers: Vec<Bytes>,
+) -> Result<(), IggyError> {
+ for chunk in buffers.chunks(MAX_IOV_COUNT) {
+ let chunk_size: usize = chunk.iter().map(|b| b.len()).sum();
+ let chunk_vec: Vec<Bytes> = chunk.to_vec();
+
+ let (result, _) = (&*file)
+ .write_vectored_all_at(chunk_vec, position)
+ .await
+ .into();
+ result.map_err(|_| IggyError::CannotWriteToFile)?;
+
+ position += chunk_size as u64;
+ }
+ Ok(())
+}
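
Side note on the hard-coded MAX_IOV_COUNT: on Unix targets the actual limit
can also be queried at runtime. A minimal sketch assuming the libc crate (the
commit deliberately pins a conservative 1024 instead of doing this):

    use libc::{_SC_IOV_MAX, sysconf};

    /// Hypothetical helper, not part of this commit: report the platform's
    /// IOV_MAX. POSIX only guarantees a minimum of 16 (_XOPEN_IOV_MAX), which
    /// serves as the fallback when sysconf cannot report a value.
    fn runtime_iov_max() -> usize {
        // SAFETY: sysconf is a plain query with no pointer arguments.
        let limit = unsafe { sysconf(_SC_IOV_MAX) };
        if limit > 0 { limit as usize } else { 16 }
    }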