This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new b13456c1e fix(server): chunk vectored writes to avoid exceeding IOV_MAX limit (#2581)
b13456c1e is described below

commit b13456c1e38eb1856ccc9bc4306f205aa7672647
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Mon Jan 19 13:13:05 2026 +0100

    fix(server): chunk vectored writes to avoid exceeding IOV_MAX limit (#2581)
    
    When many small messages accumulate in the journal before
    persistence (e.g., 1-byte payloads with 1 message per batch),
    the subsequent flush attempts to pass thousands of IO vectors
    to a single writev() call, exceeding the system's IOV_MAX
    limit (typically 1024 on Linux) and causing CannotWriteToFile
    errors.
---
 core/integration/tests/server/scenarios/mod.rs     |   1 +
 .../scenarios/single_message_per_batch_scenario.rs | 178 +++++++++++++++++++++
 core/integration/tests/server/specific.rs          |  24 ++-
 core/server/src/streaming/segments/messages/mod.rs |  84 +++++++---
 4 files changed, 268 insertions(+), 19 deletions(-)
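For readers who have not run into IOV_MAX before, here is a minimal, self-contained sketch of the chunking idea the commit message describes. It uses std::io rather than the compio API the server relies on, and write_all_chunked, the error handling, and the demo file name are illustrative assumptions rather than project code; the actual fix is in the diff below.

    // Illustrative sketch only: split a long list of buffers into groups of at
    // most MAX_IOV_COUNT before each vectored write, so a single writev() never
    // receives more io-vectors than the kernel's IOV_MAX allows.
    use std::io::{self, IoSlice, Write};

    const MAX_IOV_COUNT: usize = 1024; // conservative stand-in for IOV_MAX

    fn write_all_chunked<W: Write>(writer: &mut W, buffers: &[Vec<u8>]) -> io::Result<()> {
        for chunk in buffers.chunks(MAX_IOV_COUNT) {
            let slices: Vec<IoSlice<'_>> = chunk.iter().map(|b| IoSlice::new(b)).collect();
            let expected: usize = chunk.iter().map(|b| b.len()).sum();
            let written = writer.write_vectored(&slices)?;
            // Production code retries the remainder on a short write (as the
            // compio write_vectored_all_at call in the diff does); the sketch
            // simply reports it as an error.
            if written != expected {
                return Err(io::Error::other("short vectored write"));
            }
        }
        Ok(())
    }

    fn main() -> io::Result<()> {
        // Thousands of tiny payloads, mimicking the failing bench scenario.
        let buffers: Vec<Vec<u8>> = (0..5000u32).map(|i| vec![i as u8]).collect();
        let mut file = std::fs::File::create("iov_chunking_demo.bin")?;
        write_all_chunked(&mut file, &buffers)
    }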

diff --git a/core/integration/tests/server/scenarios/mod.rs b/core/integration/tests/server/scenarios/mod.rs
index c714fb2a7..1f71098fb 100644
--- a/core/integration/tests/server/scenarios/mod.rs
+++ b/core/integration/tests/server/scenarios/mod.rs
@@ -32,6 +32,7 @@ pub mod message_headers_scenario;
 pub mod message_size_scenario;
 pub mod permissions_scenario;
 pub mod read_during_persistence_scenario;
+pub mod single_message_per_batch_scenario;
 pub mod stale_client_consumer_group_scenario;
 pub mod stream_size_validation_scenario;
 pub mod system_scenario;
diff --git a/core/integration/tests/server/scenarios/single_message_per_batch_scenario.rs b/core/integration/tests/server/scenarios/single_message_per_batch_scenario.rs
new file mode 100644
index 000000000..6c34059eb
--- /dev/null
+++ b/core/integration/tests/server/scenarios/single_message_per_batch_scenario.rs
@@ -0,0 +1,178 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use bytes::Bytes;
+use iggy::prelude::*;
+use integration::test_server::{ClientFactory, login_root};
+use std::time::{Duration, Instant};
+
+const STREAM_NAME: &str = "single-msg-batch-stream";
+const TOPIC_NAME: &str = "single-msg-batch-topic";
+const POLL_BATCH_SIZE: u32 = 1000;
+
+/// Test that simulates the bench scenario with single message per batch.
+///
+/// This reproduces: `iggy-bench --message-size 1 --messages-per-batch 1 --message-batches N pp -p 1 tcp`
+///
+/// The scenario exercises the server under conditions where:
+/// - Inline persistence is delayed (high MESSAGES_REQUIRED_TO_SAVE threshold)
+/// - Messages are very small (sequential number payload)
+/// - No batching (1 message per send operation)
+/// - Continuous sending for a fixed duration
+/// - Verifies all messages are received correctly after sending
+///
+/// This pattern can expose issues with:
+/// - Journal management under high message count with small payloads
+/// - Memory allocation patterns for many small messages
+/// - IOV_MAX limits when flushing many small buffers
+pub async fn run(client_factory: &dyn ClientFactory, duration_secs: u64) {
+    let client = client_factory.create_client().await;
+    let producer = IggyClient::create(client, None, None);
+    login_root(&producer).await;
+
+    producer.create_stream(STREAM_NAME).await.unwrap();
+    producer
+        .create_topic(
+            &Identifier::named(STREAM_NAME).unwrap(),
+            TOPIC_NAME,
+            1,
+            CompressionAlgorithm::default(),
+            None,
+            IggyExpiry::NeverExpire,
+            MaxTopicSize::ServerDefault,
+        )
+        .await
+        .unwrap();
+
+    let stream_id = Identifier::named(STREAM_NAME).unwrap();
+    let topic_id = Identifier::named(TOPIC_NAME).unwrap();
+    let partitioning = Partitioning::partition_id(0);
+
+    let test_duration = Duration::from_secs(duration_secs);
+    let start = Instant::now();
+    let mut messages_sent = 0u64;
+
+    println!(
+        "Starting single message per batch test: sequential payloads, 
duration: {}s",
+        duration_secs
+    );
+
+    while start.elapsed() < test_duration {
+        let payload = Bytes::from(messages_sent.to_le_bytes().to_vec());
+        let mut messages = vec![
+            IggyMessage::builder()
+                .payload(payload)
+                .build()
+                .expect("Failed to create message"),
+        ];
+
+        producer
+            .send_messages(&stream_id, &topic_id, &partitioning, &mut messages)
+            .await
+            .unwrap();
+
+        messages_sent += 1;
+
+        if messages_sent.is_multiple_of(1000) {
+            println!(
+                "Progress: {} messages sent, elapsed: {:.2}s",
+                messages_sent,
+                start.elapsed().as_secs_f64()
+            );
+        }
+    }
+
+    let send_elapsed = start.elapsed();
+    println!(
+        "Sending completed: {} messages in {:.2}s ({:.0} msgs/sec)",
+        messages_sent,
+        send_elapsed.as_secs_f64(),
+        messages_sent as f64 / send_elapsed.as_secs_f64()
+    );
+
+    println!("Receiving and verifying all {} messages...", messages_sent);
+    let consumer = Consumer::default();
+    let mut next_offset = 0u64;
+    let mut messages_received = 0u64;
+
+    while messages_received < messages_sent {
+        let remaining = messages_sent - messages_received;
+        let batch_size = remaining.min(POLL_BATCH_SIZE as u64) as u32;
+
+        let polled = producer
+            .poll_messages(
+                &stream_id,
+                &topic_id,
+                Some(0),
+                &consumer,
+                &PollingStrategy::offset(next_offset),
+                batch_size,
+                false,
+            )
+            .await
+            .unwrap();
+
+        assert!(
+            !polled.messages.is_empty(),
+            "Expected messages at offset {}, but got none (sent: {}, received 
so far: {})",
+            next_offset,
+            messages_sent,
+            messages_received
+        );
+
+        for msg in &polled.messages {
+            let payload = &msg.payload;
+            assert_eq!(
+                payload.len(),
+                8,
+                "Message {} has unexpected payload length: {}",
+                messages_received,
+                payload.len()
+            );
+
+            let expected_value = messages_received;
+            let actual_value = u64::from_le_bytes(payload[..8].try_into().unwrap());
+            assert_eq!(
+                actual_value, expected_value,
+                "Message at offset {} has wrong payload: expected {}, got {}",
+                next_offset, expected_value, actual_value
+            );
+
+            messages_received += 1;
+            next_offset += 1;
+        }
+
+        if messages_received.is_multiple_of(10000) {
+            println!(
+                "Verified: {}/{} messages ({:.1}%)",
+                messages_received,
+                messages_sent,
+                (messages_received as f64 / messages_sent as f64) * 100.0
+            );
+        }
+    }
+
+    let total_elapsed = start.elapsed();
+    println!(
+        "Test completed: {} messages sent and verified in {:.2}s",
+        messages_sent,
+        total_elapsed.as_secs_f64()
+    );
+
+    producer.delete_stream(&stream_id).await.unwrap();
+}
diff --git a/core/integration/tests/server/specific.rs b/core/integration/tests/server/specific.rs
index 205d6b5df..7df3d75d9 100644
--- a/core/integration/tests/server/specific.rs
+++ b/core/integration/tests/server/specific.rs
@@ -17,7 +17,8 @@
  */
 
 use crate::server::scenarios::{
-    delete_segments_scenario, message_size_scenario, tcp_tls_scenario, websocket_tls_scenario,
+    delete_segments_scenario, message_size_scenario, single_message_per_batch_scenario,
+    tcp_tls_scenario, websocket_tls_scenario,
 };
 use iggy::prelude::*;
 use integration::{
@@ -190,3 +191,24 @@ async fn message_size_scenario() {
 
     message_size_scenario::run(&client_factory).await;
 }
+
+#[tokio::test]
+#[parallel]
+async fn should_handle_single_message_per_batch_with_delayed_persistence() {
+    let mut extra_envs = HashMap::new();
+    extra_envs.insert(
+        "IGGY_SYSTEM_PARTITION_MESSAGES_REQUIRED_TO_SAVE".to_string(),
+        "10000".to_string(),
+    );
+
+    let mut test_server = TestServer::new(Some(extra_envs), true, None, IpAddrKind::V4);
+    test_server.start();
+
+    let server_addr = test_server.get_raw_tcp_addr().unwrap();
+    let client_factory = TcpClientFactory {
+        server_addr,
+        ..Default::default()
+    };
+
+    single_message_per_batch_scenario::run(&client_factory, 5).await;
+}
diff --git a/core/server/src/streaming/segments/messages/mod.rs b/core/server/src/streaming/segments/messages/mod.rs
index c085ed882..8f5ba9c81 100644
--- a/core/server/src/streaming/segments/messages/mod.rs
+++ b/core/server/src/streaming/segments/messages/mod.rs
@@ -22,27 +22,32 @@ mod messages_writer;
 use super::IggyMessagesBatchSet;
 use bytes::Bytes;
 use compio::{fs::File, io::AsyncWriteAtExt};
-use iggy_common::{IggyError, IggyMessagesBatch};
+use iggy_common::{IggyError, IggyMessagesBatch, PooledBuffer};
 
 pub use messages_reader::MessagesReader;
 pub use messages_writer::MessagesWriter;
 
+/// Maximum number of IO vectors for a single writev() call.
+/// Linux typically has IOV_MAX=1024, but we use a conservative value to ensure
+/// cross-platform compatibility and leave room for any internal overhead.
+const MAX_IOV_COUNT: usize = 1024;
+
 /// Vectored write a batches of messages to file
 async fn write_batch(
     file: &File,
     position: u64,
     mut batches: IggyMessagesBatchSet,
 ) -> Result<usize, IggyError> {
-    let total_written = batches.iter().map(|b| b.size() as usize).sum();
-    let batches = batches
-        .iter_mut()
-        .map(|b| b.take_messages())
-        .collect::<Vec<_>>();
-    let (result, _) = (&*file)
-        .write_vectored_all_at(batches, position)
-        .await
-        .into();
-    result.map_err(|_| IggyError::CannotWriteToFile)?;
+    let (total_written, buffers) =
+        batches
+            .iter_mut()
+            .fold((0usize, Vec::new()), |(size, mut bufs), batch| {
+                let batch_size = batch.size() as usize;
+                bufs.push(batch.take_messages());
+                (size + batch_size, bufs)
+            });
+
+    write_vectored_chunked_pooled(file, position, buffers).await?;
     Ok(total_written)
 }
 
@@ -56,12 +61,55 @@ pub async fn write_batch_frozen(
     position: u64,
     batches: &[IggyMessagesBatch],
 ) -> Result<usize, IggyError> {
-    let total_written: usize = batches.iter().map(|b| b.size() as usize).sum();
-    let buffers: Vec<Bytes> = batches.iter().map(|b| b.messages_bytes()).collect();
-    let (result, _) = (&*file)
-        .write_vectored_all_at(buffers, position)
-        .await
-        .into();
-    result.map_err(|_| IggyError::CannotWriteToFile)?;
+    let (total_written, buffers) = batches.iter().fold(
+        (0usize, Vec::with_capacity(batches.len())),
+        |(size, mut bufs), batch| {
+            bufs.push(batch.messages_bytes());
+            (size + batch.size() as usize, bufs)
+        },
+    );
+
+    write_vectored_chunked_bytes(file, position, buffers).await?;
     Ok(total_written)
 }
+
+/// Writes PooledBuffer buffers to file using vectored I/O, chunking to respect IOV_MAX limits.
+async fn write_vectored_chunked_pooled(
+    file: &File,
+    mut position: u64,
+    buffers: Vec<PooledBuffer>,
+) -> Result<(), IggyError> {
+    let mut iter = buffers.into_iter().peekable();
+
+    while iter.peek().is_some() {
+        let chunk: Vec<PooledBuffer> = iter.by_ref().take(MAX_IOV_COUNT).collect();
+        let chunk_size: usize = chunk.iter().map(|b| b.len()).sum();
+
+        let (result, _) = (&*file).write_vectored_all_at(chunk, position).await.into();
+        result.map_err(|_| IggyError::CannotWriteToFile)?;
+
+        position += chunk_size as u64;
+    }
+    Ok(())
+}
+
+/// Writes Bytes buffers to file using vectored I/O, chunking to respect IOV_MAX limits.
+async fn write_vectored_chunked_bytes(
+    file: &File,
+    mut position: u64,
+    buffers: Vec<Bytes>,
+) -> Result<(), IggyError> {
+    for chunk in buffers.chunks(MAX_IOV_COUNT) {
+        let chunk_size: usize = chunk.iter().map(|b| b.len()).sum();
+        let chunk_vec: Vec<Bytes> = chunk.to_vec();
+
+        let (result, _) = (&*file)
+            .write_vectored_all_at(chunk_vec, position)
+            .await
+            .into();
+        result.map_err(|_| IggyError::CannotWriteToFile)?;
+
+        position += chunk_size as u64;
+    }
+    Ok(())
+}
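A side note on the MAX_IOV_COUNT constant introduced above: the per-process limit can also be queried at runtime instead of being hardcoded. A hypothetical probe, assuming the libc crate as a dependency (this is not part of the commit):

    // Hypothetical check, not in this commit: ask the platform for its IOV_MAX.
    fn main() {
        // sysconf returns -1 when the limit is indeterminate; fall back to the
        // same conservative default the server code above uses.
        let n = unsafe { libc::sysconf(libc::_SC_IOV_MAX) };
        let iov_max = if n > 0 { n as usize } else { 1024 };
        println!("IOV_MAX on this system: {iov_max}");
    }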
